1919package org .apache .pinot .core .common ;
2020
2121import com .clearspring .analytics .stream .cardinality .HyperLogLog ;
22+ import com .google .common .base .Preconditions ;
2223import com .google .common .primitives .Longs ;
2324import com .tdunning .math .stats .MergingDigest ;
2425import com .tdunning .math .stats .TDigest ;
4546import it .unimi .dsi .fastutil .longs .LongSet ;
4647import it .unimi .dsi .fastutil .objects .ObjectOpenHashSet ;
4748import it .unimi .dsi .fastutil .objects .ObjectSet ;
48- import java .io .ByteArrayOutputStream ;
49- import java .io .DataOutputStream ;
5049import java .io .IOException ;
5150import java .math .BigDecimal ;
5251import java .nio .ByteBuffer ;
5352import java .util .ArrayList ;
5453import java .util .HashMap ;
55- import java .util .Iterator ;
5654import java .util .List ;
5755import java .util .Map ;
5856import java .util .Set ;
@@ -540,39 +538,38 @@ public byte[] serialize(Map<Object, Object> map) {
540538 return new byte [Integer .BYTES ];
541539 }
542540
543- // No need to close these 2 streams
544- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream ();
545- DataOutputStream dataOutputStream = new DataOutputStream (byteArrayOutputStream );
546-
547- try {
548- // Write the size of the map
549- dataOutputStream .writeInt (size );
550-
551- // Write the serialized key-value pairs
552- Iterator <Map .Entry <Object , Object >> iterator = map .entrySet ().iterator ();
553- // First write the key type and value type
554- Map .Entry <Object , Object > firstEntry = iterator .next ();
555- Object firstKey = firstEntry .getKey ();
556- Object firstValue = firstEntry .getValue ();
557- int keyTypeValue = ObjectType .getObjectType (firstKey ).getValue ();
558- int valueTypeValue = ObjectType .getObjectType (firstValue ).getValue ();
559- dataOutputStream .writeInt (keyTypeValue );
560- dataOutputStream .writeInt (valueTypeValue );
561- // Then write each key-value pair
562- for (Map .Entry <Object , Object > entry : map .entrySet ()) {
563- byte [] keyBytes = ObjectSerDeUtils .serialize (entry .getKey (), keyTypeValue );
564- dataOutputStream .writeInt (keyBytes .length );
565- dataOutputStream .write (keyBytes );
566-
567- byte [] valueBytes = ObjectSerDeUtils .serialize (entry .getValue (), valueTypeValue );
568- dataOutputStream .writeInt (valueBytes .length );
569- dataOutputStream .write (valueBytes );
570- }
571- } catch (IOException e ) {
572- throw new RuntimeException ("Caught exception while serializing Map" , e );
541+ long bufferSize = (3 + 2 * (long ) size ) * Integer .BYTES ;
542+ byte [][] keyBytesArray = new byte [size ][];
543+ byte [][] valueBytesArray = new byte [size ][];
544+ Map .Entry <Object , Object > firstEntry = map .entrySet ().iterator ().next ();
545+ int keyTypeValue = ObjectType .getObjectType (firstEntry .getKey ()).getValue ();
546+ int valueTypeValue = ObjectType .getObjectType (firstEntry .getValue ()).getValue ();
547+ ObjectSerDe keySerDe = SER_DES [keyTypeValue ];
548+ ObjectSerDe valueSerDe = SER_DES [valueTypeValue ];
549+ int index = 0 ;
550+ for (Map .Entry <Object , Object > entry : map .entrySet ()) {
551+ byte [] keyBytes = keySerDe .serialize (entry .getKey ());
552+ bufferSize += keyBytes .length ;
553+ keyBytesArray [index ] = keyBytes ;
554+ byte [] valueBytes = valueSerDe .serialize (entry .getValue ());
555+ bufferSize += valueBytes .length ;
556+ valueBytesArray [index ++] = valueBytes ;
573557 }
574-
575- return byteArrayOutputStream .toByteArray ();
558+ Preconditions .checkState (bufferSize <= Integer .MAX_VALUE , "Buffer size exceeds 2GB" );
559+ byte [] bytes = new byte [(int ) bufferSize ];
560+ ByteBuffer byteBuffer = ByteBuffer .wrap (bytes );
561+ byteBuffer .putInt (size );
562+ byteBuffer .putInt (keyTypeValue );
563+ byteBuffer .putInt (valueTypeValue );
564+ for (int i = 0 ; i < index ; i ++) {
565+ byte [] keyBytes = keyBytesArray [i ];
566+ byteBuffer .putInt (keyBytes .length );
567+ byteBuffer .put (keyBytes );
568+ byte [] valueBytes = valueBytesArray [i ];
569+ byteBuffer .putInt (valueBytes .length );
570+ byteBuffer .put (valueBytes );
571+ }
572+ return bytes ;
576573 }
577574
578575 @ Override
@@ -736,20 +733,23 @@ public DoubleOpenHashSet deserialize(ByteBuffer byteBuffer) {
736733 @ Override
737734 public byte [] serialize (Set <String > stringSet ) {
738735 int size = stringSet .size ();
739- // NOTE: No need to close the ByteArrayOutputStream.
740- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream ();
741- DataOutputStream dataOutputStream = new DataOutputStream (byteArrayOutputStream );
742- try {
743- dataOutputStream .writeInt (size );
744- for (String value : stringSet ) {
745- byte [] bytes = value .getBytes (UTF_8 );
746- dataOutputStream .writeInt (bytes .length );
747- dataOutputStream .write (bytes );
748- }
749- } catch (IOException e ) {
750- throw new RuntimeException ("Caught exception while serializing Set<String>" , e );
736+ long bufferSize = (1 + (long ) size ) * Integer .BYTES ;
737+ byte [][] valueBytesArray = new byte [size ][];
738+ int index = 0 ;
739+ for (String value : stringSet ) {
740+ byte [] valueBytes = value .getBytes (UTF_8 );
741+ bufferSize += valueBytes .length ;
742+ valueBytesArray [index ++] = valueBytes ;
751743 }
752- return byteArrayOutputStream .toByteArray ();
744+ Preconditions .checkState (bufferSize <= Integer .MAX_VALUE , "Buffer size exceeds 2GB" );
745+ byte [] bytes = new byte [(int ) bufferSize ];
746+ ByteBuffer byteBuffer = ByteBuffer .wrap (bytes );
747+ byteBuffer .putInt (size );
748+ for (byte [] valueBytes : valueBytesArray ) {
749+ byteBuffer .putInt (valueBytes .length );
750+ byteBuffer .put (valueBytes );
751+ }
752+ return bytes ;
753753 }
754754
755755 @ Override
@@ -776,20 +776,20 @@ public ObjectOpenHashSet<String> deserialize(ByteBuffer byteBuffer) {
776776 @ Override
777777 public byte [] serialize (Set <ByteArray > bytesSet ) {
778778 int size = bytesSet .size ();
779- // NOTE: No need to close the ByteArrayOutputStream.
780- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream ();
781- DataOutputStream dataOutputStream = new DataOutputStream ( byteArrayOutputStream );
782- try {
783- dataOutputStream . writeInt ( size );
784- for ( ByteArray value : bytesSet ) {
785- byte [] bytes = value . getBytes ( );
786- dataOutputStream . writeInt ( bytes . length );
787- dataOutputStream . write ( bytes );
788- }
789- } catch ( IOException e ) {
790- throw new RuntimeException ( "Caught exception while serializing Set<ByteArray>" , e );
779+ long bufferSize = ( 1 + ( long ) size ) * Integer . BYTES ;
780+ for ( ByteArray value : bytesSet ) {
781+ bufferSize += value . length ( );
782+ }
783+ Preconditions . checkState ( bufferSize <= Integer . MAX_VALUE , "Buffer size exceeds 2GB" );
784+ byte [] bytes = new byte [( int ) bufferSize ];
785+ ByteBuffer byteBuffer = ByteBuffer . wrap ( bytes );
786+ byteBuffer . putInt ( size );
787+ for ( ByteArray value : bytesSet ) {
788+ byte [] valueBytes = value . getBytes ();
789+ byteBuffer . putInt ( valueBytes . length );
790+ byteBuffer . put ( valueBytes );
791791 }
792- return byteArrayOutputStream . toByteArray () ;
792+ return bytes ;
793793 }
794794
795795 @ Override
@@ -941,30 +941,26 @@ public byte[] serialize(List<Object> list) {
941941 return new byte [Integer .BYTES ];
942942 }
943943
944- // No need to close these 2 streams (close() is no-op)
945- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream ();
946- DataOutputStream dataOutputStream = new DataOutputStream (byteArrayOutputStream );
947-
948- try {
949- // Write the size of the list
950- dataOutputStream .writeInt (size );
951-
952- // Write the value type
953- Object firstValue = list .get (0 );
954- int valueType = ObjectType .getObjectType (firstValue ).getValue ();
955- dataOutputStream .writeInt (valueType );
956-
957- // Write the serialized values
958- for (Object value : list ) {
959- byte [] bytes = ObjectSerDeUtils .serialize (value , valueType );
960- dataOutputStream .writeInt (bytes .length );
961- dataOutputStream .write (bytes );
962- }
963- } catch (IOException e ) {
964- throw new RuntimeException ("Caught exception while serializing List" , e );
944+ long bufferSize = (2 + (long ) size ) * Integer .BYTES ;
945+ byte [][] valueBytesArray = new byte [size ][];
946+ int valueType = ObjectType .getObjectType (list .get (0 )).getValue ();
947+ ObjectSerDe serDe = SER_DES [valueType ];
948+ int index = 0 ;
949+ for (Object value : list ) {
950+ byte [] valueBytes = serDe .serialize (value );
951+ bufferSize += valueBytes .length ;
952+ valueBytesArray [index ++] = valueBytes ;
965953 }
966-
967- return byteArrayOutputStream .toByteArray ();
954+ Preconditions .checkState (bufferSize <= Integer .MAX_VALUE , "Buffer size exceeds 2GB" );
955+ byte [] bytes = new byte [(int ) bufferSize ];
956+ ByteBuffer byteBuffer = ByteBuffer .wrap (bytes );
957+ byteBuffer .putInt (size );
958+ byteBuffer .putInt (valueType );
959+ for (byte [] valueBytes : valueBytesArray ) {
960+ byteBuffer .putInt (valueBytes .length );
961+ byteBuffer .put (valueBytes );
962+ }
963+ return bytes ;
968964 }
969965
970966 @ Override
0 commit comments