Function count_paths::queue_nodes

pub(crate) fn queue_nodes<G>(
    graph: G,
    tx: SyncSender<(String, NodeId)>,
    batch_size: usize
) -> JoinHandle<Result<()>>
where
    G: SwhGraphWithProperties + Sync + Send + Clone + 'static,
    <G as SwhGraphWithProperties>::Maps: Maps,

Reads CSV records from stdin, and queues their SWHIDs and node ids to tx, preserving the order.

This is equivalent to:

std::thread::spawn(move || -> Result<()> {
    let mut reader = csv::ReaderBuilder::new()
        .has_headers(true)
        .from_reader(io::stdin());

    for record in reader.deserialize() {
        let InputRecord { swhid, .. } =
            record.context("Could not deserialize record")?;
        // Borrow swhid here so it can still be moved into the channel below
        let node = graph
            .properties()
            .node_id_from_string_swhid(&swhid)
            .with_context(|| format!("Unknown SWHID: {}", swhid))?;

        // Fails only if the receiving end of tx was dropped
        tx.send((swhid, node))?;
    }
    Ok(())
});

However, it uses inner parallelism, because node_id() could otherwise be a bottleneck on systems where accessing graph.order has high latency (network and/or compressed filesystem). This reduces the runtime from a couple of weeks to less than a day on the 2023-09-06 graph stored on ZSTD-compressed ZFS.
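
The combination of parallel lookups and order preservation can be illustrated with a small standard-library sketch. It is not the implementation of queue_nodes, whose internals are not shown here. The function `queue_in_order`, its `lookup` parameter, the `String` error type, and the local `NodeId` alias are all hypothetical names chosen for illustration, and spawning one thread per key is a simplification that a real implementation would likely replace with a thread pool.

```rust
use std::sync::mpsc::SyncSender;
use std::thread;

/// Hypothetical stand-in for the graph's node id type.
type NodeId = usize;

/// Resolves `keys` batch by batch. Lookups within a batch run in parallel,
/// but `(key, id)` pairs are sent to `tx` in the original input order.
/// `lookup` is a hypothetical stand-in for a slow SWHID-to-node-id query.
fn queue_in_order<F>(
    keys: Vec<String>,
    tx: SyncSender<(String, NodeId)>,
    batch_size: usize,
    lookup: F,
) -> Result<(), String>
where
    F: Fn(&str) -> Option<NodeId> + Sync,
{
    assert!(batch_size > 0, "batch_size must be positive");
    let lookup = &lookup;

    for batch in keys.chunks(batch_size) {
        // Spawn one scoped thread per key so the slow lookups overlap.
        let ids: Vec<Option<NodeId>> = thread::scope(|s| {
            let handles: Vec<_> = batch
                .iter()
                .map(|key| s.spawn(move || lookup(key.as_str())))
                .collect();
            // Joining in spawn order restores the input order,
            // regardless of which lookup finished first.
            handles
                .into_iter()
                .map(|h| h.join().expect("lookup thread panicked"))
                .collect()
        });

        // Send the batch sequentially; a bounded channel applies backpressure.
        for (key, id) in batch.iter().zip(ids) {
            let id = id.ok_or_else(|| format!("Unknown SWHID: {key}"))?;
            tx.send((key.clone(), id)).map_err(|e| e.to_string())?;
        }
    }
    Ok(())
}
```

In this sketch, `batch_size` bounds how many lookups are in flight at once, and the bounded `SyncSender` keeps the producer from running far ahead of the consumer. Whether the `batch_size` parameter of queue_nodes plays the same role is an assumption.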