Skip to content

Commit 510563d

Browse files
authored
test: fix flaky confluenctinc kafka error (#7460)
The broker needs some time after creating a new topic to settle in. The old commented error test case could not work due to expecting to catch a user error (which we should not). It is therefore removed.
1 parent c4ee484 commit 510563d

File tree

1 file changed

+48
-44
lines changed
  • packages/datadog-plugin-confluentinc-kafka-javascript/test

1 file changed

+48
-44
lines changed

packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ describe('Plugin', () => {
6565
replicationFactor: 1,
6666
}],
6767
})
68+
69+
// `createTopics()` returns before leaders are guaranteed to be elected in this client.
70+
// If we race ahead immediately, consumers/producers can stall on metadata/leader availability.
71+
await waitForTopicReady(admin, testTopic)
6872
})
6973

7074
afterEach(() => admin.disconnect())
@@ -153,17 +157,19 @@ describe('Plugin', () => {
153157
type: 'worker',
154158
})
155159

156-
const consumerReceiveMessagePromise = new Promise(resolve => {
160+
const consumerReceiveMessagePromise = /** @type {Promise<void>} */(new Promise((resolve, reject) => {
157161
consumer.run({
158162
eachMessage: () => {
159163
resolve()
160164
},
161165
})
162-
})
163-
await sendMessages(kafka, testTopic, messages).then(
164-
async () => await consumerReceiveMessagePromise
165-
)
166-
return expectedSpanPromise
166+
}))
167+
168+
await Promise.all([
169+
sendMessages(kafka, testTopic, messages),
170+
consumerReceiveMessagePromise,
171+
expectedSpanPromise,
172+
])
167173
})
168174

169175
it('should run the consumer in the context of the consumer span', done => {
@@ -179,7 +185,7 @@ describe('Plugin', () => {
179185
} catch (e) {
180186
done(e)
181187
} finally {
182-
eachMessage = () => {} // avoid being called for each message
188+
eachMessage = async () => {} // avoid being called for each message
183189
}
184190
}
185191

@@ -271,25 +277,25 @@ describe('Plugin', () => {
271277
dr_cb: true,
272278
})
273279

274-
await new Promise((resolve, reject) => {
280+
await /** @type {Promise<void>} */(new Promise((resolve, reject) => {
275281
nativeProducer.connect({}, (err) => {
276282
if (err) {
277283
return reject(err)
278284
}
279285
resolve()
280286
})
281-
})
287+
}))
282288
})
283289

284290
afterEach(async () => {
285-
await new Promise((resolve, reject) => {
291+
await /** @type {Promise<void>} */(new Promise((resolve, reject) => {
286292
nativeProducer.disconnect((err) => {
287293
if (err) {
288294
return reject(err)
289295
}
290296
resolve()
291297
})
292-
})
298+
}))
293299
})
294300

295301
describe('producer', () => {
@@ -354,30 +360,30 @@ describe('Plugin', () => {
354360
'auto.offset.reset': 'earliest',
355361
})
356362

357-
await new Promise((resolve, reject) => {
363+
await /** @type {Promise<void>} */(new Promise((resolve, reject) => {
358364
nativeConsumer.connect({}, (err) => {
359365
if (err) {
360366
return reject(err)
361367
}
362368
resolve()
363369
})
364-
})
370+
}))
365371
})
366372

367373
afterEach(async () => {
368374
await nativeConsumer.unsubscribe()
369-
await new Promise((resolve, reject) => {
375+
await /** @type {Promise<void>} */(new Promise((resolve, reject) => {
370376
nativeConsumer.disconnect((err) => {
371377
if (err) {
372378
return reject(err)
373379
}
374380
resolve()
375381
})
376-
})
382+
}))
377383
})
378384

379385
function consume (consumer, producer, topic, message, timeoutMs = 9500) {
380-
return new Promise((resolve, reject) => {
386+
return /** @type {Promise<void>} */(new Promise((resolve, reject) => {
381387
const timeoutId = setTimeout(() => {
382388
reject(new Error(`Timeout: Did not consume message on topic "${topic}" within ${timeoutMs}ms`))
383389
}, timeoutMs)
@@ -408,7 +414,7 @@ describe('Plugin', () => {
408414
}
409415
doConsume()
410416
producer.produce(topic, null, message, 'native-consumer-key')
411-
})
417+
}))
412418
}
413419

414420
it('should be instrumented', async () => {
@@ -458,33 +464,6 @@ describe('Plugin', () => {
458464

459465
return expectedSpanPromise
460466
})
461-
462-
// TODO: Fix this test case, fails with 'done() called multiple times'
463-
// it('should be instrumented with error', async () => {
464-
// const fakeError = new Error('Oh No!')
465-
466-
// const expectedSpanPromise = agent.assertSomeTraces(traces => {
467-
// const errorSpans = traces[0].filter(span => span.error === 1)
468-
// expect(errorSpans.length).to.be.at.least(1)
469-
470-
// const errorSpan = errorSpans[0]
471-
// expect(errorSpan).to.exist
472-
// expect(errorSpan.name).to.equal(expectedSchema.receive.opName)
473-
// expect(errorSpan.meta).to.include({
474-
// component: 'confluentinc-kafka-javascript'
475-
// })
476-
477-
// expect(errorSpan.meta[ERROR_TYPE]).to.equal(fakeError.name)
478-
// expect(errorSpan.meta[ERROR_MESSAGE]).to.equal(fakeError.message)
479-
// })
480-
481-
// nativeConsumer.consume(1, (err, messages) => {
482-
// // Ensure we resolve before throwing
483-
// throw fakeError
484-
// })
485-
486-
// return expectedSpanPromise
487-
// })
488467
})
489468
})
490469
})
@@ -511,3 +490,28 @@ async function sendMessages (kafka, topic, messages) {
511490
})
512491
await producer.disconnect()
513492
}
493+
494+
async function waitForTopicReady (admin, topic, timeoutMs = 20000) {
495+
if (typeof admin?.fetchTopicMetadata !== 'function') return
496+
497+
const start = Date.now()
498+
while ((Date.now() - start) < timeoutMs) {
499+
try {
500+
const meta = await admin.fetchTopicMetadata({ topics: [topic], timeout: 1000 })
501+
const topicMeta = Array.isArray(meta) ? meta[0] : meta?.topics?.[0]
502+
503+
const partitions = topicMeta?.partitions
504+
if (Array.isArray(partitions) &&
505+
partitions.length > 0 &&
506+
partitions.every(p => typeof p.leader === 'number' && p.leader >= 0)) {
507+
return
508+
}
509+
} catch {
510+
// Topic creation is async; metadata/leader errors can be transient.
511+
}
512+
513+
await new Promise(resolve => setTimeout(resolve, 50))
514+
}
515+
516+
throw new Error(`Timeout: Topic "${topic}" metadata was not ready within ${timeoutMs}ms`)
517+
}

0 commit comments

Comments
 (0)