Skip to content

Commit f9c3212

Browse files
committed
Polishing.
Introduce ReferenceCountUtil for safe and conditional release of ReferenceCounted objects. Add missing author tags. [#279][resolves #298] Signed-off-by: Mark Paluch <[email protected]>
1 parent 922997c commit f9c3212

7 files changed

Lines changed: 85 additions & 65 deletions

File tree

src/main/java/io/r2dbc/mssql/codec/BlobCodec.java

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,17 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.netty.buffer.Unpooled;
22-
import io.r2dbc.mssql.message.type.Length;
23-
import io.r2dbc.mssql.message.type.LengthStrategy;
24-
import io.r2dbc.mssql.message.type.PlpLength;
25-
import io.r2dbc.mssql.message.type.SqlServerType;
26-
import io.r2dbc.mssql.message.type.TypeInformation;
22+
import io.r2dbc.mssql.message.type.*;
2723
import io.r2dbc.mssql.util.Assert;
24+
import io.r2dbc.mssql.util.ReferenceCountUtil;
2825
import io.r2dbc.spi.Blob;
2926
import org.reactivestreams.Publisher;
3027
import reactor.core.publisher.Flux;
3128
import reactor.core.publisher.Mono;
3229
import reactor.util.annotation.Nullable;
3330

3431
import java.nio.ByteBuffer;
35-
import java.util.ArrayList;
36-
import java.util.Collections;
37-
import java.util.EnumSet;
38-
import java.util.List;
39-
import java.util.Set;
32+
import java.util.*;
4033

4134
/**
4235
* Codec for binary values that are represented as {@link Blob}.
@@ -48,6 +41,7 @@
4841
* </ul>
4942
*
5043
* @author Mark Paluch
44+
* @author Tomasz Marciniak
5145
*/
5246
public class BlobCodec extends AbstractCodec<Blob> {
5347

@@ -56,8 +50,7 @@ public class BlobCodec extends AbstractCodec<Blob> {
5650
*/
5751
public static final BlobCodec INSTANCE = new BlobCodec();
5852

59-
private static final Set<SqlServerType> SUPPORTED_TYPES = EnumSet.of(SqlServerType.BINARY, SqlServerType.VARBINARY,
60-
SqlServerType.VARBINARYMAX, SqlServerType.IMAGE);
53+
private static final Set<SqlServerType> SUPPORTED_TYPES = EnumSet.of(SqlServerType.BINARY, SqlServerType.VARBINARY, SqlServerType.VARBINARYMAX, SqlServerType.IMAGE);
6154

6255
private BlobCodec() {
6356
super(Blob.class);
@@ -165,21 +158,11 @@ public Publisher<ByteBuffer> stream() {
165158

166159
result.flip();
167160
return result;
168-
})
169-
.doOnDiscard(ByteBuf.class, buffer ->
170-
{
171-
if (buffer.refCnt() > 0) {
172-
buffer.release();
173-
}
174-
}
175-
)
176-
.doOnCancel(() -> {
177-
for (ByteBuf buffer : this.buffers) {
178-
if (buffer.refCnt() > 0) {
179-
buffer.release();
180-
}
181-
}
182-
});
161+
}).doOnDiscard(ByteBuf.class, ReferenceCountUtil::maybeRelease).doOnCancel(() -> {
162+
for (ByteBuf buffer : this.buffers) {
163+
ReferenceCountUtil.maybeRelease(buffer);
164+
}
165+
});
183166
}
184167

185168
@Override
@@ -188,9 +171,7 @@ public Publisher<Void> discard() {
188171
return Mono.fromRunnable(() -> {
189172

190173
for (ByteBuf buffer : this.buffers) {
191-
if (buffer.refCnt() > 0) {
192-
buffer.release();
193-
}
174+
ReferenceCountUtil.maybeRelease(buffer);
194175
}
195176
});
196177
}

src/main/java/io/r2dbc/mssql/codec/ByteArray.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.netty.buffer.ByteBufUtil;
2222
import io.r2dbc.mssql.util.Assert;
23+
import io.r2dbc.mssql.util.ReferenceCountUtil;
2324

2425
import java.util.function.Function;
2526

2627
/**
2728
* Utility to create byte arrays.
2829
*
2930
* @author Mark Paluch
31+
* @author Tomasz Marciniak
3032
*/
3133
abstract class ByteArray {
3234

@@ -42,14 +44,12 @@ static byte[] fromEncoded(Function<ByteBufAllocator, Encoded> encodeFunction) {
4244

4345
Encoded encoded = encodeFunction.apply(ByteBufAllocator.DEFAULT);
4446

45-
ByteBuf buffer = null;
47+
ByteBuf buffer = encoded.getValue();
4648
try {
47-
buffer = encoded.getValue();
4849
return ByteBufUtil.getBytes(buffer);
4950
} finally {
50-
if (buffer != null && buffer.refCnt() > 0) {
51-
buffer.release();
52-
}
51+
encoded.dispose();
52+
ReferenceCountUtil.maybeRelease(buffer);
5353
}
5454
}
5555

src/main/java/io/r2dbc/mssql/codec/ClobCodec.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,10 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.netty.buffer.ByteBufUtil;
2222
import io.netty.buffer.CompositeByteBuf;
23-
import io.netty.util.ReferenceCountUtil;
2423
import io.r2dbc.mssql.codec.RpcParameterContext.CharacterValueContext;
25-
import io.r2dbc.mssql.message.type.Length;
26-
import io.r2dbc.mssql.message.type.LengthStrategy;
27-
import io.r2dbc.mssql.message.type.PlpLength;
28-
import io.r2dbc.mssql.message.type.SqlServerType;
29-
import io.r2dbc.mssql.message.type.TypeInformation;
24+
import io.r2dbc.mssql.message.type.*;
3025
import io.r2dbc.mssql.util.Assert;
26+
import io.r2dbc.mssql.util.ReferenceCountUtil;
3127
import io.r2dbc.spi.Clob;
3228
import io.r2dbc.spi.R2dbcNonTransientException;
3329
import org.reactivestreams.Publisher;
@@ -232,11 +228,7 @@ public Publisher<CharSequence> stream() {
232228
}
233229

234230
})
235-
.doFinally(s -> {
236-
if (this.remainder.refCnt() > 0) {
237-
ReferenceCountUtil.safeRelease(this.remainder);
238-
}
239-
});
231+
.doFinally(s -> ReferenceCountUtil.maybeRelease(this.remainder));
240232
}
241233

242234
@Override
@@ -246,8 +238,8 @@ public Publisher<Void> discard() {
246238

247239
private void releaseBuffers() {
248240

249-
ReferenceCountUtil.safeRelease(this.remainder);
250-
ReferenceCountUtil.safeRelease(this.buffer);
241+
ReferenceCountUtil.maybeSafeRelease(this.remainder);
242+
ReferenceCountUtil.maybeSafeRelease(this.buffer);
251243
}
252244

253245
private static Flux<ByteBuf> createBufferStream(ByteBuf plpStream, Length valueLength, TypeInformation type) {
@@ -273,9 +265,7 @@ private static Flux<ByteBuf> createBufferStream(ByteBuf plpStream, Length valueL
273265
}
274266
})
275267
.doFinally(s -> {
276-
if (plpStream.refCnt() > 0) {
277-
ReferenceCountUtil.safeRelease(plpStream);
278-
}
268+
ReferenceCountUtil.maybeSafeRelease(plpStream);
279269
});
280270
}
281271

src/main/java/io/r2dbc/mssql/codec/Encoded.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.r2dbc.mssql.message.type.SqlServerType;
2121
import io.r2dbc.mssql.message.type.TdsDataType;
22+
import io.r2dbc.mssql.util.ReferenceCountUtil;
2223
import reactor.core.Disposable;
2324

2425
import java.util.function.IntFunction;
@@ -115,20 +116,17 @@ static class DisposableSupplier implements Supplier<ByteBuf>, Disposable {
115116

116117
@Override
117118
public ByteBuf get() {
118-
return buf.asReadOnly();
119+
return this.buf.asReadOnly();
119120
}
120121

121122
@Override
122123
public void dispose() {
123-
124-
if (!isDisposed()) {
125-
buf.release();
126-
}
124+
ReferenceCountUtil.maybeSafeRelease(this.buf);
127125
}
128126

129127
@Override
130128
public boolean isDisposed() {
131-
return buf.refCnt() == 0;
129+
return this.buf.refCnt() == 0;
132130
}
133131
}
134132

@@ -146,11 +144,11 @@ public LengthAwareSupplier(int length, IntFunction<ByteBuf> delegate) {
146144

147145
@Override
148146
public ByteBuf get() {
149-
return delegate.apply(length);
147+
return this.delegate.apply(this.length);
150148
}
151149

152150
public int getLength() {
153-
return length;
151+
return this.length;
154152
}
155153
}
156154
}

src/main/java/io/r2dbc/mssql/message/token/Attention.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import io.r2dbc.mssql.message.tds.TdsFragment;
2828
import io.r2dbc.mssql.message.tds.TdsPackets;
2929
import io.r2dbc.mssql.util.Assert;
30+
import io.r2dbc.mssql.util.ReferenceCountUtil;
3031

3132
import java.util.Objects;
3233

3334
/**
3435
* Attention signal to cancel a running operation.
3536
*
3637
* @author Mark Paluch
38+
* @author Tomasz Marciniak
3739
* @since 0.9
3840
*/
3941
public final class Attention implements ClientMessage, TokenStream {
@@ -79,9 +81,7 @@ public TdsFragment encode(ByteBufAllocator allocator, int packetSize) {
7981
ByteBuf buffer = allocator.buffer(length);
8082
encode(buffer);
8183

82-
if(buffer.refCnt() > 0) {
83-
buffer.release();
84-
}
84+
ReferenceCountUtil.maybeRelease(buffer);
8585

8686
return TdsPackets.create(header, Unpooled.EMPTY_BUFFER);
8787
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.mssql.util;
18+
19+
import io.netty.util.ReferenceCounted;
20+
import reactor.util.annotation.Nullable;
21+
22+
/**
23+
* Collection of methods to handle objects that may implement {@link ReferenceCounted}.
24+
*
25+
* @author Mark Paluch
26+
* @since 1.0.3
27+
*/
28+
public class ReferenceCountUtil {
29+
30+
/**
31+
* Try to call {@link ReferenceCounted#release()} if the specified object implements {@link ReferenceCounted} and its reference count is greater than zero.
32+
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
33+
*/
34+
public static void maybeRelease(@Nullable Object obj) {
35+
if (obj instanceof ReferenceCounted && ((ReferenceCounted) obj).refCnt() > 0) {
36+
((ReferenceCounted) obj).release();
37+
}
38+
}
39+
40+
/**
41+
* Try to call {@link ReferenceCounted#release()} if the specified object implements {@link ReferenceCounted} and its reference count is greater than zero.
42+
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
43+
*/
44+
public static void maybeSafeRelease(@Nullable Object obj) {
45+
if (obj instanceof ReferenceCounted && ((ReferenceCounted) obj).refCnt() > 0) {
46+
io.netty.util.ReferenceCountUtil.safeRelease(obj);
47+
}
48+
}
49+
50+
// Utility constructor
51+
private ReferenceCountUtil() {
52+
}
53+
}

src/test/java/io/r2dbc/mssql/util/EncodedAssert.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,7 @@ public EncodedAssert isEncodedAs(Consumer<ByteBuf> encoded) {
119119
Assertions.assertThat(ByteBufUtil.prettyHexDump(this.actual)).describedAs("ByteBuf")
120120
.isEqualTo(ByteBufUtil.prettyHexDump(expected));
121121

122-
if(this.actual.refCnt()>0){
123-
this.actual.release();
124-
}
122+
ReferenceCountUtil.maybeSafeRelease(this.actual);
125123

126124
return this;
127125
}

0 commit comments

Comments
 (0)