Skip to content

Commit 8a7d799

Browse files
authored
Merge 802b96c into 1e59cdf
2 parents 1e59cdf + 802b96c commit 8a7d799

File tree

92 files changed

+6011
-2
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

92 files changed

+6011
-2
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,5 @@ bld/
4747
[Bb]in/
4848
[Oo]bj/
4949
[Ll]og/
50-
[Ll]ogs/
50+
[Ll]ogs/
51+
**/org/apache/eventmesh/connector/jdbc/antlr4/autogeneration/*

build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ allprojects {
9090
.exclude('**/org/apache/eventmesh/common/protocol/grpc/cloudevents**')
9191
.exclude('**/org/apache/eventmesh/connector/openfunction/client/EventMeshGrpcService**')
9292
.exclude('**/org/apache/eventmesh/connector/openfunction/client/CallbackServiceGrpc**')
93+
.exclude('**/org/apache/eventmesh/connector/jdbc/antlr**')
9394

9495
dependencies {
9596
repositories {
@@ -249,6 +250,9 @@ subprojects {
249250
rulesMinimumPriority = 5
250251
ruleSets = ["category/java/errorprone.xml", "category/java/bestpractices.xml"]
251252
ignoreFailures = true
253+
pmdMain {
254+
excludes = ["**/org/apache/eventmesh/connector/jdbc/antlr4/autogeneration/**"]
255+
}
252256
}
253257

254258
jar {
@@ -341,6 +345,7 @@ subprojects {
341345
javadoc {
342346
source = sourceSets.main.java
343347
destinationDir = reporting.file("javadoc")
348+
options.encoding = "UTF-8"
344349
}
345350

346351
task packageJavadoc(type: Jar, dependsOn: ['javadoc']) {
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common;
19+
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
22+
23+
/**
24+
* ResetCountDownLatch can reset
25+
*
26+
* @see java.util.concurrent.CountDownLatch
27+
*/
28+
public class ResetCountDownLatch {
29+
30+
private final RestSync restSync;
31+
32+
public ResetCountDownLatch(int count) {
33+
this.restSync = new RestSync(count);
34+
}
35+
36+
37+
/**
38+
* Causes the current thread to wait until the latch has counted down to zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
39+
*
40+
* <p>If the current count is zero then this method returns immediately.
41+
*
42+
* <p>If the current count is greater than zero then the current
43+
* thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happen:
44+
* <ul>
45+
* <li>The count reaches zero due to invocations of the
46+
* {@link #countDown} method; or
47+
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
48+
* the current thread.
49+
* </ul>
50+
*
51+
* <p>If the current thread:
52+
* <ul>
53+
* <li>has its interrupted status set on entry to this method; or
54+
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
55+
* </ul>
56+
* then {@link InterruptedException} is thrown and the current thread's
57+
* interrupted status is cleared.
58+
*
59+
* @throws InterruptedException if the current thread is interrupted while waiting
60+
*/
61+
public void await() throws InterruptedException {
62+
restSync.acquireSharedInterruptibly(1);
63+
}
64+
65+
/**
66+
* Causes the current thread to wait until the latch has counted down to zero, unless the thread is {@linkplain Thread#interrupt interrupted}, or
67+
* the specified waiting time elapses.
68+
*
69+
* <p>If the current count is zero then this method returns immediately
70+
* with the value {@code true}.
71+
*
72+
* <p>If the current count is greater than zero then the current
73+
* thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happen:
74+
* <ul>
75+
* <li>The count reaches zero due to invocations of the
76+
* {@link #countDown} method; or
77+
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
78+
* the current thread; or
79+
* <li>The specified waiting time elapses.
80+
* </ul>
81+
*
82+
* <p>If the count reaches zero then the method returns with the
83+
* value {@code true}.
84+
*
85+
* <p>If the current thread:
86+
* <ul>
87+
* <li>has its interrupted status set on entry to this method; or
88+
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
89+
* </ul>
90+
* then {@link InterruptedException} is thrown and the current thread's
91+
* interrupted status is cleared.
92+
*
93+
* <p>If the specified waiting time elapses then the value {@code false}
94+
* is returned. If the time is less than or equal to zero, the method
95+
* will not wait at all.
96+
*
97+
* @param timeout the maximum time to wait
98+
* @param unit the time unit of the {@code timeout} argument
99+
* @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count reached zero
100+
* @throws InterruptedException if the current thread is interrupted while waiting
101+
*/
102+
public boolean await(long timeout, TimeUnit unit)
103+
throws InterruptedException {
104+
return restSync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
105+
}
106+
107+
108+
/**
109+
* Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
110+
*
111+
* <p>If the current count is greater than zero then it is decremented.
112+
* If the new count is zero then all waiting threads are re-enabled for thread scheduling purposes.
113+
*
114+
* <p>If the current count equals zero then nothing happens.
115+
*/
116+
public void countDown() {
117+
restSync.releaseShared(1);
118+
}
119+
120+
/**
121+
* Returns the current count.
122+
*
123+
* <p>This method is typically used for debugging and testing purposes.
124+
*
125+
* @return the current count
126+
*/
127+
public int getCount() {
128+
return restSync.getCount();
129+
}
130+
131+
/**
132+
* Reset the CountDownLatch
133+
*/
134+
public void reset() {
135+
restSync.reset();
136+
}
137+
138+
/**
139+
* Synchronization control For ResetCountDownLatch. Uses AQS state to represent count.
140+
*/
141+
private static final class RestSync extends AbstractQueuedSynchronizer {
142+
143+
private final int initCount;
144+
145+
RestSync(int count) {
146+
if (count < 0) {
147+
throw new IllegalArgumentException("count must be greater than or equal to 0");
148+
}
149+
this.initCount = count;
150+
setState(count);
151+
}
152+
153+
protected void reset() {
154+
setState(initCount);
155+
}
156+
157+
int getCount() {
158+
return getState();
159+
}
160+
161+
@Override
162+
protected int tryAcquireShared(int acquires) {
163+
return (getState() == 0) ? 1 : -1;
164+
}
165+
166+
@Override
167+
protected boolean tryReleaseShared(int releases) {
168+
for (; ; ) {
169+
int count = getState();
170+
if (count == 0) {
171+
return false;
172+
}
173+
int nextCount = count - 1;
174+
if (compareAndSetState(count, nextCount)) {
175+
return nextCount == 0;
176+
}
177+
}
178+
}
179+
}
180+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common;
19+
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
23+
import lombok.extern.slf4j.Slf4j;
24+
25+
@Slf4j
26+
public abstract class ThreadWrapper implements Runnable {
27+
28+
private final AtomicBoolean started = new AtomicBoolean(false);
29+
protected Thread thread;
30+
protected final ResetCountDownLatch waiter = new ResetCountDownLatch(1);
31+
protected volatile AtomicBoolean hasWakeup = new AtomicBoolean(false);
32+
protected boolean isDaemon = false;
33+
protected volatile boolean isRunning = false;
34+
35+
public ThreadWrapper() {
36+
37+
}
38+
39+
public abstract String getThreadName();
40+
41+
public void start() {
42+
43+
if (!started.compareAndSet(false, true)) {
44+
log.warn("Start thread:{} fail", getThreadName());
45+
return;
46+
}
47+
this.thread = new Thread(this, getThreadName());
48+
this.thread.setDaemon(isDaemon);
49+
this.thread.start();
50+
this.isRunning = true;
51+
log.info("Start thread:{} success", getThreadName());
52+
}
53+
54+
public void await() {
55+
if (hasWakeup.compareAndSet(true, false)) {
56+
return;
57+
}
58+
//reset count
59+
waiter.reset();
60+
try {
61+
waiter.await();
62+
} catch (InterruptedException e) {
63+
log.error("Thread[{}] Interrupted", getThreadName(), e);
64+
} finally {
65+
hasWakeup.set(false);
66+
}
67+
}
68+
69+
public void await(long timeout) {
70+
await(timeout, TimeUnit.MILLISECONDS);
71+
}
72+
73+
public void await(long timeout, TimeUnit timeUnit) {
74+
if (hasWakeup.compareAndSet(true, false)) {
75+
return;
76+
}
77+
//reset count
78+
waiter.reset();
79+
try {
80+
waiter.await(timeout, timeUnit == null ? TimeUnit.MILLISECONDS : timeUnit);
81+
} catch (InterruptedException e) {
82+
log.error("Thread[{}] Interrupted", getThreadName(), e);
83+
} finally {
84+
hasWakeup.set(false);
85+
}
86+
}
87+
88+
public void wakeup() {
89+
if (hasWakeup.compareAndSet(false, true)) {
90+
waiter.countDown();
91+
}
92+
}
93+
94+
public void shutdownImmediately() {
95+
shutdown(true);
96+
}
97+
98+
public void shutdown() {
99+
shutdown(false);
100+
}
101+
102+
private void shutdown(final boolean interruptThread) {
103+
if (!started.compareAndSet(true, false)) {
104+
return;
105+
}
106+
this.isRunning = false;
107+
//wakeup the thread to run
108+
wakeup();
109+
110+
try {
111+
if (interruptThread) {
112+
this.thread.interrupt();
113+
}
114+
if (!this.isDaemon) {
115+
//wait main thread to wait this thread finish
116+
this.thread.join(TimeUnit.SECONDS.toMillis(60));
117+
}
118+
} catch (InterruptedException e) {
119+
log.error("Thread[{}] Interrupted", getThreadName(), e);
120+
}
121+
}
122+
123+
public void setDaemon(boolean daemon) {
124+
isDaemon = daemon;
125+
}
126+
127+
public boolean isStated() {
128+
return this.started.get();
129+
}
130+
}

0 commit comments

Comments
 (0)