Skip to content

Commit 0d76091

Browse files
Merge pull request ClickHouse#10940 from azat/dist-send-partially-written-read-fix
Avoid sending partially written files by the DistributedBlockOutputStream
2 parents 19b4e68 + 2498041 commit 0d76091

File tree

1 file changed

+28
-25
lines changed

1 file changed

+28
-25
lines changed

src/Storages/Distributed/DistributedBlockOutputStream.cpp

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -574,31 +574,34 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
574574

575575
first_file_tmp_path = tmp_path + file_name;
576576

577-
WriteBufferFromFile out{first_file_tmp_path};
578-
CompressedWriteBuffer compress{out};
579-
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
580-
581-
/// Prepare the header.
582-
/// We wrap the header into a string for compatibility with older versions:
583-
/// a shard will able to read the header partly and ignore other parts based on its version.
584-
WriteBufferFromOwnString header_buf;
585-
writeVarUInt(ClickHouseRevision::get(), header_buf);
586-
writeStringBinary(query_string, header_buf);
587-
context.getSettingsRef().serialize(header_buf);
588-
context.getClientInfo().write(header_buf, ClickHouseRevision::get());
589-
590-
/// Add new fields here, for example:
591-
/// writeVarUInt(my_new_data, header_buf);
592-
593-
/// Write the header.
594-
const StringRef header = header_buf.stringRef();
595-
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
596-
writeStringBinary(header, out);
597-
writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);
598-
599-
stream.writePrefix();
600-
stream.write(block);
601-
stream.writeSuffix();
577+
/// Write batch to temporary location
578+
{
579+
WriteBufferFromFile out{first_file_tmp_path};
580+
CompressedWriteBuffer compress{out};
581+
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
582+
583+
/// Prepare the header.
584+
/// We wrap the header into a string for compatibility with older versions:
585+
/// a shard will able to read the header partly and ignore other parts based on its version.
586+
WriteBufferFromOwnString header_buf;
587+
writeVarUInt(ClickHouseRevision::get(), header_buf);
588+
writeStringBinary(query_string, header_buf);
589+
context.getSettingsRef().serialize(header_buf);
590+
context.getClientInfo().write(header_buf, ClickHouseRevision::get());
591+
592+
/// Add new fields here, for example:
593+
/// writeVarUInt(my_new_data, header_buf);
594+
595+
/// Write the header.
596+
const StringRef header = header_buf.stringRef();
597+
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
598+
writeStringBinary(header, out);
599+
writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);
600+
601+
stream.writePrefix();
602+
stream.write(block);
603+
stream.writeSuffix();
604+
}
602605

603606
// Create hardlink here to reuse increment number
604607
const std::string block_file_path(path + '/' + file_name);

0 commit comments

Comments
 (0)