Skip to content

Commit 2a7c79f

Browse files
authored
fix(spanner): avoid overriding host context manager and isolate session creation context (#8441)
1 parent 13d03c1 commit 2a7c79f

3 files changed

Lines changed: 83 additions & 70 deletions

File tree

handwritten/spanner/src/instrument.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -97,23 +97,26 @@ const {
9797
AsyncHooksContextManager,
9898
} = require('@opentelemetry/context-async-hooks');
9999

100+
let contextManagerInstallAttempted = false;
101+
100102
/*
101-
* This function ensures that async/await works correctly by
102-
* checking if context.active() returns an invalid/unset context
103-
* and if so, sets a global AsyncHooksContextManager otherwise
104-
* spans resulting from async/await invocations won't be correctly
105-
* associated in their respective hierarchies.
103+
* If no global ContextManager is registered, install an AsyncHooksContextManager
104+
* so that async/await trace context propagation works for apps that haven't
105+
* configured OpenTelemetry themselves. If the host app has already installed a
106+
* ContextManager, leave it alone — tearing down a working manager breaks the
107+
* host's baggage and span parentage on the next gRPC call.
108+
*
109+
* setGlobalContextManager() returns false when a manager is already registered,
110+
* which is the documented signal that we shouldn't replace it. The
111+
* `contextManagerInstallAttempted` latch makes the call idempotent so we don't
112+
* allocate a new AsyncHooksContextManager on each Spanner client construction.
106113
*/
107114
function ensureInitialContextManagerSet() {
108-
if (!context['_contextManager'] || context.active() === ROOT_CONTEXT) {
109-
// If no context manager is currently set, or if the active context is the ROOT_CONTEXT,
110-
// trace context propagation cannot
111-
// function correctly with async/await for OpenTelemetry
112-
// See {@link https://opentelemetry.io/docs/languages/js/context/#active-context}
113-
context.disable(); // Disable any prior contextManager.
114-
const contextManager = new AsyncHooksContextManager();
115+
if (contextManagerInstallAttempted) return;
116+
contextManagerInstallAttempted = true;
117+
const contextManager = new AsyncHooksContextManager();
118+
if (context.setGlobalContextManager(contextManager)) {
115119
contextManager.enable();
116-
context.setGlobalContextManager(contextManager);
117120
}
118121
}
119122

handwritten/spanner/src/multiplexed-session.ts

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -124,25 +124,29 @@ export class MultiplexedSession
124124
opts: this._observabilityOptions,
125125
dbName: this.database.formattedName_,
126126
};
127-
return startTrace(
128-
'MultiplexedSession.createSession',
129-
traceConfig,
130-
async span => {
131-
span.addEvent('Requesting a multiplexed session');
132-
try {
133-
const [createSessionResponse] = await this.database.createSession({
134-
multiplexed: true,
135-
});
136-
this._multiplexedSession = createSessionResponse;
137-
span.addEvent('Created a multiplexed session');
138-
} catch (e) {
139-
setSpanError(span, e as Error);
140-
throw e;
141-
} finally {
142-
span.end();
143-
}
144-
},
145-
);
127+
return context.with(ROOT_CONTEXT, () => {
128+
return startTrace(
129+
'MultiplexedSession.createSession',
130+
traceConfig,
131+
async span => {
132+
span.addEvent('Requesting a multiplexed session');
133+
try {
134+
const [createSessionResponse] = await this.database.createSession(
135+
{
136+
multiplexed: true,
137+
},
138+
);
139+
this._multiplexedSession = createSessionResponse;
140+
span.addEvent('Created a multiplexed session');
141+
} catch (e) {
142+
setSpanError(span, e as Error);
143+
throw e;
144+
} finally {
145+
span.end();
146+
}
147+
},
148+
);
149+
});
146150
};
147151

148152
// Assign the running task to the shared promise variable, and ensure

handwritten/spanner/src/session-pool.ts

Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -689,49 +689,55 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
689689
opts: this._observabilityOptions,
690690
dbName: this.database.formattedName_,
691691
};
692-
return startTrace('SessionPool.createSessions', traceConfig, async span => {
693-
span.addEvent(`Requesting ${amount} sessions`);
694-
695-
// while we can request as many sessions be created as we want, the backend
696-
// will return at most 100 at a time, hence the need for a while loop.
697-
while (amount > 0) {
698-
let sessions: Session[] | null = null;
699-
700-
span.addEvent(`Creating ${amount} sessions`);
701-
702-
try {
703-
[sessions] = await this.database.batchCreateSessions({
704-
count: amount,
705-
labels: labels,
706-
databaseRole: databaseRole,
707-
});
708-
709-
amount -= sessions.length;
710-
nReturned += sessions.length;
711-
} catch (e) {
712-
this._pending -= amount;
713-
this.emit('createError', e);
692+
return context.with(ROOT_CONTEXT, () => {
693+
return startTrace(
694+
'SessionPool.createSessions',
695+
traceConfig,
696+
async span => {
697+
span.addEvent(`Requesting ${amount} sessions`);
698+
699+
// while we can request as many sessions be created as we want, the backend
700+
// will return at most 100 at a time, hence the need for a while loop.
701+
while (amount > 0) {
702+
let sessions: Session[] | null = null;
703+
704+
span.addEvent(`Creating ${amount} sessions`);
705+
706+
try {
707+
[sessions] = await this.database.batchCreateSessions({
708+
count: amount,
709+
labels: labels,
710+
databaseRole: databaseRole,
711+
});
712+
713+
amount -= sessions.length;
714+
nReturned += sessions.length;
715+
} catch (e) {
716+
this._pending -= amount;
717+
this.emit('createError', e);
718+
span.addEvent(
719+
`Requested for ${nRequested} sessions returned ${nReturned}`,
720+
);
721+
setSpanErrorAndException(span, e as Error);
722+
span.end();
723+
throw e;
724+
}
725+
726+
sessions.forEach((session: Session) => {
727+
setImmediate(() => {
728+
this._inventory.borrowed.add(session);
729+
this._pending -= 1;
730+
this.release(session);
731+
});
732+
});
733+
}
734+
714735
span.addEvent(
715736
`Requested for ${nRequested} sessions returned ${nReturned}`,
716737
);
717-
setSpanErrorAndException(span, e as Error);
718738
span.end();
719-
throw e;
720-
}
721-
722-
sessions.forEach((session: Session) => {
723-
setImmediate(() => {
724-
this._inventory.borrowed.add(session);
725-
this._pending -= 1;
726-
this.release(session);
727-
});
728-
});
729-
}
730-
731-
span.addEvent(
732-
`Requested for ${nRequested} sessions returned ${nReturned}`,
739+
},
733740
);
734-
span.end();
735741
});
736742
}
737743

0 commit comments

Comments
 (0)