Skip to content

Commit 190df62

Browse files
authored
Merge b4b0a6a into a091017
2 parents a091017 + b4b0a6a commit 190df62

File tree

103 files changed

+592
-1129
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

103 files changed

+592
-1129
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileManager.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.eventmesh.common.file;
1919

20+
import org.apache.eventmesh.common.utils.LogUtils;
21+
2022
import java.util.HashMap;
2123
import java.util.Map;
2224
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,14 +62,10 @@ private static void shutdown() {
6062
return;
6163
}
6264

63-
if (log.isInfoEnabled()) {
64-
log.info("[WatchFileManager] start close");
65-
}
65+
LogUtils.info(log, "[WatchFileManager] start close");
6666

6767
for (Map.Entry<String, WatchFileTask> entry : WATCH_FILE_TASK_MAP.entrySet()) {
68-
if (log.isInfoEnabled()) {
69-
log.info("[WatchFileManager] start to shutdown : {}", entry.getKey());
70-
}
68+
LogUtils.info(log, "[WatchFileManager] start to shutdown : {}", entry.getKey());
7169

7270
try {
7371
entry.getValue().shutdown();

eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileTask.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.eventmesh.common.file;
1919

20+
import org.apache.eventmesh.common.utils.LogUtils;
21+
2022
import java.nio.file.FileSystem;
2123
import java.nio.file.FileSystems;
2224
import java.nio.file.Path;
@@ -87,19 +89,15 @@ public void run() {
8789
for (WatchEvent<?> event : events) {
8890
WatchEvent.Kind<?> kind = event.kind();
8991
if (kind.equals(StandardWatchEventKinds.OVERFLOW)) {
90-
if (log.isWarnEnabled()) {
91-
log.warn("[WatchFileTask] file overflow: {}", event.context());
92-
}
92+
LogUtils.warn(log, "[WatchFileTask] file overflow: {}", event.context());
9393
continue;
9494
}
9595
precessWatchEvent(event);
9696
}
9797
} catch (InterruptedException ex) {
9898
boolean interrupted = Thread.interrupted();
9999
if (interrupted) {
100-
if (log.isDebugEnabled()) {
101-
log.debug("[WatchFileTask] file watch is interrupted");
102-
}
100+
LogUtils.debug(log, "[WatchFileTask] file watch is interrupted");
103101
}
104102
} catch (Exception ex) {
105103
log.error("[WatchFileTask] an exception occurred during file listening : ", ex);

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.eventmesh.common.protocol.tcp.Subscription;
2626
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
2727
import org.apache.eventmesh.common.utils.JsonUtils;
28+
import org.apache.eventmesh.common.utils.LogUtils;
2829

2930
import org.apache.commons.lang3.ArrayUtils;
3031
import org.apache.commons.lang3.StringUtils;
@@ -145,9 +146,7 @@ private Header parseHeader(ByteBuf in, int headerLength) throws JsonProcessingEx
145146
}
146147
final byte[] headerData = new byte[headerLength];
147148
in.readBytes(headerData);
148-
if (log.isDebugEnabled()) {
149-
log.debug("Decode headerJson={}", deserializeBytes(headerData));
150-
}
149+
LogUtils.debug(log, "Decode headerJson={}", deserializeBytes(headerData));
151150
return JsonUtils.parseObject(headerData, Header.class);
152151
}
153152

@@ -157,9 +156,7 @@ private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonP
157156
}
158157
final byte[] bodyData = new byte[bodyLength];
159158
in.readBytes(bodyData);
160-
if (log.isDebugEnabled()) {
161-
log.debug("Decode bodyJson={}", deserializeBytes(bodyData));
162-
}
159+
LogUtils.debug(log, "Decode bodyJson={}", deserializeBytes(bodyData));
163160
return deserializeBody(deserializeBytes(bodyData), header);
164161
}
165162

@@ -211,9 +208,7 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro
211208
case REDIRECT_TO_CLIENT:
212209
return JsonUtils.parseObject(bodyJsonString, RedirectInfo.class);
213210
default:
214-
if (log.isWarnEnabled()) {
215-
log.warn("Invalidate TCP command: {}", command);
216-
}
211+
LogUtils.warn(log, "Invalidate TCP command: {}", command);
217212
return null;
218213
}
219214
}

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/connection/JdbcConnection.java

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.eventmesh.connector.jdbc.connection;
1919

20+
import org.apache.eventmesh.common.utils.LogUtils;
2021
import org.apache.eventmesh.connector.jdbc.JdbcDriverMetaData;
2122
import org.apache.eventmesh.connector.jdbc.config.JdbcConfig;
2223

@@ -34,11 +35,11 @@
3435

3536
import lombok.extern.slf4j.Slf4j;
3637

37-
@Slf4j
3838
/**
3939
* JdbcConnection class representing a JDBC connection.
4040
* Implements the AutoCloseable interface.
4141
*/
42+
@Slf4j
4243
public class JdbcConnection implements AutoCloseable {
4344

4445
private static final int CONNECTION_VALID_CHECK_TIMEOUT_IN_SEC = 3;
@@ -181,9 +182,7 @@ public JdbcConnection execute(String... sqlStatements) throws SQLException {
181182
return execute(statement -> {
182183
for (String sqlStatement : sqlStatements) {
183184
if (sqlStatement != null) {
184-
if (log.isDebugEnabled()) {
185-
log.debug("Executing '{}'", sqlStatement);
186-
}
185+
LogUtils.debug(log, "Executing '{}'", sqlStatement);
187186
statement.execute(sqlStatement);
188187
}
189188
}
@@ -221,9 +220,7 @@ public JdbcConnection executeWithoutCommitting(String... sqlStatements) throws S
221220

222221
try (Statement statement = conn.createStatement()) {
223222
for (String sqlStatement : sqlStatements) {
224-
if (log.isDebugEnabled()) {
225-
log.debug("Executing sql statement: {}", sqlStatement);
226-
}
223+
LogUtils.debug(log, "Executing sql statement: {}", sqlStatement);
227224
statement.execute(sqlStatement);
228225
}
229226
}
@@ -296,9 +293,7 @@ public JdbcConnection query(String sql, JdbcResultSetConsumer resultConsumer) th
296293
public JdbcConnection query(String sql, StatementFactory statementFactory, JdbcResultSetConsumer resultConsumer) throws SQLException {
297294
Connection conn = connection();
298295
try (Statement statement = statementFactory.createStatement(conn)) {
299-
if (log.isDebugEnabled()) {
300-
log.debug("Query sql '{}'", sql);
301-
}
296+
LogUtils.debug(log, "Query sql '{}'", sql);
302297
try (ResultSet resultSet = statement.executeQuery(sql)) {
303298
if (resultConsumer != null) {
304299
resultConsumer.accept(resultSet);
@@ -336,9 +331,7 @@ public <T> T query(String sql, ResultSetMapper<T> resultSetMapper) throws SQLExc
336331
public <T> T query(String sql, StatementFactory statementFactory, ResultSetMapper<T> resultSetMapper) throws SQLException {
337332
Connection conn = connection();
338333
try (Statement statement = statementFactory.createStatement(conn)) {
339-
if (log.isDebugEnabled()) {
340-
log.debug("Query sql '{}'", sql);
341-
}
334+
LogUtils.debug(log, "Query sql '{}'", sql);
342335
try (ResultSet resultSet = statement.executeQuery(sql)) {
343336
if (resultSetMapper != null) {
344337
return resultSetMapper.map(resultSet);
@@ -381,9 +374,7 @@ public JdbcConnection preparedQuery(String sql, PreparedStatementFactory prepare
381374

382375
Connection conn = connection();
383376
try (PreparedStatement preparedStatement = preparedStatementFactory.createPreparedStatement(conn, sql)) {
384-
if (log.isDebugEnabled()) {
385-
log.debug("Query sql '{}'", sql);
386-
}
377+
LogUtils.debug(log, "Query sql '{}'", sql);
387378
if (preparedParameters != null) {
388379
for (int index = 0; index < preparedParameters.length; ++index) {
389380
final PreparedParameter preparedParameter = preparedParameters[index];
@@ -436,9 +427,7 @@ public <T> T preparedQuery(String sql, PreparedStatementFactory preparedStatemen
436427

437428
Connection conn = connection();
438429
try (PreparedStatement preparedStatement = preparedStatementFactory.createPreparedStatement(conn, sql)) {
439-
if (log.isDebugEnabled()) {
440-
log.debug("Query sql '{}'", sql);
441-
}
430+
LogUtils.debug(log, "Query sql '{}'", sql);
442431
if (preparedParameters != null) {
443432
for (int index = 0; index < preparedParameters.length; ++index) {
444433
final PreparedParameter preparedParameter = preparedParameters[index];
@@ -579,13 +568,9 @@ public static ConnectionFactory createPatternConnectionFactory(String urlWithPla
579568
} else {
580569
url = urlWithPlaceholder;
581570
}
582-
if (log.isDebugEnabled()) {
583-
log.debug("URL: {}", url);
584-
}
571+
LogUtils.debug(log, "URL: {}", url);
585572
Connection connection = DriverManager.getConnection(url, config.asProperties());
586-
if (log.isDebugEnabled()) {
587-
log.debug("User [{}] Connected to {}", config.getUser(), url);
588-
}
573+
LogUtils.debug(log, "User [{}] Connected to {}", config.getUser(), url);
589574
return connection;
590575
};
591576
}

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -424,9 +424,7 @@ protected void handleStopEvent(MysqlJdbcContext context, Event event) {
424424
* @param event the event to be handled
425425
*/
426426
protected void handleHeartbeatEvent(MysqlJdbcContext context, Event event) {
427-
if (log.isDebugEnabled()) {
428-
log.debug("Replication client handle {}", event.getHeader().getEventType());
429-
}
427+
LogUtils.debug(log, "Replication client handle {}", event.getHeader().getEventType());
430428
}
431429

432430
/**

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/mysql/MysqlDatabaseDialect.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.eventmesh.connector.jdbc.source.dialect.mysql;
1919

20+
import org.apache.eventmesh.common.utils.LogUtils;
2021
import org.apache.eventmesh.connector.jdbc.DataTypeConvertor;
2122
import org.apache.eventmesh.connector.jdbc.JdbcDriverMetaData;
2223
import org.apache.eventmesh.connector.jdbc.connection.mysql.MysqlJdbcConnection;
@@ -137,9 +138,7 @@ public List<TableId> listTables(String databaseName) throws CatalogException, Da
137138

138139
// Build the SQL query to return a list of tables for the given database
139140
String sql = MysqlDialectSql.SHOW_DATABASE_TABLE.ofWrapperSQL("`" + databaseName + "`");
140-
if (log.isDebugEnabled()) {
141-
log.debug("List tables SQL:{}", sql);
142-
}
141+
LogUtils.debug(log, "List tables SQL:{}", sql);
143142
this.connection.query(sql, resultSet -> {
144143
// Execute the query and add each table ID to the list
145144
while (resultSet.next()) {
@@ -174,9 +173,7 @@ public CatalogTable getTable(TableId tableId) throws CatalogException, TableNotE
174173

175174
// Get table creation SQL
176175
final String createTableSql = MysqlDialectSql.SHOW_CREATE_TABLE.ofWrapperSQL(tableId.getId());
177-
if (log.isDebugEnabled()) {
178-
log.debug("Show create table SQL:{}", createTableSql);
179-
}
176+
LogUtils.debug(log, "Show create table SQL:{}", createTableSql);
180177

181178
this.connection.query(createTableSql, resultSet -> {
182179
boolean hasNext = resultSet.next();
@@ -189,9 +186,7 @@ public CatalogTable getTable(TableId tableId) throws CatalogException, TableNotE
189186

190187
// Get table columns SQL
191188
final String selectTableSql = MysqlDialectSql.SELECT_TABLE_COLUMNS.ofWrapperSQL(tableId.getId());
192-
if (log.isDebugEnabled()) {
193-
log.debug("Select table SQL:{}", selectTableSql);
194-
}
189+
LogUtils.debug(log, "Select table SQL:{}", selectTableSql);
195190
Map<String, DefaultColumn> columns = new HashMap<>(16);
196191
// Execute query to get table columns
197192
this.connection.query(selectTableSql, resultSet -> {
@@ -215,9 +210,7 @@ public CatalogTable getTable(TableId tableId) throws CatalogException, TableNotE
215210

216211
// Get table columns details SQL
217212
final String showTableSql = MysqlDialectSql.SHOW_TABLE_COLUMNS.ofWrapperSQL(tableId.getTableName(), tableId.getCatalogName());
218-
if (log.isDebugEnabled()) {
219-
log.debug("Show table columns SQL:{}", showTableSql);
220-
}
213+
LogUtils.debug(log, "Show table columns SQL:{}", showTableSql);
221214
// Execute query to get table columns details
222215
List<DefaultColumn> columnList = new ArrayList<>(columns.size());
223216
this.connection.query(showTableSql, resultSet -> {

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.eventmesh.common.ExampleConstants;
2525
import org.apache.eventmesh.common.protocol.workflow.protos.ExecuteRequest;
2626
import org.apache.eventmesh.common.protocol.workflow.protos.ExecuteResponse;
27+
import org.apache.eventmesh.common.utils.LogUtils;
2728
import org.apache.eventmesh.common.utils.ThreadUtils;
2829
import org.apache.eventmesh.grpc.GrpcAbstractDemo;
2930
import org.apache.eventmesh.selector.NacosSelector;
@@ -65,9 +66,7 @@ public static void main(String[] args) throws Exception {
6566
EventMeshWorkflowClient eventMeshWorkflowClient = new EventMeshWorkflowClient(eventMeshWorkflowClientConfig);
6667
ExecuteResponse response = eventMeshWorkflowClient.getWorkflowClient().execute(executeRequest.build());
6768

68-
if (log.isInfoEnabled()) {
69-
log.info("received response: {}", response.toString());
70-
}
69+
LogUtils.info(log, "received response: {}", response.toString());
7170

7271
ThreadUtils.sleep(1, TimeUnit.MINUTES);
7372

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.eventmesh.common.protocol.SubscriptionItem;
2525
import org.apache.eventmesh.common.protocol.SubscriptionMode;
2626
import org.apache.eventmesh.common.protocol.SubscriptionType;
27+
import org.apache.eventmesh.common.utils.LogUtils;
2728
import org.apache.eventmesh.common.utils.ThreadUtils;
2829
import org.apache.eventmesh.grpc.GrpcAbstractDemo;
2930

@@ -61,9 +62,7 @@ public static void main(String[] args) throws InterruptedException, IOException
6162

6263
@Override
6364
public Optional<CloudEvent> handle(final CloudEvent msg) {
64-
if (log.isInfoEnabled()) {
65-
log.info("receive async msg: {}", msg);
66-
}
65+
LogUtils.info(log, "receive async msg: {}", msg);
6766
return Optional.empty();
6867
}
6968

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.eventmesh.common.protocol.SubscriptionItem;
2525
import org.apache.eventmesh.common.protocol.SubscriptionMode;
2626
import org.apache.eventmesh.common.protocol.SubscriptionType;
27+
import org.apache.eventmesh.common.utils.LogUtils;
2728
import org.apache.eventmesh.common.utils.ThreadUtils;
2829
import org.apache.eventmesh.grpc.GrpcAbstractDemo;
2930

@@ -62,9 +63,7 @@ public static void main(String[] args) throws InterruptedException, IOException
6263

6364
@Override
6465
public Optional<CloudEvent> handle(final CloudEvent msg) {
65-
if (log.isInfoEnabled()) {
66-
log.info("receive request-reply msg: {}", msg);
67-
}
66+
LogUtils.info(log, "receive request-reply msg: {}", msg);
6867

6968
if (msg != null) {
7069
return Optional.of(msg);

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventMeshAsyncSubscribe.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.eventmesh.common.protocol.SubscriptionItem;
2626
import org.apache.eventmesh.common.protocol.SubscriptionMode;
2727
import org.apache.eventmesh.common.protocol.SubscriptionType;
28+
import org.apache.eventmesh.common.utils.LogUtils;
2829
import org.apache.eventmesh.common.utils.ThreadUtils;
2930
import org.apache.eventmesh.grpc.GrpcAbstractDemo;
3031

@@ -60,9 +61,7 @@ public static void main(String[] args) throws InterruptedException, IOException
6061

6162
@Override
6263
public Optional<EventMeshMessage> handle(final EventMeshMessage msg) {
63-
if (log.isInfoEnabled()) {
64-
log.info("receive async msg: {}", msg);
65-
}
64+
LogUtils.info(log, "receive async msg: {}", msg);
6665
return Optional.empty();
6766
}
6867

0 commit comments

Comments
 (0)