###############################################################################
#
# vim:ts=3:sw=3:expandtab
#
# Tornado based asynchronous ICAP server
#
# Copyright (C) 2017, Menlo Security, Inc.
#
# All rights reserved.
#
# Summary: Previous ICAP server implementation was based on open source
# library 'pyicap'. This library uses multi-threading as the scaling strategy.
# In a multi-core environment, this library was not using all the cpus.
# In this implementation, pyicap library has been retrofitted to use tornado
# asynchronous tcp server.
#
# Design: Using a tornado tcp server as the base, parse the icap messages using
# pyicap library. It creates a filter chain so that many icap_filters with
# different purposes like whitelist_icap, auth_icap, pe_icap etc.. can reside in
# the same icap server. This avoids IO operations between squid and
# icap_server. Also, recycle tornado icap worker processes every x icap
# transactions/connection or y minutes to avoid any memory leaks piling up.
###############################################################################
import json
import logging
import os
import random
import socket
import sys
import tornado.gen
import tornado.httpclient
import tornado.ioloop
import tornado.iostream
import tornado.locale
import tornado.platform.epoll
import tornado.process
import tornado.tcpserver
from tornado.platform.auto import monotonic_time
import pnrconfig
from icap_request_handler import ICAPRequestHandler
from ops_logging import get_logger, set_global_service_name
# Create the operations logger.
# NOTE(review): the service name is set before get_logger() is called,
# presumably so the logger picks it up -- confirm in ops_logging.
set_global_service_name('pnr.icap')
logger = get_logger('pnr.icap')
# Load app-specific configuration first: the local default/override files
# followed by pnrconfig.CONFIG_FILES. Presumably later files override
# earlier ones -- confirm in pnrconfig.
config_files = ['config/default.ini',
'config/override.ini'] + pnrconfig.CONFIG_FILES
config = pnrconfig.PnrConfig(file_names=config_files)
# Per-capability tenant id sets, filled in (mutated in place) by
# ICAPServer._fetch_tenant_caps() and handed by reference to every
# ICAPRequestHandler; it starts out empty.
tenant_config = {}
# Use the libcurl httpclient over the simple http client
tornado.httpclient.AsyncHTTPClient.configure(
'tornado.curl_httpclient.CurlAsyncHTTPClient')
class ICAPConnection(object):
   """Helper class which encapsulates the socket stream.

   Encapsulates reads/writes to the socket stream opened between squid and
   icap_server. It also keeps class-level counters of the connections this
   worker has accepted and of the connections currently open.
   """
   # Total connections accepted by this worker; also used to assign self.id.
   connection_id = 0
   # Connections currently open (incremented in on_connect, decremented in
   # on_disconnect).
   in_use = 0
   # Set once the worker stops listening; it is then passed as the close
   # flag to every subsequent ICAPRequestHandler.
   close_all_connections = False

   def __init__(self, stream):
      """Wrap a newly accepted tornado IOStream and tune its socket.

      :param stream: tornado IOStream for the connection from squid.
      """
      ICAPConnection.connection_id += 1
      self.id = ICAPConnection.connection_id
      self.stream = stream
      sock = stream.socket
      # Disable Nagle so small ICAP responses go out immediately.
      sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
      # SO_KEEPALIVE is a socket-level (SOL_SOCKET) option. Passing it at the
      # IPPROTO_TCP level sets an unrelated TCP option with the same number
      # (on Linux 9 == TCP_DEFER_ACCEPT) and never enables keepalive.
      sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
      self.stream.set_close_callback(self.on_disconnect)

   def on_disconnect(self):
      """Close callback: called by tornado once this stream has closed."""
      ICAPConnection.in_use -= 1
      logger.debug({'total_connections': ICAPConnection.connection_id,
                    'pending_connections': ICAPConnection.in_use},
                   event='close_icap_connection')

   @tornado.gen.coroutine
   def on_connect(self):
      """Serve ICAP transactions on this connection until it is closed.

      Each transaction gets a fresh ICAPRequestHandler. Its close flag is
      set once the worker is shutting down or once this connection has
      reached ``max_transactions_per_connection`` transactions.
      """
      ICAPConnection.in_use += 1
      raddr = 'closed'
      try:
         peer = self.stream.socket.getpeername()
         # IPv4 peers are (host, port) but IPv6 peers are
         # (host, port, flowinfo, scope_id): format only host and port so
         # IPv6 peers are not mis-logged as 'closed'.
         raddr = '%s:%s' % (peer[0], peer[1])
      except Exception:
         # Best effort only: the peer address is used solely for logging.
         pass
      logger.debug({'total_connections': ICAPConnection.connection_id,
                    'pending_connections': ICAPConnection.in_use,
                    'peer': raddr},
                   event='new_icap_connection')
      mt = config.getint('icap_server', 'max_transactions_per_connection')
      try:
         counter = 0
         while True:
            counter += 1
            cc = ICAPConnection.close_all_connections or counter >= mt
            icap_request_handler = ICAPRequestHandler(self, config,
                                                      tenant_config, cc)
            yield icap_request_handler.handle_request()
      except tornado.iostream.StreamClosedError:
         # squid closes connection to icap_server when the system is idle.
         # do not flag it as error.
         pass

   @tornado.gen.coroutine
   def read_bytes(self, num_bytes):
      """Read exactly ``num_bytes`` bytes (used when reading the preview)."""
      data = yield self.stream.read_bytes(num_bytes)
      raise tornado.gen.Return(data)

   @tornado.gen.coroutine
   def read_line(self):
      """Read up to and including the next CRLF.

      Used to find out how many bytes to read during preview.
      """
      line = yield self.stream.read_until(b'\r\n')
      raise tornado.gen.Return(line)

   @tornado.gen.coroutine
   def read_chunk(self):
      """Read up to and including the next blank line (CRLFCRLF).

      Used when reading icap/http headers.
      """
      lines = yield self.stream.read_until(b'\r\n\r\n')
      raise tornado.gen.Return(lines)

   @tornado.gen.coroutine
   def write_chunk(self, chunk):
      """Write ``chunk`` back to squid and wait until it has been flushed."""
      yield self.stream.write(chunk)
class ICAPServer(tornado.tcpserver.TCPServer):
   """Extends Tornado TCPServer and manages recycling of worker processes.

   Launches a configurable number of tornado worker processes and recycles
   each worker after roughly x accepted icap connections or y seconds,
   whichever comes first. All these values are configurable using ini
   settings.

   Before a worker process is terminated, the worker stops accepting new icap
   connections. It keeps servicing existing icap connections for n secs (grace
   period). The worker is terminated as soon as no connections are in use,
   even before the grace period is complete. Some randomness is built into
   the limits so that not all workers get terminated or stop listening
   around the same time.
   """

   def __init__(self, *args, **kwargs):
      super(ICAPServer, self).__init__(*args, **kwargs)
      self.host = config.get('networking', 'pnr_icap_host')
      self.port = config.get('networking', 'pnr_icap_port')
      # Load the page translations, and set default to 'en'.
      tornado.locale.load_translations(
         os.path.join(os.path.dirname(__file__), os.pardir, "locale"))
      tornado.locale.set_default_locale('en')
      # Create the dump directory (and any missing parents) if not present.
      # NOTE(review): dump_on_exception is read with config.get(), presumably
      # as a raw string, so any non-empty value (even "False") counts as
      # enabled here. Left as-is to stay consistent with other readers of
      # the same option.
      dump_dir = config.get('icap_server', 'dump_dir')
      dump_enabled = config.get('icap_server', 'dump_on_exception')
      if dump_enabled and not os.path.exists(dump_dir):
         os.makedirs(dump_dir)

   @tornado.gen.coroutine
   def handle_stream(self, stream, address):
      """Called by tornado for each new connection; serves it to the end."""
      connection = ICAPConnection(stream)
      yield connection.on_connect()

   def _stop_listening(self, timeout=None):
      """Stop accepting new icap connections and schedule worker shutdown.

      Existing connections keep being served. Every 10 seconds the worker
      checks whether all connections are closed or ``timeout`` seconds have
      elapsed, and stops the IOLoop as soon as either holds.

      :param timeout: grace period in seconds; None/0 stops at the first
         check.
      """
      if ICAPConnection.close_all_connections:
         # worker has already stopped listening
         return
      logger.info({}, event='icap_server_stop')
      ICAPConnection.close_all_connections = True
      self.stop()

      def terminate(deadline):
         """Stop the IOLoop if no connections remain or the deadline passed."""
         logger.info({'pending_connections': ICAPConnection.in_use},
                     event='icap_server_stop')
         now = self.io_loop.time()
         if now < deadline and ICAPConnection.in_use:
            self.io_loop.call_later(10, terminate, deadline)
         else:
            logger.info({'pending_connections': ICAPConnection.in_use},
                        event='icap_server_shutdown')
            self.io_loop.stop()

      deadline = self.io_loop.time() + (timeout or 0)
      self.io_loop.call_later(10, terminate, deadline)

   def _get_tids(self, tids):
      """Return ``tids`` as a set of ints."""
      return {int(tid) for tid in tids}

   @tornado.gen.coroutine
   def _fetch_tenant_caps(self):
      """Refresh ``tenant_config`` with the tenant ids of each capability.

      POSTs the configured capability names to the policy enforcement server
      and stores the returned ids for every capability ``cap`` as a set of
      ints under ``tenant_config[cap + '_tids']``. Any failure is logged and
      leaves ``tenant_config`` unchanged.
      """
      try:
         pe_timeout = (config.getint('policy_enforcement_server',
                                     'timeout_ms') / 1000.0)
         pe_url = config.get('policy_enforcement_server', 'caps_lookup_url')
         caps_setting = config.get('icap_server', 'tenant_caps_to_fetch') or ''
         # Accept "a,b", "a, b" and stray whitespace/trailing commas; the
         # previous split(', ') silently turned "a,b" into one bogus cap.
         caps = [cap.strip() for cap in caps_setting.split(',') if cap.strip()]
         if not caps:
            return
         data = {'capabilities': caps}
         headers = {'Content-Type': 'application/json'}
         http_client = tornado.httpclient.AsyncHTTPClient()
         pe_response = yield http_client.fetch(
            pe_url,
            method='POST',
            body=json.dumps(data),
            headers=headers,
            request_timeout=pe_timeout)
         json_response = json.loads(pe_response.body)
         fetched = {}
         for cap in caps:
            fetched[cap + '_tids'] = self._get_tids(
               json_response.get(cap, set()))
         # Mutate the shared dict in place (handlers hold a reference to it)
         # and only once everything parsed, so a bad entry cannot leave it
         # partially updated.
         tenant_config.update(fetched)
         logger.debug({'tenant_config': tenant_config},
                      event='fetch-tenant-caps')
      except Exception as ex:
         logger.error({'details': ex}, event='fetch-tenant-caps')

   def _periodic_callback(self):
      """Check if the worker needs to be stopped. Also load the tenant_config"""
      logger.debug({'pending_connections': ICAPConnection.in_use,
                    'total_connections': ICAPConnection.connection_id},
                   event='periodic_callback')
      now = self.io_loop.time()
      if now > self._die_after or ICAPConnection.connection_id > self._max_conn:
         self._stop_listening(config.getint('icap_server', 'grace_lifespan'))
      # Fire-and-forget: the coroutine logs its own errors.
      self._fetch_tenant_caps()

   def start(self, num_processes=1):
      """Start the workers and ensure new workers are spawned if any worker dies

      This is a copy-paste of Tornado's implementation of TCPServer.start().
      Tornado TCPServer implementation allows only 100 workers to be launched
      if any worker dies abnormally. We are relying on tornado to launch workers
      every few hours. So, overriding the default tornado implementation with
      max_restarts=sys.maxsize
      """
      assert not self._started
      self._started = True
      if num_processes != 1:
         tornado.process.fork_processes(num_processes, max_restarts=sys.maxsize)
      sockets = self._pending_sockets
      self._pending_sockets = []
      self.add_sockets(sockets)
      set_global_service_name('pnr-icap-%s' % tornado.process.task_id())

   def run(self):
      """Bind a reusable port, fork workers and run with a random lifespan.

      Does not return: the worker exits via sys.exit(-1) once its IOLoop
      has been stopped.
      """
      logger.info('#' * 50)
      logger.info('%-25s: %s', 'name', 'ICAP Server')
      logger.info('%-25s: %s', 'host', self.host)
      logger.info('%-25s: %s', 'port', self.port)
      logger.info('#' * 50)
      tornado.ioloop.IOLoop.configure(tornado.platform.epoll.EPollIOLoop,
                                      time_func=monotonic_time)
      self.bind(self.port, self.host, reuse_port=True)
      # Forks multiple sub-processes
      self.start(config.getint('icap_server', 'max_workers'))
      cr = config.getint('icap_server', 'max_connections_range')
      cs = config.getint('icap_server', 'max_connections')
      self._max_conn = cs + random.randint(-1 * cr, cr)
      lr = config.getint('icap_server', 'worker_lifespan_range')
      ls = config.getint('icap_server', 'avg_worker_lifespan')
      self._lifespan = ls + random.randint(-1 * lr, lr)
      self._die_after = self.io_loop.time() + self._lifespan
      logger.info({'die_after': self._lifespan,
                   'max_conns': self._max_conn},
                  event='icap_server_startup')
      # Load tenant_config as soon as the loop runs instead of waiting for
      # the first periodic callback; until then request handlers would see
      # an empty tenant_config.
      self.io_loop.add_callback(self._fetch_tenant_caps)
      pc = tornado.ioloop.PeriodicCallback(self._periodic_callback, 60 * 1000)
      pc.start()
      self.io_loop.start()  # Blocks until self.io_loop.stop() is called.
      # Exit non-zero on purpose: tornado's fork_processes does not relaunch
      # a worker that exits normally (status 0).
      sys.exit(-1)
def setup_logger(cfg=None):
   """Send all root-logger output to a single WatchedFileHandler.

   Reads ``log_directory``, ``log_file_name`` and ``log_level`` (default
   ``INFO``, case-insensitive) from the ``icap_server`` section. Every handler
   already attached to the root logger (e.g. a console handler) is removed
   before the file handler is added.

   :param cfg: object exposing ``get(section, option)``; defaults to the
      module-level ``config``.
   """
   # logging.handlers is a submodule: "import logging" does not load it.
   import logging.handlers
   if cfg is None:
      cfg = config
   log_dir = cfg.get('icap_server', 'log_directory')
   log_file_name = cfg.get('icap_server', 'log_file_name')
   # setLevel() only accepts upper-case level names.
   log_level = (cfg.get('icap_server', 'log_level') or 'INFO').upper()
   root = logging.getLogger('')
   root.setLevel(log_level)
   formatter = logging.Formatter(
      'time="%(asctime)-15s" level=%(levelname)-7s %(message)s')
   file_handler = logging.handlers.WatchedFileHandler(
      os.path.join(log_dir, log_file_name))
   file_handler.setLevel(log_level)
   file_handler.setFormatter(formatter)
   # Iterate over a copy: removeHandler() mutates root.handlers, and removing
   # from the live list while iterating it skips every other handler.
   for handler in list(root.handlers):
      root.removeHandler(handler)
   root.addHandler(file_handler)
if __name__ == "__main__":
   # Configure file logging before the server starts emitting log lines.
   # (A stray "|" that followed this block was a syntax error and is gone.)
   setup_logger()
   server = ICAPServer()
   server.run()