Skip to content

Commit a29a751

Browse files
sycai authored and gcf-owl-bot[bot] committed
feat: Support dry_run in to_pandas() (#1436)
* feat: Support dry_run in to_pandas() * centralize dry_run logics at block level * fix lint errors * remove unnecessary code * use dataframe for dry_run stats * flatten the job stats to a series * fix lint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix query job issue * Make pandas surface directly call block._compute_dry_run * type hint update --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 9d438d5 commit a29a751

File tree

8 files changed

+243
-32
lines changed

8 files changed

+243
-32
lines changed

bigframes/core/blocks.py

+101-19
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from __future__ import annotations
2323

2424
import ast
25+
import copy
2526
import dataclasses
2627
import datetime
2728
import functools
@@ -30,6 +31,7 @@
3031
import textwrap
3132
import typing
3233
from typing import (
34+
Any,
3335
Iterable,
3436
List,
3537
Literal,
@@ -49,7 +51,7 @@
4951
import pyarrow as pa
5052

5153
from bigframes import session
52-
import bigframes._config.sampling_options as sampling_options
54+
from bigframes._config import sampling_options
5355
import bigframes.constants
5456
import bigframes.core as core
5557
import bigframes.core.compile.googlesql as googlesql
@@ -535,19 +537,9 @@ def to_pandas(
535537
Returns:
536538
pandas.DataFrame, QueryJob
537539
"""
538-
if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
539-
raise NotImplementedError(
540-
f"The downsampling method {sampling_method} is not implemented, "
541-
f"please choose from {','.join(_SAMPLING_METHODS)}."
542-
)
543-
544-
sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
545-
if sampling_method is not None:
546-
sampling = sampling.with_method(sampling_method).with_random_state( # type: ignore
547-
random_state
548-
)
549-
else:
550-
sampling = sampling.with_disabled()
540+
sampling = self._get_sampling_option(
541+
max_download_size, sampling_method, random_state
542+
)
551543

552544
df, query_job = self._materialize_local(
553545
materialize_options=MaterializationOptions(
@@ -559,6 +551,27 @@ def to_pandas(
559551
df.set_axis(self.column_labels, axis=1, copy=False)
560552
return df, query_job
561553

554+
def _get_sampling_option(
555+
self,
556+
max_download_size: Optional[int] = None,
557+
sampling_method: Optional[str] = None,
558+
random_state: Optional[int] = None,
559+
) -> sampling_options.SamplingOptions:
560+
561+
if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
562+
raise NotImplementedError(
563+
f"The downsampling method {sampling_method} is not implemented, "
564+
f"please choose from {','.join(_SAMPLING_METHODS)}."
565+
)
566+
567+
sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
568+
if sampling_method is None:
569+
return sampling.with_disabled()
570+
571+
return sampling.with_method(sampling_method).with_random_state( # type: ignore
572+
random_state
573+
)
574+
562575
def try_peek(
563576
self, n: int = 20, force: bool = False, allow_large_results=None
564577
) -> typing.Optional[pd.DataFrame]:
@@ -798,11 +811,73 @@ def split(
798811
return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks]
799812

800813
def _compute_dry_run(
801-
self, value_keys: Optional[Iterable[str]] = None
802-
) -> bigquery.QueryJob:
814+
self,
815+
value_keys: Optional[Iterable[str]] = None,
816+
*,
817+
ordered: bool = True,
818+
max_download_size: Optional[int] = None,
819+
sampling_method: Optional[str] = None,
820+
random_state: Optional[int] = None,
821+
) -> typing.Tuple[pd.Series, bigquery.QueryJob]:
822+
sampling = self._get_sampling_option(
823+
max_download_size, sampling_method, random_state
824+
)
825+
if sampling.enable_downsampling:
826+
raise NotImplementedError("Dry run with sampling is not supported")
827+
828+
index: List[Any] = []
829+
values: List[Any] = []
830+
831+
index.append("columnCount")
832+
values.append(len(self.value_columns))
833+
index.append("columnDtypes")
834+
values.append(
835+
{
836+
col: self.expr.get_column_type(self.resolve_label_exact_or_error(col))
837+
for col in self.column_labels
838+
}
839+
)
840+
841+
index.append("indexLevel")
842+
values.append(self.index.nlevels)
843+
index.append("indexDtypes")
844+
values.append(self.index.dtypes)
845+
803846
expr = self._apply_value_keys_to_expr(value_keys=value_keys)
804-
query_job = self.session._executor.dry_run(expr)
805-
return query_job
847+
query_job = self.session._executor.dry_run(expr, ordered)
848+
job_api_repr = copy.deepcopy(query_job._properties)
849+
850+
job_ref = job_api_repr["jobReference"]
851+
for key, val in job_ref.items():
852+
index.append(key)
853+
values.append(val)
854+
855+
index.append("jobType")
856+
values.append(job_api_repr["configuration"]["jobType"])
857+
858+
query_config = job_api_repr["configuration"]["query"]
859+
for key in ("destinationTable", "useLegacySql"):
860+
index.append(key)
861+
values.append(query_config.get(key))
862+
863+
query_stats = job_api_repr["statistics"]["query"]
864+
for key in (
865+
"referencedTables",
866+
"totalBytesProcessed",
867+
"cacheHit",
868+
"statementType",
869+
):
870+
index.append(key)
871+
values.append(query_stats.get(key))
872+
873+
index.append("creationTime")
874+
values.append(
875+
pd.Timestamp(
876+
job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"
877+
)
878+
)
879+
880+
return pd.Series(values, index=index), query_job
806881

807882
def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
808883
expr = self._expr
@@ -2703,11 +2778,18 @@ def to_pandas(
27032778
"Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
27042779
)
27052780
ordered = ordered if ordered is not None else True
2781+
27062782
df, query_job = self._block.select_columns([]).to_pandas(
2707-
ordered=ordered, allow_large_results=allow_large_results
2783+
ordered=ordered,
2784+
allow_large_results=allow_large_results,
27082785
)
27092786
return df.index, query_job
27102787

2788+
def _compute_dry_run(
2789+
self, *, ordered: bool = True
2790+
) -> Tuple[pd.Series, bigquery.QueryJob]:
2791+
return self._block.select_columns([])._compute_dry_run(ordered=ordered)
2792+
27112793
def resolve_level(self, level: LevelsType) -> typing.Sequence[str]:
27122794
if utils.is_list_like(level):
27132795
levels = list(level)

bigframes/core/indexes/base.py

+37-7
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from __future__ import annotations
1818

1919
import typing
20-
from typing import Hashable, Literal, Optional, Sequence, Union
20+
from typing import Hashable, Literal, Optional, overload, Sequence, Union
2121

2222
import bigframes_vendored.constants as constants
2323
import bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index
@@ -228,15 +228,16 @@ def T(self) -> Index:
228228
return self.transpose()
229229

230230
@property
231-
def query_job(self) -> Optional[bigquery.QueryJob]:
231+
def query_job(self) -> bigquery.QueryJob:
232232
"""BigQuery job metadata for the most recent query.
233233
234234
Returns:
235235
The most recent `QueryJob
236236
<https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
237237
"""
238238
if self._query_job is None:
239-
self._query_job = self._block._compute_dry_run()
239+
_, query_job = self._block._compute_dry_run()
240+
self._query_job = query_job
240241
return self._query_job
241242

242243
def __repr__(self) -> str:
@@ -252,7 +253,8 @@ def __repr__(self) -> str:
252253
opts = bigframes.options.display
253254
max_results = opts.max_rows
254255
if opts.repr_mode == "deferred":
255-
return formatter.repr_query_job(self._block._compute_dry_run())
256+
_, dry_run_query_job = self._block._compute_dry_run()
257+
return formatter.repr_query_job(dry_run_query_job)
256258

257259
pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
258260
self._query_job = query_job
@@ -490,18 +492,46 @@ def __getitem__(self, key: int) -> typing.Any:
490492
else:
491493
raise NotImplementedError(f"Index key not supported {key}")
492494

493-
def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Index:
495+
@overload
496+
def to_pandas(
497+
self,
498+
*,
499+
allow_large_results: Optional[bool] = ...,
500+
dry_run: Literal[False] = ...,
501+
) -> pandas.Index:
502+
...
503+
504+
@overload
505+
def to_pandas(
506+
self, *, allow_large_results: Optional[bool] = ..., dry_run: Literal[True] = ...
507+
) -> pandas.Series:
508+
...
509+
510+
def to_pandas(
511+
self, *, allow_large_results: Optional[bool] = None, dry_run: bool = False
512+
) -> pandas.Index | pandas.Series:
494513
"""Gets the Index as a pandas Index.
495514
496515
Args:
497516
allow_large_results (bool, default None):
498517
If not None, overrides the global setting to allow or disallow large query results
499518
over the default size limit of 10 GB.
519+
dry_run (bool, default False):
520+
If this argument is true, this method will not process the data. Instead, it returns
521+
a Pandas series containing dtype and the amount of bytes to be processed.
500522
501523
Returns:
502-
pandas.Index:
503-
A pandas Index with all of the labels from this Index.
524+
pandas.Index | pandas.Series:
525+
A pandas Index with all of the labels from this Index. If dry run is set to True,
526+
returns a Series containing dry run statistics.
504527
"""
528+
if dry_run:
529+
dry_run_stats, dry_run_job = self._block.index._compute_dry_run(
530+
ordered=True
531+
)
532+
self._query_job = dry_run_job
533+
return dry_run_stats
534+
505535
df, query_job = self._block.index.to_pandas(
506536
ordered=True, allow_large_results=allow_large_results
507537
)

bigframes/dataframe.py

+48-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
Literal,
3333
Mapping,
3434
Optional,
35+
overload,
3536
Sequence,
3637
Tuple,
3738
Union,
@@ -1594,15 +1595,42 @@ def to_arrow(
15941595
self._set_internal_query_job(query_job)
15951596
return pa_table
15961597

1598+
@overload
1599+
def to_pandas(
1600+
self,
1601+
max_download_size: Optional[int] = ...,
1602+
sampling_method: Optional[str] = ...,
1603+
random_state: Optional[int] = ...,
1604+
*,
1605+
ordered: bool = ...,
1606+
dry_run: Literal[False] = ...,
1607+
allow_large_results: Optional[bool] = ...,
1608+
) -> pandas.DataFrame:
1609+
...
1610+
1611+
@overload
1612+
def to_pandas(
1613+
self,
1614+
max_download_size: Optional[int] = ...,
1615+
sampling_method: Optional[str] = ...,
1616+
random_state: Optional[int] = ...,
1617+
*,
1618+
ordered: bool = ...,
1619+
dry_run: Literal[True] = ...,
1620+
allow_large_results: Optional[bool] = ...,
1621+
) -> pandas.Series:
1622+
...
1623+
15971624
def to_pandas(
15981625
self,
15991626
max_download_size: Optional[int] = None,
16001627
sampling_method: Optional[str] = None,
16011628
random_state: Optional[int] = None,
16021629
*,
16031630
ordered: bool = True,
1631+
dry_run: bool = False,
16041632
allow_large_results: Optional[bool] = None,
1605-
) -> pandas.DataFrame:
1633+
) -> pandas.DataFrame | pandas.Series:
16061634
"""Write DataFrame to pandas DataFrame.
16071635
16081636
Args:
@@ -1624,16 +1652,32 @@ def to_pandas(
16241652
ordered (bool, default True):
16251653
Determines whether the resulting pandas dataframe will be ordered.
16261654
In some cases, unordered may result in a faster-executing query.
1655+
dry_run (bool, default False):
1656+
If this argument is true, this method will not process the data. Instead, it returns
1657+
a Pandas Series containing dry run statistics
16271658
allow_large_results (bool, default None):
16281659
If not None, overrides the global setting to allow or disallow large query results
16291660
over the default size limit of 10 GB.
16301661
16311662
Returns:
16321663
pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the
16331664
data_sampling_threshold_mb is not exceeded; otherwise, a pandas DataFrame with
1634-
downsampled rows and all columns of this DataFrame.
1665+
downsampled rows and all columns of this DataFrame. If dry_run is set, a pandas
1666+
Series containing dry run statistics will be returned.
16351667
"""
1668+
16361669
# TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job
1670+
1671+
if dry_run:
1672+
dry_run_stats, dry_run_job = self._block._compute_dry_run(
1673+
max_download_size=max_download_size,
1674+
sampling_method=sampling_method,
1675+
random_state=random_state,
1676+
ordered=ordered,
1677+
)
1678+
self._set_internal_query_job(dry_run_job)
1679+
return dry_run_stats
1680+
16371681
df, query_job = self._block.to_pandas(
16381682
max_download_size=max_download_size,
16391683
sampling_method=sampling_method,
@@ -1679,7 +1723,8 @@ def to_pandas_batches(
16791723
)
16801724

16811725
def _compute_dry_run(self) -> bigquery.QueryJob:
1682-
return self._block._compute_dry_run()
1726+
_, query_job = self._block._compute_dry_run()
1727+
return query_job
16831728

16841729
def copy(self) -> DataFrame:
16851730
return DataFrame(self._block)

0 commit comments

Comments (0)