##############################################################################
#
# Copyright (C) 2015-2017 Menlo Security, Inc.
# All rights reserved.
#
# Overview:
#
# The goal here is to manage a local pool of Surrogates and report the
# availability of Surrogates to the cluster-wide Global Surrogate Pool
# (GSP) that resides in Redis. Clients are expected to allocate a
# Surrogate simply by pop()'ing an entry from the global set and coupling
# with it.
#
# The key design goals include fast and simple Surrogate allocation,
# tolerating buggy clients (e.g., those that allocate a Surrogate but
# fail to couple with it), and tolerating unclean shutdowns that may
# leave behind stale Surrogate entries in the global pool.
#
# Global Surrogate Pool (GSP):
#
# This is a Redis Set data structure shared by all cluster
# GlobalSurrogatePoolProducers (SurrogateManager) and TabAllocators
# (SurrogateRouter). The set supports atomic addition and popping of
# elements. Each entry in the set is a JSON string that uniquely identifies
# the Surrogate and has the following fields: secret_rid, safe_rid, host,
# and port. We use a set rather than a list because the former supports
# fast membership checks and removals (needed for GC'ing); see below.
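#
# For illustration only (the authoritative field set is whatever
# _entry_name() below serializes), an entry might look like:
#
#   {"secret_rid": "r-123", "safe_rid": "s-456", "host": "10.0.0.5",
#    "port": 8443}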

#
# Allocation protocol:
#
# Clients are expected to pop() a Surrogate from the GSP and then couple
# with it by initiating a direct connection to it. Since the pop() is
# atomic, no other client can allocate the same Surrogate as long as
# everyone sticks to this protocol. Note that there are no ordering
# guarantees on the pop()'d Surrogate. Surrogate connection details are
# found in the pop()'d set entry.
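#
# A minimal client-side sketch of the allocation step (assuming a
# redis-py style client; connect_to_surrogate() and retry_later() are
# hypothetical helpers):
#
#   entry = redis_client.spop('GSP')    # atomic allocation
#   if entry is None:
#       retry_later()                   # pool is empty
#   else:
#       info = json.loads(entry)
#       connect_to_surrogate(info)      # couple using the entry's details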

#
# Stale entry handling:
#
# The global pool may contain stale entries due to unclean shutdown of the
# SurrogateManager service or its node. It is the client's responsibility to
# detect these stale entries and initiate re-allocation. The details are
# left to the client, but one example is detecting that the connection to
# the Surrogate failed and simply retrying.
#
# Stale entry cleanup (TODO):
#
# Stale entries in the global pool have some cost: they induce client
# allocation retries and can potentially increase page load times. While it
# is not possible to avoid stale entries entirely with our allocation
# protocol, we do try to minimize them by periodically expiring pool
# entries.
#
# Robustness to buggy clients:
#
# Allocation clients (surrogate routers) may fail to couple with allocated
# Surrogates due to unclean shutdown (or bugs). Without extra handling,
# these allocated Surrogates would continue to live forever even though they
# cannot be allocated by other clients. To prevent this, we give a Surrogate
# N seconds to enter the coupling state after allocation, and GC it if it
# does not meet that deadline.
#
# See also: TabAllocator in the SurrogateRouter for more details on how
# entries consumed from the GSP are managed in the Global Surrogate Map
# (GSM).
#
##############################################################################

import json
import logging
import math
import os
import psutil
import redis
import time
import tornado.ioloop

import safly.config
import safly.redis_client
from surrogate import SurrogateContainer
from safly.config import config
from safly.logger import SafelyLoggerMixin
from safly.pool import SafelyPool, ResizeRecommendation
from safly.timex import alert_if_slow
 

class LocalSurrogatePool(SafelyPool):
    """A pool of Surrogates that grows and shrinks per a simple RAM- and
    CPU-aware policy. It periodically monitors system resources and adjusts
    the pool accordingly."""

    def __init__(self, ns_pool_man, state_change_cb, destroy_cb,
                 surrogate_class, io_loop=None):
        self._ns_pool_man = ns_pool_man
        self._state_change_cb = state_change_cb
        self._destroy_cb = destroy_cb
        self._surrogate_class = surrogate_class
        self._io_loop = io_loop or tornado.ioloop.IOLoop.instance()
        self._headroom_bytes = config.getint('surrogate_manager',
                                             'headroom_kb') * 1024
        # How frequently to check for node resource changes.
        self._resize_interval_ms = config.getint('surrogate_manager',
                                                 'resize_interval_ms')
        # Maximum number of Surrogates added or removed from the pool during
        # a resize interval.
        self._resize_step_count = config.getint('surrogate_manager',
                                                'resize_step_count')
        self._avg_ram_per_surrogate = config.getint(
            'surrogate_manager', 'avg_ram_per_surrogate_kb') * 1024
        assert(self._avg_ram_per_surrogate > 0)
        # We cull Surrogates if we go over this threshold.
        self._cpu_critical_threshold = (
            config.getint('surrogate_manager',
                          'cpu_critical_threshold') / 100.0)
        assert(0 <= self._cpu_critical_threshold)
        self._cpu_pct_critical_threshold = (
            config.getint('surrogate_manager',
                          'cpu_pct_critical_threshold') / 100.0)
        assert(0 <= self._cpu_pct_critical_threshold <= 1)
        self._cpu_cooldown_secs = config.getint('surrogate_manager',
                                                'cpu_cooldown_secs')
        max_size = config.getint('surrogate_manager', 'max_pool_size')
        min_size = config.get('surrogate_manager', 'min_pool_size')
        if min_size == 'cpu_count':
            min_size = psutil.cpu_count()
        min_free_pool_size = config.getint('surrogate_manager',
                                           'min_free_pool_size')
        # If the config value is 0, then choose a value based on CPU count in
        # order to use up available CPU capacity on large nodes.
        max_free_pool_size = (config.getint('surrogate_manager',
                                            'max_free_pool_size') or
                              max(min_free_pool_size,
                                  max(16, psutil.cpu_count() / 2)))
        self._surrogate_log_fd = None
        super(LocalSurrogatePool, self).__init__(
            'sm-lsp',
            min_size=int(min_size),
            max_size=int(max_size),
            min_free_pool_size=min_free_pool_size,
            max_free_pool_size=max_free_pool_size)
        self.log.info(
            {'headroom_bytes': self._headroom_bytes,
             'avg_ram_per_surrogate': self._avg_ram_per_surrogate,
             'cpu_critical_threshold': self._cpu_critical_threshold,
             'cpu_pct_critical_threshold': self._cpu_pct_critical_threshold,
             'min_size': min_size,
             'max_size': max_size,
             'min_free_pool_size': min_free_pool_size,
             'max_free_pool_size': max_free_pool_size,
             'resize_interval_ms': self._resize_interval_ms,
             'resize_step_count': self._resize_step_count},
            event='started')
        self._resize_pc = tornado.ioloop.PeriodicCallback(
            self._on_resize_timer, self._resize_interval_ms)
        self._resize_pc.start()
        self._prior_recs = None
        self._prior_len = 0
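
    # For reference, a hypothetical [surrogate_manager] config section
    # covering the options read above might look like the following (all
    # values are illustrative, not shipped defaults):
    #
    #   [surrogate_manager]
    #   headroom_kb = 1048576
    #   resize_interval_ms = 5000
    #   resize_step_count = 4
    #   avg_ram_per_surrogate_kb = 524288
    #   cpu_critical_threshold = 90
    #   cpu_pct_critical_threshold = 95
    #   cpu_cooldown_secs = 30
    #   max_pool_size = 128
    #   min_pool_size = cpu_count
    #   min_free_pool_size = 2
    #   max_free_pool_size = 0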

 

    def _calc_additional_ram_needed_for_current_pool(self):
        """The result assumes that each Surrogate consumes at most
        @self._avg_ram_per_surrogate."""
        pool_rss_bytes = 0
        for surrogate in self.values():
            # 'total_rss' includes the current cgroup and all descendant
            # cgroups.
            rss_usage_bytes = surrogate.get_stats()['total_rss']
            pool_rss_bytes += rss_usage_bytes
        # CAREFUL: RSS includes shared pages, so @pool_rss_bytes could
        # exceed the total estimated RAM needs for the entire pool. This
        # should be a negligible amount.
        return max(0, ((len(self) * self._avg_ram_per_surrogate) -
                       pool_rss_bytes)), pool_rss_bytes

    def _step_limit(self, count):
        """Bounds @count by the resize step count."""
        if count < 0:
            return max(-self._resize_step_count, count)
        else:
            return min(self._resize_step_count, count)

    def _get_free_pool_resize_recommendation_based_on_ram(self):
        """Returns a positive count (the number of Surrogates to add) if
        there is excess RAM capacity. Otherwise, returns a negative count
        that culls just enough of the free pool to get us back into the
        green."""
        avail_ram = psutil.virtual_memory().available
        ram_needed, pool_rss = self._calc_additional_ram_needed_for_current_pool()
        # Motivation for @headroom_bytes: do not use up all available free
        # space --- other services on the node need some too.
        excess_ram = (avail_ram - self._headroom_bytes - ram_needed)
        if excess_ram < 0:
            # We don't have enough RAM to support the Surrogates in the
            # current pool (alloc + free), so free just enough to get us
            # back in the green.
            count = -int(math.ceil(float(abs(excess_ram)) /
                                   self._avg_ram_per_surrogate))
        else:
            count = int(excess_ram / self._avg_ram_per_surrogate)
        # Only a small cooldown is needed since, unlike CPU stats, RAM stats
        # stabilize quickly.
        return ResizeRecommendation(
            self._step_limit(count), 'ram',
            cooldown_secs=1,
            details={'event': 'ram-is-low' if count <= 0 else 'ram-is-ok',
                     'excess_ram': excess_ram,
                     'avail_ram': avail_ram,
                     'ram_needed': ram_needed,
                     'size': len(self),
                     'pool_rss': pool_rss,
                     'headroom_bytes': self._headroom_bytes,
                     'avg_ram_per_surrogate': self._avg_ram_per_surrogate})
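
    # Worked example (all numbers hypothetical): with 8 GiB of available
    # RAM, 1 GiB of headroom, 2 GiB of additional RAM still needed by the
    # current pool, and 512 MiB per Surrogate, excess_ram is 5 GiB and the
    # raw recommendation is +10 Surrogates, which _step_limit() then caps
    # at resize_step_count.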

 

    def _get_free_pool_resize_recommendation_based_on_cpu(self):
        """Returns a recommendation to either cull a portion of the free pool
        if the system has been very busy or prohibit growth if moderately
        busy."""
        one_min, five_min, fifteen_min = os.getloadavg()
        one_sec_cpu = psutil.cpu_percent(interval=None)
        cpu_critical_threshold = psutil.cpu_count() * self._cpu_critical_threshold
        cpu_pct_critical_threshold = self._cpu_pct_critical_threshold
        if five_min > cpu_critical_threshold:
            event = 'very-busy'
            count = -self.free_size()  # Cull the entire pool.
        # Prohibit growth if recent CPU activity (one sec or one min) is high.
        elif (one_min > cpu_critical_threshold or
              one_sec_cpu > cpu_pct_critical_threshold * 100):
            event = 'busy'
            count = 0  # Prohibit growth.
        else:
            event = 'free'
            count = self.max_free_pool_size  # Allow growth.
        # Cooldown needed to let CPU stats stabilize, or else pool will
        # thrash.
        return ResizeRecommendation(
            self._step_limit(count), 'cpu',
            cooldown_secs=self._cpu_cooldown_secs,
            details={'event': 'cpu-%s' % event,
                     'loadavg': [one_min, five_min, fifteen_min],
                     'one_sec_cpu': one_sec_cpu,
                     'cpu_critical_threshold': cpu_critical_threshold,
                     'cpu_pct_critical_threshold': cpu_pct_critical_threshold})
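
    # Worked example (numbers hypothetical): on an 8-core node with
    # cpu_critical_threshold = 90, the load-average threshold is 7.2. A
    # 5-minute load average above 7.2 culls the entire free pool; a 1-minute
    # load average above 7.2, or a 1-second CPU reading above
    # cpu_pct_critical_threshold, merely prohibits growth.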

 

    def get_free_pool_resize_recommendation(self, reason):
        """Parent override: conservatively returns the min of all
        sub-recommendations to avoid oversubscribing the node."""
        # Consult all sub-recommenders.
        recs = [self._get_free_pool_resize_recommendation_based_on_ram(),
                self._get_free_pool_resize_recommendation_based_on_cpu()]
        if self._prior_recs != recs:  # Avoid spraying the logs.
            self._prior_recs = recs
            # Useful for understanding why resize did or did not take place.
            # E.g., recommendations are ignored if they do not conform to
            # pool min/max limits.
            self.log.info({'recs': recs}, event='recommendations-changed')
        return sorted(recs, key=lambda rec: rec.count)[0]
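
    # E.g., if the RAM recommender suggests +4 but the CPU recommender
    # suggests 0 (busy), the combined recommendation is 0 and the free pool
    # does not grow this cycle.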

 

    # We shouldn't be creating Surrogates if the system load is high.
    @alert_if_slow(max_secs=10)
    def create_one(self):
        """Parent override: creates a single pool object."""
        sr = self._surrogate_class(self._ns_pool_man,
                                   self._state_change_cb,
                                   io_loop=self._io_loop,
                                   surrogate_log_fd=self._surrogate_log_fd)
        sr.start(alsa_list_size=config.getint('surrogate',
                                              'alsa_devices_per_browser'))
        return sr, sr.safe_rid

    @alert_if_slow(max_secs=1)
    def destroy_one(self, safe_rid, free_only=False):
        """Parent override: handles the details of cleaning up the Surrogate
        pool object."""
        surrogate = self[safe_rid]
        was_free = self._destroy_cb(surrogate)
        if free_only and not was_free:
            return
        super(LocalSurrogatePool, self).destroy_one(safe_rid, free_only)
        surrogate.kill('destroy-one')
        # While this is a blocking call, it shouldn't block for long since
        # we just killed the process.
        surrogate.wait()

    def _on_resize_timer(self):
        self.resize_pool('periodic')
        if len(self) != self._prior_len:
            if not self._prior_len or not len(self):
                # An empty pool could happen if there are rogue processes on
                # the node consuming lots of RAM/CPU, or if headroom_bytes is
                # too high.
                self.log.log(logging.ERROR if not len(self) else logging.INFO,
                             {'avail_ram': psutil.virtual_memory().available,
                              'loadavg': os.getloadavg()},
                             event=('empty' if not len(self)
                                    else 'empty-cleared'))
            self._prior_len = len(self)

    def start(self):
        """Parent override."""
        self.init_surrogate_related()
        super(LocalSurrogatePool, self).start()

    def destroy(self):
        """Parent override."""
        if self._resize_pc:
            self._resize_pc.stop()
            self._resize_pc = None
        super(LocalSurrogatePool, self).destroy()
        self.destroy_surrogate_related()

    def init_surrogate_related(self):
        if self._surrogate_log_fd is not None:
            return

        log_file_name = os.path.join(config.get('service', 'log_dir'),
                                     'safeview-surrogate.log')
        try:
            # Note: we'd normally want to set the O_CLOEXEC flag, but it is
            # not available in Python *and* is not necessary in our case, as
            # all the fds will be closed by safeview code prior to doing the
            # exec(2).
            #
            # See also: https://docs.python.org/2/library/os.html#os.open
            self._surrogate_log_fd = os.open(
                log_file_name,
                os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o660)
        except Exception as e:
            self.log.error({'error': e, 'log_file_name': log_file_name},
                           event='log-open-failed', opslog=True)
            raise

    def destroy_surrogate_related(self):
        if self._surrogate_log_fd is None:
            return
        # This should never fail, so we don't report the exception here.
        os.close(self._surrogate_log_fd)
        self._surrogate_log_fd = None
 

class GlobalSurrogatePoolProducer(SafelyLoggerMixin):
    """Manages a local pool of Surrogates and advertises their availability.

    Availability is advertised to the cluster-wide Global Surrogate Pool (GSP)
    that resides in Redis. If Redis is down (could happen during startup), we
    keep trying until all Surrogates have been reported.
    """
    REDIS_SET_NAME = 'GSP'  # Global Surrogate Pool
    REDIS_KEY_NAME = 'GSM'  # Global Surrogate Map
    # Frequency at which we garbage-collect unused and/or abandoned
    # Surrogates.
    GC_INTERVAL_MS = 10000
    GSP_INSERT_RETRY_INTERVAL_MS = 12000

    def __init__(self, ns_pool_man, io_loop=None,
                 surrogate_class=SurrogateContainer):
        super(GlobalSurrogatePoolProducer, self).__init__('sm',
                                                          auto_prefix=True)
        self._ns_pool_man = ns_pool_man
        self._io_loop = io_loop or tornado.ioloop.IOLoop.instance()
        self._local_pool = LocalSurrogatePool(ns_pool_man,
                                              self._on_surrogate_state_change,
                                              self._on_surrogate_destroy,
                                              surrogate_class,
                                              io_loop=self._io_loop)
        self._redis = safly.redis_client.get_redis()
        # Surrogates that were not advertised in the GSP due to insertion
        # failure stemming from a failed Redis call.
        self._uninserted_surrogates = set()
        # Retry Surrogate GSP insertion until successful.
        self._gsp_insert_pc = None
        # Is the underlying pool in a frozen state?
        self._frozen = False

    def _entry_name(self, surrogate):
        """Returns the GSP entry name for @surrogate. It uniquely
        identifies the Surrogate and contains connection information that
        clients can use to connect to it."""
        assert(surrogate.http_host)
        assert(surrogate.secret_rid)
        assert(surrogate.safe_rid)
        assert(surrogate.s_nid)
        return json.dumps({'s_nid': surrogate.s_nid,
                           'secret_rid': surrogate.secret_rid,
                           'safe_rid': surrogate.safe_rid,
                           'host': surrogate.get_ip_addr(),
                           'display_id': surrogate._get_fully_qualified_display_name()})

    def _on_gc_timer(self):
        """Refresh and/or clean up state."""
        for surrogate in self._local_pool.values():
            # TODO: consider GC'ing abandoned surrogates (e.g., those from an
            # unclean shutdown). This would reduce the number of client
            # retries due to stale entries.

            # Handle the case where a Surrogate is allocated by a consumer
            # but is never coupled, e.g., due to unclean shutdown on the
            # client side.
            try:
                is_surrogate_allocated = (
                    not self._redis.sismember(self.REDIS_SET_NAME,
                                              self._entry_name(surrogate)))
            except redis.RedisError:
                # Suspend GC until we have good relations with Redis again.
                self.log.exception('sismember failed, GC attempt aborted')
                return
            if (surrogate.state == surrogate.STATE_READY and
                    is_surrogate_allocated):
                surrogate.set_coupling_deadline()

    def _insert_into_gsp(self, surrogate):
        """Advertise Surrogate in the GSP, taking care to handle failures.

        This can fail if Redis is down. In that event, we keep track
        of the un-inserted Surrogates and retry insertion at a later time."""
        try:
            self._redis.sadd(self.REDIS_SET_NAME, self._entry_name(surrogate))
        except Exception as e:
            # Not an error--this is expected if Redis is slow to start.
            self.log.warning({'surrogate': surrogate, 'error': e},
                             event='gsp-insert-failed')
            self._uninserted_surrogates.add(surrogate)

    def _on_gsp_insert_timer(self):
        """Try to re-insert un-advertised Surrogates into the GSP, if any."""
        for surrogate in list(self._uninserted_surrogates):
            try:
                self._redis.sadd(self.REDIS_SET_NAME,
                                 self._entry_name(surrogate))
            except Exception:
                # Redis call failure.
                pass
            else:
                self._uninserted_surrogates.remove(surrogate)
                self.log.info({'surrogate': surrogate},
                              event='gsp-insert-retry-succeeded')

    def _on_surrogate_state_change(self, surrogate):
        """Update the GSP in response to Surrogate state changes."""
        if surrogate.state == surrogate.STATE_READY:
            # The Surrogate is ready to be coupled. Advertise its existence
            # on the GSP.
            self._insert_into_gsp(surrogate)
        elif surrogate.state == surrogate.STATE_COUPLED:
            if __debug__:
                # Coupling with a Surrogate that's not ready is forbidden.
                assert(not self._redis.sismember(self.REDIS_SET_NAME,
                                                 self._entry_name(surrogate)))
                # Surrogates cannot be coupled with unless they are in the
                # GSP.
                assert(surrogate not in self._uninserted_surrogates)
            # Mark the Surrogate as in-use.
            self._local_pool.get_specific(surrogate.safe_rid)
        elif surrogate.state == surrogate.STATE_DEAD:
            # Cleanup: remove the Surrogate from either the GSM or the GSP
            # (it cannot be in both). This is just to be nice: consumers
            # should be robust to stale entries in the Redis data structures.
            try:
                # If the surrogate has a safe_bid, it is coupled => clean the
                # GSM.
                if surrogate.safe_bid:
                    val = self._entry_name(surrogate)
                    # Make sure to check the current key value to avoid races
                    # (another SR could have tried to connect to the now dead
                    # surrogate, seen it was dead, created a fresh one and
                    # inserted it in the GSM).
                    deleted = self._redis.hdelx_if_equal(self.REDIS_KEY_NAME,
                                                         surrogate.safe_bid,
                                                         val)
                    if deleted:
                        self.log.info('key=%s, val=%s', surrogate.safe_bid,
                                      val, event='gsm-delete')
                # Not coupled => clean the GSP.
                else:
                    self._redis.srem(self.REDIS_SET_NAME,
                                     self._entry_name(surrogate))
            except Exception as e:
                # Ensure proper local cleanup despite Redis exceptions.
                #
                # FIXME: tighten this up --- the only valid exception should
                # be network/connection related.
                self.log.warning({'error': e, 'error_type': type(e).__name__},
                                 event='redis-cleanup-failed')
            if surrogate in self._uninserted_surrogates:
                # Looks like the Surrogate died before GSP insertion
                # succeeded.
                self._uninserted_surrogates.remove(surrogate)
            self._local_pool.remove(surrogate.safe_rid)

    def _on_surrogate_destroy(self, surrogate):
        # Remove the surrogate from the free pool if it is there.
        try:
            res = self._redis.srem(self.REDIS_SET_NAME,
                                   self._entry_name(surrogate))
        except Exception as e:
            self.log.warning({'error': e, 'error_type': type(e).__name__},
                             event='redis-cleanup-failed')
            return False
        return bool(res)

    def freeze(self):
        if not self._local_pool.freeze():
            return False
        self._frozen = True
        return True

    def unfreeze(self):
        if not self._local_pool.unfreeze():
            return False
        self._frozen = False
        return True

    def get_status(self):
        return {
            'f': self._frozen,
            's': [s.get_status_dict() for s in self._local_pool.values()]}

    def _cleanup_redis(self):
        """Helper method to clean up Redis stale state on startup."""
        attempts = 0
        stale = []
        while True:
            try:
                for m in self._redis.sscan_iter(
                        self.REDIS_SET_NAME, '*%s*' % safly.config.node_id,
                        100):
                    try:
                        if json.loads(m)['s_nid'] == safly.config.node_id:
                            stale.append(m)
                    except Exception as e:
                        self.log.warning(
                            {'error': e, 'error_type': type(e).__name__,
                             'entry': m},
                            event='redis-bad-entry')
                break
            except Exception as e:
                attempts += 1
                if attempts % 100 == 1:  # Print one of these every 10s.
                    self.log.warning({'attempts': attempts, 'error': e,
                                      'error_type': type(e).__name__},
                                     event='redis-failure')
                time.sleep(0.1)

        if stale:
            try:
                self._redis.srem(self.REDIS_SET_NAME, *stale)
                self.log.info({'entries': stale}, event='redis-cleanup')
            except Exception as e:
                self.log.warning(
                    {'error': e, 'error_type': type(e).__name__,
                     'entries': stale},
                    event='redis-cleanup-failed')

        # Clean the GSM, but don't fail if something goes wrong.
        try:
            # We cannot match on hash values, so we scan the entire hash.
            for m in self._redis.hscan_iter(self.REDIS_KEY_NAME, count=100):
                try:
                    value = json.loads(m[1])['value']
                    if json.loads(value)['s_nid'] != safly.config.node_id:
                        continue
                except Exception as e:
                    self.log.warning(
                        {'error': e, 'error_type': type(e).__name__,
                         'entry': m},
                        event='redis-bad-entry')
                    continue
                try:
                    if self._redis.hdelx_if_equal(self.REDIS_KEY_NAME, m[0],
                                                  value):
                        self.log.info({'entry': m}, event='redis-cleanup')
                except Exception as e:
                    self.log.warning({'error': e,
                                      'error_type': type(e).__name__,
                                      'entry': m},
                                     event='redis-cleanup-failed')
        except Exception as e:
            self.log.warning({'error': e, 'error_type': type(e).__name__},
                             event='redis-cleanup-failed')

    def start(self):
        """Initialize pool state.

        Because we clean up stale redis entries here, this blocks until
        Redis can be reached.
        """
        self._cleanup_redis()
        self._io_loop.add_callback(self._local_pool.start)
        self._gc_pc = tornado.ioloop.PeriodicCallback(self._on_gc_timer,
                                                      self.GC_INTERVAL_MS)
        self._gc_pc.start()
        self._gsp_insert_pc = tornado.ioloop.PeriodicCallback(
            self._on_gsp_insert_timer,
            self.GSP_INSERT_RETRY_INTERVAL_MS)
        self._gsp_insert_pc.start()

    def destroy(self):
        self._local_pool.destroy()