Skip to content

Commit

Permalink
feat: implement breakline detection in TableLikeReader and update FastqSource to use new detection method. That's all. Happy birthday to me! 🎉🎉🎉🎂
Browse files Browse the repository at this point in the history
  • Loading branch information
dwpeng committed Nov 15, 2024
1 parent 40ac7b8 commit 8e7730c
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 129 deletions.
13 changes: 11 additions & 2 deletions src/filterx_engine/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,24 @@ impl Vm {
self.status.printed = false;
match self.source_type() {
SourceType::Fasta | SourceType::Fastq => {
if self.status.stop {
return Ok(None);
}
let left = self.status.limit_rows - self.status.consume_rows;
let left = left.min(self.status.chunk_size);
if left > 0 {
match self.source.inner {
SourceInner::Fasta(ref mut fasta) => {
fasta.into_dataframe(left)?;
let count = fasta.into_dataframe(left)?;
if count < left || count == 0 {
self.status.stop = true;
}
}
SourceInner::Fastq(ref mut fastq) => {
fastq.into_dataframe(left)?;
let count = fastq.into_dataframe(left)?;
if count < left || count == 0 {
self.status.stop = true;
}
}
_ => {
unreachable!();
Expand Down
155 changes: 56 additions & 99 deletions src/filterx_source/src/block/fastx/fasta.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::block::reader::TableLikeReader;
use crate::block::reader::{detect_breakline_len, TableLikeReader};
use crate::dataframe::DataframeSource;
use filterx_core::{FilterxResult, Hint};
use memchr::memchr;
use polars::prelude::*;
use std::collections::HashSet;
use std::io::BufRead;

use filterx_core::{FilterxResult, Hint};

#[derive(Debug, Clone, Copy)]
pub struct FastaParserOptions {
pub include_comment: bool,
Expand Down Expand Up @@ -52,7 +52,7 @@ impl FastaSource {
})
}

pub fn into_dataframe(&mut self, n: usize) -> FilterxResult<Option<()>> {
pub fn into_dataframe(&mut self, n: usize) -> FilterxResult<usize> {
let records = &mut self.records;
if records.capacity() < n {
unsafe {
Expand Down Expand Up @@ -82,12 +82,11 @@ impl FastaSource {
records.set_len(count);
}
if records.is_empty() {
return Ok(None);
return Ok(0);
}

let df = Fasta::as_dataframe(&records, &self.fasta.parser_options)?;
self.dataframe.update(df.lazy());
Ok(Some(()))
Ok(count)
}

pub fn reset(&mut self) -> FilterxResult<()> {
Expand All @@ -97,13 +96,13 @@ impl FastaSource {

pub struct Fasta {
reader: TableLikeReader,
prev_line: Vec<u8>,
read_end: bool,
pub path: String,
pub parser_options: FastaParserOptions,
record: FastaRecord,
pub record_type: FastaRecordType,
break_line_len: Option<usize>,
buffer_unprocess_size: usize,
}

#[derive(Clone, Copy, Debug, clap::ValueEnum, PartialEq)]
Expand Down Expand Up @@ -184,43 +183,39 @@ impl FastaRecord {
/// Returns the length of the span stored in `_sequence`.
///
/// The value is `end - start + 1`, where `(start, end)` is the `_sequence` pair.
// NOTE(review): the `+ 1` implies `_sequence.1` is an inclusive end index —
// confirm against the code that assigns it.
pub fn len(&self) -> usize {
    let (start, end) = self._sequence;
    end - start + 1
}
}

impl std::fmt::Display for FastaRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, ">{}", self.name())?;
if let Some(comment) = self.comment() {
write!(f, " {}", comment)?;
pub fn goto_next_record(&mut self, left: usize) {
if self.buffer.len() < self.buffer.capacity() && left > 0 {
unsafe {
std::ptr::copy(
self.buffer.as_ptr().add(self.buffer.len()),
self.buffer.as_mut_ptr(),
left,
);
self.buffer.set_len(left);
}
}
write!(f, "\n{}", self.seq())
self._comment = (0, 0);
self._name = (0, 0);
self._sequence = (0, 0);
}
}

pub struct FastaRecordIter {
fasta: Fasta,
}

impl Iterator for FastaRecordIter {
type Item = FastaRecord;

fn next(&mut self) -> Option<Self::Item> {
match self.fasta.parse_next() {
Ok(Some(record)) => Some(record.clone()),
Ok(None) => None,
Err(e) => {
eprintln!("{}", e);
None
/// Drops the trailing `len` bytes (the line terminator) from `buffer`.
///
/// A `len` of zero leaves the buffer untouched. If `len` is larger than the
/// current buffer length the buffer is emptied rather than underflowing.
pub fn remove_breakline_from_buffer(&mut self, len: usize) {
    // `saturating_sub` guards the subtraction and `truncate` shrinks the
    // vector without `unsafe`; for valid `len` the result matches the old
    // `set_len(self.buffer.len() - len)`.
    let kept = self.buffer.len().saturating_sub(len);
    self.buffer.truncate(kept);
}
}

impl IntoIterator for Fasta {
type Item = FastaRecord;
type IntoIter = FastaRecordIter;

fn into_iter(self) -> Self::IntoIter {
FastaRecordIter { fasta: self }
/// Renders the record as text: a `>` header line carrying the name and,
/// when present, the comment after a single space, followed by a newline and
/// the sequence.
impl std::fmt::Display for FastaRecord {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Header: `>name`, optionally followed by ` comment`.
        write!(out, ">{}", self.name())?;
        self.comment()
            .map_or(Ok(()), |comment| write!(out, " {}", comment))?;
        // Sequence goes on its own line; no trailing newline is emitted.
        write!(out, "\n{}", self.seq())
    }
}

Expand All @@ -230,11 +225,11 @@ impl Clone for Fasta {
reader: self.reader.clone(),
path: self.path.clone(),
parser_options: self.parser_options.clone(),
prev_line: self.prev_line.clone(),
read_end: false,
record: self.record.clone(),
record_type: self.record_type.clone(),
break_line_len: self.break_line_len.clone(),
buffer_unprocess_size: self.buffer_unprocess_size,
}
}
}
Expand All @@ -247,14 +242,15 @@ impl Fasta {
) -> FilterxResult<Fasta> {
let mut fasta = Fasta {
reader: TableLikeReader::new(path)?,
prev_line: Vec::new(),
read_end: false,
path: path.to_string(),
parser_options: FastaParserOptions::default(),
record: FastaRecord::default(),
record_type,
break_line_len: None,
buffer_unprocess_size: 0,
};
fasta.break_line_len = detect_breakline_len(&mut fasta.reader)?;
if record_type == FastaRecordType::Auto {
fasta.detect_record_type(n_detect)?;
}
Expand Down Expand Up @@ -313,29 +309,32 @@ impl Fasta {
Ok(())
}

/// Prepares the embedded record for the next parse step.
///
/// Passes the current `buffer_unprocess_size` to
/// `FastaRecord::goto_next_record` and leaves the counter at zero.
pub fn goto_next_record(&mut self) {
    // `take` reads the counter and resets it to 0 in one step.
    let pending = std::mem::take(&mut self.buffer_unprocess_size);
    self.record.goto_next_record(pending);
}

/// Builder-style setter: consumes the parser, replaces its
/// `parser_options` with the given value and returns the parser.
pub fn set_parser_options(self, parser_options: FastaParserOptions) -> Self {
    let mut fasta = self;
    fasta.parser_options = parser_options;
    fasta
}

pub fn reset(&mut self) -> FilterxResult<()> {
self.reader.reset()?;
self.prev_line.clear();
self.record.clear();
self.read_end = false;
self.buffer_unprocess_size = 0;
Ok(())
}

pub fn parse_next(&mut self) -> FilterxResult<Option<&mut FastaRecord>> {
if self.read_end {
return Ok(None);
}
self.goto_next_record();
let record: &mut FastaRecord = &mut self.record;
record.clear();
// read name and comment
if !self.prev_line.is_empty() {
record.buffer.extend_from_slice(&self.prev_line);
} else {
if record.buffer.is_empty() {
let bytes = self.reader.read_until(b'\n', &mut record.buffer)?;
if bytes == 0 {
self.read_end = true;
Expand All @@ -361,36 +360,16 @@ impl Fasta {
h.print_and_exit();
}

let break_line_len = self.break_line_len.unwrap();

// fill name and comment
record._name.0 = 1;
record._name.1 = record.buffer.len();
record._name.1 = record.buffer.len() - break_line_len;

// remove \n or \r\n
let end = record.buffer.len();
let break_line_len;
if self.break_line_len.is_some() {
break_line_len = self.break_line_len.unwrap();
} else {
let name = &record.buffer[..];
if name.ends_with(&[b'\n', b'\r']) {
break_line_len = 2;
} else {
break_line_len = 1;
}
self.break_line_len = Some(break_line_len);
}
record._name.1 = end - break_line_len;
let start = memchr(b' ', &record.buffer[1..record._name.1]);

let mut start = None;

for i in 0..record._name.1 {
if record.buffer[i] == b' ' {
start = Some(i);
break;
}
}

if let Some(start) = start {
if let Some(mut start) = start {
start += 1;
record._name.1 = start;
if self.parser_options.include_comment {
record._comment.0 = start + 1;
Expand All @@ -404,20 +383,22 @@ impl Fasta {
}

// fill sequence
self.prev_line.clear();
record._sequence.0 = record.buffer.len();
loop {
let bytes = self.reader.read_until(b'\n', &mut self.prev_line)?;
let buffer_offset = record.buffer.len();
let bytes = self.reader.read_until(b'\n', &mut record.buffer)?;
if bytes == 0 {
self.read_end = true;
break;
}
if self.prev_line[0] == b'>' {
if record.buffer[buffer_offset] == b'>' {
unsafe {
record.buffer.set_len(buffer_offset);
}
self.buffer_unprocess_size = bytes;
break;
}
let line = &self.prev_line[..bytes - break_line_len];
record.buffer.extend_from_slice(line);
self.prev_line.clear();
record.remove_breakline_from_buffer(break_line_len);
}
if record.buffer.is_empty() {
return Ok(None);
Expand Down Expand Up @@ -460,27 +441,3 @@ impl Fasta {
Ok(df)
}
}

/// Smoke tests that open the bundled FASTA fixtures and count their records.
// Gated on `cfg(test)` so the module (and its glob import) is only compiled
// for test builds, which removes the need for `#[allow(unused)]`.
#[cfg(test)]
pub mod test {
    use super::*;

    /// Opens `path` with automatic record-type detection and returns how many
    /// records the iterator yields.
    fn count_records(path: &str) -> FilterxResult<usize> {
        let fasta = Fasta::from_path(path, FastaRecordType::Auto, 3)?;
        Ok(fasta.into_iter().count())
    }

    #[test]
    fn test_open_plain_file() -> FilterxResult<()> {
        // `assert_eq!` reports both values on failure, unlike `assert!(a == b)`.
        assert_eq!(count_records("test_data/fasta/1.fa")?, 2);
        Ok(())
    }

    #[test]
    fn test_open_gzip_file() -> FilterxResult<()> {
        assert_eq!(count_records("test_data/fasta/1.fa.gz")?, 2);
        Ok(())
    }
}
34 changes: 6 additions & 28 deletions src/filterx_source/src/block/fastx/fastq.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use memchr::memchr;
use polars::prelude::*;
use std::{fmt::Display, io::BufRead};

use crate::block::reader::TableLikeReader;
use crate::block::reader::{TableLikeReader, detect_breakline_len};
use crate::dataframe::DataframeSource;

use filterx_core::{FilterxError, FilterxResult, Hint};
Expand Down Expand Up @@ -45,7 +44,7 @@ impl FastqSource {
})
}

pub fn into_dataframe(&mut self, n: usize) -> FilterxResult<Option<()>> {
pub fn into_dataframe(&mut self, n: usize) -> FilterxResult<usize> {
let records = &mut self.records;

if records.capacity() < n {
Expand Down Expand Up @@ -77,11 +76,11 @@ impl FastqSource {
records.set_len(count);
}
if records.is_empty() {
Ok(None)
Ok(0)
} else {
let df = Fastq::as_dataframe(&records, &self.fastq.parser_option)?;
self.dataframe.update(df.lazy());
Ok(Some(()))
Ok(count)
}
}

Expand Down Expand Up @@ -291,37 +290,16 @@ impl Fastq {
path: path.to_string(),
record: FastqRecord::default(),
break_line_len: None,
quality_type: quality_type,
quality_type,
buffer_unprocess_size: 0,
};
fq.detect_breakline_len()?;
fq.break_line_len = detect_breakline_len(&mut fq.reader)?;
if quality_type == QualityType::Auto {
fq.guess_quality_type(detect_size)?;
}
Ok(fq)
}

pub fn detect_breakline_len(&mut self) -> FilterxResult<()> {
loop {
let data = self.reader.fill_buf()?;
if data.is_empty() {
self.break_line_len = Some(0);
}
let offset = memchr(b'\n', data);
if offset.is_some() {
// test if endwith is \r\n
let offset = offset.unwrap();
if offset > 0 && data[offset - 1] == b'\r' {
self.break_line_len = Some(2);
} else {
self.break_line_len = Some(1);
}
break;
}
}
self.reset()?;
Ok(())
}

fn guess_quality_type(&mut self, detect_size: usize) -> FilterxResult<()> {
if !self.parser_option.include_qual {
Expand Down
Loading

0 comments on commit 8e7730c

Please sign in to comment.