use parquet2::FallibleStreamingIterator;
use parquet2::{metadata::ColumnDescriptor, write::Compressor};

use crate::{
    array::Array,
    chunk::Chunk,
    datatypes::Schema,
    error::{ArrowError, Result},
};

use super::{
    array_to_pages, to_parquet_schema, DynIter, DynStreamingIterator, Encoding, RowGroupIter,
    SchemaDescriptor, WriteOptions,
};
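
/// Maps a [`Chunk`], together with one [`Encoding`] and one [`ColumnDescriptor`] per column,
/// into a [`RowGroupIter`] whose pages are encoded and compressed according to `options`.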
pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(
    chunk: Chunk<A>,
    encodings: Vec<Encoding>,
    columns: Vec<ColumnDescriptor>,
    options: WriteOptions,
) -> RowGroupIter<'static, ArrowError> {
    DynIter::new(
        chunk
            .into_arrays()
            .into_iter()
            .zip(columns.into_iter())
            .zip(encodings.into_iter())
            .map(move |((array, descriptor), encoding)| {
                array_to_pages(array.as_ref(), descriptor, options, encoding).map(move |pages| {
                    let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
                    let compressed_pages =
                        Compressor::new(encoded_pages, options.compression, vec![])
                            .map_err(ArrowError::from);
                    DynStreamingIterator::new(compressed_pages)
                })
            }),
    )
}
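
/// An iterator adapter that converts an iterator of [`Chunk`]s into an iterator of parquet
/// row groups, encoding and compressing every column according to `options` and the
/// per-field [`Encoding`]s.
///
/// # Example
///
/// A minimal sketch of typical usage, assuming a `schema`, a fallible iterator of `chunks`
/// and `options` are already in scope (all three are hypothetical bindings used for
/// illustration only):
///
/// ```ignore
/// // Assumed bindings: `schema: &Schema`, `chunks: impl Iterator<Item = Result<Chunk<_>>>`,
/// // `options: WriteOptions`. `Encoding::Plain` is used for every field for simplicity.
/// let encodings = vec![Encoding::Plain; schema.fields.len()];
/// let row_groups = RowGroupIterator::try_new(chunks, schema, options, encodings)?;
/// for row_group in row_groups {
///     let (pages, _num_rows) = row_group?;
///     // hand `pages` to the parquet file writer used downstream
/// }
/// ```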
pub struct RowGroupIterator<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Chunk<A>>>> {
    iter: I,
    options: WriteOptions,
    parquet_schema: SchemaDescriptor,
    encodings: Vec<Encoding>,
}

impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Chunk<A>>>> RowGroupIterator<A, I> {
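    /// Creates a new [`RowGroupIterator`] from an iterator of [`Chunk`]s and one [`Encoding`]
    /// per field. Returns an error if the Arrow `schema` cannot be mapped to a parquet schema
    /// and panics if `encodings.len() != schema.fields.len()`.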
    pub fn try_new(
        iter: I,
        schema: &Schema,
        options: WriteOptions,
        encodings: Vec<Encoding>,
    ) -> Result<Self> {
        assert_eq!(schema.fields.len(), encodings.len());
        let parquet_schema = to_parquet_schema(schema)?;

        Ok(Self {
            iter,
            options,
            parquet_schema,
            encodings,
        })
    }
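
    /// Returns the parquet [`SchemaDescriptor`] derived from the Arrow schema.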
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        &self.parquet_schema
    }
}

impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = Result<Chunk<A>>>> Iterator
for RowGroupIterator<A, I>
{
    type Item = Result<(RowGroupIter<'static, ArrowError>, usize)>;
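
    // Each item pairs the row group's column page streams with the number of rows
    // in the originating chunk.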
    fn next(&mut self) -> Option<Self::Item> {
        let options = self.options;

        self.iter.next().map(|maybe_chunk| {
            let chunk = maybe_chunk?;
            let len = chunk.len();
            let encodings = self.encodings.clone();
            Ok((
                row_group_iter(
                    chunk,
                    encodings,
                    self.parquet_schema.columns().to_vec(),
                    options,
                ),
                len,
            ))
        })
    }
}