-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathinstrumentation.ts
More file actions
333 lines (296 loc) · 11.8 KB
/
instrumentation.ts
File metadata and controls
333 lines (296 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import {
InstrumentationBase,
type InstrumentationConfig,
type InstrumentationModuleDefinition,
InstrumentationNodeModuleDefinition,
InstrumentationNodeModuleFile,
} from '@opentelemetry/instrumentation';
import type { LangChainOptions } from '@sentry/core';
import {
_INTERNAL_skipAiProviderWrapping,
ANTHROPIC_AI_INTEGRATION_NAME,
createLangChainCallbackHandler,
GOOGLE_GENAI_INTEGRATION_NAME,
instrumentLangChainEmbeddings,
OPENAI_INTEGRATION_NAME,
SDK_VERSION,
} from '@sentry/core';
const supportedVersions = ['>=0.1.0 <2.0.0'];
type LangChainInstrumentationOptions = InstrumentationConfig & LangChainOptions;
/**
* Represents the patched shape of LangChain provider package exports
*/
interface PatchedLangChainExports {
[key: string]: unknown;
}
/**
* Duck-types a LangChain `CallbackManager` instance. We can't `instanceof`
* check because `@langchain/core` may be bundled, deduped, or absent from
* our module graph; checking the shape avoids that coupling.
*/
function isCallbackManager(value: unknown): value is {
addHandler: (handler: unknown, inherit?: boolean) => void;
copy: () => unknown;
handlers?: unknown[];
} {
if (!value || typeof value !== 'object') {
return false;
}
const candidate = value as { addHandler?: unknown; copy?: unknown };
return typeof candidate.addHandler === 'function' && typeof candidate.copy === 'function';
}
/**
* Exported for testing.
* @internal
*/
export const _INTERNAL_augmentCallbackHandlers = augmentCallbackHandlers;
/**
* Augments a callback handler list with Sentry's handler if not already present.
*
* `options.callbacks` may be one of three shapes (per LangChain's RunnableConfig):
* - `undefined` → no callbacks configured
* - `BaseCallbackHandler[]` → list of handler instances
* - `CallbackManager` → a manager that already holds (potentially
* inheritable) child handlers
*
* The `CallbackManager` case is the load-bearing one: when LangGraph sets up
* a run with `streamMode: ['messages']`, it puts a `StreamMessagesHandler`
* onto a `CallbackManager` and passes that manager through `options.callbacks`.
* If we naively wrap the manager into `[manager, sentryHandler]`, LangChain
* downstream treats the whole manager as a single opaque handler — its
* inheritable children (`StreamMessagesHandler`, the tracer, etc.) are never
* unpacked, and per-token streaming events silently disappear.
*
* Instead, when we receive a `CallbackManager`, we copy it (so we don't
* mutate the caller's manager across invocations) and register Sentry's
* handler as an inheritable child via `.addHandler()`.
*/
function augmentCallbackHandlers(handlers: unknown, sentryHandler: unknown): unknown {
// Handle null/undefined - return array with just our handler
if (!handlers) {
return [sentryHandler];
}
// If handlers is already an array
if (Array.isArray(handlers)) {
// Check if our handler is already in the list
if (handlers.includes(sentryHandler)) {
return handlers;
}
// Add our handler to the list
return [...handlers, sentryHandler];
}
// CallbackManager: register our handler as an inheritable child on a copy
// so we preserve any handlers already attached (notably LangGraph's
// StreamMessagesHandler used by `streamMode: ['messages']`).
if (isCallbackManager(handlers)) {
const copied = handlers.copy() as {
addHandler: (handler: unknown, inherit?: boolean) => void;
handlers?: unknown[];
inheritableHandlers?: unknown[];
};
// Avoid double-registering on nested invocations. CallbackManager's own
// `addHandler` keeps `inheritableHandlers ⊆ handlers`, so checking
// `handlers` alone is normally enough — but we check both as a defensive
// guard against externally-constructed managers that bypass `addHandler`.
const alreadyRegistered =
(copied.handlers?.includes(sentryHandler) ?? false) ||
(copied.inheritableHandlers?.includes(sentryHandler) ?? false);
if (!alreadyRegistered) {
copied.addHandler(sentryHandler, true);
}
return copied;
}
// Unknown type - return original
return handlers;
}
/**
* Wraps Runnable methods (invoke, stream, batch) to inject Sentry callbacks at request time
* Uses a Proxy to intercept method calls and augment the options.callbacks
*/
function wrapRunnableMethod(
originalMethod: (...args: unknown[]) => unknown,
sentryHandler: unknown,
_methodName: string,
): (...args: unknown[]) => unknown {
return new Proxy(originalMethod, {
apply(target, thisArg, args: unknown[]): unknown {
// LangChain Runnable method signatures:
// invoke(input, options?) - options contains callbacks
// stream(input, options?) - options contains callbacks
// batch(inputs, options?) - options contains callbacks
// Options is typically the second argument
const optionsIndex = 1;
let options = args[optionsIndex] as Record<string, unknown> | undefined;
// If options don't exist or aren't an object, create them
if (!options || typeof options !== 'object' || Array.isArray(options)) {
options = {};
args[optionsIndex] = options;
}
// Inject our callback handler into options.callbacks (request time callbacks)
const existingCallbacks = options.callbacks;
const augmentedCallbacks = augmentCallbackHandlers(existingCallbacks, sentryHandler);
options.callbacks = augmentedCallbacks;
// Call original method with augmented options
return Reflect.apply(target, thisArg, args);
},
}) as (...args: unknown[]) => unknown;
}
/**
* Sentry LangChain instrumentation using OpenTelemetry.
*/
export class SentryLangChainInstrumentation extends InstrumentationBase<LangChainInstrumentationOptions> {
public constructor(config: LangChainInstrumentationOptions = {}) {
super('@sentry/instrumentation-langchain', SDK_VERSION, config);
}
/**
* Initializes the instrumentation by defining the modules to be patched.
* We patch the BaseChatModel class methods to inject callbacks
*
* We hook into provider packages (@langchain/anthropic, @langchain/openai, etc.)
* because @langchain/core is often bundled and not loaded as a separate module
*/
public init(): InstrumentationModuleDefinition | InstrumentationModuleDefinition[] {
const modules: InstrumentationModuleDefinition[] = [];
// Hook into common LangChain provider packages
const providerPackages = [
'@langchain/anthropic',
'@langchain/openai',
'@langchain/google-genai',
'@langchain/mistralai',
'@langchain/google-vertexai',
'@langchain/groq',
];
for (const packageName of providerPackages) {
// In CJS, LangChain packages re-export from dist/index.cjs files.
// Patching only the root module sometimes misses the real implementation or
// gets overwritten when that file is loaded. We add a file-level patch so that
// _patch runs again on the concrete implementation
modules.push(
new InstrumentationNodeModuleDefinition(
packageName,
supportedVersions,
this._patch.bind(this),
exports => exports,
[
new InstrumentationNodeModuleFile(
`${packageName}/dist/index.cjs`,
supportedVersions,
this._patch.bind(this),
exports => exports,
),
],
),
);
}
// Hook into main 'langchain' package to catch initChatModel (v1+)
modules.push(
new InstrumentationNodeModuleDefinition(
'langchain',
supportedVersions,
this._patch.bind(this),
exports => exports,
[
// To catch the CJS build that contains ConfigurableModel / initChatModel for v1
new InstrumentationNodeModuleFile(
'langchain/dist/chat_models/universal.cjs',
supportedVersions,
this._patch.bind(this),
exports => exports,
),
],
),
);
return modules;
}
/**
* Core patch logic - patches chat model and embedding methods
* This is called when a LangChain provider package is loaded
*/
private _patch(exports: PatchedLangChainExports): PatchedLangChainExports | void {
// Skip AI provider wrapping now that LangChain is actually being used
// This prevents duplicate spans from Anthropic/OpenAI/GoogleGenAI standalone integrations
_INTERNAL_skipAiProviderWrapping([
OPENAI_INTEGRATION_NAME,
ANTHROPIC_AI_INTEGRATION_NAME,
GOOGLE_GENAI_INTEGRATION_NAME,
]);
const config = this.getConfig();
// Create a shared handler instance for chat model callbacks
const sentryHandler = createLangChainCallbackHandler(config);
// Patch Runnable methods to inject callbacks at request time
// This directly manipulates options.callbacks that LangChain uses
this._patchRunnableMethods(exports, sentryHandler);
// Patch embedding methods to create spans directly
// Embeddings don't use the callback system, so we wrap the methods themselves
this._patchEmbeddingsMethods(exports, config);
return exports;
}
/**
* Patches chat model methods (invoke, stream, batch) to inject Sentry callbacks
* Finds a chat model class from the provider package exports and patches its prototype methods
*/
private _patchRunnableMethods(exports: PatchedLangChainExports, sentryHandler: unknown): void {
// Known chat model class names for each provider
const knownChatModelNames = [
'ChatAnthropic',
'ChatOpenAI',
'ChatGoogleGenerativeAI',
'ChatMistralAI',
'ChatVertexAI',
'ChatGroq',
'ConfigurableModel',
];
const exportsToPatch = (exports.universal_exports ?? exports) as Record<string, unknown>;
const chatModelClass = Object.values(exportsToPatch).find(exp => {
return typeof exp === 'function' && knownChatModelNames.includes(exp.name);
}) as { prototype: unknown; name: string } | undefined;
if (!chatModelClass) {
return;
}
// Patch directly on chatModelClass.prototype
const targetProto = chatModelClass.prototype as Record<string, unknown>;
// Skip if already patched (both file-level and module-level hooks resolve to the same prototype)
if (targetProto.__sentry_patched__) {
return;
}
targetProto.__sentry_patched__ = true;
// Patch the methods (invoke, stream, batch)
// All chat model instances will inherit these patched methods
const methodsToPatch = ['invoke', 'stream', 'batch'] as const;
for (const methodName of methodsToPatch) {
const method = targetProto[methodName];
if (typeof method === 'function') {
targetProto[methodName] = wrapRunnableMethod(
method as (...args: unknown[]) => unknown,
sentryHandler,
methodName,
);
}
}
}
/**
* Patches embedding class methods (embedQuery, embedDocuments) to create Sentry spans.
*
* Unlike chat models which use LangChain's callback system, the Embeddings base class
* has no callback support. We wrap the methods directly on the prototype.
*
* Instruments any exported class whose prototype has both embedQuery and embedDocuments as functions.
*/
private _patchEmbeddingsMethods(exports: PatchedLangChainExports, options: LangChainOptions): void {
const exportsToPatch = (exports.universal_exports ?? exports) as Record<string, unknown>;
for (const exp of Object.values(exportsToPatch)) {
if (typeof exp !== 'function' || !exp.prototype) {
continue;
}
const proto = exp.prototype as Record<string, unknown>;
if (typeof proto.embedQuery !== 'function' || typeof proto.embedDocuments !== 'function') {
continue;
}
if (proto.__sentry_patched__) {
continue;
}
proto.__sentry_patched__ = true;
instrumentLangChainEmbeddings(proto, options);
}
}
}