fix h1 client for handling expect header request

This commit is contained in:
fakeshadow 2021-03-07 13:50:38 +08:00
parent 78384c3ff5
commit da7afca516
1 changed files with 42 additions and 20 deletions

View File

@ -7,13 +7,15 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use bytes::buf::BufMut; use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::Stream;
use futures_util::future::poll_fn; use futures_util::{future::poll_fn, SinkExt, StreamExt};
use futures_util::{SinkExt, StreamExt};
use crate::error::PayloadError; use crate::error::PayloadError;
use crate::h1; use crate::h1;
use crate::header::HeaderMap; use crate::header::HeaderMap;
use crate::http::header::{IntoHeaderValue, HOST}; use crate::http::{
header::{IntoHeaderValue, EXPECT, HOST},
StatusCode,
};
use crate::message::{RequestHeadType, ResponseHead}; use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::{Payload, PayloadStream}; use crate::payload::{Payload, PayloadStream};
@ -66,33 +68,53 @@ where
io: Some(io), io: Some(io),
}; };
// create Framed and send request let is_expect = head.as_ref().headers.contains_key(EXPECT);
let mut framed_inner = Framed::new(io, h1::ClientCodec::default());
framed_inner.send((head, body.size()).into()).await?;
// create Framed and send request
let mut framed = Framed::new(io, h1::ClientCodec::default());
framed.send((head, body.size()).into()).await?;
// make Pin<&mut Framed> for polling it on stack.
let mut pin_framed = Pin::new(&mut framed);
// special handler for EXPECT request.
let (do_send, mut res_head) = if is_expect {
let head = poll_fn(|cx| pin_framed.as_mut().poll_next(cx))
.await
.ok_or(ConnectError::Disconnected)??;
// return response head in case status code is not continue
// and current head would be used as final response head.
(head.status == StatusCode::CONTINUE, Some(head))
} else {
(true, None)
};
if do_send {
// send request body // send request body
match body.size() { match body.size() {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => {} BodySize::None | BodySize::Empty | BodySize::Sized(0) => {}
_ => send_body(body, Pin::new(&mut framed_inner)).await?, _ => send_body(body, pin_framed.as_mut()).await?,
}; };
// read response and init read body // read response and init read body
let res = Pin::new(&mut framed_inner).into_future().await; let head = poll_fn(|cx| pin_framed.as_mut().poll_next(cx))
let (head, framed) = if let (Some(result), framed) = res { .await
let item = result.map_err(SendRequestError::from)?; .ok_or(ConnectError::Disconnected)??;
(item, framed)
} else {
return Err(SendRequestError::from(ConnectError::Disconnected));
};
match framed.codec_ref().message_type() { res_head = Some(head);
}
let head = res_head.unwrap();
match pin_framed.codec_ref().message_type() {
h1::MessageType::None => { h1::MessageType::None => {
let force_close = !framed.codec_ref().keepalive(); let force_close = !pin_framed.codec_ref().keepalive();
release_connection(framed, force_close); release_connection(pin_framed, force_close);
Ok((head, Payload::None)) Ok((head, Payload::None))
} }
_ => { _ => {
let pl: PayloadStream = PlStream::new(framed_inner).boxed_local(); let pl: PayloadStream = PlStream::new(framed).boxed_local();
Ok((head, pl.into())) Ok((head, pl.into()))
} }
} }