Parallel parsing of data formats. #5372

Closed
overshov wants to merge 31 commits into ClickHouse:master from overshov:master

@overshov commented May 21, 2019

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

For changelog. Remove if this is non-significant change.

Category (leave one):

  • New Feature
  • Performance Improvement

Short description (up to few sentences):
Parallel parsing of input data formats.

@alexey-milovidov changed the title from "MVP for hse project" to "Parallel parsing of data formats." on May 21, 2019
@alexey-milovidov added the "can be tested" and "pr-feature" (Pull request with new product feature) labels on May 21, 2019

void registerChunkGetterCSV(FormatFactory & factory)
{
    factory.registerChunkGetter("CSV", [](
Contributor

Could you make these plain functions instead of anonymous lambdas? That's easier to navigate and also has less indentation.
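
A minimal sketch of what this suggestion could look like. `MiniFactory`, `Segmenter`, `segmentLine` and `registerSegmenterLine` are hypothetical stand-ins made up for illustration; the real FormatFactory interface is different and is only partly visible in this review.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for FormatFactory; not the real ClickHouse interface.
using Segmenter = std::function<std::size_t(const std::string &)>;

struct MiniFactory
{
    std::map<std::string, Segmenter> segmenters;

    void registerSegmenter(const std::string & name, Segmenter segmenter)
    {
        segmenters[name] = std::move(segmenter);
    }
};

// A named free function: it can be found by name in the code base and its body
// sits at one indentation level instead of inside a call expression.
// Returns the length of the first line including '\n', or the whole input if there is none.
std::size_t segmentLine(const std::string & data)
{
    const std::size_t pos = data.find('\n');
    return pos == std::string::npos ? data.size() : pos + 1;
}

void registerSegmenterLine(MiniFactory & factory)
{
    // The function is passed by name; no lambda is needed.
    factory.registerSegmenter("Line", segmentLine);
}
```

The registration call stays a one-liner, and the segmentation logic can be read and tested on its own.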

    target = output_creator;
}

void FormatFactory::registerChunkGetter(const String & name, ChunkCreator chunk_creator)
Contributor

So is it a getter or a creator? I'd name it something like FileSegmentationEngine.

Author

Renamed

class FormatFactory final : public ext::singleton<FormatFactory>
{
public:
    using ChunkCreator = std::function<bool(
Contributor

This needs a comment describing the requirements for the implementation of a file segmentation function, i.e. what it should do, what the arguments mean, when and in what context it is called etc.

Author

Added a comment.
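
For illustration, a documented segmentation function type might look like the sketch below. The signature, the name `SegmentationFunction` and the stated calling context are assumptions; the actual `std::function<bool(` signature is truncated in the excerpt above and is not reproduced here.

```cpp
#include <cstddef>
#include <functional>
#include <string>

/// Hypothetical signature, for illustration only.
///
/// Finds where the next independently parsable segment of `data` ends.
///  - data: the unconsumed input. The caller guarantees it starts at a record boundary.
///  - return value: the number of bytes in the segment, which always ends right after
///    a record delimiter, or 0 if `data` holds no complete record and more input is needed.
///
/// Assumed context: called by a reader while it holds the lock on the shared input,
/// so an implementation should only look for boundaries and must not parse field values.
using SegmentationFunction = std::function<std::size_t(const std::string & data)>;

/// Implementation for newline-delimited records: takes every complete line in `data`.
std::size_t segmentByNewline(const std::string & data)
{
    const std::size_t pos = data.rfind('\n');
    return pos == std::string::npos ? 0 : pos + 1;
}
```

The comment answers the three questions raised in the review: what the function does, what the arguments mean, and in what context it is called.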

}
}

bool safeInBuffer(ReadBuffer & in, DB::Memory<> & memory, char * & begin_pos, bool force)
Contributor

What does this do, and what does "force" mean? Needs a comment and probably a better name.

{
    size_t old_size = memory.size();
    memory.resize(old_size + static_cast<size_t>(in.position() - begin_pos));
    memcpy(memory.data() + old_size, begin_pos, in.position() - begin_pos);
Member

Note: maybe it's possible to use memcpySmallAllowReadWriteOverflow15 here.
DB::Memory has extra 15 bytes. As for ReadBuffer, can't be sure.

/// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences.
void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);

bool safeInBuffer(ReadBuffer & in, DB::Memory<> & memory, char * & begin_pos, bool force = false);
Member

Wrong name:

  • safe -> save
  • we save data from the buffer to memory, but the name says the opposite
  • the name doesn't indicate that the function may return false and do nothing (a try prefix would convey that)
  • it's unclear in which cases the function won't copy data

The comment is missing.

Author

Made it better.
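
A sketch of how the naming and commenting points above could be addressed. `SimpleBuffer` and `trySaveBufferToMemory` are hypothetical names, and the condition under which the function copies is an assumption made for this example; the excerpt does not show when the original function declines to copy.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Simplified stand-in for a read buffer: a window [begin, end) with a cursor `pos`.
struct SimpleBuffer
{
    const char * begin;
    const char * pos;
    const char * end;
};

/// Appends the bytes in [saved_from, in.pos) to `memory` and advances `saved_from` to in.pos.
/// Assumed rule for this sketch: the copy happens only when the cursor has reached the end
/// of the window (its bytes are about to be replaced) or when `force` is set.
/// Returns true if the range was copied, false if nothing was done.
bool trySaveBufferToMemory(
    const SimpleBuffer & in, std::vector<char> & memory, const char *& saved_from, bool force = false)
{
    if (!force && in.pos != in.end)
        return false;

    memory.insert(memory.end(), saved_from, in.pos);
    saved_from = in.pos;
    return true;
}
```

The `try` prefix signals that the call may do nothing, "save ... to memory" states the direction of the copy, and the comment spells out when the copy is skipped.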


void registerChunkGetterCSV(FormatFactory & factory)
{
    factory.registerChunkGetter("CSV", [](
Member

The code is difficult to understand. (I can't be sure it's correct.)
Maybe it's possible to create a class that maintains the invariant for begin_pos and copies data to memory when needed?
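
One possible shape for such a class, as a sketch only. `SegmentAccumulator` and its methods are hypothetical, and the input is modelled as plain character windows instead of a ReadBuffer.

```cpp
#include <cstddef>
#include <string>

// Invariant: `saved` followed by [window_begin, cursor) is exactly the unfinished segment.
class SegmentAccumulator
{
public:
    // Start scanning a new window of input; the pending segment continues at its first byte.
    void attach(const char * window_begin_) { window_begin = window_begin_; }

    // Call before the current window is invalidated: keeps the bytes scanned so far.
    void detach(const char * cursor)
    {
        saved.append(window_begin, cursor);
        window_begin = nullptr;
    }

    // Finish the segment at `cursor` and return it. The next segment starts at `cursor`.
    std::string finish(const char * cursor)
    {
        saved.append(window_begin, cursor);
        window_begin = cursor;
        std::string result;
        result.swap(saved);
        return result;
    }

private:
    std::string saved;
    const char * window_begin = nullptr;
};
```

With the bookkeeping in one place, the format-specific scanning code only moves the cursor and calls `detach` or `finish`; it no longer has to reason about when to copy.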

using Creators = std::pair<InputCreator, OutputCreator>;
struct Creators
{
    InputCreator first;
Member

Better to rename first and second to input_creator and output_creator.

Author

Fixed

namespace DB
{

class SharedReadBuffer : public BufferWithOwnMemory<ReadBuffer>
Member

Missing comment.

auto buf_mutex = std::make_shared<std::mutex>();
for (size_t i = 0; i < max_threads_to_use; ++i)
{
    buffers.emplace_back(std::make_unique<SharedReadBuffer>(buf, buf_mutex, chunk_getter, settings.min_bytes_in_chunk));
Member

Needs a comment explaining why SharedReadBuffer is used.

@ivan-v-kush
Contributor

you're making changes in your master
It would be more convenient to create a feature branch, work in it, and then merge it.
http://nvie.com/posts/a-successful-git-branching-model/

@alexey-milovidov
Member

> you're making changes in your master

It's totally fine, IMO.

if (eof)
    return false;

std::lock_guard<std::mutex> lock(*mutex);
Contributor

@ivan-v-kush commented May 24, 2019

You need to check somewhere that mutex is not nullptr.
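
One place for such a check is the constructor, so that an object with a null mutex can never exist. The sketch below uses a hypothetical `LockedCounter` class in place of SharedReadBuffer, whose full definition is not shown in this review.

```cpp
#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>

// Hypothetical class used only to show the check; it is not SharedReadBuffer.
class LockedCounter
{
public:
    explicit LockedCounter(std::shared_ptr<std::mutex> mutex_)
        : mutex(std::move(mutex_))
    {
        // Reject a null mutex up front, so every later dereference is known to be safe.
        if (!mutex)
            throw std::invalid_argument("LockedCounter: mutex must not be null");
    }

    int increment()
    {
        std::lock_guard<std::mutex> lock(*mutex);
        return ++value;
    }

private:
    std::shared_ptr<std::mutex> mutex;
    int value = 0;
};
```

Failing at construction reports the problem where the bad argument was passed, instead of surfacing later as a silent early return or a crash inside the locked section.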

@overshov
Author

overshov commented May 24, 2019

I have checked the stateless test logs from the sandbox. Most diffs are caused by the reordering of output lines due to parallel reading. The exception is 00418_input_format_allow_errors, where there is a logical error.
Yes, each ReadBuffer has its own error counter, so the input_format_allow_errors_num parameter does not work.

@alexey-milovidov
Member

> I have checked the stateless test logs from the sandbox. Most diffs are caused by the reordering of output lines due to parallel reading.

This is the expected result (it is fine).

BTW, we have discussed an option for order-preserving parallel parsing of formats. It is a little bit more difficult.
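
To illustrate what the order-preserving option involves, here is a sketch of a reorder buffer. It is one possible technique, not the design discussed by the maintainers; the class name and interface are made up, and the class is not thread-safe on its own, so callers would need to hold a lock around `push`.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Collects results that finish in arbitrary order and releases them in segment order.
class ReorderBuffer
{
public:
    // Called when segment `index` has been parsed. Returns every result that is now
    // ready to be emitted, in input order (possibly none).
    std::vector<std::string> push(std::size_t index, std::string result)
    {
        waiting.emplace(index, std::move(result));

        std::vector<std::string> ready;
        for (auto it = waiting.find(next); it != waiting.end(); it = waiting.find(next))
        {
            ready.push_back(std::move(it->second));
            waiting.erase(it);
            ++next;
        }
        return ready;
    }

private:
    std::map<std::size_t, std::string> waiting; // finished segments that cannot be emitted yet
    std::size_t next = 0;                       // index of the next segment to emit
};
```

The extra difficulty is visible here: each segment needs a sequence number assigned when it is cut from the input, and finished results may have to wait in memory for a slower earlier segment.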

> Yes, each ReadBuffer has its own error counter, so the input_format_allow_errors_num parameter does not work.

This is also the expected result. Local counters for parallel processing are fine.
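
The difference between local and shared counters can be shown with a small sketch. All names and the "empty row is malformed" rule are made up for the example; it only illustrates why a per-reader limit lets more bad rows through in total than a single shared limit would.

```cpp
#include <atomic>
#include <cstddef>
#include <string>
#include <vector>

// A row counts as malformed here if it is empty; this rule is made up for the example.
inline bool isMalformed(const std::string & row) { return row.empty(); }

// Local counter: each reader tolerates up to `allowed` bad rows on its own.
// Returns false if this reader exceeded the limit.
bool parseWithLocalCounter(const std::vector<std::string> & rows, std::size_t allowed)
{
    std::size_t errors = 0;
    for (const auto & row : rows)
        if (isMalformed(row) && ++errors > allowed)
            return false;
    return true;
}

// Shared counter: all readers draw from one error budget.
bool parseWithSharedCounter(
    const std::vector<std::string> & rows, std::size_t allowed, std::atomic<std::size_t> & errors)
{
    for (const auto & row : rows)
        if (isMalformed(row) && errors.fetch_add(1) + 1 > allowed)
            return false;
    return true;
}
```

With two segments that each contain one bad row and a limit of 1, both segments pass with local counters, while the second one fails with a shared counter.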

@alexey-milovidov
Member

Now let's explicitly turn off the setting in those tests that depend on order or error counters.

@amosbird
Collaborator

M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.") \
\
M(SettingBool, enable_parallel_reading, true, "Enable parallel_reading for several data formats (JSON, TSV, TKSV, Values, CSV).") \
Member

Rename to input_format_parallel_parsing. Otherwise the name is extremely stupid.

}
} else
{
in.position() = find_first_symbols<'"','\r', '\n'>(in.position(), in.buffer().end());
Member

Inconsistent whitespace.

{
quotes = true;
++in.position();
} else if (*in.position() == '\n')
Member

Style.

bool nextImpl() override
{
    if (eof || !mutex)
        return false;
Contributor

Maybe also write a LOG_ERROR when the mutex is null?
What do you think @KochetovNicolai ?

\
M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for several data formats (JSON, TSV, TKSV, Values, CSV).") \
M(SettingUInt64, max_threads_for_parallel_reading, 0, "The maximum number of threads to parallel reading. By default, it is set to max_threads.") \
M(SettingUInt64, min_chunk_size_for_parallel_reading, 4, "The minimum chunk size, which each thread tries to parse under mutex in parallel reading.") \
Contributor

Just curious, what does this "4" mean? It looks like a number of chunks, but judging by the while loop in CSVRowInputStream.cpp it looks like 4 bytes. Am I missing something?
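
If the value is a size in bytes, as the while loop seems to suggest, its effect can be sketched as follows. This is an illustration of that reading under stated assumptions, not the PR's implementation; `splitIntoChunks` is a made-up helper that works on an in-memory string with newline-delimited rows.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Splits `data` into chunks that each end at a line boundary and contain at least
// `min_chunk_bytes` bytes (the last chunk may be shorter).
std::vector<std::string> splitIntoChunks(const std::string & data, std::size_t min_chunk_bytes)
{
    const std::size_t min_bytes = min_chunk_bytes == 0 ? 1 : min_chunk_bytes;

    std::vector<std::string> chunks;
    std::size_t begin = 0;
    while (begin < data.size())
    {
        std::size_t end = data.size();
        const std::size_t search_from = begin + min_bytes;
        if (search_from <= data.size())
        {
            // The chunk must include byte (search_from - 1); extend it to the end of that line.
            const std::size_t newline = data.find('\n', search_from - 1);
            if (newline != std::string::npos)
                end = newline + 1;
        }
        chunks.push_back(data.substr(begin, end - begin));
        begin = end;
    }
    return chunks;
}
```

Under this reading, a minimum of 4 bytes means a chunk is rarely more than a row or two, so each thread would take the lock very often; a realistic minimum would be much larger.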

@stale

stale bot commented Oct 20, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the "not planned" (Known issue, no plans to fix it currently) label on Oct 20, 2019
@blinkov removed the "not planned" (Known issue, no plans to fix it currently) label on Oct 20, 2019
@tavplubix
Member

Continued in #6553

Labels

pr-feature Pull request with new product feature

10 participants