diff --git a/include/ap_mmn.h b/include/ap_mmn.h index ce9840c79ca..9bd6b01b84e 100644 --- a/include/ap_mmn.h +++ b/include/ap_mmn.h @@ -646,14 +646,16 @@ * 20200420.9 (2.5.1-dev) Add hooks deliver_report and gather_reports to * mod_dav.h. * 20200420.10 (2.5.1-dev) Add method_precondition hook to mod_dav.h. + * 20200701.0 (2.5.1-dev) Axe ap_mpm_unregister_poll_callback and + * mpm_unregister_poll_callback hook. */ #define MODULE_MAGIC_COOKIE 0x41503235UL /* "AP25" */ #ifndef MODULE_MAGIC_NUMBER_MAJOR -#define MODULE_MAGIC_NUMBER_MAJOR 20200420 +#define MODULE_MAGIC_NUMBER_MAJOR 20200701 #endif -#define MODULE_MAGIC_NUMBER_MINOR 10 /* 0...n */ +#define MODULE_MAGIC_NUMBER_MINOR 0 /* 0...n */ /** * Determine if the server's current MODULE_MAGIC_NUMBER is at least a diff --git a/include/ap_mpm.h b/include/ap_mpm.h index 633ea1faace..f446587836f 100644 --- a/include/ap_mpm.h +++ b/include/ap_mpm.h @@ -242,16 +242,6 @@ AP_DECLARE(apr_status_t) ap_mpm_register_poll_callback_timeout( ap_mpm_callback_fn_t *tofn, void *baton, apr_time_t timeout); -/** -* Unregister a previously registered callback. -* @param pfds Array of apr_pollfd_t -* @return APR_SUCCESS if all sockets/pipes could be removed from the pollset, -* APR_ENOTIMPL if no asynch support, or an apr_pollset_remove error. -* @remark This function triggers the cleanup registered on the pool p during -* callback registration. 
-*/ -AP_DECLARE(apr_status_t) ap_mpm_unregister_poll_callback(apr_array_header_t *pfds); - typedef enum mpm_child_status { MPM_CHILD_STARTED, MPM_CHILD_EXITED, diff --git a/include/mpm_common.h b/include/mpm_common.h index 480696aeccd..673c470064a 100644 --- a/include/mpm_common.h +++ b/include/mpm_common.h @@ -440,13 +440,6 @@ AP_DECLARE_HOOK(apr_status_t, mpm_register_poll_callback_timeout, void *baton, apr_time_t timeout)) -/** - * Unregister the specified callback - * @ingroup hooks - */ -AP_DECLARE_HOOK(apr_status_t, mpm_unregister_poll_callback, - (apr_array_header_t *pds)) - /** Resume the suspended connection * @ingroup hooks */ diff --git a/modules/proxy/mod_proxy.c b/modules/proxy/mod_proxy.c index d730a6d0477..33060b08f24 100644 --- a/modules/proxy/mod_proxy.c +++ b/modules/proxy/mod_proxy.c @@ -1842,6 +1842,7 @@ static void *create_proxy_dir_config(apr_pool_t *p, char *dummy) new->add_forwarded_headers_set = 0; new->forward_100_continue = 1; new->forward_100_continue_set = 0; + new->async_delay = -1; return (void *) new; } @@ -1889,17 +1890,30 @@ static void *merge_proxy_dir_config(apr_pool_t *p, void *basev, void *addv) new->error_override_set = add->error_override_set || base->error_override_set; new->alias = (add->alias_set == 0) ? base->alias : add->alias; new->alias_set = add->alias_set || base->alias_set; + new->add_forwarded_headers = (add->add_forwarded_headers_set == 0) ? base->add_forwarded_headers : add->add_forwarded_headers; new->add_forwarded_headers_set = add->add_forwarded_headers_set || base->add_forwarded_headers_set; + new->forward_100_continue = (add->forward_100_continue_set == 0) ? base->forward_100_continue : add->forward_100_continue; new->forward_100_continue_set = add->forward_100_continue_set || base->forward_100_continue_set; + new->async_delay = + (add->async_delay_set == 0) ? 
base->async_delay + : add->async_delay; + new->async_delay_set = add->async_delay_set + || base->async_delay_set; + new->async_idle_timeout = + (add->async_idle_timeout_set == 0) ? base->async_idle_timeout + : add->async_idle_timeout; + new->async_idle_timeout_set = add->async_idle_timeout_set + || base->async_idle_timeout_set; + return new; } @@ -2479,6 +2493,33 @@ static const char * return NULL; } +static const char * + set_proxy_async_delay(cmd_parms *parms, void *dconf, const char *arg) +{ + proxy_dir_conf *conf = dconf; + if (strcmp(arg, "-1") == 0) { + conf->async_delay = -1; + } + else if (ap_timeout_parameter_parse(arg, &conf->async_delay, "s") + || conf->async_delay < 0) { + return "ProxyAsyncDelay has wrong format"; + } + conf->async_delay_set = 1; + return NULL; +} + +static const char * + set_proxy_async_idle(cmd_parms *parms, void *dconf, const char *arg) +{ + proxy_dir_conf *conf = dconf; + if (ap_timeout_parameter_parse(arg, &conf->async_idle_timeout, "s") + || conf->async_idle_timeout < 0) { + return "ProxyAsyncIdleTimeout has wrong format"; + } + conf->async_idle_timeout_set = 1; + return NULL; +} + static const char * set_recv_buffer_size(cmd_parms *parms, void *dummy, const char *arg) { @@ -3068,6 +3109,10 @@ static const command_rec proxy_cmds[] = AP_INIT_FLAG("Proxy100Continue", forward_100_continue, NULL, RSRC_CONF|ACCESS_CONF, "on if 100-Continue should be forwarded to the origin server, off if the " "proxy should handle it by itself"), + AP_INIT_TAKE1("ProxyAsyncDelay", set_proxy_async_delay, NULL, RSRC_CONF|ACCESS_CONF, + "Amount of time to poll before going asynchronous"), + AP_INIT_TAKE1("ProxyAsyncIdleTimeout", set_proxy_async_idle, NULL, RSRC_CONF|ACCESS_CONF, + "Timeout for asynchronous inactivity, ProxyTimeout by default"), {NULL} }; diff --git a/modules/proxy/mod_proxy.h b/modules/proxy/mod_proxy.h index b013fcf0897..2250febcb0e 100644 --- a/modules/proxy/mod_proxy.h +++ b/modules/proxy/mod_proxy.h @@ -247,6 +247,11 @@ typedef 
struct { unsigned int forward_100_continue_set:1; apr_array_header_t *error_override_codes; + + apr_interval_time_t async_delay; + apr_interval_time_t async_idle_timeout; + unsigned int async_delay_set:1; + unsigned int async_idle_timeout_set:1; } proxy_dir_conf; /* if we interpolate env vars per-request, we'll need a per-request diff --git a/modules/proxy/mod_proxy_http.c b/modules/proxy/mod_proxy_http.c index 01ba1ce5347..e34ab31fca4 100644 --- a/modules/proxy/mod_proxy_http.c +++ b/modules/proxy/mod_proxy_http.c @@ -18,19 +18,17 @@ #include "mod_proxy.h" #include "ap_regex.h" +#include "ap_mpm.h" module AP_MODULE_DECLARE_DATA proxy_http_module; static int (*ap_proxy_clear_connection_fn)(request_rec *r, apr_table_t *headers) = NULL; -static apr_status_t ap_proxy_http_cleanup(const char *scheme, - request_rec *r, - proxy_conn_rec *backend); - static apr_status_t ap_proxygetline(apr_bucket_brigade *bb, char *s, int n, request_rec *r, int flags, int *read); + /* * Canonicalise http-like URLs. 
* scheme is the scheme for the URL @@ -219,6 +217,12 @@ static void add_cl(apr_pool_t *p, #define MAX_MEM_SPOOL 16384 +typedef enum { + PROXY_HTTP_REQ_HAVE_HEADER = 0, + + PROXY_HTTP_TUNNELING +} proxy_http_state; + typedef enum { RB_INIT = 0, RB_STREAM_CL, @@ -229,29 +233,129 @@ typedef enum { typedef struct { apr_pool_t *p; request_rec *r; + const char *proto; proxy_worker *worker; + proxy_dir_conf *dconf; proxy_server_conf *sconf; - char server_portstr[32]; + proxy_conn_rec *backend; conn_rec *origin; apr_bucket_alloc_t *bucket_alloc; apr_bucket_brigade *header_brigade; apr_bucket_brigade *input_brigade; + char *old_cl_val, *old_te_val; apr_off_t cl_val; + proxy_http_state state; rb_methods rb_method; - int force10; const char *upgrade; - - int expecting_100; - unsigned int do_100_continue:1, - prefetch_nonblocking:1; + proxy_tunnel_rec *tunnel; + apr_array_header_t *pfds; + apr_interval_time_t idle_timeout; + + unsigned int can_go_async :1, + expecting_100 :1, + do_100_continue :1, + prefetch_nonblocking :1, + force10 :1; } proxy_http_req_t; +static void proxy_http_async_finish(proxy_http_req_t *req) +{ + conn_rec *c = req->r->connection; + + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r, + "proxy %s: finish async", req->proto); + + proxy_run_detach_backend(req->r, req->backend); + ap_proxy_release_connection(req->proto, req->backend, req->r->server); + + ap_finalize_request_protocol(req->r); + ap_process_request_after_handler(req->r); + /* don't touch req or req->r from here */ + + c->cs->state = CONN_STATE_LINGER; + ap_mpm_resume_suspended(c); +} + +/* If neither socket becomes readable in the specified timeout, + * this callback will kill the request. + * We do not have to worry about having a cancel and a IO both queued. 
+ */ +static void proxy_http_async_cancel_cb(void *baton) +{ + proxy_http_req_t *req = (proxy_http_req_t *)baton; + + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r, + "proxy %s: cancel async", req->proto); + + req->r->connection->keepalive = AP_CONN_CLOSE; + req->backend->close = 1; + proxy_http_async_finish(req); +} + +/* Invoked by the event loop when data is ready on either end. + * We don't need the invoke_mtx, since we never put multiple callback events + * in the queue. + */ +static void proxy_http_async_cb(void *baton) +{ + proxy_http_req_t *req = (proxy_http_req_t *)baton; + int status; + + if (req->pfds) { + apr_pool_clear(req->pfds->pool); + } + + switch (req->state) { + case PROXY_HTTP_TUNNELING: + /* Pump both ends until they'd block and then start over again */ + status = ap_proxy_tunnel_run(req->tunnel); + if (status == HTTP_GATEWAY_TIME_OUT) { + if (req->pfds) { + apr_pollfd_t *async_pfds = (void *)req->pfds->elts; + apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts; + async_pfds[0].reqevents = tunnel_pfds[0].reqevents; + async_pfds[1].reqevents = tunnel_pfds[1].reqevents; + } + else { + req->pfds = apr_array_copy(req->p, req->tunnel->pfds); + apr_pool_create(&req->pfds->pool, req->p); + } + status = SUSPENDED; + } + break; + + default: + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r, + "proxy %s: unexpected async state (%i)", + req->proto, (int)req->state); + status = HTTP_INTERNAL_SERVER_ERROR; + break; + } + + if (status == SUSPENDED) { + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r, + "proxy %s: suspended, going async", + req->proto); + + ap_mpm_register_poll_callback_timeout(req->pfds, + proxy_http_async_cb, + proxy_http_async_cancel_cb, + req, req->idle_timeout); + } + else if (status != OK) { + proxy_http_async_cancel_cb(req); + } + else { + proxy_http_async_finish(req); + } +} + /* Read what's in the client pipe. If nonblocking is set and read is EAGAIN, * pass a FLUSH bucket to the backend and read again in blocking mode. 
*/ @@ -1200,13 +1304,11 @@ int ap_proxy_http_process_response(proxy_http_req_t *req) int i; const char *te = NULL; int original_status = r->status; - int proxy_status = OK; const char *original_status_line = r->status_line; const char *proxy_status_line = NULL; apr_interval_time_t old_timeout = 0; - proxy_dir_conf *dconf; - - dconf = ap_get_module_config(r->per_dir_config, &proxy_module); + proxy_dir_conf *dconf = req->dconf; + int proxy_status = OK; bb = apr_brigade_create(p, c->bucket_alloc); pass_bb = apr_brigade_create(p, c->bucket_alloc); @@ -1634,9 +1736,6 @@ int ap_proxy_http_process_response(proxy_http_req_t *req) if (proxy_status == HTTP_SWITCHING_PROTOCOLS) { apr_status_t rv; - proxy_tunnel_rec *tunnel; - apr_interval_time_t client_timeout = -1, - backend_timeout = -1; /* If we didn't send the full body yet, do it now */ if (do_100_continue) { @@ -1650,41 +1749,35 @@ int ap_proxy_http_process_response(proxy_http_req_t *req) ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(10239) "HTTP: tunneling protocol %s", upgrade); - rv = ap_proxy_tunnel_create(&tunnel, r, origin, "HTTP"); + rv = ap_proxy_tunnel_create(&req->tunnel, r, origin, upgrade); if (rv != APR_SUCCESS) { ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10240) "can't create tunnel for %s", upgrade); return HTTP_INTERNAL_SERVER_ERROR; } - /* Set timeout to the lowest configured for client or backend */ - apr_socket_timeout_get(backend->sock, &backend_timeout); - apr_socket_timeout_get(ap_get_conn_socket(c), &client_timeout); - if (backend_timeout >= 0 && backend_timeout < client_timeout) { - tunnel->timeout = backend_timeout; - } - else { - tunnel->timeout = client_timeout; - } + req->proto = upgrade; - /* Let proxy tunnel forward everything */ - status = ap_proxy_tunnel_run(tunnel); - if (ap_is_HTTP_ERROR(status)) { - /* Tunnel always return HTTP_GATEWAY_TIME_OUT on timeout, - * but we can differentiate between client and backend here. 
- */ - if (status == HTTP_GATEWAY_TIME_OUT - && tunnel->timeout == client_timeout) { - status = HTTP_REQUEST_TIME_OUT; - } + if (req->can_go_async) { + /* Let the MPM schedule the work when idle */ + req->state = PROXY_HTTP_TUNNELING; + req->tunnel->timeout = dconf->async_delay; + proxy_http_async_cb(req); + return SUSPENDED; } - else { + + /* Let proxy tunnel forward everything within this thread */ + req->tunnel->timeout = req->idle_timeout; + status = ap_proxy_tunnel_run(req->tunnel); + if (!ap_is_HTTP_ERROR(status)) { /* Update r->status for custom log */ status = HTTP_SWITCHING_PROTOCOLS; } r->status = status; /* We are done with both connections */ + r->connection->keepalive = AP_CONN_CLOSE; + backend->close = 1; return DONE; } @@ -2000,14 +2093,6 @@ int ap_proxy_http_process_response(proxy_http_req_t *req) return OK; } -static -apr_status_t ap_proxy_http_cleanup(const char *scheme, request_rec *r, - proxy_conn_rec *backend) -{ - ap_proxy_release_connection(scheme, backend, r->server); - return OK; -} - /* * This handles http:// URLs, and other URLs using a remote proxy over http * If proxyhost is NULL, then contact the server directly, otherwise @@ -2029,6 +2114,7 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker, proxy_http_req_t *req = NULL; proxy_conn_rec *backend = NULL; apr_bucket_brigade *input_brigade = NULL; + int mpm_can_poll = 0; int is_ssl = 0; conn_rec *c = r->connection; proxy_dir_conf *dconf; @@ -2080,20 +2166,26 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker, worker, r->server)) != OK) { return status; } - backend->is_ssl = is_ssl; + dconf = ap_get_module_config(r->per_dir_config, &proxy_module); + ap_mpm_query(AP_MPMQ_CAN_POLL, &mpm_can_poll); + req = apr_pcalloc(p, sizeof(*req)); req->p = p; req->r = r; req->sconf = conf; + req->dconf = dconf; req->worker = worker; req->backend = backend; + req->proto = proxy_function; req->bucket_alloc = c->bucket_alloc; + req->can_go_async = (mpm_can_poll && + 
dconf->async_delay_set && + dconf->async_delay >= 0); + req->state = PROXY_HTTP_REQ_HAVE_HEADER; req->rb_method = RB_INIT; - dconf = ap_get_module_config(r->per_dir_config, &proxy_module); - if (apr_table_get(r->subprocess_env, "force-proxy-request-1.0")) { req->force10 = 1; } @@ -2105,6 +2197,22 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker, } } + if (req->can_go_async || req->upgrade) { + /* If ProxyAsyncIdleTimeout is not set, use backend timeout */ + if (req->can_go_async && dconf->async_idle_timeout_set) { + req->idle_timeout = dconf->async_idle_timeout; + } + else if (worker->s->timeout_set) { + req->idle_timeout = worker->s->timeout; + } + else if (conf->timeout_set) { + req->idle_timeout = conf->timeout; + } + else { + req->idle_timeout = r->server->timeout; + } + } + /* We possibly reuse input data prefetched in previous call(s), e.g. for a * balancer fallback scenario, and in this case the 100 continue settings * should be consistent between balancer members. If not, we need to ignore @@ -2128,15 +2236,19 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker, * req->expecting_100 (i.e. cleared only if mod_proxy_http sent the * "100 Continue" according to its policy). */ - req->do_100_continue = req->prefetch_nonblocking = 1; - req->expecting_100 = r->expecting_100; + req->do_100_continue = 1; + req->expecting_100 = (r->expecting_100 != 0); r->expecting_100 = 0; } + /* Should we block while prefetching the body or try nonblocking and flush * data to the backend ASAP? */ - else if (input_brigade || apr_table_get(r->subprocess_env, - "proxy-prefetch-nonblocking")) { + if (input_brigade + || req->can_go_async + || req->do_100_continue + || apr_table_get(r->subprocess_env, + "proxy-prefetch-nonblocking")) { req->prefetch_nonblocking = 1; } @@ -2255,6 +2367,9 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker, /* Step Five: Receive the Response... 
Fall thru to cleanup */ status = ap_proxy_http_process_response(req); + if (status == SUSPENDED) { + return SUSPENDED; + } if (req->backend) { proxy_run_detach_backend(r, req->backend); } @@ -2267,7 +2382,8 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker, if (req->backend) { if (status != OK) req->backend->close = 1; - ap_proxy_http_cleanup(proxy_function, r, req->backend); + ap_proxy_release_connection(proxy_function, req->backend, + r->server); } if (req->expecting_100) { /* Restore r->expecting_100 if we didn't touch it */ diff --git a/modules/proxy/mod_proxy_wstunnel.c b/modules/proxy/mod_proxy_wstunnel.c index 75ac8e69eb7..6cf2d37bc31 100644 --- a/modules/proxy/mod_proxy_wstunnel.c +++ b/modules/proxy/mod_proxy_wstunnel.c @@ -29,6 +29,7 @@ typedef struct ws_baton_t { request_rec *r; proxy_conn_rec *backend; proxy_tunnel_rec *tunnel; + apr_array_header_t *pfds; const char *scheme; } ws_baton_t; @@ -84,15 +85,30 @@ static void proxy_wstunnel_callback(void *b) ws_baton_t *baton = (ws_baton_t*)b; proxyws_dir_conf *dconf = ap_get_module_config(baton->r->per_dir_config, &proxy_wstunnel_module); - int status = proxy_wstunnel_pump(baton, 1); - if (status == SUSPENDED) { - ap_mpm_register_poll_callback_timeout(baton->tunnel->pfds, - proxy_wstunnel_callback, - proxy_wstunnel_cancel_callback, - baton, - dconf->idle_timeout); + + if (baton->pfds) { + apr_pool_clear(baton->pfds->pool); + } + + if (proxy_wstunnel_pump(baton, 1) == SUSPENDED) { ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, "proxy_wstunnel_callback suspend"); + + if (baton->pfds) { + apr_pollfd_t *async_pfds = (void *)baton->pfds->elts; + apr_pollfd_t *tunnel_pfds = (void *)baton->tunnel->pfds->elts; + async_pfds[0].reqevents = tunnel_pfds[0].reqevents; + async_pfds[1].reqevents = tunnel_pfds[1].reqevents; + } + else { + baton->pfds = apr_array_copy(baton->r->pool, baton->tunnel->pfds); + apr_pool_create(&baton->pfds->pool, baton->r->pool); + } + + 
ap_mpm_register_poll_callback_timeout(baton->pfds,
+                                              proxy_wstunnel_callback,
+                                              proxy_wstunnel_cancel_callback,
+                                              baton, dconf->idle_timeout);
     }
     else {
         proxy_wstunnel_finish(baton);
diff --git a/modules/proxy/proxy_util.c b/modules/proxy/proxy_util.c
index 8b99c5f8d2f..e9a76437f2d 100644
--- a/modules/proxy/proxy_util.c
+++ b/modules/proxy/proxy_util.c
@@ -4327,11 +4327,8 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
     apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
     apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_NONBLOCK, 1);
 
-    /* No coalescing filters */
-    ap_remove_output_filter_byhandle(c_i->output_filters,
-                                     "SSL/TLS Coalescing Filter");
-    ap_remove_output_filter_byhandle(c_o->output_filters,
-                                     "SSL/TLS Coalescing Filter");
+    /* Bidirectional non-HTTP stream will confuse mod_reqtimeout */
+    ap_remove_input_filter_byhandle(c_i->input_filters, "reqtimeout");
 
     /* Bidirectional non-HTTP stream will confuse mod_reqtimeoout */
     ap_remove_input_filter_byhandle(c_i->input_filters, "reqtimeout");
diff --git a/modules/ssl/ssl_engine_io.c b/modules/ssl/ssl_engine_io.c
index 4bcf5e9dec4..9ea36a5658c 100644
--- a/modules/ssl/ssl_engine_io.c
+++ b/modules/ssl/ssl_engine_io.c
@@ -1715,6 +1715,20 @@ static apr_status_t ssl_io_filter_coalesce(ap_filter_t *f,
     apr_size_t buffered = ctx ? ctx->bytes : 0; /* space used on entry */
     unsigned count = 0;
 
+    /* Pass down everything if called from ap_filter_output_pending() */
+    if (APR_BRIGADE_EMPTY(bb)) {
+        if (!ctx || !ctx->bytes) {
+            return APR_SUCCESS;
+        }
+
+        e = apr_bucket_transient_create(ctx->buffer, ctx->bytes,
+                                        bb->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(bb, e);
+        ctx->bytes = 0; /* buffer now emptied. */
+
+        return ap_pass_brigade(f->next, bb);
+    }
+
     /* The brigade consists of zero-or-more small data buckets which
      * can be coalesced (referred to as the "prefix"), followed by the
      * remainder of the brigade. 
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index 7f835afa955..4ef72b328f2 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -380,12 +380,13 @@ typedef struct void *baton; } listener_poll_type; -typedef struct +typedef struct socket_callback_baton { ap_mpm_callback_fn_t *cbfunc; void *user_baton; apr_array_header_t *pfds; timer_event_t *cancel_event; /* If a timeout was requested, a pointer to the timer event */ + struct socket_callback_baton *next; unsigned int signaled :1; } socket_callback_baton_t; @@ -984,6 +985,24 @@ static int event_post_read_request(request_rec *r) /* Forward declare */ static void process_lingering_close(event_conn_state_t *cs); +static void update_reqevents_from_sense(event_conn_state_t *cs) +{ + if (cs->pub.sense == CONN_SENSE_WANT_READ) { + cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP; + } + else { + cs->pfd.reqevents = APR_POLLOUT; + } + /* POLLERR is usually returned event only, but some pollset + * backends may require it in reqevents to do the right thing, + * so it shouldn't hurt (ignored otherwise). + */ + cs->pfd.reqevents |= APR_POLLERR; + + /* Reset to default for the next round */ + cs->pub.sense = CONN_SENSE_DEFAULT; +} + /* * process one connection in the worker */ @@ -1161,19 +1180,7 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc cs->queue_timestamp = apr_time_now(); notify_suspend(cs); - if (cs->pub.sense == CONN_SENSE_WANT_READ) { - cs->pfd.reqevents = APR_POLLIN; - } - else { - cs->pfd.reqevents = APR_POLLOUT; - } - /* POLLHUP/ERR are usually returned event only (ignored here), but - * some pollset backends may require them in reqevents to do the - * right thing, so it shouldn't hurt. 
- */ - cs->pfd.reqevents |= APR_POLLHUP | APR_POLLERR; - cs->pub.sense = CONN_SENSE_DEFAULT; - + update_reqevents_from_sense(cs); apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_APPEND(cs->sc->wc_q, cs); rv = apr_pollset_add(event_pollset, &cs->pfd); @@ -1274,15 +1281,24 @@ static apr_status_t event_resume_suspended (conn_rec *c) apr_atomic_dec32(&suspended_count); c->suspended_baton = NULL; - cs->queue_timestamp = apr_time_now(); - cs->pfd.reqevents = ( - cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN : - APR_POLLOUT) | APR_POLLHUP | APR_POLLERR; - cs->pub.sense = CONN_SENSE_DEFAULT; - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_APPEND(cs->sc->wc_q, cs); - apr_pollset_add(event_pollset, &cs->pfd); - apr_thread_mutex_unlock(timeout_mutex); + if (cs->pub.state == CONN_STATE_LINGER) { + int rc = start_lingering_close_blocking(cs); + if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL || + cs->pub.state == CONN_STATE_LINGER_SHORT)) { + process_lingering_close(cs); + } + } + else { + cs->queue_timestamp = apr_time_now(); + cs->pub.state = CONN_STATE_WRITE_COMPLETION; + notify_suspend(cs); + + update_reqevents_from_sense(cs); + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(cs->sc->wc_q, cs); + apr_pollset_add(event_pollset, &cs->pfd); + apr_thread_mutex_unlock(timeout_mutex); + } return OK; } @@ -1503,7 +1519,7 @@ static timer_event_t * event_get_timer_event(apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton, int insert, - apr_array_header_t *remove) + apr_array_header_t *pfds) { timer_event_t *te; apr_time_t now = (t < 0) ? 
0 : apr_time_now(); @@ -1525,7 +1541,7 @@ static timer_event_t * event_get_timer_event(apr_time_t t, te->baton = baton; te->canceled = 0; te->when = now + t; - te->remove = remove; + te->pfds = pfds; if (insert) { apr_time_t next_expiry; @@ -1553,9 +1569,9 @@ static timer_event_t * event_get_timer_event(apr_time_t t, static apr_status_t event_register_timed_callback_ex(apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton, - apr_array_header_t *remove) + apr_array_header_t *pfds) { - event_get_timer_event(t, cbfn, baton, 1, remove); + event_get_timer_event(t, cbfn, baton, 1, pfds); return APR_SUCCESS; } @@ -1581,6 +1597,7 @@ static apr_status_t event_cleanup_poll_callback(void *data) if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) { final_rc = rc; } + pfd->client_data = NULL; } } @@ -1593,9 +1610,10 @@ static apr_status_t event_register_poll_callback_ex(apr_array_header_t *pfds, void *baton, apr_time_t timeout) { - socket_callback_baton_t *scb = apr_pcalloc(pfds->pool, sizeof(*scb)); - listener_poll_type *pt = apr_palloc(pfds->pool, sizeof(*pt)); - apr_status_t rc, final_rc= APR_SUCCESS; + apr_pool_t *p = pfds->pool; + socket_callback_baton_t *scb = apr_pcalloc(p, sizeof(*scb)); + listener_poll_type *pt = apr_palloc(p, sizeof(*pt)); + apr_status_t rc, final_rc = APR_SUCCESS; int i; pt->type = PT_USER; @@ -1605,23 +1623,33 @@ static apr_status_t event_register_poll_callback_ex(apr_array_header_t *pfds, scb->user_baton = baton; scb->pfds = pfds; - apr_pool_pre_cleanup_register(pfds->pool, pfds, event_cleanup_poll_callback); + apr_pool_pre_cleanup_register(p, pfds, event_cleanup_poll_callback); for (i = 0; i < pfds->nelts; i++) { apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i; - pfd->reqevents = (pfd->reqevents) | APR_POLLERR | APR_POLLHUP; - pfd->client_data = pt; + if (pfd->reqevents) { + if (pfd->reqevents & APR_POLLIN) { + pfd->reqevents |= APR_POLLHUP; + } + pfd->reqevents |= APR_POLLERR; + pfd->client_data = pt; + } + else { + pfd->client_data = NULL; 
+ } } if (timeout > 0) { - /* XXX: This cancel timer event count fire before the pollset is updated */ + /* XXX: This cancel timer event can fire before the pollset is updated */ scb->cancel_event = event_get_timer_event(timeout, tofn, baton, 1, pfds); } for (i = 0; i < pfds->nelts; i++) { apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i; - rc = apr_pollset_add(event_pollset, pfd); - if (rc != APR_SUCCESS) { - final_rc = rc; + if (pfd->client_data) { + rc = apr_pollset_add(event_pollset, pfd); + if (rc != APR_SUCCESS) { + final_rc = rc; + } } } return final_rc; @@ -1637,10 +1665,6 @@ static apr_status_t event_register_poll_callback(apr_array_header_t *pfds, baton, 0 /* no timeout */); } -static apr_status_t event_unregister_poll_callback(apr_array_header_t *pfds) -{ - return apr_pool_cleanup_run(pfds->pool, pfds, event_cleanup_poll_callback); -} /* * Close socket and clean up if remote closed its end while we were in @@ -1818,6 +1842,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) const apr_pollfd_t *out_pfd; apr_int32_t num = 0; apr_interval_time_t timeout_interval; + socket_callback_baton_t *user_chain; apr_time_t now, timeout_time; int workers_were_busy = 0; @@ -1887,13 +1912,10 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) } apr_skiplist_pop(timer_skiplist, NULL); if (!te->canceled) { - if (te->remove) { - int i; - for (i = 0; i < te->remove->nelts; i++) { - apr_pollfd_t *pfd; - pfd = (apr_pollfd_t *)te->remove->elts + i; - apr_pollset_remove(event_pollset, pfd); - } + if (te->pfds) { + /* remove all sockets from the pollset */ + apr_pool_cleanup_run(te->pfds->pool, te->pfds, + event_cleanup_poll_callback); } push_timer2worker(te); } @@ -1956,7 +1978,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) break; } - for (; num; --num, ++out_pfd) { + for (user_chain = NULL; num; --num, ++out_pfd) { listener_poll_type *pt = (listener_poll_type *) 
out_pfd->client_data; if (pt->type == PT_CSD) { /* one of the sockets is readable */ @@ -2126,33 +2148,43 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) #endif else if (pt->type == PT_USER) { - /* masquerade as a timer event that is firing */ - int i = 0; - socket_callback_baton_t *baton = (socket_callback_baton_t *) pt->baton; + socket_callback_baton_t *baton = pt->baton; if (baton->cancel_event) { baton->cancel_event->canceled = 1; } - /* We only signal once per N sockets with this baton */ - if (!(baton->signaled)) { + /* We only signal once per N sockets with this baton, + * and after this loop to avoid any race/lifetime issue + * with the user callback being called while we handle + * the same baton multiple times here. + */ + if (!baton->signaled) { baton->signaled = 1; - te = event_get_timer_event(-1 /* fake timer */, - baton->cbfunc, - baton->user_baton, - 0, /* don't insert it */ - NULL /* no associated socket callback */); - /* remove all sockets in my set */ - for (i = 0; i < baton->pfds->nelts; i++) { - apr_pollfd_t *pfd = (apr_pollfd_t *)baton->pfds->elts + i; - apr_pollset_remove(event_pollset, pfd); - pfd->client_data = NULL; - } - - push_timer2worker(te); + baton->next = user_chain; + user_chain = baton; } } } /* for processing poll */ + /* Time to handle user callbacks chained above */ + while (user_chain) { + socket_callback_baton_t *baton = user_chain; + user_chain = user_chain->next; + baton->next = NULL; + + /* remove all sockets from the pollset */ + apr_pool_cleanup_run(baton->pfds->pool, baton->pfds, + event_cleanup_poll_callback); + + /* masquerade as a timer event that is firing */ + te = event_get_timer_event(-1 /* fake timer */, + baton->cbfunc, + baton->user_baton, + 0, /* don't insert it */ + NULL /* no associated socket callback */); + push_timer2worker(te); + } + /* XXX possible optimization: stash the current time for use as * r->request_time for new requests */ @@ -4045,8 +4077,6 @@ static void 
event_hooks(apr_pool_t * p) APR_HOOK_MIDDLE); ap_hook_mpm_register_poll_callback_timeout(event_register_poll_callback_ex, NULL, NULL, APR_HOOK_MIDDLE); - ap_hook_mpm_unregister_poll_callback(event_unregister_poll_callback, NULL, NULL, - APR_HOOK_MIDDLE); ap_hook_pre_read_request(event_pre_read_request, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_post_read_request(event_post_read_request, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_mpm_get_name(event_get_name, NULL, NULL, APR_HOOK_MIDDLE); diff --git a/server/mpm_common.c b/server/mpm_common.c index 6a7a3a8b370..f63f84538d7 100644 --- a/server/mpm_common.c +++ b/server/mpm_common.c @@ -70,7 +70,6 @@ APR_HOOK_LINK(mpm_register_timed_callback) \ APR_HOOK_LINK(mpm_register_poll_callback) \ APR_HOOK_LINK(mpm_register_poll_callback_timeout) \ - APR_HOOK_LINK(mpm_unregister_poll_callback) \ APR_HOOK_LINK(mpm_get_name) \ APR_HOOK_LINK(mpm_resume_suspended) \ APR_HOOK_LINK(end_generation) \ @@ -116,9 +115,6 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_poll_callback, AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_poll_callback_timeout, (apr_array_header_t *pds, ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn, void *baton, apr_time_t timeout), (pds, cbfn, tofn, baton, timeout), APR_ENOTIMPL) -AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_unregister_poll_callback, - (apr_array_header_t *pds), - (pds), APR_ENOTIMPL) AP_IMPLEMENT_HOOK_RUN_FIRST(int, output_pending, (conn_rec *c), (c), DECLINED) AP_IMPLEMENT_HOOK_RUN_FIRST(int, input_pending, @@ -585,12 +581,6 @@ AP_DECLARE(apr_status_t) ap_mpm_register_poll_callback_timeout( timeout); } -AP_DECLARE(apr_status_t) ap_mpm_unregister_poll_callback( - apr_array_header_t *pfds) -{ - return ap_run_mpm_unregister_poll_callback(pfds); -} - AP_DECLARE(const char *)ap_show_mpm(void) { const char *name = ap_run_mpm_get_name(); diff --git a/server/mpm_fdqueue.h b/server/mpm_fdqueue.h index 9aeedde30da..ef9b0ab75f3 100644 --- a/server/mpm_fdqueue.h +++ b/server/mpm_fdqueue.h 
@@ -69,7 +69,7 @@ struct timer_event_t ap_mpm_callback_fn_t *cbfunc; void *baton; int canceled; - apr_array_header_t *remove; + apr_array_header_t *pfds; }; typedef struct timer_event_t timer_event_t;