Skip to content

Commit 19008be

Browse files
committed
Addressing review comments
1 parent 535735e commit 19008be

File tree

1 file changed

+5
-1
lines changed
  • pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar

1 file changed

+5
-1
lines changed

pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535

3636
/**
3737
* A {@link MessageBatch} for collecting messages from pulsar topic
38+
*
39+
* When 'enableKeyValueStitch' flag is enabled, existing {@link org.apache.pinot.spi.stream.StreamMessageDecoder}
40+
* plugins will not work. A custom decoder will be needed to unpack key and value byte arrays and decode
41+
* them independently.
3842
*/
3943
public class PulsarMessageBatch implements MessageBatch<byte[]> {
4044
private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageBatch.class);
@@ -58,7 +62,7 @@ public byte[] getMessageAtIndex(int index) {
5862
if (_enableKeyValueStitch) {
5963
return stitchKeyValue(msg.getKeyBytes(), msg.getData());
6064
}
61-
return _messageList.get(index).getData();
65+
return msg.getData();
6266
}
6367

6468
@Override

0 commit comments

Comments
 (0)