use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result};
use arrow::datatypes::Schema;
use parquet::arrow::ArrowWriter as ParquetWriter;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
use super::{StructArrayBuilder, TableWriter};
/// Writer that buffers values in a `Builder` and writes them to a Parquet file
/// every time it is flushed.
pub struct ParquetTableWriter<Builder>
where
    Builder: Default + StructArrayBuilder,
{
    /// Path of the file being written (used in error messages).
    path: PathBuf,
    /// Value of `builder.len()` at or above which [`Self::builder`] flushes
    /// the buffered content before handing out the builder.
    pub flush_threshold: usize,
    /// Underlying Parquet writer; set to `None` once it was taken to be closed.
    file_writer: Option<ParquetWriter<File>>,
    /// Buffer for the content that was not written to the file yet.
    builder: Builder,
}
/// [`TableWriter`] implementation backed by a single Parquet file.
impl<Builder: Default + StructArrayBuilder> TableWriter for ParquetTableWriter<Builder> {
    /// Arrow schema of the table, and the properties given to the Parquet writer.
    type Schema = (Arc<Schema>, WriterProperties);
    /// Metadata returned by the Parquet writer when it is closed.
    type CloseResult = FileMetaData;

    /// Creates the file at `path` (after setting its extension to `parquet`)
    /// and a Parquet writer on top of it.
    ///
    /// When `flush_threshold` is `None`, it defaults to 90% of the maximum
    /// row group size configured in `properties`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file or the Parquet writer cannot be created.
    fn new(
        mut path: PathBuf,
        (schema, properties): Self::Schema,
        flush_threshold: Option<usize>,
    ) -> Result<Self> {
        path.set_extension("parquet");
        let file =
            File::create(&path).with_context(|| format!("Could not create {}", path.display()))?;
        let file_writer = ParquetWriter::try_new(file, schema.clone(), Some(properties.clone()))
            .with_context(|| {
                format!(
                    "Could not create writer for {} with schema {} and properties {:?}",
                    path.display(),
                    schema,
                    // Formatting only needs a borrow, no need to clone here.
                    properties
                )
            })?;
        Ok(ParquetTableWriter {
            path,
            flush_threshold: flush_threshold.unwrap_or(properties.max_row_group_size() * 9 / 10),
            file_writer: Some(file_writer),
            builder: Builder::default(),
        })
    }

    /// Writes the content of the builder to the file, then flushes the
    /// Parquet writer. The builder is replaced by a fresh, default one.
    ///
    /// # Errors
    ///
    /// Returns an error if the builder cannot be finished, or if writing to
    /// or flushing the file fails. The content that was in the builder is
    /// dropped in that case, as the builder was already replaced.
    ///
    /// # Panics
    ///
    /// Panics if the file writer was already taken.
    fn flush(&mut self) -> Result<()> {
        // Leave a fresh builder in place, so `self.builder` stays usable
        // whatever happens below.
        let struct_array = std::mem::take(&mut self.builder).finish()?;
        let file_writer = self
            .file_writer
            .as_mut()
            .expect("File writer is unexpectedly None");
        file_writer
            .write(&struct_array.into())
            .with_context(|| format!("Could not write to {}", self.path.display()))?;
        file_writer
            .flush()
            .with_context(|| format!("Could not flush to {}", self.path.display()))
    }

    /// Flushes the remaining content of the builder, then closes the file.
    ///
    /// The file writer is always taken and closed, even when flushing fails,
    /// so that dropping `self` afterwards never tries to flush and close
    /// again (which would panic on failure instead of returning an error).
    ///
    /// # Errors
    ///
    /// Returns the flush error if flushing failed, otherwise the error
    /// returned when closing the Parquet writer, if any.
    ///
    /// # Panics
    ///
    /// Panics if the file writer was already taken.
    fn close(mut self) -> Result<FileMetaData> {
        let flush_result = self.flush();
        let close_result = self
            .file_writer
            .take()
            .expect("File writer is unexpectedly None")
            .close()
            .with_context(|| format!("Could not close {}", self.path.display()));
        // Report the first error that happened.
        flush_result?;
        close_result
    }
}
impl<Builder> ParquetTableWriter<Builder>
where
    Builder: Default + StructArrayBuilder,
{
    /// Gives mutable access to the internal builder.
    ///
    /// If the builder's length already reached `flush_threshold`, its content
    /// is flushed to the file first, and the returned builder is a fresh one.
    ///
    /// # Errors
    ///
    /// Returns the error of the flush, if one was needed and failed.
    pub fn builder(&mut self) -> Result<&mut Builder> {
        let threshold_reached = self.builder.len() >= self.flush_threshold;
        if threshold_reached {
            self.flush()?;
        }
        Ok(&mut self.builder)
    }
}
/// Flushes and closes the file when the writer is dropped without having
/// been closed explicitly.
impl<Builder: Default + StructArrayBuilder> Drop for ParquetTableWriter<Builder> {
    /// Flushes the builder and closes the file writer, unless the file
    /// writer was already taken.
    ///
    /// # Panics
    ///
    /// Panics if flushing or closing fails, except when the thread is already
    /// panicking: the error is then printed to stderr instead, because a
    /// panic while unwinding would abort the whole process.
    fn drop(&mut self) {
        if self.file_writer.is_none() {
            // Already closed, nothing left to do.
            return;
        }

        let outcome = match self.flush() {
            Ok(()) => self
                .file_writer
                .take()
                .expect("File writer is unexpectedly None")
                .close()
                .map(|_metadata| ())
                .with_context(|| format!("Could not close {}", self.path.display())),
            Err(error) => Err(error),
        };

        if std::thread::panicking() {
            // Do not panic a second time, that would abort the process and
            // prevent other destructors from running.
            if let Err(error) = outcome {
                eprintln!(
                    "Could not flush and close {} while unwinding: {:?}",
                    self.path.display(),
                    error
                );
            }
        } else {
            outcome.unwrap();
        }
    }
}