Skip to content

Commit 7dd1c6b

Browse files
authored
Fix TextSource incorrect handling in channels that return short reads. (#23376)
* Fix TextSource incorrect handling in channels that return short reads. The issue is that readDefaultLine and readCustomLine was incorrectly calculating the appendLength when the buffer returned was 0 length. This was solved by ensuring that the internal read loop always read at least one byte allowing for the code to ensure that we were making progress. For readDefaultLine we kept track of whether we need to skip an LF in the next buffer if the current buffer ended with a CR and for readCustomLine we had to remember how much of the delimiter we have read so far in this buffer. The bug was introduced in 30a48f0 There was no noticeable change in the TextSourceBenchmark performance results. Fixes #23375
1 parent 4499242 commit 7dd1c6b

2 files changed

Lines changed: 144 additions & 23 deletions

File tree

sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ static class TextBasedReader extends FileBasedReader<String> {
109109
private volatile @Nullable String currentValue;
110110
private int bufferLength = 0; // the number of bytes of real data in the buffer
111111
private int bufferPosn = 0; // the current position in the buffer
112+
private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer
112113

113114
private TextBasedReader(TextSource source, byte[] delimiter) {
114115
super(source);
@@ -249,17 +250,16 @@ private boolean readDefaultLine() throws IOException {
249250
assert !eof;
250251

251252
int newlineLength = 0; // length of terminating newline
252-
boolean prevCharCR = false; // true of prev char was CR
253+
boolean prevCharCR = false; // true if prev char was CR
253254
long bytesConsumed = 0;
255+
EOF:
254256
for (; ; ) {
255257
int startPosn = bufferPosn; // starting from where we left off the last time
256258

257-
// Read the next chunk from the file
258-
if (bufferPosn == bufferLength) {
259+
// Read the next chunk from the file, ensure that we read at least one byte
260+
// or reach EOF.
261+
while (bufferPosn == bufferLength) {
259262
startPosn = bufferPosn = 0;
260-
if (prevCharCR) {
261-
++bytesConsumed; // account for CR from previous read
262-
}
263263
byteBuffer.clear();
264264
bufferLength = inChannel.read(byteBuffer);
265265

@@ -273,10 +273,18 @@ private boolean readDefaultLine() throws IOException {
273273
}
274274

275275
currentValue = str.toString(StandardCharsets.UTF_8.name());
276-
break;
276+
break EOF;
277277
}
278278
}
279279

280+
// Consume any LF after CR if it is the first character of the next buffer
281+
if (skipLineFeedAtStart && buffer[bufferPosn] == LF) {
282+
++bytesConsumed;
283+
++startPosn;
284+
++bufferPosn;
285+
skipLineFeedAtStart = false;
286+
}
287+
280288
// Search for the newline
281289
for (; bufferPosn < bufferLength; ++bufferPosn) {
282290
if (buffer[bufferPosn] == LF) {
@@ -291,20 +299,23 @@ private boolean readDefaultLine() throws IOException {
291299
prevCharCR = (buffer[bufferPosn] == CR);
292300
}
293301

294-
int readLength = bufferPosn - startPosn;
295-
if (prevCharCR && newlineLength == 0) {
296-
--readLength; // CR at the end of the buffer
302+
// CR at the end of the buffer
303+
if (newlineLength == 0 && prevCharCR) {
304+
skipLineFeedAtStart = true;
305+
newlineLength = 1;
306+
} else {
307+
skipLineFeedAtStart = false;
297308
}
298-
bytesConsumed += readLength;
299309

310+
int readLength = bufferPosn - startPosn;
311+
bytesConsumed += readLength;
312+
int appendLength = readLength - newlineLength;
300313
if (newlineLength == 0) {
301-
// Append the prefix of the value to str until we find a newline
302-
str.write(buffer, startPosn, readLength);
314+
// Append the prefix of the value to str skipping the partial delimiter
315+
str.write(buffer, startPosn, appendLength);
303316
} else {
304-
int appendLength = readLength - newlineLength;
305-
306-
// Optimize for the common case where the string is wholly contained within the buffer
307317
if (str.size() == 0) {
318+
// Optimize for the common case where the string is wholly contained within the buffer
308319
currentValue = new String(buffer, startPosn, appendLength, StandardCharsets.UTF_8);
309320
} else {
310321
str.write(buffer, startPosn, appendLength);
@@ -313,6 +324,7 @@ private boolean readDefaultLine() throws IOException {
313324
break;
314325
}
315326
}
327+
316328
startOfNextRecord = startOfRecord + bytesConsumed;
317329
str.reset();
318330
return true;
@@ -331,11 +343,13 @@ private boolean readCustomLine() throws IOException {
331343

332344
long bytesConsumed = 0;
333345
int delPosn = 0;
346+
EOF:
334347
for (; ; ) {
335348
int startPosn = bufferPosn; // starting from where we left off the last time
336349

337-
// Read the next chunk from the file
338-
if (bufferPosn >= bufferLength) {
350+
// Read the next chunk from the file, ensure that we read at least one byte
351+
// or reach EOF.
352+
while (bufferPosn >= bufferLength) {
339353
startPosn = bufferPosn = 0;
340354
byteBuffer.clear();
341355
bufferLength = inChannel.read(byteBuffer);
@@ -355,17 +369,17 @@ private boolean readCustomLine() throws IOException {
355369
}
356370

357371
currentValue = str.toString(StandardCharsets.UTF_8.name());
358-
break; // EOF
372+
break EOF;
359373
}
360374
}
361375

376+
int prevDelPosn = delPosn;
362377
DELIMITER_MATCH:
363378
{
364379
if (delPosn > 0) {
365380
// slow-path: Handle the case where we only matched part of the delimiter, possibly
366381
// adding that to str fixing up any partially consumed delimiter if we don't match the
367382
// whole delimiter
368-
int prevDelPosn = delPosn;
369383
for (; bufferPosn < bufferLength; ++bufferPosn) {
370384
if (buffer[bufferPosn] == delimiter[delPosn]) {
371385
delPosn++;
@@ -399,7 +413,7 @@ private boolean readCustomLine() throws IOException {
399413

400414
int readLength = bufferPosn - startPosn;
401415
bytesConsumed += readLength;
402-
int appendLength = readLength - delPosn;
416+
int appendLength = readLength - (delPosn - prevDelPosn);
403417
if (delPosn < delimiter.length) {
404418
// Append the prefix of the value to str skipping the partial delimiter
405419
str.write(buffer, startPosn, appendLength);

sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,13 @@
4343
import java.io.File;
4444
import java.io.FileOutputStream;
4545
import java.io.IOException;
46+
import java.io.InputStream;
4647
import java.io.OutputStream;
4748
import java.io.PrintStream;
4849
import java.io.Writer;
50+
import java.nio.ByteBuffer;
51+
import java.nio.channels.Channels;
52+
import java.nio.channels.ReadableByteChannel;
4953
import java.nio.charset.Charset;
5054
import java.nio.file.Files;
5155
import java.nio.file.Path;
@@ -59,7 +63,9 @@
5963
import javax.annotation.Nullable;
6064
import org.apache.beam.sdk.Pipeline;
6165
import org.apache.beam.sdk.coders.StringUtf8Coder;
66+
import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
6267
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
68+
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
6369
import org.apache.beam.sdk.options.PipelineOptions;
6470
import org.apache.beam.sdk.options.PipelineOptionsFactory;
6571
import org.apache.beam.sdk.options.ValueProvider;
@@ -337,7 +343,7 @@ public void testReadWithAuto() throws Exception {
337343

338344
/** Tests for reading files with various delimiters. */
339345
@RunWith(Parameterized.class)
340-
public static class ReadWithDelimiterTest {
346+
public static class ReadWithDefaultDelimiterTest {
341347
private static final ImmutableList<String> EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz");
342348
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
343349

@@ -363,10 +369,59 @@ public static Iterable<Object[]> data() {
363369
public ImmutableList<String> expected;
364370

365371
@Test
366-
public void testReadLinesWithDelimiter() throws Exception {
372+
public void testReadLinesWithDefaultDelimiter() throws Exception {
367373
runTestReadWithData(line.getBytes(UTF_8), expected);
368374
}
369375

376+
@Test
377+
public void testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel()
378+
throws Exception {
379+
Path path = tempFolder.newFile().toPath();
380+
Files.write(path, line.getBytes(UTF_8));
381+
Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
382+
FileBasedSource source =
383+
getTextSource(path.toString(), null)
384+
.createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
385+
FileBasedReader<String> reader =
386+
source.createSingleFileReader(PipelineOptionsFactory.create());
387+
ReadableByteChannel channel =
388+
FileSystems.open(
389+
FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
390+
InputStream stream = Channels.newInputStream(channel);
391+
reader.startReading(
392+
// Placeholder channel that only yields 0- and 1-length buffers.
393+
// Data is read at most one byte at a time from line parameter.
394+
new ReadableByteChannel() {
395+
int readCount = 0;
396+
397+
@Override
398+
public int read(ByteBuffer dst) throws IOException {
399+
if (++readCount % 3 == 0) {
400+
if (dst.hasRemaining()) {
401+
int value = stream.read();
402+
if (value == -1) {
403+
return -1;
404+
}
405+
dst.put((byte) value);
406+
return 1;
407+
}
408+
}
409+
return 0;
410+
}
411+
412+
@Override
413+
public boolean isOpen() {
414+
return channel.isOpen();
415+
}
416+
417+
@Override
418+
public void close() throws IOException {
419+
stream.close();
420+
}
421+
});
422+
assertEquals(expected, SourceTestUtils.readFromStartedReader(reader));
423+
}
424+
370425
@Test
371426
public void testSplittingSource() throws Exception {
372427
TextSource source = prepareSource(line.getBytes(UTF_8));
@@ -421,6 +476,58 @@ public void testReadLinesWithCustomDelimiter() throws Exception {
421476
TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}),
422477
PipelineOptionsFactory.create());
423478
}
479+
480+
@Test
481+
public void testReadLinesWithCustomDelimiterAndZeroAndOneLengthReturningChannel()
482+
throws Exception {
483+
byte[] delimiter = new byte[] {'|', '*'};
484+
Path path = tempFolder.newFile().toPath();
485+
Files.write(path, testCase.getBytes(UTF_8));
486+
Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
487+
FileBasedSource source =
488+
getTextSource(path.toString(), delimiter)
489+
.createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
490+
FileBasedReader<String> reader =
491+
source.createSingleFileReader(PipelineOptionsFactory.create());
492+
ReadableByteChannel channel =
493+
FileSystems.open(
494+
FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
495+
InputStream stream = Channels.newInputStream(channel);
496+
reader.startReading(
497+
// Placeholder channel that only yields 0- and 1-length buffers.
498+
// Data is read at most one byte at a time from testCase parameter.
499+
new ReadableByteChannel() {
500+
int readCount = 0;
501+
502+
@Override
503+
public int read(ByteBuffer dst) throws IOException {
504+
if (++readCount % 3 == 0) {
505+
if (dst.hasRemaining()) {
506+
int value = stream.read();
507+
if (value == -1) {
508+
return -1;
509+
}
510+
dst.put((byte) value);
511+
return 1;
512+
}
513+
}
514+
return 0;
515+
}
516+
517+
@Override
518+
public boolean isOpen() {
519+
return channel.isOpen();
520+
}
521+
522+
@Override
523+
public void close() throws IOException {
524+
stream.close();
525+
}
526+
});
527+
assertEquals(
528+
SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()),
529+
SourceTestUtils.readFromStartedReader(reader));
530+
}
424531
}
425532

426533
/** Tests for some basic operations in {@link TextIO.Read}. */

0 commit comments

Comments
 (0)