Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions ci/tsqa/tests/test_origin_max_connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
'''
Test the configure entry : proxy.config.http.origin_max_connections
'''

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import logging
import uuid
import socket
import requests
import tsqa.test_cases
import helpers
import thread
from multiprocessing import Pool
import SocketServer

log = logging.getLogger(__name__)


# TODO: seems like a useful shared class- either add to httpbin or some shared lib
class KAHandler(SocketServer.BaseRequestHandler):
'''SocketServer that returns the connection-id as the body
'''
# class variable to set number of active sessions
alive_sessions = 0

def handle(self):
KAHandler.alive_sessions += 1
# Receive the data in small chunks and retransmit it
conn_id = uuid.uuid4().hex
start = time.time()
while True:
data = self.request.recv(4096).strip()
if data:
log.info('Sending data back to the client: {uid}'.format(uid=conn_id))
else:
log.info('Client disconnected: {timeout}seconds'.format(timeout=time.time() - start))
break
body = conn_id
time.sleep(1)
resp = ('HTTP/1.1 200 OK\r\n'
'Content-Length: {content_length}\r\n'
'Content-Type: text/html; charset=UTF-8\r\n'
'Connection: keep-alive\r\n'
'X-Current-Sessions: {alive_sessions}\r\n'
'\r\n'
'{body}'.format(content_length=len(body), alive_sessions=KAHandler.alive_sessions, body=body))
self.request.sendall(resp)
KAHandler.alive_sessions -= 1


class TestKeepAlive_Origin_Max_connections(helpers.EnvironmentCase):
@classmethod
def setUpEnv(cls, env):
cls.traffic_server_host = '127.0.0.1'
cls.traffic_server_port = int(cls.configs['records.config']['CONFIG']['proxy.config.http.server_ports'])
cls.socket_server_port = int(tsqa.utils.bind_unused_port()[1])

log.info("socket_server_port = %d" % (cls.socket_server_port))
cls.server = tsqa.endpoint.SocketServerDaemon(KAHandler, port=cls.socket_server_port)
cls.server.start()
cls.server.ready.wait()

cls.socket_server_port2 = int(tsqa.utils.bind_unused_port()[1])
cls.server2 = tsqa.endpoint.SocketServerDaemon(KAHandler, port=cls.socket_server_port2)
cls.server2.start()
cls.server2.ready.wait()

cls.configs['remap.config'].add_lines([
'map /other/ http://127.0.0.1:{0}'.format(cls.socket_server_port2),
'map / http://127.0.0.1:{0}'.format(cls.socket_server_port),
])

cls.origin_keep_alive_timeout = 1

cls.configs['records.config']['CONFIG'].update({
'proxy.config.http.origin_max_connections': 1,
'proxy.config.http.keep_alive_enabled_out': 1,
'proxy.config.http.keep_alive_no_activity_timeout_out': cls.origin_keep_alive_timeout,
'proxy.config.exec_thread.limit': 2,
'proxy.config.exec_thread.autoconfig': 0,
})


def test_max(self):
'''
'''
REQUEST_COUNT = 8
url = 'http://{0}:{1}/'.format(self.traffic_server_host, self.traffic_server_port)
url2 = 'http://{0}:{1}/other/'.format(self.traffic_server_host, self.traffic_server_port)
results = []
results2 = []
pool = Pool(processes=4)
for _ in xrange(0, REQUEST_COUNT):
results.append(pool.apply_async(requests.get, (url,)))
results2.append(pool.apply_async(requests.get, (url2,)))

# TS-4340
# ensure that the 2 origins (2 different ports on loopback) were running in parallel
for i in xrange(0, REQUEST_COUNT):
self.assertEqual(int(results[i].get().headers['X-Current-Sessions']), 2)
self.assertEqual(int(results2[i].get().headers['X-Current-Sessions']), 2)
20 changes: 20 additions & 0 deletions lib/ts/ink_inet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,26 @@ ats_ip_hash(sockaddr const *addr)
return zret.i;
}

uint64_t
ats_ip_port_hash(sockaddr const *addr)
{
union md5sum {
uint64_t i;
uint16_t b[4];
unsigned char c[16];
} zret;

zret.i = 0;
if (ats_is_ip4(addr)) {
zret.i = (static_cast<uint64_t>(ats_ip4_addr_cast(addr)) << 16) | (ats_ip_port_cast(addr));
} else if (ats_is_ip6(addr)) {
ink_code_md5(const_cast<uint8_t *>(ats_ip_addr8_cast(addr)), TS_IP6_SIZE, zret.c);
// now replace the bottom 16bits so we can account for the port.
zret.b[3] = ats_ip_port_cast(addr);
}
return zret.i;
}

int
ats_ip_to_hex(sockaddr const *src, char *dst, size_t len)
{
Expand Down
2 changes: 2 additions & 0 deletions lib/ts/ink_inet.h
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,8 @@ int ats_ip_getbestaddrinfo(char const *name, ///< [in] Address name (IPv4, IPv6,
*/
uint32_t ats_ip_hash(sockaddr const *addr);

uint64_t ats_ip_port_hash(sockaddr const *addr);

/** Convert address to string as a hexidecimal value.
The string is always nul terminated, the output string is clipped
if @a dst is insufficient.
Expand Down
98 changes: 89 additions & 9 deletions proxy/http/HttpConnectionCount.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
#include "ts/ink_inet.h"
#include "ts/ink_mutex.h"
#include "ts/Map.h"
#include "ts/Diags.h"
#include "ts/INK_MD5.h"
#include "ts/ink_config.h"
#include "HttpProxyAPIEnums.h"

#ifndef _HTTP_CONNECTION_COUNT_H_
#define _HTTP_CONNECTION_COUNT_H_
Expand All @@ -52,10 +56,10 @@ class ConnectionCount
* @return Number of connections
*/
int
getCount(const IpEndpoint &addr)
getCount(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type)
{
ink_mutex_acquire(&_mutex);
int count = _hostCount.get(ConnAddr(addr));
int count = _hostCount.get(ConnAddr(addr, hostname_hash, match_type));
ink_mutex_release(&_mutex);
return count;
}
Expand All @@ -66,9 +70,10 @@ class ConnectionCount
* @param delta Default is +1, can be set to negative to decrement
*/
void
incrementCount(const IpEndpoint &addr, const int delta = 1)
incrementCount(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type,
const int delta = 1)
{
ConnAddr caddr(addr);
ConnAddr caddr(addr, hostname_hash, match_type);
ink_mutex_acquire(&_mutex);
int count = _hostCount.get(caddr);
_hostCount.put(caddr, count + delta);
Expand All @@ -77,14 +82,35 @@ class ConnectionCount

struct ConnAddr {
IpEndpoint _addr;
INK_MD5 _hostname_hash;
TSServerSessionSharingMatchType _match_type;

ConnAddr() { ink_zero(_addr); }
ConnAddr(int x)
ConnAddr() : _match_type(TS_SERVER_SESSION_SHARING_MATCH_NONE)
{
ink_zero(_addr);
ink_zero(_hostname_hash);
}

ConnAddr(int x) : _match_type(TS_SERVER_SESSION_SHARING_MATCH_NONE)
{
ink_release_assert(x == 0);
ink_zero(_addr);
ink_zero(_hostname_hash);
}

ConnAddr(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type)
: _addr(addr), _hostname_hash(hostname_hash), _match_type(match_type)
{
}
ConnAddr(const IpEndpoint &addr) : _addr(addr) {}

ConnAddr(const IpEndpoint &addr, const char *hostname, TSServerSessionSharingMatchType match_type)
: _addr(addr), _match_type(match_type)
{
MD5Context md5_ctx;
md5_ctx.hash_immediate(_hostname_hash, static_cast<const void *>(hostname), strlen(hostname));
}


operator bool() { return ats_is_ip(&_addr); }
};

Expand All @@ -94,12 +120,66 @@ class ConnectionCount
static uintptr_t
hash(ConnAddr &addr)
{
return (uintptr_t)ats_ip_hash(&addr._addr.sa);
if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP) {
return (uintptr_t)ats_ip_port_hash(&addr._addr.sa);
} else if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST) {
return (uintptr_t)addr._hostname_hash.u64[0];
} else if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH) {
return ((uintptr_t)ats_ip_port_hash(&addr._addr.sa) ^ (uintptr_t)addr._hostname_hash.u64[0]);
} else {
return 0; // they will never be equal() because of it returns false for NONE matches.
}
}

static int
equal(ConnAddr &a, ConnAddr &b)
{
return ats_ip_addr_eq(&a._addr, &b._addr);
char addrbuf1[INET6_ADDRSTRLEN];
char addrbuf2[INET6_ADDRSTRLEN];
char md5buf1[33];
char md5buf2[33];
ink_code_md5_stringify(md5buf1, sizeof(md5buf1), reinterpret_cast<const char *>(a._hostname_hash.u8));
ink_code_md5_stringify(md5buf2, sizeof(md5buf2), reinterpret_cast<const char *>(b._hostname_hash.u8));
Debug("conn_count", "Comparing hostname hash %s dest %s match method %d to hostname hash %s dest %s match method %d", md5buf1,
ats_ip_nptop(&a._addr.sa, addrbuf1, sizeof(addrbuf1)), a._match_type, md5buf2,
ats_ip_nptop(&b._addr.sa, addrbuf2, sizeof(addrbuf2)), b._match_type);

if (a._match_type != b._match_type || a._match_type == TS_SERVER_SESSION_SHARING_MATCH_NONE) {
Debug("conn_count", "result = 0, a._match_type != b._match_type || a._match_type == TS_SERVER_SESSION_SHARING_MATCH_NONE");
return 0;
}

if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP) {
if (ats_ip_addr_port_eq(&a._addr.sa, &b._addr.sa)) {
Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP");
return 1;
} else {
Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP");
return 0;
}
}

if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST) {
if ((a._hostname_hash.u64[0] == b._hostname_hash.u64[0] && a._hostname_hash.u64[1] == b._hostname_hash.u64[1])) {
Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST");
return 1;
} else {
Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST");
return 0;
}
}

if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH) {
if ((ats_ip_addr_port_eq(&a._addr.sa, &b._addr.sa)) &&
(a._hostname_hash.u64[0] == b._hostname_hash.u64[0] && a._hostname_hash.u64[1] == b._hostname_hash.u64[1])) {
Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH");

return 1;
}
}

Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH");
return 0;
}
};

Expand Down
14 changes: 11 additions & 3 deletions proxy/http/HttpSM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,7 @@ HttpSM::state_http_server_open(int event, void *data)
UnixNetVConnection *server_vc = (UnixNetVConnection*)data;
printf("client fd is :%d , server fd is %d\n",vc->con.fd,
server_vc->con.fd); */
session->attach_hostname(t_state.current.server->name);
ats_ip_copy(&session->server_ip, &t_state.current.server->dst_addr);
session->new_connection(static_cast<NetVConnection *>(data));
session->state = HSS_ACTIVE;
Expand Down Expand Up @@ -4748,10 +4749,17 @@ HttpSM::do_http_server_open(bool raw)
if (t_state.txn_conf->origin_max_connections > 0) {
ConnectionCount *connections = ConnectionCount::getInstance();

char addrbuf[INET6_ADDRSTRLEN];
if (connections->getCount((t_state.current.server->dst_addr)) >= t_state.txn_conf->origin_max_connections) {
INK_MD5 hostname_hash;
MD5Context md5_ctx;
md5_ctx.hash_immediate(hostname_hash, static_cast<const void *>(t_state.current.server->name),
strlen(t_state.current.server->name));

ip_port_text_buffer addrbuf;
if (connections->getCount(t_state.current.server->dst_addr, hostname_hash,
(TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match) >=
t_state.txn_conf->origin_max_connections) {
DebugSM("http", "[%" PRId64 "] over the number of connection for this host: %s", sm_id,
ats_ip_ntop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf)));
ats_ip_nptop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf)));
ink_assert(pending_action == NULL);
pending_action = eventProcessor.schedule_in(this, HRTIME_MSECONDS(100));
return;
Expand Down
19 changes: 11 additions & 8 deletions proxy/http/HttpServerSession.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ HttpServerSession::new_connection(NetVConnection *new_vc)
if (enable_origin_connection_limiting == true) {
if (connection_count == NULL)
connection_count = ConnectionCount::getInstance();
connection_count->incrementCount(server_ip);
char addrbuf[INET6_ADDRSTRLEN];
connection_count->incrementCount(server_ip, hostname_hash, sharing_match);
ip_port_text_buffer addrbuf;
Debug("http_ss", "[%" PRId64 "] new connection, ip: %s, count: %u", con_id,
ats_ip_ntop(&server_ip.sa, addrbuf, sizeof(addrbuf)), connection_count->getCount(server_ip));
ats_ip_nptop(&server_ip.sa, addrbuf, sizeof(addrbuf)),
connection_count->getCount(server_ip, hostname_hash, sharing_match));
}
#ifdef LAZY_BUF_ALLOC
read_buffer = new_empty_MIOBuffer(HTTP_SERVER_RESP_HDR_BUFFER_INDEX);
Expand Down Expand Up @@ -139,13 +140,15 @@ HttpServerSession::do_io_close(int alerrno)
// Check to see if we are limiting the number of connections
// per host
if (enable_origin_connection_limiting == true) {
if (connection_count->getCount(server_ip) > 0) {
connection_count->incrementCount(server_ip, -1);
char addrbuf[INET6_ADDRSTRLEN];
if (connection_count->getCount(server_ip, hostname_hash, sharing_match) > 0) {
connection_count->incrementCount(server_ip, hostname_hash, sharing_match, -1);
ip_port_text_buffer addrbuf;
Debug("http_ss", "[%" PRId64 "] connection closed, ip: %s, count: %u", con_id,
ats_ip_ntop(&server_ip.sa, addrbuf, sizeof(addrbuf)), connection_count->getCount(server_ip));
ats_ip_nptop(&server_ip.sa, addrbuf, sizeof(addrbuf)),
connection_count->getCount(server_ip, hostname_hash, sharing_match));
} else {
Error("[%" PRId64 "] number of connections should be greater than zero: %u", con_id, connection_count->getCount(server_ip));
Error("[%" PRId64 "] number of connections should be greater than zero: %u", con_id,
connection_count->getCount(server_ip, hostname_hash, sharing_match));
}
}

Expand Down
4 changes: 2 additions & 2 deletions proxy/http/HttpSessionManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ ServerSessionPool::eventHandler(int event, void *data)
// origin, then reset the timeouts on our end and do not close the connection
if ((event == VC_EVENT_INACTIVITY_TIMEOUT || event == VC_EVENT_ACTIVE_TIMEOUT) && s->state == HSS_KA_SHARED &&
s->enable_origin_connection_limiting) {
bool connection_count_below_min =
s->connection_count->getCount(s->server_ip) <= http_config_params->origin_min_keep_alive_connections;
bool connection_count_below_min = s->connection_count->getCount(s->server_ip, s->hostname_hash, s->sharing_match) <=
http_config_params->origin_min_keep_alive_connections;

if (connection_count_below_min) {
Debug("http_ss", "[%" PRId64 "] [session_bucket] session received io notice [%s], "
Expand Down