Skip to content

Adds Pulsar sender#273

Merged
codefromthecrypt merged 5 commits intoopenzipkin:masterfrom
CodePrometheus:add-pulsar-sender
Feb 12, 2025
Merged

Adds Pulsar sender#273
codefromthecrypt merged 5 commits intoopenzipkin:masterfrom
CodePrometheus:add-pulsar-sender

Conversation

@CodePrometheus
Copy link
Copy Markdown
Contributor

@CodePrometheus CodePrometheus commented Jan 28, 2025

Ref openzipkin/zipkin#3788

  • Add Pulsar sender
  • Add tests for all changes
  • Add corresponding documentation
Benchmark                                      (messageMaxBytes)   Mode  Cnt    Score   Error  Units
PulsarSenderBenchmarks.report                              65536  thrpt   15  944.957 ± 2.515  ops/s
PulsarSenderBenchmarks.report:messages                     65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped              65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                        65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                             500000  thrpt   15  943.387 ± 3.743  ops/s
PulsarSenderBenchmarks.report:messages                    500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped             500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                       500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                            5242880  thrpt   15  942.780 ± 8.561  ops/s
PulsarSenderBenchmarks.report:messages                   5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped            5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                      5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                           16777216  thrpt   15  942.816 ± 4.404  ops/s
PulsarSenderBenchmarks.report:messages                  16777216  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped           16777216  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                     16777216  thrpt   15      ≈ 0          ops/s

@codefromthecrypt
Copy link
Copy Markdown
Member

@CodePrometheus ps on things you need to complete this task, please mention me directly. I don't watch all repo notifications for hobby time stuff.

Copy link
Copy Markdown
Member

@codefromthecrypt codefromthecrypt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is very good and will defer to @reta for follow-up. Main hesitation here is about the sender creating topics, which I think is too much responsibility. Other is having an import dep on slf4j which while today could be ok, could be a headache tomorrow.

Comment thread benchmarks/src/test/java/zipkin2/reporter/PulsarSenderBenchmarks.java Outdated

import static org.testcontainers.utility.DockerImageName.parse;

public class PulsarSenderBenchmarks extends SenderBenchmarks {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add to the description the run of this benchmark in triple backticks

Comment thread pulsar-client/pom.xml
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good. sender name matches the artifact here!

}
});
} catch (Exception e) {
throw new RuntimeException("Pulsar producer send failed." + e.getMessage(), e);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createIfNeeded(message);
}

void createIfNeeded(byte[] message) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try to refactor this a bit like KafkaSender, you can make a function get() which can throw as needed. Then, at the call site handle the exceptions in one place.

I would recommend not doing logging and the main reason is for slf4j lockups. If the version of slf4j changes for pulsar, we'd have a revlock here. this is one reason why others either don't log or they use JUL.

Finally, consider if we should implicitly create a topic, as that causes failure cases and more code. I don't think we imiplicitly create topics anywhere else, but I could be mistaken. If the docker image we use should have a default topic of zipkin we could add it into the image for convenience or set the image to auto-create topics.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I try to make the message sending method clearer, for pulsar, the topic is automatically created by default.

Comment thread pulsar-client/pom.xml Outdated
Comment thread pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java Outdated
@reta
Copy link
Copy Markdown
Contributor

reta commented Feb 9, 2025

I think this is very good and will defer to @reta for follow-up.

Great job @CodePrometheus , a few comments but pretty much LGTM ! Thank you!

@CodePrometheus
Copy link
Copy Markdown
Contributor Author

@codefromthecrypt @reta Thanks for your time! I tried to fix and answer the questions raised.

Benchmark results on my local are as follows:

Benchmark                                      (messageMaxBytes)   Mode  Cnt    Score   Error  Units
PulsarSenderBenchmarks.report                              65536  thrpt   15  944.957 ± 2.515  ops/s
PulsarSenderBenchmarks.report:messages                     65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped              65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                        65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                             500000  thrpt   15  943.387 ± 3.743  ops/s
PulsarSenderBenchmarks.report:messages                    500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped             500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                       500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                            5242880  thrpt   15  942.780 ± 8.561  ops/s
PulsarSenderBenchmarks.report:messages                   5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped            5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                      5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                           16777216  thrpt   15  942.816 ± 4.404  ops/s
PulsarSenderBenchmarks.report:messages                  16777216  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped           16777216  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                     16777216  thrpt   15      ≈ 0          ops/s

Comment thread pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java Outdated
Comment thread pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java Outdated
Comment thread pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java Outdated
Comment thread pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java Outdated
@codefromthecrypt codefromthecrypt merged commit cacf9e9 into openzipkin:master Feb 12, 2025
@codefromthecrypt
Copy link
Copy Markdown
Member

online until end of tomorrow UTC+8, so hopping in. Nice one!

@CodePrometheus CodePrometheus deleted the add-pulsar-sender branch February 12, 2025 02:38
void sendMessage(byte[] message) {
if (client == null) {
synchronized (this) {
if (client == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this method (sendMessage) do nothing after the client is created (client != null)? Generally meaning that calls after the first to sendMessage will not actually send the message? Am I missing something?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the following test to ITPulsarSender which fails on the second assertion with "Timed out waiting to read message."

@Test void send_multiple_JSON_messages() throws Exception {
  try (PulsarSender sender = pulsar.newSenderBuilder(testName)
    .encoding(Encoding.JSON)
    .build()) {
    send(sender, CLIENT_SPAN, CLIENT_SPAN);
    send(sender, CLIENT_SPAN);

    assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
      CLIENT_SPAN, CLIENT_SPAN);
    assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
      CLIENT_SPAN);
  }
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right @shakuzen , probably lost in rounds of review

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMG, this was a big mistake of mine. Thank you for pointing it out, I'll fix it as soon as possible.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

love to see this collaboration! thanks @shakuzen!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants