use std::num::NonZeroU16;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use anyhow::{ensure, Context, Result};
use clap::{Parser, ValueEnum};
use dsi_progress_logger::{ProgressLog, ProgressLogger};
use rayon::prelude::*;
use sux::prelude::{AtomicBitVec, BitVec};
use swh_graph::graph::*;
use swh_graph::java_compat::mph::gov::GOVMPH;
use swh_graph::utils::shuffle::par_iter_shuffled_range;
use swh_graph::SWHType;
use swh_graph::utils::dataset_writer::{
ParallelDatasetWriter, ParquetTableWriter, PartitionedTableWriter,
};
use swh_graph_provenance::node_dataset::{schema, writer_properties, NodeTableBuilder};
// Selects which nodes are used as traversal roots in `find_reachable_nodes`.
// NOTE: plain `//` comments on purpose — `///` doc comments on a ValueEnum
// variant would be picked up by clap and change the generated --help output.
#[derive(ValueEnum, Debug, Clone, Copy)]
enum NodeFilter {
    // Only start traversals from "head" nodes (as decided by
    // swh_graph_provenance::filters::is_head).
    Heads,
    // Start a traversal from every node in the graph.
    All,
}
// Command-line arguments (parsed with clap's derive API).
// NOTE: `//` comments are used instead of `///` so clap's generated --help
// text is left unchanged (doc comments would become help strings).
#[derive(Parser, Debug)]
struct Args {
    // Path of the compressed graph to load (positional argument).
    graph_path: PathBuf,
    // Increases log verbosity each time it is repeated (-v, -vv, ...).
    #[arg(short, long, action = clap::ArgAction::Count)]
    verbose: u8,
    // Which nodes to use as traversal roots; defaults to head nodes only.
    #[arg(value_enum)]
    #[arg(long, default_value_t = NodeFilter::Heads)]
    node_filter: NodeFilter,
    // Number of output partitions; 0 disables partitioning. main() enforces
    // that this is either 0 or a power of two, and at most 256.
    #[arg(long, default_value_t = 0)]
    num_partitions: u16,
    // Row count threshold at which the Parquet writers flush a row group.
    #[arg(long, default_value_t = 1_000_000)]
    row_group_size: usize,
    // Bloom filter false-positive probability for the sha1_git column.
    #[arg(long, default_value_t = 0.05)]
    bloom_fpp: f64,
    // Expected number of distinct sha1_git values (sizes the bloom filter).
    #[arg(long, default_value_t = 1_000_000)]
    bloom_ndv: u64,
    // Directory where the nodes dataset is written.
    #[arg(long)]
    nodes_out: PathBuf,
}
/// Entry point: loads the graph, computes the set of reachable nodes, and
/// writes them out as a (possibly partitioned) Parquet dataset.
pub fn main() -> Result<()> {
    let args = Args::parse();

    // A partition count of 0 means "no partitioning"; otherwise it must be a
    // power of two (and at most 256) so the per-node partition formula in
    // write_reachable_nodes stays in range.
    ensure!(
        args.num_partitions == 0 || args.num_partitions.is_power_of_two(),
        "--num-partitions must be 0 or a power of 2"
    );
    ensure!(
        args.num_partitions <= 256,
        "--num-partitions cannot be greater than 256"
    );
    // None <=> 0, i.e. write a single unpartitioned dataset.
    let num_partitions: Option<NonZeroU16> = NonZeroU16::new(args.num_partitions);

    stderrlog::new()
        .verbosity(args.verbose as usize)
        .timestamp(stderrlog::Timestamp::Second)
        .init()
        .context("While Initializing the stderrlog")?;

    log::info!("Loading graph");
    let graph = swh_graph::graph::load_bidirectional(args.graph_path)
        .context("Could not load graph")?
        .init_properties()
        .load_properties(|props| props.load_maps::<GOVMPH>())
        .context("Could not load maps")?;
    log::info!("Graph loaded.");

    // Attach a bloom filter on the sha1_git column so point lookups can skip
    // row groups cheaply.
    let properties = writer_properties(&graph)
        .set_column_bloom_filter_fpp("sha1_git".into(), args.bloom_fpp)
        .set_column_bloom_filter_ndv("sha1_git".into(), args.bloom_ndv)
        .build();
    let mut dataset_writer = ParallelDatasetWriter::new_with_schema(
        args.nodes_out,
        (
            "partition".to_owned(),
            num_partitions,
            (Arc::new(schema()), properties),
        ),
    )?;
    dataset_writer.flush_threshold = Some(args.row_group_size);

    let reachable_nodes = find_reachable_nodes(&graph, args.node_filter)?;

    // When partitioning is disabled, behave as if there were one partition.
    let effective_partitions =
        num_partitions.unwrap_or_else(|| NonZeroU16::new(1).expect("1 is non-zero"));
    write_reachable_nodes(&graph, dataset_writer, effective_partitions, &reachable_nodes)?;
    Ok(())
}
/// Computes the set of nodes reachable from the selected roots.
///
/// Depending on `node_filter`, either every node or only "head" nodes (as
/// decided by `swh_graph_provenance::filters::is_head`) are used as roots.
/// Each root is marked reachable, then the forward traversal only descends
/// into Content and Directory successors, so the returned bitmap contains
/// the roots plus all contents/directories reachable from them.
fn find_reachable_nodes<G>(graph: &G, node_filter: NodeFilter) -> Result<BitVec>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.expected_updates(Some(graph.num_nodes()));
    pl.start("Listing reachable contents and directories...");
    // Shared with the rayon workers below; locked only for progress updates.
    let pl = Arc::new(Mutex::new(pl));
    // One bit per node. Relaxed ordering is sufficient here: the bitmap is a
    // monotonic "seen" set, so a race at worst makes two threads traverse the
    // same node twice (wasted work, same final result).
    let reachable_from_heads = AtomicBitVec::new(graph.num_nodes());
    // NOTE(review): iterating roots in shuffled order presumably spreads
    // neighboring (hot) regions of the graph across threads for load
    // balancing — confirm against swh_graph::utils::shuffle.
    par_iter_shuffled_range(0..graph.num_nodes()).try_for_each(|root| -> Result<()> {
        let search_in_node = match node_filter {
            NodeFilter::All => true,
            NodeFilter::Heads => swh_graph_provenance::filters::is_head(graph, root),
        };
        if search_in_node {
            // Iterative DFS starting from this root.
            let mut stack = vec![root];
            while let Some(node) = stack.pop() {
                // The root itself gets marked, whatever its node type.
                reachable_from_heads.set(node, true, Ordering::Relaxed);
                for succ in graph.successors(node) {
                    if reachable_from_heads.get(succ, Ordering::Relaxed) {
                        // Already visited (possibly by another thread): skip.
                    } else if let SWHType::Content | SWHType::Directory =
                        graph.properties().node_type(succ)
                    {
                        // Only descend into contents and directories.
                        stack.push(succ);
                    }
                }
            }
        }
        // Coarse-grained progress: one batched update every 32768 roots.
        if root % 32768 == 0 {
            pl.lock().unwrap().update_with_count(32768);
        }
        Ok(())
    })?;
    pl.lock().unwrap().done();
    // Traversal is done and no writers remain, so drop the atomic wrapper.
    Ok(reachable_from_heads.into())
}
/// Writes one row per reachable node into the partitioned Parquet dataset.
///
/// A node's partition is derived from the first byte of its SWHID hash:
/// `hash[0] * num_partitions / 256`. With `hash[0]` in `0..=255` and
/// `num_partitions <= 256` (enforced by `main`), the index is always
/// `< num_partitions`. Callers pass `num_partitions == 1` when partitioning
/// is disabled.
fn write_reachable_nodes<G>(
    graph: &G,
    dataset_writer: ParallelDatasetWriter<
        PartitionedTableWriter<ParquetTableWriter<NodeTableBuilder>>,
    >,
    num_partitions: NonZeroU16,
    reachable_nodes: &BitVec,
) -> Result<()>
where
    G: SwhGraphWithProperties + Sync,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    // The partition formula below assumes a power-of-two partition count.
    assert!(num_partitions.is_power_of_two());
    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.expected_updates(Some(graph.num_nodes()));
    pl.start("Writing list of reachable nodes...");
    // Shared with the rayon workers below; locked only for progress updates.
    let pl = Arc::new(Mutex::new(pl));
    // Split the node-id range into a fixed number of contiguous chunks so
    // each rayon task handles a large block of consecutive node ids.
    let num_chunks = 96;
    let chunk_size = graph.num_nodes().div_ceil(num_chunks);
    (0..graph.num_nodes())
        .into_par_iter()
        .by_uniform_blocks(chunk_size)
        .try_for_each_init(
            // Per-worker writer, so the hot path needs no locking.
            || dataset_writer.get_thread_writer().unwrap(),
            |writer, node| -> Result<()> {
                if reachable_nodes.get(node) {
                    let swhid = graph.properties().swhid(node);
                    // hash[0] <= 255 and num_partitions <= 256, so the u16
                    // product is at most 255 * 256 = 65280 (no overflow) and
                    // the quotient is < num_partitions.
                    let partition_id: usize =
                        (swhid.hash[0] as u16 * num_partitions.get() / 256).into();
                    writer.partitions()[partition_id]
                        .builder()?
                        .add_node(graph, node);
                }
                // Coarse-grained progress: one batched update every 32768 nodes.
                if node % 32768 == 0 {
                    pl.lock().unwrap().update_with_count(32768);
                }
                Ok(())
            },
        )?;
    log::info!("Flushing writers...");
    // Flush and close every per-thread writer before reporting completion.
    dataset_writer.close()?;
    pl.lock().unwrap().done();
    Ok(())
}