1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use polars_core::prelude::*;

/// An aggregation over a single column that can be evaluated while scanning a file.
///
/// Every variant carries the same two fields:
/// * `column` - name of the column that is looked up in the `DataFrame`.
/// * `alias` - optional name given to the resulting `Series` by `finish`;
///   `evaluate_batch` ignores it and keeps the name produced by the aggregation.
pub enum ScanAggregation {
    /// Aggregates the column with `sum_as_series`.
    Sum {
        column: String,
        alias: Option<String>,
    },
    /// Aggregates the column with `min_as_series`.
    Min {
        column: String,
        alias: Option<String>,
    },
    /// Aggregates the column with `max_as_series`.
    Max {
        column: String,
        alias: Option<String>,
    },
    /// Takes `head(Some(1))` of the column.
    First {
        column: String,
        alias: Option<String>,
    },
    /// Takes `tail(Some(1))` of the column.
    Last {
        column: String,
        alias: Option<String>,
    },
}

impl ScanAggregation {
    /// Evaluate the aggregations per batch.
    #[cfg(any(
        feature = "ipc",
        feature = "parquet",
        feature = "json",
        feature = "avro"
    ))]
    pub(crate) fn evaluate_batch(&self, df: &DataFrame) -> Result<Series> {
        use ScanAggregation::*;
        let s = match self {
            Sum { column, .. } => df.column(column)?.sum_as_series(),
            Min { column, .. } => df.column(column)?.min_as_series(),
            Max { column, .. } => df.column(column)?.max_as_series(),
            First { column, .. } => df.column(column)?.head(Some(1)),
            Last { column, .. } => df.column(column)?.tail(Some(1)),
        };
        Ok(s)
    }

    /// After all batches are concatenated the aggregation is determined for the whole set.
    pub(crate) fn finish(&self, df: &DataFrame) -> Result<Series> {
        use ScanAggregation::*;
        match self {
            Sum { column, alias } => {
                let mut s = df.column(column)?.sum_as_series();
                if let Some(alias) = alias {
                    s.rename(alias);
                }
                Ok(s)
            }
            Min { column, alias } => {
                let mut s = df.column(column)?.min_as_series();
                if let Some(alias) = alias {
                    s.rename(alias);
                }
                Ok(s)
            }
            Max { column, alias } => {
                let mut s = df.column(column)?.max_as_series();
                if let Some(alias) = alias {
                    s.rename(alias);
                }
                Ok(s)
            }
            First { column, alias } => {
                let mut s = df.column(column)?.head(Some(1));
                if let Some(alias) = alias {
                    s.rename(alias);
                }
                Ok(s)
            }
            Last { column, alias } => {
                let mut s = df.column(column)?.tail(Some(1));
                if let Some(alias) = alias {
                    s.rename(alias);
                }
                Ok(s)
            }
        }
    }
}

/// Replaces `df` in place with one aggregated column per entry of `aggregate`.
///
/// When `aggregate` is `None`, `df` is left untouched. Otherwise every aggregation is
/// evaluated against the current contents of `df` via `ScanAggregation::evaluate_batch`
/// and the resulting series become the new columns of `df`.
///
/// # Errors
///
/// Returns the first error produced by `evaluate_batch`; `df` is not modified in that case.
///
/// # Panics
///
/// In builds with `debug_assertions` enabled the new frame is built with `DataFrame::new`
/// and the result is unwrapped, so a rejected set of columns panics. Other builds use
/// `DataFrame::new_no_checks` instead.
#[cfg(any(
    feature = "ipc",
    feature = "parquet",
    feature = "json",
    feature = "avro"
))]
pub(crate) fn apply_aggregations(
    df: &mut DataFrame,
    aggregate: Option<&[ScanAggregation]>,
) -> Result<()> {
    let aggregations = match aggregate {
        Some(aggregations) => aggregations,
        // Nothing requested: keep the batch as it is.
        None => return Ok(()),
    };

    let mut reduced: Vec<Series> = Vec::with_capacity(aggregations.len());
    for aggregation in aggregations {
        reduced.push(aggregation.evaluate_batch(df)?);
    }

    *df = if cfg!(debug_assertions) {
        DataFrame::new(reduced).unwrap()
    } else {
        DataFrame::new_no_checks(reduced)
    };
    Ok(())
}