File tree Expand file tree Collapse file tree 2 files changed +25
-8
lines changed
eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect Expand file tree Collapse file tree 2 files changed +25
-8
lines changed Original file line number Diff line number Diff line change 2626import java .util .List ;
2727import java .util .concurrent .BlockingQueue ;
2828import java .util .concurrent .LinkedBlockingQueue ;
29+ import java .util .concurrent .TimeUnit ;
2930
3031import lombok .extern .slf4j .Slf4j ;
3132
3233@ Slf4j
3334public class OpenFunctionSourceConnector implements Source {
3435
36+ private static final int DEFAULT_BATCH_SIZE = 10 ;
37+
3538 private OpenFunctionSourceConfig sourceConfig ;
3639
3740 private BlockingQueue <ConnectRecord > queue ;
@@ -74,10 +77,20 @@ public BlockingQueue<ConnectRecord> queue() {
7477
7578 @ Override
7679 public List <ConnectRecord > poll () {
77- List <ConnectRecord > connectRecords = new ArrayList <>();
78- ConnectRecord connectRecord = queue .poll ();
79- if (connectRecord != null ) {
80- connectRecords .add (connectRecord );
80+
81+ List <ConnectRecord > connectRecords = new ArrayList <>(DEFAULT_BATCH_SIZE );
82+
83+ for (int count = 0 ; count < DEFAULT_BATCH_SIZE ; ++count ) {
84+ try {
85+ ConnectRecord connectRecord = queue .poll (3 , TimeUnit .SECONDS );
86+ if (connectRecord == null ) {
87+ break ;
88+ }
89+ connectRecords .add (connectRecord );
90+ } catch (InterruptedException e ) {
91+ // nothing to do
92+ break ;
93+ }
8194 }
8295 return connectRecords ;
8396 }
Original file line number Diff line number Diff line change 2929import org .apache .eventmesh .openconnect .api .data .ConnectRecord ;
3030import org .apache .eventmesh .openconnect .api .source .Source ;
3131
32+ import org .apache .commons .collections4 .CollectionUtils ;
33+
3234import java .net .URI ;
3335import java .nio .charset .StandardCharsets ;
3436import java .util .List ;
@@ -120,7 +122,7 @@ public void startPoll() {
120122 try {
121123 connectRecord = queue .poll (5 , TimeUnit .SECONDS );
122124 } catch (InterruptedException e ) {
123- e . printStackTrace ();
125+ Thread . currentThread (). interrupt ();
124126 log .error ("poll connect record error" , e );
125127 }
126128 if (connectRecord == null ) {
@@ -136,8 +138,11 @@ private void startConnector() throws Exception {
136138 source .start ();
137139 while (isRunning ) {
138140 List <ConnectRecord > connectorRecordList = source .poll ();
139- for (ConnectRecord connectRecord : connectorRecordList ) {
140- queue .put (connectRecord );
141+ if (CollectionUtils .isEmpty (connectorRecordList )) {
142+ continue ;
143+ }
144+ for (ConnectRecord record : connectorRecordList ) {
145+ queue .put (record );
141146 }
142147 }
143148 }
@@ -175,7 +180,6 @@ public void stop() {
175180 try {
176181 eventMeshTCPClient .close ();
177182 } catch (Exception e ) {
178- e .printStackTrace ();
179183 log .error ("event mesh client close error" , e );
180184 }
181185 log .info ("source worker stopped" );
You can’t perform that action at this time.
0 commit comments