Skip to content

Commit 0a08693

Browse files
authored
docs(skills): Add AsyncIterator kind and two-plugin pattern to orchestrion (#7652)
* docs(skills): Add AsyncIterator kind and two-plugin pattern to orchestrion
1 parent f395560 commit 0a08693

File tree

2 files changed

+220
-3
lines changed

2 files changed

+220
-3
lines changed
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
# AsyncIterator Orchestrion Transform
2+
3+
**CRITICAL:** If you are working with async iterators or async generators (methods like `stream()`, `*generate()`, or anything returning `Promise<AsyncIterable>`), you **MUST** read and follow this entire document. The AsyncIterator pattern requires TWO plugins and has specific implementation requirements.
4+
5+
## When to Use AsyncIterator
6+
7+
Use `kind: 'AsyncIterator'` in your Orchestrion config when the target method:
8+
9+
- Returns `Promise<AsyncIterable<T>>`
10+
- Returns `Promise<AsyncIterableIterator<T>>`
11+
- Returns `Promise<IterableReadableStream<T>>`
12+
- Is an async generator function: `async *methodName()`
13+
- Returns any promise that resolves to an async iterable
14+
15+
**Examples:**
16+
```javascript
17+
// These ALL need kind: 'AsyncIterator'
18+
async stream(input) { /* returns Promise<AsyncIterable> */ }
19+
async *generate() { /* async generator */ }
20+
async getStream() { /* returns Promise<ReadableStream> */ }
21+
```
22+
23+
## Two-Channel Pattern
24+
25+
**When `kind: 'AsyncIterator'` is used, Orchestrion automatically creates TWO channels:**
26+
27+
1. **Base channel**: `tracing:orchestrion:{package}:{channelName}:*`
28+
- Fires when the method is called (before iteration starts)
29+
- Used to create the span
30+
31+
2. **Next channel**: `tracing:orchestrion:{package}:{channelName}_next:*`
32+
- Fires on EACH iteration (`next()` call)
33+
- Used to finish the span when `result.done === true`
34+
35+
## Critical Implementation Requirements
36+
37+
You **MUST** create TWO plugins to handle both channels. See the complete LangGraph example below for the full implementation pattern.
38+
39+
### 1. Channel Naming
40+
- Base channel: Uses `channelName` from config exactly as-is
41+
- Next channel: Automatically appends `_next` to `channelName`
42+
- Plugin prefix MUST match the full channel name including `_next`
43+
44+
### 2. Plugin Class Relationship
45+
- Next plugin typically extends the main plugin for consistency
46+
- Both plugins MUST use the same `static id`
47+
- Both plugins handle the same integration
48+
49+
### 3. Span Lifecycle
50+
- **Main plugin `bindStart()`**: Creates span via `this.startSpan()`
51+
- **Next plugin `bindStart()`**: Returns inherited store (NO new span)
52+
- **Next plugin `asyncEnd()`**: Finishes span ONLY when `ctx.result.done === true`
53+
- **Either plugin `error()`**: Finishes span immediately on error
54+
55+
### 4. Plugin Export and Registration
56+
Both plugins MUST be:
57+
- Exported from the plugin file: `module.exports = [StreamPlugin, NextStreamPlugin]`
58+
- Registered in the plugin system (see LangGraph example below)
59+
60+
## Common Mistakes
61+
62+
### ❌ Only creating one plugin
63+
64+
### ❌ Creating new span in Next plugin
65+
66+
### ❌ Finishing span on every iteration
67+
68+
### ❌ Wrong channel suffix
69+
70+
## Complete Example: LangGraph Stream
71+
72+
### Orchestrion Config
73+
```javascript
74+
// packages/datadog-instrumentations/src/helpers/rewriter/instrumentations/langgraph.js
75+
module.exports = [
76+
{
77+
module: {
78+
name: '@langchain/langgraph',
79+
versionRange: '>=1.2.0',
80+
filePath: 'dist/pregel/index.js'
81+
},
82+
functionQuery: {
83+
methodName: 'stream',
84+
className: 'Pregel',
85+
kind: 'AsyncIterator' // ← Critical
86+
},
87+
channelName: 'Pregel_stream'
88+
}
89+
]
90+
```
91+
92+
### Plugin Implementation
93+
```javascript
94+
// packages/datadog-plugin-langchain-langgraph/src/tracing.js
95+
const { TracingPlugin } = require('../../dd-trace/src/plugins/tracing')
96+
97+
class StreamPlugin extends TracingPlugin {
98+
static id = 'langgraph'
99+
static prefix = 'tracing:orchestrion:@langchain/langgraph:Pregel_stream'
100+
101+
bindStart (ctx) {
102+
const input = ctx.arguments?.[0]
103+
104+
this.startSpan('langgraph.stream', {
105+
service: this.config.service,
106+
kind: 'internal',
107+
component: 'langgraph',
108+
meta: {
109+
'langgraph.input': JSON.stringify(input)
110+
}
111+
}, ctx)
112+
113+
return ctx.currentStore
114+
}
115+
}
116+
117+
class NextStreamPlugin extends StreamPlugin {
118+
static id = 'langgraph'
119+
static prefix = 'tracing:orchestrion:@langchain/langgraph:Pregel_stream_next'
120+
121+
bindStart (ctx) {
122+
return ctx.currentStore // Inherit span from StreamPlugin
123+
}
124+
125+
asyncEnd (ctx) {
126+
const span = ctx.currentStore?.span
127+
if (!span) return
128+
129+
if (ctx.result.done === true) {
130+
span.setTag('langgraph.chunks', ctx.result.value?.length || 0)
131+
span.finish()
132+
}
133+
}
134+
135+
error (ctx) {
136+
const span = ctx.currentStore?.span
137+
if (span) {
138+
this.addError(ctx?.error, span)
139+
span.finish()
140+
}
141+
}
142+
}
143+
144+
module.exports = [StreamPlugin, NextStreamPlugin]
145+
```
146+
147+
## Testing AsyncIterator Integrations
148+
149+
When testing AsyncIterator instrumentation:
150+
151+
1. **Test span creation**: Verify span starts when method is called
152+
2. **Test iteration**: Verify span stays open during iteration
153+
3. **Test completion**: Verify span finishes when iterator is exhausted
154+
4. **Test early termination**: Verify span finishes if iteration stops early
155+
5. **Test error handling**: Verify span finishes and captures error
156+
157+
```javascript
158+
it('should trace stream() method with AsyncIterator', async () => {
159+
const result = await myLib.stream(input)
160+
161+
// Iterate through results
162+
const chunks = []
163+
for await (const chunk of result) {
164+
chunks.push(chunk)
165+
}
166+
167+
// Verify span exists and finished
168+
await agent.assertSomeTraces(traces => {
169+
const span = traces[0][0]
170+
expect(span.name).to.equal('mylib.stream')
171+
expect(span.meta.component).to.equal('mylib')
172+
// Span should be complete after iteration finishes
173+
})
174+
})
175+
```
176+
177+
## Summary Checklist
178+
179+
When implementing AsyncIterator instrumentation:
180+
181+
- [ ] Orchestrion config uses `kind: 'AsyncIterator'`
182+
- [ ] Created TWO plugin classes (Main + Next)
183+
- [ ] Next plugin prefix has `_next` suffix
184+
- [ ] Both plugins use same `static id`
185+
- [ ] Main plugin creates span in `bindStart()`
186+
- [ ] Next plugin returns inherited store in `bindStart()`
187+
- [ ] Next plugin checks `result.done === true` before finishing span
188+
- [ ] Both plugins handle errors and finish span
189+
- [ ] Both plugins exported in module.exports array
190+
- [ ] Tests verify span lifecycle (start, iteration, completion)

.agents/skills/apm-integrations/references/orchestrion.md

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ Each entry in the instrumentations array:
5252

5353
// Option A: functionQuery (recommended)
5454
functionQuery: {
55-
kind: 'Async' | 'Callback' | 'Sync', // transform type (see below)
55+
kind: 'Async' | 'AsyncIterator' | 'Callback' | 'Sync', // transform type (see below)
5656
methodName: string, // class method or property method name
5757
className?: string, // scope to a specific class
5858
functionName?: string, // target a FunctionDeclaration (alternative to methodName)
@@ -119,18 +119,45 @@ Example with `module.name: "@langchain/core"` and `channelName: "RunnableSequenc
119119

120120
## Function Kinds and Transforms
121121

122-
Orchestrion supports three transform types, selected by the `kind` field:
122+
Orchestrion supports four transform types, selected by the `kind` field:
123123

124124
| Kind | Transform | Behavior |
125125
|------|-----------|----------|
126126
| `Async` | `tracePromise` | Wraps in async arrow, calls `channel.tracePromise()` — handles promise resolution/rejection |
127+
| `AsyncIterator` | `traceAsyncIterator` | Wraps async generators/iterators — creates TWO channels: base and `_next` (**see [async-iterator-pattern.md](./async-iterator-pattern.md)**) |
127128
| `Callback` | `traceCallback` | Intercepts callback at `arguments[index]` (default: last arg, i.e. `-1`), wraps it to publish `asyncStart`/`asyncEnd`/`error` events |
128129
| `Sync` | `traceSync` | Wraps in non-async arrow, calls `channel.traceSync()` — handles synchronous return/throw. **Note:** `Sync` is the default when `kind` is omitted or unrecognized. |
129130

130-
All three transforms dispatch to `traceFunction` (for standalone functions) or `traceInstanceMethod` (for class methods, including inherited ones via constructor patching).
131+
All transforms dispatch to `traceFunction` (for standalone functions) or `traceInstanceMethod` (for class methods, including inherited ones via constructor patching).
131132

132133
For `Callback` kind, use the `index` field to specify which argument is the callback (defaults to `-1`, meaning the last argument).
133134

135+
### AsyncIterator Pattern (Two Plugins Required)
136+
137+
**⚠️ CRITICAL:** `AsyncIterator` is a special transform that requires **TWO plugins** and has specific implementation requirements.
138+
139+
**When to use:**
140+
- Method returns `Promise<AsyncIterable>`, `Promise<AsyncIterableIterator>`, or `Promise<IterableReadableStream>`
141+
- Async generator functions: `async *methodName()`
142+
143+
**How it works:**
144+
- Orchestrion creates **TWO channels**: base channel and `{channelName}_next` channel
145+
- **Main plugin**: Creates span when method is called
146+
- **Next plugin**: Finishes span when `result.done === true` (after all iterations complete)
147+
148+
**📖 REQUIRED READING:** If you are implementing an AsyncIterator integration, you **MUST** read the complete guide:
149+
150+
👉 **[AsyncIterator Pattern Reference](./async-iterator-pattern.md)** 👈
151+
152+
This pattern is complex and easy to get wrong. The reference document covers:
153+
- Two-channel pattern details
154+
- Complete plugin implementation examples
155+
- Common mistakes and how to avoid them
156+
- Testing strategies
157+
- Full working example (LangGraph)
158+
159+
**DO NOT** attempt to implement AsyncIterator without reading the full reference.
160+
134161
## Finding the Right filePath
135162

136163
1. Install the package: `npm install <package>`

0 commit comments

Comments
 (0)