Skip to content
This repository was archived by the owner on Jan 21, 2026. It is now read-only.

Commit 79ab435

Browse files
authored
fix: adjust async_hooks cls behavior (#734)
PR-URL: #734
1 parent d8f3611 commit 79ab435

5 files changed

Lines changed: 229 additions & 50 deletions

File tree

src/cls/async-hooks.ts

Lines changed: 92 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17-
// This file requires continuation-local-storage in the AsyncHooksCLS
18-
// constructor, rather than upon module load.
17+
// This file calls require('async_hooks') in the AsyncHooksCLS constructor,
18+
// rather than upon module load.
1919
import * as asyncHooksModule from 'async_hooks';
2020
import {EventEmitter} from 'events';
2121
import * as shimmer from 'shimmer';
@@ -31,29 +31,63 @@ const EVENT_EMITTER_METHODS: Array<keyof EventEmitter> =
3131
const WRAPPED = Symbol('@google-cloud/trace-agent:AsyncHooksCLS:WRAPPED');
3232

3333
type ContextWrapped<T> = T&{[WRAPPED]?: boolean};
34+
type Reference<T> = {
35+
value: T
36+
};
3437

3538
/**
3639
* An implementation of continuation-local storage on top of the async_hooks
3740
* module.
3841
*/
3942
export class AsyncHooksCLS<Context extends {}> implements CLS<Context> {
40-
private currentContext: {value: Context};
41-
private contexts: {[id: number]: Context} = {};
43+
// instance-scope reference to avoid top-level require.
44+
private ah: AsyncHooksModule;
45+
46+
/** A map of AsyncResource IDs to Context objects. */
47+
private contexts: {[id: number]: Reference<Context>} = {};
48+
/** The AsyncHook that proactively populates entries in this.contexts. */
4249
private hook: asyncHooksModule.AsyncHook;
50+
/** Whether this instance is enabled. */
4351
private enabled = false;
4452

4553
constructor(private readonly defaultContext: Context) {
46-
this.currentContext = {value: this.defaultContext};
47-
this.hook = (require('async_hooks') as AsyncHooksModule).createHook({
54+
// Store a reference to the async_hooks module, since we will need to query
55+
// the current AsyncResource ID often.
56+
this.ah = require('async_hooks') as AsyncHooksModule;
57+
58+
// Create the hook.
59+
this.hook = this.ah.createHook({
4860
init: (id: number, type: string, triggerId: number, resource: {}) => {
49-
this.contexts[id] = this.currentContext.value;
50-
},
51-
before: (id: number) => {
52-
if (this.contexts[id]) {
53-
this.currentContext.value = this.contexts[id];
61+
// init is called when a new AsyncResource is created. We want code
62+
// that runs within the scope of this new AsyncResource to see the same
63+
// context as its "parent" AsyncResource. The criteria for the parent
64+
// depends on the type of the AsyncResource.
65+
if (type === 'PROMISE') {
66+
// Opt not to use the trigger ID for Promises, as this causes context
67+
// confusion in applications using async/await.
68+
// Instead, use the ID of the AsyncResource in whose scope we are
69+
// currently running.
70+
this.contexts[id] = this.contexts[this.ah.executionAsyncId()];
71+
} else {
72+
// Use the trigger ID for any other type. In Node core, this is
73+
// usually equal the ID of the AsyncResource in whose scope we are
74+
// currently running (the "current" AsyncResource), or that of one
75+
// of its ancestors, so the behavior is not expected to be different
76+
// from using the ID of the current AsyncResource instead.
77+
// A divergence is expected only to arise through the user
78+
// AsyncResource API, because users of that API can specify their own
79+
// trigger ID. In this case, we choose to respect the user's
80+
// selection.
81+
this.contexts[id] = this.contexts[triggerId];
5482
}
83+
// Note that this function always assigns values in this.contexts to
84+
// values under other keys, which may or may not be undefined. Consumers
85+
// of the CLS API will get the sentinel (default) value if they query
86+
// the current context when it is stored as undefined.
5587
},
5688
destroy: (id: number) => {
89+
// destroy is called when the AsyncResource is no longer used, so also
90+
// delete its entry in the map.
5791
delete this.contexts[id];
5892
}
5993
});
@@ -64,51 +98,85 @@ export class AsyncHooksCLS<Context extends {}> implements CLS<Context> {
6498
}
6599

66100
enable(): void {
67-
this.currentContext.value = this.defaultContext;
101+
this.contexts = {};
68102
this.hook.enable();
69103
this.enabled = true;
70104
}
71105

72106
disable(): void {
73-
this.currentContext.value = this.defaultContext;
107+
this.contexts = {};
74108
this.hook.disable();
75109
this.enabled = false;
76110
}
77111

78112
getContext(): Context {
79-
return this.currentContext.value;
113+
// We don't store this.defaultContext directly in this.contexts.
114+
// Getting undefined when looking up this.contexts means that it wasn't
115+
// set, so return the default context.
116+
const current = this.contexts[this.ah.executionAsyncId()];
117+
return current ? current.value : this.defaultContext;
80118
}
81119

82120
setContext(value: Context): void {
83-
this.currentContext.value = value;
121+
const id = this.ah.executionAsyncId();
122+
const current = this.contexts[id];
123+
if (current) {
124+
current.value = value;
125+
} else {
126+
this.contexts[id] = {value};
127+
}
84128
}
85129

86130
runWithNewContext<T>(fn: Func<T>): T {
87-
const oldContext = this.currentContext.value;
88-
this.currentContext.value = this.defaultContext;
131+
// Run fn() so that any AsyncResource objects that are created in
132+
// fn will have the context set by this.setContext.
133+
const id = this.ah.executionAsyncId();
134+
const oldContext = this.contexts[id];
135+
// Reset the current context. This prevents this.getContext from returning
136+
// a stale value.
137+
this.contexts[id] = {value: this.defaultContext};
89138
try {
90139
return fn();
91140
} finally {
92-
this.currentContext.value = oldContext;
141+
// Revert the current context to what it was before any calls to
142+
// this.setContext from within fn.
143+
this.contexts[id] = oldContext;
93144
}
94145
}
95146

96147
bindWithCurrentContext<T>(fn: Func<T>): Func<T> {
97-
if ((fn as ContextWrapped<Func<T>>)[WRAPPED] || !this.currentContext) {
148+
// Return if we have already wrapped the function.
149+
if ((fn as ContextWrapped<Func<T>>)[WRAPPED]) {
98150
return fn;
99151
}
100-
const current = this.currentContext;
101-
const boundContext = this.currentContext.value;
152+
// Capture the context of the current AsyncResource.
153+
const boundContext = this.contexts[this.ah.executionAsyncId()];
154+
// Return if there is no current context to bind.
155+
if (!boundContext) {
156+
return fn;
157+
}
158+
const that = this;
159+
// TODO(kjin): This code is somewhat duplicated with runWithNewContext.
160+
// Can we merge this?
161+
// Wrap fn so that any AsyncResource objects that are created in fn will
162+
// share context with that of the AsyncResource with the given ID.
102163
const contextWrapper: ContextWrapped<Func<T>> = function(this: {}) {
103-
const oldContext = current.value;
104-
current.value = boundContext;
164+
const id = that.ah.executionAsyncId();
165+
const oldContext = that.contexts[id];
166+
// Restore the captured context.
167+
that.contexts[id] = boundContext;
105168
try {
106169
return fn.apply(this, arguments) as T;
107170
} finally {
108-
current.value = oldContext;
171+
// Revert the current context to what it was before it was set to the
172+
// captured context.
173+
that.contexts[id] = oldContext;
109174
}
110175
};
176+
// Prevent re-wrapping.
111177
contextWrapper[WRAPPED] = true;
178+
// Explicitly inherit the original function's length, because it is
179+
// otherwise zero-ed out.
112180
Object.defineProperty(contextWrapper, 'length', {
113181
enumerable: false,
114182
configurable: true,
@@ -118,11 +186,6 @@ export class AsyncHooksCLS<Context extends {}> implements CLS<Context> {
118186
return contextWrapper;
119187
}
120188

121-
// This function is not technically needed and all tests currently pass
122-
// without it (after removing call sites). While it is not a complete
123-
// solution, restoring correct context before running every request/response
124-
// event handler reduces the number of situations in which userspace queuing
125-
// will cause us to lose context.
126189
patchEmitterToPropagateContext(ee: EventEmitter): void {
127190
const that = this;
128191
EVENT_EMITTER_METHODS.forEach((method) => {

src/cls/base.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ export interface CLS<Context extends {}> {
7272

7373
/**
7474
* Runs the given function as the start of a new continuation.
75+
* TODO(kjin): Merge this with setContext.
7576
* @param fn The function to run synchronously.
7677
* @returns The return result of running `fn`.
7778
*/
@@ -82,6 +83,7 @@ export interface CLS<Context extends {}> {
8283
* the CLS implementation's propagating mechanism doesn't automatically do so.
8384
* If not called from within a continuation, behavior is implementation-
8485
* defined.
86+
* TODO(kjin): Determine a more accurate name for this function.
8587
* @param fn The function to bind.
8688
* @returns A wrapped version of the given function with the same signature.
8789
*/
@@ -91,6 +93,7 @@ export interface CLS<Context extends {}> {
9193
* Patches an EventEmitter to lazily bind all future event listeners on this
9294
* instance so that they belong in the same continuation as the execution
9395
* path in which they were attached to the EventEmitter object.
96+
* TODO(kjin): Determine a more accurate name for this function.
9497
* @param ee The EventEmitter to bind. This instance will be mutated.
9598
*/
9699
patchEmitterToPropagateContext(ee: EventEmitter): void;

test/test-cls-ah.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/**
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import * as assert from 'assert';
18+
import * as asyncHooksModule from 'async_hooks';
19+
import {IContextDefinition} from 'mocha';
20+
import * as semver from 'semver';
21+
22+
import {AsyncHooksCLS} from '../src/cls/async-hooks';
23+
24+
type AsyncHooksModule = typeof asyncHooksModule;
25+
26+
const TEST_ASYNC_RESOURCE = '@google-cloud/trace-agent:test';
27+
const maybeSkip = (describe: IContextDefinition) =>
28+
semver.satisfies(process.version, '>=8.1') ? describe : describe.skip;
29+
30+
maybeSkip(describe)('AsyncHooks-based CLS', () => {
31+
let asyncHooks: AsyncHooksModule;
32+
// tslint:disable-next-line:variable-name
33+
let AsyncResource: typeof asyncHooksModule.AsyncResource;
34+
let cls: AsyncHooksCLS<string>;
35+
36+
before(() => {
37+
asyncHooks = require('async_hooks') as AsyncHooksModule;
38+
AsyncResource = class extends asyncHooks.AsyncResource {
39+
// tslint:disable:no-any
40+
runInAsyncScope<This, Result>(
41+
fn: (this: This, ...args: any[]) => Result, thisArg?: This): Result {
42+
// tslint:enable:no-any
43+
// Polyfill for versions in which runInAsyncScope isn't defined
44+
if (super.runInAsyncScope) {
45+
return super.runInAsyncScope.apply(this, arguments);
46+
} else {
47+
this.emitBefore();
48+
try {
49+
return fn.apply(
50+
thisArg, Array.prototype.slice.apply(arguments).slice(2));
51+
} finally {
52+
this.emitAfter();
53+
}
54+
}
55+
}
56+
};
57+
});
58+
59+
beforeEach(() => {
60+
cls = new AsyncHooksCLS('default');
61+
cls.enable();
62+
});
63+
64+
it('Correctly assumes the type of Promise resources', () => {
65+
const actual: Array<Promise<void>> = [];
66+
const expected: Array<Promise<void>> = [];
67+
const hook = asyncHooks
68+
.createHook({
69+
init:
70+
(uid: number, type: string, tid: number,
71+
resource: {promise: Promise<void>}) => {
72+
if (type === 'PROMISE') {
73+
actual.push(resource.promise);
74+
}
75+
}
76+
})
77+
.enable();
78+
expected.push(Promise.resolve());
79+
expected.push(actual[0].then(() => {}));
80+
assert.deepStrictEqual(actual, expected);
81+
hook.disable();
82+
});
83+
84+
it('Supports basic context propagation across async-await boundaries', () => {
85+
return cls.runWithNewContext(async () => {
86+
cls.setContext('modified');
87+
await Promise.resolve();
88+
assert.strictEqual(cls.getContext(), 'modified');
89+
await Promise.resolve();
90+
assert.strictEqual(cls.getContext(), 'modified');
91+
});
92+
});
93+
94+
describe('Using AsyncResource API', () => {
95+
it('Supports context propagation without trigger ID', async () => {
96+
let res!: asyncHooksModule.AsyncResource;
97+
await cls.runWithNewContext(async () => {
98+
res = new AsyncResource(TEST_ASYNC_RESOURCE);
99+
cls.setContext('modified');
100+
});
101+
res.runInAsyncScope(() => {
102+
assert.strictEqual(cls.getContext(), 'modified');
103+
});
104+
});
105+
106+
it('Supports context propagation with trigger ID', async () => {
107+
let triggerId!: number;
108+
let res!: asyncHooksModule.AsyncResource;
109+
await cls.runWithNewContext(async () => {
110+
triggerId = new AsyncResource(TEST_ASYNC_RESOURCE).asyncId();
111+
cls.setContext('correct');
112+
});
113+
await cls.runWithNewContext(async () => {
114+
res = new AsyncResource(TEST_ASYNC_RESOURCE, triggerId);
115+
cls.setContext('incorrect');
116+
});
117+
res.runInAsyncScope(() => {
118+
assert.strictEqual(cls.getContext(), 'correct');
119+
});
120+
});
121+
});
122+
});

test/test-cls.ts

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,16 @@ describe('Continuation-Local Storage', () => {
7171
});
7272

7373
describe('Implementations', () => {
74-
const testCases: Array<{clazz: CLSConstructor, testAsyncAwait: boolean}> =
75-
asyncAwaitSupported ?
76-
[
77-
{clazz: AsyncHooksCLS, testAsyncAwait: true},
78-
{clazz: AsyncListenerCLS, testAsyncAwait: false}
79-
] :
80-
[{clazz: AsyncListenerCLS, testAsyncAwait: false}];
74+
const testCases: CLSConstructor[] = asyncAwaitSupported ?
75+
[AsyncHooksCLS, AsyncListenerCLS] :
76+
[AsyncListenerCLS];
8177

8278
for (const testCase of testCases) {
83-
describe(`CLS for class ${testCase.clazz.name}`, () => {
84-
const maybeSkip = (it: ITestDefinition) =>
85-
testCase.testAsyncAwait ? it : it.skip;
79+
describe(`CLS for class ${testCase.name}`, () => {
8680
let c!: CLS<string>;
8781

8882
beforeEach(() => {
89-
c = new testCase.clazz('default');
83+
c = new testCase('default');
9084
c.enable();
9185
});
9286

@@ -157,6 +151,12 @@ describe('Continuation-Local Storage', () => {
157151
runLater();
158152
assert.strictEqual(c.getContext(), 'default');
159153
});
154+
c.runWithNewContext(() => {
155+
c.setContext('modified-but-different');
156+
// bind it again
157+
runLater = c.bindWithCurrentContext(runLater);
158+
});
159+
runLater();
160160
});
161161

162162
it('Corrects context when function run with new context throws', () => {
@@ -234,16 +234,6 @@ describe('Continuation-Local Storage', () => {
234234
});
235235
});
236236
});
237-
238-
maybeSkip(it)(
239-
'Supports basic context propagation across await boundaries',
240-
() => {
241-
return c.runWithNewContext(async () => {
242-
c.setContext('modified');
243-
await Promise.resolve();
244-
assert.strictEqual(c.getContext(), 'modified');
245-
});
246-
});
247237
});
248238
}
249239
});

0 commit comments

Comments
 (0)