Output Handling

IO backends handle writing model outputs to disk or memory. Use them when saving forecast results, ensemble data, or other workflow outputs. While input data handling is primarily managed by the data sources in earth2studio.data, output handling is managed by the IO backends available in earth2studio.io. These backends are designed to balance the ability for you to customize the arrays and metadata within the exposed backend while also simplifying the design of reusable workflows.

The rest of earth2studio moves data around as an (x, coords) pair. Output stores extend this structure with one additional notion: an array_name. Names distinguish between different arrays within the backend and are currently required for storing Datasets in xarray, zarr, and netcdf. This means that you must supply a name when adding an array to a store or when writing an array. A frequent pattern is to extract one dimension of an array, such as "variable", and use its entries as the names of individual arrays in the backend.

IO Backend Interface

The full requirements for a standard IO backend are defined explicitly in earth2studio/io/base.py.

@runtime_checkable
class IOBackend(Protocol):
    """Interface for a generic IO backend."""

    def add_array(
        self, coords: CoordSystem, array_name: str | list[str], **kwargs: dict[str, Any]
    ) -> None:
        """
        Add an array with `array_name` to the existing IO backend object.

        Parameters
        ----------
        coords : OrderedDict
            Ordered dictionary representing the dimensions and coordinate data
            of x.
        array_name : str | list[str]
            Name(s) of the array(s) that will be initialized with coordinates as dimensions.
        kwargs : dict[str, Any], optional
            Optional keyword arguments that will be passed to the IO backend constructor.
        """
        pass

    def write(
        self,
        x: torch.Tensor | list[torch.Tensor],
        coords: CoordSystem,
        array_name: str | list[str],
    ) -> None:
        """
        Write data to the current backend using the passed array_name.

        Parameters
        ----------
        x : torch.Tensor | list[torch.Tensor]
            Tensor(s) to be written to zarr store.
        coords : OrderedDict
            Coordinates of the passed data.
        array_name : str | list[str]
            Name(s) of the array(s) that will be written to.
        """
        pass

Note

IO Backends do not need to inherit this protocol; this is used to define the required APIs. Some built-in IO backends may also offer additional functionality that is not universally supported (and hence not required).

There are two important methods that must be supported:

  • add_array, which adds an array to the underlying store and any attached coordinates

  • write, which explicitly stores the provided data in the backend
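To make the two required methods concrete, here is a minimal sketch of an in-memory backend that satisfies the interface structurally. The class name DictBackend and its internals are hypothetical and not part of earth2studio. To stay self-contained it accepts NumPy arrays rather than torch tensors, and it assumes that every write supplies all dimensions of the array, in order, with coordinate values that already exist in the store.

```python
from collections import OrderedDict

import numpy as np


class DictBackend:
    """Hypothetical in-memory backend exposing add_array and write."""

    def __init__(self):
        self.coords = OrderedDict()  # dimension name -> coordinate values
        self.arrays = {}  # array name -> preallocated numpy array

    def add_array(self, coords, array_name, **kwargs):
        names = [array_name] if isinstance(array_name, str) else list(array_name)
        # Register each dimension once, then allocate one array per name.
        for dim, values in coords.items():
            self.coords.setdefault(dim, np.asarray(values))
        shape = tuple(len(values) for values in coords.values())
        for name in names:
            self.arrays[name] = np.zeros(shape, dtype=np.float32)

    def write(self, x, coords, array_name):
        if isinstance(array_name, str):
            x, array_name = [x], [array_name]
        # Locate the incoming coordinate values inside the stored coordinates.
        # Assumption: coords lists every dimension of the array, in order.
        index = np.ix_(
            *[
                np.flatnonzero(np.isin(self.coords[dim], values))
                for dim, values in coords.items()
            ]
        )
        for data, name in zip(x, array_name):
            self.arrays[name][index] = np.asarray(data)


# Usage: allocate the full array up front, then fill in one lead time.
io = DictBackend()
total_coords = OrderedDict(
    lead_time=np.array([0, 6, 12]),
    lat=np.array([10.0, 20.0]),
    lon=np.array([0.0, 90.0, 180.0]),
)
io.add_array(total_coords, "t2m")

step_coords = total_coords.copy()
step_coords["lead_time"] = np.array([6])
io.write(np.ones((1, 2, 3)), step_coords, "t2m")
```

The split between the two methods mirrors the pattern used on this page: add_array fixes the full extent of the output once, and write fills in whichever slice the coordinates of the incoming data select.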

The write command can induce synchronization when the input tensor resides on the GPU and the store keeps its data on the CPU or on disk, because most stores convert from PyTorch to NumPy in this process. The earth2studio.io.kv backend has the option of storing data on the GPU, which can be done asynchronously.

Most stores also offer several additional utilities such as __contains__, __getitem__, __len__, and __iter__. Refer to the implementation in earth2studio.io.ZarrBackend:

    def __init__(
        self,
        file_name: str = None,
        chunks: dict[str, int] = {  # to avoid writing in the same chunk by default
            "ensemble": 1,  # dimensions not present in data are ignored
            "time": 1,
            "lead_time": 1,
            "variable": 1,
        },
        backend_kwargs: dict[str, Any] = {"overwrite": False},
        zarr_codecs: CompressorsLike = None,
    ) -> None:

        if file_name is None:
            self.store = zarr.storage.MemoryStore()
        else:
            self.store = zarr.storage.LocalStore(file_name)

        self.root = zarr.group(self.store, **backend_kwargs)
        self.zarr_codecs = zarr_codecs

        # Read data from file, if available
        self.coords: CoordSystem = OrderedDict({})
        self.chunks = chunks.copy()
        for array in self.root:
            # https://github.com/pydata/xarray/pull/9669
            dims = self.root[array].metadata.dimension_names
            for dim in dims:
                if dim not in self.coords:
                    self.coords[dim] = self.root[dim][:]

        for array in self.root:
            if array not in self.coords:
                # https://github.com/pydata/xarray/pull/9669
                dims = self.root[array].metadata.dimension_names
                for c, d in zip(self.root[array].chunks, dims):
                    self.chunks[d] = c

    def __contains__(self, item: str) -> bool:
        """Checks if item in Zarr Group.

        Parameters
        ----------
        item : str
        """
        return self.root.__contains__(item)

    def __getitem__(self, item: str) -> "ZarrArray":
        """Gets item in Zarr Group.

        Parameters
        ----------
        item : str
        """
        return self.root.__getitem__(item)

    def __len__(
        self,
    ) -> int:
        """Gets length of Zarr Group."""
        return self.root.__len__()

    def __iter__(
        self,
    ) -> Iterator:
        """Return an iterator over Zarr Group member names."""
        return self.root.__iter__()

Common backends include earth2studio.io.ZarrBackend, earth2studio.io.NetCDF4Backend, and earth2studio.io.AsyncZarrBackend. Because of datetime compatibility, we recommend using the ZarrBackend as a default.

Initializing a Store

A common data pattern seen throughout our example workflows is to initialize the variables and dimensions of a backend using a complete CoordSystem. Refer to Data Movement for the structure of a CoordSystem. For example:

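from collections import OrderedDict

# Build a complete CoordSystem
# (replace each ... with the coordinate array of that dimension)
total_coords = OrderedDict(
    {
        'ensemble': ...,
        'time': ...,
        'lead_time': ...,
        'variable': ...,
        'lat': ...,
        'lon': ...,
    }
)

# Give an informative array name
array_name = 'fields'

# Initialize all dimensions in total_coords and the array 'fields'
# (`io` is assumed to be an already constructed IO backend)
io.add_array(total_coords, array_name)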

It can be tedious to define each coordinate and dimension. However, if we have a prognostic or diagnostic model, most of this information is already available. Here is a robust example of such a use-case:

from collections import OrderedDict

import numpy as np

# Set up IO backend
# assume we have `io`, `prognostic` (the model), `time`, `nsteps`, and `array_name`
# Copy prognostic model output coordinates, dropping "batch" and empty dimensions
total_coords = OrderedDict(
    {
        k: v for k, v in prognostic.output_coords(prognostic.input_coords()).items() if
        (k != "batch") and (v.shape != (0,))
    }
)
total_coords["time"] = time
# One lead time per step, including the initial state at step 0
total_coords["lead_time"] = np.asarray(
    [total_coords["lead_time"] * i for i in range(nsteps + 1)]
).flatten()
# Order the dimensions as (time, lead_time, ...)
total_coords.move_to_end("lead_time", last=False)
total_coords.move_to_end("time", last=False)
io.add_array(total_coords, array_name)

Prognostic models, diagnostic models, statistics, and metrics are required to have an output_coords method, which maps from an input coordinate to a corresponding output coordinate. This method is meant to simulate the result of __call__ without having to actually compute the forward call of the method. Review the API documentation for more details.
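The following sketch illustrates how output_coords can describe the result of a forward call without running it, and how the coordinate construction shown earlier expands a single-step lead time into the full forecast range. ToyPrognostic is a hypothetical stand-in and not an earth2studio class. The 6-hour step, the variable names, and the grid sizes are assumptions chosen for illustration.

```python
from collections import OrderedDict

import numpy as np


class ToyPrognostic:
    """Hypothetical stand-in for a prognostic model (not an earth2studio class)."""

    def input_coords(self):
        return OrderedDict(
            batch=np.empty(0),  # empty placeholder dimension
            lead_time=np.array([np.timedelta64(0, "h")]),
            variable=np.array(["t2m", "u10m"]),
            lat=np.linspace(90.0, -90.0, 4),
            lon=np.linspace(0.0, 360.0, 8, endpoint=False),
        )

    def output_coords(self, input_coords):
        # Describe the output of one step without computing it:
        # same grid and variables, lead time advanced by 6 hours.
        out = input_coords.copy()
        out["lead_time"] = input_coords["lead_time"] + np.timedelta64(6, "h")
        return out


model = ToyPrognostic()
nsteps = 3
time = np.array([np.datetime64("2024-01-01T00:00")])

# Drop the batch placeholder and any other empty dimension.
total_coords = OrderedDict(
    (k, v)
    for k, v in model.output_coords(model.input_coords()).items()
    if k != "batch" and v.shape != (0,)
)
total_coords["time"] = time
# Multiples of the single-step lead time: 0 h, 6 h, 12 h, 18 h.
total_coords["lead_time"] = np.asarray(
    [total_coords["lead_time"] * i for i in range(nsteps + 1)]
).flatten()
total_coords.move_to_end("lead_time", last=False)
total_coords.move_to_end("time", last=False)

print(list(total_coords))
```

Because only coordinates are manipulated, the store can be sized for the whole forecast before any model inference takes place.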

Another common IO use-case is to extract a particular dimension (usually variable) as the array names.

# A modification of the previous example:
var_names = total_coords.pop("variable")
io.add_array(total_coords, var_names)

Writing to the Store

After the data arrays have been initialized in the backend, writing to those arrays is a single line of code.

x, coords = model(x, coords)
io.write(x, coords, array_name)

If, as above, you are extracting a dimension of the tensor to use as array names, you can use earth2studio.utils.coords.split_coords:

io.write(*split_coords(x, coords, dim="variable"))
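To show what splitting along a dimension amounts to, here is a rough NumPy sketch. The helper split_along_dim is hypothetical and is not the implementation of split_coords, which may differ in its details. The sketch only assumes that the result comes back in the (x, coords, array_name) order that io.write accepts, as the unpacking above implies.

```python
from collections import OrderedDict

import numpy as np


def split_along_dim(x, coords, dim="variable"):
    """Hypothetical helper: split x along `dim` into one array per entry."""
    axis = list(coords).index(dim)
    names = [str(name) for name in coords[dim]]
    # The remaining coordinates describe each of the split arrays.
    reduced = OrderedDict((k, v) for k, v in coords.items() if k != dim)
    arrays = [np.take(x, i, axis=axis) for i in range(len(names))]
    return arrays, reduced, names


coords = OrderedDict(
    lead_time=np.array([0, 6]),
    variable=np.array(["t2m", "u10m", "v10m"]),
    lat=np.linspace(90.0, -90.0, 4),
)
x = np.arange(2 * 3 * 4, dtype=np.float32).reshape(2, 3, 4)

arrays, reduced, names = split_along_dim(x, coords)
print(names, [a.shape for a in arrays], list(reduced))
```

Each entry of the "variable" dimension becomes its own named array, and the dimension itself disappears from the coordinates, which matches the earlier pattern of popping "variable" from total_coords before calling add_array.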

For a complete workflow that uses IO backends, refer to earth2studio.run.deterministic() or the deterministic workflow example in the gallery.