Skip to content
This repository was archived by the owner on Apr 15, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/.idea
/build
/dist
/internal/util/version.txt
/internal/version/version.txt
/python/cog/cog-*
/python/coglet/_version.py
/uv.lock
Expand Down
4 changes: 2 additions & 2 deletions cmd/cog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/replicate/cog-runtime/internal/config"
"github.com/replicate/cog-runtime/internal/runner"
"github.com/replicate/cog-runtime/internal/service"
"github.com/replicate/cog-runtime/internal/util"
"github.com/replicate/cog-runtime/internal/version"
)

type ServerCmd struct {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (s *ServerCmd) Run() error {
}

addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
log.Infow("starting Cog HTTP server", "addr", addr, "version", util.Version(), "pid", os.Getpid())
log.Infow("starting Cog HTTP server", "addr", addr, "version", version.Version(), "pid", os.Getpid())

// Create service with base logger
svc := service.New(cfg, baseLogger)
Expand Down
5 changes: 2 additions & 3 deletions internal/runner/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (

"github.com/gabriel-vasile/mimetype"
"github.com/getkin/kin-openapi/openapi3"

"github.com/replicate/cog-runtime/internal/util"
"github.com/replicate/go/httpclient"
)

var Base64Regex = regexp.MustCompile(`^data:.*;base64,(?P<base64>.*)$`)
Expand Down Expand Up @@ -210,7 +209,7 @@ type uploader struct {
// newUploader creates a new uploader instance
func newUploader(uploadURL string) *uploader {
return &uploader{
client: util.HTTPClientWithRetry(),
client: httpclient.ApplyRetryPolicy(http.DefaultClient),
uploadURL: uploadURL,
}
}
Expand Down
67 changes: 60 additions & 7 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package runner

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"os/exec"
"path"
Expand All @@ -20,8 +22,10 @@ import (
"github.com/getkin/kin-openapi/openapi3"
"go.uber.org/zap"

"github.com/replicate/go/httpclient"

"github.com/replicate/cog-runtime/internal/config"
"github.com/replicate/cog-runtime/internal/util"
"github.com/replicate/cog-runtime/internal/version"
"github.com/replicate/cog-runtime/internal/webhook"
)

Expand Down Expand Up @@ -293,7 +297,7 @@ type Runner struct {
schema string
doc *openapi3.T
setupResult SetupResult
logs []string
logs LogsSlice
asyncPredict bool
maxConcurrency int
pending map[string]*PendingPrediction
Expand Down Expand Up @@ -464,14 +468,27 @@ func (r *Runner) setupLogCapture() error {
func (r *Runner) logStdout(line string) {
r.captureLogLine(line)

_, _ = fmt.Fprintln(os.Stdout, line) //nolint:forbidigo // mirror log to stdout
// Strip [pid=xxxxx] prefix before mirroring to stdout
mirrorLine := stripPIDPrefix(line)
_, _ = fmt.Fprintln(os.Stdout, mirrorLine) //nolint:forbidigo // mirror log to stdout
}

// logStderr captures a line from stderr and mirrors to stderr
func (r *Runner) logStderr(line string) {
r.captureLogLine(line)

_, _ = fmt.Fprintln(os.Stderr, line) //nolint:forbidigo // mirror log to stderr
// Strip [pid=xxxxx] prefix before mirroring to stderr
mirrorLine := stripPIDPrefix(line)
_, _ = fmt.Fprintln(os.Stderr, mirrorLine) //nolint:forbidigo // mirror log to stderr
}

func stripPIDPrefix(line string) string {
if LogRegex.MatchString(line) {
if m := LogRegex.FindStringSubmatch(line); m != nil {
return m[2] // Extract message without pid prefix
}
}
return line
}

// captureLogLine handles routing log lines like the old implementation
Expand Down Expand Up @@ -521,7 +538,7 @@ func (r *Runner) captureLogLine(line string) {
} else {
// Add to runner logs for crash reporting
r.logs = append(r.logs, line)
r.setupResult.Logs = util.JoinLogs(r.logs)
r.setupResult.Logs = r.logs.String()
}
r.mu.Unlock()
default:
Expand Down Expand Up @@ -566,6 +583,9 @@ func (r *Runner) Config(ctx context.Context) error {
// Default to 1 if not set in cog.yaml, regardless whether async predict or not
maxConcurrency := max(1, cogYaml.Concurrency.Max)

// Send metrics
go r.sendRunnerMetric(*cogYaml)

// Create config.json for the coglet process
configJSON := map[string]any{
"module_name": moduleName,
Expand Down Expand Up @@ -593,6 +613,36 @@ func (r *Runner) Config(ctx context.Context) error {
return nil
}

func (r *Runner) sendRunnerMetric(cogYaml CogYaml) {
log := r.logger.Sugar()
// FIXME: wire this up through more than os.getenv
endpoint := os.Getenv("COG_METRICS_ENDPOINT")
if endpoint == "" {
return
}
data := map[string]any{
"gpu": cogYaml.Build.GPU,
"fast": cogYaml.Build.Fast,
"cog_runtime": cogYaml.Build.CogRuntime,
"version": version.Version(),
}
payload := MetricsPayload{
Source: "cog-runtime",
Type: "runner",
Data: data,
}
body, err := json.Marshal(payload)
if err != nil {
log.Errorw("failed to marshal payload", "error", err)
return
}
resp, err := httpclient.ApplyRetryPolicy(http.DefaultClient).Post(endpoint, "application/json", bytes.NewBuffer(body))
if err != nil || resp.StatusCode != http.StatusOK {
log.Errorw("failed to send runner metrics", "error", err)
}
defer resp.Body.Close()
}

func (r *Runner) Stop() error {
log := r.logger.Sugar()
r.mu.Lock()
Expand Down Expand Up @@ -913,7 +963,10 @@ func (r *Runner) updateSetupResult() {
}

// Set logs first (original pattern)
r.setupResult.Logs = util.JoinLogs(logLines)
r.setupResult.Logs = strings.Join(logLines, "\n")
if r.setupResult.Logs != "" {
r.setupResult.Logs += "\n"
}

setupResultPath := filepath.Join(r.runnerCtx.workingdir, "setup_result.json")
log.Debug("reading setup_result.json", "path", setupResultPath)
Expand Down Expand Up @@ -954,7 +1007,7 @@ func (r *Runner) rotateLogs() string {
r.mu.Lock()
defer r.mu.Unlock()

allLogs := util.JoinLogs(r.logs)
allLogs := r.logs.String()
r.logs = r.logs[:0]
return allLogs
}
Expand Down
2 changes: 1 addition & 1 deletion internal/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ func TestPerPredictionWatcher(t *testing.T) {
assert.Equal(t, "partial output", pending.response.Output)
assert.Equal(t, predictionID, pending.response.ID)
assert.Equal(t, map[string]any{"test": "input"}, pending.response.Input)
assert.Equal(t, []string{"existing log"}, pending.response.Logs) // Logs preserved
assert.Equal(t, LogsSlice{"existing log"}, pending.response.Logs) // Logs preserved
pending.mu.Unlock()
})

Expand Down
111 changes: 48 additions & 63 deletions internal/runner/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,50 @@ import (
"syscall"
"time"

"github.com/replicate/cog-runtime/internal/util"
"github.com/replicate/cog-runtime/internal/webhook"
)

// LogsSlice is a []string that marshals to/from a newline-joined string in JSON
type LogsSlice []string

func (l LogsSlice) String() string {
r := strings.Join(l, "\n")
if r != "" {
r += "\n"
}
return r
}

// MarshalJSON implements custom JSON marshaling to convert logs from []string to string
func (l LogsSlice) MarshalJSON() ([]byte, error) {
result := strings.Join(l, "\n")
if result != "" {
result += "\n"
}
return json.Marshal(result)
}

// UnmarshalJSON implements custom JSON unmarshaling to convert logs from string to []string
func (l *LogsSlice) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}

if str == "" {
*l = nil
return nil
}

// Split on newline and remove the trailing empty element if it exists
parts := strings.Split(str, "\n")
if len(parts) > 0 && parts[len(parts)-1] == "" {
parts = parts[:len(parts)-1]
}
*l = LogsSlice(parts)
return nil
}

type Status int

const (
Expand Down Expand Up @@ -116,72 +156,11 @@ type PredictionResponse struct {
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Logs []string `json:"logs,omitempty"`
Logs LogsSlice `json:"logs,omitempty"`
Metrics any `json:"metrics,omitempty"`
WebhookURL string `json:"webhook,omitempty"`
}

// MarshalJSON implements custom JSON marshaling to convert logs from []string to string
func (pr PredictionResponse) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
ID string `json:"id"`
Status PredictionStatus `json:"status"`
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Logs string `json:"logs,omitempty"`
Metrics any `json:"metrics,omitempty"`
WebhookURL string `json:"webhook,omitempty"`
}{
ID: pr.ID,
Status: pr.Status,
Input: pr.Input,
Output: pr.Output,
Error: pr.Error,
Logs: util.JoinLogs(pr.Logs),
Metrics: pr.Metrics,
WebhookURL: pr.WebhookURL,
})
}

// UnmarshalJSON implements custom JSON unmarshalling to convert logs from string to []string
func (pr *PredictionResponse) UnmarshalJSON(data []byte) error {
aux := &struct {
ID string `json:"id"`
Status PredictionStatus `json:"status"`
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Logs string `json:"logs,omitempty"`
Metrics any `json:"metrics,omitempty"`
WebhookURL string `json:"webhook,omitempty"`
}{}
if err := json.Unmarshal(data, aux); err != nil {
return err
}

pr.ID = aux.ID
pr.Status = aux.Status
pr.Input = aux.Input
pr.Output = aux.Output
pr.Error = aux.Error
pr.Metrics = aux.Metrics
pr.WebhookURL = aux.WebhookURL

// Convert string logs back to []string
if aux.Logs != "" {
// Split on newline and remove the trailing empty element if it exists
parts := strings.Split(aux.Logs, "\n")
if len(parts) > 0 && parts[len(parts)-1] == "" {
parts = parts[:len(parts)-1]
}
pr.Logs = parts
} else {
pr.Logs = nil
}
return nil
}

// RunnerID is a unique identifier for a runner instance.
// Format: 8-character base32 string (no leading zeros)
// Example: "k7m3n8p2", "b9q4x2w1"
Expand Down Expand Up @@ -391,3 +370,9 @@ func (p *PendingPrediction) sendWebhookSync(event webhook.Event) error {
_ = p.webhookSender.SendConditional(p.request.Webhook, bytes.NewReader(body), event, p.request.WebhookEventsFilter, &p.lastUpdated)
return nil
}

type MetricsPayload struct {
Source string `json:"source,omitempty"`
Type string `json:"type,omitempty"`
Data map[string]any `json:"data,omitempty"`
}
4 changes: 2 additions & 2 deletions internal/runner/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestPredictionResponse(t *testing.T) {
assert.Equal(t, PredictionSucceeded, resp.Status)
assert.Equal(t, map[string]any{"result": "success"}, resp.Output)
assert.Empty(t, resp.Error)
assert.Equal(t, []string{"log1", "log2"}, resp.Logs)
assert.Equal(t, LogsSlice{"log1", "log2"}, resp.Logs)
assert.Equal(t, map[string]any{"duration": 1.5}, resp.Metrics)
assert.Equal(t, "http://example.com/webhook", resp.WebhookURL)
})
Expand Down Expand Up @@ -330,7 +330,7 @@ func TestPredictionResponseUnmarshalFromExternalJSON(t *testing.T) {
err := json.Unmarshal([]byte(jsonStr), &response)
require.NoError(t, err)

expected := []string{
expected := LogsSlice{
"starting prediction",
"prediction in progress 1/2",
"prediction in progress 2/2",
Expand Down
Loading
Loading