use std::sync::Arc;
use anyhow::Result;
use arrow::array::*;
use arrow::datatypes::DataType::*;
use arrow::datatypes::{Field, Schema};
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::EnabledStatistics;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use swh_graph::graph::{NodeId, SwhGraph, SwhGraphWithProperties};
use swh_graph::SWHType;
use swh_graph::utils::dataset_writer::StructArrayBuilder;
/// Returns the Arrow schema of the node table.
///
/// The table has three non-nullable columns:
///
/// * `id`: unsigned 64-bit integer,
/// * `type`: UTF-8 strings, dictionary-encoded with signed 8-bit keys,
/// * `sha1_git`: fixed-size binary values of 20 bytes each.
pub fn schema() -> Schema {
    let id = Field::new("id", UInt64, false);
    let node_type = Field::new("type", Dictionary(Box::new(Int8), Box::new(Utf8)), false);
    let sha1_git = Field::new("sha1_git", FixedSizeBinary(20), false);

    Schema::new(vec![id, node_type, sha1_git])
}
/// Builds the Parquet writer settings used for the node table.
///
/// Per-column settings:
///
/// * `id`: `DELTA_BINARY_PACKED` encoding, ZSTD compression at level 3,
///   page-level statistics;
/// * `sha1_git`: bloom filter enabled, chunk-level statistics.
///
/// Dictionary encoding is enabled globally, and the key/value metadata
/// returned by `crate::parquet_metadata(graph)` is attached to the file.
///
/// The builder is returned unfinished, so callers can tweak it further
/// before calling `build()`.
pub fn writer_properties<G: SwhGraph>(graph: &G) -> WriterPropertiesBuilder {
    // Settings for the `id` column.
    // NOTE(review): dictionary encoding is also enabled globally below; per the
    // parquet documentation the encoding set here then acts as the fallback
    // encoding for `id` rather than its primary one -- confirm this is intended.
    let builder =
        WriterProperties::builder().set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED);
    let id_compression = Compression::ZSTD(ZstdLevel::try_new(3).unwrap());
    let builder = builder
        .set_column_compression("id".into(), id_compression)
        .set_column_statistics_enabled("id".into(), EnabledStatistics::Page);

    // Settings for the `sha1_git` column.
    let builder = builder
        .set_column_bloom_filter_enabled("sha1_git".into(), true)
        .set_column_statistics_enabled("sha1_git".into(), EnabledStatistics::Chunk);

    // File-wide settings.
    let builder = builder.set_dictionary_enabled(true);
    let metadata = crate::parquet_metadata(graph);
    builder.set_key_value_metadata(Some(metadata))
}
/// Accumulates node rows column by column.
///
/// Rows are appended with `add_node`, which pushes exactly one value into
/// each of the three column builders.
#[derive(Debug)]
pub struct NodeTableBuilder {
    /// Node ids, converted to `u64`.
    ids: UInt64Builder,
    /// Node types, stored as the node type value cast to `i8`.
    types: Int8Builder,
    /// Hash bytes taken from each node's SWHID.
    sha1_gits: FixedSizeBinaryBuilder,
}
impl NodeTableBuilder {
    /// Appends one row describing `node` to the table.
    ///
    /// The node's SWHID is looked up through the properties of `graph`; its
    /// node type (cast to `i8`) and its hash are appended to the `type` and
    /// `sha1_git` columns, and the node id itself to the `id` column.
    ///
    /// # Panics
    ///
    /// Panics if the hash cannot be appended to the `sha1_git` builder, or if
    /// `node` does not fit in a `u64`.
    pub fn add_node<G>(&mut self, graph: &G, node: NodeId)
    where
        G: SwhGraphWithProperties,
        <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
    {
        let swhid = graph.properties().swhid(node);

        let type_key = swhid.node_type as i8;
        self.types.append_value(type_key);

        let appended = self.sha1_gits.append_value(swhid.hash);
        appended.expect("Could not append sha1_git");

        let id = u64::try_from(node).expect("Node id overflow u64");
        self.ids.append_value(id);
    }
}
/// Width, in bytes, passed to the fixed-size binary builder of the `sha1_git` column.
const SHA1GIT_LEN: i32 = 20;
impl Default for NodeTableBuilder {
fn default() -> Self {
NodeTableBuilder {
ids: Default::default(),
types: Default::default(),
sha1_gits: FixedSizeBinaryBuilder::new(SHA1GIT_LEN),
}
}
}
impl StructArrayBuilder for NodeTableBuilder {
    /// Returns the number of values appended to the `id` column so far.
    fn len(&self) -> usize {
        self.ids.len()
    }

    /// Consumes the builder and assembles its columns into a [`StructArray`]
    /// laid out according to [`schema`].
    ///
    /// # Errors
    ///
    /// Returns an error, instead of panicking, if Arrow rejects the dictionary
    /// keys of the `type` column or if the columns do not match the schema.
    fn finish(mut self) -> Result<StructArray> {
        let type_keys = self.types.finish();

        // Dictionary values are the names of all node types, in the order
        // given by `SWHType::all()`. The keys are the `node_type as i8` values
        // written by `add_node`.
        // NOTE(review): this assumes the integer value of each `SWHType`
        // equals its position in `SWHType::all()` -- confirm.
        let type_names = StringArray::from(
            SWHType::all()
                .into_iter()
                .map(|type_| type_.to_str())
                .collect::<Vec<_>>(),
        );

        // `try_new` validates the keys against the dictionary and reports a
        // failure as an error, where `new` would panic.
        let types = Arc::new(DictionaryArray::try_new(
            type_keys,
            Arc::new(type_names),
        )?);
        let ids = Arc::new(self.ids.finish());
        let sha1_gits = Arc::new(self.sha1_gits.finish());

        // Column order must match the field order of `schema()`.
        let columns: Vec<Arc<dyn Array>> = vec![ids, types, sha1_gits];

        // Same reasoning: surface schema/column mismatches as an `Err`.
        let table = StructArray::try_new(schema().fields().clone(), columns, None)?;
        Ok(table)
    }
}