2222
2323import static io .netty .buffer .PooledByteBufAllocator .DEFAULT ;
2424import static java .lang .Class .forName ;
25- import static java .lang .ThreadLocal .withInitial ;
2625import static org .lmdbjava .UnsafeAccess .UNSAFE ;
2726
2827import java .lang .reflect .Field ;
29- import java .util .ArrayDeque ;
3028
3129import io .netty .buffer .ByteBuf ;
30+ import io .netty .buffer .PooledByteBufAllocator ;
3231import jnr .ffi .Pointer ;
3332
3433/**
@@ -47,37 +46,34 @@ public final class ByteBufProxy extends BufferProxy<ByteBuf> {
4746 */
4847 public static final BufferProxy <ByteBuf > PROXY_NETTY = new ByteBufProxy ();
4948
50- private static final long ADDRESS_OFFSET ;
51-
52- /**
53- * A thread-safe pool for a given length. If the buffer found is bigger then
54- * the buffer in the pool creates a new buffer. If no buffer is found creates
55- * a new buffer.
56- */
57- private static final ThreadLocal <ArrayDeque <ByteBuf >> BUFFERS = withInitial (()
58- -> new ArrayDeque <>(16 ));
59-
6049 private static final int BUFFER_RETRIES = 10 ;
6150 private static final String FIELD_NAME_ADDRESS = "memoryAddress" ;
6251 private static final String FIELD_NAME_LENGTH = "length" ;
63- private static final long LENGTH_OFFSET ;
6452 private static final String NAME = "io.netty.buffer.PooledUnsafeDirectByteBuf" ;
53+ private final long lengthOffset ;
54+ private final long addressOffset ;
55+
56+ private final PooledByteBufAllocator nettyAllocator ;
57+
58+ private ByteBufProxy () {
59+ this (DEFAULT );
60+ }
61+
62+ public ByteBufProxy (final PooledByteBufAllocator allocator ) {
63+ this .nettyAllocator = allocator ;
6564
66- static {
6765 try {
68- createBuffer ();
66+ final ByteBuf initBuf = this .allocate ();
67+ initBuf .release ();
6968 final Field address = findField (NAME , FIELD_NAME_ADDRESS );
7069 final Field length = findField (NAME , FIELD_NAME_LENGTH );
71- ADDRESS_OFFSET = UNSAFE .objectFieldOffset (address );
72- LENGTH_OFFSET = UNSAFE .objectFieldOffset (length );
70+ addressOffset = UNSAFE .objectFieldOffset (address );
71+ lengthOffset = UNSAFE .objectFieldOffset (length );
7372 } catch (final SecurityException e ) {
7473 throw new LmdbException ("Field access error" , e );
7574 }
7675 }
7776
78- private ByteBufProxy () {
79- }
80-
8177 static Field findField (final String c , final String name ) {
8278 Class <?> clazz ;
8379 try {
@@ -97,39 +93,27 @@ static Field findField(final String c, final String name) {
9793 throw new LmdbException (name + " not found" );
9894 }
9995
100- private static ByteBuf createBuffer () {
96+ @ Override
97+ protected ByteBuf allocate () {
10198 for (int i = 0 ; i < BUFFER_RETRIES ; i ++) {
102- final ByteBuf bb = DEFAULT .directBuffer (0 );
99+ final ByteBuf bb = nettyAllocator .directBuffer ();
103100 if (NAME .equals (bb .getClass ().getName ())) {
104101 return bb ;
102+ } else {
103+ bb .release ();
105104 }
106105 }
107106 throw new IllegalStateException ("Netty buffer must be " + NAME );
108107 }
109108
110- @ Override
111- protected ByteBuf allocate () {
112- final ArrayDeque <ByteBuf > queue = BUFFERS .get ();
113- final ByteBuf buffer = queue .poll ();
114-
115- if (buffer != null && buffer .capacity () >= 0 ) {
116- return buffer ;
117- } else {
118- return createBuffer ();
119- }
120- }
121-
122109 @ Override
123110 protected int compare (final ByteBuf o1 , final ByteBuf o2 ) {
124111 return o1 .compareTo (o2 );
125112 }
126113
127114 @ Override
128115 protected void deallocate (final ByteBuf buff ) {
129- final ArrayDeque <ByteBuf > queue = BUFFERS .get ();
130- if (!queue .offer (buff )) {
131- buff .release ();
132- }
116+ buff .release ();
133117 }
134118
135119 @ Override
@@ -161,8 +145,8 @@ protected ByteBuf out(final ByteBuf buffer, final Pointer ptr,
161145 final long ptrAddr ) {
162146 final long addr = UNSAFE .getLong (ptrAddr + STRUCT_FIELD_OFFSET_DATA );
163147 final long size = UNSAFE .getLong (ptrAddr + STRUCT_FIELD_OFFSET_SIZE );
164- UNSAFE .putLong (buffer , ADDRESS_OFFSET , addr );
165- UNSAFE .putInt (buffer , LENGTH_OFFSET , (int ) size );
148+ UNSAFE .putLong (buffer , addressOffset , addr );
149+ UNSAFE .putInt (buffer , lengthOffset , (int ) size );
166150 buffer .writerIndex ((int ) size ).readerIndex (0 );
167151 return buffer ;
168152 }
0 commit comments