1334 lines
49 KiB
Python
1334 lines
49 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import warnings
|
|
from collections import defaultdict
|
|
from collections.abc import Sequence
|
|
from datetime import time
|
|
from glob import glob
|
|
from io import BufferedReader, BytesIO, StringIO, TextIOWrapper
|
|
from pathlib import Path
|
|
from typing import IO, TYPE_CHECKING, Any, Callable, NoReturn, overload
|
|
|
|
import polars._reexport as pl
|
|
from polars import from_arrow
|
|
from polars import functions as F
|
|
from polars._dependencies import _PYARROW_AVAILABLE, import_optional
|
|
from polars._utils.deprecation import (
|
|
deprecate_renamed_parameter,
|
|
issue_deprecation_warning,
|
|
)
|
|
from polars._utils.various import deduplicate_names, normalize_filepath, parse_version
|
|
from polars.datatypes import (
|
|
N_INFER_DEFAULT,
|
|
Boolean,
|
|
Date,
|
|
Datetime,
|
|
Duration,
|
|
Int64,
|
|
Null,
|
|
String,
|
|
Time,
|
|
UInt8,
|
|
)
|
|
from polars.datatypes.group import FLOAT_DTYPES, INTEGER_DTYPES, NUMERIC_DTYPES
|
|
from polars.exceptions import (
|
|
ModuleUpgradeRequiredError,
|
|
NoDataError,
|
|
ParameterCollisionError,
|
|
)
|
|
from polars.functions import concat
|
|
from polars.io._utils import looks_like_url, process_file_url
|
|
from polars.io.csv.functions import read_csv
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Literal
|
|
|
|
from polars._typing import ExcelSpreadsheetEngine, FileSource, SchemaDict
|
|
|
|
|
|
def _sources(source: FileSource) -> tuple[Any, bool]:
|
|
"""Unpack any glob patterns, standardise file paths."""
|
|
read_multiple_workbooks = True
|
|
sources: list[Any] = []
|
|
|
|
if isinstance(source, memoryview):
|
|
source = source.tobytes()
|
|
if not isinstance(source, Sequence) or isinstance(source, (bytes, str)):
|
|
read_multiple_workbooks = False
|
|
source = [source] # type: ignore[assignment]
|
|
|
|
for src in source: # type: ignore[union-attr]
|
|
if isinstance(src, (str, os.PathLike)) and not Path(src).exists():
|
|
src = os.path.expanduser(str(src)) # noqa: PTH111
|
|
if looks_like_url(src):
|
|
sources.append(src)
|
|
continue
|
|
sources.extend(files := glob(src, recursive=True)) # noqa: PTH207
|
|
if not files:
|
|
msg = f"no workbook found at path {src!r}"
|
|
raise FileNotFoundError(msg)
|
|
read_multiple_workbooks = True
|
|
else:
|
|
if isinstance(src, os.PathLike):
|
|
src = str(src)
|
|
sources.append(src)
|
|
|
|
return sources, read_multiple_workbooks
|
|
|
|
|
|
def _standardize_duplicates(s: str) -> str:
|
|
"""Standardize columns with '_duplicated_n' names."""
|
|
return re.sub(r"_duplicated_(\d+)", repl=r"\1", string=s)
|
|
|
|
|
|
def _unpack_read_results(
|
|
frames: list[pl.DataFrame] | list[dict[str, pl.DataFrame]],
|
|
*,
|
|
read_multiple_workbooks: bool,
|
|
) -> Any:
|
|
if not frames:
|
|
msg = "no data found in the given workbook(s) and sheet(s)"
|
|
raise NoDataError(msg)
|
|
|
|
if not read_multiple_workbooks:
|
|
# one sheet from one workbook
|
|
return frames[0]
|
|
|
|
if isinstance(frames[0], pl.DataFrame):
|
|
# one sheet from multiple workbooks
|
|
return concat(frames, how="vertical_relaxed") # type: ignore[type-var]
|
|
else:
|
|
# multiple sheets from multiple workbooks
|
|
sheet_frames = defaultdict(list)
|
|
for res in frames:
|
|
for sheet, df in res.items(): # type: ignore[union-attr]
|
|
sheet_frames[sheet].append(df)
|
|
return {k: concat(v, how="vertical_relaxed") for k, v in sheet_frames.items()}
|
|
|
|
|
|
@overload
|
|
def read_excel(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: None = ...,
|
|
sheet_name: str,
|
|
table_name: str | None = ...,
|
|
engine: ExcelSpreadsheetEngine = ...,
|
|
engine_options: dict[str, Any] | None = ...,
|
|
read_options: dict[str, Any] | None = ...,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | str | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> pl.DataFrame: ...
|
|
|
|
|
|
@overload
|
|
def read_excel(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: None = ...,
|
|
sheet_name: None = ...,
|
|
table_name: str | None = ...,
|
|
engine: ExcelSpreadsheetEngine = ...,
|
|
engine_options: dict[str, Any] | None = ...,
|
|
has_header: bool = ...,
|
|
read_options: dict[str, Any] | None = ...,
|
|
columns: Sequence[int] | Sequence[str] | str | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> pl.DataFrame: ...
|
|
|
|
|
|
@overload
|
|
def read_excel(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: int,
|
|
sheet_name: str,
|
|
table_name: str | None = ...,
|
|
engine: ExcelSpreadsheetEngine = ...,
|
|
engine_options: dict[str, Any] | None = ...,
|
|
read_options: dict[str, Any] | None = ...,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | str | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> NoReturn: ...
|
|
|
|
|
|
# note: 'ignore' required as mypy thinks that the return value for
|
|
# Literal[0] overlaps with the return value for other integers
|
|
@overload # type: ignore[overload-overlap]
|
|
def read_excel(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: Literal[0] | Sequence[int],
|
|
sheet_name: None = ...,
|
|
table_name: str | None = ...,
|
|
engine: ExcelSpreadsheetEngine = ...,
|
|
engine_options: dict[str, Any] | None = ...,
|
|
read_options: dict[str, Any] | None = ...,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | str | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> dict[str, pl.DataFrame]: ...
|
|
|
|
|
|
@overload
|
|
def read_excel(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: int,
|
|
sheet_name: None = ...,
|
|
table_name: str | None = ...,
|
|
engine: ExcelSpreadsheetEngine = ...,
|
|
engine_options: dict[str, Any] | None = ...,
|
|
read_options: dict[str, Any] | None = ...,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | str | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> pl.DataFrame: ...
|
|
|
|
|
|
@overload
|
|
def read_excel(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: None = ...,
|
|
sheet_name: list[str] | tuple[str],
|
|
table_name: str | None = ...,
|
|
engine: ExcelSpreadsheetEngine = ...,
|
|
engine_options: dict[str, Any] | None = ...,
|
|
read_options: dict[str, Any] | None = ...,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | str | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> dict[str, pl.DataFrame]: ...
|
|
|
|
|
|
@deprecate_renamed_parameter("xlsx2csv_options", "engine_options", version="0.20.6")
|
|
@deprecate_renamed_parameter("read_csv_options", "read_options", version="0.20.7")
|
|
def read_excel(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: int | Sequence[int] | None = None,
|
|
sheet_name: str | list[str] | tuple[str] | None = None,
|
|
table_name: str | None = None,
|
|
engine: ExcelSpreadsheetEngine = "calamine",
|
|
engine_options: dict[str, Any] | None = None,
|
|
read_options: dict[str, Any] | None = None,
|
|
has_header: bool = True,
|
|
columns: Sequence[int] | Sequence[str] | str | None = None,
|
|
schema_overrides: SchemaDict | None = None,
|
|
infer_schema_length: int | None = N_INFER_DEFAULT,
|
|
include_file_paths: str | None = None,
|
|
drop_empty_rows: bool = True,
|
|
drop_empty_cols: bool = True,
|
|
raise_if_empty: bool = True,
|
|
) -> pl.DataFrame | dict[str, pl.DataFrame]:
|
|
"""
|
|
Read Excel spreadsheet data into a DataFrame.
|
|
|
|
.. versionadded:: 1.20
|
|
Support loading data from named table objects with `table_name` parameter.
|
|
.. versionadded:: 1.18
|
|
Support loading data from a list (or glob pattern) of multiple workbooks.
|
|
.. versionchanged:: 1.0
|
|
Default engine is now "calamine" (was "xlsx2csv").
|
|
.. versionchanged:: 0.20.7
|
|
The `read_csv_options` parameter was renamed `read_options`.
|
|
.. versionchanged:: 0.20.6
|
|
The `xlsx2csv_options` parameter was renamed `engine_options`.
|
|
|
|
Parameters
|
|
----------
|
|
source
|
|
Path(s) to a file or a file-like object (by "file-like object" we refer to
|
|
objects that have a `read()` method, such as a file handler like the builtin
|
|
`open` function, or a `BytesIO` instance). For file-like objects, the stream
|
|
position may not be updated after reading.
|
|
sheet_id
|
|
Sheet number(s) to convert (set `0` to load all sheets as DataFrames) and
|
|
return a `{sheetname:frame,}` dict. (Defaults to `1` if neither this nor
|
|
`sheet_name` are specified). Can also take a sequence of sheet numbers.
|
|
sheet_name
|
|
Sheet name(s) to convert; cannot be used in conjunction with `sheet_id`. If
|
|
more than one is given then a `{sheetname:frame,}` dict is returned.
|
|
table_name
|
|
Name of a specific table to read; note that table names are unique across
|
|
the workbook, so additionally specifying a sheet id or name is optional;
|
|
if one of those parameters *is* specified, an error will be raised if
|
|
the named table is not found in that particular sheet.
|
|
engine : {'calamine', 'openpyxl', 'xlsx2csv'}
|
|
Library used to parse the spreadsheet file; defaults to "calamine".
|
|
|
|
* "calamine": this engine can be used for reading all major types of Excel
|
|
Workbook (`.xlsx`, `.xlsb`, `.xls`) and is dramatically faster than the
|
|
other options, using the `fastexcel` module to bind the Rust-based Calamine
|
|
parser.
|
|
* "openpyxl": this engine is significantly slower than both `calamine` and
|
|
`xlsx2csv`, but can provide a useful fallback if you are otherwise unable
|
|
to read data from your workbook.
|
|
* "xlsx2csv": converts the data to an in-memory CSV before using the native
|
|
polars `read_csv` method to parse the result.
|
|
engine_options
|
|
Additional options passed to the underlying engine's primary parsing
|
|
constructor (given below), if supported:
|
|
|
|
* "calamine": n/a (can only provide `read_options`)
|
|
* "openpyxl": `load_workbook <https://openpyxl.readthedocs.io/en/stable/api/openpyxl.reader.excel.html#openpyxl.reader.excel.load_workbook>`_
|
|
* "xlsx2csv": `Xlsx2csv <https://github.com/dilshod/xlsx2csv/blob/f35734aa453d65102198a77e7b8cd04928e6b3a2/xlsx2csv.py#L157>`_
|
|
read_options
|
|
Options passed to the underlying engine method that reads the sheet data.
|
|
Where supported, this allows for additional control over parsing. The
|
|
specific read methods associated with each engine are:
|
|
|
|
* "calamine": `load_sheet_by_name <https://fastexcel.toucantoco.dev/fastexcel.html#ExcelReader.load_sheet_by_name>`_
|
|
(or `load_table <https://fastexcel.toucantoco.dev/fastexcel.html#ExcelReader.load_table>`_
|
|
if using the `table_name` parameter).
|
|
* "openpyxl": n/a (can only provide `engine_options`)
|
|
* "xlsx2csv": see :meth:`read_csv`
|
|
has_header
|
|
Indicate if the first row of the table data is a header or not. If False,
|
|
column names will be autogenerated in the following format: `column_x`, with
|
|
`x` being an enumeration over every column in the dataset, starting at 1.
|
|
columns
|
|
Columns to read from the sheet; if not specified, all columns are read. Can
|
|
be given as a sequence of column names or indices, or a single column name.
|
|
schema_overrides
|
|
Support type specification or override of one or more columns.
|
|
infer_schema_length
|
|
The maximum number of rows to scan for schema inference. If set to `None`, the
|
|
entire dataset is scanned to determine the dtypes, which can slow parsing for
|
|
large workbooks. Note that only the "calamine" and "xlsx2csv" engines support
|
|
this parameter.
|
|
include_file_paths
|
|
Include the path of the source file(s) as a column with this name.
|
|
drop_empty_rows
|
|
Indicate whether to omit empty rows when reading data into the DataFrame.
|
|
drop_empty_cols
|
|
Indicate whether to omit empty columns (with no headers) when reading data into
|
|
the DataFrame (note that empty column identification may vary depending on the
|
|
underlying engine being used).
|
|
raise_if_empty
|
|
When there is no data in the sheet,`NoDataError` is raised. If this parameter
|
|
is set to False, an empty DataFrame (with no columns) is returned instead.
|
|
|
|
Returns
|
|
-------
|
|
DataFrame
|
|
If reading a single sheet.
|
|
dict
|
|
If reading multiple sheets, a "{sheetname: DataFrame, ...}" dict is returned.
|
|
|
|
See Also
|
|
--------
|
|
read_ods
|
|
|
|
Notes
|
|
-----
|
|
* Where possible, prefer the default "calamine" engine for reading Excel Workbooks,
|
|
as it is significantly faster than the other options.
|
|
* When using the `xlsx2csv` engine the target Excel sheet is first converted
|
|
to CSV using `xlsx2csv.Xlsx2csv(source).convert()` and then parsed with Polars'
|
|
:func:`read_csv` function. You can pass additional options to `read_options`
|
|
to influence this part of the parsing pipeline.
|
|
* If you want to read multiple sheets and set *different* options (`read_options`,
|
|
`schema_overrides`, etc), you should make separate calls as the options are set
|
|
globally, not on a per-sheet basis.
|
|
|
|
Examples
|
|
--------
|
|
Read the "data" worksheet from an Excel file into a DataFrame.
|
|
|
|
>>> pl.read_excel(
|
|
... source="test.xlsx",
|
|
... sheet_name="data",
|
|
... ) # doctest: +SKIP
|
|
|
|
If the correct dtypes can't be determined, use the `schema_overrides` parameter
|
|
to specify them, or increase the inference length with `infer_schema_length`.
|
|
|
|
>>> pl.read_excel(
|
|
... source="test.xlsx",
|
|
... schema_overrides={"dt": pl.Date},
|
|
... infer_schema_length=None,
|
|
... ) # doctest: +SKIP
|
|
|
|
Using the `xlsx2csv` engine, read table data from sheet 3 in an Excel workbook as a
|
|
DataFrame while skipping empty lines in the sheet. As sheet 3 does not have a header
|
|
row, you can pass the necessary additional settings for this to the `read_options`
|
|
parameter; these will be passed to :func:`read_csv`.
|
|
|
|
>>> pl.read_excel(
|
|
... source="test.xlsx",
|
|
... sheet_id=3,
|
|
... engine="xlsx2csv",
|
|
... engine_options={"skip_empty_lines": True},
|
|
... read_options={"has_header": False, "new_columns": ["a", "b", "c"]},
|
|
... ) # doctest: +SKIP
|
|
"""
|
|
sources, read_multiple_workbooks = _sources(source)
|
|
frames: list[pl.DataFrame] | list[dict[str, pl.DataFrame]] = [ # type: ignore[assignment]
|
|
_read_spreadsheet(
|
|
src,
|
|
sheet_id=sheet_id,
|
|
sheet_name=sheet_name,
|
|
table_name=table_name,
|
|
engine=engine,
|
|
engine_options=engine_options,
|
|
read_options=read_options,
|
|
schema_overrides=schema_overrides,
|
|
infer_schema_length=infer_schema_length,
|
|
include_file_paths=include_file_paths,
|
|
raise_if_empty=raise_if_empty,
|
|
has_header=has_header,
|
|
columns=columns,
|
|
drop_empty_rows=drop_empty_rows,
|
|
drop_empty_cols=drop_empty_cols,
|
|
)
|
|
for src in sources
|
|
]
|
|
return _unpack_read_results(
|
|
frames=frames,
|
|
read_multiple_workbooks=read_multiple_workbooks,
|
|
)
|
|
|
|
|
|
@overload
|
|
def read_ods(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: None = ...,
|
|
sheet_name: str,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> pl.DataFrame: ...
|
|
|
|
|
|
@overload
|
|
def read_ods(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: None = ...,
|
|
sheet_name: None = ...,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> pl.DataFrame: ...
|
|
|
|
|
|
@overload
|
|
def read_ods(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: int,
|
|
sheet_name: str,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> NoReturn: ...
|
|
|
|
|
|
@overload # type: ignore[overload-overlap]
|
|
def read_ods(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: Literal[0] | Sequence[int],
|
|
sheet_name: None = ...,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> dict[str, pl.DataFrame]: ...
|
|
|
|
|
|
@overload
|
|
def read_ods(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: int,
|
|
sheet_name: None = ...,
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> pl.DataFrame: ...
|
|
|
|
|
|
@overload
|
|
def read_ods(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: None = ...,
|
|
sheet_name: list[str] | tuple[str],
|
|
has_header: bool = ...,
|
|
columns: Sequence[int] | Sequence[str] | None = ...,
|
|
schema_overrides: SchemaDict | None = ...,
|
|
infer_schema_length: int | None = ...,
|
|
include_file_paths: str | None = ...,
|
|
drop_empty_rows: bool = ...,
|
|
drop_empty_cols: bool = ...,
|
|
raise_if_empty: bool = ...,
|
|
) -> dict[str, pl.DataFrame]: ...
|
|
|
|
|
|
def read_ods(
|
|
source: FileSource,
|
|
*,
|
|
sheet_id: int | Sequence[int] | None = None,
|
|
sheet_name: str | list[str] | tuple[str] | None = None,
|
|
has_header: bool = True,
|
|
columns: Sequence[int] | Sequence[str] | None = None,
|
|
schema_overrides: SchemaDict | None = None,
|
|
infer_schema_length: int | None = N_INFER_DEFAULT,
|
|
include_file_paths: str | None = None,
|
|
drop_empty_rows: bool = True,
|
|
drop_empty_cols: bool = True,
|
|
raise_if_empty: bool = True,
|
|
) -> pl.DataFrame | dict[str, pl.DataFrame]:
|
|
"""
|
|
Read OpenOffice (ODS) spreadsheet data into a DataFrame.
|
|
|
|
Parameters
|
|
----------
|
|
source
|
|
Path to a file or a file-like object (by "file-like object" we refer to objects
|
|
that have a `read()` method, such as a file handler like the builtin `open`
|
|
function, or a `BytesIO` instance). For file-like objects, the stream position
|
|
may not be updated accordingly after reading.
|
|
sheet_id
|
|
Sheet number(s) to convert, starting from 1 (set `0` to load *all* worksheets
|
|
as DataFrames) and return a `{sheetname:frame,}` dict. (Defaults to `1` if
|
|
neither this nor `sheet_name` are specified). Can also take a sequence of sheet
|
|
numbers.
|
|
sheet_name
|
|
Sheet name(s) to convert; cannot be used in conjunction with `sheet_id`. If
|
|
more than one is given then a `{sheetname:frame,}` dict is returned.
|
|
has_header
|
|
Indicate if the first row of the table data is a header or not. If False,
|
|
column names will be autogenerated in the following format: `column_x`, with
|
|
`x` being an enumeration over every column in the dataset, starting at 1.
|
|
columns
|
|
Columns to read from the sheet; if not specified, all columns are read. Can
|
|
be given as a sequence of column names or indices.
|
|
schema_overrides
|
|
Support type specification or override of one or more columns.
|
|
infer_schema_length
|
|
The maximum number of rows to scan for schema inference. If set to `None`, the
|
|
entire dataset is scanned to determine the dtypes, which can slow parsing for
|
|
large workbooks.
|
|
include_file_paths
|
|
Include the path of the source file(s) as a column with this name.
|
|
drop_empty_rows
|
|
Indicate whether to omit empty rows when reading data into the DataFrame.
|
|
drop_empty_cols
|
|
Indicate whether to omit empty columns (with no headers) when reading data into
|
|
the DataFrame (note that empty column identification may vary depending on the
|
|
underlying engine being used).
|
|
raise_if_empty
|
|
When there is no data in the sheet,`NoDataError` is raised. If this parameter
|
|
is set to False, an empty DataFrame (with no columns) is returned instead.
|
|
|
|
Returns
|
|
-------
|
|
DataFrame, or a `{sheetname: DataFrame, ...}` dict if reading multiple sheets.
|
|
|
|
See Also
|
|
--------
|
|
read_excel
|
|
|
|
Examples
|
|
--------
|
|
Read the "data" worksheet from an OpenOffice spreadsheet file into a DataFrame.
|
|
|
|
>>> pl.read_ods(
|
|
... source="test.ods",
|
|
... sheet_name="data",
|
|
... ) # doctest: +SKIP
|
|
|
|
If the correct dtypes can't be determined, use the `schema_overrides` parameter
|
|
to specify them, or increase the inference length with `infer_schema_length`.
|
|
|
|
>>> pl.read_ods(
|
|
... source="test.ods",
|
|
... sheet_id=3,
|
|
... schema_overrides={"dt": pl.Date},
|
|
... raise_if_empty=False,
|
|
... ) # doctest: +SKIP
|
|
"""
|
|
sources, read_multiple_workbooks = _sources(source)
|
|
frames: list[pl.DataFrame] | list[dict[str, pl.DataFrame]] = [ # type: ignore[assignment]
|
|
_read_spreadsheet(
|
|
src,
|
|
sheet_id=sheet_id,
|
|
sheet_name=sheet_name,
|
|
table_name=None,
|
|
engine="calamine",
|
|
engine_options={},
|
|
read_options=None,
|
|
schema_overrides=schema_overrides,
|
|
infer_schema_length=infer_schema_length,
|
|
include_file_paths=include_file_paths,
|
|
raise_if_empty=raise_if_empty,
|
|
drop_empty_rows=drop_empty_rows,
|
|
drop_empty_cols=drop_empty_cols,
|
|
has_header=has_header,
|
|
columns=columns,
|
|
)
|
|
for src in sources
|
|
]
|
|
return _unpack_read_results(
|
|
frames=frames,
|
|
read_multiple_workbooks=read_multiple_workbooks,
|
|
)
|
|
|
|
|
|
def _read_spreadsheet(
|
|
source: str | IO[bytes] | bytes,
|
|
*,
|
|
sheet_id: int | Sequence[int] | None,
|
|
sheet_name: str | Sequence[str] | None,
|
|
table_name: str | None,
|
|
engine: ExcelSpreadsheetEngine,
|
|
engine_options: dict[str, Any] | None = None,
|
|
read_options: dict[str, Any] | None = None,
|
|
schema_overrides: SchemaDict | None = None,
|
|
infer_schema_length: int | None = N_INFER_DEFAULT,
|
|
include_file_paths: str | None = None,
|
|
columns: Sequence[int] | Sequence[str] | str | None = None,
|
|
has_header: bool = True,
|
|
raise_if_empty: bool = True,
|
|
drop_empty_rows: bool = True,
|
|
drop_empty_cols: bool = True,
|
|
) -> pl.DataFrame | dict[str, pl.DataFrame]:
|
|
if isinstance(source, str):
|
|
source = normalize_filepath(source)
|
|
if looks_like_url(source):
|
|
source = process_file_url(source)
|
|
|
|
if isinstance(columns, str):
|
|
columns = [columns]
|
|
|
|
read_options = _get_read_options(
|
|
read_options,
|
|
engine=engine,
|
|
columns=columns,
|
|
has_header=has_header,
|
|
infer_schema_length=infer_schema_length,
|
|
)
|
|
engine_options = (engine_options or {}).copy()
|
|
schema_overrides = dict(schema_overrides or {})
|
|
|
|
# establish the reading function, parser, and available worksheets
|
|
reader_fn, parser, worksheets = _initialise_spreadsheet_parser(
|
|
engine, source, engine_options
|
|
)
|
|
try:
|
|
# parse data from the indicated sheet(s)
|
|
sheet_names, return_multiple_sheets = _get_sheet_names(
|
|
sheet_id, sheet_name, table_name, worksheets
|
|
)
|
|
parsed_sheets = {
|
|
name: reader_fn(
|
|
parser=parser,
|
|
sheet_name=name,
|
|
schema_overrides=schema_overrides,
|
|
read_options=read_options,
|
|
raise_if_empty=raise_if_empty,
|
|
columns=columns,
|
|
table_name=table_name,
|
|
drop_empty_rows=drop_empty_rows,
|
|
drop_empty_cols=drop_empty_cols,
|
|
)
|
|
for name in sheet_names
|
|
}
|
|
finally:
|
|
if hasattr(parser, "close"):
|
|
parser.close()
|
|
|
|
if not parsed_sheets:
|
|
param, value = ("id", sheet_id) if sheet_name is None else ("name", sheet_name)
|
|
msg = f"no matching sheets found when `sheet_{param}` is {value!r}"
|
|
raise ValueError(msg)
|
|
|
|
if include_file_paths:
|
|
workbook = source if isinstance(source, str) else "in-mem"
|
|
parsed_sheets = {
|
|
name: frame.with_columns(F.lit(workbook).alias(include_file_paths))
|
|
for name, frame in parsed_sheets.items()
|
|
}
|
|
if return_multiple_sheets:
|
|
return parsed_sheets
|
|
return next(iter(parsed_sheets.values()))
|
|
|
|
|
|
def _get_read_options(
|
|
read_options: dict[str, Any] | None,
|
|
*,
|
|
engine: ExcelSpreadsheetEngine,
|
|
columns: Sequence[int] | Sequence[str] | None,
|
|
infer_schema_length: int | None,
|
|
has_header: bool,
|
|
) -> dict[str, Any]:
|
|
"""Normalise top-level parameters to engine-specific 'read_options' dict."""
|
|
read_options = (read_options or {}).copy()
|
|
|
|
if engine == "calamine":
|
|
if ("use_columns" in read_options) and columns:
|
|
msg = 'cannot specify both `columns` and `read_options["use_columns"]`'
|
|
raise ParameterCollisionError(msg)
|
|
elif read_options.get("header_row") is not None and has_header is False:
|
|
msg = 'the values of `has_header` and `read_options["header_row"]` are not compatible'
|
|
raise ParameterCollisionError(msg)
|
|
elif ("schema_sample_rows" in read_options) and (
|
|
infer_schema_length != N_INFER_DEFAULT
|
|
):
|
|
msg = 'cannot specify both `infer_schema_length` and `read_options["schema_sample_rows"]`'
|
|
raise ParameterCollisionError(msg)
|
|
|
|
read_options["schema_sample_rows"] = infer_schema_length
|
|
if has_header is False and "header_row" not in read_options:
|
|
read_options["header_row"] = None
|
|
|
|
elif engine == "xlsx2csv":
|
|
if ("columns" in read_options) and columns:
|
|
msg = 'cannot specify both `columns` and `read_options["columns"]`'
|
|
raise ParameterCollisionError(msg)
|
|
elif (
|
|
"has_header" in read_options
|
|
and read_options["has_header"] is not has_header
|
|
):
|
|
msg = 'the values of `has_header` and `read_options["has_header"]` are not compatible'
|
|
raise ParameterCollisionError(msg)
|
|
elif ("infer_schema_length" in read_options) and (
|
|
infer_schema_length != N_INFER_DEFAULT
|
|
):
|
|
msg = 'cannot specify both `infer_schema_length` and `read_options["infer_schema_length"]`'
|
|
raise ParameterCollisionError(msg)
|
|
|
|
read_options["infer_schema_length"] = infer_schema_length
|
|
if "has_header" not in read_options:
|
|
read_options["has_header"] = has_header
|
|
else:
|
|
read_options["infer_schema_length"] = infer_schema_length
|
|
read_options["has_header"] = has_header
|
|
|
|
return read_options
|
|
|
|
|
|
def _get_sheet_names(
|
|
sheet_id: int | Sequence[int] | None,
|
|
sheet_name: str | Sequence[str] | None,
|
|
table_name: str | None,
|
|
worksheets: list[dict[str, Any]],
|
|
) -> tuple[list[str], bool]:
|
|
"""Establish sheets to read; indicate if we are returning a dict frames."""
|
|
if sheet_id is not None and sheet_name is not None:
|
|
msg = f"cannot specify both `sheet_name` ({sheet_name!r}) and `sheet_id` ({sheet_id!r})"
|
|
raise ValueError(msg)
|
|
|
|
sheet_names = []
|
|
if sheet_id is None and sheet_name is None:
|
|
name = None if table_name else worksheets[0]["name"]
|
|
sheet_names.append(name)
|
|
return_multiple_sheets = False
|
|
elif sheet_id == 0:
|
|
sheet_names.extend(ws["name"] for ws in worksheets)
|
|
return_multiple_sheets = True
|
|
else:
|
|
return_multiple_sheets = (
|
|
(isinstance(sheet_name, Sequence) and not isinstance(sheet_name, str))
|
|
or isinstance(sheet_id, Sequence)
|
|
or sheet_id == 0
|
|
)
|
|
if names := (
|
|
(sheet_name,) if isinstance(sheet_name, str) else sheet_name or ()
|
|
):
|
|
known_sheet_names = {ws["name"] for ws in worksheets}
|
|
for name in names:
|
|
if name not in known_sheet_names:
|
|
msg = f"no matching sheet found when `sheet_name` is {name!r}"
|
|
raise ValueError(msg)
|
|
sheet_names.append(name)
|
|
else:
|
|
ids = (sheet_id,) if isinstance(sheet_id, int) else sheet_id or ()
|
|
sheet_names_by_idx = {
|
|
idx: ws["name"]
|
|
for idx, ws in enumerate(worksheets, start=1)
|
|
if (sheet_id == 0 or ws["index"] in ids or ws["name"] in names)
|
|
}
|
|
for idx in ids:
|
|
if (name := sheet_names_by_idx.get(idx)) is None:
|
|
msg = f"no matching sheet found when `sheet_id` is {idx}"
|
|
raise ValueError(msg)
|
|
sheet_names.append(name)
|
|
|
|
return sheet_names, return_multiple_sheets # type: ignore[return-value]
|
|
|
|
|
|
def _initialise_spreadsheet_parser(
|
|
engine: str | None,
|
|
source: str | IO[bytes] | bytes,
|
|
engine_options: dict[str, Any],
|
|
) -> tuple[Callable[..., pl.DataFrame], Any, list[dict[str, Any]]]:
|
|
"""Instantiate the indicated spreadsheet parser and establish related properties."""
|
|
if isinstance(source, str) and not Path(source).exists():
|
|
raise FileNotFoundError(source)
|
|
|
|
if engine == "xlsx2csv": # default
|
|
xlsx2csv = import_optional("xlsx2csv")
|
|
|
|
# establish sensible defaults for unset options
|
|
for option, value in {
|
|
"exclude_hidden_sheets": False,
|
|
"skip_empty_lines": False,
|
|
"skip_hidden_rows": False,
|
|
"floatformat": "%f",
|
|
}.items():
|
|
engine_options.setdefault(option, value)
|
|
|
|
if isinstance(source, bytes):
|
|
source = BytesIO(source)
|
|
|
|
parser = xlsx2csv.Xlsx2csv(source, **engine_options)
|
|
sheets = parser.workbook.sheets
|
|
return _read_spreadsheet_xlsx2csv, parser, sheets
|
|
|
|
elif engine == "openpyxl":
|
|
openpyxl = import_optional("openpyxl")
|
|
if isinstance(source, bytes):
|
|
source = BytesIO(source)
|
|
|
|
parser = openpyxl.load_workbook(source, data_only=True, **engine_options)
|
|
sheets = [{"index": i + 1, "name": ws.title} for i, ws in enumerate(parser)]
|
|
return _read_spreadsheet_openpyxl, parser, sheets
|
|
|
|
elif engine == "calamine":
|
|
fastexcel = import_optional("fastexcel", min_version="0.7.0")
|
|
reading_bytesio, reading_bytes = (
|
|
isinstance(source, BytesIO),
|
|
isinstance(source, bytes),
|
|
)
|
|
if (reading_bytesio or reading_bytes) and parse_version(
|
|
module_version := fastexcel.__version__
|
|
) < (0, 10):
|
|
msg = f"`fastexcel` >= 0.10 is required to read bytes; found {module_version})"
|
|
raise ModuleUpgradeRequiredError(msg)
|
|
|
|
if reading_bytesio:
|
|
source = source.getvalue() # type: ignore[union-attr]
|
|
elif isinstance(source, (BufferedReader, TextIOWrapper)):
|
|
if "b" not in source.mode:
|
|
msg = f"file {source.name!r} must be opened in binary mode"
|
|
raise OSError(msg)
|
|
elif (filename := source.name) and Path(filename).exists():
|
|
source = filename
|
|
else:
|
|
source = source.read()
|
|
|
|
parser = fastexcel.read_excel(source, **engine_options)
|
|
sheets = [
|
|
{"index": i + 1, "name": nm} for i, nm in enumerate(parser.sheet_names)
|
|
]
|
|
return _read_spreadsheet_calamine, parser, sheets
|
|
|
|
msg = f"unrecognized engine: {engine!r}"
|
|
raise NotImplementedError(msg)
|
|
|
|
|
|
def _csv_buffer_to_frame(
|
|
csv: StringIO,
|
|
*,
|
|
separator: str,
|
|
read_options: dict[str, Any],
|
|
schema_overrides: SchemaDict | None,
|
|
drop_empty_rows: bool,
|
|
drop_empty_cols: bool,
|
|
raise_if_empty: bool,
|
|
) -> pl.DataFrame:
|
|
"""Translate StringIO buffer containing delimited data as a DataFrame."""
|
|
# handle (completely) empty sheet data
|
|
if csv.tell() == 0:
|
|
return _empty_frame(raise_if_empty)
|
|
|
|
# otherwise rewind the buffer and parse as csv
|
|
csv.seek(0)
|
|
|
|
if read_options is None:
|
|
read_options = {}
|
|
|
|
date_cols = []
|
|
if schema_overrides:
|
|
if csv_dtypes := read_options.get("dtypes", {}):
|
|
issue_deprecation_warning(
|
|
"the `dtypes` parameter for `read_csv` is deprecated. It has been renamed to `schema_overrides`.",
|
|
version="0.20.31",
|
|
)
|
|
|
|
csv_schema_overrides = read_options.get("schema_overrides", csv_dtypes)
|
|
if set(csv_schema_overrides).intersection(schema_overrides):
|
|
msg = "cannot specify columns in both `schema_overrides` and `read_options['dtypes']`"
|
|
raise ParameterCollisionError(msg)
|
|
|
|
overrides, schema_overrides = {**csv_schema_overrides, **schema_overrides}, {}
|
|
for nm, dtype in overrides.items():
|
|
if dtype != Date:
|
|
schema_overrides[nm] = dtype
|
|
else:
|
|
date_cols.append(nm)
|
|
|
|
read_options = read_options.copy()
|
|
read_options["schema_overrides"] = schema_overrides
|
|
|
|
df = _drop_null_data(
|
|
df=read_csv(
|
|
csv,
|
|
separator=separator,
|
|
**read_options,
|
|
),
|
|
raise_if_empty=raise_if_empty,
|
|
drop_empty_rows=drop_empty_rows,
|
|
drop_empty_cols=drop_empty_cols,
|
|
)
|
|
if date_cols:
|
|
date_casts, schema = {}, df.schema
|
|
for nm in date_cols:
|
|
if schema[nm] == String:
|
|
date_casts[nm] = (
|
|
F.col(nm)
|
|
.str.replace(r"(?:[ T]00:00:00(?:\.0+)?)$", "")
|
|
.str.to_date()
|
|
)
|
|
if date_casts:
|
|
df = df.with_columns(**date_casts)
|
|
return df
|
|
|
|
|
|
def _drop_null_data(
|
|
df: pl.DataFrame,
|
|
*,
|
|
raise_if_empty: bool,
|
|
drop_empty_rows: bool = True,
|
|
drop_empty_cols: bool = True,
|
|
) -> pl.DataFrame:
|
|
"""If DataFrame contains columns/rows that contain only nulls, drop them."""
|
|
null_cols: list[str] = []
|
|
if drop_empty_cols:
|
|
for col_name in df.columns:
|
|
# note that if multiple unnamed columns are found then all but the first one
|
|
# will be named as "_duplicated_{n}" (or "__UNNAMED__{n}" from calamine)
|
|
if col_name == "" or re.match(r"(_duplicated_|__UNNAMED__)\d+$", col_name):
|
|
col = df[col_name]
|
|
if (
|
|
col.dtype == Null
|
|
or col.null_count() == df.height
|
|
or (
|
|
col.dtype in NUMERIC_DTYPES
|
|
and col.replace(0, None).null_count() == df.height
|
|
)
|
|
):
|
|
null_cols.append(col_name)
|
|
if null_cols:
|
|
df = df.drop(*null_cols)
|
|
|
|
if df.height == df.width == 0:
|
|
return _empty_frame(raise_if_empty)
|
|
if drop_empty_rows:
|
|
return df.filter(~F.all_horizontal(F.all().is_null()))
|
|
return df
|
|
|
|
|
|
def _empty_frame(raise_if_empty: bool) -> pl.DataFrame: # noqa: FBT001
|
|
if raise_if_empty:
|
|
msg = (
|
|
"empty Excel sheet"
|
|
"\n\nIf you want to read this as an empty DataFrame, set `raise_if_empty=False`."
|
|
)
|
|
raise NoDataError(msg)
|
|
return pl.DataFrame()
|
|
|
|
|
|
def _reorder_columns(
|
|
df: pl.DataFrame, columns: Sequence[int] | Sequence[str] | None
|
|
) -> pl.DataFrame:
|
|
if columns:
|
|
from polars.selectors import by_index, by_name
|
|
|
|
cols = by_index(*columns) if isinstance(columns[0], int) else by_name(*columns)
|
|
df = df.select(cols)
|
|
return df
|
|
|
|
|
|
def _read_spreadsheet_calamine(
|
|
parser: Any,
|
|
*,
|
|
sheet_name: str | None,
|
|
read_options: dict[str, Any],
|
|
schema_overrides: SchemaDict | None,
|
|
columns: Sequence[int] | Sequence[str] | None,
|
|
table_name: str | None = None,
|
|
drop_empty_rows: bool,
|
|
drop_empty_cols: bool,
|
|
raise_if_empty: bool,
|
|
) -> pl.DataFrame:
|
|
# if we have 'schema_overrides' and a more recent version of `fastexcel`
|
|
# we can pass translated dtypes to the engine to refine the initial parse
|
|
fastexcel = import_optional("fastexcel")
|
|
fastexcel_version = parse_version(original_version := fastexcel.__version__)
|
|
|
|
if fastexcel_version < (0, 9) and "schema_sample_rows" in read_options:
|
|
msg = f"a more recent version of `fastexcel` is required for 'schema_sample_rows' (>= 0.9; found {original_version})"
|
|
raise ModuleUpgradeRequiredError(msg)
|
|
if fastexcel_version < (0, 10, 2) and "use_columns" in read_options:
|
|
msg = f"a more recent version of `fastexcel` is required for 'use_columns' (>= 0.10.2; found {original_version})"
|
|
raise ModuleUpgradeRequiredError(msg)
|
|
if table_name and fastexcel_version < (0, 12):
|
|
msg = f"a more recent version of `fastexcel` is required for 'table_name' (>= 0.12.0; found {original_version})"
|
|
raise ValueError(msg)
|
|
|
|
if columns:
|
|
if not isinstance(columns, list):
|
|
columns = list(columns) # type: ignore[assignment]
|
|
read_options["use_columns"] = columns
|
|
|
|
schema_overrides = schema_overrides or {}
|
|
if read_options.get("schema_sample_rows") == 0:
|
|
# ref: https://github.com/ToucanToco/fastexcel/issues/236
|
|
del read_options["schema_sample_rows"]
|
|
read_options["dtypes"] = (
|
|
"string"
|
|
if fastexcel_version >= (0, 12, 1)
|
|
else dict.fromkeys(range(16384), "string")
|
|
)
|
|
elif schema_overrides and fastexcel_version >= (0, 10):
|
|
parser_dtypes = read_options.get("dtypes", {})
|
|
for name, dtype in schema_overrides.items():
|
|
if name not in parser_dtypes:
|
|
if (base_dtype := dtype.base_type()) in INTEGER_DTYPES:
|
|
parser_dtypes[name] = "int"
|
|
elif base_dtype in FLOAT_DTYPES:
|
|
parser_dtypes[name] = "float"
|
|
elif base_dtype == String:
|
|
parser_dtypes[name] = "string"
|
|
elif base_dtype == Duration:
|
|
parser_dtypes[name] = "duration"
|
|
elif base_dtype == Boolean:
|
|
parser_dtypes[name] = "boolean"
|
|
|
|
read_options["dtypes"] = parser_dtypes
|
|
|
|
if fastexcel_version < (0, 11, 2):
|
|
ws = parser.load_sheet_by_name(name=sheet_name, **read_options)
|
|
df = ws.to_polars()
|
|
else:
|
|
if table_name:
|
|
if col_names := read_options.get("use_columns"):
|
|
selected_col_names = set(col_names)
|
|
read_options["use_columns"] = lambda col: col.name in selected_col_names
|
|
|
|
xl_table = parser.load_table(table_name, **read_options)
|
|
|
|
if sheet_name and sheet_name != xl_table.sheet_name:
|
|
msg = f"table named {table_name!r} not found in sheet {sheet_name!r}"
|
|
raise RuntimeError(msg)
|
|
df = xl_table.to_polars()
|
|
|
|
elif _PYARROW_AVAILABLE:
|
|
# eager loading is faster / more memory-efficient, but requires pyarrow
|
|
ws_arrow = parser.load_sheet_eager(sheet_name, **read_options)
|
|
df = from_arrow(ws_arrow)
|
|
else:
|
|
ws_arrow = parser.load_sheet(sheet_name, **read_options)
|
|
df = from_arrow(ws_arrow)
|
|
|
|
if read_options.get("header_row", False) is None and not read_options.get(
|
|
"column_names"
|
|
):
|
|
df.columns = [f"column_{i}" for i in range(1, df.width + 1)]
|
|
|
|
df = _drop_null_data(
|
|
df,
|
|
raise_if_empty=raise_if_empty,
|
|
drop_empty_rows=drop_empty_rows,
|
|
drop_empty_cols=drop_empty_cols,
|
|
)
|
|
|
|
# note: even if we applied parser dtypes we still re-apply schema_overrides
|
|
# natively as we can refine integer/float types, temporal precision, etc.
|
|
if schema_overrides:
|
|
lf, schema = df.lazy(), df.schema
|
|
str_to_temporal, updated_overrides = [], {}
|
|
for nm, tp in schema_overrides.items():
|
|
if schema[nm] != String:
|
|
updated_overrides[nm] = tp
|
|
elif tp == Datetime:
|
|
str_to_temporal.append(
|
|
F.col(nm).str.to_datetime(
|
|
time_unit=getattr(tp, "time_unit", None),
|
|
time_zone=getattr(tp, "time_zone", None),
|
|
)
|
|
)
|
|
elif tp == Date:
|
|
dt_str = F.col(nm).str.replace(r"(?:[ T]00:00:00(?:\.0+)?)$", "")
|
|
str_to_temporal.append(dt_str.str.to_date())
|
|
elif tp == Time:
|
|
str_to_temporal.append(F.col(nm).str.to_time())
|
|
else:
|
|
updated_overrides[nm] = tp
|
|
|
|
if str_to_temporal:
|
|
lf = lf.with_columns(*str_to_temporal)
|
|
if updated_overrides:
|
|
lf = lf.cast(dtypes=updated_overrides)
|
|
df = lf.collect()
|
|
|
|
# standardise on string dtype for null columns in empty frame
|
|
if df.is_empty():
|
|
df = df.cast({Null: String})
|
|
|
|
# further refine dtypes
|
|
type_checks = []
|
|
for c, dtype in df.schema.items():
|
|
if c not in schema_overrides:
|
|
# may read integer data as float; cast back to int where possible.
|
|
if dtype in FLOAT_DTYPES:
|
|
check_cast = [
|
|
F.col(c).floor().eq_missing(F.col(c)) & F.col(c).is_not_nan(),
|
|
F.col(c).cast(Int64),
|
|
]
|
|
type_checks.append(check_cast)
|
|
# do a similar check for datetime columns that have only 00:00:00 times.
|
|
elif dtype == Datetime:
|
|
check_cast = [
|
|
F.col(c).dt.time().eq(time(0, 0, 0)),
|
|
F.col(c).cast(Date),
|
|
]
|
|
type_checks.append(check_cast)
|
|
|
|
if type_checks:
|
|
apply_cast = df.select(d[0].all(ignore_nulls=True) for d in type_checks).row(0)
|
|
if downcast := [
|
|
cast for apply, (_, cast) in zip(apply_cast, type_checks) if apply
|
|
]:
|
|
df = df.with_columns(*downcast)
|
|
|
|
return df
|
|
|
|
|
|
def _read_spreadsheet_openpyxl(
|
|
parser: Any,
|
|
*,
|
|
sheet_name: str | None,
|
|
read_options: dict[str, Any],
|
|
schema_overrides: SchemaDict | None,
|
|
columns: Sequence[int] | Sequence[str] | None,
|
|
table_name: str | None = None,
|
|
drop_empty_rows: bool,
|
|
drop_empty_cols: bool,
|
|
raise_if_empty: bool,
|
|
) -> pl.DataFrame:
|
|
"""Use the 'openpyxl' library to read data from the given worksheet."""
|
|
infer_schema_length = read_options.pop("infer_schema_length", None)
|
|
has_header = read_options.pop("has_header", True)
|
|
schema_overrides = schema_overrides or {}
|
|
no_inference = infer_schema_length == 0
|
|
header: list[str | None] = []
|
|
|
|
if table_name and not sheet_name:
|
|
sheet_name, n_tables = None, 0
|
|
for sheet in parser.worksheets:
|
|
n_tables += 1
|
|
if table_name in sheet.tables:
|
|
ws, sheet_name = sheet, sheet.title
|
|
break
|
|
if sheet_name is None:
|
|
msg = (
|
|
f"table named {table_name!r} not found in sheet {sheet_name!r}"
|
|
if n_tables
|
|
else f"no named tables found in sheet {sheet_name!r} (looking for {table_name!r})"
|
|
)
|
|
raise RuntimeError(msg)
|
|
else:
|
|
ws = parser[sheet_name]
|
|
|
|
# prefer detection of actual table objects; otherwise read
|
|
# data in the used worksheet range, dropping null columns
|
|
if tables := getattr(ws, "tables", None):
|
|
table = tables[table_name] if table_name else next(iter(tables.values()))
|
|
rows = list(ws[table.ref])
|
|
if not rows:
|
|
return _empty_frame(raise_if_empty)
|
|
if has_header:
|
|
header.extend(cell.value for cell in rows.pop(0))
|
|
else:
|
|
header.extend(f"column_{n}" for n in range(1, len(rows[0]) + 1))
|
|
if table.totalsRowCount:
|
|
rows = rows[: -table.totalsRowCount]
|
|
rows_iter = rows
|
|
elif table_name:
|
|
msg = f"no named tables found in sheet {sheet_name!r} (looking for {table_name!r})"
|
|
raise RuntimeError(msg)
|
|
else:
|
|
if not has_header:
|
|
if not (rows_iter := list(ws.iter_rows())):
|
|
return _empty_frame(raise_if_empty)
|
|
n_cols = len(rows_iter[0])
|
|
header = [f"column_{n}" for n in range(1, n_cols + 1)]
|
|
else:
|
|
rows_iter = ws.iter_rows()
|
|
for row in rows_iter:
|
|
row_values = [cell.value for cell in row]
|
|
if any(v is not None for v in row_values):
|
|
header.extend(row_values)
|
|
break
|
|
|
|
dtype = String if no_inference else None
|
|
series_data = []
|
|
for name, column_data in zip(header, zip(*rows_iter)):
|
|
if name or not drop_empty_cols:
|
|
values = [cell.value for cell in column_data]
|
|
if no_inference or (dtype := schema_overrides.get(name)) == String: # type: ignore[assignment,arg-type]
|
|
# note: if we initialise the series with mixed-type data (eg: str/int)
|
|
# then the non-strings will become null, so we handle the cast here
|
|
values = [str(v) if (v is not None) else v for v in values]
|
|
|
|
if (tp := schema_overrides.get(name)) in (Date, Datetime, Time): # type: ignore[operator,arg-type]
|
|
s = pl.Series(name, values, strict=False)
|
|
if s.dtype == String:
|
|
if tp == Datetime:
|
|
s = s.str.to_datetime(
|
|
time_unit=getattr(tp, "time_unit", None),
|
|
time_zone=getattr(tp, "time_zone", None),
|
|
)
|
|
elif tp == Date:
|
|
s = s.str.replace(
|
|
r"(?:[ T]00:00:00(?:\.0+)?)$", ""
|
|
).str.to_date()
|
|
elif tp == Time:
|
|
s = s.str.to_time()
|
|
else:
|
|
s = pl.Series(name, values, dtype=dtype, strict=False)
|
|
series_data.append(s)
|
|
|
|
names = deduplicate_names(s.name for s in series_data)
|
|
df = pl.DataFrame(
|
|
dict(zip(names, series_data)),
|
|
schema_overrides=schema_overrides,
|
|
infer_schema_length=infer_schema_length,
|
|
strict=False,
|
|
)
|
|
df = _drop_null_data(
|
|
df,
|
|
raise_if_empty=raise_if_empty,
|
|
drop_empty_rows=drop_empty_rows,
|
|
drop_empty_cols=drop_empty_cols,
|
|
)
|
|
df = _reorder_columns(df, columns)
|
|
return df
|
|
|
|
|
|
def _read_spreadsheet_xlsx2csv(
|
|
parser: Any,
|
|
*,
|
|
sheet_name: str | None,
|
|
read_options: dict[str, Any],
|
|
schema_overrides: SchemaDict | None,
|
|
columns: Sequence[int] | Sequence[str] | None,
|
|
table_name: str | None = None,
|
|
drop_empty_rows: bool,
|
|
drop_empty_cols: bool,
|
|
raise_if_empty: bool,
|
|
) -> pl.DataFrame:
|
|
"""Use the 'xlsx2csv' library to read data from the given worksheet."""
|
|
if table_name:
|
|
msg = "the `table_name` parameter is not supported by the 'xlsx2csv' engine"
|
|
raise ValueError(msg)
|
|
|
|
csv_buffer = StringIO()
|
|
with warnings.catch_warnings():
|
|
# xlsx2csv version 0.8.4 throws a DeprecationWarning in Python 3.13
|
|
# https://github.com/dilshod/xlsx2csv/pull/287
|
|
warnings.filterwarnings("ignore", category=DeprecationWarning)
|
|
parser.convert(outfile=csv_buffer, sheetname=sheet_name)
|
|
|
|
read_options.setdefault("truncate_ragged_lines", True)
|
|
if columns:
|
|
read_options["columns"] = columns
|
|
|
|
cast_to_boolean = []
|
|
if schema_overrides:
|
|
for col, dtype in schema_overrides.items():
|
|
if dtype == Boolean:
|
|
schema_overrides[col] = UInt8 # type: ignore[index]
|
|
cast_to_boolean.append(F.col(col).cast(Boolean))
|
|
|
|
df = _csv_buffer_to_frame(
|
|
csv_buffer,
|
|
separator=",",
|
|
read_options=read_options,
|
|
schema_overrides=schema_overrides,
|
|
raise_if_empty=raise_if_empty,
|
|
drop_empty_rows=drop_empty_rows,
|
|
drop_empty_cols=drop_empty_cols,
|
|
)
|
|
if cast_to_boolean:
|
|
df = df.with_columns(*cast_to_boolean)
|
|
|
|
df = df.rename(_standardize_duplicates)
|
|
return _reorder_columns(df, columns)
|