Adds Pulsar sender#273
Conversation
b77e1e0 to
3285e16
Compare
|
@CodePrometheus ps on things you need to complete this task, please mention me directly. I don't watch all repo notifications for hobby time stuff. |
codefromthecrypt
left a comment
There was a problem hiding this comment.
I think this is very good and will defer to @reta for follow-up. Main hesitation here is about the sender creating topics, which I think is too much responsibility. Other is having an import dep on slf4j which while today could be ok, could be a headache tomorrow.
|
|
||
| import static org.testcontainers.utility.DockerImageName.parse; | ||
|
|
||
| public class PulsarSenderBenchmarks extends SenderBenchmarks { |
There was a problem hiding this comment.
please add to the description the run of this benchmark in triple backticks
| </dependency> | ||
| <dependency> | ||
| <groupId>org.apache.pulsar</groupId> | ||
| <artifactId>pulsar-client</artifactId> |
There was a problem hiding this comment.
good. sender name matches the artifact here!
| } | ||
| }); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException("Pulsar producer send failed." + e.getMessage(), e); |
| createIfNeeded(message); | ||
| } | ||
|
|
||
| void createIfNeeded(byte[] message) { |
There was a problem hiding this comment.
try to refactor this a bit like KafkaSender, you can make a function get() which can throw as needed. Then, at the call site handle the exceptions in one place.
I would recommend not doing logging and the main reason is for slf4j lockups. If the version of slf4j changes for pulsar, we'd have a revlock here. this is one reason why others either don't log or they use JUL.
Finally, consider if we should implicitly create a topic, as that causes failure cases and more code. I don't think we imiplicitly create topics anywhere else, but I could be mistaken. If the docker image we use should have a default topic of zipkin we could add it into the image for convenience or set the image to auto-create topics.
There was a problem hiding this comment.
Well, I try to make the message sending method clearer, for pulsar, the topic is automatically created by default.
Great job @CodePrometheus , a few comments but pretty much LGTM ! Thank you! |
|
@codefromthecrypt @reta Thanks for your time! I tried to fix and answer the questions raised. Benchmark results on my local are as follows: |
|
online until end of tomorrow UTC+8, so hopping in. Nice one! |
| void sendMessage(byte[] message) { | ||
| if (client == null) { | ||
| synchronized (this) { | ||
| if (client == null) { |
There was a problem hiding this comment.
Doesn't this method (sendMessage) do nothing after the client is created (client != null)? Generally meaning that calls after the first to sendMessage will not actually send the message? Am I missing something?
There was a problem hiding this comment.
I added the following test to ITPulsarSender which fails on the second assertion with "Timed out waiting to read message."
@Test void send_multiple_JSON_messages() throws Exception {
try (PulsarSender sender = pulsar.newSenderBuilder(testName)
.encoding(Encoding.JSON)
.build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN);
send(sender, CLIENT_SPAN);
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN);
}
}There was a problem hiding this comment.
Yeah, you are right @shakuzen , probably lost in rounds of review
There was a problem hiding this comment.
OMG, this was a big mistake of mine. Thank you for pointing it out, I'll fix it as soon as possible.
Ref openzipkin/zipkin#3788