|
21 | 21 | import java.util.ArrayList; |
22 | 22 | import java.util.List; |
23 | 23 | import java.util.concurrent.CompletableFuture; |
24 | | -import java.util.concurrent.TimeUnit; |
25 | | -import java.util.concurrent.TimeoutException; |
26 | 24 | import lombok.extern.slf4j.Slf4j; |
27 | 25 | import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; |
28 | 26 | import org.apache.bookkeeper.mledger.impl.PositionImpl; |
29 | 27 | import org.apache.commons.collections4.map.LinkedMap; |
30 | | -import org.apache.pulsar.broker.service.BrokerServiceException; |
31 | 28 | import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; |
32 | 29 | import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
33 | | -import org.apache.pulsar.broker.systopic.SystemTopicClient; |
34 | 30 | import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; |
35 | 31 | import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata; |
36 | 32 | import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot; |
37 | | -import org.apache.pulsar.client.api.Message; |
38 | 33 | import org.apache.pulsar.client.api.transaction.TxnID; |
39 | | -import org.apache.pulsar.client.impl.PulsarClientImpl; |
40 | 34 | import org.apache.pulsar.common.naming.TopicName; |
41 | 35 | import org.apache.pulsar.common.policies.data.TransactionBufferStats; |
42 | | -import org.apache.pulsar.common.util.FutureUtil; |
43 | 36 |
|
44 | 37 | @Slf4j |
45 | 38 | public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor { |
@@ -90,48 +83,27 @@ public boolean checkAbortedTransaction(TxnID txnID) { |
90 | 83 | return aborts.containsKey(txnID); |
91 | 84 | } |
92 | 85 |
|
93 | | - private long getSystemClientOperationTimeoutMs() throws Exception { |
94 | | - PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient(); |
95 | | - return pulsarClient.getConfiguration().getOperationTimeoutMs(); |
96 | | - } |
97 | | - |
98 | 86 | @Override |
99 | 87 | public CompletableFuture<PositionImpl> recoverFromSnapshot() { |
100 | | - return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory() |
101 | | - .getTxnBufferSnapshotService() |
102 | | - .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> { |
103 | | - try { |
104 | | - PositionImpl startReadCursorPosition = null; |
105 | | - while (reader.hasMoreEvents()) { |
106 | | - Message<TransactionBufferSnapshot> message = reader.readNextAsync() |
107 | | - .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS); |
108 | | - if (topic.getName().equals(message.getKey())) { |
109 | | - TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); |
110 | | - if (transactionBufferSnapshot != null) { |
111 | | - handleSnapshot(transactionBufferSnapshot); |
112 | | - startReadCursorPosition = PositionImpl.get( |
113 | | - transactionBufferSnapshot.getMaxReadPositionLedgerId(), |
114 | | - transactionBufferSnapshot.getMaxReadPositionEntryId()); |
115 | | - } |
116 | | - } |
117 | | - } |
118 | | - return CompletableFuture.completedFuture(startReadCursorPosition); |
119 | | - } catch (TimeoutException ex) { |
120 | | - Throwable t = FutureUtil.unwrapCompletionException(ex); |
121 | | - String errorMessage = String.format("[%s] Transaction buffer recover fail by read " |
122 | | - + "transactionBufferSnapshot timeout!", topic.getName()); |
123 | | - log.error(errorMessage, t); |
124 | | - return FutureUtil.failedFuture( |
125 | | - new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t)); |
126 | | - } catch (Exception ex) { |
127 | | - log.error("[{}] Transaction buffer recover fail when read " |
128 | | - + "transactionBufferSnapshot!", topic.getName(), ex); |
129 | | - return FutureUtil.failedFuture(ex); |
130 | | - } finally { |
131 | | - closeReader(reader); |
132 | | - } |
133 | | - }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider() |
134 | | - .getExecutor(this)); |
| 88 | + final var future = new CompletableFuture<PositionImpl>(); |
| 89 | + final var pulsar = topic.getBrokerService().getPulsar(); |
| 90 | + pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> { |
| 91 | + try { |
| 92 | + final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService() |
| 93 | + .getTableView().readLatest(topic.getName()); |
| 94 | + if (snapshot != null) { |
| 95 | + handleSnapshot(snapshot); |
| 96 | + final var startReadCursorPosition = new PositionImpl(snapshot.getMaxReadPositionLedgerId(), |
| 97 | + snapshot.getMaxReadPositionEntryId()); |
| 98 | + future.complete(startReadCursorPosition); |
| 99 | + } else { |
| 100 | + future.complete(null); |
| 101 | + } |
| 102 | + } catch (Throwable e) { |
| 103 | + future.completeExceptionally(e); |
| 104 | + } |
| 105 | + }); |
| 106 | + return future; |
135 | 107 | } |
136 | 108 |
|
137 | 109 | @Override |
@@ -190,13 +162,6 @@ public synchronized CompletableFuture<Void> closeAsync() { |
190 | 162 | return CompletableFuture.completedFuture(null); |
191 | 163 | } |
192 | 164 |
|
193 | | - private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) { |
194 | | - reader.closeAsync().exceptionally(e -> { |
195 | | - log.error("[{}]Transaction buffer reader close error!", topic.getName(), e); |
196 | | - return null; |
197 | | - }); |
198 | | - } |
199 | | - |
200 | 165 | private void handleSnapshot(TransactionBufferSnapshot snapshot) { |
201 | 166 | if (snapshot.getAborts() != null) { |
202 | 167 | snapshot.getAborts().forEach(abortTxnMetadata -> |
|
0 commit comments