Conversation

@mrocklin (Member) commented Apr 23, 2016

We refactor how data gets ingested from different sources.

Locations

Systems like local, s3, and hdfs (which will be in the distributed repo) implement the functions read_bytes, open_files, and, if available, open_text_files.

>>> sample, values = read_bytes('mydata.*.csv', delimiter=b'\n', blocksize=1000)
>>> sample
b'name,amount,id\nAlice,100,1\n...'
>>> values
[[Delayed('mydata.1.csv-0-1000'), Delayed('mydata.1.csv-1000-2000')],
 [Delayed('mydata.2.csv-0-1000')],
 ...
]

>>> files = open_files('mydata.*.csv')
>>> files = compute(*files)
>>> next(files[0])
b'name,amount,id\nAlice,100,1\n...'

>>> files = open_text_files('mydata.*.csv', encoding='utf-8')
>>> files = compute(*files)
>>> next(files[0])
'name,amount,id\nAlice,100,1\n...'

There are bytes.core.{read_bytes, open_files, open_text_files} functions that intelligently dispatch to the correct location based on the protocol. For example, dd.read_csv('s3://bucket/myfiles.*.csv') correctly dispatches to bytes.s3.read_bytes.
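
A rough sketch of what that protocol dispatch could look like (the registry and the backend stand-ins below are illustrative placeholders, not the actual bytes.core implementation):

def local_read_bytes(path, **kwargs):
    raise NotImplementedError("stand-in for bytes.local.read_bytes")

def s3_read_bytes(path, **kwargs):
    raise NotImplementedError("stand-in for bytes.s3.read_bytes")

BACKENDS = {'file': local_read_bytes, 's3': s3_read_bytes}

def read_bytes(urlpath, **kwargs):
    # 's3://bucket/myfiles.*.csv' -> protocol 's3'; bare paths fall back to local files
    if '://' in urlpath:
        protocol, path = urlpath.split('://', 1)
    else:
        protocol, path = 'file', urlpath
    return BACKENDS[protocol](path, **kwargs)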

Compression

We have a variety of compression formats filled out and rely heavily on the file-like objects found in those libraries. If such a file-like object is registered, then all of the read_bytes and open_files functions support that compression automatically. If the file object is seekable, then we can use the blocksize= option to split data. Again, this all happens automatically; there is no need for downstream code to care about it.
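
As a minimal sketch of that registry idea (illustrative only; it uses standard-library compression modules rather than dask's actual registry):

import bz2
import gzip
import lzma

# Map a compression name to a callable that wraps a raw file object in a
# decompressing file-like object.
compress_files = {
    'gzip': lambda f: gzip.GzipFile(fileobj=f),
    'bz2': bz2.BZ2File,
    'xz': lzma.LZMAFile,
    None: lambda f: f,
}

def open_maybe_compressed(raw_file, compression=None):
    return compress_files[compression](raw_file)

Whether blocksize= splitting is possible then reduces to asking the wrapped object whether it is seekable.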

Formats

We've hooked this up to back both dd.read_csv and db.read_text with good success. They get the extra locations and compression support for free.
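
A brief usage sketch of what "for free" means here (the bucket and file patterns are hypothetical): the same high-level calls work against a remote store or a compressed file without any format-specific code.

import dask.dataframe as dd
from dask.bag.text import read_text

df = dd.read_csv('s3://bucket/myfiles.*.csv')       # location handled by bytes.s3
b = read_text('mydata.*.csv.xz', compression='xz')  # compression handled by the bytes layer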

cc @jcrist, cc @wabu. I'd love a review of this before I move forward much more. @wabu in particular, if you can tear holes in this I'd love to know sooner rather than later.

@mrocklin (Member Author)

I've added compression through File objects:

files2 = valmap(compress, files)
with filetexts(files2, mode='b'):
    df = read_csv('2014-01-*.csv', compression=fmt)
    eq(df, expected, check_dtype=False)
@mrocklin (Member Author)

@wabu can you try this implementation against your dataset and compare performance? I would actually be pretty interested in the result of profiling. I might do this in the following way.

import dask
from dask.dataframe.csv import read_csv

df = read_csv(path, compression='xz')

I would do this in the IPython console and then use the %prun magic

%prun df.some_column.sum().compute(get=dask.get)  # single threaded scheduler

@mrocklin (Member Author)

It looks like backports.lzma.LZMAFile doesn't support seek.
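
A quick way to check this sort of thing, assuming a file-like object with the io.IOBase-style interface (not specific to any particular LZMA implementation):

import io

def supports_seek(f):
    try:
        return f.seekable()
    except AttributeError:
        try:
            f.seek(0)
            return True
        except (OSError, io.UnsupportedOperation):
            return False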

@mrocklin (Member Author)

@martindurant this could use your review if you have time sometime next week.

This is an attempt to refactor all of the location/compression/format work we prototyped in distributed.

@mrocklin force-pushed the bytes branch 3 times, most recently from 748079f to 985d6d2, on April 24, 2016
@mrocklin (Member Author)

I've now updated this to support bag.read_text as well as dataframe.read_csv.

@mrocklin (Member Author) commented Apr 25, 2016

Playing with a small xz file

In [1]: from dask.bag.text import read_text

In [2]: %time read_text('trip_data_1_00.csv.xz').count().compute()  # single core streams through file sequentially
CPU times: user 25.7 ms, sys: 17.2 ms, total: 42.9 ms
Wall time: 2.47 s
Out[2]: 1000000

In [3]: %time read_text('trip_data_1_00.csv.xz', blocksize=1000000).count().compute()  # multiple cores take random access chunks off one at a time
CPU times: user 61.9 ms, sys: 25.9 ms, total: 87.8 ms
Wall time: 4.15 s
Out[3]: 126523

In [4]: %time read_text('trip_data_1_00.csv.xz', blocksize=10000000).count().compute()  # multiple cores take random access chunks off one at a time
CPU times: user 21 ms, sys: 13.1 ms, total: 34 ms
Wall time: 1.41 s
Out[4]: 180746

In [5]: !du trip_data_1_00.csv.xz
20328   trip_data_1_00.csv.xz

In [6]: !du trip_data_1_00.csv.xz -hs
20M trip_data_1_00.csv.xz

Presumably this would work better with a smaller ratio of xz block size to dask.bag blocksize.

@mrocklin (Member Author)

Hrm, noting now that I'm getting different outputs, which is concerning.

@mrocklin (Member Author)

Ah, I was handling getsize incorrectly. Things are accurate now, but profiling shows that we're much slower on xz files when using chunks than we should be if the random access guarantee is accurate.

@mrocklin (Member Author)

@wabu do you have a dataset I can play with or a nice way to produce a similar dataset?

Also, are you confident that the python lzma module supports random access? Do you know of ways to produce xz files with small blocks using the command line?

@mrocklin (Member Author)

@updiversity regarding #1115 can you try the solution in this branch?

$ pip install git+https://github.com/mrocklin/dask.git@bytes --upgrade

import dask.bag as db
from dask.bag.text import read_text

b = read_text('s3://bucket/keys.*.txt', **s3_params)
b.take(0)

@jcrist (Member) commented Apr 25, 2016

Also, are you confident that the python lzma module supports random access?

For this I think you need the lzmaffi module (as seen in this comment: #1096 (comment))
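
A hedged sketch of that swap: prefer lzmaffi when it is installed and fall back to the standard lzma module otherwise (lzmaffi is intended as a drop-in replacement, so the rest of the code should not need to care which one it got).

try:
    import lzmaffi as lzma
except ImportError:
    import lzma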

@mrocklin (Member Author)

Successfully replaced both db.from_filenames and dd.read_csv

@mrocklin (Member Author)

Getting some odd errors in S3 tests like this: https://travis-ci.org/dask/dask/jobs/125713226.

Some possible causes:

  1. Too many open network files (lots of warnings). I wonder if there is anything we can do in s3fs to manage these better and retry/reopen connections if they are closed.
  2. Moto not behaving correctly?

CCing @martindurant

@mrocklin (Member Author)

Yeah, something fishy is definitely going on.

(Pdb) pp compute(values[0][0])
('{"amount": 100, "name": "Alice"}\n{"amount": 200, "name": "Bob"}\n{"amount": 300, "name": "Charlie"}\n{"amount": 400, "name": "Dennis"}\n',)
(Pdb) pp compute(values[1][0])
('{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n',)
(Pdb) pp compute(values[0][0], values[1][0])
('{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n',
 '{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n')
(Pdb) pp values[0][0].dask
{'read_block_from_s3-dee321a7-acb2-4fbb-a142-03cf788ab0a8': (<function read_block_from_s3 at 0x7f6cfd5c6c08>,
                                                             'test/test/accounts.1.json',
                                                             0,
                                                             134217728,
                                                             (<type 'dict'>,
                                                              []),
                                                             None,
                                                             None)}
(Pdb) pp values[1][0].dask
{'read_block_from_s3-4aca7fe2-75d1-432a-ad82-be2eed056a19': (<function read_block_from_s3 at 0x7f6cfd5c6c08>,
                                                             'test/test/accounts.2.json',
                                                             0,
                                                             134217728,
                                                             (<type 'dict'>,
                                                              []),
                                                             None,
                                                             None)}

@mrocklin (Member Author)

Turns out moto isn't threadsafe. getmoto/moto#313

Switching to single thread. All is well.
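
A small self-contained sketch of that single-threaded fallback, using the same get=dask.get pattern as the %prun example earlier in this thread (the toy bag here is only for illustration):

import dask
import dask.bag as db

b = db.from_sequence(range(4))
print(b.sum().compute(get=dask.get))  # synchronous scheduler: no threads involved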

Removing the WIP label

@mrocklin changed the title from "[WIP] Refactor data loading to manage blocks of bytes" to "Refactor data loading to manage blocks of bytes" on Apr 26, 2016
@mrocklin changed the title from "Refactor data loading to manage blocks of bytes" to "Refactor data ingest and read_csv" on Apr 26, 2016
@mrocklin (Member Author)

For this I think you need the lzmaffi module (as seen in this comment: #1096 (comment))

Using the xz --list command-line utility, I also learned that my file had just a single block.
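
A small helper for this check (it assumes the xz CLI is on PATH; the filename is the one from earlier in this thread): random access only helps when the archive contains more than one block.

import subprocess

def xz_list(path):
    # `xz --list --verbose` prints per-block information for an .xz archive.
    return subprocess.check_output(['xz', '--list', '--verbose', path]).decode()

print(xz_list('trip_data_1_00.csv.xz'))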


def read_text(path, blocksize=None, compression='infer',
              encoding=system_encoding, errors='strict',
              linedelimiter=os.linesep, collection=True, **kwargs):
Review comment (Member):

Isn't '\n' more likely to be the line delimiter for any non-local file?

@mrocklin (Member Author) replied:

That's a good point. Maybe we keep this as None and let the bytes backends use their own defaults.
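
A minimal sketch of that idea (the helper name and the is_local flag are illustrative, not part of dask's API): keep linedelimiter=None in read_text and let each bytes backend resolve its own default.

import os

def resolve_linedelimiter(linedelimiter, is_local):
    if linedelimiter is not None:
        return linedelimiter
    # Local files may reasonably follow the platform convention; remote stores
    # almost always use plain '\n'.
    return os.linesep if is_local else '\n'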

@mrocklin (Member Author)

Alright, merging.

@mrocklin merged commit ec6d1e0 into dask:master on Apr 28, 2016
@mrocklin deleted the bytes branch on April 28, 2016
@sinhrks added this to the 0.9.0 milestone on May 11, 2016