492 lines
16 KiB
Python
492 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import contextlib
|
|
from collections.abc import Iterable, Mapping, Sequence
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
from polars import DataFrame, col
|
|
from polars._typing import PartitioningScheme
|
|
from polars._utils.unstable import issue_unstable_warning
|
|
from polars.expr import Expr
|
|
|
|
if TYPE_CHECKING:
|
|
with contextlib.suppress(ImportError): # Module not available when building docs
|
|
from polars._plr import PyDataFrame, PyExpr
|
|
|
|
from typing import IO, Any, Callable
|
|
|
|
with contextlib.suppress(ImportError): # Module not available when building docs
|
|
from polars._plr import PyPartitioning
|
|
|
|
|
|
class KeyedPartition:
    """
    One partition key: the key column's name together with its value.

    The value is stored twice: once as given (`raw_value`) and once as a
    string (`str_value`) that is used when building hive-style names.

    .. warning::
        This functionality is currently considered **unstable**. It may be
        changed at any point without it being considered a breaking change.

    See Also
    --------
    PartitionByKey
    PartitionParted
    KeyedPartitionContext
    """

    name: str  #: Name of the key column.
    str_value: str  #: Value of the key as a path and URL safe string.
    raw_value: Any  #: Value of the key for this partition.

    def __init__(self, name: str, str_value: str, raw_value: Any) -> None:
        self.name, self.str_value, self.raw_value = name, str_value, raw_value

    def hive_name(self) -> str:
        """Return the hive-style `key=value` name of this key."""
        key, value = self.name, self.str_value
        return f"{key}={value}"
|
|
|
|
|
|
class KeyedPartitionContext:
    """
    Context handed to the `file_path` callback of a key-based partitioning.

    .. warning::
        This functionality is currently considered **unstable**. It may be
        changed at any point without it being considered a breaking change.

    See Also
    --------
    PartitionByKey
    PartitionParted
    """

    #: The index of the created file starting from zero.
    file_idx: int
    #: The index of the created partition starting from zero.
    part_idx: int
    #: The index of the file within this partition starting from zero.
    in_part_idx: int
    #: All the key names and values used for this partition.
    keys: list[KeyedPartition]
    #: The chosen output path before the callback was called without `base_path`.
    file_path: Path
    #: The chosen output path before the callback was called with `base_path`.
    full_path: Path

    def __init__(
        self,
        file_idx: int,
        part_idx: int,
        in_part_idx: int,
        keys: list[KeyedPartition],
        file_path: Path,
        full_path: Path,
    ) -> None:
        # Indices first, then the keys, then the two path variants.
        self.file_idx, self.part_idx, self.in_part_idx = (
            file_idx,
            part_idx,
            in_part_idx,
        )
        self.keys = keys
        self.file_path, self.full_path = file_path, full_path

    def hive_dirs(self) -> Path:
        """
        Return the keys as nested hive directories (`k1=v1/k2=v2/...`).

        At least one key must be present.
        """
        assert len(self.keys) > 0
        # The first key forms the root; every further key adds one level.
        root = Path(self.keys[0].hive_name())
        return root.joinpath(*(key.hive_name() for key in self.keys[1:]))
|
|
|
|
|
|
class BasePartitionContext:
    """
    Context handed to the `file_path` callback of a non-keyed partitioning.

    .. warning::
        This functionality is currently considered **unstable**. It may be
        changed at any point without it being considered a breaking change.

    See Also
    --------
    PartitionMaxSize
    """

    #: The index of the created file starting from zero.
    file_idx: int
    #: The chosen output path before the callback was called without `base_path`.
    file_path: Path
    #: The chosen output path before the callback was called with `base_path`.
    full_path: Path

    def __init__(self, file_idx: int, file_path: Path, full_path: Path) -> None:
        self.file_idx = file_idx
        self.file_path, self.full_path = file_path, full_path
|
|
|
|
|
|
def _cast_base_file_path_cb(
|
|
file_path_cb: Callable[[BasePartitionContext], Path | str | IO[bytes] | IO[str]]
|
|
| None,
|
|
) -> Callable[[BasePartitionContext], Path | str | IO[bytes] | IO[str]] | None:
|
|
if file_path_cb is None:
|
|
return None
|
|
return lambda ctx: file_path_cb(
|
|
BasePartitionContext(
|
|
file_idx=ctx.file_idx,
|
|
file_path=Path(ctx.file_path),
|
|
full_path=Path(ctx.full_path),
|
|
)
|
|
)
|
|
|
|
|
|
def _cast_keyed_file_path_cb(
|
|
file_path_cb: Callable[[KeyedPartitionContext], Path | str | IO[bytes] | IO[str]]
|
|
| None,
|
|
) -> Callable[[KeyedPartitionContext], Path | str | IO[bytes] | IO[str]] | None:
|
|
if file_path_cb is None:
|
|
return None
|
|
return lambda ctx: file_path_cb(
|
|
KeyedPartitionContext(
|
|
file_idx=ctx.file_idx,
|
|
part_idx=ctx.part_idx,
|
|
in_part_idx=ctx.in_part_idx,
|
|
keys=[
|
|
KeyedPartition(
|
|
name=kv.name, str_value=kv.str_value, raw_value=kv.raw_value
|
|
)
|
|
for kv in ctx.keys
|
|
],
|
|
file_path=Path(ctx.file_path),
|
|
full_path=Path(ctx.full_path),
|
|
)
|
|
)
|
|
|
|
|
|
def _prepare_per_partition_sort_by(
|
|
e: str | Expr | Iterable[str | Expr] | None,
|
|
) -> list[PyExpr] | None:
|
|
def prepare_one(v: str | Expr) -> PyExpr:
|
|
if isinstance(v, str):
|
|
return col(v)._pyexpr
|
|
elif isinstance(v, Expr):
|
|
return v._pyexpr
|
|
else:
|
|
msg = f"cannot do a per partition sort by for {v!r}"
|
|
raise TypeError(msg)
|
|
|
|
if e is None:
|
|
return None
|
|
elif isinstance(e, str):
|
|
return [col(e)._pyexpr]
|
|
elif isinstance(e, Expr):
|
|
return [e._pyexpr]
|
|
elif isinstance(e, Iterable):
|
|
return [prepare_one(v) for v in e]
|
|
else:
|
|
msg = f"cannot do a per partition sort by for {e!r}"
|
|
raise TypeError(msg)
|
|
|
|
|
|
def _prepare_finish_callback(
|
|
f: Callable[[DataFrame], None] | None,
|
|
) -> Callable[[PyDataFrame], None] | None:
|
|
if f is None:
|
|
return None
|
|
|
|
def cb(pydf: PyDataFrame) -> None:
|
|
nonlocal f
|
|
f(DataFrame._from_pydf(pydf))
|
|
|
|
return cb
|
|
|
|
|
|
class PartitionMaxSize(PartitioningScheme):
    """
    Partitioning scheme to write files with a maximum size.

    This partitioning scheme generates files that have a given maximum size.
    When a file reaches the maximum size, it is closed and a new file is opened.

    .. warning::
        This functionality is currently considered **unstable**. It may be
        changed at any point without it being considered a breaking change.

    Parameters
    ----------
    base_path
        The base path for the output files.
    file_path
        A callback to register or modify the output path for each partition
        relative to the `base_path`. The callback provides a
        :class:`polars.io.partition.BasePartitionContext` that contains information
        about the partition.

        If no callback is given, it defaults to `{ctx.file_idx}.{EXT}`.
    max_size : int
        The maximum size in rows of each of the generated files.
    per_partition_sort_by
        Columns or expressions to sort over within each partition.

        Note that this might increase the memory consumption needed for each partition.
    finish_callback
        A callback that gets called when the query finishes successfully.

        For parquet files, the callback is given a dataframe with metrics about all
        written files.

    Examples
    --------
    Split a parquet file into smaller CSV files with 100 000 rows each:

    >>> pl.scan_parquet("/path/to/file.parquet").sink_csv(
    ...     pl.PartitionMaxSize("./out/", max_size=100_000),
    ... )  # doctest: +SKIP

    See Also
    --------
    PartitionByKey
    PartitionParted
    polars.io.partition.BasePartitionContext
    """

    def __init__(
        self,
        base_path: str | Path,
        *,
        file_path: Callable[[BasePartitionContext], Path | str | IO[bytes] | IO[str]]
        | None = None,
        max_size: int,
        per_partition_sort_by: str | Expr | Iterable[str | Expr] | None = None,
        finish_callback: Callable[[DataFrame], None] | None = None,
    ) -> None:
        issue_unstable_warning("partitioning strategies are considered unstable.")
        # Every user-facing argument is lowered to the form the native
        # `PyPartitioning` constructor receives (wrapped callbacks, `PyExpr`s).
        super().__init__(
            PyPartitioning.new_max_size(
                base_path=base_path,
                file_path_cb=_cast_base_file_path_cb(file_path),
                max_size=max_size,
                per_partition_sort_by=_prepare_per_partition_sort_by(
                    per_partition_sort_by
                ),
                finish_callback=_prepare_finish_callback(finish_callback),
            )
        )
|
|
|
|
|
|
def _lower_by(
    by: str | Expr | Sequence[str | Expr] | Mapping[str, Expr],
) -> list[PyExpr]:
    """
    Lower the `by` argument of a keyed partitioning to a list of `PyExpr`.

    A single column name or expression yields a one-element list. A sequence
    is converted element by element, with strings resolved through `col`. A
    mapping yields each expression aliased to its key.

    Raises
    ------
    TypeError
        If `by` is none of the accepted types.
    """
    # `str` is checked first because a string is also a `Sequence`.
    if isinstance(by, str):
        return [col(by)._pyexpr]
    if isinstance(by, Expr):
        return [by._pyexpr]
    if isinstance(by, Sequence):
        return [
            (col(item) if isinstance(item, str) else item)._pyexpr for item in by
        ]
    if isinstance(by, Mapping):
        return [expr.alias(name)._pyexpr for name, expr in by.items()]

    msg = "invalid `by` type"
    raise TypeError(msg)
|
|
|
|
|
|
class PartitionByKey(PartitioningScheme):
    """
    Partitioning scheme to write files split by the values of keys.

    This partitioning scheme generates an arbitrary amount of files splitting
    the data depending on what the value is of key expressions.

    The amount of files that can be written is not limited. However, when
    writing beyond a certain amount of files, the data for the remaining
    partitions is buffered before writing to the file.

    .. warning::
        This functionality is currently considered **unstable**. It may be
        changed at any point without it being considered a breaking change.

    Parameters
    ----------
    base_path
        The base path for the output files.

        Use the `mkdir` option on the `sink_*` methods to ensure directories in
        the path are created.
    file_path
        A callback to register or modify the output path for each partition
        relative to the `base_path`. The callback provides a
        :class:`polars.io.partition.KeyedPartitionContext` that contains information
        about the partition.

        If no callback is given, it defaults to
        `{ctx.hive_dirs()}/{ctx.in_part_idx}.{EXT}`.
    by
        The expressions to partition by.
    include_key : bool
        Whether to include the key columns in the output files.
    per_partition_sort_by
        Columns or expressions to sort over within each partition.

        Note that this might increase the memory consumption needed for each partition.
    finish_callback
        A callback that gets called when the query finishes successfully.

        For parquet files, the callback is given a dataframe with metrics about all
        written files.

    Examples
    --------
    Split into a hive-partitioning style partition:

    >>> (
    ...     pl.LazyFrame(
    ...         {"a": [1, 2, 3], "b": [5, 7, 9], "c": ["A", "B", "C"]}
    ...     ).sink_parquet(
    ...         pl.PartitionByKey(
    ...             "./out/",
    ...             by=["a", "b"],
    ...             include_key=False,
    ...         ),
    ...         mkdir=True,
    ...     )
    ... )  # doctest: +SKIP

    Split a parquet file by a column `year` into CSV files:

    >>> pl.scan_parquet("/path/to/file.parquet").sink_csv(
    ...     pl.PartitionByKey(
    ...         "./out/",
    ...         file_path=lambda ctx: f"year={ctx.keys[0].str_value}.csv",
    ...         by="year",
    ...     ),
    ... )  # doctest: +SKIP

    See Also
    --------
    PartitionMaxSize
    PartitionParted
    polars.io.partition.KeyedPartitionContext
    """

    def __init__(
        self,
        base_path: str | Path,
        *,
        file_path: Callable[[KeyedPartitionContext], Path | str | IO[bytes] | IO[str]]
        | None = None,
        by: str | Expr | Sequence[str | Expr] | Mapping[str, Expr],
        include_key: bool = True,
        per_partition_sort_by: str | Expr | Iterable[str | Expr] | None = None,
        finish_callback: Callable[[DataFrame], None] | None = None,
    ) -> None:
        issue_unstable_warning("partitioning strategies are considered unstable.")

        # Lower the key expressions first so an invalid `by` raises before the
        # native partitioning object is constructed.
        lowered_by = _lower_by(by)
        super().__init__(
            PyPartitioning.new_by_key(
                base_path=base_path,
                file_path_cb=_cast_keyed_file_path_cb(file_path),
                by=lowered_by,
                include_key=include_key,
                per_partition_sort_by=_prepare_per_partition_sort_by(
                    per_partition_sort_by
                ),
                finish_callback=_prepare_finish_callback(finish_callback),
            )
        )
|
|
|
|
|
|
class PartitionParted(PartitioningScheme):
    """
    Partitioning scheme to split parted dataframes.

    This is a specialized version of :class:`PartitionByKey`. Whereas
    :class:`PartitionByKey` accepts data in any order, this scheme expects the input
    data to be pre-grouped or pre-sorted. This scheme suffers a lot less overhead than
    :class:`PartitionByKey`, but may not be always applicable.

    Each new value of the key expressions starts a new partition, therefore repeating
    the same value multiple times may overwrite previous partitions.

    .. warning::
        This functionality is currently considered **unstable**. It may be
        changed at any point without it being considered a breaking change.

    Parameters
    ----------
    base_path
        The base path for the output files.

        Use the `mkdir` option on the `sink_*` methods to ensure directories in
        the path are created.
    file_path
        A callback to register or modify the output path for each partition
        relative to the `base_path`. The callback provides a
        :class:`polars.io.partition.KeyedPartitionContext` that contains information
        about the partition.

        If no callback is given, it defaults to
        `{ctx.hive_dirs()}/{ctx.in_part_idx}.{EXT}`.
    by
        The expressions to partition by.
    include_key : bool
        Whether to include the key columns in the output files.
    per_partition_sort_by
        Columns or expressions to sort over within each partition.

        Note that this might increase the memory consumption needed for each partition.
    finish_callback
        A callback that gets called when the query finishes successfully.

        For parquet files, the callback is given a dataframe with metrics about all
        written files.

    Examples
    --------
    Split a parquet file by a column `year` into CSV files:

    >>> pl.scan_parquet("/path/to/file.parquet").sink_csv(
    ...     pl.PartitionParted("./out/", by="year"),
    ...     mkdir=True,
    ... )  # doctest: +SKIP

    See Also
    --------
    PartitionMaxSize
    PartitionByKey
    polars.io.partition.KeyedPartitionContext
    """

    def __init__(
        self,
        base_path: str | Path,
        *,
        file_path: Callable[[KeyedPartitionContext], Path | str | IO[bytes] | IO[str]]
        | None = None,
        by: str | Expr | Sequence[str | Expr] | Mapping[str, Expr],
        include_key: bool = True,
        per_partition_sort_by: str | Expr | Iterable[str | Expr] | None = None,
        finish_callback: Callable[[DataFrame], None] | None = None,
    ) -> None:
        issue_unstable_warning("partitioning strategies are considered unstable.")

        # Lower the key expressions first so an invalid `by` raises before the
        # native partitioning object is constructed.
        lowered_by = _lower_by(by)
        super().__init__(
            # Must be the parted constructor: building this scheme with
            # `new_by_key` would make it indistinguishable from `PartitionByKey`.
            PyPartitioning.new_parted(
                base_path=base_path,
                file_path_cb=_cast_keyed_file_path_cb(file_path),
                by=lowered_by,
                include_key=include_key,
                per_partition_sort_by=_prepare_per_partition_sort_by(
                    per_partition_sort_by
                ),
                finish_callback=_prepare_finish_callback(finish_callback),
            )
        )
|