Skip to content

Commit 322e2b5

Browse files
authored
Merge 09a944c into fb144bb
2 parents fb144bb + 09a944c commit 322e2b5

File tree

1 file changed

+40
-17
lines changed
  • eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker

1 file changed

+40
-17
lines changed

eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,21 @@
2424

2525
import com.google.common.base.Preconditions;
2626

27+
import lombok.Getter;
28+
2729
/**
2830
* This is a block queue, can get entity by offset. The queue is a FIFO data structure.
2931
*/
3032
public class MessageQueue {
3133

34+
@Getter
3235
private final MessageEntity[] items;
3336

34-
private int takeIndex;
37+
private volatile int takeIndex;
3538

36-
private int putIndex;
39+
private volatile int putIndex;
3740

38-
private int count;
41+
private volatile int count;
3942

4043
private final ReentrantLock lock;
4144

@@ -59,9 +62,10 @@ public MessageQueue(int capacity) {
5962
}
6063

6164
/**
62-
* Insert the message at the tail of this queue, waiting for space to become available if the queue is full
65+
* Inserts the specified MessageEntity object into the queue.
6366
*
64-
* @param messageEntity
67+
* @param messageEntity The MessageEntity object to be inserted into the queue.
68+
* @throws InterruptedException if the current thread is interrupted while waiting for space to become available in the queue.
6569
*/
6670
public void put(MessageEntity messageEntity) throws InterruptedException {
6771
Preconditions.checkNotNull(messageEntity);
@@ -80,8 +84,8 @@ public void put(MessageEntity messageEntity) throws InterruptedException {
8084
/**
8185
* Get the first message at this queue, waiting for the message is available if the queue is empty, this method will not remove the message
8286
*
83-
* @return MessageEntity
84-
* @throws InterruptedException
87+
* @return The MessageEntity object at the head of the queue.
88+
* @throws InterruptedException if the current thread is interrupted while waiting for an element to become available in the queue.
8589
*/
8690
public MessageEntity take() throws InterruptedException {
8791
ReentrantLock reentrantLock = this.lock;
@@ -145,21 +149,26 @@ public MessageEntity getTail() {
145149
/**
146150
* Get the message by offset, since the offset is increment, so we can get the first message in this queue and calculate the index of this offset
147151
*
148-
* @param offset
149-
* @return MessageEntity
152+
* @param offset The offset of the MessageEntity object to be retrieved.
153+
* @return The MessageEntity object with the specified offset, or null if no such object exists in the queue.
154+
* @throws RuntimeException if the specified offset is less than the offset of the head MessageEntity object.
150155
*/
151156
public MessageEntity getByOffset(long offset) {
152157
ReentrantLock reentrantLock = this.lock;
153158
reentrantLock.lock();
154159
try {
155-
MessageEntity head = getHead();
156-
if (head == null) {
160+
if (count == 0) {
157161
return null;
158162
}
163+
int tailIndex = putIndex - 1;
164+
MessageEntity head = itemAt(takeIndex);
159165
if (head.getOffset() > offset) {
160166
throw new RuntimeException(String.format("The message has been deleted, offset: %s", offset));
161167
}
162-
MessageEntity tail = getTail();
168+
if (tailIndex < 0) {
169+
tailIndex += items.length;
170+
}
171+
MessageEntity tail = itemAt(tailIndex);
163172
if (tail == null || tail.getOffset() < offset) {
164173
return null;
165174
}
@@ -174,6 +183,9 @@ public MessageEntity getByOffset(long offset) {
174183
}
175184
}
176185

186+
/**
187+
* Removes the MessageEntity object at the head of the queue.
188+
*/
177189
public void removeHead() {
178190
ReentrantLock reentrantLock = this.lock;
179191
reentrantLock.lock();
@@ -195,11 +207,21 @@ public int getSize() {
195207
return count;
196208
}
197209

198-
210+
/**
211+
* Returns the MessageEntity object at the specified index.
212+
*
213+
* @param index The index of the MessageEntity object to be returned.
214+
* @return The MessageEntity object at the specified index.
215+
*/
199216
private MessageEntity itemAt(int index) {
200217
return items[index];
201218
}
202219

220+
/**
221+
* Insert the message at the tail of this queue, waiting for space to become available if the queue is full
222+
*
223+
* @param messageEntity The MessageEntity object to be inserted into the queue.
224+
*/
203225
private void enqueue(MessageEntity messageEntity) {
204226
items[putIndex++] = messageEntity;
205227
if (putIndex == items.length) {
@@ -209,6 +231,11 @@ private void enqueue(MessageEntity messageEntity) {
209231
notEmpty.signalAll();
210232
}
211233

234+
/**
235+
* Removes and returns the MessageEntity object at the head of the queue.
236+
*
237+
* @return The MessageEntity object at the head of the queue.
238+
*/
212239
private MessageEntity dequeue() {
213240
MessageEntity item = items[takeIndex++];
214241
if (takeIndex == items.length) {
@@ -225,8 +252,4 @@ public int getTakeIndex() {
225252
public int getPutIndex() {
226253
return putIndex;
227254
}
228-
229-
public MessageEntity[] getItems() {
230-
return items;
231-
}
232255
}

0 commit comments

Comments
 (0)