1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use polars_core::prelude::*;
/// An aggregation evaluated while scanning a file.
///
/// Every variant names the input `column` it reads and an optional `alias`. When the
/// alias is set, it is used to rename the final aggregated column.
#[derive(Clone, Debug)]
pub enum ScanAggregation {
    /// Sum of the values in `column`.
    Sum {
        column: String,
        alias: Option<String>,
    },
    /// Minimum of the values in `column`.
    Min {
        column: String,
        alias: Option<String>,
    },
    /// Maximum of the values in `column`.
    Max {
        column: String,
        alias: Option<String>,
    },
    /// First value of `column`.
    First {
        column: String,
        alias: Option<String>,
    },
    /// Last value of `column`.
    Last {
        column: String,
        alias: Option<String>,
    },
}
impl ScanAggregation {
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "json",
feature = "avro"
))]
pub(crate) fn evaluate_batch(&self, df: &DataFrame) -> Result<Series> {
use ScanAggregation::*;
let s = match self {
Sum { column, .. } => df.column(column)?.sum_as_series(),
Min { column, .. } => df.column(column)?.min_as_series(),
Max { column, .. } => df.column(column)?.max_as_series(),
First { column, .. } => df.column(column)?.head(Some(1)),
Last { column, .. } => df.column(column)?.tail(Some(1)),
};
Ok(s)
}
pub(crate) fn finish(&self, df: &DataFrame) -> Result<Series> {
use ScanAggregation::*;
match self {
Sum { column, alias } => {
let mut s = df.column(column)?.sum_as_series();
if let Some(alias) = alias {
s.rename(alias);
}
Ok(s)
}
Min { column, alias } => {
let mut s = df.column(column)?.min_as_series();
if let Some(alias) = alias {
s.rename(alias);
}
Ok(s)
}
Max { column, alias } => {
let mut s = df.column(column)?.max_as_series();
if let Some(alias) = alias {
s.rename(alias);
}
Ok(s)
}
First { column, alias } => {
let mut s = df.column(column)?.head(Some(1));
if let Some(alias) = alias {
s.rename(alias);
}
Ok(s)
}
Last { column, alias } => {
let mut s = df.column(column)?.tail(Some(1));
if let Some(alias) = alias {
s.rename(alias);
}
Ok(s)
}
}
}
}
/// Replaces `df` with the per-batch partial results of every requested aggregation.
///
/// The new frame has one column per entry of `aggregate`, in order. Does nothing when
/// `aggregate` is `None`.
///
/// # Errors
/// Propagates errors from [`ScanAggregation::evaluate_batch`] (e.g. a missing column).
/// Also returns an error from [`DataFrame::new`] when the partial columns cannot form a
/// valid frame; `df` is left unchanged in that case.
#[cfg(any(
    feature = "ipc",
    feature = "parquet",
    feature = "json",
    feature = "avro"
))]
pub(crate) fn apply_aggregations(
    df: &mut DataFrame,
    aggregate: Option<&[ScanAggregation]>,
) -> Result<()> {
    if let Some(aggregate) = aggregate {
        let cols = aggregate
            .iter()
            .map(|scan_agg| scan_agg.evaluate_batch(df))
            .collect::<Result<Vec<Series>>>()?;
        // Always validate instead of using `new_no_checks` in release builds. The partials
        // can presumably disagree in length (`head`/`tail` on an empty batch vs. one-row
        // sum/min/max) or share a name (two aggregations on one column). The unchecked path
        // would build an inconsistent frame, and the old debug path panicked via `unwrap`.
        *df = DataFrame::new(cols)?;
    }
    Ok(())
}