Skip to content

Commit d432c7f

Browse files
authored
perf(http): speed up varchar parsing in CSV import (#5985)
1 parent e45b2aa commit d432c7f

File tree

8 files changed

+86
-8
lines changed

8 files changed

+86
-8
lines changed

core/src/main/java/io/questdb/cutlass/text/AbstractTextLexer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.questdb.std.Unsafe;
3636
import io.questdb.std.Vect;
3737
import io.questdb.std.str.DirectUtf8String;
38+
import io.questdb.std.str.Utf8s;
3839

3940
import java.io.Closeable;
4041

@@ -43,7 +44,6 @@ public abstract class AbstractTextLexer implements Closeable, Mutable {
4344
private static final long MASK_CR = SwarUtils.broadcast((byte) '\r');
4445
private static final long MASK_NEW_LINE = SwarUtils.broadcast((byte) '\n');
4546
private static final long MASK_QUOTE = SwarUtils.broadcast((byte) '"');
46-
private static final long NON_ASCII_MASK_FULL = SwarUtils.broadcast((byte) 0x80);
4747

4848
private final ObjectPool<DirectUtf8String> csPool;
4949
private final ObjList<DirectUtf8String> fields = new ObjList<>();
@@ -310,7 +310,7 @@ private void parse0(long lo, long hi) {
310310
if (zeroBytesWord == 0) {
311311
ptr += 7;
312312
this.fieldHi += 7;
313-
this.ascii &= (word & NON_ASCII_MASK_FULL) == 0;
313+
this.ascii &= Utf8s.isAscii(word);
314314
continue;
315315
} else {
316316
int firstIndex = SwarUtils.indexOfFirstMarkedByte(zeroBytesWord);
@@ -320,7 +320,7 @@ private void parse0(long lo, long hi) {
320320
// These bytes come on LOW bits of the "word". To check that these bytes are
321321
// positive, we need to isolate them. We do that by masking out the entire
322322
// word, save for the bytes we intend to keep.
323-
this.ascii &= ((word & (0xffffffffffffffffL >>> (64 - firstIndex * 8))) & NON_ASCII_MASK_FULL) == 0;
323+
this.ascii &= Utf8s.isAscii(word & (0xffffffffffffffffL >>> (64 - firstIndex * 8)));
324324
ptr += firstIndex;
325325
}
326326
this.fieldHi += firstIndex;

core/src/main/java/io/questdb/cutlass/text/CopyTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ public void run(long fileBufPtr, long fileBufSize) throws TextException {
449449
long zeroBytesWord = SwarUtils.markZeroBytes(word ^ MASK_NEW_LINE)
450450
| SwarUtils.markZeroBytes(word ^ MASK_QUOTE);
451451
if (zeroBytesWord == 0) {
452-
ptr += 7;
452+
ptr += 8;
453453
continue;
454454
} else {
455455
ptr += SwarUtils.indexOfFirstMarkedByte(zeroBytesWord);

core/src/main/java/io/questdb/cutlass/text/types/VarcharAdapter.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626

2727
import io.questdb.cairo.ColumnType;
2828
import io.questdb.cairo.TableWriter;
29+
import io.questdb.std.SwarUtils;
2930
import io.questdb.std.str.DirectUtf16Sink;
3031
import io.questdb.std.str.DirectUtf8Sequence;
3132
import io.questdb.std.str.DirectUtf8Sink;
3233

3334

3435
public class VarcharAdapter extends AbstractTypeAdapter {
3536
private static final byte DOUBLE_QUOTE = '"';
37+
private static final long DOUBLE_QUOTE_WORD = SwarUtils.broadcast(DOUBLE_QUOTE);
3638
private final DirectUtf8Sink utf8Sink;
3739

3840
public VarcharAdapter(DirectUtf8Sink utf8Sink) {
@@ -65,11 +67,25 @@ public void write(TableWriter.Row row, int column, DirectUtf8Sequence value, Dir
6567
private static void deflateConsecutiveDoubleQuotes(DirectUtf8Sequence value, DirectUtf8Sink utf8Sink) {
6668
utf8Sink.clear();
6769
int quoteCount = 0;
68-
for (int i = 0; i < value.size(); i++) {
69-
byte b = value.byteAt(i);
70+
final int len = value.size();
71+
int i = 0;
72+
while (i < len) {
73+
if (i < len - 7) {
74+
final long word = value.longAt(i);
75+
final long zeroBytesWord = SwarUtils.markZeroBytes(word ^ DOUBLE_QUOTE_WORD);
76+
if (zeroBytesWord == 0) {
77+
// fast path for no double quotes in consequent 8 bytes
78+
quoteCount = 0;
79+
utf8Sink.putAny8(word);
80+
i += 8;
81+
continue;
82+
}
83+
}
84+
85+
byte b = value.byteAt(i++);
7086
if (b == DOUBLE_QUOTE) {
7187
if (quoteCount++ % 2 == 0) {
72-
utf8Sink.putAny(b);
88+
utf8Sink.putAny(DOUBLE_QUOTE);
7389
}
7490
} else {
7591
quoteCount = 0;

core/src/main/java/io/questdb/std/str/DirectUtf8Sink.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,14 @@ public DirectUtf8Sink putAny(byte b) {
117117
return this;
118118
}
119119

120+
/**
121+
* Same as {@link #putAny(byte)}, but writes 8 consequent bytes (a long).
122+
*/
123+
public void putAny8(long w) {
124+
setAscii(isAscii() & Utf8s.isAscii(w));
125+
sink.putLong(w);
126+
}
127+
120128
@Override
121129
public DirectUtf8Sink putAscii(char c) {
122130
sink.put((byte) c);

core/src/main/java/io/questdb/std/str/Utf8s.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -794,10 +794,15 @@ public static boolean isAscii(Utf8Sequence utf8) {
794794
return true;
795795
}
796796

797+
// checks 8 consequent bytes at once for non-ASCII chars, convenient for SWAR
798+
public static boolean isAscii(long w) {
799+
return (w & ASCII_MASK) == 0;
800+
}
801+
797802
public static boolean isAscii(long ptr, int size) {
798803
long i = 0;
799804
for (; i + 7 < size; i += 8) {
800-
if ((Unsafe.getUnsafe().getLong(ptr + i) & ASCII_MASK) != 0) {
805+
if (isAscii(Unsafe.getUnsafe().getLong(ptr + i))) {
801806
return false;
802807
}
803808
}

core/src/test/java/io/questdb/test/cutlass/text/ParallelCsvFileImporterTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1762,6 +1762,43 @@ public IOURing newInstance(int capacity) {
17621762
);
17631763
}
17641764

1765+
@Test
1766+
public void testImportVarcharDoubleQuotes() throws Exception {
1767+
executeWithPool(
1768+
2,
1769+
8,
1770+
(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext sqlExecutionContext) -> {
1771+
execute(
1772+
compiler,
1773+
"create table x (\n" +
1774+
" str varchar,\n" +
1775+
" ts timestamp" +
1776+
") timestamp(ts) partition by DAY;", sqlExecutionContext
1777+
);
1778+
try (ParallelCsvFileImporter importer = new ParallelCsvFileImporter(engine, sqlExecutionContext.getWorkerCount())) {
1779+
importer.of("x", "test-varchar-double-quotes.csv", 1, PartitionBy.DAY, (byte) ',', "ts", "yyyy-MM-ddTHH:mm:ss.SSSUUUZ", true);
1780+
importer.process(AllowAllSecurityContext.INSTANCE);
1781+
}
1782+
1783+
refreshTablesInBaseEngine();
1784+
assertQueryNoLeakCheck(
1785+
"str\tts\n" +
1786+
"foobar\t1970-01-02T00:00:00.000000Z\n" +
1787+
"foobar foobar foobar foobar\t1970-01-02T00:00:00.000000Z\n" +
1788+
"foobar foobar \"foobar\" foobar foobar\t1970-01-02T00:00:00.000000Z\n" +
1789+
"\"foobar\" foobar foobar foobar\t1970-01-02T00:00:00.000000Z\n" +
1790+
"foobar\"\"\t1970-01-02T00:00:00.000000Z\n" +
1791+
"фубар \"фубар\" фубар\t1970-01-02T00:00:00.000000Z\n",
1792+
"x",
1793+
"ts",
1794+
true,
1795+
false,
1796+
true
1797+
);
1798+
}
1799+
);
1800+
}
1801+
17651802
@Test
17661803
public void testImportWithSkipAllAtomicityFailsWhenNonTimestampColumnCantBeParsedAtDataImportPhase() throws Exception {
17671804
executeWithPool(

core/src/test/java/io/questdb/test/std/str/Utf8sTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,11 @@ public void testIsAscii() {
397397
Assert.assertFalse(Utf8s.isAscii(sink.ptr(), sink.size()));
398398
sink.clear();
399399
sink.put("12345678ы87654321");
400+
Assert.assertTrue(Utf8s.isAscii(sink.longAt(0)));
401+
for (int i = 1; i < 10; i++) {
402+
Assert.assertFalse(Utf8s.isAscii(sink.longAt(i)));
403+
}
404+
Assert.assertTrue(Utf8s.isAscii(sink.longAt(10)));
400405
Assert.assertFalse(Utf8s.isAscii(sink));
401406
Assert.assertFalse(Utf8s.isAscii(sink.ptr(), sink.size()));
402407
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"str","ts"
2+
"foobar","1970-01-02T00:00:00.000000Z"
3+
"foobar foobar foobar foobar","1970-01-02T00:00:00.000000Z"
4+
"foobar foobar ""foobar"" foobar foobar","1970-01-02T00:00:00.000000Z"
5+
"""foobar"" foobar foobar foobar","1970-01-02T00:00:00.000000Z"
6+
foobar","1970-01-02T00:00:00.000000Z"
7+
"фубар ""фубар"" фубар","1970-01-02T00:00:00.000000Z"

0 commit comments

Comments
 (0)