|
18 | 18 |
|
19 | 19 | import java.lang.reflect.Type; |
20 | 20 | import java.util.Collections; |
21 | | -import java.util.HashMap; |
| 21 | +import java.util.concurrent.ConcurrentHashMap; |
22 | 22 | import java.util.LinkedHashMap; |
23 | 23 | import java.util.Map; |
24 | 24 | import java.util.concurrent.ExecutorService; |
@@ -163,7 +163,7 @@ protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) { |
163 | 163 | } |
164 | 164 | }; |
165 | 165 | this.functionInvocationHelper = applicationContext.getBean(FunctionInvocationHelper.class); |
166 | | - this.streamBridgeFunctionCache = new HashMap<>(); |
| 166 | + this.streamBridgeFunctionCache = new ConcurrentHashMap<>(); |
167 | 167 | observationRegistries.ifAvailable(registry -> this.observationRegistry = registry); |
168 | 168 | } |
169 | 169 |
|
@@ -198,15 +198,7 @@ public boolean send(String bindingName, @Nullable String binderName, Object data |
198 | 198 | ProducerProperties producerProperties = this.bindingServiceProperties.getProducerProperties(bindingName); |
199 | 199 | MessageChannel messageChannel = this.resolveDestination(bindingName, producerProperties, binderName); |
200 | 200 |
|
201 | | - Function functionToInvoke; |
202 | | - lock.lock(); |
203 | | - try { |
204 | | - functionToInvoke = this.getStreamBridgeFunction(outputContentType.toString(), producerProperties); |
205 | | - } |
206 | | - finally { |
207 | | - lock.unlock(); |
208 | | - } |
209 | | - |
| 201 | + Function functionToInvoke = this.getStreamBridgeFunction(outputContentType.toString(), producerProperties); |
210 | 202 |
|
211 | 203 | if (producerProperties != null && producerProperties.isPartitioned()) { |
212 | 204 | functionToInvoke = new PartitionAwareFunctionWrapper(functionToInvoke, this.applicationContext, producerProperties); |
@@ -253,15 +245,12 @@ private int hashProducerProperties(ProducerProperties producerProperties, String |
253 | 245 |
|
254 | 246 | private FunctionInvocationWrapper getStreamBridgeFunction(String outputContentType, ProducerProperties producerProperties) { |
255 | 247 | int streamBridgeFunctionKey = this.hashProducerProperties(producerProperties, outputContentType); |
256 | | - if (this.streamBridgeFunctionCache.containsKey(streamBridgeFunctionKey)) { |
257 | | - return this.streamBridgeFunctionCache.get(streamBridgeFunctionKey); |
258 | | - } |
259 | | - else { |
| 248 | + |
| 249 | + return this.streamBridgeFunctionCache.computeIfAbsent(streamBridgeFunctionKey, key -> { |
260 | 250 | FunctionInvocationWrapper functionToInvoke = this.functionCatalog.lookup(STREAM_BRIDGE_FUNC_NAME, outputContentType.toString()); |
261 | | - this.streamBridgeFunctionCache.put(streamBridgeFunctionKey, functionToInvoke); |
262 | 251 | functionToInvoke.setSkipOutputConversion(producerProperties.isUseNativeEncoding()); |
263 | 252 | return functionToInvoke; |
264 | | - } |
| 253 | + }); |
265 | 254 | } |
266 | 255 |
|
267 | 256 | @Override |
|
0 commit comments