Skip to content

Commit 800b628

Browse files
authored
[ISSUE #4264]Some optimizations for ProducerService (#4334)
1 parent 19dd2b1 commit 800b628

File tree

2 files changed

+7
-7
lines changed

2 files changed

+7
-7
lines changed

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/service/ConsumerService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@
4242
@Slf4j
4343
public class ConsumerService extends ConsumerServiceGrpc.ConsumerServiceImplBase {
4444

45-
public OpenFunctionSinkConnector openFunctionSinkConnector;
45+
private final OpenFunctionSinkConnector openFunctionSinkConnector;
4646

47-
public BlockingQueue<ConnectRecord> queue;
47+
private final BlockingQueue<ConnectRecord> queue;
4848

49-
public OpenFunctionServerConfig config;
49+
private final OpenFunctionServerConfig config;
5050

5151
private final transient ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 10115).usePlaintext().build();
5252

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/service/ProducerService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@
4444
@Slf4j
4545
public class ProducerService extends PublisherServiceGrpc.PublisherServiceImplBase {
4646

47-
public OpenFunctionSourceConnector openFunctionSourceConnector;
47+
private final OpenFunctionSourceConnector openFunctionSourceConnector;
4848

49-
public BlockingQueue<ConnectRecord> queue;
49+
private final BlockingQueue<ConnectRecord> queue;
5050

51-
public OpenFunctionServerConfig config;
51+
private final OpenFunctionServerConfig config;
5252

5353
public ProducerService(OpenFunctionSourceConnector openFunctionSourceConnector, OpenFunctionServerConfig serverConfig) {
5454
this.openFunctionSourceConnector = openFunctionSourceConnector;
@@ -85,7 +85,7 @@ public void publish(CloudEvent event, StreamObserver<CloudEvent> responseObserve
8585
CloudEventAttributeValue.newBuilder().setCeString(StatusCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()).build())
8686
.putAttributes(ProtocolKey.GRPC_RESPONSE_TIME, CloudEventAttributeValue.newBuilder()
8787
.setCeTimestamp(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).setNanos(instant.getNano()).build()).build());
88-
throw new RuntimeException(e);
88+
Thread.currentThread().interrupt();
8989
}
9090

9191
responseObserver.onNext(builder.build());

0 commit comments

Comments
 (0)