Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,64 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
}

@Test
public void testSeekChunkMessages() throws PulsarClientException {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
final int totalMessages = 5;
final String topicName = "persistent://my-property/my-ns/test-seek-chunk";

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);

Producer<byte[]> producer = producerBuilder
.enableChunking(true)
.enableBatching(false)
.create();

Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("inclusive-seek")
.startMessageIdInclusive()
.subscribe();

Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("default-seek")
.subscribe();

for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(10);
producer.send(message.getBytes());
}

Message<byte[]> msg = null;
List<MessageId> msgIds = Lists.newArrayList();
for (int i = 0; i < totalMessages; i++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.info("[{}] - Received message: [{}]", i, receivedMessage);
msgIds.add(msg.getMessageId());
}

consumer1.seek(msgIds.get(1));
for (int i = 1; i < totalMessages; i++) {
Message<byte[]> msgAfterSeek = consumer1.receive(5, TimeUnit.SECONDS);
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}

consumer2.seek(msgIds.get(1));
for (int i = 2; i < totalMessages; i++) {
Message<byte[]> msgAfterSeek = consumer2.receive(5, TimeUnit.SECONDS);
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}

consumer1.close();
consumer2.close();
producer.close();

log.info("-- Exiting {} test --", methodName);
}

private String createMessagePayload(int size) {
StringBuilder str = new StringBuilder();
Random rand = new Random();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Objects;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.MessageIdData;

public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {
private final MessageIdImpl firstChunkMsgId;

public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) {
super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex());
this.firstChunkMsgId = firstChunkMsgId;
}

public MessageIdImpl getFirstChunkMessageId() {
return firstChunkMsgId;
}

public MessageIdImpl getLastChunkMessageId() {
return this;
}

@Override
public String toString() {
return firstChunkMsgId.toString() + ';' + super.toString();
}

@Override
public byte[] toByteArray() {

// write last chunk message id
MessageIdData msgId = super.writeMessageIdData(null, -1, 0);

// write first chunk message id
msgId.setFirstChunkMessageId();
firstChunkMsgId.writeMessageIdData(msgId.getFirstChunkMessageId(), -1, 0);

int size = msgId.getSerializedSize();
ByteBuf serialized = Unpooled.buffer(size, size);
msgId.writeTo(serialized);

return serialized.array();
}

@Override
public boolean equals(Object o) {
return super.equals(o);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), firstChunkMsgId.hashCode());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,24 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
if (uncompressedPayload == null) {
return;
}

// last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
if (log.isDebugEnabled()) {
log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId,
msgMetadata.getSequenceId());
}

// remove buffer from the map, set the chunk message id
ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.remove(msgMetadata.getUuid());
if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
Comment on lines +1226 to +1227
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia Here is where the consumer sets the first chunk message id and the last chunk message-id. It does not depend on the first chunk message-id set by the producer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove all producer-side changes then because the producer can't give a guarantee to set messageId(ledgerId,entryId) of the chunk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia The producer will not set the message ID of the chunk, the producer side change just return a chunk message ID(start, end) to users.

}
// add chunked messageId to unack-message tracker, and reduce pending-chunked-message count
unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
pendingChunkedMessageCount--;
chunkedMsgCtx.recycle();
}

// If the topic is non-persistent, we should not ignore any messages.
Expand Down Expand Up @@ -1317,18 +1335,8 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
return null;
}

// last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
if (log.isDebugEnabled()) {
log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId, msgMetadata.getSequenceId());
}
// remove buffer from the map, add chunked messageId to unack-message tracker, and reduce pending-chunked-message count
chunkedMessagesMap.remove(msgMetadata.getUuid());
unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
pendingChunkedMessageCount--;
compressedPayload.release();
compressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
chunkedMsgCtx.recycle();
ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, compressedPayload, cnx, false);
compressedPayload.release();
return uncompressedPayload;
Expand Down Expand Up @@ -1986,6 +1994,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
ackSet.recycle();

seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
} else if (messageId instanceof ChunkMessageIdImpl) {
ChunkMessageIdImpl msgId = (ChunkMessageIdImpl) messageId;
seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(),
msgId.getFirstChunkMessageId().getEntryId(), new long[0]);
} else {
MessageIdImpl msgId = (MessageIdImpl) messageId;
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex());
}
} else if (idData.hasFirstChunkMessageId()) {
MessageIdData firstChunkIdData = idData.getFirstChunkMessageId();
messageId = new ChunkMessageIdImpl(
new MessageIdImpl(firstChunkIdData.getLedgerId(), firstChunkIdData.getEntryId(),
firstChunkIdData.getPartition()),
new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()));
} else {
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
}
Expand Down Expand Up @@ -166,12 +172,14 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)
return messageId;
}

// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
protected byte[] toByteArray(int batchIndex, int batchSize) {
MessageIdData msgId = LOCAL_MESSAGE_ID.get()
.clear()
.setLedgerId(ledgerId)
.setEntryId(entryId);
protected MessageIdData writeMessageIdData(MessageIdData msgId, int batchIndex, int batchSize) {
if(msgId == null) {
msgId = LOCAL_MESSAGE_ID.get()
.clear();
}

msgId.setLedgerId(ledgerId).setEntryId(entryId);

if (partitionIndex >= 0) {
msgId.setPartition(partitionIndex);
}
Expand All @@ -184,6 +192,13 @@ protected byte[] toByteArray(int batchIndex, int batchSize) {
msgId.setBatchSize(batchSize);
}

return msgId;
}

// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
protected byte[] toByteArray(int batchIndex, int batchSize) {
MessageIdData msgId = writeMessageIdData(null, batchIndex, batchSize);

int size = msgId.getSerializedSize();
ByteBuf serialized = Unpooled.buffer(size, size);
msgId.writeTo(serialized);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ public boolean hasReplicateFrom() {
return msgMetadata.hasReplicatedFrom();
}

void setMessageId(MessageIdImpl messageId) {
void setMessageId(MessageId messageId) {
this.messageId = messageId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -485,6 +487,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
sequenceId = msgMetadata.getSequenceId();
}
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() ?
msg.getMessageBuilder().getSchemaVersion() : null;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
Expand All @@ -497,7 +500,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
compressedPayload.readableBytes(), uncompressedSize, callback);
compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx);
readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
}
}
Expand All @@ -517,7 +520,7 @@ public int getNumOfPartitions() {
private void serializeAndSendMessage(MessageImpl<?> msg, ByteBuf payload,
long sequenceId, String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
boolean compressed, int compressedPayloadSize,
int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
int uncompressedSize, SendCallback callback, ChunkedMessageCtx chunkedMessageCtx) throws IOException, InterruptedException {
ByteBuf chunkPayload = compressedPayload;
MessageMetadata msgMetadata = msg.getMessageBuilder();
if (totalChunks > 1 && TopicName.get(topic).isPersistent()) {
Expand Down Expand Up @@ -612,6 +615,7 @@ private void serializeAndSendMessage(MessageImpl<?> msg, ByteBuf payload,
op.totalChunks = totalChunks;
op.chunkId = chunkId;
}
op.chunkedMessageCtx = chunkedMessageCtx;
lastSendFuture = callback.getFuture();
processOpSendMsg(op);
}
Expand Down Expand Up @@ -1038,6 +1042,16 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le
OpSendMsg finalOp = op;
LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp)));
op.setMessageId(ledgerId, entryId, partitionIndex);
if (op.totalChunks > 1) {
if (op.chunkId == 0) {
op.chunkedMessageCtx.firstChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
} else if (op.chunkId == op.totalChunks - 1) {
op.chunkedMessageCtx.lastChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
op.setMessageId(op.chunkedMessageCtx.getChunkMessageId());
}
}


// if message is chunked then call callback only on last chunk
if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
try {
Expand Down Expand Up @@ -1185,12 +1199,54 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
}
}

static class ChunkedMessageCtx extends AbstractReferenceCounted {
protected MessageIdImpl firstChunkMessageId;
protected MessageIdImpl lastChunkMessageId;

public ChunkMessageIdImpl getChunkMessageId() {
return new ChunkMessageIdImpl(firstChunkMessageId, lastChunkMessageId);
}

private static final Recycler<ProducerImpl.ChunkedMessageCtx> RECYCLER =
new Recycler<ProducerImpl.ChunkedMessageCtx>() {
protected ProducerImpl.ChunkedMessageCtx newObject(
Recycler.Handle<ProducerImpl.ChunkedMessageCtx> handle) {
return new ProducerImpl.ChunkedMessageCtx(handle);
}
};

public static ChunkedMessageCtx get(int totalChunks) {
ChunkedMessageCtx chunkedMessageCtx = RECYCLER.get();
chunkedMessageCtx.setRefCnt(totalChunks);
return chunkedMessageCtx;
}

private final Handle<ProducerImpl.ChunkedMessageCtx> recyclerHandle;

private ChunkedMessageCtx(Handle<ChunkedMessageCtx> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

@Override
protected void deallocate() {
this.firstChunkMessageId = null;
this.lastChunkMessageId = null;
recyclerHandle.recycle(this);
}

@Override
public ReferenceCounted touch(Object hint) {
return this;
}
}

protected static final class OpSendMsg {
MessageImpl<?> msg;
List<MessageImpl<?>> msgs;
ByteBufPair cmd;
SendCallback callback;
Runnable rePopulate;
ChunkedMessageCtx chunkedMessageCtx;
long uncompressedSize;
long sequenceId;
long createdAt;
Expand Down Expand Up @@ -1293,6 +1349,8 @@ void recycle() {
retryCount = 0;
batchSizeByte = 0;
numMessagesInBatch = 1;
ReferenceCountUtil.safeRelease(chunkedMessageCtx);
chunkedMessageCtx = null;
recyclerHandle.recycle(this);
}

Expand All @@ -1315,6 +1373,12 @@ void setMessageId(long ledgerId, long entryId, int partitionIndex) {
}
}

void setMessageId(ChunkMessageIdImpl chunkMessageId) {
if (msg != null) {
msg.setMessageId(chunkMessageId);
}
}

private OpSendMsg(Handle<OpSendMsg> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
Expand Down
Loading