@@ -574,31 +574,34 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
574574
575575 first_file_tmp_path = tmp_path + file_name;
576576
577- WriteBufferFromFile out{first_file_tmp_path};
578- CompressedWriteBuffer compress{out};
579- NativeBlockOutputStream stream{compress, ClickHouseRevision::get (), block.cloneEmpty ()};
580-
581- // / Prepare the header.
582- // / We wrap the header into a string for compatibility with older versions:
583- // / a shard will able to read the header partly and ignore other parts based on its version.
584- WriteBufferFromOwnString header_buf;
585- writeVarUInt (ClickHouseRevision::get (), header_buf);
586- writeStringBinary (query_string, header_buf);
587- context.getSettingsRef ().serialize (header_buf);
588- context.getClientInfo ().write (header_buf, ClickHouseRevision::get ());
589-
590- // / Add new fields here, for example:
591- // / writeVarUInt(my_new_data, header_buf);
592-
593- // / Write the header.
594- const StringRef header = header_buf.stringRef ();
595- writeVarUInt (DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
596- writeStringBinary (header, out);
597- writePODBinary (CityHash_v1_0_2::CityHash128 (header.data , header.size ), out);
598-
599- stream.writePrefix ();
600- stream.write (block);
601- stream.writeSuffix ();
577+ // / Write batch to temporary location
578+ {
579+ WriteBufferFromFile out{first_file_tmp_path};
580+ CompressedWriteBuffer compress{out};
581+ NativeBlockOutputStream stream{compress, ClickHouseRevision::get (), block.cloneEmpty ()};
582+
583+ // / Prepare the header.
584+ // / We wrap the header into a string for compatibility with older versions:
585+ // / a shard will able to read the header partly and ignore other parts based on its version.
586+ WriteBufferFromOwnString header_buf;
587+ writeVarUInt (ClickHouseRevision::get (), header_buf);
588+ writeStringBinary (query_string, header_buf);
589+ context.getSettingsRef ().serialize (header_buf);
590+ context.getClientInfo ().write (header_buf, ClickHouseRevision::get ());
591+
592+ // / Add new fields here, for example:
593+ // / writeVarUInt(my_new_data, header_buf);
594+
595+ // / Write the header.
596+ const StringRef header = header_buf.stringRef ();
597+ writeVarUInt (DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
598+ writeStringBinary (header, out);
599+ writePODBinary (CityHash_v1_0_2::CityHash128 (header.data , header.size ), out);
600+
601+ stream.writePrefix ();
602+ stream.write (block);
603+ stream.writeSuffix ();
604+ }
602605
603606 // Create hardlink here to reuse increment number
604607 const std::string block_file_path (path + ' /' + file_name);
0 commit comments