diff --git a/ci/tsqa/tests/test_origin_max_connections.py b/ci/tsqa/tests/test_origin_max_connections.py
new file mode 100644
index 00000000000..cff24d2bd89
--- /dev/null
+++ b/ci/tsqa/tests/test_origin_max_connections.py
@@ -0,0 +1,117 @@
+'''
+Test the configuration entry: proxy.config.http.origin_max_connections
+'''
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import logging
+import uuid
+import socket
+import requests
+import tsqa.test_cases
+import helpers
+import thread
+from multiprocessing import Pool
+import SocketServer
+
+log = logging.getLogger(__name__)
+
+
+# TODO: seems like a useful shared class - either add to httpbin or some shared lib
+class KAHandler(SocketServer.BaseRequestHandler):
+    '''SocketServer handler that returns the connection-id as the body
+    '''
+    # class variable tracking the number of active sessions
+    alive_sessions = 0
+
+    def handle(self):
+        KAHandler.alive_sessions += 1
+        # Receive the data in small chunks and retransmit it
+        conn_id = uuid.uuid4().hex
+        start = time.time()
+        while True:
+            data = self.request.recv(4096).strip()
+            if data:
+                log.info('Sending data back to the client: {uid}'.format(uid=conn_id))
+            else:
+                log.info('Client disconnected: {timeout} seconds'.format(timeout=time.time() - start))
+                break
+            body = conn_id
+            time.sleep(1)
+            resp = ('HTTP/1.1 200 OK\r\n'
+                    'Content-Length: {content_length}\r\n'
+                    'Content-Type: text/html; charset=UTF-8\r\n'
+                    'Connection: keep-alive\r\n'
+                    'X-Current-Sessions: {alive_sessions}\r\n'
+                    '\r\n'
+                    '{body}'.format(content_length=len(body), alive_sessions=KAHandler.alive_sessions, body=body))
+            self.request.sendall(resp)
+        KAHandler.alive_sessions -= 1
+
+
+class TestKeepAlive_Origin_Max_connections(helpers.EnvironmentCase):
+    @classmethod
+    def setUpEnv(cls, env):
+        cls.traffic_server_host = '127.0.0.1'
+        cls.traffic_server_port = int(cls.configs['records.config']['CONFIG']['proxy.config.http.server_ports'])
+        cls.socket_server_port = int(tsqa.utils.bind_unused_port()[1])
+
+        log.info("socket_server_port = %d" % (cls.socket_server_port))
+        cls.server = tsqa.endpoint.SocketServerDaemon(KAHandler, port=cls.socket_server_port)
+        cls.server.start()
+        cls.server.ready.wait()
+
+        cls.socket_server_port2 = int(tsqa.utils.bind_unused_port()[1])
+        cls.server2 = tsqa.endpoint.SocketServerDaemon(KAHandler, port=cls.socket_server_port2)
+        cls.server2.start()
+        cls.server2.ready.wait()
+
+        cls.configs['remap.config'].add_lines([
+            'map /other/ http://127.0.0.1:{0}'.format(cls.socket_server_port2),
+            'map / http://127.0.0.1:{0}'.format(cls.socket_server_port),
+        ])
+
+        cls.origin_keep_alive_timeout = 1
+
+        cls.configs['records.config']['CONFIG'].update({
+            'proxy.config.http.origin_max_connections': 1,
+            'proxy.config.http.keep_alive_enabled_out': 1,
+            'proxy.config.http.keep_alive_no_activity_timeout_out': cls.origin_keep_alive_timeout,
+            'proxy.config.exec_thread.limit': 2,
+            'proxy.config.exec_thread.autoconfig': 0,
+        })
+
+
+    def test_max(self):
+        '''Requests to two different origins must be served in parallel
+        even with origin_max_connections set to 1 (TS-4340).'''
+        REQUEST_COUNT = 8
+        url = 'http://{0}:{1}/'.format(self.traffic_server_host, self.traffic_server_port)
+        url2 = 'http://{0}:{1}/other/'.format(self.traffic_server_host, self.traffic_server_port)
+        results = []
+        results2 = []
+        pool = Pool(processes=4)
+        for _ in xrange(0, REQUEST_COUNT):
+            results.append(pool.apply_async(requests.get, (url,)))
+            results2.append(pool.apply_async(requests.get, (url2,)))
+
+        # TS-4340
+        # ensure that the 2 origins (2 different ports on loopback) were running in parallel
+        for i in xrange(0, REQUEST_COUNT):
+            self.assertEqual(int(results[i].get().headers['X-Current-Sessions']), 2)
+            self.assertEqual(int(results2[i].get().headers['X-Current-Sessions']), 2)
diff --git a/lib/ts/ink_inet.cc b/lib/ts/ink_inet.cc
index 7de24faafa8..9016bd12fe5 100644
--- a/lib/ts/ink_inet.cc
+++ b/lib/ts/ink_inet.cc
@@ -307,6 +307,26 @@ ats_ip_hash(sockaddr const *addr)
   return zret.i;
 }
 
+uint64_t
+ats_ip_port_hash(sockaddr const *addr)
+{
+  union md5sum {
+    uint64_t i;
+    uint16_t b[4];
+    unsigned char c[16];
+  } zret;
+
+  zret.i = 0;
+  if (ats_is_ip4(addr)) {
+    zret.i = (static_cast<uint64_t>(ats_ip4_addr_cast(addr)) << 16) | (ats_ip_port_cast(addr));
+  } else if (ats_is_ip6(addr)) {
+    ink_code_md5(const_cast<unsigned char *>(ats_ip_addr8_cast(addr)), TS_IP6_SIZE, zret.c);
+    // now replace the bottom 16 bits so we can account for the port.
+    zret.b[3] = ats_ip_port_cast(addr);
+  }
+  return zret.i;
+}
+
 int
 ats_ip_to_hex(sockaddr const *src, char *dst, size_t len)
 {
diff --git a/lib/ts/ink_inet.h b/lib/ts/ink_inet.h
index 1754bd69bad..8945de32473 100644
--- a/lib/ts/ink_inet.h
+++ b/lib/ts/ink_inet.h
@@ -1151,6 +1151,8 @@ int ats_ip_getbestaddrinfo(char const *name, ///< [in] Address name (IPv4, IPv6,
 */
 uint32_t ats_ip_hash(sockaddr const *addr);
 
+uint64_t ats_ip_port_hash(sockaddr const *addr);
+
 /** Convert address to string as a hexidecimal value.
     The string is always nul terminated, the output string
     is clipped if @a dst is insufficient.
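[Reviewer note] ats_ip_port_hash() differs from the existing ats_ip_hash() only in folding the destination port into the key, so two origins that share an address but listen on different ports no longer land in the same connection-count bucket; the IPv6 branch keeps the MD5 of the address and overwrites 16 bits of it with the port. A minimal standalone sketch of the IPv4 packing, in plain C++ with made-up addresses and ports (illustration only, not the ATS helpers):

#include <cstdint>
#include <cstdio>

// Pack an IPv4 address and a port into one 64-bit key, mirroring the
// "(addr << 16) | port" layout of the IPv4 branch of ats_ip_port_hash().
static uint64_t
ip4_port_key(uint32_t addr, uint16_t port)
{
  return (static_cast<uint64_t>(addr) << 16) | port;
}

int
main()
{
  const uint32_t loopback = 0x7f000001; // 127.0.0.1
  // Same address, different ports: the keys differ, so each origin gets its
  // own connection-count bucket (the two loopback origins in the tsqa test).
  std::printf("%#llx\n", static_cast<unsigned long long>(ip4_port_key(loopback, 8080)));
  std::printf("%#llx\n", static_cast<unsigned long long>(ip4_port_key(loopback, 9090)));
  return 0;
}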
diff --git a/proxy/http/HttpConnectionCount.h b/proxy/http/HttpConnectionCount.h
index af4d7f6780e..3fed8541a32 100644
--- a/proxy/http/HttpConnectionCount.h
+++ b/proxy/http/HttpConnectionCount.h
@@ -26,6 +26,10 @@
 #include "ts/ink_inet.h"
 #include "ts/ink_mutex.h"
 #include "ts/Map.h"
+#include "ts/Diags.h"
+#include "ts/INK_MD5.h"
+#include "ts/ink_config.h"
+#include "HttpProxyAPIEnums.h"
 
 #ifndef _HTTP_CONNECTION_COUNT_H_
 #define _HTTP_CONNECTION_COUNT_H_
@@ -52,10 +56,10 @@ class ConnectionCount
    * @return Number of connections
    */
   int
-  getCount(const IpEndpoint &addr)
+  getCount(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type)
   {
     ink_mutex_acquire(&_mutex);
-    int count = _hostCount.get(ConnAddr(addr));
+    int count = _hostCount.get(ConnAddr(addr, hostname_hash, match_type));
     ink_mutex_release(&_mutex);
     return count;
   }
@@ -66,9 +70,10 @@ class ConnectionCount
    * @param delta Default is +1, can be set to negative to decrement
    */
   void
-  incrementCount(const IpEndpoint &addr, const int delta = 1)
+  incrementCount(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type,
+                 const int delta = 1)
   {
-    ConnAddr caddr(addr);
+    ConnAddr caddr(addr, hostname_hash, match_type);
     ink_mutex_acquire(&_mutex);
     int count = _hostCount.get(caddr);
     _hostCount.put(caddr, count + delta);
@@ -77,14 +82,35 @@
   struct ConnAddr {
     IpEndpoint _addr;
+    INK_MD5 _hostname_hash;
+    TSServerSessionSharingMatchType _match_type;
 
-    ConnAddr() { ink_zero(_addr); }
-    ConnAddr(int x)
+    ConnAddr() : _match_type(TS_SERVER_SESSION_SHARING_MATCH_NONE)
+    {
+      ink_zero(_addr);
+      ink_zero(_hostname_hash);
+    }
+
+    ConnAddr(int x) : _match_type(TS_SERVER_SESSION_SHARING_MATCH_NONE)
     {
       ink_release_assert(x == 0);
       ink_zero(_addr);
+      ink_zero(_hostname_hash);
+    }
+
+    ConnAddr(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type)
+      : _addr(addr), _hostname_hash(hostname_hash), _match_type(match_type)
+    {
     }
-    ConnAddr(const IpEndpoint &addr) : _addr(addr) {}
+
+    ConnAddr(const IpEndpoint &addr, const char *hostname, TSServerSessionSharingMatchType match_type)
+      : _addr(addr), _match_type(match_type)
+    {
+      MD5Context md5_ctx;
+      md5_ctx.hash_immediate(_hostname_hash, static_cast<const void *>(hostname), strlen(hostname));
+    }
+
     operator bool() { return ats_is_ip(&_addr); }
   };
@@ -94,12 +120,66 @@ class ConnectionCount
     static uintptr_t
     hash(ConnAddr &addr)
     {
-      return (uintptr_t)ats_ip_hash(&addr._addr.sa);
+      if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP) {
+        return (uintptr_t)ats_ip_port_hash(&addr._addr.sa);
+      } else if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST) {
+        return (uintptr_t)addr._hostname_hash.u64[0];
+      } else if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH) {
+        return ((uintptr_t)ats_ip_port_hash(&addr._addr.sa) ^ (uintptr_t)addr._hostname_hash.u64[0]);
+      } else {
+        return 0; // these entries will never compare equal(), since equal() returns false for NONE matches
+      }
     }
+
     static int
     equal(ConnAddr &a, ConnAddr &b)
     {
-      return ats_ip_addr_eq(&a._addr, &b._addr);
+      char addrbuf1[INET6_ADDRSTRLEN];
+      char addrbuf2[INET6_ADDRSTRLEN];
+      char md5buf1[33];
+      char md5buf2[33];
+      ink_code_md5_stringify(md5buf1, sizeof(md5buf1), reinterpret_cast<const char *>(a._hostname_hash.u8));
+      ink_code_md5_stringify(md5buf2, sizeof(md5buf2), reinterpret_cast<const char *>(b._hostname_hash.u8));
+      Debug("conn_count", "Comparing hostname hash %s dest %s match method %d to hostname hash %s dest %s match method %d", md5buf1,
+            ats_ip_nptop(&a._addr.sa, addrbuf1, sizeof(addrbuf1)), a._match_type, md5buf2,
+            ats_ip_nptop(&b._addr.sa, addrbuf2, sizeof(addrbuf2)), b._match_type);
+
+      if (a._match_type != b._match_type || a._match_type == TS_SERVER_SESSION_SHARING_MATCH_NONE) {
+        Debug("conn_count", "result = 0, a._match_type != b._match_type || a._match_type == TS_SERVER_SESSION_SHARING_MATCH_NONE");
+        return 0;
+      }
+
+      if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP) {
+        if (ats_ip_addr_port_eq(&a._addr.sa, &b._addr.sa)) {
+          Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP");
+          return 1;
+        } else {
+          Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP");
+          return 0;
+        }
+      }
+
+      if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST) {
+        if ((a._hostname_hash.u64[0] == b._hostname_hash.u64[0] && a._hostname_hash.u64[1] == b._hostname_hash.u64[1])) {
+          Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST");
+          return 1;
+        } else {
+          Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST");
+          return 0;
+        }
+      }
+
+      if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH) {
+        if ((ats_ip_addr_port_eq(&a._addr.sa, &b._addr.sa)) &&
+            (a._hostname_hash.u64[0] == b._hostname_hash.u64[0] && a._hostname_hash.u64[1] == b._hostname_hash.u64[1])) {
+          Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH");
+
+          return 1;
+        }
+      }
+
+      Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH");
+      return 0;
     }
   };
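[Reviewer note] With the hash()/equal() pair above, the connection-count key is effectively (address:port, hostname hash, sharing match type), and the match type selects which fields participate in the comparison. A simplified, self-contained sketch of that contract, with std::string and a plain integer standing in for the sockaddr and the MD5 hash (illustration only, not the ATS types):

#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

enum MatchType { MATCH_NONE, MATCH_IP, MATCH_HOST, MATCH_BOTH };

struct ConnKey {
  std::string ip_port; // stand-in for the sockaddr (address plus port)
  uint64_t host_hash;  // stand-in for the MD5 of the origin hostname
  MatchType match;
};

// Which bucket the key lands in; only the fields selected by the match type count.
static uint64_t
key_hash(const ConnKey &k)
{
  switch (k.match) {
  case MATCH_IP:
    return std::hash<std::string>{}(k.ip_port);
  case MATCH_HOST:
    return k.host_hash;
  case MATCH_BOTH:
    return std::hash<std::string>{}(k.ip_port) ^ k.host_hash;
  default:
    return 0; // NONE keys never compare equal, mirroring the early return in equal()
  }
}

static bool
key_equal(const ConnKey &a, const ConnKey &b)
{
  if (a.match != b.match || a.match == MATCH_NONE)
    return false;
  const bool ip_eq   = a.ip_port == b.ip_port;
  const bool host_eq = a.host_hash == b.host_hash;
  switch (a.match) {
  case MATCH_IP:
    return ip_eq;
  case MATCH_HOST:
    return host_eq;
  default: // MATCH_BOTH
    return ip_eq && host_eq;
  }
}

int
main()
{
  // Two loopback origins on different ports with the same hostname hash:
  // under MATCH_BOTH they are distinct keys, so each origin is counted
  // separately, which is what lets the tsqa test run both in parallel.
  ConnKey a{"127.0.0.1:8080", 42, MATCH_BOTH};
  ConnKey b{"127.0.0.1:9090", 42, MATCH_BOTH};
  assert(key_equal(a, a));
  assert(!key_equal(a, b));
  return 0;
}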
diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc
index da418e06ddc..90d819b9df0 100644
--- a/proxy/http/HttpSM.cc
+++ b/proxy/http/HttpSM.cc
@@ -1652,6 +1652,7 @@ HttpSM::state_http_server_open(int event, void *data)
     UnixNetVConnection *server_vc = (UnixNetVConnection*)data;
     printf("client fd is :%d , server fd is %d\n",vc->con.fd, server_vc->con.fd);
     */
+    session->attach_hostname(t_state.current.server->name);
     ats_ip_copy(&session->server_ip, &t_state.current.server->dst_addr);
     session->new_connection(static_cast<NetVConnection *>(data));
     session->state = HSS_ACTIVE;
@@ -4748,10 +4749,17 @@ HttpSM::do_http_server_open(bool raw)
 
   if (t_state.txn_conf->origin_max_connections > 0) {
     ConnectionCount *connections = ConnectionCount::getInstance();
 
-    char addrbuf[INET6_ADDRSTRLEN];
-    if (connections->getCount((t_state.current.server->dst_addr)) >= t_state.txn_conf->origin_max_connections) {
+    INK_MD5 hostname_hash;
+    MD5Context md5_ctx;
+    md5_ctx.hash_immediate(hostname_hash, static_cast<const void *>(t_state.current.server->name),
+                           strlen(t_state.current.server->name));
+
+    ip_port_text_buffer addrbuf;
+    if (connections->getCount(t_state.current.server->dst_addr, hostname_hash,
+                              (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match) >=
+        t_state.txn_conf->origin_max_connections) {
       DebugSM("http", "[%" PRId64 "] over the number of connection for this host: %s", sm_id,
-              ats_ip_ntop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf)));
+              ats_ip_nptop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf)));
       ink_assert(pending_action == NULL);
       pending_action = eventProcessor.schedule_in(this, HRTIME_MSECONDS(100));
       return;
diff --git a/proxy/http/HttpServerSession.cc b/proxy/http/HttpServerSession.cc
index 7a0db7b2a94..763c6b8c399 100644
--- a/proxy/http/HttpServerSession.cc
+++ b/proxy/http/HttpServerSession.cc
@@ -77,10 +77,11 @@ HttpServerSession::new_connection(NetVConnection *new_vc)
   if (enable_origin_connection_limiting == true) {
     if (connection_count == NULL)
       connection_count = ConnectionCount::getInstance();
-    connection_count->incrementCount(server_ip);
-    char addrbuf[INET6_ADDRSTRLEN];
+    connection_count->incrementCount(server_ip, hostname_hash, sharing_match);
+    ip_port_text_buffer addrbuf;
     Debug("http_ss", "[%" PRId64 "] new connection, ip: %s, count: %u", con_id,
-          ats_ip_ntop(&server_ip.sa, addrbuf, sizeof(addrbuf)), connection_count->getCount(server_ip));
+          ats_ip_nptop(&server_ip.sa, addrbuf, sizeof(addrbuf)),
+          connection_count->getCount(server_ip, hostname_hash, sharing_match));
   }
 #ifdef LAZY_BUF_ALLOC
   read_buffer = new_empty_MIOBuffer(HTTP_SERVER_RESP_HDR_BUFFER_INDEX);
@@ -139,13 +140,15 @@ HttpServerSession::do_io_close(int alerrno)
   // Check to see if we are limiting the number of connections
   // per host
   if (enable_origin_connection_limiting == true) {
-    if (connection_count->getCount(server_ip) > 0) {
-      connection_count->incrementCount(server_ip, -1);
-      char addrbuf[INET6_ADDRSTRLEN];
+    if (connection_count->getCount(server_ip, hostname_hash, sharing_match) > 0) {
+      connection_count->incrementCount(server_ip, hostname_hash, sharing_match, -1);
+      ip_port_text_buffer addrbuf;
       Debug("http_ss", "[%" PRId64 "] connection closed, ip: %s, count: %u", con_id,
-            ats_ip_ntop(&server_ip.sa, addrbuf, sizeof(addrbuf)), connection_count->getCount(server_ip));
+            ats_ip_nptop(&server_ip.sa, addrbuf, sizeof(addrbuf)),
+            connection_count->getCount(server_ip, hostname_hash, sharing_match));
     } else {
-      Error("[%" PRId64 "] number of connections should be greater than zero: %u", con_id, connection_count->getCount(server_ip));
+      Error("[%" PRId64 "] number of connections should be greater than zero: %u", con_id,
+            connection_count->getCount(server_ip, hostname_hash, sharing_match));
     }
   }
diff --git a/proxy/http/HttpSessionManager.cc b/proxy/http/HttpSessionManager.cc
index ae19ca1e337..2f3bd63d07c 100644
--- a/proxy/http/HttpSessionManager.cc
+++ b/proxy/http/HttpSessionManager.cc
@@ -171,8 +171,8 @@ ServerSessionPool::eventHandler(int event, void *data)
       // origin, then reset the timeouts on our end and do not close the connection
       if ((event == VC_EVENT_INACTIVITY_TIMEOUT || event == VC_EVENT_ACTIVE_TIMEOUT) && s->state == HSS_KA_SHARED &&
          s->enable_origin_connection_limiting) {
-        bool connection_count_below_min =
-          s->connection_count->getCount(s->server_ip) <= http_config_params->origin_min_keep_alive_connections;
+        bool connection_count_below_min = s->connection_count->getCount(s->server_ip, s->hostname_hash, s->sharing_match) <=
+                                          http_config_params->origin_min_keep_alive_connections;
 
         if (connection_count_below_min) {
           Debug("http_ss", "[%" PRId64 "] [session_bucket] session received io notice [%s], "
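[Usage note] With this change proxy.config.http.origin_max_connections is enforced per connection-count key rather than per bare IP address, so the session sharing match mode decides what counts as one origin; when getCount() reaches the limit, the HttpSM.cc hunk above shows do_http_server_open() rescheduling itself after 100 ms instead of opening another connection. A hypothetical records.config excerpt matching what the tsqa test drives through its Python config dict (the server_session_sharing.match spelling is assumed from the txn_conf field used above; verify against the records.config documentation for your release):

CONFIG proxy.config.http.origin_max_connections INT 1
CONFIG proxy.config.http.server_session_sharing.match STRING both
CONFIG proxy.config.http.keep_alive_enabled_out INT 1
CONFIG proxy.config.http.keep_alive_no_activity_timeout_out INT 1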