Skip to content

Commit 94a6745

Browse files
author
Johanna
committed
current state
1 parent 35db878 commit 94a6745

11 files changed

+438
-22
lines changed

src/cli.rs

+39
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,45 @@ pub enum Commands {
147147
#[arg(long, value_parser = valid_cpus, default_value_t = 1)]
148148
threads: usize,
149149
},
150+
/// Uses an inverted index for faster and sparser pairwise comparisons
151+
Inverted {
152+
/// List of input FASTA files
153+
#[arg(long, group = "input", num_args = 1.., value_delimiter = ',')]
154+
seq_files: Option<Vec<String>>,
155+
156+
/// File listing input files (tab separated name, sequences)
157+
#[arg(short, group = "input")]
158+
file_list: Option<String>,
159+
160+
/// Output filename for the merged sketch
161+
#[arg(required = true, short)]
162+
output: String,
163+
164+
/// Ignore reverse complement (all contigs are oriented along same strand)
165+
#[arg(long, default_value_t = DEFAULT_STRAND)]
166+
single_strand: bool,
167+
168+
/// Minimum k-mer count (with reads)
169+
#[arg(long, default_value_t = DEFAULT_MINCOUNT)]
170+
min_count: u16,
171+
172+
/// Minimum k-mer quality (with reads)
173+
#[arg(long, default_value_t = DEFAULT_MINQUAL)]
174+
min_qual: u8,
175+
176+
/// Treat every sequence in an input file as a new sample (aa only)
177+
// TODO: for now, could be extended to dna, but probably no need
178+
#[arg(long, default_value_t = false)]
179+
concat_fasta: bool,
180+
181+
/// Number of CPU threads
182+
#[arg(long, value_parser = valid_cpus, default_value_t = 1)]
183+
threads: usize,
184+
185+
/// aaHash 'level'
186+
#[arg(long, value_enum, default_value_t = DEFAULT_LEVEL)]
187+
level: AaLevel,
188+
},
150189
/// Merge two sketch files (.skm and .skd pair)
151190
Merge {
152191
/// The first .skd (sketch data) file

src/hashing/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,6 @@ pub trait RollHash: Iterator<Item = u64> {
125125
fn reads(&self) -> bool {
126126
false
127127
}
128+
129+
128130
}

src/lib.rs

+105-3
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,9 @@ pub fn main() -> Result<(), Error> {
118118
rc,
119119
*min_count,
120120
*min_qual,
121+
false,
121122
);
122-
let sketch_vec = MultiSketch::new(&mut sketches, sketch_size, &kmers, seq_type);
123+
let sketch_vec = MultiSketch::new(&mut sketches, sketch_size, &kmers, seq_type, false);
123124
sketch_vec
124125
.save_metadata(output)
125126
.expect("Error saving metadata");
@@ -400,6 +401,106 @@ pub fn main() -> Result<(), Error> {
400401
utils::save_sketch_data(ref_db_name1, ref_db_name2, output)
401402
}
402403

404+
Commands::Inverted {
405+
seq_files,
406+
file_list,
407+
output,
408+
single_strand,
409+
min_count,
410+
min_qual,
411+
concat_fasta,
412+
threads,
413+
level,
414+
} => {
415+
// An extra thread is needed for the writer. This doesn't 'overuse' CPU
416+
check_threads(*threads + 1);
417+
//get input files
418+
log::info!("Getting input files");
419+
let input_files: Vec<(String, String, Option<String>)> =
420+
get_input_list(file_list, seq_files);
421+
log::info!("Parsed {} samples in input list", input_files.len());
422+
// read out sketching information needed to sketch the new files
423+
let kmers: &[usize] = &[2];
424+
let rc = !*single_strand;
425+
let sketch_size = 3;
426+
let seq_type = &HashType::DNA;
427+
if *concat_fasta && matches!(*seq_type, HashType::DNA | HashType::PDB) {
428+
panic!("--concat-fasta currently only supported with --seq-type aa");
429+
}
430+
431+
log::info!(
432+
"Running sketching: k:{:?}; sketch_size:{}; seq:{:?}; threads:{}",
433+
kmers,
434+
sketch_size * u64::BITS as u64,
435+
seq_type,
436+
threads,
437+
);
438+
439+
let seq_type = if let HashType::AA(_) = seq_type {
440+
HashType::AA(level.clone())
441+
} else {
442+
seq_type.clone()
443+
};
444+
// Before sketch_files
445+
println!("Debug: Starting sketch_files with:");
446+
println!("Debug: input_files: {:?}", input_files);
447+
println!("Debug: kmers: {:?}", kmers);
448+
println!("Debug: sketch_size: {}", sketch_size);
449+
println!("Debug: seq_type: {:?}", seq_type);
450+
println!("Debug: rc: {}", rc);
451+
println!("Debug: min_count: {}", min_count);
452+
println!("Debug: min_qual: {}", min_qual);
453+
454+
let mut mini_sketches = sketch_files(
455+
output,
456+
&input_files,
457+
*concat_fasta,
458+
&kmers,
459+
sketch_size,
460+
&seq_type,
461+
rc,
462+
*min_count,
463+
*min_qual,
464+
true,
465+
);
466+
467+
println!("Debug: Sketches after sketch_files:");
468+
for (i, sketch) in mini_sketches.iter().enumerate() {
469+
println!("Debug: Sketch {}", i);
470+
println!("Debug: name: {}", sketch.name());
471+
let usigs = sketch.get_usigs_debug();
472+
println!("Debug: usigs length: {}", usigs.len());
473+
if !usigs.is_empty() {
474+
println!("Debug: first few usigs: {:?}", &usigs[..usigs.len().min(5)]);
475+
}
476+
}
477+
478+
// After creating mini_sketches
479+
println!("Debug: mini_sketches length: {}", mini_sketches.len());
480+
481+
// After creating sketch_vec
482+
let sketch_vec = MultiSketch::new(&mut mini_sketches, sketch_size, &kmers, seq_type, true);
483+
println!("Debug: sketch_vec creation completed");
484+
println!("Debug: sketch_vec.sketch_bins length: {}", MultiSketch::get_sketch_bins_len(&sketch_vec));
485+
println!("Debug: sketch_vec contents: {:?}", sketch_vec);
486+
487+
// Before inverting index
488+
println!("Debug: About to invert index");
489+
let mini_inverted_index = MultiSketch::invert_index(&sketch_vec);
490+
491+
492+
// Write inverted index to a file
493+
sketch_vec
494+
.save_metadata(output)
495+
.expect("Error saving metadata");
496+
match MultiSketch::write_inverted_index_to_file(&mini_inverted_index, output) {
497+
Ok(_) => println!("Successfully wrote inverted index to {}", output),
498+
Err(e) => eprintln!("Error writing inverted index to file: {}", e),
499+
}
500+
501+
Ok(())
502+
}
503+
403504
Commands::Append {
404505
db,
405506
seq_files,
@@ -461,13 +562,14 @@ pub fn main() -> Result<(), Error> {
461562
rc,
462563
*min_count,
463564
*min_qual,
565+
false,
464566
);
465567
let mut db2_metadata =
466-
MultiSketch::new(&mut db2_sketches, sketch_size, kmers, seq_type);
568+
MultiSketch::new(&mut db2_sketches, sketch_size, kmers, seq_type, false);
467569

468570
// save skd data from db1 and from freshly sketched input files
469571
log::info!("Merging and saving sketch data to {}.skd", output);
470-
// let mut output_file = File::create(format!("{}.skd", output))?;
572+
471573
let mut output_file = OpenOptions::new()
472574
.create(true)
473575
.append(true)

src/multisketch.rs

+91-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::bail;
22
use anyhow::Error;
3-
use anyhow::{Result, anyhow};
3+
use anyhow::{anyhow, Result};
44
// use thiserror::Error;
55
use core::panic;
66
use std::fmt;
@@ -15,7 +15,11 @@ use crate::hashing::HashType;
1515
use crate::sketch::{Sketch, BBITS};
1616
use crate::sketch_datafile::SketchArrayFile;
1717

18+
use rayon::prelude::*;
1819
use std::collections::HashSet;
20+
21+
use std::io::Write;
22+
use std::path::Path;
1923
#[derive(Serialize, Deserialize)]
2024
pub struct MultiSketch {
2125
pub sketch_size: u64,
@@ -41,6 +45,7 @@ impl MultiSketch {
4145
sketch_size: u64,
4246
kmer_lengths: &[usize],
4347
hash_type: HashType,
48+
inverted: bool,
4449
) -> Self {
4550
let mut name_map = HashMap::with_capacity(sketches.len());
4651
for sketch in sketches.iter() {
@@ -63,6 +68,8 @@ impl MultiSketch {
6368
}
6469
}
6570

71+
72+
6673
/// Saves the metadata
6774
pub fn save_metadata(&self, file_prefix: &str) -> Result<(), Error> {
6875
let filename = format!("{}.skm", file_prefix);
@@ -205,10 +212,8 @@ impl MultiSketch {
205212
let mut removed_samples = Vec::new();
206213

207214
for sketch in &self.sketch_metadata {
208-
209215
if !genome_ids_to_remove.contains(&(*sketch.name()).to_string()) {
210216
new_sketch_metadata.push(sketch.clone());
211-
212217
} else {
213218
removed_samples.push(sketch.name());
214219
}
@@ -218,8 +223,11 @@ impl MultiSketch {
218223
let set2: HashSet<&str> = genome_ids_to_remove.iter().map(AsRef::as_ref).collect();
219224
let missing: Vec<&&str> = set2.difference(&set1).collect();
220225
if !missing.is_empty() {
221-
bail!("The following samples have not been found in the database: {:?}", missing);
222-
}
226+
bail!(
227+
"The following samples have not been found in the database: {:?}",
228+
missing
229+
);
230+
}
223231

224232
self.sketch_metadata = new_sketch_metadata;
225233
self.save_metadata(output_file_name)?;
@@ -231,7 +239,7 @@ impl MultiSketch {
231239
input_prefix: &str,
232240
output_file: &str,
233241
genome_ids_to_remove: &[String],
234-
) -> anyhow::Result<()> {
242+
) -> anyhow::Result<()> {
235243
let mut positions_to_remove = Vec::new();
236244
let mut missing_ids = Vec::new();
237245

@@ -266,10 +274,85 @@ impl MultiSketch {
266274
Ok(())
267275
}
268276

269-
// This function is called when sketches are merged, not when they are
270-
// first sketched (this is handled by sketch::sketch_files())
277+
pub fn invert_index(sketches: &MultiSketch) -> HashMap<u64, HashSet<usize>> {
278+
println!("Debug: Starting invert_index function");
279+
println!("Debug: Sample stride: {}", sketches.sample_stride);
280+
println!("Debug: Sketch bins length: {}", sketches.sketch_bins.len());
281+
282+
// HashMap storing the inverted index
283+
let mut inverted_index: HashMap<u64, HashSet<usize>> = HashMap::default();
284+
285+
// Parallise the inversion for each sample
286+
let local_indices: Vec<HashMap<u64, HashSet<usize>>> = sketches
287+
.sketch_bins
288+
.par_chunks(sketches.sample_stride)
289+
.enumerate()
290+
.map(|(genome_id, sample_hash)| {
291+
println!("Debug: Processing genome_id: {}", genome_id);
292+
println!("Debug: Sample hash length: {}", sample_hash.len());
293+
294+
// Print all hashes for this genome
295+
println!("Debug: Hashes for genome {}: {:?}", genome_id, sample_hash);
296+
297+
let mut local_index: HashMap<u64, HashSet<usize>> = HashMap::default();
298+
for &hash in sample_hash {
299+
local_index
300+
.entry(hash)
301+
.or_insert_with(HashSet::default)
302+
.insert(genome_id);
303+
}
304+
println!("Debug: Local index for genome {} size: {}", genome_id, local_index.len());
305+
// Print the hash-to-genome mappings for this local index
306+
println!("Debug: Local index contents for genome {}: {:?}", genome_id, local_index);
307+
local_index
308+
})
309+
.collect();
310+
311+
println!("Debug: Number of local indices: {}", local_indices.len());
312+
313+
// Merge all local inverted indices into a global one
314+
for (i, local) in local_indices.iter().enumerate() {
315+
println!("Debug: Merging local index {}, size: {}", i, local.len());
316+
for (hash, genome_set) in local {
317+
inverted_index
318+
.entry(*hash)
319+
.or_insert_with(HashSet::default)
320+
.extend(genome_set);
321+
}
322+
}
323+
324+
println!("Debug: Final inverted index size: {}", inverted_index.len());
325+
println!("Debug: Inverted index contents: {:?}", inverted_index);
326+
327+
inverted_index
328+
}
329+
// remove, only need this for debugging
330+
pub fn get_sketch_bins_len(&self) -> usize {
331+
self.sketch_bins.len()
332+
}
333+
334+
pub fn write_inverted_index_to_file<P: AsRef<Path>>(
335+
inverted_index: &HashMap<u64, HashSet<usize>>,
336+
file_path: P,
337+
) -> std::io::Result<()> {
338+
let mut file = File::create(file_path)?;
339+
340+
for (hash, genome_set) in inverted_index {
341+
writeln!(
342+
file,
343+
"{} {}",
344+
hash,
345+
genome_set.iter().map(|id| id.to_string()).collect::<Vec<_>>().join(" ")
346+
)?;
347+
}
348+
349+
Ok(())
350+
}
271351
}
272352

353+
// This function is called when sketches are merged, not when they are
354+
// first sketched (this is handled by sketch::sketch_files())
355+
273356
impl fmt::Debug for MultiSketch {
274357
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
275358
write!(

0 commit comments

Comments
 (0)