Skip to content

Commit cab333d

Browse files
committed
Optimize ser/de to avoid using output stream
1 parent 4f95945 commit cab333d

File tree

2 files changed

+179
-84
lines changed

2 files changed

+179
-84
lines changed

pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java

Lines changed: 80 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pinot.core.common;
2020

2121
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
22+
import com.google.common.base.Preconditions;
2223
import com.google.common.primitives.Longs;
2324
import com.tdunning.math.stats.MergingDigest;
2425
import com.tdunning.math.stats.TDigest;
@@ -45,14 +46,11 @@
4546
import it.unimi.dsi.fastutil.longs.LongSet;
4647
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
4748
import it.unimi.dsi.fastutil.objects.ObjectSet;
48-
import java.io.ByteArrayOutputStream;
49-
import java.io.DataOutputStream;
5049
import java.io.IOException;
5150
import java.math.BigDecimal;
5251
import java.nio.ByteBuffer;
5352
import java.util.ArrayList;
5453
import java.util.HashMap;
55-
import java.util.Iterator;
5654
import java.util.List;
5755
import java.util.Map;
5856
import java.util.Set;
@@ -540,39 +538,38 @@ public byte[] serialize(Map<Object, Object> map) {
540538
return new byte[Integer.BYTES];
541539
}
542540

543-
// No need to close these 2 streams
544-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
545-
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
546-
547-
try {
548-
// Write the size of the map
549-
dataOutputStream.writeInt(size);
550-
551-
// Write the serialized key-value pairs
552-
Iterator<Map.Entry<Object, Object>> iterator = map.entrySet().iterator();
553-
// First write the key type and value type
554-
Map.Entry<Object, Object> firstEntry = iterator.next();
555-
Object firstKey = firstEntry.getKey();
556-
Object firstValue = firstEntry.getValue();
557-
int keyTypeValue = ObjectType.getObjectType(firstKey).getValue();
558-
int valueTypeValue = ObjectType.getObjectType(firstValue).getValue();
559-
dataOutputStream.writeInt(keyTypeValue);
560-
dataOutputStream.writeInt(valueTypeValue);
561-
// Then write each key-value pair
562-
for (Map.Entry<Object, Object> entry : map.entrySet()) {
563-
byte[] keyBytes = ObjectSerDeUtils.serialize(entry.getKey(), keyTypeValue);
564-
dataOutputStream.writeInt(keyBytes.length);
565-
dataOutputStream.write(keyBytes);
566-
567-
byte[] valueBytes = ObjectSerDeUtils.serialize(entry.getValue(), valueTypeValue);
568-
dataOutputStream.writeInt(valueBytes.length);
569-
dataOutputStream.write(valueBytes);
570-
}
571-
} catch (IOException e) {
572-
throw new RuntimeException("Caught exception while serializing Map", e);
541+
long bufferSize = (3 + 2 * (long) size) * Integer.BYTES;
542+
byte[][] keyBytesArray = new byte[size][];
543+
byte[][] valueBytesArray = new byte[size][];
544+
Map.Entry<Object, Object> firstEntry = map.entrySet().iterator().next();
545+
int keyTypeValue = ObjectType.getObjectType(firstEntry.getKey()).getValue();
546+
int valueTypeValue = ObjectType.getObjectType(firstEntry.getValue()).getValue();
547+
ObjectSerDe keySerDe = SER_DES[keyTypeValue];
548+
ObjectSerDe valueSerDe = SER_DES[valueTypeValue];
549+
int index = 0;
550+
for (Map.Entry<Object, Object> entry : map.entrySet()) {
551+
byte[] keyBytes = keySerDe.serialize(entry.getKey());
552+
bufferSize += keyBytes.length;
553+
keyBytesArray[index] = keyBytes;
554+
byte[] valueBytes = valueSerDe.serialize(entry.getValue());
555+
bufferSize += valueBytes.length;
556+
valueBytesArray[index++] = valueBytes;
573557
}
574-
575-
return byteArrayOutputStream.toByteArray();
558+
Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
559+
byte[] bytes = new byte[(int) bufferSize];
560+
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
561+
byteBuffer.putInt(size);
562+
byteBuffer.putInt(keyTypeValue);
563+
byteBuffer.putInt(valueTypeValue);
564+
for (int i = 0; i < index; i++) {
565+
byte[] keyBytes = keyBytesArray[i];
566+
byteBuffer.putInt(keyBytes.length);
567+
byteBuffer.put(keyBytes);
568+
byte[] valueBytes = valueBytesArray[i];
569+
byteBuffer.putInt(valueBytes.length);
570+
byteBuffer.put(valueBytes);
571+
}
572+
return bytes;
576573
}
577574

578575
@Override
@@ -736,20 +733,23 @@ public DoubleOpenHashSet deserialize(ByteBuffer byteBuffer) {
736733
@Override
737734
public byte[] serialize(Set<String> stringSet) {
738735
int size = stringSet.size();
739-
// NOTE: No need to close the ByteArrayOutputStream.
740-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
741-
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
742-
try {
743-
dataOutputStream.writeInt(size);
744-
for (String value : stringSet) {
745-
byte[] bytes = value.getBytes(UTF_8);
746-
dataOutputStream.writeInt(bytes.length);
747-
dataOutputStream.write(bytes);
748-
}
749-
} catch (IOException e) {
750-
throw new RuntimeException("Caught exception while serializing Set<String>", e);
736+
long bufferSize = (1 + (long) size) * Integer.BYTES;
737+
byte[][] valueBytesArray = new byte[size][];
738+
int index = 0;
739+
for (String value : stringSet) {
740+
byte[] valueBytes = value.getBytes(UTF_8);
741+
bufferSize += valueBytes.length;
742+
valueBytesArray[index++] = valueBytes;
751743
}
752-
return byteArrayOutputStream.toByteArray();
744+
Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
745+
byte[] bytes = new byte[(int) bufferSize];
746+
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
747+
byteBuffer.putInt(size);
748+
for (byte[] valueBytes : valueBytesArray) {
749+
byteBuffer.putInt(valueBytes.length);
750+
byteBuffer.put(valueBytes);
751+
}
752+
return bytes;
753753
}
754754

755755
@Override
@@ -776,20 +776,20 @@ public ObjectOpenHashSet<String> deserialize(ByteBuffer byteBuffer) {
776776
@Override
777777
public byte[] serialize(Set<ByteArray> bytesSet) {
778778
int size = bytesSet.size();
779-
// NOTE: No need to close the ByteArrayOutputStream.
780-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
781-
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
782-
try {
783-
dataOutputStream.writeInt(size);
784-
for (ByteArray value : bytesSet) {
785-
byte[] bytes = value.getBytes();
786-
dataOutputStream.writeInt(bytes.length);
787-
dataOutputStream.write(bytes);
788-
}
789-
} catch (IOException e) {
790-
throw new RuntimeException("Caught exception while serializing Set<ByteArray>", e);
779+
long bufferSize = (1 + (long) size) * Integer.BYTES;
780+
for (ByteArray value : bytesSet) {
781+
bufferSize += value.length();
782+
}
783+
Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
784+
byte[] bytes = new byte[(int) bufferSize];
785+
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
786+
byteBuffer.putInt(size);
787+
for (ByteArray value : bytesSet) {
788+
byte[] valueBytes = value.getBytes();
789+
byteBuffer.putInt(valueBytes.length);
790+
byteBuffer.put(valueBytes);
791791
}
792-
return byteArrayOutputStream.toByteArray();
792+
return bytes;
793793
}
794794

795795
@Override
@@ -941,30 +941,26 @@ public byte[] serialize(List<Object> list) {
941941
return new byte[Integer.BYTES];
942942
}
943943

944-
// No need to close these 2 streams (close() is no-op)
945-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
946-
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
947-
948-
try {
949-
// Write the size of the list
950-
dataOutputStream.writeInt(size);
951-
952-
// Write the value type
953-
Object firstValue = list.get(0);
954-
int valueType = ObjectType.getObjectType(firstValue).getValue();
955-
dataOutputStream.writeInt(valueType);
956-
957-
// Write the serialized values
958-
for (Object value : list) {
959-
byte[] bytes = ObjectSerDeUtils.serialize(value, valueType);
960-
dataOutputStream.writeInt(bytes.length);
961-
dataOutputStream.write(bytes);
962-
}
963-
} catch (IOException e) {
964-
throw new RuntimeException("Caught exception while serializing List", e);
944+
long bufferSize = (2 + (long) size) * Integer.BYTES;
945+
byte[][] valueBytesArray = new byte[size][];
946+
int valueType = ObjectType.getObjectType(list.get(0)).getValue();
947+
ObjectSerDe serDe = SER_DES[valueType];
948+
int index = 0;
949+
for (Object value : list) {
950+
byte[] valueBytes = serDe.serialize(value);
951+
bufferSize += valueBytes.length;
952+
valueBytesArray[index++] = valueBytes;
965953
}
966-
967-
return byteArrayOutputStream.toByteArray();
954+
Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
955+
byte[] bytes = new byte[(int) bufferSize];
956+
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
957+
byteBuffer.putInt(size);
958+
byteBuffer.putInt(valueType);
959+
for (byte[] valueBytes : valueBytesArray) {
960+
byteBuffer.putInt(valueBytes.length);
961+
byteBuffer.put(valueBytes);
962+
}
963+
return bytes;
968964
}
969965

970966
@Override
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.perf;
20+
21+
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.concurrent.TimeUnit;
29+
import org.apache.commons.lang3.RandomStringUtils;
30+
import org.apache.pinot.common.utils.HashUtil;
31+
import org.apache.pinot.core.common.ObjectSerDeUtils;
32+
import org.apache.pinot.spi.utils.ByteArray;
33+
import org.openjdk.jmh.annotations.Benchmark;
34+
import org.openjdk.jmh.annotations.BenchmarkMode;
35+
import org.openjdk.jmh.annotations.Fork;
36+
import org.openjdk.jmh.annotations.Measurement;
37+
import org.openjdk.jmh.annotations.Mode;
38+
import org.openjdk.jmh.annotations.OutputTimeUnit;
39+
import org.openjdk.jmh.annotations.Scope;
40+
import org.openjdk.jmh.annotations.Setup;
41+
import org.openjdk.jmh.annotations.State;
42+
import org.openjdk.jmh.annotations.Warmup;
43+
import org.openjdk.jmh.runner.Runner;
44+
import org.openjdk.jmh.runner.options.OptionsBuilder;
45+
46+
import static java.nio.charset.StandardCharsets.UTF_8;
47+
48+
49+
@BenchmarkMode(Mode.AverageTime)
50+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
51+
@Fork(1)
52+
@Warmup(iterations = 3, time = 10)
53+
@Measurement(iterations = 5, time = 10)
54+
@State(Scope.Benchmark)
55+
public class BenchmarkObjectSerDe {
56+
private static final int NUM_VALUES = 5_000_000;
57+
58+
List<String> _stringList = new ArrayList<>(NUM_VALUES);
59+
Set<String> _stringSet = new ObjectOpenHashSet<>(NUM_VALUES);
60+
Map<String, String> _stringToStringMap = new HashMap<>(HashUtil.getHashMapCapacity(NUM_VALUES));
61+
Set<ByteArray> _bytesSet = new ObjectOpenHashSet<>(NUM_VALUES);
62+
63+
@Setup
64+
public void setUp()
65+
throws IOException {
66+
for (int i = 0; i < NUM_VALUES; i++) {
67+
String stringValue = RandomStringUtils.randomAlphanumeric(10, 201);
68+
_stringList.add(stringValue);
69+
_stringSet.add(stringValue);
70+
_stringToStringMap.put(stringValue, stringValue);
71+
_bytesSet.add(new ByteArray(stringValue.getBytes(UTF_8)));
72+
}
73+
}
74+
75+
@Benchmark
76+
public int stringList() {
77+
return ObjectSerDeUtils.serialize(_stringList).length;
78+
}
79+
80+
@Benchmark
81+
public int stringSet() {
82+
return ObjectSerDeUtils.serialize(_stringSet).length;
83+
}
84+
85+
@Benchmark
86+
public int stringToStringMap() {
87+
return ObjectSerDeUtils.serialize(_stringToStringMap).length;
88+
}
89+
90+
@Benchmark
91+
public int bytesSet() {
92+
return ObjectSerDeUtils.serialize(_bytesSet).length;
93+
}
94+
95+
public static void main(String[] args)
96+
throws Exception {
97+
new Runner(new OptionsBuilder().include(BenchmarkObjectSerDe.class.getSimpleName()).build()).run();
98+
}
99+
}

0 commit comments

Comments
 (0)