Function count_paths::queue_nodes
pub(crate) fn queue_nodes<G>(
    graph: G,
    tx: SyncSender<(String, NodeId)>,
    batch_size: usize
) -> JoinHandle<Result<()>>
where
    G: SwhGraphWithProperties + Sync + Send + Clone + 'static,
    <G as SwhGraphWithProperties>::Maps: Maps,
Reads CSV records from stdin and queues their SWHIDs and node ids to tx, preserving the order.
This is equivalent to:
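use std::io;

use anyhow::{Context, Result};

std::thread::spawn(move || -> Result<()> {
    let mut reader = csv::ReaderBuilder::new()
        .has_headers(true)
        .from_reader(io::stdin());
    for record in reader.deserialize() {
        let InputRecord { swhid, .. } =
            record.context("Could not deserialize record")?;
        // Borrow the SWHID for the lookup, since it is sent to tx below
        let node = graph
            .properties()
            .node_id_from_string_swhid(&swhid)
            .with_context(|| format!("Unknown SWHID: {}", swhid))?;
        // Blocks while the bounded channel is full; fails only if the receiver was dropped
        tx.send((swhid, node))
            .context("Could not send node to receiver")?;
    }
    Ok(())
})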
but uses inner parallelism, because node_id() could otherwise be a bottleneck on systems
where accessing graph.order has high latency (network and/or compressed filesystem).
This reduces the runtime from a couple of weeks to less than a day on the 2023-09-06
graph stored on a ZSTD-compressed ZFS filesystem.
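The order-preserving parallel lookup described above can be sketched with the standard library alone. This is a hypothetical analogue, not the swh-graph implementation: the name `queue_in_batches`, the `lookup` closure (standing in for the high-latency SWHID to node id map), the `num_threads` parameter, and the use of `usize` for node ids are all assumptions, and the CSV input is replaced by a `Vec<String>`. Each batch of `batch_size` inputs is split across scoped threads, the per-thread results are joined back in spawn order, and only then sent to `tx`, so the receiver still sees input order.

```rust
use std::sync::mpsc::SyncSender;
use std::thread::{self, JoinHandle};

/// Hypothetical, std-only analogue of `queue_nodes` (not the real implementation).
/// `lookup` stands in for the high-latency SWHID -> node id map, and `inputs`
/// stands in for the SWHIDs read from stdin.
pub fn queue_in_batches<F>(
    inputs: Vec<String>,
    lookup: F,
    tx: SyncSender<(String, usize)>,
    batch_size: usize,
    num_threads: usize,
) -> JoinHandle<Result<(), String>>
where
    F: Fn(&str) -> Option<usize> + Sync + Send + 'static,
{
    thread::spawn(move || -> Result<(), String> {
        let num_threads = num_threads.max(1);
        for batch in inputs.chunks(batch_size.max(1)) {
            // Split the batch into at most `num_threads` contiguous sub-slices.
            let chunk_len = (batch.len() + num_threads - 1) / num_threads;
            let lookup = &lookup;
            let nodes: Vec<Option<usize>> = thread::scope(|s| {
                let handles: Vec<_> = batch
                    .chunks(chunk_len)
                    .map(|sub| {
                        // Each scoped thread performs the slow lookups for its sub-slice.
                        s.spawn(move || {
                            sub.iter()
                                .map(|swhid| lookup(swhid.as_str()))
                                .collect::<Vec<Option<usize>>>()
                        })
                    })
                    .collect();
                // Joining in spawn order concatenates results in input order.
                handles
                    .into_iter()
                    .flat_map(|h| h.join().expect("lookup thread panicked"))
                    .collect()
            });
            // Send the batch sequentially, so the receiver sees the input order.
            for (swhid, node) in batch.iter().zip(nodes) {
                let node = node.ok_or_else(|| format!("Unknown SWHID: {}", swhid))?;
                tx.send((swhid.clone(), node)).map_err(|e| e.to_string())?;
            }
        }
        Ok(())
    })
}
```

Waiting for a whole batch before sending trades some latency for strict ordering; a larger `batch_size` keeps more lookups in flight at once, at the cost of memory. Because errors surface through the returned `JoinHandle` (as with the `JoinHandle<Result<()>>` returned by queue_nodes), a caller would typically drain the receiver first and then join the handle.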