3939import org .apache .bookkeeper .mledger .proto .MLDataFormats .ManagedCursorInfo ;
4040import org .apache .bookkeeper .mledger .proto .MLDataFormats .ManagedLedgerInfo ;
4141import org .apache .bookkeeper .util .SafeRunnable ;
42+ import org .apache .commons .lang .StringUtils ;
4243import org .apache .pulsar .common .allocator .PulsarByteBufAllocator ;
4344import org .apache .pulsar .common .compression .CompressionCodec ;
4445import org .apache .pulsar .common .compression .CompressionCodecProvider ;
@@ -55,30 +56,39 @@ public class MetaStoreImpl implements MetaStore {
5556 private final MetadataStore store ;
5657 private final OrderedExecutor executor ;
5758
58- private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778 ; // 0100 0111 0111 1000
59- private final CompressionType compressionType ;
59+ private static final int MAGIC_MANAGED_INFO_METADATA = 0x4778 ; // 0100 0111 0111 1000
60+ private final CompressionType ledgerInfoCompressionType ;
61+ private final CompressionType cursorInfoCompressionType ;
6062
6163 public MetaStoreImpl (MetadataStore store , OrderedExecutor executor ) {
6264 this .store = store ;
6365 this .executor = executor ;
64- this .compressionType = CompressionType .NONE ;
66+ this .ledgerInfoCompressionType = CompressionType .NONE ;
67+ this .cursorInfoCompressionType = CompressionType .NONE ;
6568 }
6669
67- public MetaStoreImpl (MetadataStore store , OrderedExecutor executor , String compressionType ) {
70+ public MetaStoreImpl (MetadataStore store , OrderedExecutor executor , String ledgerInfoCompressionType ,
71+ String cursorInfoCompressionType ) {
6872 this .store = store ;
6973 this .executor = executor ;
70- CompressionType finalCompressionType ;
71- if (compressionType != null ) {
72- try {
73- finalCompressionType = CompressionType .valueOf (compressionType );
74- } catch (Exception e ) {
75- log .error ("Failed to get compression type {} error msg: {}." , compressionType , e .getMessage ());
76- throw e ;
77- }
78- } else {
79- finalCompressionType = CompressionType .NONE ;
74+ this .ledgerInfoCompressionType = parseCompressionType (ledgerInfoCompressionType );
75+ this .cursorInfoCompressionType = parseCompressionType (cursorInfoCompressionType );
76+ }
77+
78+ private CompressionType parseCompressionType (String value ) {
79+ if (StringUtils .isEmpty (value )) {
80+ return CompressionType .NONE ;
81+ }
82+
83+ CompressionType compressionType ;
84+ try {
85+ compressionType = CompressionType .valueOf (value );
86+ } catch (Exception e ) {
87+ log .error ("Failed to get compression type {} error msg: {}." , value , e .getMessage ());
88+ throw e ;
8089 }
81- this .compressionType = finalCompressionType ;
90+
91+ return compressionType ;
8292 }
8393
8494 @ Override
@@ -185,7 +195,7 @@ public void asyncGetCursorInfo(String ledgerName, String cursorName,
185195 .thenAcceptAsync (optRes -> {
186196 if (optRes .isPresent ()) {
187197 try {
188- ManagedCursorInfo info = ManagedCursorInfo . parseFrom (optRes .get ().getValue ());
198+ ManagedCursorInfo info = parseManagedCursorInfo (optRes .get ().getValue ());
189199 callback .operationComplete (info , optRes .get ().getStat ());
190200 } catch (InvalidProtocolBufferException e ) {
191201 callback .operationFailed (getException (e ));
@@ -208,7 +218,7 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC
208218 info .getCursorsLedgerId (), info .getMarkDeleteLedgerId (), info .getMarkDeleteEntryId ());
209219
210220 String path = PREFIX + ledgerName + "/" + cursorName ;
211- byte [] content = info . toByteArray (); // Binary format
221+ byte [] content = compressCursorInfo ( info );
212222
213223 long expectedVersion ;
214224
@@ -322,32 +332,97 @@ private static MetaStoreException getException(Throwable t) {
322332 }
323333 }
324334
335+ public byte [] compressLedgerInfo (ManagedLedgerInfo managedLedgerInfo ) {
336+ if (ledgerInfoCompressionType .equals (CompressionType .NONE )) {
337+ return managedLedgerInfo .toByteArray ();
338+ }
339+ MLDataFormats .ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats .ManagedLedgerInfoMetadata
340+ .newBuilder ()
341+ .setCompressionType (ledgerInfoCompressionType )
342+ .setUncompressedSize (managedLedgerInfo .getSerializedSize ())
343+ .build ();
344+ return compressManagedInfo (managedLedgerInfo .toByteArray (), mlInfoMetadata .toByteArray (),
345+ mlInfoMetadata .getSerializedSize (), ledgerInfoCompressionType );
346+ }
347+
348+ public byte [] compressCursorInfo (ManagedCursorInfo managedCursorInfo ) {
349+ if (cursorInfoCompressionType .equals (CompressionType .NONE )) {
350+ return managedCursorInfo .toByteArray ();
351+ }
352+ MLDataFormats .ManagedCursorInfoMetadata metadata = MLDataFormats .ManagedCursorInfoMetadata
353+ .newBuilder ()
354+ .setCompressionType (cursorInfoCompressionType )
355+ .setUncompressedSize (managedCursorInfo .getSerializedSize ())
356+ .build ();
357+ return compressManagedInfo (managedCursorInfo .toByteArray (), metadata .toByteArray (),
358+ metadata .getSerializedSize (), cursorInfoCompressionType );
359+ }
360+
361+ public ManagedLedgerInfo parseManagedLedgerInfo (byte [] data ) throws InvalidProtocolBufferException {
362+ ByteBuf byteBuf = Unpooled .wrappedBuffer (data );
363+
364+ byte [] metadataBytes = extractCompressMetadataBytes (byteBuf );
365+ if (metadataBytes != null ) {
366+ try {
367+ MLDataFormats .ManagedLedgerInfoMetadata metadata =
368+ MLDataFormats .ManagedLedgerInfoMetadata .parseFrom (metadataBytes );
369+ return ManagedLedgerInfo .parseFrom (getCompressionCodec (metadata .getCompressionType ())
370+ .decode (byteBuf , metadata .getUncompressedSize ()).nioBuffer ());
371+ } catch (Exception e ) {
372+ log .error ("Failed to parse managedLedgerInfo metadata, "
373+ + "fall back to parse managedLedgerInfo directly." , e );
374+ return ManagedLedgerInfo .parseFrom (data );
375+ } finally {
376+ byteBuf .release ();
377+ }
378+ } else {
379+ return ManagedLedgerInfo .parseFrom (data );
380+ }
381+ }
382+
383+ public ManagedCursorInfo parseManagedCursorInfo (byte [] data ) throws InvalidProtocolBufferException {
384+ ByteBuf byteBuf = Unpooled .wrappedBuffer (data );
385+
386+ byte [] metadataBytes = extractCompressMetadataBytes (byteBuf );
387+ if (metadataBytes != null ) {
388+ try {
389+ MLDataFormats .ManagedCursorInfoMetadata metadata =
390+ MLDataFormats .ManagedCursorInfoMetadata .parseFrom (metadataBytes );
391+ return ManagedCursorInfo .parseFrom (getCompressionCodec (metadata .getCompressionType ())
392+ .decode (byteBuf , metadata .getUncompressedSize ()).nioBuffer ());
393+ } catch (Exception e ) {
394+ log .error ("Failed to parse ManagedCursorInfo metadata, "
395+ + "fall back to parse ManagedCursorInfo directly" , e );
396+ return ManagedCursorInfo .parseFrom (data );
397+ } finally {
398+ byteBuf .release ();
399+ }
400+ } else {
401+ return ManagedCursorInfo .parseFrom (data );
402+ }
403+ }
404+
325405 /**
326- * Compress ManagedLedgerInfo data.
406+ * Compress Managed Info data such as LedgerInfo, CursorInfo .
327407 *
328408 * compression data structure
329409 * [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
330- */
331- public byte [] compressLedgerInfo (ManagedLedgerInfo managedLedgerInfo ) {
410+ */
411+ private byte [] compressManagedInfo (byte [] info , byte [] metadata , int metadataSerializedSize ,
412+ MLDataFormats .CompressionType compressionType ) {
332413 if (compressionType == null || compressionType .equals (CompressionType .NONE )) {
333- return managedLedgerInfo . toByteArray () ;
414+ return info ;
334415 }
335416 ByteBuf metadataByteBuf = null ;
336417 ByteBuf encodeByteBuf = null ;
337418 try {
338- MLDataFormats .ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats .ManagedLedgerInfoMetadata
339- .newBuilder ()
340- .setCompressionType (compressionType )
341- .setUncompressedSize (managedLedgerInfo .getSerializedSize ())
342- .build ();
343- metadataByteBuf = PulsarByteBufAllocator .DEFAULT .buffer (
344- mlInfoMetadata .getSerializedSize () + 6 , mlInfoMetadata .getSerializedSize () + 6 );
345- metadataByteBuf .writeShort (MAGIC_MANAGED_LEDGER_INFO_METADATA );
346- metadataByteBuf .writeInt (mlInfoMetadata .getSerializedSize ());
347- metadataByteBuf .writeBytes (mlInfoMetadata .toByteArray ());
348-
419+ metadataByteBuf = PulsarByteBufAllocator .DEFAULT .buffer (metadataSerializedSize + 6 ,
420+ metadataSerializedSize + 6 );
421+ metadataByteBuf .writeShort (MAGIC_MANAGED_INFO_METADATA );
422+ metadataByteBuf .writeInt (metadataSerializedSize );
423+ metadataByteBuf .writeBytes (metadata );
349424 encodeByteBuf = getCompressionCodec (compressionType )
350- .encode (Unpooled .wrappedBuffer (managedLedgerInfo . toByteArray () ));
425+ .encode (Unpooled .wrappedBuffer (info ));
351426 CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator .DEFAULT .compositeBuffer ();
352427 compositeByteBuf .addComponent (true , metadataByteBuf );
353428 compositeByteBuf .addComponent (true , encodeByteBuf );
@@ -364,42 +439,14 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
364439 }
365440 }
366441
367- public ManagedLedgerInfo parseManagedLedgerInfo (byte [] data ) throws InvalidProtocolBufferException {
368- ByteBuf byteBuf = Unpooled .wrappedBuffer (data );
369- if (byteBuf .readableBytes () > 0 && byteBuf .readShort () == MAGIC_MANAGED_LEDGER_INFO_METADATA ) {
370- ByteBuf decodeByteBuf = null ;
371- try {
372- int metadataSize = byteBuf .readInt ();
373- byte [] metadataBytes = new byte [metadataSize ];
374- byteBuf .readBytes (metadataBytes );
375- MLDataFormats .ManagedLedgerInfoMetadata metadata =
376- MLDataFormats .ManagedLedgerInfoMetadata .parseFrom (metadataBytes );
377-
378- long unpressedSize = metadata .getUncompressedSize ();
379- decodeByteBuf = getCompressionCodec (metadata .getCompressionType ())
380- .decode (byteBuf , (int ) unpressedSize );
381- byte [] decodeBytes ;
382- // couldn't decode data by ZLIB compression byteBuf array() directly
383- if (decodeByteBuf .hasArray () && !CompressionType .ZLIB .equals (metadata .getCompressionType ())) {
384- decodeBytes = decodeByteBuf .array ();
385- } else {
386- decodeBytes = new byte [decodeByteBuf .readableBytes () - decodeByteBuf .readerIndex ()];
387- decodeByteBuf .readBytes (decodeBytes );
388- }
389- return ManagedLedgerInfo .parseFrom (decodeBytes );
390- } catch (Exception e ) {
391- log .error ("Failed to parse managedLedgerInfo metadata, "
392- + "fall back to parse managedLedgerInfo directly." , e );
393- return ManagedLedgerInfo .parseFrom (data );
394- } finally {
395- if (decodeByteBuf != null ) {
396- decodeByteBuf .release ();
397- }
398- byteBuf .release ();
399- }
400- } else {
401- return ManagedLedgerInfo .parseFrom (data );
442+ private byte [] extractCompressMetadataBytes (ByteBuf data ) {
443+ if (data .readableBytes () > 0 && data .readShort () == MAGIC_MANAGED_INFO_METADATA ) {
444+ int metadataSize = data .readInt ();
445+ byte [] metadataBytes = new byte [metadataSize ];
446+ data .readBytes (metadataBytes );
447+ return metadataBytes ;
402448 }
449+ return null ;
403450 }
404451
405452 private CompressionCodec getCompressionCodec (CompressionType compressionType ) {
0 commit comments