Skip to content
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

All changes in this project will be noted in this file.

### 0.8.7 (unreleased)

> **BREAKING PATCH DUE TO MINIMUM VERSION UPGRADE**
> - **Minimum Supported Skytable Version**: 0.8.2
> - **Field change warnings**:
> - The `Config` struct now has one additional field. This is not a breaking change because the functionality of the library remains unchanged
- Added support for pipelines

### 0.8.6

Reduced allocations in `Query`.
Expand Down
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Introduction

This library is the official client for the free and open-source NoSQL database [Skytable](https://github.com/skytable/skytable). First, go ahead and install Skytable by following the instructions [here](https://docs.skytable.io/getting-started). This library supports all Skytable versions that work with the [Skyhash 2 Protocol](https://docs.skytable.io/protocol/overview). This version of the library was tested with the latest Skytable release (release [0.8.0-beta](https://github.com/skytable/skytable/releases/v0.8.0-beta)).
This library is the official client for the free and open-source NoSQL database [Skytable](https://github.com/skytable/skytable). First, go ahead and install Skytable by following the instructions [here](https://docs.skytable.io/getting-started). This library supports all Skytable versions that work with the [Skyhash 2 Protocol](https://docs.skytable.io/protocol/overview). This version of the library was tested with the latest Skytable release (release [0.8.1](https://github.com/skytable/skytable/releases/v0.8.1)). [Read more about supported versions here](#version-support).

## Definitive example

Expand Down Expand Up @@ -53,6 +53,12 @@ assert_eq!(user, our_user);

> **Read [docs here to learn BlueQL](https://docs.skytable.io/)**


## Version support

- Minimum Supported Rust Version (MSRV): 1.51.0
- Minimum Supported Skytable Version: 0.8.0

## Features

- Sync API
Expand All @@ -64,8 +70,8 @@ assert_eq!(user, our_user);

## Contributing

Open-source, and contributions ... — they're always welcome! For ideas and suggestions, [create an issue on GitHub](https://github.com/skytable/client-rust/issues/new) and for patches, fork and open those pull requests [here](https://github.com/skytable/client-rust)!
Contributions are always welcome. To submit patches please fork this repository and submit a pull request. If you find any bugs, [please open an issue here](https://github.com/skytable/client-rust/issues/new).

## License

This client library is distributed under the permissive [Apache-2.0 License](https://github.com/skytable/client-rust/blob/next/LICENSE). Now go build great apps!
This library is distributed under the [Apache-2.0 License](https://github.com/skytable/client-rust/blob/next/LICENSE).
43 changes: 32 additions & 11 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
*/

//! # Configuration
//!
//!
//! This module provides items to help with database connection setup and configuration.
//!
//!
//! ## Example
//!
//!
//! ```no_run
//! use skytable::Config;
//!
//!
//! // establish a sync connection to 127.0.0.1:2003
//! let mut db = Config::new_default("username", "password").connect().unwrap();
//!
//!
//! // establish a connection to a specific host `subnetx2_db1` and port `2008`
//! let mut db = Config::new("subnetx2_db1", 2008, "username", "password").connect().unwrap();
//! ```

use crate::protocol::handshake::ProtocolVersion;

/// The default host
///
/// NOTE: If you are using a clustering setup, don't use this!
Expand All @@ -46,21 +48,40 @@ pub struct Config {
port: u16,
username: Box<str>,
password: Box<str>,
pub(crate) protocol: ProtocolVersion,
}

impl Config {
fn _new(
host: Box<str>,
port: u16,
username: Box<str>,
password: Box<str>,
protocol: ProtocolVersion,
) -> Self {
Self {
host,
port,
username,
password,
protocol,
}
}
/// Create a new [`Config`] using the default connection settings and using the provided username and password
pub fn new_default(username: &str, password: &str) -> Self {
Self::new(DEFAULT_HOST, DEFAULT_TCP_PORT, username, password)
}
/// Create a new [`Config`] using the given settings
/// Create a new [`Config`] using the given settings.
///
/// **PROTOCOL VERSION**: Defaults to [`ProtocolVersion::V2_0`]
pub fn new(host: &str, port: u16, username: &str, password: &str) -> Self {
Self {
host: host.into(),
Self::_new(
host.into(),
port,
username: username.into(),
password: password.into(),
}
username.into(),
password.into(),
ProtocolVersion::V2_0,
)
}
/// Returns the host setting for this this configuration
pub fn host(&self) -> &str {
Expand Down
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl fmt::Display for ProtocolError {
Self::InvalidServerResponseUnknownDataType => {
write!(f, "new or unknown data type received from server")
}
Self::InvalidPacket => write!(f, "invalid packet received from server"),
}
}
}
Expand Down
88 changes: 62 additions & 26 deletions src/aio.rs → src/io/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
use {
crate::{
error::{ClientResult, ConnectionSetupError, Error},
protocol::{ClientHandshake, DecodeState, Decoder, RState, ServerHandshake},
protocol::{
handshake::{ClientHandshake, ServerHandshake},
state_init::{DecodeState, MRespState, PipelineResult, RState},
Decoder,
},
query::Pipeline,
response::{FromResponse, Response},
Config, Query,
},
Expand Down Expand Up @@ -80,17 +85,12 @@ impl DerefMut for ConnectionTlsAsync {
impl Config {
/// Establish an async connection to the database using the current configuration
pub async fn connect_async(&self) -> ClientResult<ConnectionAsync> {
let mut tcpstream = TcpStream::connect((self.host(), self.port())).await?;
let handshake = ClientHandshake::new(self);
tcpstream.write_all(handshake.inner()).await?;
let mut resp = [0u8; 4];
tcpstream.read_exact(&mut resp).await?;
match ServerHandshake::parse(resp)? {
ServerHandshake::Error(e) => return Err(ConnectionSetupError::HandshakeError(e).into()),
ServerHandshake::Okay(_suggestion) => {
return Ok(ConnectionAsync(TcpConnection::new(tcpstream)))
}
}
TcpStream::connect((self.host(), self.port()))
.await
.map(TcpConnection::new)?
._handshake(self)
.await
.map(ConnectionAsync)
}
/// Establish an async TLS connection to the database using the current configuration.
/// Pass the certificate in PEM format.
Expand All @@ -110,22 +110,15 @@ impl Config {
let connector = builder.build().map_err(|e| {
ConnectionSetupError::Other(format!("failed to set up TLS acceptor: {e}"))
})?;
// init
let mut stream = TlsConnector::from(connector)
// init and handshake
TlsConnector::from(connector)
.connect(self.host(), stream)
.await
.map_err(|e| ConnectionSetupError::Other(format!("TLS handshake failed: {e}")))?;
// handshake
let handshake = ClientHandshake::new(self);
stream.write_all(handshake.inner()).await?;
let mut resp = [0u8; 4];
stream.read_exact(&mut resp).await?;
match ServerHandshake::parse(resp)? {
ServerHandshake::Error(e) => return Err(ConnectionSetupError::HandshakeError(e).into()),
ServerHandshake::Okay(_suggestion) => {
return Ok(ConnectionTlsAsync(TcpConnection::new(stream)))
}
}
.map(TcpConnection::new)
.map_err(|e| ConnectionSetupError::Other(format!("TLS handshake failed: {e}")))?
._handshake(self)
.await
.map(ConnectionTlsAsync)
}
}

Expand All @@ -143,6 +136,49 @@ impl<C: AsyncWriteExt + AsyncReadExt + Unpin> TcpConnection<C> {
buf: Vec::with_capacity(crate::BUFSIZE),
}
}
async fn _handshake(mut self, cfg: &Config) -> ClientResult<Self> {
let handshake = ClientHandshake::new(cfg);
self.con.write_all(handshake.inner()).await?;
let mut resp = [0u8; 4];
self.con.read_exact(&mut resp).await?;
match ServerHandshake::parse(resp)? {
ServerHandshake::Error(e) => return Err(ConnectionSetupError::HandshakeError(e).into()),
ServerHandshake::Okay(_suggestion) => return Ok(self),
}
}
/// Execute a pipeline. The server returns the queries in the order they were sent (unless otherwise set).
pub async fn execute_pipeline(&mut self, pipeline: &Pipeline) -> ClientResult<Vec<Response>> {
self.buf.clear();
self.buf.push(b'P');
// packet size
self.buf
.extend(itoa::Buffer::new().format(pipeline.buf().len()).as_bytes());
self.buf.push(b'\n');
// write
self.con.write_all(&self.buf).await?;
self.con.write_all(pipeline.buf()).await?;
self.buf.clear();
// read
let mut cursor = 0;
let mut state = MRespState::default();
loop {
let mut buf = [0u8; crate::BUFSIZE];
let n = self.con.read(&mut buf).await?;
if n == 0 {
return Err(Error::IoError(std::io::ErrorKind::ConnectionReset.into()));
}
self.buf.extend_from_slice(&buf[..n]);
let mut decoder = Decoder::new(&self.buf, cursor);
match decoder.validate_pipe(pipeline.query_count(), state) {
PipelineResult::Completed(r) => return Ok(r),
PipelineResult::Pending(_state) => {
cursor = decoder.position();
state = _state;
}
PipelineResult::Error(e) => return Err(e.into()),
}
}
}
/// Run a query and return a raw [`Response`]
pub async fn query(&mut self, q: &Query) -> ClientResult<Response> {
self.buf.clear();
Expand Down
18 changes: 18 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2023, Sayan Nandan <nandansayan@outlook.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

pub mod aio;
pub mod sync;
Loading