diff --git a/.golangci.yml b/.golangci.yml index 6b030f98..32410c08 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -23,6 +23,7 @@ linters: - containedctx - contextcheck - copyloopvar + - depguard - dogsled - errcheck - errchkjson @@ -58,6 +59,7 @@ linters: - errcheck - forcetypeassert - gosec + - depguard - path: 'cmd/(.+)/main\.go' linters: - forbidigo @@ -77,6 +79,12 @@ linters: allow-no-explanation: [] require-explanation: true require-specific: true + depguard: + rules: + test-only: + deny: + - pkg: github.com/replicate/cog-runtime/internal/loggingtest + desc: "loggingtest is test-only" revive: confidence: 0.8 severity: warning diff --git a/cmd/cog/main.go b/cmd/cog/main.go index c659947b..96404148 100644 --- a/cmd/cog/main.go +++ b/cmd/cog/main.go @@ -9,11 +9,9 @@ import ( "time" "github.com/alecthomas/kong" - "github.com/replicate/go/logging" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" "github.com/replicate/cog-runtime/internal/config" + "github.com/replicate/cog-runtime/internal/logging" "github.com/replicate/cog-runtime/internal/runner" "github.com/replicate/cog-runtime/internal/service" "github.com/replicate/cog-runtime/internal/version" @@ -42,30 +40,15 @@ type CLI struct { Test TestCmd `cmd:"" help:"Run model tests to verify functionality"` } -// createBaseLogger creates a base logger with configurable level -func createBaseLogger(name string) *zap.Logger { - logLevel := os.Getenv("COG_LOG_LEVEL") - if logLevel == "" { - logLevel = "info" - } - _, err := zapcore.ParseLevel(logLevel) - if err != nil { - fmt.Printf("Failed to parse log level \"%s\": %s\n", logLevel, err) //nolint:forbidigo // logger setup error reporting - } - - return logging.New(name).WithOptions(zap.IncreaseLevel(zapcore.DebugLevel)) -} - // buildServiceConfig converts CLI ServerCmd to service configuration func buildServiceConfig(s *ServerCmd) (config.Config, error) { - log := createBaseLogger("cog-config").Sugar() + log := logging.New("cog-config").Sugar() logLevel := 
log.Level() - log.Infow("log level", "level", logLevel) - log.Infow("env log level", "level", os.Getenv("COG_LOG_LEVEL")) + log.Debugw("log level", "level", logLevel) // One-shot mode requires procedure mode if s.OneShot && !s.UseProcedureMode { log.Error("one-shot mode requires procedure mode") return config.Config{}, fmt.Errorf("one-shot mode requires procedure mode, use --use-procedure-mode") } @@ -115,7 +98,7 @@ func buildServiceConfig(s *ServerCmd) (config.Config, error) { func (s *ServerCmd) Run() error { // Create base logger - baseLogger := createBaseLogger("cog") + baseLogger := logging.New("cog") log := baseLogger.Sugar() // Build service configuration @@ -143,7 +126,7 @@ func (s *ServerCmd) Run() error { } func (s *SchemaCmd) Run() error { - log := createBaseLogger("cog-schema").Sugar() + log := logging.New("cog-schema").Sugar() wd, err := os.Getwd() if err != nil { @@ -169,7 +152,7 @@ func (s *SchemaCmd) Run() error { } func (t *TestCmd) Run() error { - log := createBaseLogger("cog-test").Sugar() + log := logging.New("cog-test").Sugar() wd, err := os.Getwd() if err != nil { diff --git a/internal/logging/logger.go b/internal/logging/logger.go new file mode 100644 index 00000000..3d1fc44f --- /dev/null +++ b/internal/logging/logger.go @@ -0,0 +1,171 @@ +package logging + +import ( + "fmt" + "os" + "strings" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// Custom log levels with Trace below Debug +const ( + TraceLevel = zapcore.Level(-8) // Below zapcore.DebugLevel (-1) +) + +// customLowercaseLevelEncoder handles our custom Trace level display (lowercase) +func customLowercaseLevelEncoder(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) { + switch level { + case TraceLevel: + enc.AppendString("trace") + default: + zapcore.LowercaseLevelEncoder(level, enc) + } +} + +// customColorLevelEncoder handles our custom Trace level display (with colors) +func customColorLevelEncoder(level zapcore.Level, enc 
zapcore.PrimitiveArrayEncoder) { + switch level { + case TraceLevel: + enc.AppendString("\x1b[90mTRACE\x1b[0m") // Gray color for trace + default: + zapcore.CapitalColorLevelEncoder(level, enc) + } +} + +// Logger embeds zap.Logger and adds Trace level support +type Logger struct { + *zap.Logger +} + +// SugaredLogger embeds zap.SugaredLogger and adds Trace level support +type SugaredLogger struct { + *zap.SugaredLogger +} + +// New creates a new logger with the given name +func New(name string) *Logger { + // Check if we should use development config (console format) + logFormat := os.Getenv("LOG_FORMAT") + isDevelopment := logFormat == "development" || logFormat == "console" + + var cfg zap.Config + if isDevelopment { + cfg = zap.NewDevelopmentConfig() + cfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel) + cfg.EncoderConfig.EncodeLevel = customColorLevelEncoder + } else { + cfg = zap.NewProductionConfig() + cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel) + cfg.EncoderConfig.EncodeLevel = customLowercaseLevelEncoder + } + + // Set log level from environment (COG_LOG_LEVEL takes precedence, fallback to LOG_LEVEL) + logLevel := os.Getenv("COG_LOG_LEVEL") + if logLevel == "" { + logLevel = os.Getenv("LOG_LEVEL") + } + if logLevel != "" { + level, err := parseLevel(logLevel) + if err != nil { + fmt.Printf("Failed to parse log level \"%s\": %s\n", logLevel, err) //nolint:forbidigo // logger setup error reporting + } else { + cfg.Level = zap.NewAtomicLevelAt(level) + } + } + + // Set output file if LOG_FILE is specified + logFile := os.Getenv("LOG_FILE") + if logFile != "" { + cfg.OutputPaths = []string{logFile} + cfg.ErrorOutputPaths = []string{logFile} + } else { + cfg.OutputPaths = []string{"stdout"} + cfg.ErrorOutputPaths = []string{"stderr"} + } + + // Common encoder config + cfg.EncoderConfig.TimeKey = "timestamp" + cfg.EncoderConfig.LevelKey = "severity" + cfg.EncoderConfig.NameKey = "logger" + cfg.EncoderConfig.CallerKey = "caller" + 
cfg.EncoderConfig.MessageKey = "message" + cfg.EncoderConfig.StacktraceKey = "stacktrace" + cfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + cfg.EncoderConfig.EncodeDuration = zapcore.StringDurationEncoder + cfg.EncoderConfig.EncodeCaller = zapcore.ShortCallerEncoder + + // Disable sampling for now (can be re-enabled later if needed) + cfg.Sampling = nil + + zapLogger, err := cfg.Build() + if err != nil { + panic(fmt.Sprintf("Failed to build logger: %v", err)) + } + + return &Logger{Logger: zapLogger.Named(name)} +} + +// parseLevel parses log level string including our custom "trace" level +func parseLevel(level string) (zapcore.Level, error) { + switch strings.ToLower(level) { + case "trace": + return TraceLevel, nil + case "debug": + return zapcore.DebugLevel, nil + case "info": + return zapcore.InfoLevel, nil + case "warn", "warning": + return zapcore.WarnLevel, nil + case "error": + return zapcore.ErrorLevel, nil + default: + return zapcore.InfoLevel, fmt.Errorf("unknown log level: %s", level) + } +} + +// Override Sugar to return our custom SugaredLogger +func (l *Logger) Sugar() *SugaredLogger { + return &SugaredLogger{SugaredLogger: l.Logger.Sugar()} +} + +// Override Named to return our custom Logger +func (l *Logger) Named(name string) *Logger { + return &Logger{Logger: l.Logger.Named(name)} +} + +// Override With to return our custom Logger +func (l *Logger) With(fields ...zap.Field) *Logger { + return &Logger{Logger: l.Logger.With(fields...)} +} + +// Override WithOptions to return our custom Logger +func (l *Logger) WithOptions(opts ...zap.Option) *Logger { + return &Logger{Logger: l.Logger.WithOptions(opts...)} +} + +// Add Trace method to Logger +func (l *Logger) Trace(msg string, fields ...zap.Field) { + l.Log(TraceLevel, msg, fields...) +} + +// Add Trace method to SugaredLogger +func (s *SugaredLogger) Trace(args ...any) { + s.Log(TraceLevel, args...) 
+} + +// Add Tracew method to SugaredLogger +func (s *SugaredLogger) Tracew(msg string, keysAndValues ...any) { + s.Logw(TraceLevel, msg, keysAndValues...) +} + +// Override With to return our custom SugaredLogger +func (s *SugaredLogger) With(args ...any) *SugaredLogger { + return &SugaredLogger{SugaredLogger: s.SugaredLogger.With(args...)} +} + +// Override Named to return our custom SugaredLogger +func (s *SugaredLogger) Named(name string) *SugaredLogger { + return &SugaredLogger{SugaredLogger: s.SugaredLogger.Named(name)} +} diff --git a/internal/logging/logging_test.go b/internal/logging/logging_test.go new file mode 100644 index 00000000..c41232d7 --- /dev/null +++ b/internal/logging/logging_test.go @@ -0,0 +1,168 @@ +package logging + +import ( + "os" + "testing" + + "go.uber.org/zap/zapcore" +) + +func TestNew(t *testing.T) { + // Save original env vars + originalCogLevel := os.Getenv("COG_LOG_LEVEL") + originalLogLevel := os.Getenv("LOG_LEVEL") + originalLogFormat := os.Getenv("LOG_FORMAT") + originalLogFile := os.Getenv("LOG_FILE") + + defer func() { + // Restore original env vars + os.Setenv("COG_LOG_LEVEL", originalCogLevel) + os.Setenv("LOG_LEVEL", originalLogLevel) + os.Setenv("LOG_FORMAT", originalLogFormat) + os.Setenv("LOG_FILE", originalLogFile) + }() + + t.Run("creates logger with default level", func(t *testing.T) { + os.Unsetenv("COG_LOG_LEVEL") + os.Unsetenv("LOG_LEVEL") + os.Unsetenv("LOG_FORMAT") + os.Unsetenv("LOG_FILE") + + logger := New("test") + if logger == nil { + t.Fatal("expected logger to be created") + } + + // Should be able to call basic methods + logger.Info("test message") + logger.Debug("debug message") + logger.Trace("trace message") + }) + + t.Run("respects COG_LOG_LEVEL environment variable", func(t *testing.T) { + os.Setenv("COG_LOG_LEVEL", "debug") + defer os.Unsetenv("COG_LOG_LEVEL") + + logger := New("test") + if logger == nil { + t.Fatal("expected logger to be created") + } + }) + + t.Run("respects LOG_LEVEL as 
fallback", func(t *testing.T) { + os.Unsetenv("COG_LOG_LEVEL") + os.Setenv("LOG_LEVEL", "warn") + defer os.Unsetenv("LOG_LEVEL") + + logger := New("test") + if logger == nil { + t.Fatal("expected logger to be created") + } + }) + + t.Run("handles development format", func(t *testing.T) { + os.Setenv("LOG_FORMAT", "development") + defer os.Unsetenv("LOG_FORMAT") + + logger := New("test") + if logger == nil { + t.Fatal("expected logger to be created") + } + }) + + t.Run("handles console format", func(t *testing.T) { + os.Setenv("LOG_FORMAT", "console") + defer os.Unsetenv("LOG_FORMAT") + + logger := New("test") + if logger == nil { + t.Fatal("expected logger to be created") + } + }) + + t.Run("respects LOG_FILE environment variable", func(t *testing.T) { + // Use test temp directory for log file output + tempDir := t.TempDir() + logFile := tempDir + "/test.log" + + os.Setenv("LOG_FILE", logFile) + defer os.Unsetenv("LOG_FILE") + + logger := New("test") + if logger == nil { + t.Fatal("expected logger to be created") + } + + // Write a log message + logger.Info("test log to file") + + // Verify file was created (basic check) + if _, err := os.Stat(logFile); os.IsNotExist(err) { + t.Errorf("expected log file to be created at %s", logFile) + } + }) + + t.Run("handles LOG_FILE=stdout", func(t *testing.T) { + os.Setenv("LOG_FILE", "stdout") + defer os.Unsetenv("LOG_FILE") + + logger := New("test") + if logger == nil { + t.Fatal("expected logger to be created") + } + + // Should not panic when writing to stdout + logger.Info("test log to stdout") + }) + + t.Run("handles LOG_FILE=stderr", func(t *testing.T) { + os.Setenv("LOG_FILE", "stderr") + defer os.Unsetenv("LOG_FILE") + + logger := New("test") + if logger == nil { + t.Fatal("expected logger to be created") + } + + // Should not panic when writing to stderr + logger.Info("test log to stderr") + }) +} + +func TestParseLevel(t *testing.T) { + tests := []struct { + input string + expected zapcore.Level + hasError bool + }{ 
+ {"trace", TraceLevel, false}, + {"debug", zapcore.DebugLevel, false}, + {"info", zapcore.InfoLevel, false}, + {"warn", zapcore.WarnLevel, false}, + {"warning", zapcore.WarnLevel, false}, + {"error", zapcore.ErrorLevel, false}, + {"TRACE", TraceLevel, false}, + {"DEBUG", zapcore.DebugLevel, false}, + {"INFO", zapcore.InfoLevel, false}, + {"WARN", zapcore.WarnLevel, false}, + {"WARNING", zapcore.WarnLevel, false}, + {"ERROR", zapcore.ErrorLevel, false}, + {"invalid", zapcore.InfoLevel, true}, + {"", zapcore.InfoLevel, true}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + level, err := parseLevel(tt.input) + if tt.hasError && err == nil { + t.Errorf("expected error for input %q", tt.input) + } + if !tt.hasError && err != nil { + t.Errorf("unexpected error for input %q: %v", tt.input, err) + } + if level != tt.expected { + t.Errorf("expected level %v, got %v for input %q", tt.expected, level, tt.input) + } + }) + } +} diff --git a/internal/loggingtest/test_helper.go b/internal/loggingtest/test_helper.go new file mode 100644 index 00000000..37279261 --- /dev/null +++ b/internal/loggingtest/test_helper.go @@ -0,0 +1,50 @@ +package loggingtest + +import ( + "testing" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + + "github.com/replicate/cog-runtime/internal/logging" +) + +// customTestLevelEncoder handles our custom Trace level display for tests +func customTestLevelEncoder(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) { + switch level { + case logging.TraceLevel: + enc.AppendString("TRACE") + default: + zapcore.CapitalLevelEncoder(level, enc) + } +} + +// NewTestLogger creates a logger for tests that outputs to t.Logf +// Behaves exactly like zaptest.NewLogger but with trace support added +func NewTestLogger(t *testing.T) *logging.Logger { + t.Helper() + + // Create test logger with custom level encoder + zapLogger := zaptest.NewLogger(t, + zaptest.Level(logging.TraceLevel), + 
zaptest.WrapOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { + // Replace the encoder to handle our custom trace level + enc := zapcore.NewConsoleEncoder(zapcore.EncoderConfig{ + TimeKey: "T", + LevelKey: "L", + NameKey: "N", + CallerKey: "C", + MessageKey: "M", + StacktraceKey: "S", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: customTestLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }) + return zapcore.NewCore(enc, zapcore.AddSync(zaptest.NewTestingWriter(t)), logging.TraceLevel) + })), + ) + return &logging.Logger{Logger: zapLogger} +} diff --git a/internal/loggingtest/test_helper_test.go b/internal/loggingtest/test_helper_test.go new file mode 100644 index 00000000..a8b14ab3 --- /dev/null +++ b/internal/loggingtest/test_helper_test.go @@ -0,0 +1,105 @@ +package loggingtest + +import ( + "os" + "testing" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/replicate/cog-runtime/internal/logging" +) + +func TestLoggerMethods(t *testing.T) { + logger := NewTestLogger(t) + + // Test all logger methods + logger.Trace("trace message", zap.String("key", "value")) + logger.Debug("debug message", zap.String("key", "value")) + logger.Info("info message", zap.String("key", "value")) + logger.Warn("warn message", zap.String("key", "value")) + logger.Error("error message", zap.String("key", "value")) +} + +func TestSugaredLoggerMethods(t *testing.T) { + logger := NewTestLogger(t) + sugar := logger.Sugar() + + // Test all sugared logger methods + sugar.Trace("trace message") + sugar.Tracew("trace message", "key", "value") + sugar.Debug("debug message") + sugar.Debugw("debug message", "key", "value") + sugar.Info("info message") + sugar.Infow("info message", "key", "value") + sugar.Warn("warn message") + sugar.Warnw("warn message", "key", "value") + sugar.Error("error message") + sugar.Errorw("error message", "key", "value") +} + +func 
TestLoggerChaining(t *testing.T) { + logger := NewTestLogger(t) + + // Test Named returns our custom Logger + namedLogger := logger.Named("child") + if namedLogger == nil { + t.Fatal("expected named logger to be created") + } + + // Test With returns our custom Logger + withLogger := logger.With(zap.String("component", "test")) + if withLogger == nil { + t.Fatal("expected with logger to be created") + } + + // Test WithOptions returns our custom Logger + optionsLogger := logger.WithOptions(zap.AddCaller()) + if optionsLogger == nil { + t.Fatal("expected options logger to be created") + } + + // Test that chained loggers have trace support + namedLogger.Trace("named trace") + withLogger.Trace("with trace") + optionsLogger.Trace("options trace") +} + +func TestSugaredLoggerChaining(t *testing.T) { + logger := NewTestLogger(t) + sugar := logger.Sugar() + + // Test With returns our custom SugaredLogger with Trace support + withSugar := sugar.With("component", "test") + withSugar.Trace("trace with sugar chaining") + withSugar.Tracew("tracew with sugar chaining", "key", "value") + + // Test Named returns our custom SugaredLogger with Trace support + namedSugar := sugar.Named("child") + namedSugar.Trace("trace with named sugar") + namedSugar.Tracew("tracew with named sugar", "key", "value") + + // Test chaining both With and Named + chainedSugar := sugar.With("component", "test").Named("child") + chainedSugar.Trace("trace with full chaining") + chainedSugar.Tracew("tracew with full chaining", "key", "value") +} + +func TestTraceLevel(t *testing.T) { + // Verify TraceLevel is below DebugLevel + if logging.TraceLevel >= zapcore.DebugLevel { + t.Errorf("TraceLevel (%d) should be below DebugLevel (%d)", logging.TraceLevel, zapcore.DebugLevel) + } + + // Create a logger with trace level + os.Setenv("COG_LOG_LEVEL", "trace") + defer os.Unsetenv("COG_LOG_LEVEL") + + logger := NewTestLogger(t) + + // Test that trace methods exist and can be called + logger.Trace("trace message") 
+ sugar := logger.Sugar() + sugar.Trace("sugared trace") + sugar.Tracew("sugared trace with fields", "key", "value") +} diff --git a/internal/runner/manager.go b/internal/runner/manager.go index 198c0736..19eddd70 100644 --- a/internal/runner/manager.go +++ b/internal/runner/manager.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "github.com/replicate/cog-runtime/internal/config" + "github.com/replicate/cog-runtime/internal/logging" "github.com/replicate/cog-runtime/internal/webhook" ) @@ -45,12 +46,12 @@ type Manager struct { mu sync.RWMutex - baseLogger *zap.Logger // base logger passed from parent, used to create named loggers for runners - logger *zap.Logger + baseLogger *logging.Logger // base logger passed from parent, used to create named loggers for runners + logger *logging.Logger } // NewManager creates a new runner manager with channel-based capacity control -func NewManager(ctx context.Context, cfg config.Config, logger *zap.Logger) *Manager { +func NewManager(ctx context.Context, cfg config.Config, logger *logging.Logger) *Manager { m := newManager(ctx, cfg, logger) // Pre-load default runner in non-procedure mode if !cfg.UseProcedureMode { @@ -62,7 +63,7 @@ func NewManager(ctx context.Context, cfg config.Config, logger *zap.Logger) *Man return m } -func newManager(ctx context.Context, cfg config.Config, logger *zap.Logger) *Manager { +func newManager(ctx context.Context, cfg config.Config, logger *logging.Logger) *Manager { maxRunners := cfg.MaxRunners if cfg.UseProcedureMode { if cfg.OneShot { @@ -86,7 +87,7 @@ func newManager(ctx context.Context, cfg config.Config, logger *zap.Logger) *Man maxRunners = 1 } else { maxRunners = max(1, cogYaml.Concurrency.Max) - logger.Info("read concurrency from cog.yaml", zap.Int("max_concurrency", maxRunners)) + logger.Debug("read concurrency from cog.yaml", zap.Int("max_concurrency", maxRunners)) } } } else { @@ -96,7 +97,7 @@ func newManager(ctx context.Context, cfg config.Config, logger *zap.Logger) *Man 
maxRunners = 1 } else { maxRunners = max(1, cogYaml.Concurrency.Max) - logger.Info("read concurrency from cog.yaml", zap.Int("max_concurrency", maxRunners)) + logger.Debug("read concurrency from cog.yaml", zap.Int("max_concurrency", maxRunners)) } } } @@ -175,7 +176,7 @@ func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) error runner, err := m.assignReqToRunner(deadlineCtx, req) if err != nil { - log.Debugw("failed to get runner for async request", "error", err) + log.Tracew("failed to get runner for async request", "error", err) m.releaseSlot() return err } @@ -196,7 +197,7 @@ func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) error respChan, err := runner.predict(req) if err != nil { - log.Debugw("failed to predict", "error", err) + log.Tracew("failed to predict", "error", err) m.releaseSlot() return err } @@ -205,7 +206,7 @@ func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) error go func() { defer m.releaseSlot() // Release slot after prediction completes <-respChan // Wait for prediction to complete - log.Debugw("async prediction completed", "prediction_id", req.ID) + log.Tracew("async prediction completed", "prediction_id", req.ID) }() return nil @@ -294,7 +295,7 @@ func (m *Manager) createDefaultRunner(ctx context.Context) (*Runner, error) { } } - log.Infow("creating default runner", + log.Debugw("creating default runner", "working_dir", workingDir, "ipc_url", m.cfg.IPCUrl, "python_bin", m.cfg.PythonBinPath, @@ -313,7 +314,7 @@ func (m *Manager) createDefaultRunner(ctx context.Context) (*Runner, error) { "--working-dir", workingDir, } - log.Infow("runner command", "python_path", pythonPath, "args", args, "working_dir", workingDir) + log.Debugw("runner command", "python_path", pythonPath, "args", args, "working_dir", workingDir) tmpDir, err := os.MkdirTemp("", "cog-runner-tmp-") if err != nil { @@ -327,6 +328,14 @@ func (m *Manager) createDefaultRunner(ctx context.Context) (*Runner, 
error) { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} env := mergeEnv(os.Environ(), m.cfg.EnvSet, m.cfg.EnvUnset) env = append(env, "TMPDIR="+tmpDir) + + // Ensure Python processes never receive trace level logs + if logLevel := os.Getenv("COG_LOG_LEVEL"); logLevel == "trace" { + env = append(env, "COG_LOG_LEVEL=debug") + } else if logLevel := os.Getenv("LOG_LEVEL"); logLevel == "trace" { + env = append(env, "LOG_LEVEL=debug") + } + cmd.Env = env // Read cog.yaml for runner configuration (capacity was already set in newManager) @@ -476,7 +485,7 @@ func (m *Manager) assignReqToRunner(ctx context.Context, req PredictionRequest) // First, try to find existing runner with capacity and atomically reserve slot procRunner := m.findRunnerWithCapacity(ctx, req) if procRunner != nil { - log.Debugw("allocated request to existing runner", "runner", procRunner.runnerCtx.id) + log.Tracew("allocated request to existing runner", "runner", procRunner.runnerCtx.id) return procRunner, nil } @@ -580,7 +589,7 @@ func (m *Manager) allocateRunnerSlot(procedureHash string) (*Runner, error) { // No empty slots, try to evict an idle runner or defunct runner for i, runner := range m.runners { if runner != nil && ((runner.status == StatusReady && runner.Idle()) || runner.status == StatusDefunct) { - log.Infow("evicting idle runner", "name", runner.runnerCtx.id) + log.Debugw("evicting idle runner", "name", runner.runnerCtx.id) err := runner.Stop() if err != nil { log.Errorw("failed to stop runner", "name", runner.runnerCtx.id, "error", err) @@ -648,6 +657,14 @@ func (m *Manager) createProcedureRunner(runnerName, procedureHash string) (*Runn cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} env := mergeEnv(os.Environ(), m.cfg.EnvSet, m.cfg.EnvUnset) env = append(env, "TMPDIR="+tmpDir) + + // Ensure Python processes never receive trace level logs + if logLevel := os.Getenv("COG_LOG_LEVEL"); logLevel == "trace" { + env = append(env, "COG_LOG_LEVEL=debug") + } else if logLevel := 
os.Getenv("LOG_LEVEL"); logLevel == "trace" { + env = append(env, "LOG_LEVEL=debug") + } + cmd.Env = env var allocatedUID *int @@ -796,7 +813,7 @@ func (m *Manager) Stop() error { // Wait for runners to become idle or timeout using WaitGroup gracePeriod := m.cfg.RunnerShutdownGracePeriod - log.Infow("grace period configuration", "grace_period", gracePeriod) + log.Debugw("grace period configuration", "grace_period", gracePeriod) graceCtx, cancel := context.WithTimeout(m.ctx, gracePeriod) defer cancel() @@ -807,7 +824,7 @@ func (m *Manager) Stop() error { // Wait for this runner to become idle OR timeout select { case <-runner.readyForShutdown: - log.Infow("runner became idle naturally", "name", runner.runnerCtx.id) + log.Debugw("runner became idle naturally", "name", runner.runnerCtx.id) case <-graceCtx.Done(): log.Warnw("grace period expired for runner", "name", runner.runnerCtx.id, "context_err", graceCtx.Err()) } @@ -860,7 +877,7 @@ func (m *Manager) Status() string { runner.mu.Unlock() return status } - log.Debug("default runner not found, returning STARTING") + log.Trace("default runner not found, returning STARTING") return "STARTING" } @@ -1054,9 +1071,9 @@ func (m *Manager) monitorRunnerSubprocess(ctx context.Context, runnerName string } // Capture crash logs from runner and fail predictions one by one - log.Debugw("checking runner logs for crash", "runner_logs_count", len(runner.logs), "runner_logs", runner.logs) + log.Tracew("checking runner logs for crash", "runner_logs_count", len(runner.logs), "runner_logs", runner.logs) crashLogs := runner.logs - log.Debugw("captured crash logs", "crash_logs_count", len(crashLogs), "crash_logs", crashLogs) + log.Tracew("captured crash logs", "crash_logs_count", len(crashLogs), "crash_logs", crashLogs) for id, pending := range runner.pending { log.Debugw("failing prediction due to setup failure", "prediction_id", id) diff --git a/internal/runner/manager_test.go b/internal/runner/manager_test.go index de869fae..16f87965 
100644 --- a/internal/runner/manager_test.go +++ b/internal/runner/manager_test.go @@ -9,9 +9,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" "github.com/replicate/cog-runtime/internal/config" + "github.com/replicate/cog-runtime/internal/loggingtest" ) func TestNewManager(t *testing.T) { @@ -55,7 +55,7 @@ func TestNewManager(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), tt.cfg, logger) assert.True(t, m.cfg.UseProcedureMode) @@ -75,7 +75,7 @@ func TestNewManager(t *testing.T) { MaxRunners: 10, // Should be ignored } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) assert.False(t, m.cfg.UseProcedureMode) @@ -90,7 +90,7 @@ func TestManager(t *testing.T) { t.Parallel() cfg := config.Config{} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) assert.False(t, m.IsStopped()) @@ -105,7 +105,7 @@ func TestManager(t *testing.T) { t.Parallel() cfg := config.Config{UseProcedureMode: true} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) exitCode := m.ExitCode() @@ -116,7 +116,7 @@ func TestManager(t *testing.T) { t.Parallel() cfg := config.Config{UseProcedureMode: false} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) exitCode := m.ExitCode() @@ -134,7 +134,7 @@ func TestManagerSlots(t *testing.T) { UseProcedureMode: true, MaxRunners: 10, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // Pre-fill some runners to force name collision checks @@ -164,7 +164,7 @@ func TestManagerSlots(t *testing.T) { UseProcedureMode: true, MaxRunners: 2, } - logger := 
zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // Fill to capacity with idle runners @@ -196,7 +196,7 @@ func TestManagerSlots(t *testing.T) { t.Parallel() cfg := config.Config{MaxRunners: 2, UseProcedureMode: true} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) err := m.claimSlot() @@ -208,7 +208,7 @@ func TestManagerSlots(t *testing.T) { t.Parallel() cfg := config.Config{MaxRunners: 1} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // Claim the only slot @@ -224,7 +224,7 @@ func TestManagerSlots(t *testing.T) { t.Parallel() cfg := config.Config{MaxRunners: 2, UseProcedureMode: true} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // Claim a slot @@ -241,7 +241,7 @@ func TestManagerSlots(t *testing.T) { t.Parallel() cfg := config.Config{MaxRunners: 1} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // Channel starts full, try to release @@ -259,7 +259,7 @@ func TestManagerRunnerManagement(t *testing.T) { UseProcedureMode: true, MaxRunners: 4, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) runner1 := &Runner{ @@ -287,7 +287,7 @@ func TestManagerRunnerManagement(t *testing.T) { UseProcedureMode: true, MaxRunners: 4, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) killCalled1 := false @@ -296,12 +296,12 @@ func TestManagerRunnerManagement(t *testing.T) { runner1 := &Runner{ runnerCtx: RunnerContext{id: "runner1"}, killFn: func(pid int) error { killCalled1 = true; return nil }, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } runner2 := &Runner{ runnerCtx: RunnerContext{id: 
"runner2"}, killFn: func(pid int) error { killCalled2 = true; return nil }, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Mock running processes @@ -328,7 +328,7 @@ func TestManagerCapacity(t *testing.T) { UseProcedureMode: true, MaxRunners: 5, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) capacity := m.Capacity() @@ -356,7 +356,7 @@ func TestManagerCapacity(t *testing.T) { UseProcedureMode: false, MaxRunners: 5, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) capacity := m.Capacity() @@ -379,7 +379,7 @@ func TestManagerCapacity(t *testing.T) { UseProcedureMode: false, WorkingDirectory: tempDir, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // In non-procedure mode, newManager should read cog.yaml and set capacity accordingly @@ -395,7 +395,7 @@ func TestManagerCapacity(t *testing.T) { UseProcedureMode: false, WorkingDirectory: tempDir, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // In non-procedure mode, newManager should use fallback concurrency when cog.yaml missing @@ -413,7 +413,7 @@ func TestManagerCapacity(t *testing.T) { UseProcedureMode: false, WorkingDirectory: tempDir, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // In non-procedure mode, newManager should use fallback concurrency when cog.yaml invalid @@ -425,7 +425,7 @@ func TestManagerConcurrency(t *testing.T) { t.Parallel() cfg := config.Config{MaxRunners: 4, UseProcedureMode: true} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // Claim 2 slots @@ -446,7 +446,7 @@ func TestManagerStatusNonProcedureMode(t *testing.T) { t.Parallel() cfg := 
config.Config{UseProcedureMode: false} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) status := m.Status() @@ -457,7 +457,7 @@ func TestManagerStatusNonProcedureMode(t *testing.T) { t.Parallel() cfg := config.Config{UseProcedureMode: false} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) runner := &Runner{ runnerCtx: RunnerContext{id: DefaultRunnerName}, @@ -480,7 +480,7 @@ func TestManagerStatusProcedureMode(t *testing.T) { UseProcedureMode: true, MaxRunners: 2, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) status := m.Status() @@ -494,7 +494,7 @@ func TestManagerStatusProcedureMode(t *testing.T) { UseProcedureMode: true, MaxRunners: 1, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := NewManager(t.Context(), cfg, logger) // Claim the only slot @@ -513,7 +513,7 @@ func TestManagerSetupResult(t *testing.T) { t.Parallel() cfg := config.Config{UseProcedureMode: true} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := NewManager(t.Context(), cfg, logger) result := m.SetupResult() @@ -531,7 +531,7 @@ func TestManagerPredictionHandling(t *testing.T) { cfg := config.Config{ WorkingDirectory: tempDir, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) ctx, cancel := context.WithCancel(context.Background()) @@ -541,7 +541,7 @@ func TestManagerPredictionHandling(t *testing.T) { runnerCtx: RunnerContext{id: "test-runner", workingdir: tempDir}, pending: make(map[string]*PendingPrediction), cleanupSlot: make(chan struct{}, 1), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } m.runners[0] = runner @@ -555,7 +555,7 @@ func TestManagerPredictionHandling(t *testing.T) { t.Parallel() cfg := config.Config{} - logger := 
zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) runner := &Runner{ @@ -571,7 +571,7 @@ func TestManagerPredictionHandling(t *testing.T) { t.Parallel() cfg := config.Config{MaxRunners: 1} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) // Claim the only slot @@ -592,7 +592,7 @@ func TestManagerRunnerIPC(t *testing.T) { t.Parallel() cfg := config.Config{} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) tempDir := t.TempDir() @@ -604,7 +604,7 @@ func TestManagerRunnerIPC(t *testing.T) { status: StatusStarting, pending: make(map[string]*PendingPrediction), cleanupSlot: make(chan struct{}, 1), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), setupComplete: make(chan struct{}), } m.runners[0] = runner @@ -618,7 +618,7 @@ func TestManagerRunnerIPC(t *testing.T) { t.Parallel() cfg := config.Config{} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) err := m.HandleRunnerIPC("nonexistent", "READY") @@ -637,7 +637,7 @@ func TestManagerStop(t *testing.T) { UseProcedureMode: true, MaxRunners: 4, } - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) runner1 := &Runner{ @@ -646,7 +646,7 @@ func TestManagerStop(t *testing.T) { pending: make(map[string]*PendingPrediction), killFn: func(pid int) error { return nil }, stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } runner2 := &Runner{ runnerCtx: RunnerContext{id: "runner2"}, @@ -654,7 +654,7 @@ func TestManagerStop(t *testing.T) { pending: make(map[string]*PendingPrediction), killFn: func(pid int) error { return nil }, stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } m.runners[0] = runner1 @@ 
-675,7 +675,7 @@ func TestManagerStop(t *testing.T) { t.Parallel() cfg := config.Config{} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) err1 := m.Stop() @@ -695,7 +695,7 @@ func TestManagerSchema(t *testing.T) { t.Parallel() cfg := config.Config{UseProcedureMode: true} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) schema, ok := m.Schema() @@ -707,7 +707,7 @@ func TestManagerSchema(t *testing.T) { t.Parallel() cfg := config.Config{UseProcedureMode: false} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) schema, ok := m.Schema() @@ -719,7 +719,7 @@ func TestManagerSchema(t *testing.T) { t.Parallel() cfg := config.Config{UseProcedureMode: false} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) runner := &Runner{ @@ -737,7 +737,7 @@ func TestManagerSchema(t *testing.T) { t.Parallel() cfg := config.Config{UseProcedureMode: false} - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) m := newManager(t.Context(), cfg, logger) runner := &Runner{ diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 511e6a0d..df785405 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -25,6 +25,7 @@ import ( "github.com/replicate/go/httpclient" "github.com/replicate/cog-runtime/internal/config" + "github.com/replicate/cog-runtime/internal/logging" "github.com/replicate/cog-runtime/internal/version" "github.com/replicate/cog-runtime/internal/webhook" ) @@ -53,7 +54,7 @@ func (r *Runner) watchPredictionResponses(ctx context.Context, predictionID stri for { select { case <-ctx.Done(): - log.Debugw("response watcher canceled", "prediction_id", predictionID) + log.Tracew("response watcher canceled", "prediction_id", predictionID) return // TODO: Add inotify case 
when implemented @@ -67,7 +68,7 @@ func (r *Runner) watchPredictionResponses(ctx context.Context, predictionID stri // Drain IPC OUTPUT notifications - when inotify available, we blackhole these // When inotify unavailable, this triggers immediate processing // TODO: Only process if inotify unavailable - log.Debugw("received OUTPUT IPC notification", "prediction_id", predictionID) + log.Tracew("received OUTPUT IPC notification", "prediction_id", predictionID) pollTimer.Reset(100 * time.Millisecond) // Reset polling timer since we got an event if err := r.processResponseFiles(predictionID, pending, responsePattern, log); err != nil { log.Errorw("failed to process response files from IPC", "prediction_id", predictionID, "error", err) @@ -85,14 +86,14 @@ func (r *Runner) watchPredictionResponses(ctx context.Context, predictionID stri completed := pending.response.Status.IsCompleted() pending.mu.Unlock() if completed { - log.Debugw("prediction completed, watcher exiting", "prediction_id", predictionID) + log.Tracew("prediction completed, watcher exiting", "prediction_id", predictionID) return } } } // processResponseFiles handles response file processing for a specific prediction -func (r *Runner) processResponseFiles(predictionID string, pending *PendingPrediction, responsePattern string, log *zap.SugaredLogger) error { +func (r *Runner) processResponseFiles(predictionID string, pending *PendingPrediction, responsePattern string, log *logging.SugaredLogger) error { entries, err := os.ReadDir(r.runnerCtx.workingdir) if err != nil { return fmt.Errorf("failed to read directory: %w", err) @@ -118,7 +119,7 @@ func (r *Runner) processResponseFiles(predictionID string, pending *PendingPredi } // handleSingleResponse processes a single response file for a prediction -func (r *Runner) handleSingleResponse(filename, predictionID string, pending *PendingPrediction, log *zap.SugaredLogger) error { +func (r *Runner) handleSingleResponse(filename, predictionID string, pending 
*PendingPrediction, log *logging.SugaredLogger) error { filePath := path.Join(r.runnerCtx.workingdir, filename) // Read and parse response file @@ -162,7 +163,7 @@ func (r *Runner) handleSingleResponse(filename, predictionID string, pending *Pe } // processResponseOutput handles output path processing for a response -func (r *Runner) processResponseOutput(response *PredictionResponse, pending *PendingPrediction, log *zap.SugaredLogger) error { +func (r *Runner) processResponseOutput(response *PredictionResponse, pending *PendingPrediction, log *logging.SugaredLogger) error { if response.Output == nil { return nil } @@ -213,7 +214,7 @@ func (r *Runner) processResponseOutput(response *PredictionResponse, pending *Pe } // handleResponseWebhooksAndCompletion sends webhooks and handles prediction completion -func (r *Runner) handleResponseWebhooksAndCompletion(response *PredictionResponse, predictionID string, pending *PendingPrediction, log *zap.SugaredLogger) { +func (r *Runner) handleResponseWebhooksAndCompletion(response *PredictionResponse, predictionID string, pending *PendingPrediction, log *logging.SugaredLogger) { // Update pending prediction's response data, preserving accumulated logs pending.mu.Lock() existingLogs := pending.response.Logs @@ -227,7 +228,7 @@ func (r *Runner) handleResponseWebhooksAndCompletion(response *PredictionRespons // Send webhooks based on prediction status switch response.Status { case PredictionStarting: - log.Infow("prediction started", "id", response.ID, "status", response.Status) + log.Debugw("prediction started", "id", response.ID, "status", response.Status) // Send start webhook async (intermediary) go func() { _ = pending.sendWebhook(webhook.EventStart) }() @@ -238,7 +239,7 @@ func (r *Runner) handleResponseWebhooksAndCompletion(response *PredictionRespons pending.mu.Unlock() case PredictionProcessing: - log.Infow("prediction processing", "id", response.ID, "status", response.Status) + log.Debugw("prediction processing", "id", 
response.ID, "status", response.Status) // Send output/logs webhook async (intermediary) if response.Output != nil { go func() { _ = pending.sendWebhook(webhook.EventOutput) }() @@ -273,7 +274,7 @@ func (r *Runner) handleResponseWebhooksAndCompletion(response *PredictionRespons } // Watcher exits - manager defer will handle webhook and cleanup - log.Debugw("prediction completed, watcher exiting", "prediction_id", predictionID) + log.Tracew("prediction completed, watcher exiting", "prediction_id", predictionID) return } } @@ -316,7 +317,7 @@ type Runner struct { cleanupTimeout time.Duration forceShutdown *config.ForceShutdownSignal - logger *zap.Logger + logger *logging.Logger } func (r *Runner) String() string { @@ -355,7 +356,7 @@ func (r *Runner) WaitForStop() { func (r *Runner) GracefulShutdown() { log := r.logger.Sugar() if !r.shutdownWhenIdle.CompareAndSwap(false, true) { - log.Debugw("graceful shutdown already initiated", "runner_id", r.runnerCtx.id) + log.Tracew("graceful shutdown already initiated", "runner_id", r.runnerCtx.id) return } @@ -363,7 +364,7 @@ func (r *Runner) GracefulShutdown() { shouldSignal := (r.status == StatusReady && len(r.pending) == 0) r.mu.RUnlock() - log.Debugw("graceful shutdown initiated", "runner_id", r.runnerCtx.id, "status", r.status, "pending_count", len(r.pending), "should_signal", shouldSignal) + log.Tracew("graceful shutdown initiated", "runner_id", r.runnerCtx.id, "status", r.status, "pending_count", len(r.pending), "should_signal", shouldSignal) if shouldSignal { if r.readyForShutdown == nil { @@ -371,9 +372,9 @@ func (r *Runner) GracefulShutdown() { } else { select { case <-r.readyForShutdown: - log.Debugw("readyForShutdown already closed", "runner_id", r.runnerCtx.id) + log.Tracew("readyForShutdown already closed", "runner_id", r.runnerCtx.id) default: - log.Debugw("closing readyForShutdown channel", "runner_id", r.runnerCtx.id) + log.Tracew("closing readyForShutdown channel", "runner_id", r.runnerCtx.id) 
close(r.readyForShutdown) } } @@ -406,7 +407,7 @@ func (r *Runner) Start(ctx context.Context) error { return fmt.Errorf("failed to start subprocess: %w", err) } - log.Debugw("runner process started successfully", "pid", cmd.Process.Pid) + log.Tracew("runner process started successfully", "pid", cmd.Process.Pid) return nil } @@ -436,7 +437,7 @@ func (r *Runner) setupLogCapture() error { line := scanner.Text() r.logStdout(line) } - r.logger.Debug("finished stdout log capture") + r.logger.Trace("finished stdout log capture") }) wg.Go(func() { @@ -445,7 +446,7 @@ func (r *Runner) setupLogCapture() error { line := scanner.Text() r.logStderr(line) } - r.logger.Debug("finished stderr log capture") + r.logger.Trace("finished stderr log capture") }) // Signal when both pipes are closed (with double-close protection) @@ -726,9 +727,9 @@ func (r *Runner) ForceKill() { select { case <-r.cleanupSlot: gotToken = true - log.Infow("acquired cleanup token for force kill", "pid", pid) + log.Tracew("acquired cleanup token for force kill", "pid", pid) default: - log.Infow("cleanup already in progress, skipping force kill", "pid", pid) + log.Tracew("cleanup already in progress, skipping force kill", "pid", pid) return } @@ -791,7 +792,7 @@ func (r *Runner) predict(req PredictionRequest) (chan PredictionResponse, error) r.mu.Lock() defer r.mu.Unlock() - log.Debugw("runner.predict called", "prediction_id", req.ID, "status", r.status) + log.Tracew("runner.predict called", "prediction_id", req.ID, "status", r.status) // Prediction must be pre-allocated by manager pending, exists := r.pending[req.ID] @@ -799,7 +800,7 @@ func (r *Runner) predict(req PredictionRequest) (chan PredictionResponse, error) return nil, fmt.Errorf("prediction %s not allocated", req.ID) } - log.Debugw("prediction found in pending", "prediction_id", req.ID) + log.Tracew("prediction found in pending", "prediction_id", req.ID) // Process input paths (base64 and URL inputs) inputPaths := make([]string, 0) @@ -828,13 
+829,13 @@ func (r *Runner) predict(req PredictionRequest) (chan PredictionResponse, error) return nil, fmt.Errorf("failed to write request file: %w", err) } - log.Debugw("wrote prediction request file", "prediction_id", req.ID, "path", requestPath, "working_dir", r.runnerCtx.workingdir, "request_data", string(requestData)) + log.Tracew("wrote prediction request file", "prediction_id", req.ID, "path", requestPath, "working_dir", r.runnerCtx.workingdir, "request_data", string(requestData)) // Debug: Check if file actually exists and list directory contents if _, err := os.Stat(requestPath); err != nil { - log.Debugw("ERROR: written request file does not exist", "prediction_id", req.ID, "path", requestPath, "error", err) + log.Tracew("ERROR: written request file does not exist", "prediction_id", req.ID, "path", requestPath, "error", err) } else { - log.Debugw("confirmed request file exists", "prediction_id", req.ID, "path", requestPath) + log.Tracew("confirmed request file exists", "prediction_id", req.ID, "path", requestPath) } // Debug: List all files in working directory @@ -843,13 +844,13 @@ func (r *Runner) predict(req PredictionRequest) (chan PredictionResponse, error) for i, entry := range entries { fileNames[i] = entry.Name() } - log.Debugw("working directory contents after write", "prediction_id", req.ID, "working_dir", r.runnerCtx.workingdir, "files", fileNames) + log.Tracew("working directory contents after write", "prediction_id", req.ID, "working_dir", r.runnerCtx.workingdir, "files", fileNames) } // Update pending prediction with request details pending.request = req - log.Debugw("returning prediction channel", "prediction_id", req.ID) + log.Tracew("returning prediction channel", "prediction_id", req.ID) return pending.c, nil } @@ -969,21 +970,21 @@ func (r *Runner) updateSetupResult() { } setupResultPath := filepath.Join(r.runnerCtx.workingdir, "setup_result.json") - log.Debug("reading setup_result.json", "path", setupResultPath) + log.Trace("reading 
setup_result.json", "path", setupResultPath) // Try to read additional setup result data from file var setupResultFromFile SetupResult if err := r.readJSON(setupResultPath, &setupResultFromFile); err != nil { - log.Debugw("failed to read setup_result.json, assuming success", "error", err) + log.Tracew("failed to read setup_result.json, assuming success", "error", err) // If setup_result.json doesn't exist, assume setup succeeded and status is ready r.setupResult.Status = SetupSucceeded r.setupResult.Schema = "" // Will be populated by updateSchema if available r.status = StatusReady - log.Debugw("setup result not found, assuming success", "status", r.status.String()) + log.Tracew("setup result not found, assuming success", "status", r.status.String()) return } - log.Debugw("successfully read setup_result.json", "status", setupResultFromFile.Status, "schema_length", len(setupResultFromFile.Schema)) + log.Tracew("successfully read setup_result.json", "status", setupResultFromFile.Status, "schema_length", len(setupResultFromFile.Schema)) // Update setup result with data from file, preserving logs that were already set r.setupResult.Status = setupResultFromFile.Status @@ -1036,7 +1037,7 @@ func verifyProcessGroupTerminated(pid int) error { } // NewRunner creates a new runner instance with the given context -func NewRunner(ctx context.Context, ctxCancel context.CancelFunc, runnerCtx RunnerContext, command *exec.Cmd, maxConcurrency int, cfg config.Config, logger *zap.Logger) (*Runner, error) { +func NewRunner(ctx context.Context, ctxCancel context.CancelFunc, runnerCtx RunnerContext, command *exec.Cmd, maxConcurrency int, cfg config.Config, logger *logging.Logger) (*Runner, error) { if maxConcurrency <= 0 { maxConcurrency = 1 } diff --git a/internal/runner/runner_test.go b/internal/runner/runner_test.go index d4586fcc..d30783e0 100644 --- a/internal/runner/runner_test.go +++ b/internal/runner/runner_test.go @@ -15,9 +15,9 @@ import ( "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" "github.com/replicate/cog-runtime/internal/config" + "github.com/replicate/cog-runtime/internal/loggingtest" ) func TestRunnerCapacity(t *testing.T) { @@ -56,7 +56,7 @@ func TestRunnerCapacity(t *testing.T) { r := &Runner{ maxConcurrency: tt.maxConcurrency, pending: make(map[string]*PendingPrediction), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Add pending predictions @@ -96,7 +96,7 @@ func TestRunnerIdle(t *testing.T) { r := &Runner{ pending: make(map[string]*PendingPrediction), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } for i := 0; i < tt.pendingCount; i++ { @@ -123,7 +123,7 @@ func TestRunnerStart(t *testing.T) { Args: []string{"sleep", "0.1"}, Dir: tempDir, }, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: make(chan struct{}), } @@ -143,7 +143,7 @@ func TestRunnerStart(t *testing.T) { r := &Runner{ status: StatusReady, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: make(chan struct{}), } @@ -162,7 +162,7 @@ func TestRunnerStart(t *testing.T) { Path: "/nonexistent/command", Args: []string{"nonexistent"}, }, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: make(chan struct{}), } @@ -178,7 +178,7 @@ func TestRunnerStart(t *testing.T) { r := &Runner{ status: StatusStarting, cmd: nil, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: make(chan struct{}), } @@ -213,7 +213,7 @@ func TestRunnerConfig(t *testing.T) { workingdir: tempDir, }, mu: sync.RWMutex{}, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: make(chan struct{}), } @@ -237,7 +237,7 @@ func TestRunnerConfig(t *testing.T) { workingdir: tempDir, }, mu: sync.RWMutex{}, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: 
make(chan struct{}), } @@ -266,7 +266,7 @@ func TestRunnerConfig(t *testing.T) { workingdir: tempDir, }, mu: sync.RWMutex{}, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: make(chan struct{}), } @@ -282,7 +282,7 @@ func TestRunnerConfig(t *testing.T) { r := &Runner{ status: StatusStarting, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: make(chan struct{}), } @@ -306,7 +306,7 @@ func TestRunnerStop(t *testing.T) { pending: make(map[string]*PendingPrediction), killFn: func(pid int) error { return nil }, stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Add pending predictions @@ -335,7 +335,7 @@ func TestRunnerStop(t *testing.T) { r := &Runner{ status: StatusDefunct, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), logCaptureComplete: make(chan struct{}), } @@ -357,7 +357,7 @@ func TestRunnerStop(t *testing.T) { return nil }, stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Mock a running process @@ -384,7 +384,7 @@ func TestRunnerForceKill(t *testing.T) { killCalled = true return nil }, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Mock a running process @@ -407,7 +407,7 @@ func TestRunnerForceKill(t *testing.T) { killCallCount++ return nil }, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } r.cmd.Process = &os.Process{Pid: 12345} @@ -424,7 +424,7 @@ func TestRunnerForceKill(t *testing.T) { killCallCount++ return nil }, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } r.ForceKill() @@ -442,7 +442,7 @@ func TestRunnerForceKill(t *testing.T) { return nil }, cleanupSlot: make(chan struct{}, 1), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } r.cleanupSlot <- struct{}{} // Initialize with token r.cmd = exec.Command("echo", "test") @@ -469,7 +469,7 
@@ func TestRunnerForceKill(t *testing.T) { killCalled = true return nil }, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } r.ForceKill() @@ -490,7 +490,7 @@ func TestRunnerPredict(t *testing.T) { status: StatusReady, pending: make(map[string]*PendingPrediction), runnerCtx: RunnerContext{workingdir: tempDir}, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Pre-allocate prediction @@ -519,7 +519,7 @@ func TestRunnerPredict(t *testing.T) { r := &Runner{ status: StatusReady, pending: make(map[string]*PendingPrediction), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } req := PredictionRequest{ID: "test-id"} @@ -539,7 +539,7 @@ func TestRunnerCancel(t *testing.T) { r := &Runner{ pending: make(map[string]*PendingPrediction), runnerCtx: RunnerContext{workingdir: tempDir, id: "test-runner", tmpDir: tempDir}, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } r.pending["test-id"] = &PendingPrediction{} @@ -558,7 +558,7 @@ func TestRunnerCancel(t *testing.T) { r := &Runner{ pending: make(map[string]*PendingPrediction), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } err := r.Cancel("nonexistent") @@ -576,7 +576,7 @@ func TestRunnerString(t *testing.T) { r := &Runner{ runnerCtx: RunnerContext{id: "test-runner"}, status: StatusReady, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } got := r.String() @@ -593,7 +593,7 @@ func TestRunnerIPC(t *testing.T) { r := &Runner{ status: StatusStarting, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), runnerCtx: RunnerContext{ id: "test-runner", workingdir: t.TempDir(), @@ -611,7 +611,7 @@ func TestRunnerIPC(t *testing.T) { r := &Runner{ status: StatusReady, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } err := r.HandleIPC("READY") @@ -624,7 +624,7 @@ func TestRunnerIPC(t *testing.T) { r := &Runner{ status: StatusReady, - logger: 
zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } err := r.HandleIPC("BUSY") @@ -639,7 +639,7 @@ func TestRunnerIPC(t *testing.T) { r := &Runner{ runnerCtx: RunnerContext{workingdir: tempDir}, pending: make(map[string]*PendingPrediction), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } err := r.HandleIPC("OUTPUT") @@ -650,7 +650,7 @@ func TestRunnerIPC(t *testing.T) { t.Parallel() r := &Runner{ - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } err := r.HandleIPC("UNKNOWN") @@ -721,7 +721,7 @@ func TestWaitForStop(t *testing.T) { r := &Runner{ stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } close(r.stopped) @@ -743,7 +743,7 @@ func TestWaitForStop(t *testing.T) { r := &Runner{ stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } done := make(chan struct{}) @@ -790,7 +790,7 @@ func TestNewRunner(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() cfg := config.Config{} - r, err := NewRunner(ctx, cancel, runnerCtx, cmd, 1, cfg, zaptest.NewLogger(t)) + r, err := NewRunner(ctx, cancel, runnerCtx, cmd, 1, cfg, loggingtest.NewTestLogger(t)) require.NoError(t, err) assert.Equal(t, "test-runner", r.runnerCtx.id) @@ -827,7 +827,7 @@ func TestNewRunner(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() cfg := config.Config{} - r, err := NewRunner(ctx, cancel, runnerCtx, cmd, 1, cfg, zaptest.NewLogger(t)) + r, err := NewRunner(ctx, cancel, runnerCtx, cmd, 1, cfg, loggingtest.NewTestLogger(t)) require.NoError(t, err) // Should store the command correctly @@ -852,7 +852,7 @@ func TestNewRunner(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() cfg := config.Config{} - r, err := NewRunner(ctx, cancel, runnerCtx, cmd, 1, cfg, zaptest.NewLogger(t)) + r, err := NewRunner(ctx, cancel, runnerCtx, cmd, 1, cfg, loggingtest.NewTestLogger(t)) 
require.NoError(t, err) require.NotNil(t, r) @@ -891,7 +891,7 @@ func TestProcedureRunnerCreation(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() cfg := config.Config{} - r, err := NewRunner(ctx, cancel, runnerCtx, cmd, 1, cfg, zaptest.NewLogger(t)) + r, err := NewRunner(ctx, cancel, runnerCtx, cmd, 1, cfg, loggingtest.NewTestLogger(t)) require.NoError(t, err) assert.Equal(t, "proc-runner", r.runnerCtx.id) @@ -995,7 +995,7 @@ func TestRunnerTempDirectoryCleanup(t *testing.T) { pending: make(map[string]*PendingPrediction), killFn: func(pid int) error { return nil }, stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } tmpDir, err := os.MkdirTemp("", "test-cog-runner-tmp-") @@ -1034,7 +1034,7 @@ func TestRunnerTempDirectoryCleanup(t *testing.T) { pending: make(map[string]*PendingPrediction), killFn: func(pid int) error { return nil }, stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } err := r.Stop() @@ -1074,7 +1074,7 @@ func TestRunnerConfigCreatesConfigJSON(t *testing.T) { pending: make(map[string]*PendingPrediction), killFn: func(pid int) error { return nil }, stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Call Config method @@ -1133,7 +1133,7 @@ func TestPerPredictionWatcher(t *testing.T) { require.NoError(t, err) // Setup runner with mock working directory - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) runner := &Runner{ runnerCtx: RunnerContext{workingdir: tempDir}, logger: logger, @@ -1178,7 +1178,7 @@ func TestPerPredictionWatcher(t *testing.T) { require.NoError(t, err) // Setup runner - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) runner := &Runner{ runnerCtx: RunnerContext{workingdir: tempDir}, logger: logger, @@ -1219,7 +1219,7 @@ func TestPerPredictionWatcher(t *testing.T) { predictionID := "test-prediction-789" // Setup 
runner - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) runner := &Runner{ runnerCtx: RunnerContext{workingdir: tempDir}, logger: logger, @@ -1286,7 +1286,7 @@ func TestPerPredictionWatcher(t *testing.T) { predictionID := "test-prediction-abc" // Setup runner - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) runner := &Runner{ runnerCtx: RunnerContext{workingdir: tempDir}, logger: logger, @@ -1385,7 +1385,7 @@ func TestForceKillCleanupFailures(t *testing.T) { }, status: StatusReady, forceShutdown: nil, // Non-procedure mode - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } r.ForceKill() @@ -1411,7 +1411,7 @@ func TestForceKillCleanupFailures(t *testing.T) { status: StatusReady, cleanupSlot: make(chan struct{}, 1), forceShutdown: forceShutdown, - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Initialize cleanup slot with token @@ -1434,7 +1434,7 @@ func TestForceKillCleanupFailures(t *testing.T) { forceShutdown: forceShutdown, cleanupSlot: make(chan struct{}, 1), stopped: make(chan bool), - logger: zaptest.NewLogger(t), + logger: loggingtest.NewTestLogger(t), } // Start verification process diff --git a/internal/server/server.go b/internal/server/server.go index a7fe6f96..3a83e0cd 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -14,12 +14,11 @@ import ( "sync/atomic" "time" - "go.uber.org/zap" - "github.com/replicate/go/httpclient" "github.com/replicate/go/uuid" "github.com/replicate/cog-runtime/internal/config" + "github.com/replicate/cog-runtime/internal/logging" "github.com/replicate/cog-runtime/internal/runner" ) @@ -50,10 +49,10 @@ type Handler struct { cwd string - logger *zap.Logger + logger *logging.Logger } -func NewHandler(ctx context.Context, cfg config.Config, baseLogger *zap.Logger) (*Handler, error) { +func NewHandler(ctx context.Context, cfg config.Config, baseLogger *logging.Logger) (*Handler, error) { runnerManager := 
runner.NewManager(ctx, cfg, baseLogger) h := &Handler{ @@ -160,7 +159,7 @@ func (h *Handler) HandleIPC(w http.ResponseWriter, r *http.Request) { log := h.logger.Sugar() // Debug: Log all incoming IPC requests - log.Debugw("received IPC request", + log.Tracew("received IPC request", "method", r.Method, "url", r.URL.String(), "remote_addr", r.RemoteAddr, @@ -186,7 +185,7 @@ func (h *Handler) HandleIPC(w http.ResponseWriter, r *http.Request) { name = ipc.Name } - log.Debugw("handling IPC for runner", "target_runner", name, "procedure_mode", h.cfg.UseProcedureMode, "status", ipc.Status, "pid", ipc.Pid, "name", ipc.Name) + log.Tracew("handling IPC for runner", "target_runner", name, "procedure_mode", h.cfg.UseProcedureMode, "status", ipc.Status, "pid", ipc.Pid, "name", ipc.Name) if err := h.runnerManager.HandleRunnerIPC(name, string(ipc.Status)); err != nil { if !errors.Is(err, runner.ErrRunnerNotFound) { @@ -290,7 +289,7 @@ func (h *Handler) Predict(w http.ResponseWriter, r *http.Request) { } log.Infow("running prediction", "id", req.ID, "webhook", req.Webhook, "procedure_mode", h.cfg.UseProcedureMode) - log.Debugw("procedure mode prediction request", "id", req.ID, "webhook", req.Webhook, "procedure_source_url", req.ProcedureSourceURL) + log.Tracew("procedure mode prediction request", "id", req.ID, "webhook", req.Webhook, "procedure_source_url", req.ProcedureSourceURL) var runnerResult *runner.PredictionResponse if req.Webhook != "" { @@ -306,11 +305,10 @@ func (h *Handler) Predict(w http.ResponseWriter, r *http.Request) { // Convert runner response to server response format c = make(chan PredictionResponse, 1) var logsStr string - log.Debugw("runner result received", "id", runnerResult.ID, "logs_count", len(runnerResult.Logs)) + log.Tracew("runner result received", "id", runnerResult.ID, "logs_count", len(runnerResult.Logs)) if len(runnerResult.Logs) > 0 { - log.Debugw("joining logs", "logs", runnerResult.Logs) + log.Tracew("joining logs", "logs", runnerResult.Logs) 
logsStr = runnerResult.Logs.String() - log.Debugw("joined logs result", "logs_str", logsStr) } var metrics map[string]any if runnerResult.Metrics != nil { diff --git a/internal/service/service.go b/internal/service/service.go index 241d0bf5..810b2666 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -12,10 +12,10 @@ import ( "syscall" "time" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "github.com/replicate/cog-runtime/internal/config" + "github.com/replicate/cog-runtime/internal/logging" "github.com/replicate/cog-runtime/internal/server" ) @@ -44,7 +44,7 @@ type Service struct { handler *server.Handler forceShutdown *config.ForceShutdownSignal - logger *zap.Logger + logger *logging.Logger } type ServiceOption interface { @@ -73,7 +73,7 @@ var ( ) // New creates a new Service with the given configuration -func New(cfg config.Config, baseLogger *zap.Logger, opts ...ServiceOption) *Service { +func New(cfg config.Config, baseLogger *logging.Logger, opts ...ServiceOption) *Service { svc := &Service{ cfg: cfg, started: make(chan struct{}), @@ -113,7 +113,7 @@ func (s *Service) initializeHandler(ctx context.Context) error { } log := s.logger.Sugar() - log.Info("initializing handler") + log.Debug("initializing handler") h, err := server.NewHandler(ctx, s.cfg, s.logger) if err != nil { @@ -131,7 +131,7 @@ func (s *Service) initializeHTTPServer(ctx context.Context) error { } log := s.logger.Sugar() - log.Info("initializing HTTP server") + log.Debug("initializing HTTP server") mux := server.NewServeMux(s.handler, s.cfg.UseProcedureMode) s.httpServer = &http.Server{ @@ -187,7 +187,7 @@ func (s *Service) Run(ctx context.Context) error { // Signal runners to shutdown gracefully and wait for them if s.handler != nil { - log.Info("stopping runners gracefully") + log.Tracew("stopping runners gracefully") if err := s.handler.Stop(); err != nil { log.Errorw("error stopping handler", "error", err) } @@ -213,7 +213,7 @@ func (s *Service) Run(ctx 
context.Context) error { case <-egCtx.Done(): // Only force immediate shutdown if graceful shutdown hasn't started if s.shutdownStarted.CompareAndSwap(false, true) { - log.Info("context canceled, forcing immediate shutdown") + log.Trace("context canceled, forcing immediate shutdown") close(s.shutdown) // Context canceled = immediate hard shutdown, no grace period if err := s.httpServer.Close(); err != nil { @@ -233,7 +233,7 @@ func (s *Service) Run(ctx context.Context) error { // Monitor for forced shutdown from cleanup failures eg.Go(func() error { - defer log.Debug("force shutdown goroutine exiting") + defer log.Trace("force shutdown goroutine exiting") select { case <-s.forceShutdown.WatchForForceShutdown(): log.Errorw("process cleanup failed, forcing immediate exit") @@ -249,7 +249,7 @@ func (s *Service) Run(ctx context.Context) error { close(s.started) - log.Debug("waiting for all service goroutines to complete") + log.Trace("waiting for all service goroutines to complete") err := eg.Wait() log.Debug("all service goroutines completed") @@ -271,7 +271,7 @@ func (s *Service) Shutdown() { // Use atomic CAS to ensure only one shutdown if !s.shutdownStarted.CompareAndSwap(false, true) { - log.Debug("already shutting down") + log.Trace("already shutting down") return } @@ -285,7 +285,7 @@ func (s *Service) stop(ctx context.Context) { select { case <-s.stopped: - log.Debug("service already stopped") + log.Trace("service already stopped") default: close(s.stopped) } diff --git a/internal/tests/harness_test.go b/internal/tests/harness_test.go index 3e843ddc..dd105707 100644 --- a/internal/tests/harness_test.go +++ b/internal/tests/harness_test.go @@ -27,9 +27,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" "github.com/replicate/cog-runtime/internal/config" + "github.com/replicate/cog-runtime/internal/loggingtest" "github.com/replicate/cog-runtime/internal/runner" 
"github.com/replicate/cog-runtime/internal/server" "github.com/replicate/cog-runtime/internal/service" @@ -335,7 +335,7 @@ func setupCogRuntimeServer(t *testing.T, cfg cogRuntimeServerConfig) (*httptest. } // logger := NewTestLogger(t, "harness-test") - logger := zaptest.NewLogger(t).Named("harness-test") + logger := loggingtest.NewTestLogger(t).Named("harness-test") // Create handler with service shutdown function instead of test context cancel handler, err := server.NewHandler(t.Context(), serverCfg, logger) diff --git a/internal/webhook/webhook.go b/internal/webhook/webhook.go index d114853a..8b9e37b6 100644 --- a/internal/webhook/webhook.go +++ b/internal/webhook/webhook.go @@ -8,7 +8,8 @@ import ( "time" "github.com/replicate/go/httpclient" - "go.uber.org/zap" + + "github.com/replicate/cog-runtime/internal/logging" ) // Event represents a webhook event - using string to be compatible with any type @@ -32,12 +33,12 @@ var _ Sender = (*DefaultSender)(nil) // DefaultSender handles webhook delivery type DefaultSender struct { - logger *zap.Logger + logger *logging.Logger client *http.Client } // NewSender creates a new webhook sender -func NewSender(logger *zap.Logger) *DefaultSender { +func NewSender(logger *logging.Logger) *DefaultSender { return &DefaultSender{ logger: logger.Named("webhook"), client: httpclient.ApplyRetryPolicy(http.DefaultClient), @@ -76,14 +77,14 @@ func (s *DefaultSender) SendConditional(url string, payload io.Reader, event Eve // Check event filter if len(allowedEvents) > 0 && !slices.Contains(allowedEvents, event) { - log.Debugw("skipping webhook due to event filter", "url", url, "event", string(event), "allowed_events", allowedEvents) + log.Tracew("skipping webhook due to event filter", "url", url, "event", string(event), "allowed_events", allowedEvents) return nil } // Rate limiting for logs and output events if event == EventLogs || event == EventOutput { if lastUpdated != nil && time.Since(*lastUpdated) < 500*time.Millisecond { - 
log.Debugw("skipping webhook due to rate limiting", "url", url, "event", string(event), "last_updated", lastUpdated) + log.Tracew("skipping webhook due to rate limiting", "url", url, "event", string(event), "last_updated", lastUpdated) return nil } if lastUpdated != nil { diff --git a/internal/webhook/webhook_test.go b/internal/webhook/webhook_test.go index 7d30f775..b67289e8 100644 --- a/internal/webhook/webhook_test.go +++ b/internal/webhook/webhook_test.go @@ -11,7 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" + + "github.com/replicate/cog-runtime/internal/loggingtest" ) // Helper function to create a JSON reader from any payload @@ -25,7 +26,7 @@ func jsonReader(t *testing.T, payload any) io.Reader { func TestNewSender(t *testing.T) { t.Parallel() - logger := zaptest.NewLogger(t) + logger := loggingtest.NewTestLogger(t) sender := NewSender(logger) require.NotNil(t, sender) @@ -63,7 +64,7 @@ func TestSenderSend(t *testing.T) { payloadBytes, err := json.Marshal(payload) require.NoError(t, err) - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) err = sender.Send(server.URL, bytes.NewReader(payloadBytes)) require.NoError(t, err) @@ -80,7 +81,7 @@ func TestSenderSend(t *testing.T) { // Use a sender with regular HTTP client for faster test sender := &DefaultSender{ - logger: zaptest.NewLogger(t).Named("webhook"), + logger: loggingtest.NewTestLogger(t).Named("webhook"), client: &http.Client{}, } err := sender.Send(server.URL, jsonReader(t, map[string]string{"test": "data"})) @@ -94,7 +95,7 @@ func TestSenderSend(t *testing.T) { // Use a sender with regular HTTP client for faster test sender := &DefaultSender{ - logger: zaptest.NewLogger(t).Named("webhook"), + logger: loggingtest.NewTestLogger(t).Named("webhook"), client: &http.Client{}, } payload := map[string]string{"test": "data"} @@ -107,7 +108,7 @@ func TestSenderSend(t *testing.T) { 
t.Run("handles invalid URLs", func(t *testing.T) { t.Parallel() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) err := sender.Send(":", jsonReader(t, map[string]string{"test": "data"})) require.Error(t, err) @@ -128,7 +129,7 @@ func TestSenderSendConditional(t *testing.T) { })) defer server.Close() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) err := sender.SendConditional(server.URL, jsonReader(t, map[string]string{"test": "data"}), EventStart, nil, nil) require.NoError(t, err) @@ -145,7 +146,7 @@ func TestSenderSendConditional(t *testing.T) { })) defer server.Close() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) allowedEvents := []Event{EventStart, EventCompleted} err := sender.SendConditional(server.URL, jsonReader(t, map[string]string{"test": "data"}), EventStart, allowedEvents, nil) @@ -163,7 +164,7 @@ func TestSenderSendConditional(t *testing.T) { })) defer server.Close() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) allowedEvents := []Event{EventStart, EventCompleted} err := sender.SendConditional(server.URL, jsonReader(t, map[string]string{"test": "data"}), EventLogs, allowedEvents, nil) @@ -174,7 +175,7 @@ func TestSenderSendConditional(t *testing.T) { t.Run("skips when URL is empty", func(t *testing.T) { t.Parallel() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) err := sender.SendConditional("", jsonReader(t, map[string]string{"test": "data"}), EventStart, nil, nil) require.NoError(t, err) @@ -190,7 +191,7 @@ func TestSenderSendConditional(t *testing.T) { })) defer server.Close() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) lastUpdated := time.Now().Add(-time.Second) // Start with old timestamp // First call should go through @@ -220,7 +221,7 @@ func 
TestSenderSendConditional(t *testing.T) { })) defer server.Close() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) lastUpdated := time.Now().Add(-time.Second) // Start with old timestamp // First call should go through @@ -244,7 +245,7 @@ func TestSenderSendConditional(t *testing.T) { })) defer server.Close() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) lastUpdated := time.Now() // Start event should go through @@ -268,7 +269,7 @@ func TestSenderSendConditional(t *testing.T) { })) defer server.Close() - sender := NewSender(zaptest.NewLogger(t)) + sender := NewSender(loggingtest.NewTestLogger(t)) // Should work with nil lastUpdated err := sender.SendConditional(server.URL, jsonReader(t, map[string]string{"test": "data"}), EventLogs, nil, nil)