Skip to content

Commit 4c09f02

Browse files
committed
make sure all pinned code is commented for easy access
1 parent d69e7cd commit 4c09f02

17 files changed

Lines changed: 109 additions & 82 deletions

src/main/java/io/nats/client/BaseConsumerContext.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,14 @@ public interface BaseConsumerContext {
183183
@NonNull
184184
MessageConsumer consume(@NonNull ConsumeOptions consumeOptions, @Nullable Dispatcher dispatcher, @NonNull MessageHandler handler) throws IOException, JetStreamApiException;
185185

186-
/**
187-
* Unpins this consumer
188-
* @throws IOException covers various communication issues with the NATS
189-
* server such as timeout or interruption
190-
* @throws JetStreamApiException the request had an error related to the data
191-
* @return true if the delete succeeded
192-
*/
193-
boolean unpin(String group) throws IOException, JetStreamApiException;
186+
// TODO - PINNED CONSUMER SUPPORT
187+
// /**
188+
// * Unpins this consumer
189+
// * @param group the group name of the consumer's group
190+
// * @throws IOException covers various communication issues with the NATS
191+
// * server such as timeout or interruption
192+
// * @throws JetStreamApiException the request had an error related to the data
193+
// * @return true if the delete succeeded
194+
// */
195+
// boolean unpin(String group) throws IOException, JetStreamApiException;
194196
}

src/main/java/io/nats/client/JetStreamManagement.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -357,17 +357,18 @@ public interface JetStreamManagement {
357357
*/
358358
boolean deleteMessage(String streamName, long seq, boolean erase) throws IOException, JetStreamApiException;
359359

360-
/**
361-
* Unpins a consumer
362-
* @param streamName name of the stream
363-
* @param consumerName name of consumer
364-
* @param consumerGroup name of the consumer's group
365-
* @throws IOException covers various communication issues with the NATS
366-
* server such as timeout or interruption
367-
* @throws JetStreamApiException the request had an error related to the data
368-
* @return true if the delete succeeded
369-
*/
370-
boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException;
360+
// TODO - PINNED CONSUMER SUPPORT
361+
// /**
362+
// * Unpins a consumer
363+
// * @param streamName name of the stream
364+
// * @param consumerName name of consumer
365+
// * @param consumerGroup name of the consumer's group
366+
// * @throws IOException covers various communication issues with the NATS
367+
// * server such as timeout or interruption
368+
// * @throws JetStreamApiException the request had an error related to the data
369+
// * @return true if the delete succeeded
370+
// */
371+
// boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException;
371372

372373
/**
373374
* Gets a context for publishing and subscribing to subjects backed by Jetstream streams

src/main/java/io/nats/client/api/PriorityGroupState.java

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,30 @@
1616
import io.nats.client.support.JsonValue;
1717
import io.nats.client.support.JsonValueUtils;
1818
import org.jspecify.annotations.NonNull;
19-
import org.jspecify.annotations.Nullable;
2019

21-
import java.time.ZonedDateTime;
2220
import java.util.List;
2321

24-
import static io.nats.client.support.ApiConstants.*;
25-
import static io.nats.client.support.JsonValueUtils.readDate;
22+
import static io.nats.client.support.ApiConstants.GROUP;
2623

2724
/**
2825
* Status of a specific consumer priority group
2926
*/
3027
public class PriorityGroupState {
3128
private final String group;
32-
private final String pinnedClientId;
33-
private final ZonedDateTime pinnedTime;
29+
30+
// TODO - PINNED CONSUMER SUPPORT
31+
// private final String pinnedClientId;
32+
// private final ZonedDateTime pinnedTime;
33+
3434

3535
static List<PriorityGroupState> optionalListOf(JsonValue vpgStates) {
3636
return JsonValueUtils.optionalListOf(vpgStates, PriorityGroupState::new);
3737
}
3838

3939
PriorityGroupState(JsonValue vpgState) {
4040
group = JsonValueUtils.readString(vpgState, GROUP);
41-
pinnedClientId = JsonValueUtils.readString(vpgState, PINNED_CLIENT_ID);
42-
pinnedTime = readDate(vpgState, PINNED_TS);
41+
// pinnedClientId = JsonValueUtils.readString(vpgState, PINNED_CLIENT_ID);
42+
// pinnedTime = readDate(vpgState, PINNED_TS);
4343
}
4444

4545
/**
@@ -51,29 +51,30 @@ public String getGroup() {
5151
return group;
5252
}
5353

54-
/**
55-
* The generated ID of the pinned client
56-
* @return the id
57-
*/
58-
@Nullable
59-
public String getPinnedClientId() {
60-
return pinnedClientId;
61-
}
54+
// /**
55+
// * The generated ID of the pinned client
56+
// * @return the id
57+
// */
58+
// @Nullable
59+
// public String getPinnedClientId() {
60+
// return pinnedClientId;
61+
// }
6262

63-
/**
64-
* The timestamp when the client was pinned
65-
* @return the timestamp
66-
*/
67-
public ZonedDateTime getPinnedTime() {
68-
return pinnedTime;
69-
}
63+
// /**
64+
// * The timestamp when the client was pinned
65+
// * @return the timestamp
66+
// */
67+
// @Nullable
68+
// public ZonedDateTime getPinnedTime() {
69+
// return pinnedTime;
70+
// }
7071

7172
@Override
7273
public String toString() {
7374
return "PriorityGroupState{" +
7475
"group='" + group + '\'' +
75-
", pinnedClientId='" + pinnedClientId + '\'' +
76-
", pinnedTime=" + pinnedTime +
76+
// ", pinnedClientId='" + pinnedClientId + '\'' +
77+
// ", pinnedTime=" + pinnedTime +
7778
'}';
7879
}
7980
}

src/main/java/io/nats/client/api/PriorityPolicy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public enum PriorityPolicy {
2626
Overflow("overflow"),
2727
Prioritized("prioritized");
2828

29+
// TODO - PINNED CONSUMER SUPPORT
2930
// PinnedClient("pinned_client")
3031

3132
private final String policy;

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,15 @@ protected void trackJsMessage(Message msg) {
9797
NatsJetStreamMetaData meta = msg.metaData();
9898
lastStreamSeq = meta.streamSequence();
9999
lastConsumerSeq++;
100+
// TODO - PINNED CONSUMER SUPPORT
100101
// subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock
101102
}
102103
finally {
103104
stateChangeLock.unlock();
104105
}
105106
}
106107

108+
// TODO - PINNED CONSUMER SUPPORT
107109
// protected void subTrackJsMessage(Message msg) {}
108110

109111
protected void handleHeartbeatError() {

src/main/java/io/nats/client/impl/NatsConsumerContext.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ private void checkState() throws IOException {
144144
}
145145
}
146146

147+
// TODO - PINNED CONSUMER SUPPORT
147148
// private void checkNotPinned(String label) throws IOException {
148149
// ConsumerInfo ci = cachedConsumerInfo.get();
149150
// if (ci != null) {
@@ -221,6 +222,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
221222
try {
222223
stateLock.lock();
223224
checkState();
225+
// TODO - PINNED CONSUMER SUPPORT
224226
// checkNotPinned("Next");
225227

226228
try {
@@ -290,6 +292,7 @@ public FetchConsumer fetch(@NonNull FetchConsumeOptions fetchConsumeOptions) thr
290292
try {
291293
stateLock.lock();
292294
checkState();
295+
// TODO - PINNED CONSUMER SUPPORT
293296
// checkNotPinned("Fetch");
294297
return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions));
295298
}
@@ -375,16 +378,17 @@ public MessageConsumer consume(@NonNull ConsumeOptions consumeOptions,
375378
}
376379
}
377380

378-
@Override
379-
public boolean unpin(String group) throws IOException, JetStreamApiException {
380-
String name = consumerName.get();
381-
if (name == null) {
382-
ConsumerInfo ci = cachedConsumerInfo.get();
383-
if (ci == null) {
384-
ci = getConsumerInfo();
385-
}
386-
name = ci.getName();
387-
}
388-
return streamCtx.jsm.unpinConsumer(streamCtx.streamName, name, group);
389-
}
381+
// TODO - PINNED CONSUMER SUPPORT
382+
// @Override
383+
// public boolean unpin(String group) throws IOException, JetStreamApiException {
384+
// String name = consumerName.get();
385+
// if (name == null) {
386+
// ConsumerInfo ci = cachedConsumerInfo.get();
387+
// if (ci == null) {
388+
// ci = getConsumerInfo();
389+
// }
390+
// name = ci.getName();
391+
// }
392+
// return streamCtx.jsm.unpinConsumer(streamCtx.streamName, name, group);
393+
// }
390394
}

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
4848
inactiveThreshold = expiresInMillis * 110 / 100; // 10% longer than the wait
4949
}
5050

51+
// TODO - PINNED CONSUMER SUPPORT
5152
// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
5253
PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
5354
.maxBytes(fetchConsumeOptions.getMaxBytes())

src/main/java/io/nats/client/impl/NatsJetStreamManagement.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -362,19 +362,20 @@ public boolean deleteMessage(String streamName, long seq, boolean erase) throws
362362
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
363363
}
364364

365-
/**
366-
* {@inheritDoc}
367-
*/
368-
@Override
369-
public boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException {
370-
validateNotNull(streamName, "Stream Name");
371-
validateNotNull(consumerName, "Consumer Name");
372-
validateNotNull(consumerGroup, "Consumer Group");
373-
String subj = String.format(JSAPI_CONSUMER_UNPIN, streamName, consumerName);
374-
byte[] payload = String.format("{\"group\": \"%s\"}", consumerGroup).getBytes();
375-
Message resp = makeRequestResponseRequired(subj, payload, getTimeout());
376-
return new SuccessApiResponse(resp, true).throwOnHasError().getSuccess();
377-
}
365+
// TODO - PINNED CONSUMER SUPPORT
366+
// /**
367+
// * {@inheritDoc}
368+
// */
369+
// @Override
370+
// public boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException {
371+
// validateNotNull(streamName, "Stream Name");
372+
// validateNotNull(consumerName, "Consumer Name");
373+
// validateNotNull(consumerGroup, "Consumer Group");
374+
// String subj = String.format(JSAPI_CONSUMER_UNPIN, streamName, consumerName);
375+
// byte[] payload = String.format("{\"group\": \"%s\"}", consumerGroup).getBytes();
376+
// Message resp = makeRequestResponseRequired(subj, payload, getTimeout());
377+
// return new SuccessApiResponse(resp, true).throwOnHasError().getSuccess();
378+
// }
378379

379380
/**
380381
* {@inheritDoc}

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.p
104104
private void repull() {
105105
int rePullMessages = Math.max(1, consumeOpts.getBatchSize() - pmm.pendingMessages);
106106
long rePullBytes = consumeOpts.getBatchBytes() == 0 ? 0 : consumeOpts.getBatchBytes() - pmm.pendingBytes;
107+
// TODO - PINNED CONSUMER SUPPORT
107108
// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
108109
PullRequestOptions pro = PullRequestOptions.builder(rePullMessages)
109110
.maxBytes(rePullBytes)

src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ protected void shutdownSub() {
139139
}
140140
}
141141

142+
// TODO - PINNED CONSUMER SUPPORT
142143
// static class PinnablePullRequestOptions extends PullRequestOptions {
143144
// final String pinId;
144145
//

0 commit comments

Comments
 (0)