Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/client_json.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![deny(warnings)]
#![warn(rust_2018_idioms)]


use hyper::Body;
use hyper::{body::Buf, Request};
use serde::Deserialize;
Expand Down
2 changes: 1 addition & 1 deletion examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {

// Return the 404 Not Found for other routes.
_ => {
let mut not_found = Response::default();
let mut not_found = Response::new(());
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
Expand Down
123 changes: 19 additions & 104 deletions src/body/body.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::fmt;

use bytes::Bytes;
Expand Down Expand Up @@ -35,7 +34,8 @@ pub struct Body {
}

enum Kind {
Once(Option<Bytes>),
#[allow(dead_code)]
Empty,
Chan {
content_length: DecodedLength,
want_tx: watch::Sender,
Expand Down Expand Up @@ -104,21 +104,6 @@ const WANT_PENDING: usize = 1;
const WANT_READY: usize = 2;

impl Body {
/// Create an empty `Body` stream.
///
/// # Example
///
/// ```
/// use hyper::{Body, Request};
///
/// // create a `GET /` request
/// let get = Request::new(Body::empty());
/// ```
#[inline]
pub fn empty() -> Body {
Body::new(Kind::Once(None))
}

/// Create a `Body` stream with an associated sender half.
///
/// Useful when wanting to stream chunks from another thread.
Expand Down Expand Up @@ -176,6 +161,18 @@ impl Body {
body
}

#[cfg(feature = "ffi")]
#[inline]
pub(crate) fn ffi() -> Self {
Body::new(Kind::Ffi(crate::ffi::UserBody::new()))
}

#[allow(dead_code)]
#[inline]
pub(crate) fn empty() -> Self {
Body::new(Kind::Empty)
}

#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
Expand Down Expand Up @@ -249,7 +246,7 @@ impl Body {

fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
match self.kind {
Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
Kind::Empty => Poll::Ready(None),
Kind::Chan {
content_length: ref mut len,
ref mut data_rx,
Expand Down Expand Up @@ -286,23 +283,6 @@ impl Body {
Kind::Ffi(ref mut body) => body.poll_data(cx),
}
}

#[cfg(feature = "http1")]
pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
if let Kind::Once(ref mut chunk) = self.kind {
chunk.take()
} else {
None
}
}
}

impl Default for Body {
/// Returns `Body::empty()`.
#[inline]
fn default() -> Body {
Body::empty()
}
}

impl HttpBody for Body {
Expand All @@ -321,6 +301,7 @@ impl HttpBody for Body {
#[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.kind {
Kind::Empty => Poll::Ready(Ok(None)),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 {
recv: ref mut h2,
Expand All @@ -342,13 +323,12 @@ impl HttpBody for Body {
},
#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_trailers(cx),
_ => Poll::Ready(Ok(None)),
}
}

fn is_end_stream(&self) -> bool {
match self.kind {
Kind::Once(ref val) => val.is_none(),
Kind::Empty => true,
Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
Expand All @@ -371,8 +351,7 @@ impl HttpBody for Body {
}

match self.kind {
Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
Kind::Once(None) => SizeHint::with_exact(0),
Kind::Empty => SizeHint::with_exact(0),
Kind::Chan { content_length, .. } => opt_len!(content_length),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { content_length, .. } => opt_len!(content_length),
Expand All @@ -388,79 +367,17 @@ impl fmt::Debug for Body {
struct Streaming;
#[derive(Debug)]
struct Empty;
#[derive(Debug)]
struct Full<'a>(&'a Bytes);

let mut builder = f.debug_tuple("Body");
match self.kind {
Kind::Once(None) => builder.field(&Empty),
Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
Kind::Empty => builder.field(&Empty),
_ => builder.field(&Streaming),
};

builder.finish()
}
}

impl From<Bytes> for Body {
#[inline]
fn from(chunk: Bytes) -> Body {
if chunk.is_empty() {
Body::empty()
} else {
Body::new(Kind::Once(Some(chunk)))
}
}
}

impl From<Vec<u8>> for Body {
#[inline]
fn from(vec: Vec<u8>) -> Body {
Body::from(Bytes::from(vec))
}
}

impl From<&'static [u8]> for Body {
#[inline]
fn from(slice: &'static [u8]) -> Body {
Body::from(Bytes::from(slice))
}
}

impl From<Cow<'static, [u8]>> for Body {
#[inline]
fn from(cow: Cow<'static, [u8]>) -> Body {
match cow {
Cow::Borrowed(b) => Body::from(b),
Cow::Owned(o) => Body::from(o),
}
}
}

impl From<String> for Body {
#[inline]
fn from(s: String) -> Body {
Body::from(Bytes::from(s.into_bytes()))
}
}

impl From<&'static str> for Body {
#[inline]
fn from(slice: &'static str) -> Body {
Body::from(Bytes::from(slice.as_bytes()))
}
}

impl From<Cow<'static, str>> for Body {
#[inline]
fn from(cow: Cow<'static, str>) -> Body {
match cow {
Cow::Borrowed(b) => Body::from(b),
Cow::Owned(o) => Body::from(o),
}
}
}

impl Sender {
/// Check to see if this `Sender` can send more data.
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
Expand Down Expand Up @@ -595,8 +512,6 @@ mod tests {
assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
}

eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");

eq(Body::empty(), SizeHint::with_exact(0), "empty");

eq(Body::channel().1, SizeHint::new(), "channel");
Expand Down
27 changes: 0 additions & 27 deletions src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,6 @@ mod body;
mod length;
mod to_bytes;

/// An optimization to try to take a full body if immediately available.
///
/// This is currently limited to *only* `hyper::Body`s.
#[cfg(feature = "http1")]
pub(crate) fn take_full_data<T: HttpBody + 'static>(body: &mut T) -> Option<T::Data> {
use std::any::{Any, TypeId};

// This static type check can be optimized at compile-time.
if TypeId::of::<T>() == TypeId::of::<Body>() {
let mut full = (body as &mut dyn Any)
.downcast_mut::<Body>()
.expect("must be Body")
.take_full_data();
// This second cast is required to make the type system happy.
// Without it, the compiler cannot reason that the type is actually
// `T::Data`. Oh wells.
//
// It's still a measurable win!
(&mut full as &mut dyn Any)
.downcast_mut::<Option<T::Data>>()
.expect("must be T::Data")
.take()
} else {
None
}
}

fn _assert_send_sync() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
Expand Down
2 changes: 1 addition & 1 deletion src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ mod tests {
let (mut tx, mut rx) = channel::<Request<Body>, Response<Body>>();

b.iter(move || {
let _ = tx.send(Request::default()).unwrap();
let _ = tx.send(Request::new(Body::empty())).unwrap();
rt.block_on(async {
loop {
let poll_once = PollOnce(&mut rx);
Expand Down
2 changes: 1 addition & 1 deletion src/ffi/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ ffi_fn! {
///
/// If not configured, this body acts as an empty payload.
fn hyper_body_new() -> *mut hyper_body {
Box::into_raw(Box::new(hyper_body(Body::empty())))
Box::into_raw(Box::new(hyper_body(Body::ffi())))
} ?= ptr::null_mut()
}

Expand Down
4 changes: 2 additions & 2 deletions src/ffi/http_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type hyper_request_on_informational_callback = extern "C" fn(*mut c_void, *mut h
ffi_fn! {
/// Construct a new HTTP request.
fn hyper_request_new() -> *mut hyper_request {
Box::into_raw(Box::new(hyper_request(Request::new(Body::empty()))))
Box::into_raw(Box::new(hyper_request(Request::new(Body::ffi()))))
} ?= std::ptr::null_mut()
}

Expand Down Expand Up @@ -335,7 +335,7 @@ ffi_fn! {
///
/// It is safe to free the response even after taking ownership of its body.
fn hyper_response_body(resp: *mut hyper_response) -> *mut hyper_body {
let body = std::mem::take(non_null!(&mut *resp ?= std::ptr::null_mut()).0.body_mut());
let body = std::mem::replace(non_null!(&mut *resp ?= std::ptr::null_mut()).0.body_mut(), crate::Body::empty());
Box::into_raw(Box::new(hyper_body(body)))
} ?= std::ptr::null_mut()
}
Expand Down
18 changes: 0 additions & 18 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,24 +522,6 @@ where
}
}

pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
if let Some(encoder) =
self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
{
let is_last = encoder.is_last();
// Make sure we don't write a body if we weren't actually allowed
// to do so, like because its a HEAD request.
if !encoder.is_eof() {
encoder.danger_full_buf(body, self.io.write_buf());
}
self.state.writing = if is_last {
Writing::Closed
} else {
Writing::KeepAlive
}
}
}

fn encode_head(
&mut self,
mut head: MessageHead<T::Outgoing>,
Expand Down
23 changes: 9 additions & 14 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use tracing::{debug, trace};
use super::{Http1Transaction, Wants};
use crate::body::{Body, DecodedLength, HttpBody};
use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::proto::{
BodyLength, Conn, Dispatched, MessageHead, RequestHead,
};
use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
use crate::upgrade::OnUpgrade;

pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
Expand Down Expand Up @@ -295,16 +293,7 @@ where
&& self.dispatch.should_poll()
{
if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;

// Check if the body knows its full data immediately.
//
// If so, we can skip a bit of bookkeeping that streaming
// bodies need to do.
if let Some(full) = crate::body::take_full_data(&mut body) {
self.conn.write_full_msg(head, full);
return Poll::Ready(Ok(()));
}
let (head, body) = msg.map_err(crate::Error::new_user_service)?;

let body_type = if body.is_end_stream() {
self.body_rx.set(None);
Expand Down Expand Up @@ -708,9 +697,15 @@ mod tests {
let dispatcher = Dispatcher::new(Client::new(rx), conn);
let _dispatcher = tokio::spawn(async move { dispatcher.await });

let body = {
let (mut tx, body) = crate::Body::new_channel(DecodedLength::new(4), false);
tx.try_send_data("reee".into()).unwrap();
body
};

let req = crate::Request::builder()
.method("POST")
.body(crate::Body::from("reee"))
.body(body)
.unwrap();

let res = tx.try_send(req).unwrap().await.expect("response");
Expand Down
Loading