Поддержка SSE и streaming responses

- Proxy: chunked streaming с Flush() для real-time данных
- PHP FastCGI: потоковая обработка ответов через streamFastCGIResponse()
- Удалена буферизация - данные отправляются сразу"
This commit is contained in:
2025-11-26 22:30:54 +07:00
parent 2b040ed51c
commit a6007a8906
2 changed files with 122 additions and 62 deletions

View File

@@ -369,32 +369,32 @@ func PHPHandler(w http.ResponseWriter, r *http.Request, host string, originalURI
packet = createFCGIPacket(FCGI_STDIN, requestID, []byte{})
conn.Write(packet)
// Читаем ответ
response, err := readFastCGIResponse(conn, requestID)
// Читаем и стримим ответ (с поддержкой SSE и chunked transfer)
err = streamFastCGIResponse(conn, requestID, w)
if err != nil {
tools.Logs_file(1, "PHP", "❌ Ошибка чтения FastCGI ответа: "+err.Error(), "logs_php.log", false)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
// Не вызываем http.Error здесь, т.к. заголовки уже могли быть отправлены
return
}
// Обрабатываем ответ
processPHPResponse(w, response)
tools.Logs_file(0, "PHP", fmt.Sprintf("✅ FastCGI обработал: %s (порт %d)", phpPath, port), "logs_php.log", false)
}
// Чтение FastCGI ответа
func readFastCGIResponse(conn net.Conn, requestID uint16) ([]byte, error) {
// Streaming чтение FastCGI ответа с поддержкой SSE и chunked transfer
func streamFastCGIResponse(conn net.Conn, requestID uint16, w http.ResponseWriter) error {
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
var stdout bytes.Buffer
var stderr bytes.Buffer
var headerBuffer bytes.Buffer
headersWritten := false
flusher, canFlush := w.(http.Flusher)
for {
// Читаем заголовок FastCGI
headerBuf := make([]byte, 8)
_, err := io.ReadFull(conn, headerBuf)
if err != nil {
return nil, err
return err
}
var header FCGIHeader
@@ -406,7 +406,7 @@ func readFastCGIResponse(conn net.Conn, requestID uint16) ([]byte, error) {
if header.ContentLength > 0 {
_, err = io.ReadFull(conn, content)
if err != nil {
return nil, err
return err
}
}
@@ -420,9 +420,55 @@ func readFastCGIResponse(conn net.Conn, requestID uint16) ([]byte, error) {
switch header.Type {
case FCGI_STDOUT:
if header.ContentLength > 0 {
stdout.Write(content)
if !headersWritten {
// Накапливаем данные до разделителя заголовков
headerBuffer.Write(content)
// Ищем разделитель между заголовками и телом
headerStr := headerBuffer.String()
sepIndex := strings.Index(headerStr, "\r\n\r\n")
if sepIndex == -1 {
sepIndex = strings.Index(headerStr, "\n\n")
}
if sepIndex != -1 {
// Нашли разделитель - обрабатываем заголовки
var sepLen int
if strings.Contains(headerStr[:sepIndex+4], "\r\n\r\n") {
sepLen = 4
} else {
sepLen = 2
}
headersPart := headerStr[:sepIndex]
bodyPart := headerStr[sepIndex+sepLen:]
// Парсим и устанавливаем заголовки
processStreamingHeaders(w, headersPart)
headersWritten = true
// Отправляем первую часть тела
if len(bodyPart) > 0 {
w.Write([]byte(bodyPart))
if canFlush {
flusher.Flush()
}
}
}
} else {
// Заголовки уже отправлены - стримим тело
w.Write(content)
// Принудительно отправляем данные (критично для SSE)
if canFlush {
flusher.Flush()
}
}
} else {
// Пустой STDOUT означает конец
// Пустой STDOUT - конец данных, если остались заголовки без тела
if !headersWritten && headerBuffer.Len() > 0 {
processStreamingHeaders(w, headerBuffer.String())
headersWritten = true
}
}
case FCGI_STDERR:
if header.ContentLength > 0 {
@@ -433,61 +479,50 @@ func readFastCGIResponse(conn net.Conn, requestID uint16) ([]byte, error) {
if stderr.Len() > 0 {
tools.Logs_file(1, "PHP", "FastCGI stderr: "+stderr.String(), "logs_php.log", false)
}
return stdout.Bytes(), nil
// Если заголовки так и не были записаны (пустой ответ)
if !headersWritten {
w.WriteHeader(http.StatusOK)
}
return nil
}
}
}
// Обработка PHP ответа (как раньше)
func processPHPResponse(w http.ResponseWriter, response []byte) {
responseStr := string(response)
// Обработка заголовков для streaming ответа
func processStreamingHeaders(w http.ResponseWriter, headersPart string) {
headers := strings.Split(headersPart, "\n")
statusCode := 200
// Разбираем заголовки и тело
parts := strings.SplitN(responseStr, "\r\n\r\n", 2)
if len(parts) < 2 {
parts = strings.SplitN(responseStr, "\n\n", 2)
}
if len(parts) >= 2 {
headers := strings.Split(parts[0], "\n")
statusCode := 200
for _, header := range headers {
header = strings.TrimSpace(header)
if header == "" {
continue
}
if strings.HasPrefix(strings.ToLower(header), "content-type:") {
contentType := strings.TrimSpace(strings.SplitN(header, ":", 2)[1])
w.Header().Set("Content-Type", contentType)
} else if strings.HasPrefix(strings.ToLower(header), "set-cookie:") {
cookie := strings.TrimSpace(strings.SplitN(header, ":", 2)[1])
w.Header().Add("Set-Cookie", cookie)
} else if strings.HasPrefix(strings.ToLower(header), "location:") {
location := strings.TrimSpace(strings.SplitN(header, ":", 2)[1])
w.Header().Set("Location", location)
w.WriteHeader(http.StatusFound)
return
} else if strings.HasPrefix(strings.ToLower(header), "status:") {
status := strings.TrimSpace(strings.SplitN(header, ":", 2)[1])
if code, err := strconv.Atoi(strings.Split(status, " ")[0]); err == nil {
statusCode = code
}
} else if strings.Contains(header, ":") {
headerParts := strings.SplitN(header, ":", 2)
if len(headerParts) == 2 {
w.Header().Set(strings.TrimSpace(headerParts[0]), strings.TrimSpace(headerParts[1]))
}
}
for _, header := range headers {
header = strings.TrimSpace(header)
if header == "" {
continue
}
w.WriteHeader(statusCode)
w.Write([]byte(parts[1]))
} else {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write(response)
if strings.HasPrefix(strings.ToLower(header), "content-type:") {
contentType := strings.TrimSpace(strings.SplitN(header, ":", 2)[1])
w.Header().Set("Content-Type", contentType)
} else if strings.HasPrefix(strings.ToLower(header), "set-cookie:") {
cookie := strings.TrimSpace(strings.SplitN(header, ":", 2)[1])
w.Header().Add("Set-Cookie", cookie)
} else if strings.HasPrefix(strings.ToLower(header), "location:") {
location := strings.TrimSpace(strings.SplitN(header, ":", 2)[1])
w.Header().Set("Location", location)
statusCode = http.StatusFound
} else if strings.HasPrefix(strings.ToLower(header), "status:") {
status := strings.TrimSpace(strings.SplitN(header, ":", 2)[1])
if code, err := strconv.Atoi(strings.Split(status, " ")[0]); err == nil {
statusCode = code
}
} else if strings.Contains(header, ":") {
headerParts := strings.SplitN(header, ":", 2)
if len(headerParts) == 2 {
w.Header().Set(strings.TrimSpace(headerParts[0]), strings.TrimSpace(headerParts[1]))
}
}
}
w.WriteHeader(statusCode)
}
// PHP_Stop останавливает все FastCGI процессы

View File

@@ -154,9 +154,34 @@ func StartHandlerProxy(w http.ResponseWriter, r *http.Request) (valid bool) {
// Устанавливаем статус код
w.WriteHeader(resp.StatusCode)
// Копируем тело ответа
if _, err := io.Copy(w, resp.Body); err != nil {
log.Printf("Ошибка копирования тела ответа: %v", err)
// Копируем тело ответа с поддержкой streaming (SSE, chunked responses)
// Используем буферизированное копирование с принудительной отправкой данных
flusher, canFlush := w.(http.Flusher)
// Буфер для чанков (32KB - оптимальный размер для баланса производительности)
buffer := make([]byte, 32*1024)
for {
n, err := resp.Body.Read(buffer)
if n > 0 {
// Записываем прочитанные данные
if _, writeErr := w.Write(buffer[:n]); writeErr != nil {
log.Printf("Ошибка записи тела ответа: %v", writeErr)
break
}
// Принудительно отправляем данные клиенту (критично для SSE)
if canFlush {
flusher.Flush()
}
}
if err != nil {
if err != io.EOF {
log.Printf("Ошибка чтения тела ответа: %v", err)
}
break
}
}
return valid