From 9e2774ab9e5382b280202c64e4a18055bc048224 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 18 Apr 2022 20:15:32 +0530 Subject: [PATCH 1/4] Add Skyhash 2.0 impl --- src/lib.rs | 1 + src/types.rs | 2 + src/v2/mod.rs | 391 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 394 insertions(+) create mode 100644 src/v2/mod.rs diff --git a/src/lib.rs b/src/lib.rs index 4889079..041d494 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -229,6 +229,7 @@ pub mod ddl; pub mod error; pub mod pool; pub mod types; +pub mod v2; // endof public mods // private mods mod deserializer; diff --git a/src/types.rs b/src/types.rs index 51acc57..d31c687 100644 --- a/src/types.rs +++ b/src/types.rs @@ -313,6 +313,8 @@ pub enum FlatElement { RespCode(RespCode), /// An unsigned integer UnsignedInt(u64), + // A float + Float(f32), } /// A raw string diff --git a/src/v2/mod.rs b/src/v2/mod.rs new file mode 100644 index 0000000..7daa672 --- /dev/null +++ b/src/v2/mod.rs @@ -0,0 +1,391 @@ +/* + * Copyright 2022, Sayan Nandan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +use crate::{ + types::{Array, FlatElement}, + Element, RespCode, +}; +use core::{ + num::{ParseFloatError, ParseIntError}, + slice, + str::{self, Utf8Error}, +}; + +/// A generic result to indicate parsing errors thorugh the [`ParseError`] enum +pub type ParseResult = Result; + +#[derive(Debug, PartialEq)] +#[non_exhaustive] +#[repr(u8)] +/// # Parser Errors +/// +/// Several errors can arise during parsing and this enum accounts for them +pub enum ParseError { + /// Didn't get the number of expected bytes + NotEnough, + /// The packet simply contains invalid data + BadPacket, + /// The query contains an unexpected byte + UnexpectedByte, + /// A data type was given but the parser failed to serialize it into this type + DataTypeError, + /// A data type that the client doesn't know was passed into the query + /// + /// This is a frequent problem that can arise between different server editions as more data types + /// can be added with changing server versions + UnknownDatatype, +} + +impl From for ParseError { + fn from(_: ParseIntError) -> Self { + Self::DataTypeError + } +} + +impl From for ParseError { + fn from(_: Utf8Error) -> Self { + Self::DataTypeError + } +} + +impl From for ParseError { + fn from(_: ParseFloatError) -> Self { + Self::DataTypeError + } +} + +#[derive(Debug, PartialEq)] +/// # Response types +/// +/// A simple response carries the response for a simple query while a pipelined response carries the response +/// for pipelined queries +pub enum RawResponse { + /// A simple query will just hold one element + SimpleQuery(Element), + /// A pipelined/batch query will hold multiple elements + PipelinedQuery(Vec), +} + +pub struct Parser<'a> { + slice: &'a [u8], + cursor: usize, +} + +impl<'a> Parser<'a> { + pub fn new(slice: &'a [u8]) -> Self { + Self { + slice, + cursor: 0usize, + } + } + fn remaining(&self) -> usize { + self.slice.len() - self.cursor + } + fn has_remaining(&self, c: usize) -> bool { + self.remaining() >= c + } + fn not_exhausted(&self) -> bool { + self.cursor < self.slice.len() + } + unsafe fn direct_read(&self, s: usize, c: usize) -> &[u8] { + slice::from_raw_parts(self.slice.as_ptr().add(s), c) + } + // mut refs + fn incr_cursor_by(&mut self, by: usize) { + debug_assert!(self.has_remaining(by), "Buffer overflow"); + self.cursor += by; + } + fn decr_cursor_by(&mut self, by: usize) { + debug_assert!( + self.cursor != 0 && self.cursor.checked_sub(by).is_some(), + "Size underflow" + ); + self.cursor -= 1; + } + fn decr_cursor(&mut self) { + self.decr_cursor_by(1) + } + fn incr_cursor(&mut self) { + self.incr_cursor_by(1) + } + unsafe fn get_byte_at_cursor(&self) -> u8 { + debug_assert!(self.not_exhausted(), "Buffer overflow"); + *self.slice.as_ptr().add(self.cursor) + } + fn read_until(&mut self, c: usize) -> ParseResult<&[u8]> { + if self.has_remaining(c) { + let cursor = self.cursor; + self.incr_cursor_by(c); + let slice = unsafe { + // UNSAFE(@ohsayan): Just verified length + self.direct_read(cursor, c) + }; + Ok(slice) + } else { + Err(ParseError::NotEnough) + } + } + fn read_line(&mut self) -> ParseResult<&[u8]> { + let cursor = self.cursor; + while self.not_exhausted() + && unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } != b'\n' + { + self.incr_cursor(); + } + if self.not_exhausted() + && unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } == b'\n' + { + let len = self.cursor - cursor; + self.incr_cursor(); // skip LF + Ok(unsafe { + // UNSAFE(@ohsayan): Just verified length + self.direct_read(cursor, len) + }) + } else { + Err(ParseError::NotEnough) + } + } + fn read_line_pedantic(&mut self) -> ParseResult<&[u8]> { + let cursor = self.cursor; + while self.not_exhausted() + && unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } != b'\n' + { + self.incr_cursor(); + } + let len = self.cursor - cursor; + let has_lf = unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } == b'\n'; + if self.not_exhausted() && has_lf && len != 0 { + self.incr_cursor(); // skip LF + Ok(unsafe { + // UNSAFE(@ohsayan): Just verified lengths + self.direct_read(cursor, len) + }) + } else { + let r = if has_lf { + ParseError::BadPacket + } else { + ParseError::NotEnough + }; + Err(r) + } + } + fn try_read_cursor(&mut self) -> ParseResult { + if self.not_exhausted() { + let r = unsafe { + // UNSAFE(@ohsayan): Just checked len + self.get_byte_at_cursor() + }; + self.incr_cursor(); + Ok(r) + } else { + Err(ParseError::NotEnough) + } + } +} + +// higher level abstractions +impl<'a> Parser<'a> { + fn read_u64(&mut self) -> ParseResult { + let line = self.read_line_pedantic()?; + let r = str::from_utf8(line)?.parse()?; + Ok(r) + } + fn read_usize(&mut self) -> ParseResult { + let line = self.read_line_pedantic()?; + let r = str::from_utf8(line)?.parse()?; + Ok(r) + } + fn read_usize_nullck(&mut self) -> ParseResult> { + match self.try_read_cursor()? { + b'\0' => { + // null + Ok(None) + } + _ => { + self.decr_cursor(); + let usz = self.read_usize()?; + Ok(Some(usz)) + } + } + } + fn read_string(&mut self) -> ParseResult { + let size = self.read_usize()?; + let line = self.read_until(size)?; + let r = str::from_utf8(line)?.to_owned(); + Ok(r) + } + fn read_string_nullck(&mut self) -> ParseResult> { + if let Some(size) = self.read_usize_nullck()? { + Ok(Some(str::from_utf8(self.read_until(size)?)?.to_owned())) + } else { + Ok(None) + } + } + fn read_binary_nullck(&mut self) -> ParseResult>> { + if let Some(size) = self.read_usize_nullck()? { + Ok(Some(self.read_until(size)?.to_owned())) + } else { + Ok(None) + } + } + fn read_binary(&mut self) -> ParseResult> { + let size = self.read_usize()?; + Ok(self.read_until(size)?.to_owned()) + } + fn read_respcode(&mut self) -> ParseResult { + let line = self.read_line()?; + let st = str::from_utf8(line)?; + Ok(RespCode::from_str(st)) + } + fn read_float(&mut self) -> ParseResult { + let line = self.read_line()?; + let st = str::from_utf8(line)?; + Ok(st.parse()?) + } + fn read_flat_array(&mut self) -> ParseResult> { + let array_len = self.read_usize()?; + let mut data = Vec::with_capacity(array_len); + for _ in 0..array_len { + match self.try_read_cursor()? { + b'+' => data.push(FlatElement::String(self.read_string()?)), + b'?' => data.push(FlatElement::Binstr(self.read_binary()?)), + b'!' => data.push(FlatElement::RespCode(self.read_respcode()?)), + b':' => data.push(FlatElement::UnsignedInt(self.read_u64()?)), + b'%' => data.push(FlatElement::Float(self.read_float()?)), + _ => return Err(ParseError::UnknownDatatype), + } + } + Ok(data) + } + fn read_typed_array_string(&mut self) -> ParseResult>> { + let size = self.read_usize()?; + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(self.read_string_nullck()?); + } + Ok(data) + } + fn read_typed_array_binary(&mut self) -> ParseResult>>> { + let size = self.read_usize()?; + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(self.read_binary_nullck()?); + } + Ok(data) + } + fn read_typed_array(&mut self) -> ParseResult { + let r = match self.try_read_cursor()? { + b'+' => Element::Array(Array::Str(self.read_typed_array_string()?)), + b'?' => Element::Array(Array::Bin(self.read_typed_array_binary()?)), + _ => return Err(ParseError::UnknownDatatype), + }; + Ok(r) + } +} + +// response methods +impl<'a> Parser<'a> { + fn _read_simple_resp(&mut self) -> ParseResult { + let r = match self.try_read_cursor()? { + b'+' => Element::String(self.read_string()?), + b'?' => Element::Binstr(self.read_binary()?), + b'!' => Element::RespCode(self.read_respcode()?), + b':' => Element::UnsignedInt(self.read_u64()?), + b'%' => Element::Float(self.read_float()?), + b'@' => self.read_typed_array()?, + b'_' => Element::Array(Array::Flat(self.read_flat_array()?)), + _ => return Err(ParseError::UnknownDatatype), + }; + Ok(r) + } + fn read_simple_resp(&mut self) -> ParseResult { + self._read_simple_resp() + } + fn read_pipeline_resp(&mut self) -> ParseResult> { + let size = self.read_usize()?; + let mut resps = Vec::with_capacity(size); + for _ in 0..size { + resps.push(self._read_simple_resp()?); + } + Ok(resps) + } + pub fn parse(buffer: &'a [u8]) -> ParseResult { + let mut slf = Self::new(buffer); + let ret = match slf.try_read_cursor()? { + b'*' => RawResponse::SimpleQuery(slf.read_simple_resp()?), + b'$' => RawResponse::PipelinedQuery(slf.read_pipeline_resp()?), + _ => return Err(ParseError::BadPacket), + }; + Ok(ret) + } +} + +#[test] +fn set_resp() { + let setresp = b"*!0\n".to_vec(); + assert_eq!( + Parser::parse(&setresp).unwrap(), + RawResponse::SimpleQuery(Element::RespCode(RespCode::Okay)) + ); +} + +#[test] +fn mget_resp() { + let mgetresp = b"*@+4\n5\nsayan2\nis8\nthinking\0\n".to_vec(); + let ret = Parser::parse(&mgetresp).unwrap(); + assert_eq!( + ret, + RawResponse::SimpleQuery(Element::Array(Array::Str(vec![ + Some("sayan".to_owned()), + Some("is".to_owned()), + Some("thinking".to_owned()), + None + ]))) + ) +} + +#[test] +fn pipe_resp() { + let resp = b"$2\n!0\n@+4\n5\nsayan2\nis8\nthinking\0\n".to_vec(); + assert_eq!( + Parser::parse(&resp).unwrap(), + RawResponse::PipelinedQuery(vec![ + Element::RespCode(RespCode::Okay), + Element::Array(Array::Str(vec![ + Some("sayan".to_owned()), + Some("is".to_owned()), + Some("thinking".to_owned()), + None + ])) + ]) + ) +} From 99da19a90375c03cdf005cd365e6469df5f1f874 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 18 Apr 2022 23:09:50 +0530 Subject: [PATCH 2/4] Add parsing for non-null arrays --- src/v2/mod.rs | 64 ++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/src/v2/mod.rs b/src/v2/mod.rs index 7daa672..6744c70 100644 --- a/src/v2/mod.rs +++ b/src/v2/mod.rs @@ -178,11 +178,12 @@ impl<'a> Parser<'a> { self.incr_cursor(); } let len = self.cursor - cursor; - let has_lf = unsafe { - // UNSAFE(@ohsayan): The first condition ensures - // that the current byte is present in the allocation - self.get_byte_at_cursor() - } == b'\n'; + let has_lf = self.not_exhausted() + && unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } == b'\n'; if self.not_exhausted() && has_lf && len != 0 { self.incr_cursor(); // skip LF Ok(unsafe { @@ -310,6 +311,30 @@ impl<'a> Parser<'a> { }; Ok(r) } + fn read_typed_nonnull_array_string(&mut self) -> ParseResult> { + let size = self.read_usize()?; + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(self.read_string()?); + } + Ok(data) + } + fn read_typed_nonnull_array_binary(&mut self) -> ParseResult>> { + let size = self.read_usize()?; + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(self.read_binary()?); + } + Ok(data) + } + fn read_typed_nonnull_array(&mut self) -> ParseResult { + let r = match self.try_read_cursor()? { + b'+' => Element::Array(Array::NonNullStr(self.read_typed_nonnull_array_string()?)), + b'?' => Element::Array(Array::NonNullBin(self.read_typed_nonnull_array_binary()?)), + _ => return Err(ParseError::UnknownDatatype), + }; + Ok(r) + } } // response methods @@ -322,6 +347,7 @@ impl<'a> Parser<'a> { b':' => Element::UnsignedInt(self.read_u64()?), b'%' => Element::Float(self.read_float()?), b'@' => self.read_typed_array()?, + b'^' => self.read_typed_nonnull_array()?, b'_' => Element::Array(Array::Flat(self.read_flat_array()?)), _ => return Err(ParseError::UnknownDatatype), }; @@ -338,14 +364,17 @@ impl<'a> Parser<'a> { } Ok(resps) } - pub fn parse(buffer: &'a [u8]) -> ParseResult { - let mut slf = Self::new(buffer); - let ret = match slf.try_read_cursor()? { - b'*' => RawResponse::SimpleQuery(slf.read_simple_resp()?), - b'$' => RawResponse::PipelinedQuery(slf.read_pipeline_resp()?), + fn _parse(&mut self) -> ParseResult { + let r = match self.try_read_cursor()? { + b'*' => RawResponse::SimpleQuery(self.read_simple_resp()?), + b'$' => RawResponse::PipelinedQuery(self.read_pipeline_resp()?), _ => return Err(ParseError::BadPacket), }; - Ok(ret) + Ok(r) + } + pub fn parse(buffer: &'a [u8]) -> ParseResult { + let mut slf = Self::new(buffer); + slf._parse() } } @@ -389,3 +418,16 @@ fn pipe_resp() { ]) ) } + +#[test] +fn lskeys_resp() { + let resp = b"*^+3\n5\nsayan2\nis8\nthinking".to_vec(); + assert_eq!( + Parser::parse(&resp).unwrap(), + RawResponse::SimpleQuery(Element::Array(Array::NonNullStr(vec![ + "sayan".to_string(), + "is".to_string(), + "thinking".to_string() + ]))) + ); +} From 54a2c29600e32548222ffc893b626bc818ef3439 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Tue, 19 Apr 2022 00:08:21 +0530 Subject: [PATCH 3/4] Upgrade all interfaces to use the new protocol --- src/aio.rs | 11 +- src/deserializer.rs | 847 +++++++++++++++++--------------------------- src/lib.rs | 1 - src/sync.rs | 11 +- src/v2/mod.rs | 433 ---------------------- 5 files changed, 333 insertions(+), 970 deletions(-) delete mode 100644 src/v2/mod.rs diff --git a/src/aio.rs b/src/aio.rs index c751e36..38739f5 100644 --- a/src/aio.rs +++ b/src/aio.rs @@ -94,16 +94,13 @@ macro_rules! impl_async_methods { } Err(e) => match e { ParseError::NotEnough => (), - ParseError::BadPacket | ParseError::UnexpectedByte => { + ParseError::BadPacket => { self.buffer.clear(); return Err(SkyhashError::InvalidResponse.into()); } ParseError::DataTypeError => { return Err(SkyhashError::ParseError.into()) } - ParseError::Empty => { - return Err(IoError::from(ErrorKind::ConnectionReset).into()) - } ParseError::UnknownDatatype => { return Err(SkyhashError::UnknownDataType.into()) } @@ -113,11 +110,7 @@ macro_rules! impl_async_methods { } /// This function is a subroutine of `run_query` used to parse the response packet fn try_response(&mut self) -> Result<(RawResponse, usize), ParseError> { - if self.buffer.is_empty() { - // The connection was possibly reset - return Err(ParseError::Empty); - } - Parser::new(&self.buffer).parse() + Parser::parse(&self.buffer) } } impl crate::actions::AsyncSocket for $ty { diff --git a/src/deserializer.rs b/src/deserializer.rs index 5bb69a8..3e69979 100644 --- a/src/deserializer.rs +++ b/src/deserializer.rs @@ -27,12 +27,16 @@ //! by Sayan Nandan and this is the first client implementation of the protocol //! -use crate::types::Array; -use crate::types::FlatElement; -use crate::types::FromSkyhashBytes; -use crate::RespCode; -use crate::SkyResult; -use std::hint::unreachable_unchecked; +use crate::{ + types::FromSkyhashBytes, + types::{Array, FlatElement}, + RespCode, SkyResult, +}; +use core::{ + num::{ParseFloatError, ParseIntError}, + slice, + str::{self, Utf8Error}, +}; #[derive(Debug)] /// # Skyhash Deserializer (Parser) @@ -58,7 +62,7 @@ pub(super) struct Parser<'a> { /// Do not even think of touching this externally cursor: usize, /// The buffer slice - buffer: &'a [u8], + slice: &'a [u8], } #[derive(Debug, PartialEq)] @@ -87,35 +91,61 @@ impl Element { T::from_element(self) } } +/* + * Copyright 2022, Sayan Nandan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +/// A generic result to indicate parsing errors thorugh the [`ParseError`] enum +pub type ParseResult = Result; #[derive(Debug, PartialEq)] #[non_exhaustive] +#[repr(u8)] /// # Parser Errors /// /// Several errors can arise during parsing and this enum accounts for them pub enum ParseError { /// Didn't get the number of expected bytes NotEnough, - /// The query contains an unexpected byte - UnexpectedByte, /// The packet simply contains invalid data - /// - /// This is rarely returned and only in the special cases where a bad client sends `0` as - /// the query count BadPacket, /// A data type was given but the parser failed to serialize it into this type - /// - /// This can happen not just for elements but can also happen for their sizes ([`Self::parse_into_u64`]) DataTypeError, /// A data type that the client doesn't know was passed into the query /// /// This is a frequent problem that can arise between different server editions as more data types /// can be added with changing server versions UnknownDatatype, - /// The query is empty - /// - /// The **parser will never return this**, but instead it is provided for convenience with [`dbnet`] - Empty, +} + +impl From for ParseError { + fn from(_: ParseIntError) -> Self { + Self::DataTypeError + } +} + +impl From for ParseError { + fn from(_: Utf8Error) -> Self { + Self::DataTypeError + } +} + +impl From for ParseError { + fn from(_: ParseFloatError) -> Self { + Self::DataTypeError + } } #[derive(Debug, PartialEq)] @@ -130,580 +160,361 @@ pub enum RawResponse { PipelinedQuery(Vec), } -/// A generic result to indicate parsing errors thorugh the [`ParseError`] enum -pub type ParseResult = Result; - impl<'a> Parser<'a> { - /// Initialize a new parser instance - pub const fn new(buffer: &'a [u8]) -> Self { - Parser { + pub fn new(slice: &'a [u8]) -> Self { + Self { + slice, cursor: 0usize, - buffer, } } - /// Read from the current cursor position to `until` number of positions ahead - /// This **will forward the cursor itself** if the bytes exist or it will just return a `NotEnough` error - fn read_until(&mut self, until: usize) -> ParseResult<&[u8]> { - if let Some(b) = self.buffer.get(self.cursor..self.cursor + until) { - self.cursor += until; - Ok(b) - } else { - Err(ParseError::NotEnough) - } + fn remaining(&self) -> usize { + self.slice.len() - self.cursor } - /// This returns the position at which the line parsing began and the position at which the line parsing - /// stopped, in other words, you should be able to do self.buffer[started_at..stopped_at] to get a line - /// and do it unchecked. This **will move the internal cursor ahead** and place it **at the `\n` byte** - fn read_line(&mut self) -> (usize, usize) { - let started_at = self.cursor; - let mut stopped_at = self.cursor; - while self.cursor < self.buffer.len() { - if self.buffer[self.cursor] == b'\n' { - // Oh no! Newline reached, time to break the loop - // But before that ... we read the newline, so let's advance the cursor - self.incr_cursor(); - break; - } - // So this isn't an LF, great! Let's forward the stopped_at position - stopped_at += 1; - self.incr_cursor(); - } - (started_at, stopped_at) + fn has_remaining(&self, c: usize) -> bool { + self.remaining() >= c + } + fn not_exhausted(&self) -> bool { + self.cursor < self.slice.len() + } + unsafe fn direct_read(&self, s: usize, c: usize) -> &[u8] { + slice::from_raw_parts(self.slice.as_ptr().add(s), c) + } + // mut refs + fn incr_cursor_by(&mut self, by: usize) { + debug_assert!(self.has_remaining(by), "Buffer overflow"); + self.cursor += by; + } + fn decr_cursor_by(&mut self, by: usize) { + debug_assert!( + self.cursor != 0 && self.cursor.checked_sub(by).is_some(), + "Size underflow" + ); + self.cursor -= 1; + } + fn decr_cursor(&mut self) { + self.decr_cursor_by(1) } - /// Push the internal cursor ahead by one fn incr_cursor(&mut self) { - self.cursor += 1; - } - /// This function will evaluate if the byte at the current cursor position equals the `ch` argument, i.e - /// the expression `*v == ch` is evaluated. However, if no element is present ahead, then the function - /// will return `Ok(_this_if_nothing_ahead_)` - fn will_cursor_give_char(&self, ch: u8, this_if_nothing_ahead: bool) -> ParseResult { - self.buffer.get(self.cursor).map_or( - if this_if_nothing_ahead { - Ok(true) - } else { - Err(ParseError::NotEnough) - }, - |v| Ok(*v == ch), - ) - } - /// Will the current cursor position give a linefeed? This will return `ParseError::NotEnough` if - /// the current cursor points at a non-existent index in `self.buffer` - fn will_cursor_give_linefeed(&self) -> ParseResult { - self.will_cursor_give_char(b'\n', false) - } - /// Parse a stream of bytes into [`usize`] - fn parse_into_usize(bytes: &[u8]) -> ParseResult { - if bytes.is_empty() { - return Err(ParseError::NotEnough); - } - let byte_iter = bytes.iter(); - let mut item_usize = 0usize; - for dig in byte_iter { - if !dig.is_ascii_digit() { - // dig has to be an ASCII digit - return Err(ParseError::DataTypeError); - } - // 48 is the ASCII code for 0, and 57 is the ascii code for 9 - // so if 0 is given, the subtraction should give 0; similarly - // if 9 is given, the subtraction should give us 9! - let curdig: usize = dig - .checked_sub(48) - .unwrap_or_else(|| unsafe { unreachable_unchecked() }) - .into(); - // The usize can overflow; check that case - let product = match item_usize.checked_mul(10) { - Some(not_overflowed) => not_overflowed, - None => return Err(ParseError::DataTypeError), - }; - let sum = match product.checked_add(curdig) { - Some(not_overflowed) => not_overflowed, - None => return Err(ParseError::DataTypeError), - }; - item_usize = sum; - } - Ok(item_usize) + self.incr_cursor_by(1) } - /// Pasre a stream of bytes into an [`u64`] - fn parse_into_u64(bytes: &[u8]) -> ParseResult { - if bytes.is_empty() { - return Err(ParseError::NotEnough); - } - let byte_iter = bytes.iter(); - let mut item_u64 = 0u64; - for dig in byte_iter { - if !dig.is_ascii_digit() { - // dig has to be an ASCII digit - return Err(ParseError::DataTypeError); - } - // 48 is the ASCII code for 0, and 57 is the ascii code for 9 - // so if 0 is given, the subtraction should give 0; similarly - // if 9 is given, the subtraction should give us 9! - let curdig: u64 = dig - .checked_sub(48) - .unwrap_or_else(|| unsafe { unreachable_unchecked() }) - .into(); - // Now the entire u64 can overflow, so let's attempt to check it - let product = match item_u64.checked_mul(10) { - Some(not_overflowed) => not_overflowed, - None => return Err(ParseError::DataTypeError), - }; - let sum = match product.checked_add(curdig) { - Some(not_overflowed) => not_overflowed, - None => return Err(ParseError::DataTypeError), - }; - item_u64 = sum; - } - Ok(item_u64) + unsafe fn get_byte_at_cursor(&self) -> u8 { + debug_assert!(self.not_exhausted(), "Buffer overflow"); + *self.slice.as_ptr().add(self.cursor) } - /// This will return the number of datagroups present in this query packet - /// - /// This **will forward the cursor itself** - fn parse_metaframe_get_datagroup_count(&mut self) -> ParseResult { - // the smallest query we can have is: *1\n or 3 chars - if self.buffer.len() < 3 { - return Err(ParseError::NotEnough); - } - // Now we want to read `*\n` - let (start, stop) = self.read_line(); - if let Some(our_chunk) = self.buffer.get(start..stop) { - if our_chunk[0] == b'*' { - // Good, this will tell us the number of actions - // Let us attempt to read the usize from this point onwards - // that is excluding the '*' (so 1..) - let ret = Self::parse_into_usize(&our_chunk[1..])?; - Ok(ret) - } else { - Err(ParseError::UnexpectedByte) - } + fn read_until(&mut self, c: usize) -> ParseResult<&[u8]> { + if self.has_remaining(c) { + let cursor = self.cursor; + self.incr_cursor_by(c); + let slice = unsafe { + // UNSAFE(@ohsayan): Just verified length + self.direct_read(cursor, c) + }; + Ok(slice) } else { Err(ParseError::NotEnough) } } - /// Get the next element **without** the tsymbol - /// - /// This function **does not forward the newline** - fn __get_next_element(&mut self) -> ParseResult<&[u8]> { - let string_sizeline = self.read_line(); - if let Some(line) = self.buffer.get(string_sizeline.0..string_sizeline.1) { - let string_size = Self::parse_into_usize(line)?; - let our_chunk = self.read_until(string_size)?; - Ok(our_chunk) + fn read_line(&mut self) -> ParseResult<&[u8]> { + let cursor = self.cursor; + while self.not_exhausted() + && unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } != b'\n' + { + self.incr_cursor(); + } + if self.not_exhausted() + && unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } == b'\n' + { + let len = self.cursor - cursor; + self.incr_cursor(); // skip LF + Ok(unsafe { + // UNSAFE(@ohsayan): Just verified length + self.direct_read(cursor, len) + }) } else { Err(ParseError::NotEnough) } } - /// The cursor should have passed the `?` tsymbol - fn parse_next_binstr(&mut self) -> ParseResult> { - let our_string_chunk = self.__get_next_element()?.to_owned(); - if self.will_cursor_give_linefeed()? { - // there is a lf after the end of the binary string; great! - // let's skip that now + fn read_line_pedantic(&mut self) -> ParseResult<&[u8]> { + let cursor = self.cursor; + while self.not_exhausted() + && unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } != b'\n' + { self.incr_cursor(); - // let's return our string - Ok(our_string_chunk) - } else { - Err(ParseError::UnexpectedByte) } - } - /// Parse the next null checked element - fn parse_next_chunk_nullck(&mut self) -> ParseResult> { - // we have the chunk - let (start, stop) = self.read_line(); - if let Some(sizeline) = self.buffer.get(start..stop) { - let string_size = Self::parse_into_usize_nullck(sizeline)?; - if let Some(size) = string_size { - // so it isn't null - let our_chunk = self.read_until(size)?; - Ok(Some(our_chunk)) - } else { - Ok(None) - } + let len = self.cursor - cursor; + let has_lf = self.not_exhausted() + && unsafe { + // UNSAFE(@ohsayan): The first condition ensures + // that the current byte is present in the allocation + self.get_byte_at_cursor() + } == b'\n'; + if self.not_exhausted() && has_lf && len != 0 { + self.incr_cursor(); // skip LF + Ok(unsafe { + // UNSAFE(@ohsayan): Just verified lengths + self.direct_read(cursor, len) + }) } else { - Err(ParseError::NotEnough) - } - } - /// The cursor should have passed the `+` tsymbol - fn parse_next_string(&mut self) -> ParseResult { - Ok(String::from_utf8_lossy(&self.parse_next_binstr()?).to_string()) - } - fn parse_next_binstr_nullck(&mut self) -> ParseResult>> { - let our_chunk = self.parse_next_chunk_nullck()?; - if let Some(chunk) = our_chunk { - let our_chunk = chunk.to_owned(); - if self.will_cursor_give_linefeed()? { - // there is a lf after the end of the binary string; great! - // let's skip that now - self.incr_cursor(); - Ok(Some(our_chunk)) + let r = if has_lf { + ParseError::BadPacket } else { - Err(ParseError::UnexpectedByte) - } - } else { - Ok(None) - } - } - fn parse_next_str_nullck(&mut self) -> ParseResult> { - match self.parse_next_binstr_nullck()? { - Some(chunk) => Ok(Some(String::from_utf8_lossy(&chunk).to_string())), - None => Ok(None), + ParseError::NotEnough + }; + Err(r) } } - /// The cursor should have passed the `:` tsymbol - fn parse_next_u64(&mut self) -> ParseResult { - let our_u64_chunk = self.__get_next_element()?; - let our_u64 = Self::parse_into_u64(our_u64_chunk)?; - if self.will_cursor_give_linefeed()? { - // line feed after u64; heck yeah! + fn try_read_cursor(&mut self) -> ParseResult { + if self.not_exhausted() { + let r = unsafe { + // UNSAFE(@ohsayan): Just checked len + self.get_byte_at_cursor() + }; self.incr_cursor(); - // return it - Ok(our_u64) + Ok(r) } else { - Err(ParseError::UnexpectedByte) + Err(ParseError::NotEnough) } } - fn parse_next_respcode(&mut self) -> ParseResult { - let our_respcode_chunk = self.__get_next_element()?; - let our_respcode = RespCode::from_str(&String::from_utf8_lossy(our_respcode_chunk)); - if self.will_cursor_give_linefeed()? { - self.incr_cursor(); - Ok(our_respcode) - } else { - Err(ParseError::UnexpectedByte) - } +} + +// higher level abstractions +impl<'a> Parser<'a> { + fn read_u64(&mut self) -> ParseResult { + let line = self.read_line_pedantic()?; + let r = str::from_utf8(line)?.parse()?; + Ok(r) } - fn parse_next_float(&mut self) -> ParseResult { - let our_float_chunk = self.__get_next_element()?; - match String::from_utf8_lossy(our_float_chunk).parse() { - Ok(f) => { - if self.will_cursor_give_linefeed()? { - self.incr_cursor(); - Ok(f) - } else { - println!("LF error"); - Err(ParseError::UnexpectedByte) - } + fn read_usize(&mut self) -> ParseResult { + let line = self.read_line_pedantic()?; + let r = str::from_utf8(line)?.parse()?; + Ok(r) + } + fn read_usize_nullck(&mut self) -> ParseResult> { + match self.try_read_cursor()? { + b'\0' => { + // null + Ok(None) } - Err(e) => { - println!("Error: {}", e); - Err(ParseError::UnexpectedByte) + _ => { + self.decr_cursor(); + let usz = self.read_usize()?; + Ok(Some(usz)) } } } - /// The cursor should be **at the tsymbol** - fn parse_next_element(&mut self) -> ParseResult { - if let Some(tsymbol) = self.buffer.get(self.cursor) { - // so we have a tsymbol; nice, let's match it - // but advance the cursor before doing that (skip) - self.incr_cursor(); - let ret = match *tsymbol { - b'?' => Element::Binstr(self.parse_next_binstr()?), - b'+' => Element::String(self.parse_next_string()?), - b':' => Element::UnsignedInt(self.parse_next_u64()?), - b'&' => Element::Array(Array::Recursive(self.parse_next_array()?)), - b'!' => Element::RespCode(self.parse_next_respcode()?), - b'%' => Element::Float(self.parse_next_float()?), - b'^' => { - // hmm, a typed non-null array; let's check the tsymbol - if let Some(array_type) = self.buffer.get(self.cursor) { - // got tsymbol, let's skip it - self.incr_cursor(); - match array_type { - b'+' => { - Element::Array(Array::NonNullStr(self.parse_next_nonnull_str()?)) - } - b'?' => { - Element::Array(Array::NonNullBin(self.parse_next_nonnull_bin()?)) - } - _ => return Err(ParseError::UnknownDatatype), - } - } else { - // if we couldn't fetch a tsymbol, there wasn't enough - // data left - return Err(ParseError::NotEnough); - } - } - b'@' => { - // hmmm, a typed array; let's check the tsymbol - if let Some(array_type) = self.buffer.get(self.cursor) { - // got tsymbol, let's skip it - self.incr_cursor(); - match array_type { - b'+' => Element::Array(Array::Str(self.parse_next_typed_array_str()?)), - b'?' => Element::Array(Array::Bin(self.parse_next_typed_array_bin()?)), - _ => return Err(ParseError::UnknownDatatype), - } - } else { - // if we couldn't fetch a tsymbol, there wasn't enough - // data left - return Err(ParseError::NotEnough); - } - } - b'_' => Element::Array(Array::Flat(self.parse_next_flat_array()?)), - _ => return Err(ParseError::UnknownDatatype), - }; - Ok(ret) - } else { - // Not enough bytes to read an element - Err(ParseError::NotEnough) - } + fn read_string(&mut self) -> ParseResult { + let size = self.read_usize()?; + let line = self.read_until(size)?; + let r = str::from_utf8(line)?.to_owned(); + Ok(r) } - /// Parse the next null checked usize - fn parse_into_usize_nullck(inp: &[u8]) -> ParseResult> { - if inp == [0] { - Ok(None) + fn read_string_nullck(&mut self) -> ParseResult> { + if let Some(size) = self.read_usize_nullck()? { + Ok(Some(str::from_utf8(self.read_until(size)?)?.to_owned())) } else { - Ok(Some(Self::parse_into_usize(inp)?)) + Ok(None) } } - - /// The cursor should have passed the `@+` chars - fn parse_next_nonnull_str(&mut self) -> ParseResult> { - let (start, stop) = self.read_line(); - if let Some(our_size_chunk) = self.buffer.get(start..stop) { - // so we have a size chunk; let's get the size - let array_size = Self::parse_into_usize(our_size_chunk)?; - let mut array = Vec::with_capacity(array_size); - for _ in 0..array_size { - // no tsymbol, just elements and their sizes - array.push(self.parse_next_string()?); - } - Ok(array) + fn read_binary_nullck(&mut self) -> ParseResult>> { + if let Some(size) = self.read_usize_nullck()? { + Ok(Some(self.read_until(size)?.to_owned())) } else { - Err(ParseError::NotEnough) + Ok(None) } } - - /// The cursor should have passed the `@+` chars - fn parse_next_typed_array_str(&mut self) -> ParseResult>> { - let (start, stop) = self.read_line(); - if let Some(our_size_chunk) = self.buffer.get(start..stop) { - // so we have a size chunk; let's get the size - let array_size = Self::parse_into_usize(our_size_chunk)?; - let mut array = Vec::with_capacity(array_size); - for _ in 0..array_size { - // no tsymbol, just elements and their sizes - array.push(self.parse_next_str_nullck()?); - } - Ok(array) - } else { - Err(ParseError::NotEnough) - } + fn read_binary(&mut self) -> ParseResult> { + let size = self.read_usize()?; + Ok(self.read_until(size)?.to_owned()) } - /// The cursor should have passed the `@?` chars - fn parse_next_typed_array_bin(&mut self) -> ParseResult>>> { - let (start, stop) = self.read_line(); - if let Some(our_size_chunk) = self.buffer.get(start..stop) { - // got size chunk, let's get the size - let array_size = Self::parse_into_usize(our_size_chunk)?; - let mut array = Vec::with_capacity(array_size); - for _ in 0..array_size { - array.push(self.parse_next_binstr_nullck()?); - } - Ok(array) - } else { - Err(ParseError::NotEnough) - } + fn read_respcode(&mut self) -> ParseResult { + let line = self.read_line()?; + let st = str::from_utf8(line)?; + Ok(RespCode::from_str(st)) } - - /// The cursor should have passed the `@+` chars - fn parse_next_nonnull_bin(&mut self) -> ParseResult>> { - let (start, stop) = self.read_line(); - if let Some(our_size_chunk) = self.buffer.get(start..stop) { - // so we have a size chunk; let's get the size - let array_size = Self::parse_into_usize(our_size_chunk)?; - let mut array = Vec::with_capacity(array_size); - for _ in 0..array_size { - // no tsymbol, just elements and their sizes - array.push(self.parse_next_binstr()?); + fn read_float(&mut self) -> ParseResult { + let line = self.read_line()?; + let st = str::from_utf8(line)?; + Ok(st.parse()?) + } + fn read_flat_array(&mut self) -> ParseResult> { + let array_len = self.read_usize()?; + let mut data = Vec::with_capacity(array_len); + for _ in 0..array_len { + match self.try_read_cursor()? { + b'+' => data.push(FlatElement::String(self.read_string()?)), + b'?' => data.push(FlatElement::Binstr(self.read_binary()?)), + b'!' => data.push(FlatElement::RespCode(self.read_respcode()?)), + b':' => data.push(FlatElement::UnsignedInt(self.read_u64()?)), + b'%' => data.push(FlatElement::Float(self.read_float()?)), + _ => return Err(ParseError::UnknownDatatype), } - Ok(array) - } else { - Err(ParseError::NotEnough) } + Ok(data) } - - /// The cursor should have passed the tsymbol - fn parse_next_flat_array(&mut self) -> ParseResult> { - let (start, stop) = self.read_line(); - if let Some(our_size_chunk) = self.buffer.get(start..stop) { - let array_size = Self::parse_into_usize(our_size_chunk)?; - let mut array = Vec::with_capacity(array_size); - for _ in 0..array_size { - if let Some(tsymbol) = self.buffer.get(self.cursor) { - // good, there is a tsymbol; move the cursor ahead - self.incr_cursor(); - let ret = match *tsymbol { - b'+' => FlatElement::String(self.parse_next_string()?), - b'?' => FlatElement::Binstr(self.parse_next_binstr()?), - b'!' => FlatElement::RespCode(self.parse_next_respcode()?), - b':' => FlatElement::UnsignedInt(self.parse_next_u64()?), - _ => return Err(ParseError::UnknownDatatype), - }; - array.push(ret); - } else { - return Err(ParseError::NotEnough); - } - } - Ok(array) - } else { - Err(ParseError::NotEnough) + fn read_typed_array_string(&mut self) -> ParseResult>> { + let size = self.read_usize()?; + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(self.read_string_nullck()?); } + Ok(data) } - /// The tsymbol `&` should have been passed! - fn parse_next_array(&mut self) -> ParseResult> { - let (start, stop) = self.read_line(); - if let Some(our_size_chunk) = self.buffer.get(start..stop) { - let array_size = Self::parse_into_usize(our_size_chunk)?; - let mut array = Vec::with_capacity(array_size); - for _ in 0..array_size { - array.push(self.parse_next_element()?); - } - Ok(array) - } else { - Err(ParseError::NotEnough) + fn read_typed_array_binary(&mut self) -> ParseResult>>> { + let size = self.read_usize()?; + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(self.read_binary_nullck()?); } + Ok(data) } - /// Parse a query and return the [`Query`] and an `usize` indicating the number of bytes that - /// can be safely discarded from the buffer. It will otherwise return errors if they are found. - /// - /// This object will drop `Self` - pub fn parse(mut self) -> Result<(RawResponse, usize), ParseError> { - let number_of_queries = self.parse_metaframe_get_datagroup_count()?; - if number_of_queries == 0 { - // how on earth do you expect us to execute 0 queries? waste of bandwidth - return Err(ParseError::BadPacket); + fn read_typed_array(&mut self) -> ParseResult { + let r = match self.try_read_cursor()? { + b'+' => Element::Array(Array::Str(self.read_typed_array_string()?)), + b'?' => Element::Array(Array::Bin(self.read_typed_array_binary()?)), + _ => return Err(ParseError::UnknownDatatype), + }; + Ok(r) + } + fn read_typed_nonnull_array_string(&mut self) -> ParseResult> { + let size = self.read_usize()?; + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(self.read_string()?); } - if number_of_queries == 1 { - // This is a simple query - let single_group = self.parse_next_element()?; - // The below line defaults to false if no item is there in the buffer - // or it checks if the next time is a \r char; if it is, then it is the beginning - // of the next query - #[allow(clippy::blocks_in_if_conditions)] - // this lint is pointless here, just some optimizations - if self - .will_cursor_give_char(b'*', true) - .unwrap_or_else(|_| unsafe { - // This will never be the case because we'll always get a result and no error value - // as we've passed true which will yield Ok(true) even if there is no byte ahead - unreachable_unchecked() - }) - { - Ok((RawResponse::SimpleQuery(single_group), self.cursor)) - } else { - // the next item isn't the beginning of a query but something else? - // that doesn't look right! - Err(ParseError::UnexpectedByte) - } - } else { - // This is a pipelined query - // We'll first make space for all the actiongroups - let mut queries = Vec::with_capacity(number_of_queries); - for _ in 0..number_of_queries { - queries.push(self.parse_next_element()?); - } - if self.will_cursor_give_char(b'*', true)? { - Ok((RawResponse::PipelinedQuery(queries), self.cursor)) - } else { - Err(ParseError::UnexpectedByte) - } + Ok(data) + } + fn read_typed_nonnull_array_binary(&mut self) -> ParseResult>> { + let size = self.read_usize()?; + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(self.read_binary()?); } + Ok(data) + } + fn read_typed_nonnull_array(&mut self) -> ParseResult { + let r = match self.try_read_cursor()? { + b'+' => Element::Array(Array::NonNullStr(self.read_typed_nonnull_array_string()?)), + b'?' => Element::Array(Array::NonNullBin(self.read_typed_nonnull_array_binary()?)), + _ => return Err(ParseError::UnknownDatatype), + }; + Ok(r) + } + fn consumed(&self) -> usize { + self.cursor } } -#[test] -fn test_typed_str_array() { - let typed_array_packet = "*1\n@+3\n3\nthe\n3\ncat\n6\nmeowed\n".as_bytes(); - let (parsed, forward) = Parser::new(typed_array_packet).parse().unwrap(); - assert_eq!(forward, typed_array_packet.len()); - assert_eq!( - parsed, - RawResponse::SimpleQuery(Element::Array(Array::Str(vec![ - Some("the".to_owned()), - Some("cat".to_owned()), - Some("meowed".to_owned()) - ]))) - ); +// response methods +impl<'a> Parser<'a> { + fn _read_simple_resp(&mut self) -> ParseResult { + let r = match self.try_read_cursor()? { + b'+' => Element::String(self.read_string()?), + b'?' => Element::Binstr(self.read_binary()?), + b'!' => Element::RespCode(self.read_respcode()?), + b':' => Element::UnsignedInt(self.read_u64()?), + b'%' => Element::Float(self.read_float()?), + b'@' => self.read_typed_array()?, + b'^' => self.read_typed_nonnull_array()?, + b'_' => Element::Array(Array::Flat(self.read_flat_array()?)), + _ => return Err(ParseError::UnknownDatatype), + }; + Ok(r) + } + fn read_simple_resp(&mut self) -> ParseResult { + self._read_simple_resp() + } + fn read_pipeline_resp(&mut self) -> ParseResult> { + let size = self.read_usize()?; + let mut resps = Vec::with_capacity(size); + for _ in 0..size { + resps.push(self._read_simple_resp()?); + } + Ok(resps) + } + fn _parse(&mut self) -> ParseResult { + let r = match self.try_read_cursor()? { + b'*' => RawResponse::SimpleQuery(self.read_simple_resp()?), + b'$' => RawResponse::PipelinedQuery(self.read_pipeline_resp()?), + _ => return Err(ParseError::BadPacket), + }; + Ok(r) + } + pub fn parse(buffer: &'a [u8]) -> ParseResult<(RawResponse, usize)> { + let mut slf = Self::new(buffer); + let r = slf._parse()?; + Ok((r, slf.consumed())) + } } #[test] -fn test_typed_bin_array() { - let typed_array_packet = "*1\n@?3\n3\nthe\n3\ncat\n6\nmeowed\n".as_bytes(); - let (parsed, forward) = Parser::new(typed_array_packet).parse().unwrap(); - assert_eq!(forward, typed_array_packet.len()); +fn set_resp() { + let setresp = b"*!0\n".to_vec(); + let (ret, skip) = Parser::parse(&setresp).unwrap(); + assert_eq!(skip, setresp.len()); assert_eq!( - parsed, - RawResponse::SimpleQuery(Element::Array(Array::Bin(vec![ - Some(Vec::from("the")), - Some(Vec::from("cat")), - Some(Vec::from("meowed")) - ]))) + ret, + RawResponse::SimpleQuery(Element::RespCode(RespCode::Okay)) ); } #[test] -fn test_typed_bin_array_null() { - let typed_array_packet = "*1\n@?3\n3\nthe\n3\ncat\n\0\n".as_bytes(); - let (parsed, forward) = Parser::new(typed_array_packet).parse().unwrap(); - assert_eq!(forward, typed_array_packet.len()); +fn mget_resp() { + let mgetresp = b"*@+4\n5\nsayan2\nis8\nthinking\0".to_vec(); + let (ret, skip) = Parser::parse(&mgetresp).unwrap(); assert_eq!( - parsed, - RawResponse::SimpleQuery(Element::Array(Array::Bin(vec![ - Some(Vec::from("the")), - Some(Vec::from("cat")), + ret, + RawResponse::SimpleQuery(Element::Array(Array::Str(vec![ + Some("sayan".to_owned()), + Some("is".to_owned()), + Some("thinking".to_owned()), None ]))) ); + assert_eq!(skip, mgetresp.len()); } #[test] -fn test_typed_str_array_null() { - let typed_array_packet = "*1\n@+3\n3\nthe\n3\ncat\n\0\n".as_bytes(); - let (parsed, forward) = Parser::new(typed_array_packet).parse().unwrap(); - assert_eq!(forward, typed_array_packet.len()); +fn pipe_resp() { + let resp = b"$2\n!0\n@+4\n5\nsayan2\nis8\nthinking\0".to_vec(); + let (ret, skip) = Parser::parse(&resp).unwrap(); assert_eq!( - parsed, - RawResponse::SimpleQuery(Element::Array(Array::Str(vec![ - Some("the".to_owned()), - Some("cat".to_owned()), - None - ]))) + ret, + RawResponse::PipelinedQuery(vec![ + Element::RespCode(RespCode::Okay), + Element::Array(Array::Str(vec![ + Some("sayan".to_owned()), + Some("is".to_owned()), + Some("thinking".to_owned()), + None + ])) + ]) ); + assert_eq!(skip, resp.len()); } #[test] -fn test_parse_float() { - let packet = b"*1\n%3\n1.1\n"; - let (parsed, forward) = Parser::new(packet).parse().unwrap(); - assert_eq!(forward, packet.len()); - assert_eq!(parsed, RawResponse::SimpleQuery(Element::Float(1.1))) -} - -#[test] -fn test_parse_nonnull_str() { - let packet = b"*1\n^+2\n2\nhi\n5\nthere\n"; - let (parsed, forward) = Parser::new(packet).parse().unwrap(); - assert_eq!(forward, packet.len()); +fn lskeys_resp() { + let resp = b"*^+3\n5\nsayan2\nis8\nthinking".to_vec(); + let (ret, skip) = Parser::parse(&resp).unwrap(); assert_eq!( - parsed, + ret, RawResponse::SimpleQuery(Element::Array(Array::NonNullStr(vec![ - "hi".to_owned(), - "there".to_owned() + "sayan".to_string(), + "is".to_string(), + "thinking".to_string() ]))) - ) -} - -#[test] -fn test_parse_nonnull_bin() { - let packet = b"*1\n^?2\n2\nhi\n5\nthere\n"; - let (parsed, forward) = Parser::new(packet).parse().unwrap(); - assert_eq!(forward, packet.len()); - assert_eq!( - parsed, - RawResponse::SimpleQuery(Element::Array(Array::NonNullBin(vec![ - "hi".as_bytes().to_owned(), - "there".as_bytes().to_owned() - ]))) - ) + ); + assert_eq!(skip, resp.len()); } diff --git a/src/lib.rs b/src/lib.rs index 041d494..4889079 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -229,7 +229,6 @@ pub mod ddl; pub mod error; pub mod pool; pub mod types; -pub mod v2; // endof public mods // private mods mod deserializer; diff --git a/src/sync.rs b/src/sync.rs index d9f7685..6dc30db 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -94,16 +94,13 @@ macro_rules! impl_sync_methods { } Err(e) => match e { ParseError::NotEnough => (), - ParseError::BadPacket | ParseError::UnexpectedByte => { + ParseError::BadPacket => { self.buffer.clear(); return Err(SkyhashError::InvalidResponse.into()); } ParseError::DataTypeError => { return Err(SkyhashError::ParseError.into()) } - ParseError::Empty => { - return Err(IoError::from(ErrorKind::ConnectionReset).into()) - } ParseError::UnknownDatatype => { return Err(SkyhashError::UnknownDataType.into()) } @@ -112,11 +109,7 @@ macro_rules! impl_sync_methods { } } fn try_response(&mut self) -> Result<(RawResponse, usize), ParseError> { - if self.buffer.is_empty() { - // The connection was possibly reset - return Err(ParseError::Empty); - } - Parser::new(&self.buffer).parse() + Parser::parse(&self.buffer) } } impl crate::actions::SyncSocket for $ty { diff --git a/src/v2/mod.rs b/src/v2/mod.rs deleted file mode 100644 index 6744c70..0000000 --- a/src/v2/mod.rs +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Copyright 2022, Sayan Nandan - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -use crate::{ - types::{Array, FlatElement}, - Element, RespCode, -}; -use core::{ - num::{ParseFloatError, ParseIntError}, - slice, - str::{self, Utf8Error}, -}; - -/// A generic result to indicate parsing errors thorugh the [`ParseError`] enum -pub type ParseResult = Result; - -#[derive(Debug, PartialEq)] -#[non_exhaustive] -#[repr(u8)] -/// # Parser Errors -/// -/// Several errors can arise during parsing and this enum accounts for them -pub enum ParseError { - /// Didn't get the number of expected bytes - NotEnough, - /// The packet simply contains invalid data - BadPacket, - /// The query contains an unexpected byte - UnexpectedByte, - /// A data type was given but the parser failed to serialize it into this type - DataTypeError, - /// A data type that the client doesn't know was passed into the query - /// - /// This is a frequent problem that can arise between different server editions as more data types - /// can be added with changing server versions - UnknownDatatype, -} - -impl From for ParseError { - fn from(_: ParseIntError) -> Self { - Self::DataTypeError - } -} - -impl From for ParseError { - fn from(_: Utf8Error) -> Self { - Self::DataTypeError - } -} - -impl From for ParseError { - fn from(_: ParseFloatError) -> Self { - Self::DataTypeError - } -} - -#[derive(Debug, PartialEq)] -/// # Response types -/// -/// A simple response carries the response for a simple query while a pipelined response carries the response -/// for pipelined queries -pub enum RawResponse { - /// A simple query will just hold one element - SimpleQuery(Element), - /// A pipelined/batch query will hold multiple elements - PipelinedQuery(Vec), -} - -pub struct Parser<'a> { - slice: &'a [u8], - cursor: usize, -} - -impl<'a> Parser<'a> { - pub fn new(slice: &'a [u8]) -> Self { - Self { - slice, - cursor: 0usize, - } - } - fn remaining(&self) -> usize { - self.slice.len() - self.cursor - } - fn has_remaining(&self, c: usize) -> bool { - self.remaining() >= c - } - fn not_exhausted(&self) -> bool { - self.cursor < self.slice.len() - } - unsafe fn direct_read(&self, s: usize, c: usize) -> &[u8] { - slice::from_raw_parts(self.slice.as_ptr().add(s), c) - } - // mut refs - fn incr_cursor_by(&mut self, by: usize) { - debug_assert!(self.has_remaining(by), "Buffer overflow"); - self.cursor += by; - } - fn decr_cursor_by(&mut self, by: usize) { - debug_assert!( - self.cursor != 0 && self.cursor.checked_sub(by).is_some(), - "Size underflow" - ); - self.cursor -= 1; - } - fn decr_cursor(&mut self) { - self.decr_cursor_by(1) - } - fn incr_cursor(&mut self) { - self.incr_cursor_by(1) - } - unsafe fn get_byte_at_cursor(&self) -> u8 { - debug_assert!(self.not_exhausted(), "Buffer overflow"); - *self.slice.as_ptr().add(self.cursor) - } - fn read_until(&mut self, c: usize) -> ParseResult<&[u8]> { - if self.has_remaining(c) { - let cursor = self.cursor; - self.incr_cursor_by(c); - let slice = unsafe { - // UNSAFE(@ohsayan): Just verified length - self.direct_read(cursor, c) - }; - Ok(slice) - } else { - Err(ParseError::NotEnough) - } - } - fn read_line(&mut self) -> ParseResult<&[u8]> { - let cursor = self.cursor; - while self.not_exhausted() - && unsafe { - // UNSAFE(@ohsayan): The first condition ensures - // that the current byte is present in the allocation - self.get_byte_at_cursor() - } != b'\n' - { - self.incr_cursor(); - } - if self.not_exhausted() - && unsafe { - // UNSAFE(@ohsayan): The first condition ensures - // that the current byte is present in the allocation - self.get_byte_at_cursor() - } == b'\n' - { - let len = self.cursor - cursor; - self.incr_cursor(); // skip LF - Ok(unsafe { - // UNSAFE(@ohsayan): Just verified length - self.direct_read(cursor, len) - }) - } else { - Err(ParseError::NotEnough) - } - } - fn read_line_pedantic(&mut self) -> ParseResult<&[u8]> { - let cursor = self.cursor; - while self.not_exhausted() - && unsafe { - // UNSAFE(@ohsayan): The first condition ensures - // that the current byte is present in the allocation - self.get_byte_at_cursor() - } != b'\n' - { - self.incr_cursor(); - } - let len = self.cursor - cursor; - let has_lf = self.not_exhausted() - && unsafe { - // UNSAFE(@ohsayan): The first condition ensures - // that the current byte is present in the allocation - self.get_byte_at_cursor() - } == b'\n'; - if self.not_exhausted() && has_lf && len != 0 { - self.incr_cursor(); // skip LF - Ok(unsafe { - // UNSAFE(@ohsayan): Just verified lengths - self.direct_read(cursor, len) - }) - } else { - let r = if has_lf { - ParseError::BadPacket - } else { - ParseError::NotEnough - }; - Err(r) - } - } - fn try_read_cursor(&mut self) -> ParseResult { - if self.not_exhausted() { - let r = unsafe { - // UNSAFE(@ohsayan): Just checked len - self.get_byte_at_cursor() - }; - self.incr_cursor(); - Ok(r) - } else { - Err(ParseError::NotEnough) - } - } -} - -// higher level abstractions -impl<'a> Parser<'a> { - fn read_u64(&mut self) -> ParseResult { - let line = self.read_line_pedantic()?; - let r = str::from_utf8(line)?.parse()?; - Ok(r) - } - fn read_usize(&mut self) -> ParseResult { - let line = self.read_line_pedantic()?; - let r = str::from_utf8(line)?.parse()?; - Ok(r) - } - fn read_usize_nullck(&mut self) -> ParseResult> { - match self.try_read_cursor()? { - b'\0' => { - // null - Ok(None) - } - _ => { - self.decr_cursor(); - let usz = self.read_usize()?; - Ok(Some(usz)) - } - } - } - fn read_string(&mut self) -> ParseResult { - let size = self.read_usize()?; - let line = self.read_until(size)?; - let r = str::from_utf8(line)?.to_owned(); - Ok(r) - } - fn read_string_nullck(&mut self) -> ParseResult> { - if let Some(size) = self.read_usize_nullck()? { - Ok(Some(str::from_utf8(self.read_until(size)?)?.to_owned())) - } else { - Ok(None) - } - } - fn read_binary_nullck(&mut self) -> ParseResult>> { - if let Some(size) = self.read_usize_nullck()? { - Ok(Some(self.read_until(size)?.to_owned())) - } else { - Ok(None) - } - } - fn read_binary(&mut self) -> ParseResult> { - let size = self.read_usize()?; - Ok(self.read_until(size)?.to_owned()) - } - fn read_respcode(&mut self) -> ParseResult { - let line = self.read_line()?; - let st = str::from_utf8(line)?; - Ok(RespCode::from_str(st)) - } - fn read_float(&mut self) -> ParseResult { - let line = self.read_line()?; - let st = str::from_utf8(line)?; - Ok(st.parse()?) - } - fn read_flat_array(&mut self) -> ParseResult> { - let array_len = self.read_usize()?; - let mut data = Vec::with_capacity(array_len); - for _ in 0..array_len { - match self.try_read_cursor()? { - b'+' => data.push(FlatElement::String(self.read_string()?)), - b'?' => data.push(FlatElement::Binstr(self.read_binary()?)), - b'!' => data.push(FlatElement::RespCode(self.read_respcode()?)), - b':' => data.push(FlatElement::UnsignedInt(self.read_u64()?)), - b'%' => data.push(FlatElement::Float(self.read_float()?)), - _ => return Err(ParseError::UnknownDatatype), - } - } - Ok(data) - } - fn read_typed_array_string(&mut self) -> ParseResult>> { - let size = self.read_usize()?; - let mut data = Vec::with_capacity(size); - for _ in 0..size { - data.push(self.read_string_nullck()?); - } - Ok(data) - } - fn read_typed_array_binary(&mut self) -> ParseResult>>> { - let size = self.read_usize()?; - let mut data = Vec::with_capacity(size); - for _ in 0..size { - data.push(self.read_binary_nullck()?); - } - Ok(data) - } - fn read_typed_array(&mut self) -> ParseResult { - let r = match self.try_read_cursor()? { - b'+' => Element::Array(Array::Str(self.read_typed_array_string()?)), - b'?' => Element::Array(Array::Bin(self.read_typed_array_binary()?)), - _ => return Err(ParseError::UnknownDatatype), - }; - Ok(r) - } - fn read_typed_nonnull_array_string(&mut self) -> ParseResult> { - let size = self.read_usize()?; - let mut data = Vec::with_capacity(size); - for _ in 0..size { - data.push(self.read_string()?); - } - Ok(data) - } - fn read_typed_nonnull_array_binary(&mut self) -> ParseResult>> { - let size = self.read_usize()?; - let mut data = Vec::with_capacity(size); - for _ in 0..size { - data.push(self.read_binary()?); - } - Ok(data) - } - fn read_typed_nonnull_array(&mut self) -> ParseResult { - let r = match self.try_read_cursor()? { - b'+' => Element::Array(Array::NonNullStr(self.read_typed_nonnull_array_string()?)), - b'?' => Element::Array(Array::NonNullBin(self.read_typed_nonnull_array_binary()?)), - _ => return Err(ParseError::UnknownDatatype), - }; - Ok(r) - } -} - -// response methods -impl<'a> Parser<'a> { - fn _read_simple_resp(&mut self) -> ParseResult { - let r = match self.try_read_cursor()? { - b'+' => Element::String(self.read_string()?), - b'?' => Element::Binstr(self.read_binary()?), - b'!' => Element::RespCode(self.read_respcode()?), - b':' => Element::UnsignedInt(self.read_u64()?), - b'%' => Element::Float(self.read_float()?), - b'@' => self.read_typed_array()?, - b'^' => self.read_typed_nonnull_array()?, - b'_' => Element::Array(Array::Flat(self.read_flat_array()?)), - _ => return Err(ParseError::UnknownDatatype), - }; - Ok(r) - } - fn read_simple_resp(&mut self) -> ParseResult { - self._read_simple_resp() - } - fn read_pipeline_resp(&mut self) -> ParseResult> { - let size = self.read_usize()?; - let mut resps = Vec::with_capacity(size); - for _ in 0..size { - resps.push(self._read_simple_resp()?); - } - Ok(resps) - } - fn _parse(&mut self) -> ParseResult { - let r = match self.try_read_cursor()? { - b'*' => RawResponse::SimpleQuery(self.read_simple_resp()?), - b'$' => RawResponse::PipelinedQuery(self.read_pipeline_resp()?), - _ => return Err(ParseError::BadPacket), - }; - Ok(r) - } - pub fn parse(buffer: &'a [u8]) -> ParseResult { - let mut slf = Self::new(buffer); - slf._parse() - } -} - -#[test] -fn set_resp() { - let setresp = b"*!0\n".to_vec(); - assert_eq!( - Parser::parse(&setresp).unwrap(), - RawResponse::SimpleQuery(Element::RespCode(RespCode::Okay)) - ); -} - -#[test] -fn mget_resp() { - let mgetresp = b"*@+4\n5\nsayan2\nis8\nthinking\0\n".to_vec(); - let ret = Parser::parse(&mgetresp).unwrap(); - assert_eq!( - ret, - RawResponse::SimpleQuery(Element::Array(Array::Str(vec![ - Some("sayan".to_owned()), - Some("is".to_owned()), - Some("thinking".to_owned()), - None - ]))) - ) -} - -#[test] -fn pipe_resp() { - let resp = b"$2\n!0\n@+4\n5\nsayan2\nis8\nthinking\0\n".to_vec(); - assert_eq!( - Parser::parse(&resp).unwrap(), - RawResponse::PipelinedQuery(vec![ - Element::RespCode(RespCode::Okay), - Element::Array(Array::Str(vec![ - Some("sayan".to_owned()), - Some("is".to_owned()), - Some("thinking".to_owned()), - None - ])) - ]) - ) -} - -#[test] -fn lskeys_resp() { - let resp = b"*^+3\n5\nsayan2\nis8\nthinking".to_vec(); - assert_eq!( - Parser::parse(&resp).unwrap(), - RawResponse::SimpleQuery(Element::Array(Array::NonNullStr(vec![ - "sayan".to_string(), - "is".to_string(), - "thinking".to_string() - ]))) - ); -} From 5cce19c035323015757d86bf4cd946356c80295f Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Tue, 19 Apr 2022 19:53:04 +0530 Subject: [PATCH 4/4] Add changelog entry [skip ci] --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e907c55..54a9e3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All changes in this project will be noted in this file. +## Unreleased + +### New features + +- Support for Skyhash 2.0 + ## 0.7.0 ### New features