Coverage for update_manager.py : 40%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
############################################################################# # # Copyright (C) 2017 Menlo Security, Inc. # # Monitor local directory for updates to sv-cr and flash packages. # # Updates occur by modifying update.json. Format for update.json is: # {"sv-cr":{"version":"1","file":"sv-cr-1.deb","sha256":"DEAD...BEEF"}, # "flash":{...}} # Files listed in update.json should exist when update.json is written. # #############################################################################
# Needed for inotify.
'sm-updates')
assert(fd == self._inotify_fd) ioloop = tornado.ioloop.IOLoop.current() if events & ioloop.ERROR: self.log.error({'error': 'inotify-error'}, event='unexpected-error') # Drain the events. We don't care what they are and just check the # metadata any time we get an event. events = os.read(fd, 4096) self._handle_update()
"""Validate update metadata.
Verify that all required fields are present, that the update file exists and hashes to the correct value. """ if 'file' not in p or 'sha256' not in p: self.log.error({'details': 'Missing key', 'metadata': p}, event='bad-metadata') return False path = os.path.join(self._update_dir, p['file']) if not os.path.exists(path): self.log.error({'details': 'Missing file', 'file': path}, event='bad-metadata') return False try: file_hash = hashlib.sha256(open(path).read()).hexdigest().lower() except Exception as e: self.log.error({'details': 'Digest error', 'file': path, 'error': e}, event='bad-metadata') return False if file_hash != p['sha256'].lower(): self.log.error({'details': 'Digest mismatch', 'actual': file_hash, 'expected': p['sha256']}, event='bad-metadata') return False return True
"""Parse metadata file and install update if necessary.""" try: update = json.loads(open(self._update_metadata).read()) except Exception as e: self.log.error({'error': e}, event='bad-metadata') return packages = {} if self._cr_version != update.get('sv-cr', {}).get('version', self._cr_version): packages['sv-cr'] = update['sv-cr'] if self._flash_version != update.get('flash', {}).get('version', self._flash_version): packages['flash'] = update['flash'] if not packages: return for p in packages.values(): if not self._check_package_metadata(p): return if self.producer: if not self.producer.freeze(): self.log.error({'details': 'Could not freeze'}, event='unexpected-error') return try: # This is intentionally done synchronously on the ioloop for # simplicity. Surrogate manager will be unresponsive for a few # seconds. dpkg_out = subprocess.check_output( ['dpkg', '-i'] + [os.path.join(self._update_dir, p['file']) for p in packages.values()]) except subprocess.CalledProcessError as e: self.log.error({'details': 'Could not install packages', 'error': e, 'metadata': packages, 'output': e.output.replace('\n', ' ')}, event='unexpected-error') else: self._cr_version = packages.get('sv-cr', {}).get('version', self._cr_version) self._flash_version = packages.get('flash', {}).get('version', self._flash_version) self.log.info({'sv-cr': self._cr_version, 'flash': self._flash_version, 'output': dpkg_out.replace('\n', ' ')}, event='updated') if self.producer: if not self.producer.unfreeze(): self.log.error({'details': 'Could not unfreeze'}, event='unexpected-error')
"""Make sure initial state is correct.
If an update file is present, handle it. Install inotify watch for updates. """ # Get the currently installed package versions. ['dpkg-query', '--showformat', '${Version}', '--show', 'sv-cr-menlosec'], stderr=open(os.devnull, 'w')).strip() ['dpkg-query', '--showformat', '${Version}', '--show', 'adobe-flashplugin'], stderr=open(os.devnull, 'w')).strip() except Exception: # If we cannot get package info, we do not do automatic updates. self.log.info({'details': 'Packages not found'}, event='disabled') return self.log.error({'details': 'Inotify FD error', 'error': os.strerror(self._inotify_fd)}, event='unexpected-error') return self._inotify_fd, self._update_dir, IN_CLOSE_WRITE) self.log.error({'details': 'Inotify watch error', 'error': os.strerror(self._inotify_watch_descriptor)}, event='unexpected-error') return self._inotify_fd, self.handle_inotify_event, ioloop.READ|ioloop.ERROR) event='started') |