@@ -65,6 +65,10 @@ describe('Plugin', () => {
6565 replicationFactor : 1 ,
6666 } ] ,
6767 } )
68+
69+ // `createTopics()` returns before leaders are guaranteed to be elected in this client.
70+ // If we race ahead immediately, consumers/producers can stall on metadata/leader availability.
71+ await waitForTopicReady ( admin , testTopic )
6872 } )
6973
7074 afterEach ( ( ) => admin . disconnect ( ) )
@@ -153,17 +157,19 @@ describe('Plugin', () => {
153157 type : 'worker' ,
154158 } )
155159
156- const consumerReceiveMessagePromise = new Promise ( resolve => {
160+ const consumerReceiveMessagePromise = /** @type { Promise<void> } */ ( new Promise ( ( resolve , reject ) => {
157161 consumer . run ( {
158162 eachMessage : ( ) => {
159163 resolve ( )
160164 } ,
161165 } )
162- } )
163- await sendMessages ( kafka , testTopic , messages ) . then (
164- async ( ) => await consumerReceiveMessagePromise
165- )
166- return expectedSpanPromise
166+ } ) )
167+
168+ await Promise . all ( [
169+ sendMessages ( kafka , testTopic , messages ) ,
170+ consumerReceiveMessagePromise ,
171+ expectedSpanPromise ,
172+ ] )
167173 } )
168174
169175 it ( 'should run the consumer in the context of the consumer span' , done => {
@@ -179,7 +185,7 @@ describe('Plugin', () => {
179185 } catch ( e ) {
180186 done ( e )
181187 } finally {
182- eachMessage = ( ) => { } // avoid being called for each message
188+ eachMessage = async ( ) => { } // avoid being called for each message
183189 }
184190 }
185191
@@ -271,25 +277,25 @@ describe('Plugin', () => {
271277 dr_cb : true ,
272278 } )
273279
274- await new Promise ( ( resolve , reject ) => {
280+ await /** @type { Promise<void> } */ ( new Promise ( ( resolve , reject ) => {
275281 nativeProducer . connect ( { } , ( err ) => {
276282 if ( err ) {
277283 return reject ( err )
278284 }
279285 resolve ( )
280286 } )
281- } )
287+ } ) )
282288 } )
283289
284290 afterEach ( async ( ) => {
285- await new Promise ( ( resolve , reject ) => {
291+ await /** @type { Promise<void> } */ ( new Promise ( ( resolve , reject ) => {
286292 nativeProducer . disconnect ( ( err ) => {
287293 if ( err ) {
288294 return reject ( err )
289295 }
290296 resolve ( )
291297 } )
292- } )
298+ } ) )
293299 } )
294300
295301 describe ( 'producer' , ( ) => {
@@ -354,30 +360,30 @@ describe('Plugin', () => {
354360 'auto.offset.reset' : 'earliest' ,
355361 } )
356362
357- await new Promise ( ( resolve , reject ) => {
363+ await /** @type { Promise<void> } */ ( new Promise ( ( resolve , reject ) => {
358364 nativeConsumer . connect ( { } , ( err ) => {
359365 if ( err ) {
360366 return reject ( err )
361367 }
362368 resolve ( )
363369 } )
364- } )
370+ } ) )
365371 } )
366372
367373 afterEach ( async ( ) => {
368374 await nativeConsumer . unsubscribe ( )
369- await new Promise ( ( resolve , reject ) => {
375+ await /** @type { Promise<void> } */ ( new Promise ( ( resolve , reject ) => {
370376 nativeConsumer . disconnect ( ( err ) => {
371377 if ( err ) {
372378 return reject ( err )
373379 }
374380 resolve ( )
375381 } )
376- } )
382+ } ) )
377383 } )
378384
379385 function consume ( consumer , producer , topic , message , timeoutMs = 9500 ) {
380- return new Promise ( ( resolve , reject ) => {
386+ return /** @type { Promise<void> } */ ( new Promise ( ( resolve , reject ) => {
381387 const timeoutId = setTimeout ( ( ) => {
382388 reject ( new Error ( `Timeout: Did not consume message on topic "${ topic } " within ${ timeoutMs } ms` ) )
383389 } , timeoutMs )
@@ -408,7 +414,7 @@ describe('Plugin', () => {
408414 }
409415 doConsume ( )
410416 producer . produce ( topic , null , message , 'native-consumer-key' )
411- } )
417+ } ) )
412418 }
413419
414420 it ( 'should be instrumented' , async ( ) => {
@@ -458,33 +464,6 @@ describe('Plugin', () => {
458464
459465 return expectedSpanPromise
460466 } )
461-
462- // TODO: Fix this test case, fails with 'done() called multiple times'
463- // it('should be instrumented with error', async () => {
464- // const fakeError = new Error('Oh No!')
465-
466- // const expectedSpanPromise = agent.assertSomeTraces(traces => {
467- // const errorSpans = traces[0].filter(span => span.error === 1)
468- // expect(errorSpans.length).to.be.at.least(1)
469-
470- // const errorSpan = errorSpans[0]
471- // expect(errorSpan).to.exist
472- // expect(errorSpan.name).to.equal(expectedSchema.receive.opName)
473- // expect(errorSpan.meta).to.include({
474- // component: 'confluentinc-kafka-javascript'
475- // })
476-
477- // expect(errorSpan.meta[ERROR_TYPE]).to.equal(fakeError.name)
478- // expect(errorSpan.meta[ERROR_MESSAGE]).to.equal(fakeError.message)
479- // })
480-
481- // nativeConsumer.consume(1, (err, messages) => {
482- // // Ensure we resolve before throwing
483- // throw fakeError
484- // })
485-
486- // return expectedSpanPromise
487- // })
488467 } )
489468 } )
490469 } )
@@ -511,3 +490,28 @@ async function sendMessages (kafka, topic, messages) {
511490 } )
512491 await producer . disconnect ( )
513492}
493+
494+ async function waitForTopicReady ( admin , topic , timeoutMs = 20000 ) {
495+ if ( typeof admin ?. fetchTopicMetadata !== 'function' ) return
496+
497+ const start = Date . now ( )
498+ while ( ( Date . now ( ) - start ) < timeoutMs ) {
499+ try {
500+ const meta = await admin . fetchTopicMetadata ( { topics : [ topic ] , timeout : 1000 } )
501+ const topicMeta = Array . isArray ( meta ) ? meta [ 0 ] : meta ?. topics ?. [ 0 ]
502+
503+ const partitions = topicMeta ?. partitions
504+ if ( Array . isArray ( partitions ) &&
505+ partitions . length > 0 &&
506+ partitions . every ( p => typeof p . leader === 'number' && p . leader >= 0 ) ) {
507+ return
508+ }
509+ } catch {
510+ // Topic creation is async; metadata/leader errors can be transient.
511+ }
512+
513+ await new Promise ( resolve => setTimeout ( resolve , 50 ) )
514+ }
515+
516+ throw new Error ( `Timeout: Topic "${ topic } " metadata was not ready within ${ timeoutMs } ms` )
517+ }
0 commit comments