Skip to content

Commit a7e61a1

Browse files
authored
Merge pull request #30 from phantom5099/fork-pr-337-1776410957
fix(runtime/session/config): resolve submit-event blocking and cancellation gaps
2 parents a2d685e + 100ad8a commit a7e61a1

12 files changed

Lines changed: 397 additions & 41 deletions

File tree

internal/config/loader_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,12 +1050,12 @@ func TestLoadCustomProvidersReadDirAndStatErrors(t *testing.T) {
10501050
t.Fatalf("WriteFile() error = %v", err)
10511051
}
10521052

1053-
_, err := loadCustomProviders(baseDir)
1054-
if err == nil {
1055-
t.Fatal("expected read providers dir error")
1053+
providers, err := loadCustomProviders(baseDir)
1054+
if err != nil {
1055+
t.Fatalf("expected read providers dir fallback, got %v", err)
10561056
}
1057-
if !strings.Contains(err.Error(), "read providers dir") {
1058-
t.Fatalf("expected read providers dir error, got %v", err)
1057+
if len(providers) != 0 {
1058+
t.Fatalf("expected empty providers on read fallback, got %d", len(providers))
10591059
}
10601060
})
10611061

internal/config/provider_loader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func loadCustomProviders(baseDir string) ([]ProviderConfig, error) {
6767
if os.IsNotExist(err) {
6868
return nil, nil
6969
}
70-
return nil, fmt.Errorf("config: read providers dir: %w", err)
70+
return nil, nil
7171
}
7272

7373
sort.Slice(entries, func(i, j int) bool {

internal/provider/openaicompat/chatcompletions/provider_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ func TestNewAndBuildRequest(t *testing.T) {
123123
if downgradedSchema["type"] != "object" {
124124
t.Fatalf("expected downgraded schema type object, got %+v", downgradedSchema["type"])
125125
}
126-
if downgradedSchema["x-neocode-schema-downgraded"] != true {
127-
t.Fatalf("expected downgrade marker, got %+v", downgradedSchema)
126+
if _, ok := downgradedSchema["x-neocode-schema-downgraded"]; ok {
127+
t.Fatalf("expected no custom downgrade marker in outbound schema, got %+v", downgradedSchema)
128128
}
129129

130130
withSessionAsset, err := BuildRequest(context.Background(), testCfg("https://api.example.com/v1", "gpt-4.1", "test-key"), providertypes.GenerateRequest{

internal/provider/openaicompat/chatcompletions/request.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,10 @@ func normalizeToolSchemaForOpenAI(schema map[string]any) map[string]any {
9191
typeName, _ := normalized["type"].(string)
9292
if strings.TrimSpace(strings.ToLower(typeName)) != "object" {
9393
normalized["type"] = "object"
94-
normalized["x-neocode-schema-downgraded"] = true
9594
}
9695

9796
if _, ok := normalized["properties"].(map[string]any); !ok {
9897
normalized["properties"] = map[string]any{}
99-
if strings.TrimSpace(strings.ToLower(typeName)) != "object" {
100-
normalized["x-neocode-schema-downgraded"] = true
101-
}
10298
}
10399
return normalized
104100
}

internal/runtime/input_prepare.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@ import (
55
"errors"
66
"fmt"
77
"strings"
8+
"time"
89

910
agentsession "neo-code/internal/session"
1011
)
1112

13+
const prepareEventEmitTimeout = 200 * time.Millisecond
14+
1215
// NewSessionInputPreparer 创建基于 session 子层实现的输入归一化适配器。
1316
func NewSessionInputPreparer(store agentsession.Store, assetStore agentsession.AssetStore) UserInputPreparer {
1417
return sessionInputPreparer{
@@ -51,7 +54,7 @@ func (s *Service) PrepareUserInput(ctx context.Context, input PrepareInput) (Use
5154
}
5255

5356
runID := strings.TrimSpace(input.RunID)
54-
_ = s.emit(ctx, EventInputNormalized, runID, prepared.UserInput.SessionID, InputNormalizedPayload{
57+
_ = s.emitPrepareEvent(ctx, EventInputNormalized, runID, prepared.UserInput.SessionID, InputNormalizedPayload{
5558
TextLength: len([]rune(strings.TrimSpace(input.Text))),
5659
ImageCount: len(input.Images),
5760
})
@@ -60,7 +63,7 @@ func (s *Service) PrepareUserInput(ctx context.Context, input PrepareInput) (Use
6063
if index >= 0 && index < len(input.Images) {
6164
path = strings.TrimSpace(input.Images[index].Path)
6265
}
63-
_ = s.emit(ctx, EventAssetSaved, runID, prepared.UserInput.SessionID, AssetSavedPayload{
66+
_ = s.emitPrepareEvent(ctx, EventAssetSaved, runID, prepared.UserInput.SessionID, AssetSavedPayload{
6467
Index: index,
6568
Path: path,
6669
AssetID: asset.ID,
@@ -86,13 +89,31 @@ func (s *Service) emitPrepareFailure(ctx context.Context, input PrepareInput, er
8689
if session := strings.TrimSpace(saveErr.SessionID); session != "" {
8790
sessionID = session
8891
}
89-
return s.emit(ctx, EventAssetSaveFailed, runID, sessionID, AssetSaveFailedPayload{
92+
return s.emitPrepareEvent(ctx, EventAssetSaveFailed, runID, sessionID, AssetSaveFailedPayload{
9093
Index: saveErr.Index,
9194
Path: strings.TrimSpace(saveErr.Path),
9295
Message: strings.TrimSpace(saveErr.Error()),
9396
})
9497
}
95-
return s.emit(ctx, EventError, runID, sessionID, strings.TrimSpace(err.Error()))
98+
return s.emitPrepareEvent(ctx, EventError, runID, sessionID, strings.TrimSpace(err.Error()))
99+
}
100+
101+
// emitPrepareEvent 在输入归一化阶段使用限时上下文发事件,避免通道拥塞导致提交链路卡死。
102+
func (s *Service) emitPrepareEvent(ctx context.Context, kind EventType, runID string, sessionID string, payload any) error {
103+
emitCtx := ctx
104+
cancel := func() {}
105+
if _, hasDeadline := emitCtx.Deadline(); !hasDeadline {
106+
emitCtx, cancel = context.WithTimeout(emitCtx, prepareEventEmitTimeout)
107+
}
108+
defer cancel()
109+
110+
if err := s.emit(emitCtx, kind, runID, sessionID, payload); err != nil {
111+
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
112+
return nil
113+
}
114+
return err
115+
}
116+
return nil
96117
}
97118

98119
type sessionInputPreparer struct {

internal/runtime/input_prepare_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,31 @@ func TestServiceSubmitWithoutPreparerReturnsError(t *testing.T) {
123123
}
124124
}
125125

126+
func TestServicePrepareUserInputDoesNotBlockWhenPrepareEventQueueIsFull(t *testing.T) {
127+
t.Parallel()
128+
129+
workdir := t.TempDir()
130+
svc, _ := newPrepareTestService(t, workdir, true)
131+
for index := 0; index < cap(svc.events); index++ {
132+
svc.events <- RuntimeEvent{Type: EventToolChunk}
133+
}
134+
135+
start := time.Now()
136+
input, err := svc.PrepareUserInput(context.Background(), PrepareInput{
137+
RunID: "run-prepare-full-queue",
138+
Text: "hello",
139+
})
140+
if err != nil {
141+
t.Fatalf("PrepareUserInput() error = %v", err)
142+
}
143+
if input.SessionID == "" {
144+
t.Fatalf("expected prepared session id")
145+
}
146+
if elapsed := time.Since(start); elapsed > time.Second {
147+
t.Fatalf("PrepareUserInput() blocked too long with full event queue: %v", elapsed)
148+
}
149+
}
150+
126151
func newPrepareTestService(t *testing.T, workdir string, withPreparer bool) (*Service, *agentsession.JSONStore) {
127152
t.Helper()
128153

internal/session/asset_store_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,21 @@ func TestJSONStoreAssetStoreRespectsCanceledContext(t *testing.T) {
118118
}
119119
}
120120

121+
func TestJSONStoreSaveAssetStopsWhenContextCanceledDuringCopy(t *testing.T) {
122+
t.Parallel()
123+
124+
store := NewJSONStore(t.TempDir(), t.TempDir())
125+
ctx, cancel := context.WithCancel(context.Background())
126+
reader := &cancelAfterFirstReadReader{
127+
cancel: cancel,
128+
chunks: [][]byte{[]byte("chunk-1"), []byte("chunk-2")},
129+
}
130+
131+
if _, err := store.SaveAsset(ctx, "session_ctx_cancel_during_copy", reader, "image/png"); !errors.Is(err, context.Canceled) {
132+
t.Fatalf("expected context canceled during copy, got %v", err)
133+
}
134+
}
135+
121136
func TestJSONStoreSaveAssetRejectsOversizedPayload(t *testing.T) {
122137
t.Parallel()
123138

@@ -224,8 +239,52 @@ func TestJSONStoreOpenAndStatMissingStoredFiles(t *testing.T) {
224239
}
225240
}
226241

242+
func TestJSONStoreDeleteAsset(t *testing.T) {
243+
t.Parallel()
244+
245+
store := NewJSONStore(t.TempDir(), t.TempDir())
246+
sessionID := "session-delete-asset"
247+
meta, err := store.SaveAsset(context.Background(), sessionID, strings.NewReader("img"), "image/png")
248+
if err != nil {
249+
t.Fatalf("save seed asset: %v", err)
250+
}
251+
252+
if err := store.DeleteAsset(context.Background(), sessionID, meta.ID); err != nil {
253+
t.Fatalf("DeleteAsset() error = %v", err)
254+
}
255+
if _, statErr := os.Stat(store.assetPath(sessionID, meta.ID)); !errors.Is(statErr, os.ErrNotExist) {
256+
t.Fatalf("expected removed asset file, got %v", statErr)
257+
}
258+
if _, statErr := os.Stat(store.assetMetaPath(sessionID, meta.ID)); !errors.Is(statErr, os.ErrNotExist) {
259+
t.Fatalf("expected removed asset meta file, got %v", statErr)
260+
}
261+
262+
if err := store.DeleteAsset(context.Background(), sessionID, meta.ID); err != nil {
263+
t.Fatalf("DeleteAsset() should ignore already deleted files, got %v", err)
264+
}
265+
}
266+
227267
type failingReader struct{}
228268

229269
func (failingReader) Read(_ []byte) (int, error) {
230270
return 0, errors.New("read failure")
231271
}
272+
273+
type cancelAfterFirstReadReader struct {
274+
cancel context.CancelFunc
275+
chunks [][]byte
276+
index int
277+
}
278+
279+
func (r *cancelAfterFirstReadReader) Read(p []byte) (int, error) {
280+
if r.index >= len(r.chunks) {
281+
return 0, io.EOF
282+
}
283+
chunk := r.chunks[r.index]
284+
r.index++
285+
n := copy(p, chunk)
286+
if r.index == 1 && r.cancel != nil {
287+
r.cancel()
288+
}
289+
return n, nil
290+
}

0 commit comments

Comments
 (0)