Coverage for pool.py : 70%

##############################################################################
#
# Copyright (C) 2015-2017 Menlo Security, Inc.
# All rights reserved.
#
# Overview:
#
#   The goal here is to manage a local pool of Surrogates and report
#   availability of Surrogates to the cluster-wide Global Surrogate Pool
#   (GSP) that resides in Redis. Clients are expected to allocate a
#   Surrogate simply by pop()'ing an entry from the global set and coupling
#   with it.
#
#   The key design goals include fast and simple Surrogate allocation,
#   tolerating buggy clients (e.g., those that allocate a Surrogate but
#   fail to couple with it), and tolerating unclean shutdowns that may
#   leave behind stale Surrogate entries in the global pool.
#
# Global Surrogate Pool (GSP):
#
#   This is a Redis Set data-structure shared by all cluster
#   GlobalSurrogatePoolProducers (SurrogateManager) and TabAllocators
#   (SurrogateRouter). The set supports atomic addition and popping of
#   elements. Each entry in the set is a JSON string that uniquely
#   identifies the Surrogate and has the following fields: secret_rid,
#   safe_rid, host, and port. We use a set rather than a list due to the
#   former's support for fast membership checks and removals (needed for
#   GC'ing); see below.
#
# Allocation protocol:
#
#   Clients are expected to pop() a Surrogate from the GSP and then couple
#   with it by initiating a direct connection to it. Since the pop() is
#   atomic, it is guaranteed that no other client will be able to allocate
#   the same Surrogate if they stick to this protocol. Note that there are
#   no ordering guarantees on the pop()'d Surrogate. Surrogate connection
#   details are found in the pop()'d set entry.
#
# Stale entry handling:
#
#   The global pool may contain stale entries due to unclean shutdown of
#   the SurrogateManager service or its node. It is the client's
#   responsibility to detect these stale entries and initiate
#   re-allocation. The details are left to the client, but examples include
#   detecting the Surrogate connection failure and simply retrying.
#
# Stale entry cleanup (TODO):
#
#   Stale entries in the global pool have some cost: they induce client
#   allocation retries and can potentially increase page load times. While
#   it is not possible to avoid stale entries entirely with our allocation
#   protocol, we do try to minimize them by periodically expiring pool
#   entries.
#
# Robustness to buggy clients:
#
#   Allocation clients (surrogate routers) may fail to couple with allocated
#   Surrogates due to unclean shutdown (or bugs). Without extra handling,
#   these allocated Surrogates would continue to live forever even though
#   they cannot be allocated by other clients. To prevent this, we give
#   Surrogates N seconds to enter the coupling state after allocation, and
#   GC them if they do not meet that deadline.
#
# See also: TabAllocator in the SurrogateRouter for more details on how
# entries consumed from the GSP are managed in the Global Surrogate Map
# (GSM).
#
##############################################################################
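# --- Illustrative sketch (not part of pool.py) ------------------------------
# How a client might follow the allocation protocol above, assuming the
# standard redis-py client. The set key name 'gsp' and the returned dict
# shape are stand-ins: the real key lives in REDIS_SET_NAME and the real
# entries are built by _entry_name() further down.
import json

import redis


def allocate_surrogate(client, gsp_key='gsp'):
    """Atomically pop one Surrogate entry from the GSP and parse it."""
    entry = client.spop(gsp_key)  # atomic: no other client can pop this entry
    if entry is None:
        return None  # pool is empty; caller should retry or fall back
    # e.g. {'secret_rid': ..., 'safe_rid': ..., 'host': ..., 'port': ...}
    info = json.loads(entry)
    # The caller is now expected to couple by connecting directly to the
    # Surrogate; if the connection fails, the entry was stale and the
    # allocation should simply be retried.
    return info


# Example usage (hypothetical connection details):
#   info = allocate_surrogate(redis.StrictRedis(host='localhost'))
# -----------------------------------------------------------------------------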
"""A pool of Surrogates that grows and shrinks per a simple RAM and CPU aware policy. It periodically monitors system resources and adjusts the pool accordingly."""
                 io_loop=None):
        'headroom_kb') * 1024
        # How frequently to check for node resource changes.
        'resize_interval_ms')
        # Maximum number of Surrogates added or removed from the pool during
        # a resize interval.
        'resize_step_count')
        'avg_ram_per_surrogate_kb') * 1024
        assert(self._avg_ram_per_surrogate > 0)
        # We cull Surrogates if we go over this threshold.
        'cpu_critical_threshold') / 100.0)
        assert(0 <= self._cpu_critical_threshold)
        'cpu_pct_critical_threshold') / 100.0)
        assert(0 <= self._cpu_pct_critical_threshold <= 1)
        'cpu_cooldown_secs')
        'max_pool_size')
        'min_free_pool_size')
        # If config val is 0, then choose a value based on CPU count in order
        # to use up available CPU capacity in large nodes.
        'max_free_pool_size') or
            max(min_free_pool_size, max(16, psutil.cpu_count() / 2)))
        'sm-lsp', min_size=int(min_size), max_size=int(max_size),
        min_free_pool_size=min_free_pool_size,
        max_free_pool_size=max_free_pool_size)
        {'headroom_bytes': self._headroom_bytes,
         'avg_ram_per_surrogate': self._avg_ram_per_surrogate,
         'cpu_critical_threshold': self._cpu_critical_threshold,
         'cpu_pct_critical_threshold': self._cpu_pct_critical_threshold,
         'min_size': min_size, 'max_size': max_size,
         'min_free_pool_size': min_free_pool_size,
         'max_free_pool_size': max_free_pool_size,
         'resize_interval_ms': self._resize_interval_ms,
         'resize_step_count': self._resize_step_count},
        event='started')
        self._resize_interval_ms)
"""The result assumes that each Surrogate consumes at most @self._avg_ram_per_surrogate.""" # 'total_rss' includes current cgroup and all descendant cgroups. # CAREFUL: RSS includes shared pages, so @pool_rss_bytes could # exceed total estimated ram needs for the entire pool. Should be a # negligible amount. pool_rss_bytes)), pool_rss_bytes
"""Bounds count by the resize step count.""" return max(-self._resize_step_count, count) else:
"""Returns a positive number indicating the number of Surrogates to add, if there is excess RAM capacity. Else, returns a negative number that culls a portion of the free pool needed to get us back into the green.""" # Motivation for @headroom_bytes: do not use up all available free # space --- other services on the node need some too. # We don't have enough RAM to support Surrogates in the current pool # (alloc + free), so free just enough to get us back in the green. count = -int(math.ceil(float(abs(excess_ram)) / self._avg_ram_per_surrogate)) else: # Only a small cooldown is needed since, unlike CPU stats, RAM stats # stabilize quickly. cooldown_secs=1, details={'event': 'ram-is-low' if count <= 0 else 'ram-is-ok', 'excess_ram': excess_ram, 'avail_ram': avail_ram, 'ram_needed': ram_needed, 'size': len(self), 'pool_rss': pool_rss, 'headroom_bytes': self._headroom_bytes, 'avg_ram_per_surrogate': self._avg_ram_per_surrogate})
"""Returns a recommendation to either cull a portion of the free pool if the system has been very busy or prohibit growth if moderately busy.""" event = 'very-busy' count = -self.free_size() # Cull the entire pool. # Prohibit growth if recent CPU activity (one sec or one min) is high. one_sec_cpu > cpu_pct_critical_threshold*100): event = 'busy' count = 0 # Prohibit growth. else: # Cooldown needed to let CPU stats stabilize, or else pool will # thrash. self._step_limit(count), 'cpu', cooldown_secs=self._cpu_cooldown_secs, details={'event': 'cpu-%s' % event, 'loadavg': [one_min, five_min, fifteen_min], 'one_sec_cpu': one_sec_cpu, 'cpu_critical_threshold': cpu_critical_threshold, 'cpu_pct_critical_threshold': cpu_pct_critical_threshold})
"""Parent override: conservatively returns the min of all sub-recommendations to avoid oversubscribing the node.""" # Consult all sub-recommenders. self._get_free_pool_resize_recommendation_based_on_cpu()] # Useful for understanding why resize did or did not take place. # E.g., recommendations are ignored if they do not conform to pool # min/max limits.
    # We shouldn't be creating Surrogates if the system load is high.

    def create_one(self):
        """Parent override: creates a single pool object."""
        self._state_change_cb, io_loop=self._io_loop,
        surrogate_log_fd=self._surrogate_log_fd)
"""Parent override: handles the details of cleaning up the Surrogate pool object.""" return # While this is a blocking call, it shouldn't block for long since # we just killed the process.
        # An empty pool could happen if there are rogue processes on
        # the node consuming lots of RAM/CPU, or if headroom_bytes is
        # too high.
        {'avail_ram': psutil.virtual_memory().available,
         'loadavg': os.getloadavg()},
        event=('empty' if not len(self) else 'empty-cleared'))
"""Parent override."""
"""Parent override."""
return
        'safeview-surrogate.log')
        # Note: we'd normally want to set the flag O_CLOEXEC, but this is not
        # available in python *and* is not necessary in our case, as all the
        # fds will be closed by safeview code prior to doing the exec(2).
        #
        # See also: https://docs.python.org/2/library/os.html#os.open
        log_file_name,
        os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o660)
        except Exception as e:
            self.log.error({'error': e, 'log_file_name': log_file_name},
                           event='log-open-failed', opslog=True)
            raise
return # This should never fail, so we don't report the exception here
"""Manages a local pool of Surrogates and advertises their availability.
Availability is advertised to the cluster-wide Global Surrogate Pool (GSP) that resides in Redis. If Redis is down (could happen during startup), we keep trying until all Surrogates have been reported. """ # Frequency at which we garbage-collect unused and/or abandoned # Surrogates.
                 surrogate_class=SurrogateContainer):
        auto_prefix=True)
        self._on_surrogate_state_change, self._on_surrogate_destroy,
        surrogate_class, io_loop=self._io_loop)
        # Surrogates that were not advertised in the GSP due to insertion
        # failure stemming from a failed Redis call.
        # Retry Surrogate GSP insertion until successful.
        # Is the underlying pool in a frozen state?
"""Returns the GSP entry name for @surrogate. It uniquely identifies the Surrogate and contains connection information that clients can use to connect to it.""" assert(surrogate.http_host) assert(surrogate.secret_rid) assert(surrogate.safe_rid) assert(surrogate.s_nid) 'secret_rid': surrogate.secret_rid, 'safe_rid': surrogate.safe_rid, 'host': surrogate.get_ip_addr(), 'display_id': surrogate._get_fully_qualified_display_name()})
"""Refresh and/or clean up state.""" # TODO: consider GC'ing abandoned surrogates (e.g., those from an # unclean shutdown). This would reduce the number of client # retries due to stale entries.
            # Handle the case where a Surrogate is allocated by a consumer
            # but is never coupled, e.g., due to unclean shutdown on the
            # client side.
            self._redis.sismember(self.REDIS_SET_NAME,
                                  self._entry_name(surrogate)))
            except redis.RedisError:
                # Suspend GC until we have good relations with Redis again.
                self.log.exception('sismember failed, GC attempt aborted')
                return
            is_surrogate_allocated):
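# --- Illustrative sketch (not part of pool.py) ------------------------------
# The GC rule the fragments above describe: a Surrogate that is no longer in
# the GSP has been allocated by some client; if it never reaches the coupled
# state within a deadline, it should be reclaimed. The surrogate attributes
# (is_coupled, allocated_since) and the deadline parameter are hypothetical.
import time

import redis


def should_gc(redis_client, set_name, entry, surrogate,
              coupling_deadline_secs):
    try:
        in_gsp = redis_client.sismember(set_name, entry)
    except redis.RedisError:
        # Suspend GC until Redis is reachable again.
        return False
    is_allocated = not in_gsp  # popped from the GSP by some consumer
    return (is_allocated and not surrogate.is_coupled and
            time.time() - surrogate.allocated_since > coupling_deadline_secs)
# -----------------------------------------------------------------------------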
"""Advertise Surrogate in the GSP, taking care to handle failures.
This can fail if Redis is down. In that event, we keep track of the un-inserted Surrogates and retry insertion at a later time.""" except Exception as e: # Not an error--this is expected if Redis is slow to start. self.log.warning({'surrogate': surrogate, 'error': e}, event='gsp-insert-failed') self._uninserted_surrogates.add(surrogate)
"""Try to re-insert un-advertised Surrogates into the GSP, if any.""" try: self._redis.sadd(self.REDIS_SET_NAME, self._entry_name(surrogate)) except Exception: # Redis call failure. pass else: self._uninserted_surrogates.remove(surrogate) self.log.info({'surrogate': surrogate}, event='gsp-insert-retry-succeeded')
"""Update the GSP in response to Surrogate state changes.""" # Surrogate is ready to be coupled. Advertise its existence on # the GSP. if __debug__: # Coupling with a Surrogate that's not ready is forbidden. assert(not self._redis.sismember(self.REDIS_SET_NAME, self._entry_name(surrogate))) # Surrogate cannot be coupled with unless they are in the GSP. assert(surrogate not in self._uninserted_surrogates) # Mark Surrogate as in-use. # Cleanup: remove Surrogate from either the GSM or the GSP (cannot be # in both). This is just to be nice: consumers should be robust to # stale entries in the Redis datastructures. # If the surrogate has a safe_bid, it is coupled => clean the GSM. # Make sure to check the current key value to avoid races # (another SR could have tried to connect to the now dead # surrogate, saw it was dead, created a fresh one and inserted # it in the GSM). surrogate.safe_bid, val) event='gsm-delete') # Not coupled => clean the GSP. else: self._entry_name(surrogate)) except Exception as e: # Ensure proper local cleanup despite Redis exceptions. # # FIXME: tighten this up --- the only valid exception should be # network/connection related. self.log.warning({'error': e, 'error_type': type(e).__name__}, event='redis-cleanup-failed') # Looks like Surrogate died before GSP insertion succeeded. self._uninserted_surrogates.remove(surrogate)
        # Remove surrogate from the free pool, if it is there.
        self._entry_name(surrogate))
        except Exception as e:
            self.log.warning({'error': e, 'error_type': type(e).__name__},
                             event='redis-cleanup-failed')
            return False
        if not self._local_pool.freeze():
            return False
        self._frozen = True
        return True
        if not self._local_pool.unfreeze():
            return False
        self._frozen = False
        return True
        return {
            'f': self._frozen,
            's': [s.get_status_dict() for s in self._local_pool.values()]}
"""Helper method to clean up Redis stale state on startup.""" self.REDIS_SET_NAME, '*%s*' % safly.config.node_id, 100): try: if json.loads(m)['s_nid'] == safly.config.node_id: stale.append(m) except Exception as e: self.log.warning( {'error': e, 'error_type': type(e).__name__, 'entry': m}, event='redis-bad-entry') except Exception as e: attempts += 1 if attempts % 100 == 1: # Print one of these every 10s. self.log.warning({'attempts': attempts, 'error': e, 'error_type': type(e).__name__}, event='redis-failure') time.sleep(0.1)
        try:
            self._redis.srem(self.REDIS_SET_NAME, *stale)
            self.log.info({'entries': stale}, event='redis-cleanup')
        except Exception as e:
            self.log.warning(
                {'error': e, 'error_type': type(e).__name__,
                 'entries': stale},
                event='redis-cleanup-failed')
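# --- Illustrative sketch (not part of pool.py) ------------------------------
# The startup GSP sweep the fragments above describe, assuming redis-py's
# sscan_iter(): scan for entries that mention this node, keep the ones whose
# parsed s_nid really matches, then remove them in one SREM. node_id and the
# set name are stand-ins for safly.config.node_id and REDIS_SET_NAME.
import json


def remove_stale_gsp_entries(client, set_name, node_id):
    stale = []
    for m in client.sscan_iter(set_name, match='*%s*' % node_id, count=100):
        try:
            if json.loads(m)['s_nid'] == node_id:
                stale.append(m)
        except Exception:
            continue  # malformed entry; pool.py logs and skips it
    if stale:
        client.srem(set_name, *stale)
    return stale
# -----------------------------------------------------------------------------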
        # Clean the GSM, but don't fail if something goes wrong.
        # We cannot match on hash values, so we scan the entire hash.
        try:
            try:
                value = json.loads(m[1])['value']
                if json.loads(value)['s_nid'] != safly.config.node_id:
                    continue
            except Exception as e:
                self.log.warning(
                    {'error': e, 'error_type': type(e).__name__, 'entry': m},
                    event='redis-bad-entry')
                continue
            try:
                if self._redis.hdelx_if_equal(self.REDIS_KEY_NAME, m[0],
                                              value):
                    self.log.info({'entry': m}, event='redis-cleanup')
            except Exception as e:
                self.log.warning({'error': e,
                                  'error_type': type(e).__name__,
                                  'entry': m},
                                 event='redis-cleanup-failed')
        except Exception as e:
            self.log.warning({'error': e, 'error_type': type(e).__name__},
                             event='redis-cleanup-failed')
"""Initialize pool state.
Because we clean up stale redis entries here, this blocks until Redis can be reached. """ self.GC_INTERVAL_MS) self._on_gsp_insert_timer, self.GSP_INSERT_RETRY_INTERVAL_MS)