Coverage for src/async_icap_server.py : 87%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
############################################################################### # # vim:ts=3:sw=3:expandtab # # Tornado based asynchronous ICAP server # # Copyright (C) 2017, Menlo Security, Inc. # # All rights reserved. # # Summary: Previous ICAP server implementation was based on open source # library 'pyicap'. This library uses multi-threading as the scaling strategy. # In a multi-core environment, this library was not using all the cpus. # In this implementation, pyicap library has been retrofitted to use tornado # asynchronous tcp server. # # Design: Using a tornado tcp server as the base, parse the icap messages using # pyicap library. It creates a filter chain so that many icap_filters with # different purposes like whitelist_icap, auth_icap, pe_icap etc.. can reside in # the same icap server. This avoids IO operations between squid and # icap_server. Also, recycle tornado icap worker processes every x icap # transactions/connection or y minutes to avoid any memory leaks piling up. ###############################################################################
# Create the operations logger
# Load app-specific configuration first. 'config/override.ini'] + pnrconfig.CONFIG_FILES
# Use the libcurl httpclient over the simple http client 'tornado.curl_httpclient.CurlAsyncHTTPClient')
"""Helper class which encapsulates the socket stream
Encasulates reads/writes to the socket stream opened between squid and icap_server. It also keeps a counter of number of icap transactions and the number of sockets currently open and total sockets opened in this worker. """
socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) socket.IPPROTO_TCP, socket.SO_KEEPALIVE, 1)
"""called by tornado when squid closes socket""" 'pending_connections': ICAPConnection.in_use}, event='close_icap_connection')
def on_connect(self): """Squid is trying to initiate a new connection""" except Exception: pass 'pending_connections': ICAPConnection.in_use, 'peer': raddr}, event='new_icap_connection') tenant_config, cc) # squid closes connection to icap_server when the system is idle. # do not flag it as error.
def read_bytes(self, num_bytes): """used during reading response preview"""
def read_line(self): """used to find out how many bytes to read during preview."""
def read_chunk(self): """used during reading icap/http headers"""
def write_chunk(self, chunk): """writing responses back to squid"""
"""Extends Tornado TCPServer and manages recycling of worker processes.
Launches configurable number of tornado worker processes. It also recycles the worker processes every x number of icap transaction or icap connections or y number of secs. All these values are configurable using ini settings. Before a worker process is terminated, the worker stops accepting new icap connections. It keeps servicing existing icap connections for n secs (grace period). The worker will be terminated if connections in use is 0 even before the grace period is complete. Some amount of randomous has been built in so that not all workers get terminated or stop listening around the same time. """
# Load the page translations, and set default to 'en' os.path.join( os.path.dirname(__file__), os.pardir, "locale")) # Create dump directory if not present os.mkdir(dump_dir)
def handle_stream(self, stream, address): """Called for each new connection"""
"""Stop the worker from accepting new icap connections""" if ICAPConnection.close_all_connections: # worker has already already stopped return
logger.info({}, event='icap_server_stop') ICAPConnection.close_all_connections = True self.stop()
def terminate(deadline): """Terminate worker if pending connection = 0 or after grace period""" logger.info({'pending_connections': ICAPConnection.in_use}, event='icap_server_stop') now = self.io_loop.time() if now < deadline and ICAPConnection.in_use: self.io_loop.call_later(10, terminate, deadline) else: logger.info({'pending_connections': ICAPConnection.in_use}, event='icap_server_shutdown') self.io_loop.stop()
deadline = self.io_loop.time() + (timeout or 0) self.io_loop.call_later(10, terminate, deadline)
def _fetch_tenant_caps(self): 'timeout_ms') / 1000.0) pe_url, method='POST', body=json.dumps(data), headers=headers, request_timeout=pe_timeout) event='fetch-tenant-caps') except Exception as ex: logger.error({'details': ex}, event='fetch-tenant-caps')
"""Check if the worker needs to be stopped. Also load the tenant_config""" 'total_connections': ICAPConnection.connection_id}, event='periodic_callback') self._stop_listening(config.getint('icap_server', 'grace_lifespan'))
"""Start the workers and ensure new workers are spawned if any worker dies
This is a copy-paste of Tornado's implementation of TCPServer.start(). Tornado TCPServer implementation allows only 100 workers to be launched if any worker dies abnormally. We are relying on tornado to launch workers every few hours. So, overriding the default tornado implementation with max_restarts=sys.maxsize """ tornado.process.fork_processes(num_processes, max_restarts=sys.maxsize)
"""Bind to a reuseable port and setup random lifespans for workers"""
time_func=monotonic_time) # Forks multiple sub-processes
'max_conns': self._max_conn}, event='icap_server_startup')
sys.exit(-1) # On normal exit, tornado will not launch new worker.
# Remove the existing handlers like console_handler and add the file_handler
|