Skip to content

Commit

Permalink
Non-Owning Decoder (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
felixwrt authored Jun 25, 2024
2 parents 5f6aac5 + 1f550b9 commit b0e819e
Showing 1 changed file with 97 additions and 69 deletions.
166 changes: 97 additions & 69 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,7 @@ enum DecodeState {
/// ```
pub struct Decoder<B: Buffer> {
buf: B,
raw_msg_len: usize,
crc: crc::Digest<'static, u16>,
crc_idx: usize,
state: DecodeState,
decoder: NonOwningDecoder,
}

impl<B: Buffer> Default for Decoder<B> {
Expand All @@ -359,13 +356,7 @@ impl<B: Buffer> Decoder<B> {
buf.clear();
Decoder {
buf,
raw_msg_len: 0,
crc: CRC_X25.digest(),
crc_idx: 0,
state: DecodeState::LookingForMessageStart {
num_discarded_bytes: 0,
num_init_seq_bytes: 0,
},
decoder: NonOwningDecoder::new(),
}
}

Expand All @@ -378,17 +369,7 @@ impl<B: Buffer> Decoder<B> {

/// Resets the `Decoder` and returns an error if it contained an incomplete message.
pub fn finalize(&mut self) -> Option<DecodeErr> {
use DecodeState::*;
let res = match self.state {
LookingForMessageStart {
num_discarded_bytes: 0,
num_init_seq_bytes: 0,
} => None,
Done => None,
_ => Some(DecodeErr::DiscardedBytes(self.raw_msg_len)),
};
self.reset();
res
self.decoder.finalize(&mut self.buf)
}

/// Main function of the parser.
Expand All @@ -398,6 +379,51 @@ impl<B: Buffer> Decoder<B> {
/// - `Ok(false)` when more bytes are necessary to complete parsing a message.
/// - `Err(_)` if an error occurred during parsing
pub(crate) fn _push_byte(&mut self, b: u8) -> Result<bool, DecodeErr> {
self.decoder.push_byte(&mut self.buf, b)
}

pub(crate) fn borrow_buf(&self) -> &[u8] {
if !self.decoder.is_done() {
panic!("Reading from the internal buffer is only allowed when a complete message is present (DecodeState::Done).");
}
&self.buf[..self.buf.len()]
}

/// Resets the `Decoder` and returns the number of bytes that were discarded
pub fn reset(&mut self) -> usize {
self.decoder.reset(&mut self.buf)
}
}

pub(crate) struct NonOwningDecoder {
raw_msg_len: usize,
crc: crc::Digest<'static, u16>,
state: DecodeState,
}

impl Default for NonOwningDecoder {
fn default() -> Self {
Self {
raw_msg_len: Default::default(),
crc: CRC_X25.digest(),
state: DecodeState::LookingForMessageStart {
num_discarded_bytes: 0,
num_init_seq_bytes: 0,
},
}
}
}

impl NonOwningDecoder {
pub fn new() -> Self {
Default::default()
}

/// Returns:
/// - Ok(true) Full message was written into buf
/// - Ok(false) Needs more input
/// - Error There was an error
pub fn push_byte(&mut self, buf: &mut impl Buffer, b: u8) -> Result<bool, DecodeErr> {
use DecodeState::*;
self.raw_msg_len += 1;
match self.state {
Expand All @@ -416,8 +442,7 @@ impl<B: Buffer> Decoder<B> {
let num_discarded_bytes = *num_discarded_bytes;
self.state = ParsingNormal;
self.raw_msg_len = 8;
assert_eq!(self.buf.len(), 0);
assert_eq!(self.crc_idx, 0);
assert_eq!(buf.len(), 0);
self.crc = CRC_X25.digest();
self.crc
.update(&[0x1b, 0x1b, 0x1b, 0x1b, 0x01, 0x01, 0x01, 0x01]);
Expand All @@ -427,53 +452,47 @@ impl<B: Buffer> Decoder<B> {
}
}
ParsingNormal => {
self.crc.update(&[b]);
if b == 0x1b {
// this could be the first byte of an escape sequence
self.state = ParsingEscChars(1);
} else {
// regular data
self.push(b)?;
self.push(buf, b)?;
}
}
ParsingEscChars(n) => {
self.crc.update(&[b]);
if b != 0x1b {
// push previous 0x1b bytes as they didn't belong to an escape sequence
for _ in 0..n {
self.push(0x1b)?;
self.push(buf, 0x1b)?;
}
// push current byte
self.push(b)?;
self.push(buf, b)?;
// continue in regular parsing state
self.state = ParsingNormal;
} else if n == 3 {
// this is the fourth 0x1b byte, so we're seeing an escape sequence.
// continue by parsing the escape sequence's payload.

// also update the crc here. the escape bytes aren't stored in `buf`, but
// still need to count for the crc calculation
// (1) add everything that's in the buffer and hasn't been added to the crc previously
self.crc.update(&self.buf[self.crc_idx..self.buf.len()]);
// (2) add the four escape bytes
self.crc.update(&[0x1b, 0x1b, 0x1b, 0x1b]);
// update crc_idx to indicate that everything that's currently in the buffer has already
// been used to update the crc
self.crc_idx = self.buf.len();

self.state = ParsingEscPayload(0);
} else {
self.state = ParsingEscChars(n + 1);
}
}
ParsingEscPayload(n) => {
self.push(b)?;
self.push(buf, b)?;
if n < 3 {
self.state = ParsingEscPayload(n + 1);
} else {
// last 4 elements in self.buf are the escape sequence payload
let payload = &self.buf[self.buf.len() - 4..self.buf.len()];
let payload = &buf[buf.len() - 4..buf.len()];
if payload == [0x1b, 0x1b, 0x1b, 0x1b] {
// escape sequence in user data

self.crc.update(payload);

// nothing to do here as the input has already been added to the buffer (see above)
self.state = ParsingNormal;
} else if payload == [0x01, 0x01, 0x01, 0x01] {
Expand All @@ -482,11 +501,10 @@ impl<B: Buffer> Decoder<B> {
// ignore everything that has previously been read and start reading a new transmission
let ignored_bytes = self.raw_msg_len - 8;
self.raw_msg_len = 8;
self.buf.clear();
buf.clear();
self.crc = CRC_X25.digest();
self.crc
.update(&[0x1b, 0x1b, 0x1b, 0x1b, 0x01, 0x01, 0x01, 0x01]);
self.crc_idx = 0;
self.state = ParsingNormal;
return Err(DecodeErr::DiscardedBytes(ignored_bytes));
} else if payload[0] == 0x1a {
Expand All @@ -498,8 +516,7 @@ impl<B: Buffer> Decoder<B> {
// compute and compare checksum
let read_crc = u16::from_le_bytes([payload[2], payload[3]]);
// update the crc, but exclude the last two bytes (which contain the crc itself)
self.crc
.update(&self.buf[self.crc_idx..(self.buf.len() - 2)]);
self.crc.update(&[payload[0], payload[1]]);
// get the calculated crc and reset it afterwards
let calculated_crc = {
let mut crc = CRC_X25.digest();
Expand All @@ -508,14 +525,14 @@ impl<B: Buffer> Decoder<B> {
};

// check alignment (end marker needs to have 4-byte alignment)
let misaligned = self.buf.len() % 4 != 0;
let misaligned = buf.len() % 4 != 0;

// check if padding is larger than the message length
let padding_too_large = num_padding_bytes > 3
|| (num_padding_bytes as usize + 4) > self.buf.len();
let padding_too_large =
num_padding_bytes > 3 || (num_padding_bytes as usize + 4) > buf.len();

if read_crc != calculated_crc || misaligned || padding_too_large {
self.reset();
self.reset(buf);
return Err(DecodeErr::InvalidMessage {
checksum_mismatch: (read_crc, calculated_crc),
end_esc_misaligned: misaligned,
Expand All @@ -524,8 +541,7 @@ impl<B: Buffer> Decoder<B> {
}

// subtract padding bytes and escape payload length from buffer length
self.buf
.truncate(self.buf.len() - num_padding_bytes as usize - 4);
buf.truncate(buf.len() - num_padding_bytes as usize - 4);

self.set_done();

Expand All @@ -545,17 +561,18 @@ impl<B: Buffer> Decoder<B> {
// ^^^^^^^^
// real escape sequence
//
// The solution for this issue is to check whether the read esacpe code
// The solution for this issue is to check whether the read escape code
// isn't aligned to a 4-byte boundary and followed by an aligned end
// escape sequence (`1b1b1b1b 1a...`).
// If that's the case, simply reset the parser state by 1-3 steps. This
// will parse the 0x1b bytes in the message as regular bytes and check
// for the end escape code at the right position.
let bytes_until_alignment = (4 - (self.buf.len() % 4)) % 4;
let bytes_until_alignment = (4 - (buf.len() % 4)) % 4;
if bytes_until_alignment > 0
&& payload[..bytes_until_alignment].iter().all(|x| *x == 0x1b)
&& payload[bytes_until_alignment] == 0x1a
{
self.crc.update(&payload[..bytes_until_alignment]);
self.state = ParsingEscPayload(4 - bytes_until_alignment as u8);
return Ok(false);
}
Expand All @@ -564,33 +581,37 @@ impl<B: Buffer> Decoder<B> {

// unwrap is safe here because payload is guaranteed to have size 4
let esc_bytes: [u8; 4] = payload.try_into().unwrap();
self.reset();
self.reset(buf);
return Err(DecodeErr::InvalidEsc(esc_bytes));
}
}
}
Done => {
// reset and let's go again
self.reset();
return self._push_byte(b);
self.reset(buf);
return self.push_byte(buf, b);
}
}
Ok(false)
}

pub(crate) fn borrow_buf(&self) -> &[u8] {
if !matches!(self.state, DecodeState::Done) {
panic!("Reading from the internal buffer is only allowed when a complete message is present (DecodeState::Done). Found state {:?}.", self.state);
}
&self.buf[..self.buf.len()]
}

fn set_done(&mut self) {
self.state = DecodeState::Done;
/// Resets the `Decoder` and returns an error if it contained an incomplete message.
pub fn finalize(&mut self, buf: &mut impl Buffer) -> Option<DecodeErr> {
use DecodeState::*;
let res = match self.state {
LookingForMessageStart {
num_discarded_bytes: 0,
num_init_seq_bytes: 0,
} => None,
Done => None,
_ => Some(DecodeErr::DiscardedBytes(self.raw_msg_len)),
};
self.reset(buf);
res
}

/// Resets the `Decoder` and returns the number of bytes that were discarded
pub fn reset(&mut self) -> usize {
pub fn reset(&mut self, buf: &mut impl Buffer) -> usize {
let num_discarded = match self.state {
DecodeState::Done => 0,
_ => self.raw_msg_len,
Expand All @@ -599,19 +620,26 @@ impl<B: Buffer> Decoder<B> {
num_discarded_bytes: 0,
num_init_seq_bytes: 0,
};
self.buf.clear();
self.crc_idx = 0;
buf.clear();
self.raw_msg_len = 0;
num_discarded
}

fn push(&mut self, b: u8) -> Result<(), DecodeErr> {
if self.buf.push(b).is_err() {
self.reset();
fn push(&mut self, buf: &mut impl Buffer, b: u8) -> Result<(), DecodeErr> {
if buf.push(b).is_err() {
self.reset(buf);
return Err(DecodeErr::OutOfMemory);
}
Ok(())
}

fn set_done(&mut self) {
self.state = DecodeState::Done;
}

fn is_done(&self) -> bool {
matches!(self.state, DecodeState::Done)
}
}

/// Decode a given slice of bytes and returns a vector of messages / errors.
Expand Down

0 comments on commit b0e819e

Please sign in to comment.