17 changes: 12 additions & 5 deletions doc/source/data/transforming-datasets.rst
@@ -105,7 +105,7 @@ Types of UDFs
=============
There are three types of UDFs that you can use with Ray Data: Function UDFs, Callable Class UDFs, and Generator UDFs.

.. tabbed:: "Function UDFs"
.. tabbed:: Function UDFs

The most basic UDFs are functions that take in a batch or row as input and return a batch or row as output; a minimal sketch follows the included example below. See :ref:`transform_datasets_batch_formats` for the supported batch formats.

@@ -114,7 +114,7 @@ There are three types of UDFs that you can use with Ray Data: Function UDFs, Cal
:start-after: __writing_default_udfs_tabular_begin__
:end-before: __writing_default_udfs_tabular_end__
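
In addition to the included example, here is a minimal sketch (not one of the linked snippet files; it assumes a tabular dataset with a ``value`` column):

.. code-block:: python

    import pandas as pd
    import ray

    ds = ray.data.range_table(1000)  # tabular dataset with a "value" column

    # Function UDF: takes a pandas batch and returns a pandas batch.
    def double_value(batch: pd.DataFrame) -> pd.DataFrame:
        batch["value"] = batch["value"] * 2
        return batch

    ds = ds.map_batches(double_value, batch_format="pandas")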

.. tabbed:: "Callable Class UDFs"
.. tabbed:: Callable Class UDFs

With the actor compute strategy, you can use *callable classes* as per-row and
per-batch UDFs, i.e., classes that implement the ``__call__`` magic method (a minimal sketch follows the example below). You
@@ -132,7 +132,7 @@ There are three types of UDFs that you can use with Ray Data: Function UDFs, Cal
:start-after: __writing_callable_classes_udfs_begin__
:end-before: __writing_callable_classes_udfs_end__
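
For illustration, a minimal sketch of a callable class UDF run on an autoscaling actor pool (assuming the same ``value`` column as above; the constructor argument is purely hypothetical):

.. code-block:: python

    import pandas as pd
    import ray

    class AddConstant:
        def __init__(self, increment: int):
            # Expensive per-actor setup (e.g. loading a model) would go here.
            self.increment = increment

        def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
            batch["value"] = batch["value"] + self.increment
            return batch

    ds = ray.data.range_table(1000)
    ds = ds.map_batches(
        AddConstant,
        compute=ray.data.ActorPoolStrategy(2, 4),  # actor compute strategy
        batch_format="pandas",
        fn_constructor_args=(10,),
    )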

.. tabbed:: "Generator UDFs"
.. tabbed:: Generator UDFs

UDFs can also be written as Python generators, yielding multiple outputs for a batch or row instead of a single item. Generator UDFs are useful when returning large objects: instead of returning a very large output batch, ``fn`` can yield the output batch in chunks to avoid excessive heap memory usage.
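
For illustration, a minimal sketch of a generator UDF that yields its output in fixed-size chunks rather than as one large batch (assuming a pandas batch format):

.. code-block:: python

    from typing import Iterator

    import pandas as pd
    import ray

    def in_chunks(batch: pd.DataFrame) -> Iterator[pd.DataFrame]:
        # Yield the output in chunks of 100 rows rather than all at once.
        for start in range(0, len(batch), 100):
            yield batch.iloc[start:start + 100]

    ds = ray.data.range_table(10_000)
    ds = ds.map_batches(in_chunks, batch_format="pandas")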

@@ -229,11 +229,14 @@ Here is an overview of the available batch formats:
:language: python
:start-after: __writing_numpy_udfs_begin__
:end-before: __writing_numpy_udfs_end__

.. tabbed:: "zero-copy"


Converting between the underlying Datasets data representations (Arrow, Pandas, and
Python lists) and the requested batch format (``"default"``, ``"pandas"``,
``"pyarrow"``, ``"numpy"``) may incur data copies; which conversions cause data copying
is given in the below table:
``"pyarrow"``, ``"numpy"``) may incur data copies. You can specify ``"zero-copy"`` to
guarantee that no extra copies are made when constructing batches (a usage sketch follows the table).
Which conversions cause data copying is shown in the table below:


.. list-table:: Data Format Conversion Costs
@@ -245,21 +248,25 @@
- ``"pandas"``
- ``"numpy"``
- ``"pyarrow"``
- ``"zero-copy"``
* - ``"pandas"``
- Zero-copy
- Zero-copy
- Copy*
- Copy*
- Zero-copy
* - ``"arrow"``
- Copy*
- Copy*
- Zero-copy*
- Zero-copy
- Zero-copy
* - ``"simple"``
- Zero-copy
- Copy
- Copy
- Copy
- Zero-copy

.. note::
\* No copies occur when converting between Arrow, Pandas, and NumPy formats for columns
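
A usage sketch of the ``"zero-copy"`` format (assuming an Arrow-backed dataset, so each batch is the underlying ``pyarrow.Table`` block, returned with no conversion and no extra copy):

.. code-block:: python

    import ray

    ds = ray.data.range_table(1000)  # Arrow-backed tabular dataset

    for batch in ds.iter_batches(batch_size=None, batch_format="zero-copy"):
        ...  # each batch is the raw underlying block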
3 changes: 0 additions & 3 deletions python/ray/data/_internal/dataset_iterator_impl.py
@@ -69,9 +69,6 @@ def stats(self) -> str:
def schema(self) -> Union[type, "pyarrow.lib.Schema"]:
return self._base_dataset.schema()

def _default_batch_format(self) -> Literal["default", "pandas", "pyarrow", "numpy"]:
return _default_batch_format(self._base_dataset)

def __getattr__(self, name):
if name == "_base_dataset":
raise AttributeError()
4 changes: 0 additions & 4 deletions python/ray/data/_internal/stream_split_dataset_iterator.py
@@ -15,7 +15,6 @@
)

import ray
from ray.data._internal.util import _default_batch_format

from ray.data.dataset_iterator import DatasetIterator
from ray.data.block import Block, DataBatch
@@ -134,9 +133,6 @@ def schema(self) -> Union[type, "pyarrow.lib.Schema"]:
"""Implements DatasetIterator."""
return self._base_dataset.schema()

def _default_batch_format(self) -> Literal["default", "pandas", "pyarrow", "numpy"]:
return _default_batch_format(self._base_dataset)


@ray.remote(num_cpus=0)
class SplitCoordinator:
25 changes: 0 additions & 25 deletions python/ray/data/_internal/util.py
@@ -406,31 +406,6 @@ def _split_list(arr: List[Any], num_splits: int) -> List[List[Any]]:
return splits


def _default_batch_format(
ds: "Dataset",
) -> Literal["default", "pandas", "pyarrow", "numpy"]:
"""Get the best batch format that lines up with the dataset format."""
ctx = DatasetContext.get_current()
if ctx.use_streaming_executor:
# TODO: calling dataset_format() triggers bulk execution.
batch_format = "default"
else:
try:
dataset_format = ds.dataset_format()
except ValueError:
# Dataset is empty or cleared, so fall back to "default".
batch_format = "default"
else:
batch_format = (
"pyarrow"
if dataset_format == BlockFormat.ARROW
else "pandas"
if dataset_format == BlockFormat.PANDAS
else "default"
)
return batch_format


def capfirst(s: str):
"""Capitalize the first letter of a string

4 changes: 3 additions & 1 deletion python/ray/data/block.py
@@ -150,7 +150,7 @@ def __call__(self, __arg: T) -> Union[U, Iterator[U]]:
# is on by default. When block splitting is off, the type is a plain block.
MaybeBlockPartition = Union[Block, ObjectRefGenerator]

VALID_BATCH_FORMATS = ["default", "native", "pandas", "pyarrow", "numpy"]
VALID_BATCH_FORMATS = ["default", "native", "pandas", "pyarrow", "numpy", "zero-copy"]


@DeveloperAPI
@@ -326,6 +326,8 @@ def to_batch_format(self, batch_format: str) -> DataBatch:
"""
if batch_format == "default" or batch_format == "native":
return self.to_default()
elif batch_format == "zero-copy":
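# Return the underlying block as-is: no format conversion, hence no extra copy.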
return self.to_block()
elif batch_format == "pandas":
return self.to_pandas()
elif batch_format == "pyarrow":
15 changes: 7 additions & 8 deletions python/ray/data/dataset.py
@@ -379,7 +379,7 @@ def map_batches(
*,
batch_size: Optional[Union[int, Literal["default"]]] = "default",
compute: Optional[Union[str, ComputeStrategy]] = None,
batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
batch_format: Literal["default", "pandas", "pyarrow", "numpy", "zero-copy"] = "default",
prefetch_batches: int = 0,
zero_copy_batch: bool = False,
fn_args: Optional[Iterable[Any]] = None,
@@ -540,7 +540,7 @@ def map_batches(
(promotes tables to Pandas and tensors to NumPy), ``"pandas"`` to select
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or
``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default".
``Dict[str, numpy.ndarray]`` for tabular datasets, or ``"zero-copy"`` to
return the underlying block exactly as-is, with no additional formatting.
Default is ``"default"``.
prefetch_batches: The number of batches to fetch ahead of the current batch
to process. If set to greater than 0, a separate thread will be used
to fetch the specified amount of formatted batches from blocks. This
@@ -2929,12 +2929,11 @@ def iter_batches(
as batches (blocks may contain different number of rows).
The final batch may include fewer than ``batch_size`` rows if
``drop_last`` is ``False``. Defaults to 256.
batch_format: The format in which to return each batch.
Specify "default" to use the default block format (promoting
tables to Pandas and tensors to NumPy), "pandas" to select
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or "numpy"
to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default".
batch_format: Specify ``"default"`` to use the default block format
(promotes tables to Pandas and tensors to NumPy), ``"pandas"`` to select
``pandas.DataFrame``, ``"pyarrow"`` to select ``pyarrow.Table``,
``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets, or ``"zero-copy"`` to
return the underlying block exactly as-is, with no additional formatting.
Default is ``"default"``.
drop_last: Whether to drop the last batch if it's incomplete.
local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
using a local in-memory shuffle buffer, and this value will serve as the
11 changes: 1 addition & 10 deletions python/ray/data/dataset_iterator.py
@@ -126,9 +126,8 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Union[T, TableRow]]
Returns:
An iterator over rows of the dataset.
"""
target_format = self._default_batch_format()
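# Fetch blocks in their native format ("zero-copy") to avoid any batch
# conversion; each block is wrapped back into a BlockAccessor below so its
# rows can be iterated directly.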
for batch in self.iter_batches(
batch_size=None, prefetch_blocks=prefetch_blocks, batch_format=target_format
batch_size=None, prefetch_blocks=prefetch_blocks, batch_format="zero-copy"
):
batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch))
for row in batch.iter_rows():
@@ -704,11 +703,3 @@ def _is_tensor_dataset(self) -> bool:
if schema is None or isinstance(schema, type):
return False
return _is_tensor_schema(schema.names)

def _default_batch_format(self) -> Literal["default", "pandas", "pyarrow", "numpy"]:
"""Returns the best batch format that lines up with the dataset format.

NOTE: Return "default" here. Subclass can override this method to decide best
batch format.
"""
return "default"
9 changes: 5 additions & 4 deletions python/ray/data/grouped_dataset.py
@@ -322,10 +322,11 @@ def map_groups(
batch of zero or more records, similar to map_batches().
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
batch_format: Specify "default" to use the default block format
(promotes Arrow to pandas), "pandas" to select
``pandas.DataFrame`` as the batch format,
or "pyarrow" to select ``pyarrow.Table``.
batch_format: Specify ``"default"`` to use the default block format
(promotes tables to Pandas and tensors to NumPy), ``"pandas"`` to select
``pandas.DataFrame``, ``"pyarrow"`` to select ``pyarrow.Table``,
``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets, or ``"zero-copy"`` to
return the underlying block exactly as-is, with no additional formatting.
Default is ``"default"``.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).

39 changes: 3 additions & 36 deletions python/ray/data/tests/test_dataset_consumption.py
@@ -391,14 +391,7 @@ def test_convert_types(ray_start_regular_shared):

arrow_ds = ray.data.range_table(1)
assert arrow_ds.map(lambda x: "plain_{}".format(x["value"])).take() == ["plain_0"]
# In streaming, we set batch_format to "default" (because calling
# ds.dataset_format() will still invoke bulk execution and we want
# to avoid that). As a result, it's receiving PandasRow (the defaut
# batch format), which unwraps [0] to plain 0.
if ray.data.context.DatasetContext.get_current().use_streaming_executor:
assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == [{"a": 0}]
else:
assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == [{"a": [0]}]
assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == [{"a": [0]}]


def test_from_items(ray_start_regular_shared):
Expand Down Expand Up @@ -483,14 +476,7 @@ def to_pylist(table):
# Default ArrowRows.
for row, t_row in zip(ds.iter_rows(), to_pylist(t)):
assert isinstance(row, TableRow)
# In streaming, we set batch_format to "default" because calling
# ds.dataset_format() will still invoke bulk execution and we want
# to avoid that. As a result, it's receiving PandasRow (the defaut
# batch format).
if ray.data.context.DatasetContext.get_current().use_streaming_executor:
assert isinstance(row, PandasRow)
else:
assert isinstance(row, ArrowRow)
assert isinstance(row, ArrowRow)
assert row == t_row

# PandasRows after conversion.
@@ -504,14 +490,7 @@
# Prefetch.
for row, t_row in zip(ds.iter_rows(prefetch_blocks=1), to_pylist(t)):
assert isinstance(row, TableRow)
# In streaming, we set batch_format to "default" because calling
# ds.dataset_format() will still invoke bulk execution and we want
# to avoid that. As a result, it's receiving PandasRow (the defaut
# batch format).
if ray.data.context.DatasetContext.get_current().use_streaming_executor:
assert isinstance(row, PandasRow)
else:
assert isinstance(row, ArrowRow)
assert isinstance(row, ArrowRow)
assert row == t_row


@@ -1588,18 +1567,6 @@ def f(should_import_polars):
ctx.use_polars = original_use_polars


def test_default_batch_format(shutdown_only):
ds = ray.data.range(100)
assert ds.default_batch_format() == list

ds = ray.data.range_tensor(100)
assert ds.default_batch_format() == np.ndarray

df = pd.DataFrame({"foo": ["a", "b"], "bar": [0, 1]})
ds = ray.data.from_pandas(df)
assert ds.default_batch_format() == pd.DataFrame


def test_dataset_schema_after_read_stats(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=1)
21 changes: 1 addition & 20 deletions python/ray/data/tests/test_dataset_tfrecords.py
@@ -269,26 +269,7 @@ def _features_to_schema(features: "tf.train.Features") -> "schema_pb2.Schema":


def _ds_eq_streaming(ds_expected, ds_actual) -> bool:
if not ray.data.context.DatasetContext.get_current().use_streaming_executor:
assert ds_expected.take() == ds_actual.take()
else:
# In streaming, we set batch_format to "default" (because calling
# ds.dataset_format() will still invoke bulk execution and we want
# to avoid that). As a result, it's receiving PandasRow (the defaut
# batch format), which doesn't have the same ordering of columns as
# the ArrowRow.
from ray.data.block import BlockAccessor

def get_rows(ds):
rows = []
for batch in ds.iter_batches(batch_size=None, batch_format="pyarrow"):
batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch))
for row in batch.iter_rows():
rows.append(row)
return rows

assert get_rows(ds_expected) == get_rows(ds_actual)

assert ds_expected.take() == ds_actual.take()

@pytest.mark.parametrize("with_tf_schema", (True, False))
def test_read_tfrecords(