DriverTrac/venv/lib/python3.12/site-packages/polars/lazyframe/opt_flags.py
2025-11-28 09:08:33 +05:30

334 lines
12 KiB
Python

from __future__ import annotations
import contextlib
from polars._utils.deprecation import issue_deprecation_warning
with contextlib.suppress(ImportError): # Module not available when building docs
from polars._plr import PyOptFlags
import inspect
from functools import wraps
from typing import TYPE_CHECKING, Callable, TypeVar
if TYPE_CHECKING:
import sys
if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec
P = ParamSpec("P")
T = TypeVar("T")
class QueryOptFlags:
    """
    The set of the optimizations considered during query optimization.

    Thin Python wrapper around a `PyOptFlags` object (stored in
    `_pyoptflags`); every public flag is a read/write property that
    forwards to the attribute of the same name on that object.

    .. warning::
        This functionality is considered **unstable**. It may be changed
        at any point without it being considered a breaking change.
    """

    def __init__(
        self,
        *,
        predicate_pushdown: None | bool = None,
        projection_pushdown: None | bool = None,
        simplify_expression: None | bool = None,
        slice_pushdown: None | bool = None,
        comm_subplan_elim: None | bool = None,
        comm_subexpr_elim: None | bool = None,
        cluster_with_columns: None | bool = None,
        collapse_joins: None | bool = None,
        check_order_observe: None | bool = None,
        fast_projection: None | bool = None,
    ) -> None:
        """
        Start from `PyOptFlags.default()` and apply the given overrides.

        Every argument left as `None` keeps its default value; see
        `update` for how each argument is applied.
        """
        self._pyoptflags = PyOptFlags.default()
        self.update(
            predicate_pushdown=predicate_pushdown,
            projection_pushdown=projection_pushdown,
            simplify_expression=simplify_expression,
            slice_pushdown=slice_pushdown,
            comm_subplan_elim=comm_subplan_elim,
            comm_subexpr_elim=comm_subexpr_elim,
            cluster_with_columns=cluster_with_columns,
            collapse_joins=collapse_joins,
            check_order_observe=check_order_observe,
            fast_projection=fast_projection,
        )

    @classmethod
    def _from_pyoptflags(cls, pyoptflags: PyOptFlags) -> QueryOptFlags:
        """Wrap an existing `PyOptFlags` object without running `__init__`."""
        optflags = cls.__new__(cls)
        optflags._pyoptflags = pyoptflags
        return optflags

    @staticmethod
    def none(
        *,
        predicate_pushdown: None | bool = None,
        projection_pushdown: None | bool = None,
        simplify_expression: None | bool = None,
        slice_pushdown: None | bool = None,
        comm_subplan_elim: None | bool = None,
        comm_subexpr_elim: None | bool = None,
        cluster_with_columns: None | bool = None,
        collapse_joins: None | bool = None,
        check_order_observe: None | bool = None,
        fast_projection: None | bool = None,
    ) -> QueryOptFlags:
        """
        Create a new empty set of optimizations.

        A fresh `QueryOptFlags` is created, `no_optimizations()` is called
        on it, and then any explicitly passed flags are applied via `update`.
        """
        optflags = QueryOptFlags()
        optflags.no_optimizations()
        return optflags.update(
            predicate_pushdown=predicate_pushdown,
            projection_pushdown=projection_pushdown,
            simplify_expression=simplify_expression,
            slice_pushdown=slice_pushdown,
            comm_subplan_elim=comm_subplan_elim,
            comm_subexpr_elim=comm_subexpr_elim,
            cluster_with_columns=cluster_with_columns,
            collapse_joins=collapse_joins,
            check_order_observe=check_order_observe,
            fast_projection=fast_projection,
        )

    def update(
        self,
        *,
        predicate_pushdown: None | bool = None,
        projection_pushdown: None | bool = None,
        simplify_expression: None | bool = None,
        slice_pushdown: None | bool = None,
        comm_subplan_elim: None | bool = None,
        comm_subexpr_elim: None | bool = None,
        cluster_with_columns: None | bool = None,
        collapse_joins: None | bool = None,
        check_order_observe: None | bool = None,
        fast_projection: None | bool = None,
    ) -> QueryOptFlags:
        """
        Update the current optimization flags in place.

        Arguments left as `None` are not touched. `collapse_joins` is
        deprecated: passing it issues a deprecation warning, and passing
        `False` turns `predicate_pushdown` off.

        Returns
        -------
        QueryOptFlags
            `self`, so that calls can be chained.
        """
        if predicate_pushdown is not None:
            self.predicate_pushdown = predicate_pushdown
        if projection_pushdown is not None:
            self.projection_pushdown = projection_pushdown
        if simplify_expression is not None:
            self.simplify_expression = simplify_expression
        if slice_pushdown is not None:
            self.slice_pushdown = slice_pushdown
        if comm_subplan_elim is not None:
            self.comm_subplan_elim = comm_subplan_elim
        if comm_subexpr_elim is not None:
            self.comm_subexpr_elim = comm_subexpr_elim
        if cluster_with_columns is not None:
            self.cluster_with_columns = cluster_with_columns
        if collapse_joins is not None:
            issue_deprecation_warning(
                "the `collapse_joins` parameter for `QueryOptFlags` is deprecated. "
                "Use `predicate_pushdown` instead.",
                version="1.33.1",
            )
            # Handled after `predicate_pushdown`, so `collapse_joins=False`
            # wins over an explicit `predicate_pushdown=True` in the same call.
            if not collapse_joins:
                self.predicate_pushdown = False
        if check_order_observe is not None:
            self.check_order_observe = check_order_observe
        if fast_projection is not None:
            self.fast_projection = fast_projection
        return self

    @staticmethod
    def _eager() -> QueryOptFlags:
        """
        Create the set of flags used for eager execution.

        Starts from a fresh `QueryOptFlags`, calls `no_optimizations()`,
        then sets the `eager` flag and turns `simplify_expression` on.
        """
        optflags = QueryOptFlags()
        optflags.no_optimizations()
        optflags._pyoptflags.eager = True
        optflags.simplify_expression = True
        return optflags

    def __copy__(self) -> QueryOptFlags:
        """Return a new `QueryOptFlags` wrapping a copy of the underlying flags."""
        return QueryOptFlags._from_pyoptflags(self._pyoptflags.copy())

    def __deepcopy__(self, memo: dict[int, object] | None = None) -> QueryOptFlags:
        """
        Return a new `QueryOptFlags` wrapping a copy of the underlying flags.

        `copy.deepcopy` always calls `__deepcopy__(memo)`; the `memo`
        argument is accepted for that protocol but not needed, because the
        only state is the `PyOptFlags` object copied via its own `copy()`.
        """
        return QueryOptFlags._from_pyoptflags(self._pyoptflags.copy())

    def no_optimizations(self) -> None:
        """Remove selected optimizations by calling `no_optimizations()` on the underlying flags."""
        self._pyoptflags.no_optimizations()

    @property
    def projection_pushdown(self) -> bool:
        """Only read columns that are used later in the query."""
        return self._pyoptflags.projection_pushdown

    @projection_pushdown.setter
    def projection_pushdown(self, value: bool) -> None:
        self._pyoptflags.projection_pushdown = value

    @property
    def predicate_pushdown(self) -> bool:
        """Apply predicates/filters as early as possible."""
        return self._pyoptflags.predicate_pushdown

    @predicate_pushdown.setter
    def predicate_pushdown(self, value: bool) -> None:
        self._pyoptflags.predicate_pushdown = value

    @property
    def cluster_with_columns(self) -> bool:
        """Cluster sequential `with_columns` calls to independent calls."""
        return self._pyoptflags.cluster_with_columns

    @cluster_with_columns.setter
    def cluster_with_columns(self, value: bool) -> None:
        self._pyoptflags.cluster_with_columns = value

    @property
    def simplify_expression(self) -> bool:
        """Run many expression optimization rules until fixed point."""
        return self._pyoptflags.simplify_expression

    @simplify_expression.setter
    def simplify_expression(self, value: bool) -> None:
        self._pyoptflags.simplify_expression = value

    @property
    def slice_pushdown(self) -> bool:
        """Pushdown slices/limits."""
        return self._pyoptflags.slice_pushdown

    @slice_pushdown.setter
    def slice_pushdown(self, value: bool) -> None:
        self._pyoptflags.slice_pushdown = value

    @property
    def comm_subplan_elim(self) -> bool:
        """Elide duplicate plans and cache their outputs."""
        return self._pyoptflags.comm_subplan_elim

    @comm_subplan_elim.setter
    def comm_subplan_elim(self, value: bool) -> None:
        self._pyoptflags.comm_subplan_elim = value

    @property
    def comm_subexpr_elim(self) -> bool:
        """Elide duplicate expressions and cache their outputs."""
        return self._pyoptflags.comm_subexpr_elim

    @comm_subexpr_elim.setter
    def comm_subexpr_elim(self, value: bool) -> None:
        self._pyoptflags.comm_subexpr_elim = value

    @property
    def check_order_observe(self) -> bool:
        """Do not maintain order if the order would not be observed."""
        return self._pyoptflags.check_order_observe

    @check_order_observe.setter
    def check_order_observe(self, value: bool) -> None:
        self._pyoptflags.check_order_observe = value

    @property
    def fast_projection(self) -> bool:
        """Replace simple projections with a faster inlined projection that skips the expression engine."""  # noqa: W505
        return self._pyoptflags.fast_projection

    @fast_projection.setter
    def fast_projection(self, value: bool) -> None:
        self._pyoptflags.fast_projection = value

    def __str__(self) -> str:
        """Return a multi-line listing of every flag, including the hidden ones."""
        return f"""
QueryOptFlags {{
type_coercion: {self._pyoptflags.type_coercion}
type_check: {self._pyoptflags.type_check}
predicate_pushdown: {self.predicate_pushdown}
projection_pushdown: {self.projection_pushdown}
simplify_expression: {self.simplify_expression}
slice_pushdown: {self.slice_pushdown}
comm_subplan_elim: {self.comm_subplan_elim}
comm_subexpr_elim: {self.comm_subexpr_elim}
cluster_with_columns: {self.cluster_with_columns}
check_order_observe: {self.check_order_observe}
fast_projection: {self.fast_projection}
eager: {self._pyoptflags.eager}
streaming: {self._pyoptflags.streaming}
}}
""".strip()
# Shared default flag set, used when a caller passes no `optimizations`.
DEFAULT_QUERY_OPT_FLAGS: QueryOptFlags
try:
    DEFAULT_QUERY_OPT_FLAGS = QueryOptFlags()
except (ImportError, NameError):
    # The compiled module is not available when building docs, so
    # constructing the flags fails; fall back to an empty placeholder.
    DEFAULT_QUERY_OPT_FLAGS = ()  # type: ignore[assignment]
def forward_old_opt_flags() -> Callable[[Callable[P, T]], Callable[P, T]]:
    """
    Decorator factory that forwards the old optimization flags.

    The returned decorator wraps a function that takes an `optimizations`
    keyword argument. On each call the wrapper copies the passed (or
    default) `QueryOptFlags`, removes every deprecated per-flag keyword
    argument from the call, issues a `DeprecationWarning` for it, folds its
    value into the copied flags, and passes the result on as
    `optimizations`.
    """

    def helper(f: QueryOptFlags, field_name: str, value: bool) -> QueryOptFlags:  # noqa: FBT001
        # Set a flag that is exposed as a property on `QueryOptFlags`.
        setattr(f, field_name, value)
        return f

    def helper_hidden(f: QueryOptFlags, field_name: str, value: bool) -> QueryOptFlags:  # noqa: FBT001
        # Set a flag that only exists on the underlying `_pyoptflags` object.
        setattr(f._pyoptflags, field_name, value)
        return f

    def clear_optimizations(f: QueryOptFlags, value: bool) -> QueryOptFlags:  # noqa: FBT001
        # A truthy value replaces the flags built so far with `QueryOptFlags.none()`.
        return QueryOptFlags.none() if value else f

    def eager(f: QueryOptFlags, value: bool) -> QueryOptFlags:  # noqa: FBT001
        # A truthy value replaces the flags built so far with `QueryOptFlags._eager()`.
        return QueryOptFlags._eager() if value else f

    def collapse_joins(f: QueryOptFlags, value: bool) -> QueryOptFlags:  # noqa: FBT001
        # `QueryOptFlags` has no `collapse_joins` attribute, so a plain
        # `setattr` would be silently ignored. Mirror what
        # `QueryOptFlags.update` does for this deprecated flag instead:
        # an explicit `False` disables predicate pushdown.
        if value is not None and not value:
            f.predicate_pushdown = False
        return f

    OLD_OPT_PARAMETERS_MAPPING = {
        "no_optimization": lambda f, v: clear_optimizations(f, v),
        "_eager": lambda f, v: eager(f, v),
        "type_coercion": lambda f, v: helper_hidden(f, "type_coercion", v),
        "_type_check": lambda f, v: helper_hidden(f, "type_check", v),
        "predicate_pushdown": lambda f, v: helper(f, "predicate_pushdown", v),
        "projection_pushdown": lambda f, v: helper(f, "projection_pushdown", v),
        "simplify_expression": lambda f, v: helper(f, "simplify_expression", v),
        "slice_pushdown": lambda f, v: helper(f, "slice_pushdown", v),
        "comm_subplan_elim": lambda f, v: helper(f, "comm_subplan_elim", v),
        "comm_subexpr_elim": lambda f, v: helper(f, "comm_subexpr_elim", v),
        "cluster_with_columns": lambda f, v: helper(f, "cluster_with_columns", v),
        "collapse_joins": lambda f, v: collapse_joins(f, v),
        "_check_order": lambda f, v: helper(f, "check_order_observe", v),
    }

    def decorate(function: Callable[P, T]) -> Callable[P, T]:
        @wraps(function)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            optflags: QueryOptFlags = kwargs.get(
                "optimizations", DEFAULT_QUERY_OPT_FLAGS
            )  # type: ignore[assignment]
            # Work on a copy so neither the caller's flags nor the shared
            # default object is mutated.
            optflags = optflags.__copy__()

            # Collect the deprecated keys up front (in call order), because
            # `kwargs` is mutated while they are processed.
            deprecated_keys = [
                key for key in kwargs if key in OLD_OPT_PARAMETERS_MAPPING
            ]
            if deprecated_keys:
                from polars._utils.various import issue_warning

                for key in deprecated_keys:
                    message = f"optimization flag `{key}` is deprecated. Please use `optimizations` parameter\n(Deprecated in version 1.30.0)"
                    issue_warning(message, DeprecationWarning)
                    cb = OLD_OPT_PARAMETERS_MAPPING[key]
                    optflags = cb(optflags, kwargs.pop(key))  # type: ignore[no-untyped-call,unused-ignore]

            kwargs["optimizations"] = optflags
            return function(*args, **kwargs)

        # Expose the wrapped function's own signature to introspection.
        wrapper.__signature__ = inspect.signature(function)  # type: ignore[attr-defined]
        return wrapper

    return decorate