Skip to content

Commit 140bebd

Browse files
committed
[ISSUE #4133]Optimize OpenFunctionSourceConnector poll
1 parent 991cbda commit 140bebd

File tree

2 files changed

+25
-8
lines changed
  • eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector
  • eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect

2 files changed

+25
-8
lines changed

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,15 @@
2626
import java.util.List;
2727
import java.util.concurrent.BlockingQueue;
2828
import java.util.concurrent.LinkedBlockingQueue;
29+
import java.util.concurrent.TimeUnit;
2930

3031
import lombok.extern.slf4j.Slf4j;
3132

3233
@Slf4j
3334
public class OpenFunctionSourceConnector implements Source {
3435

36+
private static final int DEFAULT_BATCH_SIZE = 10;
37+
3538
private OpenFunctionSourceConfig sourceConfig;
3639

3740
private BlockingQueue<ConnectRecord> queue;
@@ -74,10 +77,20 @@ public BlockingQueue<ConnectRecord> queue() {
7477

7578
@Override
7679
public List<ConnectRecord> poll() {
77-
List<ConnectRecord> connectRecords = new ArrayList<>();
78-
ConnectRecord connectRecord = queue.poll();
79-
if (connectRecord != null) {
80-
connectRecords.add(connectRecord);
80+
81+
List<ConnectRecord> connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE);
82+
83+
for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
84+
try {
85+
ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS);
86+
if (connectRecord == null) {
87+
break;
88+
}
89+
connectRecords.add(connectRecord);
90+
} catch (InterruptedException e) {
91+
// nothing to do
92+
break;
93+
}
8194
}
8295
return connectRecords;
8396
}

eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
3030
import org.apache.eventmesh.openconnect.api.source.Source;
3131

32+
import org.apache.commons.collections4.CollectionUtils;
33+
3234
import java.net.URI;
3335
import java.nio.charset.StandardCharsets;
3436
import java.util.List;
@@ -120,7 +122,7 @@ public void startPoll() {
120122
try {
121123
connectRecord = queue.poll(5, TimeUnit.SECONDS);
122124
} catch (InterruptedException e) {
123-
e.printStackTrace();
125+
Thread.currentThread().interrupt();
124126
log.error("poll connect record error", e);
125127
}
126128
if (connectRecord == null) {
@@ -136,8 +138,11 @@ private void startConnector() throws Exception {
136138
source.start();
137139
while (isRunning) {
138140
List<ConnectRecord> connectorRecordList = source.poll();
139-
for (ConnectRecord connectRecord : connectorRecordList) {
140-
queue.put(connectRecord);
141+
if (CollectionUtils.isEmpty(connectorRecordList)) {
142+
continue;
143+
}
144+
for (ConnectRecord record : connectorRecordList) {
145+
queue.put(record);
141146
}
142147
}
143148
}
@@ -175,7 +180,6 @@ public void stop() {
175180
try {
176181
eventMeshTCPClient.close();
177182
} catch (Exception e) {
178-
e.printStackTrace();
179183
log.error("event mesh client close error", e);
180184
}
181185
log.info("source worker stopped");

0 commit comments

Comments
 (0)