diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index 98108ea881..fca00c0b8f 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "os/user" + "slices" "strconv" "strings" "sync" @@ -242,21 +243,25 @@ func New( go logs.LogBufferedDataEvents(stdoutLogs, &stdoutLogger, "data") + // Reusable read buffer to avoid allocation per Read cycle when no + // subscribers are connected. + readBuf := make([]byte, stdChunkSize) + for { - buf := make([]byte, stdChunkSize) + n, readErr := stdout.Read(readBuf) - n, readErr := stdout.Read(buf) + if n > 0 && outMultiplex.HasSubscribers() { + data := slices.Clone(readBuf[:n]) - if n > 0 { outMultiplex.Source <- rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Stdout{ - Stdout: buf[:n], + Stdout: data, }, }, } - stdoutLogs <- buf[:n] + stdoutLogs <- data } if errors.Is(readErr, io.EOF) { @@ -284,21 +289,23 @@ func New( go logs.LogBufferedDataEvents(stderrLogs, &stderrLogger, "data") + readBuf := make([]byte, stdChunkSize) + for { - buf := make([]byte, stdChunkSize) + n, readErr := stderr.Read(readBuf) - n, readErr := stderr.Read(buf) + if n > 0 && outMultiplex.HasSubscribers() { + data := slices.Clone(readBuf[:n]) - if n > 0 { outMultiplex.Source <- rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Stderr{ - Stderr: buf[:n], + Stderr: data, }, }, } - stderrLogs <- buf[:n] + stderrLogs <- data } if errors.Is(readErr, io.EOF) { diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index ff699812db..9b9b3ad12d 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -84,6 +84,20 @@ func (m *MultiplexedChannel[T]) run() { m.channels = nil } +// HasSubscribers reports whether any non-cancelled subscriber exists. +func (m *MultiplexedChannel[T]) HasSubscribers() bool { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, s := range m.channels { + if !s.isCancelled() { + return true + } + } + + return false +} + // Fork registers a new subscriber and returns its channel plus a cancel func. // If Source is already closed it returns a pre-closed channel and a no-op cancel. // The channel is bidirectional for backwards compat with start.go which writes diff --git a/packages/envd/pkg/version.go b/packages/envd/pkg/version.go index e251abd82f..01857d46e4 100644 --- a/packages/envd/pkg/version.go +++ b/packages/envd/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "0.5.18" +const Version = "0.5.19"