diff --git a/iocore/net/I_NetProcessor.h b/iocore/net/I_NetProcessor.h index 5f522a82a4d..c032bddac17 100644 --- a/iocore/net/I_NetProcessor.h +++ b/iocore/net/I_NetProcessor.h @@ -165,6 +165,8 @@ class NetProcessor : public Processor call back with success. If this behaviour is desired use synchronous connect connet_s method. + @see connect_s() + @param cont Continuation to be called back with events. @param addr target address and port to connect to. @param options @see NetVCOptions. @@ -173,12 +175,34 @@ class NetProcessor : public Processor inkcoreapi Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *options = nullptr); + /** + Open a NetVConnection for connection oriented I/O. This call + is simliar to connect method except that the cont is called + back only after the connections has been established. In the + case of connect the cont could be called back with NET_EVENT_OPEN + event and OS could still be in the process of establishing the + connection. Re-entrant Callbacks: same as connect. If unix + asynchronous type connect is desired use connect_re(). + + @param cont Continuation to be called back with events. + @param addr Address to which to connect (includes port). + @param timeout for connect, the cont will get NET_EVENT_OPEN_FAILED + if connection could not be established for timeout msecs. The + default is 30 secs. + @param options @see NetVCOptions. + + @see connect_re() + + */ + Action *connect_s(Continuation *cont, sockaddr const *addr, int timeout = NET_CONNECT_TIMEOUT, NetVCOptions *opts = nullptr); + /** Starts the Netprocessor. This has to be called before doing any other net call. @param number_of_net_threads is not used. The net processor uses the Event Processor threads for its activity. + */ virtual int start(int number_of_net_threads, size_t stacksize) = 0; diff --git a/iocore/net/P_SSLNetVConnection.h b/iocore/net/P_SSLNetVConnection.h index 17345280c66..47a937c4ac6 100644 --- a/iocore/net/P_SSLNetVConnection.h +++ b/iocore/net/P_SSLNetVConnection.h @@ -98,16 +98,6 @@ class SSLNetVConnection : public UnixNetVConnection write.enabled = 1; } - bool - trackFirstHandshake() override - { - bool retval = sslHandshakeBeginTime == 0; - if (retval) { - sslHandshakeBeginTime = Thread::get_hrtime(); - } - return retval; - } - bool getSSLHandShakeComplete() const override { diff --git a/iocore/net/P_UnixNetVConnection.h b/iocore/net/P_UnixNetVConnection.h index 0f557fe07de..71c3fd1a627 100644 --- a/iocore/net/P_UnixNetVConnection.h +++ b/iocore/net/P_UnixNetVConnection.h @@ -232,12 +232,6 @@ class UnixNetVConnection : public NetVConnection return (true); } - virtual bool - trackFirstHandshake() - { - return false; - } - virtual void net_read_io(NetHandler *nh, EThread *lthread); virtual int64_t load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs); void readDisable(NetHandler *nh); diff --git a/iocore/net/UnixNetProcessor.cc b/iocore/net/UnixNetProcessor.cc index c4e915a050e..793a7a56a99 100644 --- a/iocore/net/UnixNetProcessor.cc +++ b/iocore/net/UnixNetProcessor.cc @@ -283,6 +283,117 @@ UnixNetProcessor::connect(Continuation *cont, UnixNetVConnection ** /* avc */, s return connect_re(cont, target, opt); } +struct CheckConnect : public Continuation { + UnixNetVConnection *vc; + Action action_; + MIOBuffer *buf; + IOBufferReader *reader; + int connect_status; + int recursion; + ink_hrtime timeout; + + int + handle_connect(int event, Event *e) + { + connect_status = event; + switch (event) { + case NET_EVENT_OPEN: + vc = (UnixNetVConnection *)e; + Debug("iocore_net_connect", "connect Net open"); + vc->do_io_write(this, 10, /* some non-zero number just to get the poll going */ + reader); + /* dont wait for more than timeout secs */ + vc->set_inactivity_timeout(timeout); + return EVENT_CONT; + break; + + case NET_EVENT_OPEN_FAILED: + Debug("iocore_net_connect", "connect Net open failed"); + if (!action_.cancelled) + action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)e); + break; + + case VC_EVENT_WRITE_READY: + int sl, ret; + socklen_t sz; + if (!action_.cancelled) { + sz = sizeof(int); + ret = getsockopt(vc->con.fd, SOL_SOCKET, SO_ERROR, (char *)&sl, &sz); + if (!ret && sl == 0) { + Debug("iocore_net_connect", "connection established"); + /* disable write on vc */ + vc->write.enabled = 0; + vc->cancel_inactivity_timeout(); + // write_disable(get_NetHandler(this_ethread()), vc); + /* clean up vc fields */ + vc->write.vio.nbytes = 0; + vc->write.vio.op = VIO::NONE; + vc->write.vio.buffer.clear(); + + action_.continuation->handleEvent(NET_EVENT_OPEN, vc); + delete this; + return EVENT_DONE; + } + } + vc->do_io_close(); + if (!action_.cancelled) + action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_FAILED); + break; + case VC_EVENT_INACTIVITY_TIMEOUT: + Debug("iocore_net_connect", "connect timed out"); + vc->do_io_close(); + if (!action_.cancelled) + action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_TIMEOUT); + break; + default: + ink_assert(!"unknown connect event"); + if (!action_.cancelled) + action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_FAILED); + } + if (!recursion) + delete this; + return EVENT_DONE; + } + + Action * + connect_s(Continuation *cont, sockaddr const *target, int _timeout, NetVCOptions *opt) + { + action_ = cont; + timeout = HRTIME_SECONDS(_timeout); + recursion++; + netProcessor.connect_re(this, target, opt); + recursion--; + if (connect_status != NET_EVENT_OPEN_FAILED) + return &action_; + else { + delete this; + return ACTION_RESULT_DONE; + } + } + + explicit CheckConnect(Ptr &m) : Continuation(m.get()), vc(nullptr), connect_status(-1), recursion(0), timeout(0) + { + SET_HANDLER(&CheckConnect::handle_connect); + buf = new_empty_MIOBuffer(1); + reader = buf->alloc_reader(); + } + + ~CheckConnect() override + { + buf->dealloc_all_readers(); + buf->clear(); + free_MIOBuffer(buf); + } +}; + +Action * +NetProcessor::connect_s(Continuation *cont, sockaddr const *target, int timeout, NetVCOptions *opt) +{ + Debug("iocore_net_connect", "NetProcessor::connect_s called"); + CheckConnect *c = new CheckConnect(cont->mutex); + return c->connect_s(cont, target, timeout, opt); +} + struct PollCont; // This is a little odd, in that the actual threads are created before calling the processor. diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc index c2303fa22c6..aa1737c4cae 100644 --- a/iocore/net/UnixNetVConnection.cc +++ b/iocore/net/UnixNetVConnection.cc @@ -432,13 +432,6 @@ write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread) // This function will always return true unless // vc is an SSLNetVConnection. if (!vc->getSSLHandShakeComplete()) { - if (vc->trackFirstHandshake()) { - // Send the write ready on up to the state machine - write_signal_and_update(VC_EVENT_WRITE_READY, vc); - vc->write.triggered = 0; - nh->write_ready_list.remove(vc); - } - int err, ret; if (vc->get_context() == NET_VCONNECTION_OUT) { diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index d11b21d0d40..59319815700 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -1068,7 +1068,7 @@ HttpSM::state_read_push_response_header(int event, void *data) ////////////////////////////////////////////////////////////////////////////// // -// HttpSM::state_raw_http_server_open() +// HttpSM::state_http_server_open() // ////////////////////////////////////////////////////////////////////////////// int @@ -1689,7 +1689,7 @@ HttpSM::state_http_server_open(int event, void *data) HttpServerSession *session; switch (event) { - case NET_EVENT_OPEN: { + case NET_EVENT_OPEN: session = (TS_SERVER_SESSION_SHARING_POOL_THREAD == t_state.http_config_param->server_session_sharing_pool) ? THREAD_ALLOC_INIT(httpServerSessionAllocator, mutex->thread_holding) : httpServerSessionAllocator.alloc(); @@ -1709,11 +1709,7 @@ HttpSM::state_http_server_open(int event, void *data) printf("client fd is :%d , server fd is %d\n",vc->con.fd, server_vc->con.fd); */ session->attach_hostname(t_state.current.server->name); - UnixNetVConnection *vc = static_cast(data); - ink_release_assert(pending_action == nullptr || pending_action == vc->get_action()); - pending_action = nullptr; - - session->new_connection(vc); + session->new_connection(static_cast(data)); session->state = HSS_ACTIVE; @@ -1726,29 +1722,6 @@ HttpSM::state_http_server_open(int event, void *data) } else { session->to_parent_proxy = false; } - if (plugin_tunnel_type == HTTP_NO_PLUGIN_TUNNEL) { - DebugSM("http", "[%" PRId64 "] setting handler for TCP handshake", sm_id); - // Just want to get a write-ready event so we know that the TCP handshake is complete. - server_entry->vc_handler = &HttpSM::state_http_server_open; - server_entry->write_vio = server_session->do_io_write(this, 1, server_session->get_reader()); - } else { // in the case of an intercept plugin don't to the connect timeout change - DebugSM("http", "[%" PRId64 "] not setting handler for TCP handshake", sm_id); - handle_http_server_open(); - } - return 0; - } - case VC_EVENT_WRITE_READY: - case VC_EVENT_WRITE_COMPLETE: - // Update the time out to the regular connection timeout. - DebugSM("http_ss", "[%" PRId64 "] TCP Handshake complete", sm_id); - server_entry->vc_handler = &HttpSM::state_send_server_request_header; - - // Reset the timeout to the non-connect timeout - if (t_state.api_txn_no_activity_timeout_value != -1) { - server_session->get_netvc()->set_inactivity_timeout(HRTIME_MSECONDS(t_state.api_txn_no_activity_timeout_value)); - } else { - server_session->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_out)); - } handle_http_server_open(); return 0; case EVENT_INTERVAL: // Delayed call from another thread @@ -5071,11 +5044,38 @@ HttpSM::do_http_server_open(bool raw) connect_action_handle = sslNetProcessor.connect_re(this, // state machine &t_state.current.server->dst_addr.sa, // addr + port &opt); - } else { + } else if (t_state.method != HTTP_WKSIDX_CONNECT && t_state.method != HTTP_WKSIDX_POST && t_state.method != HTTP_WKSIDX_PUT) { DebugSM("http", "calling netProcessor.connect_re"); connect_action_handle = netProcessor.connect_re(this, // state machine &t_state.current.server->dst_addr.sa, // addr + port &opt); + } else { + // The request transform would be applied to POST and/or PUT request. + // The server_vc should be established (writeable) before request transform start. + // The CheckConnect is created by connect_s, + // It will callback NET_EVENT_OPEN to HttpSM if server_vc is WRITE_READY, + // Otherwise NET_EVENT_OPEN_FAILED is callbacked. + MgmtInt connect_timeout; + + ink_assert(t_state.method == HTTP_WKSIDX_CONNECT || t_state.method == HTTP_WKSIDX_POST || t_state.method == HTTP_WKSIDX_PUT); + + // Set the inactivity timeout to the connect timeout so that we + // we fail this server if it doesn't start sending the response + // header + if (t_state.method == HTTP_WKSIDX_POST || t_state.method == HTTP_WKSIDX_PUT) { + connect_timeout = t_state.txn_conf->post_connect_attempts_timeout; + } else if (t_state.current.server == &t_state.parent_info) { + connect_timeout = t_state.txn_conf->parent_connect_timeout; + } else if (t_state.pCongestionEntry != nullptr) { + connect_timeout = t_state.pCongestionEntry->connect_timeout(); + } else { + connect_timeout = t_state.txn_conf->connect_attempts_timeout; + } + + DebugSM("http", "calling netProcessor.connect_s"); + connect_action_handle = netProcessor.connect_s(this, // state machine + &t_state.current.server->dst_addr.sa, // addr + port + connect_timeout, &opt); } if (connect_action_handle != ACTION_RESULT_DONE) {