1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
//! APIs to write to CSV
mod serialize;

use super::super::iterator::StreamingIterator;

use std::io::Write;

// re-export necessary public APIs from csv
pub use csv::{ByteRecord, WriterBuilder};

pub use serialize::*;

use crate::array::Array;
use crate::chunk::Chunk;
use crate::error::Result;

/// Builds one serializer per column.
///
/// Each returned [`StreamingIterator`] yields the serialized bytes of successive items of
/// its column, formatted according to `options`. The serializers are returned in the same
/// order as `columns`.
///
/// # Errors
/// Returns the first error reported by `new_serializer`; no further columns are visited
/// after a failure.
fn new_serializers<'a, A: AsRef<dyn Array>>(
    columns: &'a [A],
    options: &'a SerializeOptions,
) -> Result<Vec<Box<dyn StreamingIterator<Item = [u8]> + 'a>>> {
    let mut serializers = Vec::with_capacity(columns.len());
    for column in columns {
        // `?` stops at the first column for which no serializer can be created
        serializers.push(new_serializer(column.as_ref(), options)?);
    }
    Ok(serializers)
}

/// Serializes a [`Chunk`] into a vector of CSV-encoded rows.
///
/// One entry is produced per row of `columns` (`columns.len()` in total). Each entry holds
/// the serialized fields of that row, one per array of `columns`, separated by
/// `options.delimiter` and terminated by `\n`. A row that serializes to no bytes at all
/// (i.e. there is no field to write) is not emitted.
///
/// # Errors
/// Propagates any error returned while creating the per-column serializers.
///
/// # Panics
/// Panics if a serializer yields fewer than `columns.len()` items.
pub fn serialize<A: AsRef<dyn Array>>(
    columns: &Chunk<A>,
    options: &SerializeOptions,
) -> Result<Vec<Vec<u8>>> {
    let mut serializers = new_serializers(columns, options)?;

    let number_of_rows = columns.len();
    let mut rows = Vec::with_capacity(number_of_rows);
    // scratch buffer holding the row under construction; reused across rows
    let mut buffer = vec![];

    // this is where the (expensive) transposition happens: rows on the outside,
    // columns on the inside
    for _ in 0..number_of_rows {
        for serializer in serializers.iter_mut() {
            // `unwrap` is infallible because `array.len()` equals `Chunk::len`
            let field = serializer.next().unwrap();
            buffer.extend_from_slice(field);
            buffer.push(options.delimiter);
        }

        if buffer.is_empty() {
            // nothing was written for this row
            continue;
        }

        // the last byte is a trailing delimiter: turn it into the line terminator
        let terminator = buffer.len() - 1;
        buffer[terminator] = b'\n';
        rows.push(buffer.clone());
        buffer.clear();
    }

    Ok(rows)
}

/// Writes a [`Chunk`] to `writer` as CSV, formatted according to `options`.
///
/// Every row of `columns` is written with a single `write_all` call as its serialized
/// fields, separated by `options.delimiter` and terminated by `\n`.
///
/// # Errors
/// Propagates any error returned while creating the per-column serializers, and any
/// error returned by `writer`.
///
/// # Panics
/// Panics if a serializer yields fewer than `columns.len()` items, or if a row
/// serializes to zero bytes (there is then no trailing delimiter to replace).
pub fn write_chunk<W: Write, A: AsRef<dyn Array>>(
    writer: &mut W,
    columns: &Chunk<A>,
    options: &SerializeOptions,
) -> Result<()> {
    let arrays = columns.arrays();
    let mut serializers = new_serializers(arrays, options)?;

    // start with room for 10 bytes per field; the buffer grows if a row needs more
    let mut buffer = Vec::with_capacity(arrays.len() * 10);

    // this is where the (expensive) transposition happens: rows on the outside,
    // columns on the inside
    for _ in 0..columns.len() {
        for serializer in serializers.iter_mut() {
            // `unwrap` is infallible because `array.len()` equals `Chunk::len`
            let field = serializer.next().unwrap();
            buffer.extend_from_slice(field);
            buffer.push(options.delimiter);
        }

        // the last byte is a trailing delimiter: turn it into the line terminator
        let terminator = buffer.len() - 1;
        buffer[terminator] = b'\n';

        writer.write_all(&buffer)?;
        buffer.clear();
    }

    Ok(())
}

/// Writes a CSV header to `writer`.
///
/// The `names` are written in order, separated by `options.delimiter`, followed by `\n`.
/// Names are written verbatim: no quoting or escaping is applied. With no names, only
/// the `\n` is written.
///
/// The delimiter is handled as a raw byte, so any `u8` value is accepted, including
/// bytes that are not valid UTF-8 on their own (`>= 0x80`).
///
/// # Errors
/// Returns an error if writing to `writer` fails.
pub fn write_header<W: Write, T>(
    writer: &mut W,
    names: &[T],
    options: &SerializeOptions,
) -> Result<()>
where
    T: AsRef<str>,
{
    // Assemble the line as bytes rather than as a `str`: the delimiter is an arbitrary
    // byte, and converting it through `str::from_utf8` fails for non-ASCII values.
    let mut header = Vec::new();
    for (index, name) in names.iter().enumerate() {
        if index > 0 {
            header.push(options.delimiter);
        }
        header.extend_from_slice(name.as_ref().as_bytes());
    }
    header.push(b'\n');

    writer.write_all(&header)?;
    Ok(())
}