Skip to content

Commit bb3324a

Browse files
2tsumo-hitoriolegz
authored andcommitted
Replaced ReentrantLock with ConcurrentHashMap.computeIfAbsent()
to improve performance in StreamBridge.send() method. - Avoids unnecessary locking overhead - Improves concurrency and reduces contention - Enhances throughput for high-load scenarios Signed-off-by: 2tsumo-hitori <[email protected]>
1 parent c733627 commit bb3324a

1 file changed

Lines changed: 6 additions & 17 deletions

File tree

  • core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import java.lang.reflect.Type;
2020
import java.util.Collections;
21-
import java.util.HashMap;
21+
import java.util.concurrent.ConcurrentHashMap;
2222
import java.util.LinkedHashMap;
2323
import java.util.Map;
2424
import java.util.concurrent.ExecutorService;
@@ -163,7 +163,7 @@ protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
163163
}
164164
};
165165
this.functionInvocationHelper = applicationContext.getBean(FunctionInvocationHelper.class);
166-
this.streamBridgeFunctionCache = new HashMap<>();
166+
this.streamBridgeFunctionCache = new ConcurrentHashMap<>();
167167
observationRegistries.ifAvailable(registry -> this.observationRegistry = registry);
168168
}
169169

@@ -198,15 +198,7 @@ public boolean send(String bindingName, @Nullable String binderName, Object data
198198
ProducerProperties producerProperties = this.bindingServiceProperties.getProducerProperties(bindingName);
199199
MessageChannel messageChannel = this.resolveDestination(bindingName, producerProperties, binderName);
200200

201-
Function functionToInvoke;
202-
lock.lock();
203-
try {
204-
functionToInvoke = this.getStreamBridgeFunction(outputContentType.toString(), producerProperties);
205-
}
206-
finally {
207-
lock.unlock();
208-
}
209-
201+
Function functionToInvoke = this.getStreamBridgeFunction(outputContentType.toString(), producerProperties);
210202

211203
if (producerProperties != null && producerProperties.isPartitioned()) {
212204
functionToInvoke = new PartitionAwareFunctionWrapper(functionToInvoke, this.applicationContext, producerProperties);
@@ -253,15 +245,12 @@ private int hashProducerProperties(ProducerProperties producerProperties, String
253245

254246
private FunctionInvocationWrapper getStreamBridgeFunction(String outputContentType, ProducerProperties producerProperties) {
255247
int streamBridgeFunctionKey = this.hashProducerProperties(producerProperties, outputContentType);
256-
if (this.streamBridgeFunctionCache.containsKey(streamBridgeFunctionKey)) {
257-
return this.streamBridgeFunctionCache.get(streamBridgeFunctionKey);
258-
}
259-
else {
248+
249+
return this.streamBridgeFunctionCache.computeIfAbsent(streamBridgeFunctionKey, key -> {
260250
FunctionInvocationWrapper functionToInvoke = this.functionCatalog.lookup(STREAM_BRIDGE_FUNC_NAME, outputContentType.toString());
261-
this.streamBridgeFunctionCache.put(streamBridgeFunctionKey, functionToInvoke);
262251
functionToInvoke.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
263252
return functionToInvoke;
264-
}
253+
});
265254
}
266255

267256
@Override

0 commit comments

Comments
 (0)