Refactor data ingest and read_csv #1116
Conversation
I've added compression through
dask/dataframe/tests/test_csv.py
```python
files2 = valmap(compress, files)
with filetexts(files2, mode='b'):
    df = read_csv('2014-01-*.csv', compression=fmt)
    eq(df, expected, check_dtype=False)
```
@wabu can you try this implementation against your dataset and compare performance? I would actually be pretty interested in the result of profiling. I might do this in the following way:

```python
import dask
from dask.dataframe.csv import read_csv

df = read_csv(path, compression='xz')
```

I would do this in the IPython console and then use the %prun magic:

```python
%prun df.some_column.sum().compute(get=dask.get)  # single threaded scheduler
```
It looks like

@martindurant this could use your review if you have time sometime next week. This is an attempt to refactor all of the location/compression/format work we prototyped in
I've now updated this to support
Playing with a small xz file:

```python
In [1]: from dask.bag.text import read_text

In [2]: %time read_text('trip_data_1_00.csv.xz').count().compute()  # single core streams through file sequentially
CPU times: user 25.7 ms, sys: 17.2 ms, total: 42.9 ms
Wall time: 2.47 s
Out[2]: 1000000

In [3]: %time read_text('trip_data_1_00.csv.xz', blocksize=1000000).count().compute()  # multiple cores take random access chunks off one at a time
CPU times: user 61.9 ms, sys: 25.9 ms, total: 87.8 ms
Wall time: 4.15 s
Out[3]: 126523

In [4]: %time read_text('trip_data_1_00.csv.xz', blocksize=10000000).count().compute()  # multiple cores take random access chunks off one at a time
CPU times: user 21 ms, sys: 13.1 ms, total: 34 ms
Wall time: 1.41 s
Out[4]: 180746

In [5]: !du trip_data_1_00.csv.xz
20328   trip_data_1_00.csv.xz

In [6]: !du trip_data_1_00.csv.xz -hs
20M     trip_data_1_00.csv.xz
```

Presumably this would work better for a smaller xz blocksize to dask.bag blocksize ratio.
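For what it's worth, here is one way such a file could be produced. This is only a sketch, not part of the branch: the helper name and sizes are made up, and it relies on the fact that concatenated independent .xz streams still form a valid .xz file.

```python
import lzma

def write_small_block_xz(raw_bytes, path, stream_size=1000000):
    """Hypothetical helper: compress raw_bytes as many small, independent
    .xz streams so a decompression boundary occurs at least every
    stream_size bytes of uncompressed data."""
    with open(path, 'wb') as f:
        for i in range(0, len(raw_bytes), stream_size):
            # lzma.compress emits a complete, self-contained .xz stream;
            # concatenating such streams still yields a valid .xz file
            f.write(lzma.compress(raw_bytes[i:i + stream_size]))

with open('trip_data_1_00.csv', 'rb') as f:
    write_small_block_xz(f.read(), 'trip_data_1_00_small_blocks.csv.xz')
```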
Hrm, noting now that I'm getting different outputs, which is concerning.
Ah, I was handling
@wabu do you have a dataset I can play with, or a nice way to produce a similar dataset? Also, are you confident that the Python lzma module supports random access? Do you know of ways to produce xz files with small blocks using the command line?
@updiversity regarding #1115, can you try the solution in this branch?

```python
import dask.bag as db
from dask.bag.text import read_text

b = read_text('s3://bucket/keys.*.txt', **s3_params)
b.take(0)
```
For this I think you need the lzmaffi module (as seen in this comment: #1096 (comment))
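As a rough sanity check, something like the following should show whether seeking is actually cheap. This assumes lzmaffi is a drop-in replacement for the stdlib lzma API (my understanding, not verified here), and the file name and offset are just placeholders.

```python
import time
import lzmaffi  # assumed drop-in replacement for the stdlib lzma module

# Time a seek deep into the uncompressed data; with block-level random
# access this should be far faster than decompressing from the start.
with lzmaffi.LZMAFile('trip_data_1_00.csv.xz') as f:
    t0 = time.time()
    f.seek(10 * 1000 * 1000)   # offset in uncompressed bytes
    sample = f.read(80)        # small sample from that offset
    print(sample)
    print('seek + read took %.2fs' % (time.time() - t0))
```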
Successfully replaced both
Getting some odd errors in S3 tests like this: https://travis-ci.org/dask/dask/jobs/125713226. Some possible causes:

CCing @martindurant
Yeah, something fishy is definitely going on.

```python
(Pdb) pp compute(values[0][0])
('{"amount": 100, "name": "Alice"}\n{"amount": 200, "name": "Bob"}\n{"amount": 300, "name": "Charlie"}\n{"amount": 400, "name": "Dennis"}\n',)
(Pdb) pp compute(values[1][0])
('{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n',)
(Pdb) pp compute(values[0][0], values[1][0])
('{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n',
 '{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n')
(Pdb) pp values[0][0].dask
{'read_block_from_s3-dee321a7-acb2-4fbb-a142-03cf788ab0a8': (<function read_block_from_s3 at 0x7f6cfd5c6c08>,
  'test/test/accounts.1.json',
  0,
  134217728,
  (<type 'dict'>, []),
  None,
  None)}
(Pdb) pp values[1][0].dask
{'read_block_from_s3-4aca7fe2-75d1-432a-ad82-be2eed056a19': (<function read_block_from_s3 at 0x7f6cfd5c6c08>,
  'test/test/accounts.2.json',
  0,
  134217728,
  (<type 'dict'>, []),
  None,
  None)}
```
Turns out moto isn't threadsafe (getmoto/moto#313). Switching to a single thread. All is well. Removing the WIP label.
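A sketch of what that workaround looks like in a test, reusing the same get=dask.get single-threaded scheduler mentioned earlier in this thread; the bucket and key names are placeholders and moto is assumed to be intercepting the S3 calls.

```python
import dask
from dask.bag.text import read_text

# Force the synchronous scheduler in tests that talk to moto's S3 mock,
# so no two tasks ever hit the (non-threadsafe) mock concurrently.
b = read_text('s3://test/accounts.*.json')
result = b.count().compute(get=dask.get)
```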
Using the
```python
def read_text(path, blocksize=None, compression='infer',
              encoding=system_encoding, errors='strict',
              linedelimiter=os.linesep, collection=True, **kwargs):
```
Isn't '\n' more likely to be the line delimiter for any non-local file?
That's a good point. Maybe we keep this as None and let the bytes backends use their own defaults.
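For illustration only (this is a sketch of the idea, not the actual dask signature or logic), the "None means backend default" approach would look roughly like this:

```python
import os

def read_text(path, linedelimiter=None, **kwargs):
    if linedelimiter is None:
        # remote protocols (s3://, hdfs://) essentially always use '\n';
        # only local files plausibly want the platform os.linesep
        linedelimiter = '\n' if '://' in path else os.linesep
    return linedelimiter  # placeholder: real code would hand off to a bytes backend
```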
Alright, merging.
We refactor how data gets ingested from different sources.

Locations

Systems like local, s3, and hdfs (which will be in the distributed repo) implement the functions `read_bytes`, `open_files` and, if available, `open_text_files`. There is a `bytes.core.{read_bytes, open_files, open_text_files}` function that intelligently dispatches to the correct location based on protocol. `dd.read_csv('s3://bucket/myfiles.*.csv')` correctly dispatches to `bytes.s3.read_bytes`.
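Roughly, the dispatch-by-protocol idea looks like this. This is an illustrative sketch only; the registry and function names in the branch may differ.

```python
# Illustrative sketch of dispatch-by-protocol, not the actual dask.bytes code.
_read_bytes_backends = {}  # e.g. {'file': local_read_bytes, 's3': s3_read_bytes}

def read_bytes(path, **kwargs):
    # 's3://bucket/key' -> protocol 's3', path 'bucket/key'; plain paths are local
    if '://' in path:
        protocol, path = path.split('://', 1)
    else:
        protocol = 'file'
    try:
        backend = _read_bytes_backends[protocol]
    except KeyError:
        raise NotImplementedError("Unsupported protocol: %s" % protocol)
    return backend(path, **kwargs)
```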
Compression

We have a variety of compression formats filled out and rely heavily on the file-like objects found in those libraries. If such a file-like object is registered then all of the `read_bytes` and `open_files` functions support that compression automatically. If that file object is seekable then we can use `blocksize=` options to split data. Again, this all happens automatically; there is no need for downstream code to care about it.
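The registration idea, sketched with stdlib file objects for illustration; the mapping and helper names below are not necessarily those used in the branch.

```python
import bz2
import gzip
import lzma

# name -> callable that wraps a raw binary file object in a decompressing
# file-like object; registering a new entry is all a format needs to do
compressors = {
    None: lambda f: f,
    'gzip': lambda f: gzip.GzipFile(fileobj=f),
    'bz2': lambda f: bz2.BZ2File(f),
    'xz': lambda f: lzma.LZMAFile(f),
}

def open_compressed(f, compression=None):
    return compressors[compression](f)
```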
Formats

We've hooked this in to back both `dd.read_csv` and `db.read_text` with good success. They get extra locations and compression for free.

cc @jcrist, cc @wabu. I'd love review on this before I move forward much more. @wabu in particular, if you can tear holes in this I'd love to know sooner rather than later.
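For reference, the end-user API described under Formats above looks roughly like this; the bucket and file names are placeholders, and the exact keywords are as described in this PR.

```python
import dask.dataframe as dd
import dask.bag as db

# Location (s3:// vs local) and compression are handled by the shared bytes layer.
df = dd.read_csv('s3://bucket/myfiles.*.csv.gz', compression='gzip')
b = db.read_text('trip_data_*.csv.xz', compression='xz', blocksize=10000000)
```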