Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

############################################################################### 

# 

# vim:ts=3:sw=3:expandtab 

# 

# Tornado based asynchronous ICAP server 

# 

# Copyright (C) 2017, Menlo Security, Inc. 

# 

# All rights reserved. 

# 

# Summary: Previous ICAP server implementation was based on open source 

# library 'pyicap'. This library uses multi-threading as the scaling strategy. 

# In a multi-core environment, this library was not using all the cpus. 

# In this implementation, pyicap library has been retrofitted to use tornado 

# asynchronous tcp server. 

# 

# Design: Using a tornado tcp server as the base, parse the icap messages using 

# pyicap library. It creates a filter chain so that many icap_filters with 

# different purposes like whitelist_icap, auth_icap, pe_icap, etc. can reside in

# the same icap server. This avoids IO operations between squid and 

# icap_server. Also, recycle tornado icap worker processes every x icap 

# transactions/connection or y minutes to avoid any memory leaks piling up. 

############################################################################### 

import json 

import logging 

import os 

import random 

import socket 

import sys 

 

import tornado.gen 

import tornado.httpclient 

import tornado.ioloop 

import tornado.iostream 

import tornado.locale 

import tornado.platform.epoll 

import tornado.process 

import tornado.tcpserver 

from tornado.platform.auto import monotonic_time 

 

import pnrconfig 

from icap_request_handler import ICAPRequestHandler 

from ops_logging import get_logger, set_global_service_name 

 

# Create the operations logger
# The global service name is registered before the module logger is requested.
set_global_service_name('pnr.icap')
logger = get_logger('pnr.icap')


# Load app-specific configuration first.
# The local default/override ini files are listed ahead of the shared
# pnrconfig.CONFIG_FILES entries.
config_files = ['config/default.ini',
                'config/override.ini'] + pnrconfig.CONFIG_FILES
config = pnrconfig.PnrConfig(file_names=config_files)

# Tenant id sets keyed by '<capability>_tids'. Populated by
# ICAPServer._fetch_tenant_caps and handed to every ICAPRequestHandler.
tenant_config = {}

# Use the libcurl httpclient over the simple http client
tornado.httpclient.AsyncHTTPClient.configure(
    'tornado.curl_httpclient.CurlAsyncHTTPClient')

 

 

class ICAPConnection(object):
    """Helper class which encapsulates the socket stream.

    Encapsulates reads/writes to the socket stream opened between squid and
    icap_server. It also keeps class-level counters of the total number of
    sockets opened in this worker and the number of sockets currently open.
    """
    # Total connections created in this worker; the current value is also
    # used as the id of the most recently created connection.
    connection_id = 0
    # Connections that have been accepted and not yet closed.
    in_use = 0
    # Set to True by ICAPServer._stop_listening once the worker is draining.
    # It is forwarded to every ICAPRequestHandler (presumably so the handler
    # closes the connection after responding -- confirm in the handler).
    close_all_connections = False

    def __init__(self, stream):
        """Register a new connection and configure its socket.

        Args:
            stream: the tornado IOStream for the accepted socket.
        """
        ICAPConnection.connection_id += 1
        self.id = ICAPConnection.connection_id
        self.stream = stream

        sock = self.stream.socket
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # SO_KEEPALIVE is a socket-level option and must be set at
        # SOL_SOCKET. Setting it at IPPROTO_TCP silently sets whichever TCP
        # option shares the same number and leaves keepalive disabled.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.stream.set_close_callback(self.on_disconnect)

    def on_disconnect(self):
        """called by tornado when squid closes socket"""
        ICAPConnection.in_use -= 1
        logger.debug({'total_connections': ICAPConnection.connection_id,
                      'pending_connections': ICAPConnection.in_use},
                     event='close_icap_connection')

    @tornado.gen.coroutine
    def on_connect(self):
        """Squid is trying to initiate a new connection.

        Serves ICAP requests on this connection one after another until the
        stream is closed.
        """
        ICAPConnection.in_use += 1
        raddr = 'closed'
        try:
            # Only the first two fields (host, port) are formatted so that
            # IPv6 peers, whose address is a 4-tuple, are logged as well.
            peer = self.stream.socket.getpeername()
            raddr = '%s:%d' % peer[:2]
        except Exception:
            # Best effort: the peer address is only used for the debug log
            # below, so any failure keeps the 'closed' placeholder.
            pass
        logger.debug({'total_connections': ICAPConnection.connection_id,
                      'pending_connections': ICAPConnection.in_use,
                      'peer': raddr},
                     event='new_icap_connection')
        mt = config.getint('icap_server', 'max_transactions_per_connection')
        try:
            counter = 0
            while True:
                counter += 1
                # Flag the request as the last one on this connection when
                # the worker is draining or the per-connection limit is hit.
                cc = ICAPConnection.close_all_connections or counter >= mt
                icap_request_handler = ICAPRequestHandler(self, config,
                                                          tenant_config, cc)
                yield icap_request_handler.handle_request()
        except tornado.iostream.StreamClosedError:
            # squid closes connection to icap_server when the system is idle.
            # do not flag it as error.
            pass

    @tornado.gen.coroutine
    def read_bytes(self, num_bytes):
        """used during reading response preview"""
        data = yield self.stream.read_bytes(num_bytes)
        raise tornado.gen.Return(data)

    @tornado.gen.coroutine
    def read_line(self):
        """used to find out how many bytes to read during preview."""
        line = yield self.stream.read_until(b'\r\n')
        raise tornado.gen.Return(line)

    @tornado.gen.coroutine
    def read_chunk(self):
        """used during reading icap/http headers"""
        lines = yield self.stream.read_until(b'\r\n\r\n')
        raise tornado.gen.Return(lines)

    @tornado.gen.coroutine
    def write_chunk(self, chunk):
        """writing responses back to squid"""
        yield self.stream.write(chunk)

 

 

class ICAPServer(tornado.tcpserver.TCPServer):
    """Extends Tornado TCPServer and manages recycling of worker processes.

    Launches configurable number of tornado worker processes. It also recycles
    the worker processes every x number of icap connections or y number of
    secs. All these values are configurable using ini settings.
    Before a worker process is terminated, the worker stops accepting new icap
    connections. It keeps servicing existing icap connections for n secs (grace
    period). The worker will be terminated if connections in use is 0 even
    before the grace period is complete. Some amount of randomness has been
    built in so that not all workers get terminated or stop listening around
    the same time.
    """

    def __init__(self, *args, **kwargs):
        """Read the listen address, load translations, prepare the dump dir."""
        super(ICAPServer, self).__init__(*args, **kwargs)
        self.host = config.get('networking', 'pnr_icap_host')
        self.port = config.get('networking', 'pnr_icap_port')

        # Load the page translations, and set default to 'en'
        tornado.locale.load_translations(
            os.path.join(
                os.path.dirname(__file__), os.pardir, "locale"))
        tornado.locale.set_default_locale('en')
        # Create dump directory if not present
        dump_dir = config.get('icap_server', 'dump_dir')
        # NOTE(review): if config.get() returns the raw ini string, a value
        # such as 'false' is truthy here -- confirm against PnrConfig.
        dump_enabled = config.get('icap_server', 'dump_on_exception')
        if dump_enabled and not os.path.exists(dump_dir):
            os.mkdir(dump_dir)

    @tornado.gen.coroutine
    def handle_stream(self, stream, address):
        """Called for each new connection"""
        connection = ICAPConnection(stream)
        yield connection.on_connect()

    def _stop_listening(self, timeout=None):
        """Stop the worker from accepting new icap connections.

        Args:
            timeout: grace period in seconds during which open connections
                keep being served; None or 0 means no grace period.
        """
        if ICAPConnection.close_all_connections:
            # worker has already stopped
            return

        logger.info({}, event='icap_server_stop')
        ICAPConnection.close_all_connections = True
        self.stop()

        def terminate(deadline):
            """Terminate worker if pending connection = 0 or after grace period"""
            logger.info({'pending_connections': ICAPConnection.in_use},
                        event='icap_server_stop')
            now = self.io_loop.time()
            if now < deadline and ICAPConnection.in_use:
                # Still inside the grace period with open connections:
                # check again in 10 seconds.
                self.io_loop.call_later(10, terminate, deadline)
            else:
                logger.info({'pending_connections': ICAPConnection.in_use},
                            event='icap_server_shutdown')
                self.io_loop.stop()

        deadline = self.io_loop.time() + (timeout or 0)
        self.io_loop.call_later(10, terminate, deadline)

    def _get_tids(self, tids):
        """Return the given tenant ids as a set of ints."""
        return set(int(x) for x in tids)

    @tornado.gen.coroutine
    def _fetch_tenant_caps(self):
        """Refresh tenant_config from the policy enforcement server.

        POSTs the configured capability names to the caps lookup URL and
        stores, for each capability, the returned tenant ids under
        '<capability>_tids' in the module-level tenant_config. Any failure is
        logged and otherwise ignored, leaving tenant_config as it was at the
        point of failure.
        """
        try:
            pe_timeout = (config.getint('policy_enforcement_server',
                                        'timeout_ms') / 1000.0)
            pe_url = config.get('policy_enforcement_server', 'caps_lookup_url')
            raw_caps = config.get('icap_server', 'tenant_caps_to_fetch')
            # Split on ',' and strip so that 'a,b', 'a, b' and 'a ,  b' all
            # yield the same capability names; empty entries are dropped.
            caps = [cap.strip() for cap in raw_caps.split(',') if cap.strip()]
            data = {'capabilities': caps}
            headers = {'Content-Type': 'application/json'}
            http_client = tornado.httpclient.AsyncHTTPClient()
            pe_response = yield http_client.fetch(
                pe_url,
                method='POST',
                body=json.dumps(data),
                headers=headers,
                request_timeout=pe_timeout)
            json_response = json.loads(pe_response.body)
            for cap in caps:
                # A capability missing from the response maps to an empty set.
                result = json_response.get(cap, set())
                tenant_config[cap + '_tids'] = self._get_tids(result)
            logger.debug({'tenant_config': tenant_config},
                         event='fetch-tenant-caps')
        except Exception as ex:
            logger.error({'details': ex}, event='fetch-tenant-caps')

    def _periodic_callback(self):
        """Check if the worker needs to be stopped. Also load the tenant_config"""
        logger.debug({'pending_connections': ICAPConnection.in_use,
                      'total_connections': ICAPConnection.connection_id},
                     event='periodic_callback')
        now = self.io_loop.time()
        if now > self._die_after or ICAPConnection.connection_id > self._max_conn:
            self._stop_listening(config.getint('icap_server', 'grace_lifespan'))
        # Fire and forget: the coroutine logs its own failures.
        self._fetch_tenant_caps()

    def start(self, num_processes=1):
        """Start the workers and ensure new workers are spawned if any worker dies

        This is a copy-paste of Tornado's implementation of TCPServer.start().
        Tornado TCPServer implementation allows only 100 workers to be launched
        if any worker dies abnormally. We are relying on tornado to launch workers
        every few hours. So, overriding the default tornado implementation with
        max_restarts=sys.maxsize
        """
        assert not self._started
        self._started = True
        if num_processes != 1:
            tornado.process.fork_processes(num_processes, max_restarts=sys.maxsize)
        sockets = self._pending_sockets
        self._pending_sockets = []
        self.add_sockets(sockets)
        set_global_service_name('pnr-icap-%s' % tornado.process.task_id())

    def run(self):
        """Bind to a reuseable port and setup random lifespans for workers"""
        logger.info('#' * 50)
        logger.info('%-25s: %s', 'name', 'ICAP Server')
        logger.info('%-25s: %s', 'host', self.host)
        logger.info('%-25s: %s', 'port', self.port)
        logger.info('#' * 50)

        tornado.ioloop.IOLoop.configure(tornado.platform.epoll.EPollIOLoop,
                                        time_func=monotonic_time)
        self.bind(self.port, self.host, reuse_port=True)
        # Forks multiple sub-processes
        self.start(config.getint('icap_server', 'max_workers'))

        # Randomise the connection budget and the lifespan around their
        # configured values so that workers do not all recycle together.
        cr = config.getint('icap_server', 'max_connections_range')
        cs = config.getint('icap_server', 'max_connections')
        self._max_conn = cs + random.randint(-1 * cr, cr)
        lr = config.getint('icap_server', 'worker_lifespan_range')
        ls = config.getint('icap_server', 'avg_worker_lifespan')
        self._lifespan = ls + random.randint(-1 * lr, lr)
        self._die_after = self.io_loop.time() + self._lifespan
        logger.info({'die_after': self._lifespan,
                     'max_conns': self._max_conn},
                    event='icap_server_startup')

        # Run the recycle check / tenant_config refresh once a minute.
        pc = tornado.ioloop.PeriodicCallback(self._periodic_callback, 60 * 1000)
        pc.start()
        self.io_loop.start()  # Blocks until self.ioloop.stop() is called.
        sys.exit(-1)  # On normal exit, tornado will not launch new worker.

 

 

def setup_logger():
    """Route all root-logger output to the configured log file.

    Reads the log directory, file name and level from the 'icap_server'
    config section (the level falls back to 'INFO' when empty), replaces
    every handler on the root logger with a single WatchedFileHandler, and
    applies the level to both the root logger and that handler.
    """
    # logging.handlers is a submodule that ``import logging`` alone does not
    # load; import it here so this function does not depend on some other
    # module having imported it first.
    import logging.handlers

    log_dir = config.get('icap_server', 'log_directory')
    log_file_name = config.get('icap_server', 'log_file_name')
    log_level = config.get('icap_server', 'log_level') or 'INFO'
    log_format = 'time="%(asctime)-15s" level=%(levelname)-7s %(message)s'

    root_logger = logging.getLogger('')
    root_logger.setLevel(log_level)

    log_file = os.path.join(log_dir, log_file_name)
    file_handler = logging.handlers.WatchedFileHandler(log_file)
    file_handler.setLevel(log_level)
    file_handler.setFormatter(logging.Formatter(log_format))

    # Remove the existing handlers like console_handler and add the
    # file_handler. Iterate over a copy: removeHandler() mutates
    # root_logger.handlers, and removing while iterating the live list
    # skips every other handler.
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
    root_logger.addHandler(file_handler)

 

 

if __name__ == "__main__":
    # Switch logging to the configured file before the server is constructed.
    setup_logger()
    server = ICAPServer()
    # run() starts the IOLoop and ends with sys.exit(), so it does not return.
    server.run()