Parallel parsing of data formats. #5372
overshov wants to merge 31 commits into ClickHouse:master from overshov:master
Conversation
```cpp
void registerChunkGetterCSV(FormatFactory & factory)
{
    factory.registerChunkGetter("CSV", [](
```
Could you make these just plain functions instead of anonymous lambdas? That's easier to navigate and also has less indentation.
dbms/src/Formats/FormatFactory.cpp
```cpp
    target = output_creator;
}

void FormatFactory::registerChunkGetter(const String & name, ChunkCreator chunk_creator)
```
So is it a getter or a creator? I'd name it something like FileSegmentationEngine.
dbms/src/Formats/FormatFactory.h
```cpp
class FormatFactory final : public ext::singleton<FormatFactory>
{
public:
    using ChunkCreator = std::function<bool(
```
This needs a comment describing the requirements for an implementation of a file segmentation function: what it should do, what the arguments mean, and when and in what context it is called.
dbms/src/IO/ReadHelpers.cpp
```cpp
    }
}

bool safeInBuffer(ReadBuffer & in, DB::Memory<> & memory, char * & begin_pos, bool force)
```
What does this do, and what does "force" mean? Needs a comment and probably a better name.
dbms/src/IO/ReadHelpers.cpp
```cpp
{
    size_t old_size = memory.size();
    memory.resize(old_size + static_cast<size_t>(in.position() - begin_pos));
    memcpy(memory.data() + old_size, begin_pos, in.position() - begin_pos);
```
Note: maybe it's possible to use memcpySmallAllowReadWriteOverflow15 here.
DB::Memory has extra 15 bytes. As for ReadBuffer, can't be sure.
dbms/src/IO/ReadHelpers.h
```cpp
/// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences.
void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);

bool safeInBuffer(ReadBuffer & in, DB::Memory<> & memory, char * & begin_pos, bool force = false);
```
Wrong name:
- safe -> save
- we save data from the buffer to memory, but the name says the opposite
- it isn't mentioned that the function may return `false` and do nothing (use a `try` prefix for that)
- it's not clear in which case the function won't copy data

A comment is also missing.
```cpp
void registerChunkGetterCSV(FormatFactory & factory)
{
    factory.registerChunkGetter("CSV", [](
```
Code is difficult to understand. (I can't be sure it's correct.)
Maybe it's possible to create a class that maintains the invariant for begin_pos and copies data to memory when needed?
dbms/src/Formats/FormatFactory.h
```cpp
using Creators = std::pair<InputCreator, OutputCreator>;
struct Creators
{
    InputCreator first;
```
Better to rename first and second to input_creator and output_creator.
```cpp
namespace DB
{

class SharedReadBuffer : public BufferWithOwnMemory<ReadBuffer>
```
dbms/src/Formats/FormatFactory.cpp
```cpp
auto buf_mutex = std::make_shared<std::mutex>();
for (size_t i = 0; i < max_threads_to_use; ++i)
{
    buffers.emplace_back(std::make_unique<SharedReadBuffer>(buf, buf_mutex, chunk_getter, settings.min_bytes_in_chunk));
```
Needs a comment explaining why SharedReadBuffer is used.
you're making changes in your

It's totally fine, IMO.
```cpp
if (eof)
    return false;

std::lock_guard<std::mutex> lock(*mutex);
```
You need to check somewhere that mutex is not nullptr.
I have checked the stateless test logs from the sandbox. Most diffs are caused by rearrangement of output lines due to parallel reading. Except for 00418_input_format_allow_errors, where there is some logical error.

This is the expected result (it is fine). BTW, we have discussed an option for order-preserving parallel parsing of formats. It is a little bit more difficult.

This is also the expected result. Local counters for parallel processing are fine.

Now let's explicitly turn off the setting in those tests that depend on order or error counters.

FYI https://github.com/yandex/ClickHouse/blob/96e3574/dbms/src/Interpreters/Aggregator.cpp#L1772
dbms/src/Core/Settings.h
```cpp
M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.") \
\
M(SettingBool, enable_parallel_reading, true, "Enable parallel_reading for several data formats (JSON, TSV, TKSV, Values, CSV).") \
```
Rename to input_format_parallel_parsing. Otherwise the name is extremely stupid.
```cpp
    }
} else
{
    in.position() = find_first_symbols<'"','\r', '\n'>(in.position(), in.buffer().end());
```
Inconsistent whitespaces.
```cpp
{
    quotes = true;
    ++in.position();
} else if (*in.position() == '\n')
```
```cpp
bool nextImpl() override
{
    if (eof || !mutex)
        return false;
```
Maybe in the case of a null mutex we should also write some LOG_ERROR?
What do you think @KochetovNicolai?
```cpp
\
M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for several data formats (JSON, TSV, TKSV, Values, CSV).") \
M(SettingUInt64, max_threads_for_parallel_reading, 0, "The maximum number of threads to parallel reading. By default, it is set to max_threads.") \
M(SettingUInt64, min_chunk_size_for_parallel_reading, 4, "The minimum chunk size, which each thread tries to parse under mutex in parallel reading.") \
```
Just curious, what does this "4" mean? It looks like a number of chunks, but looking at the while loop in CSVRowInputStream.cpp it looks like 4 bytes. Am I missing something?
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

Continued in #6553
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
For changelog. Remove if this is a non-significant change.
Category (leave one):
Short description (up to a few sentences):
Parallel parsing of input data formats.