Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions iocore/net/I_NetProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class NetProcessor : public Processor
call back with success. If this behaviour is desired use
synchronous connect connet_s method.

@see connect_s()

@param cont Continuation to be called back with events.
@param addr target address and port to connect to.
@param options @see NetVCOptions.
Expand All @@ -173,12 +175,34 @@ class NetProcessor : public Processor

inkcoreapi Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *options = nullptr);

/**
Open a NetVConnection for connection oriented I/O. This call
is simliar to connect method except that the cont is called
back only after the connections has been established. In the
case of connect the cont could be called back with NET_EVENT_OPEN
event and OS could still be in the process of establishing the
connection. Re-entrant Callbacks: same as connect. If unix
asynchronous type connect is desired use connect_re().

@param cont Continuation to be called back with events.
@param addr Address to which to connect (includes port).
@param timeout for connect, the cont will get NET_EVENT_OPEN_FAILED
if connection could not be established for timeout msecs. The
default is 30 secs.
@param options @see NetVCOptions.

@see connect_re()

*/
Action *connect_s(Continuation *cont, sockaddr const *addr, int timeout = NET_CONNECT_TIMEOUT, NetVCOptions *opts = nullptr);

/**
Starts the Netprocessor. This has to be called before doing any
other net call.

@param number_of_net_threads is not used. The net processor
uses the Event Processor threads for its activity.

*/
virtual int start(int number_of_net_threads, size_t stacksize) = 0;

Expand Down
10 changes: 0 additions & 10 deletions iocore/net/P_SSLNetVConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,6 @@ class SSLNetVConnection : public UnixNetVConnection
write.enabled = 1;
}

bool
trackFirstHandshake() override
{
bool retval = sslHandshakeBeginTime == 0;
if (retval) {
sslHandshakeBeginTime = Thread::get_hrtime();
}
return retval;
}

bool
getSSLHandShakeComplete() const override
{
Expand Down
6 changes: 0 additions & 6 deletions iocore/net/P_UnixNetVConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,6 @@ class UnixNetVConnection : public NetVConnection
return (true);
}

virtual bool
trackFirstHandshake()
{
return false;
}

virtual void net_read_io(NetHandler *nh, EThread *lthread);
virtual int64_t load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs);
void readDisable(NetHandler *nh);
Expand Down
111 changes: 111 additions & 0 deletions iocore/net/UnixNetProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,117 @@ UnixNetProcessor::connect(Continuation *cont, UnixNetVConnection ** /* avc */, s
return connect_re(cont, target, opt);
}

struct CheckConnect : public Continuation {
UnixNetVConnection *vc;
Action action_;
MIOBuffer *buf;
IOBufferReader *reader;
int connect_status;
int recursion;
ink_hrtime timeout;

int
handle_connect(int event, Event *e)
{
connect_status = event;
switch (event) {
case NET_EVENT_OPEN:
vc = (UnixNetVConnection *)e;
Debug("iocore_net_connect", "connect Net open");
vc->do_io_write(this, 10, /* some non-zero number just to get the poll going */
reader);
/* dont wait for more than timeout secs */
vc->set_inactivity_timeout(timeout);
return EVENT_CONT;
break;

case NET_EVENT_OPEN_FAILED:
Debug("iocore_net_connect", "connect Net open failed");
if (!action_.cancelled)
action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)e);
break;

case VC_EVENT_WRITE_READY:
int sl, ret;
socklen_t sz;
if (!action_.cancelled) {
sz = sizeof(int);
ret = getsockopt(vc->con.fd, SOL_SOCKET, SO_ERROR, (char *)&sl, &sz);
if (!ret && sl == 0) {
Debug("iocore_net_connect", "connection established");
/* disable write on vc */
vc->write.enabled = 0;
vc->cancel_inactivity_timeout();
// write_disable(get_NetHandler(this_ethread()), vc);
/* clean up vc fields */
vc->write.vio.nbytes = 0;
vc->write.vio.op = VIO::NONE;
vc->write.vio.buffer.clear();

action_.continuation->handleEvent(NET_EVENT_OPEN, vc);
delete this;
return EVENT_DONE;
}
}
vc->do_io_close();
if (!action_.cancelled)
action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_FAILED);
break;
case VC_EVENT_INACTIVITY_TIMEOUT:
Debug("iocore_net_connect", "connect timed out");
vc->do_io_close();
if (!action_.cancelled)
action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_TIMEOUT);
break;
default:
ink_assert(!"unknown connect event");
if (!action_.cancelled)
action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_FAILED);
}
if (!recursion)
delete this;
return EVENT_DONE;
}

Action *
connect_s(Continuation *cont, sockaddr const *target, int _timeout, NetVCOptions *opt)
{
action_ = cont;
timeout = HRTIME_SECONDS(_timeout);
recursion++;
netProcessor.connect_re(this, target, opt);
recursion--;
if (connect_status != NET_EVENT_OPEN_FAILED)
return &action_;
else {
delete this;
return ACTION_RESULT_DONE;
}
}

explicit CheckConnect(Ptr<ProxyMutex> &m) : Continuation(m.get()), vc(nullptr), connect_status(-1), recursion(0), timeout(0)
{
SET_HANDLER(&CheckConnect::handle_connect);
buf = new_empty_MIOBuffer(1);
reader = buf->alloc_reader();
}

~CheckConnect() override
{
buf->dealloc_all_readers();
buf->clear();
free_MIOBuffer(buf);
}
};

Action *
NetProcessor::connect_s(Continuation *cont, sockaddr const *target, int timeout, NetVCOptions *opt)
{
Debug("iocore_net_connect", "NetProcessor::connect_s called");
CheckConnect *c = new CheckConnect(cont->mutex);
return c->connect_s(cont, target, timeout, opt);
}

struct PollCont;

// This is a little odd, in that the actual threads are created before calling the processor.
Expand Down
7 changes: 0 additions & 7 deletions iocore/net/UnixNetVConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,6 @@ write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
// This function will always return true unless
// vc is an SSLNetVConnection.
if (!vc->getSSLHandShakeComplete()) {
if (vc->trackFirstHandshake()) {
// Send the write ready on up to the state machine
write_signal_and_update(VC_EVENT_WRITE_READY, vc);
vc->write.triggered = 0;
nh->write_ready_list.remove(vc);
}

int err, ret;

if (vc->get_context() == NET_VCONNECTION_OUT) {
Expand Down
62 changes: 31 additions & 31 deletions proxy/http/HttpSM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ HttpSM::state_read_push_response_header(int event, void *data)

//////////////////////////////////////////////////////////////////////////////
//
// HttpSM::state_raw_http_server_open()
// HttpSM::state_http_server_open()
//
//////////////////////////////////////////////////////////////////////////////
int
Expand Down Expand Up @@ -1689,7 +1689,7 @@ HttpSM::state_http_server_open(int event, void *data)
HttpServerSession *session;

switch (event) {
case NET_EVENT_OPEN: {
case NET_EVENT_OPEN:
session = (TS_SERVER_SESSION_SHARING_POOL_THREAD == t_state.http_config_param->server_session_sharing_pool) ?
THREAD_ALLOC_INIT(httpServerSessionAllocator, mutex->thread_holding) :
httpServerSessionAllocator.alloc();
Expand All @@ -1709,11 +1709,7 @@ HttpSM::state_http_server_open(int event, void *data)
printf("client fd is :%d , server fd is %d\n",vc->con.fd,
server_vc->con.fd); */
session->attach_hostname(t_state.current.server->name);
UnixNetVConnection *vc = static_cast<UnixNetVConnection *>(data);
ink_release_assert(pending_action == nullptr || pending_action == vc->get_action());
pending_action = nullptr;

session->new_connection(vc);
session->new_connection(static_cast<NetVConnection *>(data));

session->state = HSS_ACTIVE;

Expand All @@ -1726,29 +1722,6 @@ HttpSM::state_http_server_open(int event, void *data)
} else {
session->to_parent_proxy = false;
}
if (plugin_tunnel_type == HTTP_NO_PLUGIN_TUNNEL) {
DebugSM("http", "[%" PRId64 "] setting handler for TCP handshake", sm_id);
// Just want to get a write-ready event so we know that the TCP handshake is complete.
server_entry->vc_handler = &HttpSM::state_http_server_open;
server_entry->write_vio = server_session->do_io_write(this, 1, server_session->get_reader());
} else { // in the case of an intercept plugin don't to the connect timeout change
DebugSM("http", "[%" PRId64 "] not setting handler for TCP handshake", sm_id);
handle_http_server_open();
}
return 0;
}
case VC_EVENT_WRITE_READY:
case VC_EVENT_WRITE_COMPLETE:
// Update the time out to the regular connection timeout.
DebugSM("http_ss", "[%" PRId64 "] TCP Handshake complete", sm_id);
server_entry->vc_handler = &HttpSM::state_send_server_request_header;

// Reset the timeout to the non-connect timeout
if (t_state.api_txn_no_activity_timeout_value != -1) {
server_session->get_netvc()->set_inactivity_timeout(HRTIME_MSECONDS(t_state.api_txn_no_activity_timeout_value));
} else {
server_session->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_out));
}
handle_http_server_open();
return 0;
case EVENT_INTERVAL: // Delayed call from another thread
Expand Down Expand Up @@ -5071,11 +5044,38 @@ HttpSM::do_http_server_open(bool raw)
connect_action_handle = sslNetProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
} else {
} else if (t_state.method != HTTP_WKSIDX_CONNECT && t_state.method != HTTP_WKSIDX_POST && t_state.method != HTTP_WKSIDX_PUT) {
DebugSM("http", "calling netProcessor.connect_re");
connect_action_handle = netProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
} else {
// The request transform would be applied to POST and/or PUT request.
// The server_vc should be established (writeable) before request transform start.
// The CheckConnect is created by connect_s,
// It will callback NET_EVENT_OPEN to HttpSM if server_vc is WRITE_READY,
// Otherwise NET_EVENT_OPEN_FAILED is callbacked.
MgmtInt connect_timeout;

ink_assert(t_state.method == HTTP_WKSIDX_CONNECT || t_state.method == HTTP_WKSIDX_POST || t_state.method == HTTP_WKSIDX_PUT);

// Set the inactivity timeout to the connect timeout so that we
// we fail this server if it doesn't start sending the response
// header
if (t_state.method == HTTP_WKSIDX_POST || t_state.method == HTTP_WKSIDX_PUT) {
connect_timeout = t_state.txn_conf->post_connect_attempts_timeout;
} else if (t_state.current.server == &t_state.parent_info) {
connect_timeout = t_state.txn_conf->parent_connect_timeout;
} else if (t_state.pCongestionEntry != nullptr) {
connect_timeout = t_state.pCongestionEntry->connect_timeout();
} else {
connect_timeout = t_state.txn_conf->connect_attempts_timeout;
}

DebugSM("http", "calling netProcessor.connect_s");
connect_action_handle = netProcessor.connect_s(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
connect_timeout, &opt);
}

if (connect_action_handle != ACTION_RESULT_DONE) {
Expand Down