Skip to content

Commit 2cf2694

Browse files
committed
#819: python bindings for window functions
1 parent bcd1258 commit 2cf2694

2 files changed

Lines changed: 80 additions & 0 deletions

File tree

python/datafusion/tests/test_dataframe.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import pyarrow as pa
1919
import pytest
2020

21+
from datafusion import functions as f
2122
from datafusion import DataFrame, ExecutionContext, column, literal, udf
2223

2324

@@ -117,6 +118,24 @@ def test_join():
117118
assert table.to_pydict() == expected
118119

119120

121+
def test_window_lead(df):
    """Exercise the `lead` window function through the Python bindings."""
    # The result is aliased "a_next", but the window itself is evaluated over
    # column "b" (ordered by "b").
    lead_expr = f.window(
        "lead",
        [column("b")],
        order_by=[f.order_by(column("b"))],
    )
    projected = df.select(column("a"), f.alias(lead_expr, "a_next"))

    result = pa.Table.from_batches(projected.collect()).to_pydict()

    assert result == {"a": [1, 2, 3], "a_next": [5, 6, None]}
137+
138+
120139
def test_get_dataframe(tmp_path):
121140
ctx = ExecutionContext()
122141

python/src/functions.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use datafusion::physical_plan::{
2323
aggregates::AggregateFunction, functions::BuiltinScalarFunction,
2424
};
2525

26+
use crate::errors;
2627
use crate::expression::PyExpr;
2728

2829
#[pyfunction]
@@ -85,6 +86,63 @@ fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
8586
Ok(logical_plan::concat_ws(sep, &args).into())
8687
}
8788

89+
/// Creates a new Sort expression
90+
#[pyfunction]
91+
fn order_by(
92+
expr: PyExpr,
93+
asc: Option<bool>,
94+
nulls_first: Option<bool>,
95+
) -> PyResult<PyExpr> {
96+
Ok(PyExpr {
97+
expr: datafusion::logical_plan::Expr::Sort {
98+
expr: Box::new(expr.expr),
99+
asc: asc.unwrap_or(true),
100+
nulls_first: nulls_first.unwrap_or(true),
101+
},
102+
})
103+
}
104+
105+
/// Creates a new Alias expression
106+
#[pyfunction]
107+
fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
108+
Ok(PyExpr {
109+
expr: datafusion::logical_plan::Expr::Alias(
110+
Box::new(expr.expr),
111+
String::from(name),
112+
),
113+
})
114+
}
115+
116+
/// Creates a new Window function expression
117+
#[pyfunction]
118+
fn window(
119+
name: &str,
120+
args: Vec<PyExpr>,
121+
partition_by: Option<Vec<PyExpr>>,
122+
order_by: Option<Vec<PyExpr>>,
123+
) -> PyResult<PyExpr> {
124+
use std::str::FromStr;
125+
let fun = datafusion::physical_plan::window_functions::WindowFunction::from_str(name)
126+
.map_err(|e| -> errors::DataFusionError { e.into() })?;
127+
Ok(PyExpr {
128+
expr: datafusion::logical_plan::Expr::WindowFunction {
129+
fun,
130+
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
131+
partition_by: partition_by
132+
.unwrap_or(vec![])
133+
.into_iter()
134+
.map(|x| x.expr)
135+
.collect::<Vec<_>>(),
136+
order_by: order_by
137+
.unwrap_or(vec![])
138+
.into_iter()
139+
.map(|x| x.expr)
140+
.collect::<Vec<_>>(),
141+
window_frame: None,
142+
},
143+
})
144+
}
145+
88146
macro_rules! scalar_function {
89147
($NAME: ident, $FUNC: ident) => {
90148
scalar_function!($NAME, $FUNC, stringify!($NAME));
@@ -218,6 +276,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
218276
m.add_wrapped(wrap_pyfunction!(abs))?;
219277
m.add_wrapped(wrap_pyfunction!(acos))?;
220278
m.add_wrapped(wrap_pyfunction!(approx_distinct))?;
279+
m.add_wrapped(wrap_pyfunction!(alias))?;
221280
m.add_wrapped(wrap_pyfunction!(array))?;
222281
m.add_wrapped(wrap_pyfunction!(ascii))?;
223282
m.add_wrapped(wrap_pyfunction!(asin))?;
@@ -249,6 +308,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
249308
m.add_wrapped(wrap_pyfunction!(min))?;
250309
m.add_wrapped(wrap_pyfunction!(now))?;
251310
m.add_wrapped(wrap_pyfunction!(octet_length))?;
311+
m.add_wrapped(wrap_pyfunction!(order_by))?;
252312
m.add_wrapped(wrap_pyfunction!(random))?;
253313
m.add_wrapped(wrap_pyfunction!(regexp_match))?;
254314
m.add_wrapped(wrap_pyfunction!(regexp_replace))?;
@@ -278,5 +338,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
278338
m.add_wrapped(wrap_pyfunction!(trim))?;
279339
m.add_wrapped(wrap_pyfunction!(trunc))?;
280340
m.add_wrapped(wrap_pyfunction!(upper))?;
341+
m.add_wrapped(wrap_pyfunction!(window))?;
281342
Ok(())
282343
}

0 commit comments

Comments
 (0)