-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessages.go
More file actions
138 lines (118 loc) · 3.9 KB
/
messages.go
File metadata and controls
138 lines (118 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package writ
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/anthropics/anthropic-sdk-go"
"github.com/anthropics/anthropic-sdk-go/packages/ssestream"
)
// MessagesService wraps anthropic's Messages API with gate + audit.
// Its methods evaluate the client's gate and append audit-chain entries
// before forwarding a call to the inner anthropic client.
type MessagesService struct {
// wc is the owning client; the methods read its gater, chain, cfg,
// payloads and inner fields.
wc *Client
}
// New runs a single, non-streaming Messages call behind the policy gate and
// records it in the audit chain.
//
// Sequence:
//  1. The gate is evaluated; an evaluation error aborts the call.
//  2. The entry returned by the gate is timestamped (UTC), hashed and appended
//     to the chain. A failure to hash or append aborts the call. This entry is
//     written whether or not the call is subsequently allowed.
//  3. If the decision is not allowed, a *DenialError is returned and the inner
//     client is never invoked.
//  4. Otherwise the inner client is called, a post-call entry is appended on a
//     best-effort basis and, when a payload sink is configured, the request
//     and response JSON are handed to it.
//
// The response and error produced by the inner client are returned unchanged.
func (s *MessagesService) New(ctx context.Context, params anthropic.MessageNewParams) (*anthropic.Message, error) {
	decision, entry, err := s.wc.gater.evaluate(ctx, params, s.wc.cfg)
	if err != nil {
		return nil, fmt.Errorf("writ: gate evaluation: %w", err)
	}

	// Keep a typed copy of the timestamp: entry.Timestamp is an interface
	// value and entry is replaced by computeEntryHash's result below, so the
	// payload record must not depend on an unchecked type assertion.
	now := time.Now().UTC()
	entry.Timestamp = now

	entry, err = computeEntryHash(s.wc.chain, entry)
	if err != nil {
		return nil, fmt.Errorf("writ: hash pre-call audit entry: %w", err)
	}
	if err := s.wc.chain.Append(entry); err != nil {
		return nil, fmt.Errorf("writ: write pre-call audit entry: %w", err)
	}

	if !decision.Allowed {
		return nil, &DenialError{
			Reason:  decision.DenialReason,
			AuditID: decision.AuditID,
			Tier:    decision.Tier,
		}
	}

	resp, callErr := s.wc.inner.Messages.New(ctx, params)

	// Post-call auditing is best-effort: the upstream call has already
	// happened, so a failure to build or append the entry must not replace
	// the caller's response or error.
	if postEntry, buildErr := buildPostCallEntry(s.wc.chain, entry, resp, callErr, s.wc.cfg); buildErr == nil {
		_ = s.wc.chain.Append(postEntry)
	}

	if s.wc.payloads != nil {
		// Marshal errors are ignored on purpose; a payload that cannot be
		// encoded is recorded with an empty Input/Output instead of failing
		// a call that has already completed.
		inputJSON, _ := json.Marshal(params)
		var outputJSON json.RawMessage
		if resp != nil {
			outputJSON, _ = json.Marshal(resp)
		}

		// Comma-ok assertion: fall back to the timestamp captured above
		// rather than panicking if the stored value is not a time.Time.
		ts, ok := entry.Timestamp.(time.Time)
		if !ok {
			ts = now
		}

		s.wc.payloads.write(payloadEntry{
			AuditID:   entry.ID,
			Timestamp: ts,
			EventType: entry.EventType,
			Input:     inputJSON,
			Output:    outputJSON,
		})
	}

	return resp, callErr
}
// NewStreaming evaluates the gate and, if allowed, opens a streaming response.
//
// A streaming-started audit entry is timestamped (UTC), hashed and appended to
// the chain before the stream is opened; a failure at any of those steps
// aborts the call. The matching streaming-complete entry is written by
// WritStream.Close (ADR #19). When a payload sink is configured, the request
// JSON is handed to it before the stream opens.
//
// Returns *DenialError if the policy denies the call; the inner client is not
// invoked in that case.
//
// NOTE(review): EventType is overwritten before the allow/deny check, so a
// denied call is also recorded as "llm_call_streaming_started" — confirm this
// is intended.
func (s *MessagesService) NewStreaming(ctx context.Context, params anthropic.MessageNewParams) (*WritStream, error) {
	decision, entry, err := s.wc.gater.evaluate(ctx, params, s.wc.cfg)
	if err != nil {
		return nil, fmt.Errorf("writ: gate evaluation: %w", err)
	}

	// Keep a typed copy of the timestamp: entry.Timestamp is an interface
	// value and entry is replaced by computeEntryHash's result below, so the
	// payload record must not depend on an unchecked type assertion.
	now := time.Now().UTC()
	entry.EventType = "llm_call_streaming_started"
	entry.Timestamp = now

	entry, err = computeEntryHash(s.wc.chain, entry)
	if err != nil {
		return nil, fmt.Errorf("writ: hash streaming-started audit entry: %w", err)
	}
	if err := s.wc.chain.Append(entry); err != nil {
		return nil, fmt.Errorf("writ: write streaming-started audit entry: %w", err)
	}

	if !decision.Allowed {
		return nil, &DenialError{
			Reason:  decision.DenialReason,
			AuditID: decision.AuditID,
			Tier:    decision.Tier,
		}
	}

	if s.wc.payloads != nil {
		// A marshal error is ignored on purpose; the payload is then recorded
		// with an empty Input instead of blocking the stream.
		inputJSON, _ := json.Marshal(params)

		// Comma-ok assertion: fall back to the timestamp captured above
		// rather than panicking if the stored value is not a time.Time.
		ts, ok := entry.Timestamp.(time.Time)
		if !ok {
			ts = now
		}

		s.wc.payloads.write(payloadEntry{
			AuditID:   entry.ID,
			Timestamp: ts,
			EventType: entry.EventType,
			Input:     inputJSON,
		})
	}

	return &WritStream{
		inner:      s.wc.inner.Messages.NewStreaming(ctx, params),
		wc:         s.wc,
		startEntry: entry,
	}, nil
}
// WritStream wraps the anthropic SSE stream to write the streaming-complete
// audit entry when the stream closes.
type WritStream struct {
// inner is the SSE stream obtained from the wrapped anthropic client.
inner *ssestream.Stream[anthropic.MessageStreamEventUnion]
// wc is the owning client; Close reads its chain and cfg.
wc *Client
// startEntry is the streaming-started audit entry appended by NewStreaming;
// Close passes it to buildStreamCompleteEntry.
startEntry ChainEntry
}
// Stream exposes the wrapped SSE stream so callers can read its events.
// The returned pointer is the same stream that Close closes.
func (s *WritStream) Stream() *ssestream.Stream[anthropic.MessageStreamEventUnion] {
	return s.inner
}
// Close closes the wrapped stream and then records a streaming-complete audit
// entry derived from the streaming-started entry and the close result.
//
// The audit write is best-effort: if the entry cannot be built it is skipped,
// and an append failure is discarded. The value returned is always the error
// from closing the underlying stream.
//
// NOTE(review): only the Close error is passed to the completion entry; errors
// surfaced while iterating the stream are not consulted here — confirm that is
// intended.
func (s *WritStream) Close() error {
	streamErr := s.inner.Close()

	if done, err := buildStreamCompleteEntry(s.wc.chain, s.startEntry, streamErr, s.wc.cfg); err == nil {
		// Deliberately ignored so an audit failure never masks the close result.
		_ = s.wc.chain.Append(done)
	}

	return streamErr
}