Skip to content

Commit 07502c1

Browse files
committed
Merge branch 'main' into minor/remove-deprecated
2 parents d8d6441 + ecd14c1 commit 07502c1

File tree

11 files changed

+1472
-54
lines changed

11 files changed

+1472
-54
lines changed

.ai/skills/check-upstream/SKILL.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ The user may specify an area via `$ARGUMENTS`. If no area is specified or "all"
109109
**Evaluated and not requiring separate Python exposure:**
110110
- `show_limit` — already covered by `DataFrame.show()`, which provides the same functionality with a simpler API
111111
- `with_param_values` — already covered by the `param_values` argument on `SessionContext.sql()`, which accomplishes the same thing more robustly
112+
- `union_by_name_distinct` — already covered by `DataFrame.union_by_name(distinct=True)`, which provides a more Pythonic API
112113

113114
**How to check:**
114115
1. Fetch the upstream DataFrame documentation page listing all methods

conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import datafusion as dfn
2121
import numpy as np
22+
import pyarrow as pa
2223
import pytest
2324
from datafusion import col, lit
2425
from datafusion import functions as F
@@ -29,6 +30,7 @@ def _doctest_namespace(doctest_namespace: dict) -> None:
2930
"""Add common imports to the doctest namespace."""
3031
doctest_namespace["dfn"] = dfn
3132
doctest_namespace["np"] = np
33+
doctest_namespace["pa"] = pa
3234
doctest_namespace["col"] = col
3335
doctest_namespace["lit"] = lit
3436
doctest_namespace["F"] = F

crates/core/src/context.rs

Lines changed: 135 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
2828
use datafusion::arrow::pyarrow::PyArrowType;
2929
use datafusion::arrow::record_batch::RecordBatch;
3030
use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory};
31-
use datafusion::common::{ScalarValue, TableReference, exec_err};
31+
use datafusion::common::{DFSchema, ScalarValue, TableReference, exec_err};
3232
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
3333
use datafusion::datasource::file_format::parquet::ParquetFormat;
3434
use datafusion::datasource::listing::{
@@ -41,7 +41,7 @@ use datafusion::execution::context::{
4141
};
4242
use datafusion::execution::disk_manager::DiskManagerMode;
4343
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
44-
use datafusion::execution::options::ReadOptions;
44+
use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
4545
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
4646
use datafusion::execution::session_state::SessionStateBuilder;
4747
use datafusion::prelude::{
@@ -60,7 +60,7 @@ use datafusion_python_util::{
6060
};
6161
use object_store::ObjectStore;
6262
use pyo3::IntoPyObjectExt;
63-
use pyo3::exceptions::{PyKeyError, PyValueError};
63+
use pyo3::exceptions::{PyKeyError, PyRuntimeError, PyValueError};
6464
use pyo3::prelude::*;
6565
use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple};
6666
use url::Url;
@@ -70,11 +70,13 @@ use crate::catalog::{
7070
PyCatalog, PyCatalogList, RustWrappedPyCatalogProvider, RustWrappedPyCatalogProviderList,
7171
};
7272
use crate::common::data_type::PyScalarValue;
73+
use crate::common::df_schema::PyDFSchema;
7374
use crate::dataframe::PyDataFrame;
7475
use crate::dataset::Dataset;
7576
use crate::errors::{
7677
PyDataFusionError, PyDataFusionResult, from_datafusion_error, py_datafusion_err,
7778
};
79+
use crate::expr::PyExpr;
7880
use crate::expr::sort_expr::PySortExpr;
7981
use crate::options::PyCsvReadOptions;
8082
use crate::physical_plan::PyExecutionPlan;
@@ -434,11 +436,25 @@ impl PySessionContext {
434436
&upstream_host
435437
};
436438
let url_string = format!("{scheme}{derived_host}");
437-
let url = Url::parse(&url_string).unwrap();
439+
let url = Url::parse(&url_string).map_err(|e| PyValueError::new_err(e.to_string()))?;
438440
self.ctx.runtime_env().register_object_store(&url, store);
439441
Ok(())
440442
}
441443

444+
/// Deregister an object store with the given url
445+
#[pyo3(signature = (scheme, host=None))]
446+
pub fn deregister_object_store(
447+
&self,
448+
scheme: &str,
449+
host: Option<&str>,
450+
) -> PyDataFusionResult<()> {
451+
let host = host.unwrap_or("");
452+
let url_string = format!("{scheme}{host}");
453+
let url = Url::parse(&url_string).map_err(|e| PyDataFusionError::Common(e.to_string()))?;
454+
self.ctx.runtime_env().deregister_object_store(&url)?;
455+
Ok(())
456+
}
457+
442458
#[allow(clippy::too_many_arguments)]
443459
#[pyo3(signature = (name, path, table_partition_cols=vec![],
444460
file_extension=".parquet",
@@ -492,6 +508,10 @@ impl PySessionContext {
492508
self.ctx.register_udtf(&name, func);
493509
}
494510

511+
pub fn deregister_udtf(&self, name: &str) {
512+
self.ctx.deregister_udtf(name);
513+
}
514+
495515
#[pyo3(signature = (query, options=None, param_values=HashMap::default(), param_strings=HashMap::default()))]
496516
pub fn sql_with_options(
497517
&self,
@@ -956,6 +976,39 @@ impl PySessionContext {
956976
Ok(())
957977
}
958978

979+
#[pyo3(signature = (name, path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))]
980+
pub fn register_arrow(
981+
&self,
982+
name: &str,
983+
path: &str,
984+
schema: Option<PyArrowType<Schema>>,
985+
file_extension: &str,
986+
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
987+
py: Python,
988+
) -> PyDataFusionResult<()> {
989+
let mut options = ArrowReadOptions::default().table_partition_cols(
990+
table_partition_cols
991+
.into_iter()
992+
.map(|(name, ty)| (name, ty.0))
993+
.collect::<Vec<(String, DataType)>>(),
994+
);
995+
options.file_extension = file_extension;
996+
options.schema = schema.as_ref().map(|x| &x.0);
997+
998+
let result = self.ctx.register_arrow(name, path, options);
999+
wait_for_future(py, result)??;
1000+
Ok(())
1001+
}
1002+
1003+
pub fn register_batch(
1004+
&self,
1005+
name: &str,
1006+
batch: PyArrowType<RecordBatch>,
1007+
) -> PyDataFusionResult<()> {
1008+
self.ctx.register_batch(name, batch.0)?;
1009+
Ok(())
1010+
}
1011+
9591012
// Registers a PyArrow.Dataset
9601013
pub fn register_dataset(
9611014
&self,
@@ -975,16 +1028,28 @@ impl PySessionContext {
9751028
Ok(())
9761029
}
9771030

1031+
pub fn deregister_udf(&self, name: &str) {
1032+
self.ctx.deregister_udf(name);
1033+
}
1034+
9781035
pub fn register_udaf(&self, udaf: PyAggregateUDF) -> PyResult<()> {
9791036
self.ctx.register_udaf(udaf.function);
9801037
Ok(())
9811038
}
9821039

1040+
pub fn deregister_udaf(&self, name: &str) {
1041+
self.ctx.deregister_udaf(name);
1042+
}
1043+
9831044
pub fn register_udwf(&self, udwf: PyWindowUDF) -> PyResult<()> {
9841045
self.ctx.register_udwf(udwf.function);
9851046
Ok(())
9861047
}
9871048

1049+
pub fn deregister_udwf(&self, name: &str) {
1050+
self.ctx.deregister_udwf(name);
1051+
}
1052+
9881053
#[pyo3(signature = (name="datafusion"))]
9891054
pub fn catalog(&self, py: Python, name: &str) -> PyResult<Py<PyAny>> {
9901055
let catalog = self.ctx.catalog(name).ok_or(PyKeyError::new_err(format!(
@@ -1035,6 +1100,49 @@ impl PySessionContext {
10351100
self.ctx.session_id()
10361101
}
10371102

1103+
pub fn session_start_time(&self) -> String {
1104+
self.ctx.session_start_time().to_rfc3339()
1105+
}
1106+
1107+
pub fn enable_ident_normalization(&self) -> bool {
1108+
self.ctx.enable_ident_normalization()
1109+
}
1110+
1111+
pub fn parse_sql_expr(&self, sql: &str, schema: PyDFSchema) -> PyDataFusionResult<PyExpr> {
1112+
let df_schema: DFSchema = schema.into();
1113+
Ok(self.ctx.parse_sql_expr(sql, &df_schema)?.into())
1114+
}
1115+
1116+
pub fn execute_logical_plan(
1117+
&self,
1118+
plan: PyLogicalPlan,
1119+
py: Python,
1120+
) -> PyDataFusionResult<PyDataFrame> {
1121+
let df = wait_for_future(
1122+
py,
1123+
self.ctx.execute_logical_plan(plan.plan.as_ref().clone()),
1124+
)??;
1125+
Ok(PyDataFrame::new(df))
1126+
}
1127+
1128+
pub fn refresh_catalogs(&self, py: Python) -> PyDataFusionResult<()> {
1129+
wait_for_future(py, self.ctx.refresh_catalogs())??;
1130+
Ok(())
1131+
}
1132+
1133+
pub fn remove_optimizer_rule(&self, name: &str) -> bool {
1134+
self.ctx.remove_optimizer_rule(name)
1135+
}
1136+
1137+
pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
1138+
let provider = wait_for_future(py, self.ctx.table_provider(name))
1139+
// Outer error: runtime/async failure
1140+
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?
1141+
// Inner error: table not found
1142+
.map_err(|e| PyKeyError::new_err(e.to_string()))?;
1143+
Ok(PyTable { table: provider })
1144+
}
1145+
10381146
#[allow(clippy::too_many_arguments)]
10391147
#[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))]
10401148
pub fn read_json(
@@ -1169,6 +1277,29 @@ impl PySessionContext {
11691277
Ok(PyDataFrame::new(df))
11701278
}
11711279

1280+
#[pyo3(signature = (path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))]
1281+
pub fn read_arrow(
1282+
&self,
1283+
path: &str,
1284+
schema: Option<PyArrowType<Schema>>,
1285+
file_extension: &str,
1286+
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1287+
py: Python,
1288+
) -> PyDataFusionResult<PyDataFrame> {
1289+
let mut options = ArrowReadOptions::default().table_partition_cols(
1290+
table_partition_cols
1291+
.into_iter()
1292+
.map(|(name, ty)| (name, ty.0))
1293+
.collect::<Vec<(String, DataType)>>(),
1294+
);
1295+
options.file_extension = file_extension;
1296+
options.schema = schema.as_ref().map(|x| &x.0);
1297+
1298+
let result = self.ctx.read_arrow(path, options);
1299+
let df = wait_for_future(py, result)??;
1300+
Ok(PyDataFrame::new(df))
1301+
}
1302+
11721303
pub fn read_table(&self, table: Bound<'_, PyAny>) -> PyDataFusionResult<PyDataFrame> {
11731304
let session = self.clone().into_bound_py_any(table.py())?;
11741305
let table = PyTable::new(table, Some(session))?;

0 commit comments

Comments
 (0)