// Copyright (C) 2024  The Software Heritage developers
// See the AUTHORS file at the top-level directory of this distribution
// License: GNU General Public License version 3, or any later version
// See top-level LICENSE file for more information

use std::num::NonZeroU16;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};

use anyhow::{ensure, Context, Result};
use clap::{Parser, ValueEnum};
use dsi_progress_logger::{ProgressLog, ProgressLogger};
use rayon::prelude::*;
use sux::prelude::{AtomicBitVec, BitVec};

use swh_graph::graph::*;
use swh_graph::java_compat::mph::gov::GOVMPH;
use swh_graph::utils::shuffle::par_iter_shuffled_range;
use swh_graph::SWHType;

use swh_graph::utils::dataset_writer::{
    ParallelDatasetWriter, ParquetTableWriter, PartitionedTableWriter,
};
use swh_graph_provenance::node_dataset::{schema, writer_properties, NodeTableBuilder};

#[derive(ValueEnum, Debug, Clone, Copy)]
enum NodeFilter {
    /// All releases, and only revisions pointed to by either a release or a snapshot
    Heads,
    /// All releases and all revisions.
    All,
}

#[derive(Parser, Debug)]
/** Writes the list of nodes reachable from a 'head' revision or a release.
 */
struct Args {
    graph_path: PathBuf,
    #[arg(short, long, action = clap::ArgAction::Count)]
    verbose: u8,
    #[arg(value_enum)]
    #[arg(long, default_value_t = NodeFilter::Heads)]
    /// Subset of revisions and releases to traverse from
    node_filter: NodeFilter,
    #[arg(long, default_value_t = 0)]
    /// Number of subfolders (Hive partitions) to store Parquet files in.
    ///
    /// Rows are partitioned across these folders based on the high bits of the object hash,
    /// to use Parquet statistics for row group filtering.
    ///
    /// RAM usage and the number of written files are proportional to this value.
    num_partitions: u16,
    #[arg(long, default_value_t = 1_000_000)] // Parquet's default max_row_group_size
    /// Number of rows written at once, forming a row group on disk.
    ///
    /// Higher values reduce index scans, but lower values reduce time wasted per
    /// false positive in the probabilistic filters.
    row_group_size: usize,
    #[arg(long, default_value_t = 0.05)] // Parquet's default
    /// false positive probability for 'sha1_git' Bloom filter
    bloom_fpp: f64,
    #[arg(long, default_value_t = 1_000_000)] // Parquet's default
    /// number of distinct values for 'sha1_git' Bloom filter
    bloom_ndv: u64,
    #[arg(long)]
    /// Directory to write the list of nodes to
    nodes_out: PathBuf,
}

pub fn main() -> Result<()> {
    let args = Args::parse();

    ensure!(
        args.num_partitions == 0 || args.num_partitions.is_power_of_two(),
        "--num-partitions must be 0 or a power of 2"
    );
    ensure!(
        args.num_partitions <= 256,
        "--num-partitions cannot be greater than 256"
    );
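    // `NonZeroU16::try_from(0)` fails, so `--num-partitions 0` becomes `None`,
    // meaning "do not partition".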
    let num_partitions: Option<NonZeroU16> = args.num_partitions.try_into().ok();

    stderrlog::new()
        .verbosity(args.verbose as usize)
        .timestamp(stderrlog::Timestamp::Second)
        .init()
        .context("While Initializing the stderrlog")?;

    log::info!("Loading graph");
    let graph = swh_graph::graph::load_bidirectional(args.graph_path)
        .context("Could not load graph")?
        .init_properties()
        .load_properties(|props| props.load_maps::<GOVMPH>())
        .context("Could not load maps")?;
    log::info!("Graph loaded.");

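    // Writer for a Hive-partitioned Parquet dataset, with a Bloom filter on the
    // 'sha1_git' column of each file.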
    let mut dataset_writer = ParallelDatasetWriter::new_with_schema(
        args.nodes_out,
        (
            "partition".to_owned(), // Partition column name
            num_partitions,
            (
                Arc::new(schema()),
                writer_properties(&graph)
                    .set_column_bloom_filter_fpp("sha1_git".into(), args.bloom_fpp)
                    .set_column_bloom_filter_ndv("sha1_git".into(), args.bloom_ndv)
                    .build(),
            ),
        ),
    )?;

    // We write at most one row per call to `dataset_writer.builder()`, so every row
    // group will be exactly this size.
    dataset_writer.flush_threshold = Some(args.row_group_size);

    let reachable_nodes = find_reachable_nodes(&graph, args.node_filter)?;

    write_reachable_nodes(
        &graph,
        dataset_writer,
        num_partitions.unwrap_or(NonZeroU16::new(1).unwrap()),
        &reachable_nodes,
    )?;

    Ok(())
}

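/// Returns a bit vector with one bit per graph node, set for every selected root
/// (see [`NodeFilter`]) and for every content or directory reachable from one.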
fn find_reachable_nodes<G>(graph: &G, node_filter: NodeFilter) -> Result<BitVec>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.expected_updates(Some(graph.num_nodes()));
    pl.start("Listing reachable contents and directories...");
    let pl = Arc::new(Mutex::new(pl));

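    // One bit per node, set once the node has been visited by any traversal.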
    let reachable_from_heads = AtomicBitVec::new(graph.num_nodes());

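    // Iterate over all node ids in parallel; the shuffled order presumably spreads
    // expensive roots more evenly across threads.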
    par_iter_shuffled_range(0..graph.num_nodes()).try_for_each(|root| -> Result<()> {
        let search_in_node = match node_filter {
            NodeFilter::All => true,
            NodeFilter::Heads => swh_graph_provenance::filters::is_head(graph, root),
        };

        if search_in_node {
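            // Iterative DFS from the root, following only successors that are
            // contents or directories.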
            let mut stack = vec![root];

            while let Some(node) = stack.pop() {
                reachable_from_heads.set(node, true, Ordering::Relaxed);
                for succ in graph.successors(node) {
                    if reachable_from_heads.get(succ, Ordering::Relaxed) {
                        // Already visited, either by this DFS or another one
                    } else if let SWHType::Content | SWHType::Directory =
                        graph.properties().node_type(succ)
                    {
                        stack.push(succ);
                    }
                }
            }
        }
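        // Batch progress updates (roughly every 32768 roots) to avoid locking the
        // progress logger's mutex on every iteration.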
        if root % 32768 == 0 {
            pl.lock().unwrap().update_with_count(32768);
        }
        Ok(())
    })?;

    pl.lock().unwrap().done();

    Ok(reachable_from_heads.into())
}

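/// Writes the SWHID of every reachable node to the Parquet dataset, partitioned by
/// the high byte of its sha1_git hash.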
fn write_reachable_nodes<G>(
    graph: &G,
    dataset_writer: ParallelDatasetWriter<
        PartitionedTableWriter<ParquetTableWriter<NodeTableBuilder>>,
    >,
    num_partitions: NonZeroU16,
    reachable_nodes: &BitVec,
) -> Result<()>
where
    G: SwhGraphWithProperties + Sync,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    assert!(num_partitions.is_power_of_two());

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.expected_updates(Some(graph.num_nodes()));
    pl.start("Writing list of reachable nodes...");
    let pl = Arc::new(Mutex::new(pl));

    // Split the node id range into a small number of chunks. This causes the node
    // ids to form long monotonically increasing sequences in the output dataset,
    // which makes them easy to index using Parquet/ORC chunk statistics, and
    // should compress better with delta-encoding.
    // However, we still want this loop (and readers) to parallelize nicely,
    // so the number of chunks cannot be too low either.
    let num_chunks = 96;
    let chunk_size = graph.num_nodes().div_ceil(num_chunks);

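    // Each worker gets its own writer (via `get_thread_writer`), so rows can be
    // appended without synchronizing between threads.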
    (0..graph.num_nodes())
        .into_par_iter()
        .by_uniform_blocks(chunk_size)
        .try_for_each_init(
            || dataset_writer.get_thread_writer().unwrap(),
            |writer, node| -> Result<()> {
                if reachable_nodes.get(node) {
                    let swhid = graph.properties().swhid(node);
                    // Bucket SWHIDs by their high bits, so we can use Parquet's column
                    // statistics to filter row groups.
                    let partition_id: usize =
                        (swhid.hash[0] as u16 * num_partitions.get() / 256).into();
                    writer.partitions()[partition_id]
                        .builder()?
                        .add_node(graph, node);
                }
                if node % 32768 == 0 {
                    pl.lock().unwrap().update_with_count(32768);
                }
                Ok(())
            },
        )?;

    log::info!("Flushing writers...");
    dataset_writer.close()?;

    pl.lock().unwrap().done();

    Ok(())
}