Coverage for src/util.py : 62%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
""" A collection of utility functions and application wide constants. """
'config/override.ini'] + pnrconfig.CONFIG_FILES
# NOTE : Please do not change the order of page_request_accept_headers array # If required, add more headers to the end of the array 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'text/html, application/xhtml+xml, */*', 'text/html, application/xhtml+xml, image/jxr, */*', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8' ] # Windows7 + IE11 'image/jpeg, application/x-ms-application, image/gif, ' 'application/xaml+xml, image/pjpeg, application/x-ms-xbap', # Windows10 + IE11 'image/gif, image/jpeg, image/pjpeg, application/x-ms-application, ' 'application/xaml+xml, application/x-ms-xbap' ] 'application/x-font-woff', 'application/font-woff', 'image/x-icon']
# Local pages '403_forbidden_download_size_exceeded.html')
else: # Example: master-xxx.menlosecurity.com (no cookies) SAFEVIEW_HOST = (config.get('networking', 'safeview_domain_hostname') or config.get('networking', 'safeview_hostname'))
1000.0) 'verify_safeview_server_cert') 'icap_whitelist_mod_enabled')
# URLs for external service's APIs 'https://%s/safeview-director/cluster_host' % SAFEVIEW_HOST)
# Url for icap-server to initiate protocol with a file-server 'https://%s/safeview-fileserv-routing/icap_file_request?attempt=%%s' % SAFEVIEW_HOST) # Incomplete url for icap-server to post file to a specific file-server 'https://%s/safeview-fileserv/icap_file_transfer?cid=%%s' % SAFEVIEW_HOST)
# Incomplete file-server url for platform internal status requests 'https://%s/safeview-fileserv/icap_status/%%s/?cid=%%s' % SAFEVIEW_HOST) # Incomplete file-server url for platform internal transfers 'https://%s/safeview-fileserv/icap_retrieval/%%s/?cid=%%s' % SAFEVIEW_HOST)
# Incomplete file-server url for external client to get the status iframe from # the file-server. a is file_id, b is single_use_code 'https://%s/safeview-fileserv/dl_status?cid=%s&a=%s&b=%s')
# Shared secret between icap and file servers until icap can use internal # HAProxy routes
# Timeouts for connections to external services 1000.0) 1000.0) 'native_processing_timeout') 1000.0) 'transfer_timeout_ms') / 1000.0)
# Load all page templates os.path.join(os.path.dirname(__file__), os.pardir, "templates"))
# Dump objects/data during exceptions 'redis_logging_channel') 'icap_server', 'strict_resource_mode_logging_enabled')
global redis_connection socket_timeout=config.getfloat( 'redis', 'socket_timeout'), socket_connect_timeout=config.getfloat( 'redis', 'socket_connect_timeout'), socket_keepalive=True, retry_on_timeout=True) except Exception as ex: log.error({'details': ex}, event='redis-connection-unavailable')
"""Return the IE (trident) engine version from the user agent.""" match = TRIDENT_VERSION_PATTERN.search(user_agent) if match: return match.groups()[0].split('.')[0] return None
"""Add the browser family/version from the user agent to the supplied dict. """
return
return
# Check for IE compatibility mode trident_version = _get_ie_engine_version(user_agent) if trident_version: _dict['browser_version'] = str(int(trident_version) + 4)
"""Unpack the supplied pyicap headers into _dict.
pyicap collates the headers into header dicts, in the form key=[val1, val2], so this function will unpack the first entry of the list into _dict, in the form _dict[key]=val1. """ continue # Druid cannot handle multiple values. A new druid schema may be # required if we see a lot of this error. log.debug({'header': key, 'len': len(value), 'value': value}, event='multiple_value_headers_error')
"""Add the request data from the http_request object into _dict."""
"""Return the user data from the http_request/icap_request object.
The required information is extracted from various header fields.
Returns a tuple of (tid, uid), where tid is an int. If user data is unknown, returns (-1, "Unknown"). """ # Prefer x-authenticated-user and x-msip-tenant-id, but if not set use # x-icap-userdata, and if that's not set either, fall back to the # default of tenant '-1' and user 'Unknown' # If tid is still -1, try to get the TID from the Client IP # this is from the icap request header tid = int(toks[0]) except Exception as e: log.exception({'error': str(e), 'error_type': type(e).__name__}, event='cant-get-user-tenant')
# The icap response headers will be used by squid to tie the # request and response together # If squid whitelist module is enabled (e.g. cloud), log an error # since the whitelist module should have generated the cache key log.error(_dict, event='icap_cache_key_not_found')
"""Get the file name from the http request / response objects.
Inspects the content disposition header first, if this does not give a potential file name, fall back to parsing the http request object's url. If this still does not give a potential file name, use default. """ # Determine the file name file_name = '' content_disposition = http_response.get_header('content-disposition', '') if content_disposition: params = cgi.parse_header(content_disposition)[1] file_name = params.get('filename', '') if not file_name: # No filename from content-disposition, so attempt to determine one # from the final path segment (trailing / are stripped) or fallback # to default. file_name = (urlparse.urlparse(http_request.uri).path .strip('/').split('/').pop() or default)
# Filename could be raw bytes, or url quoted. Therefore ensure it is # decoded to a unicode object. try: unicode_filename = tornado.escape.url_unescape(file_name) except UnicodeError: # Failed to decode using UTF8 codec, so fall back to latin-1 try: unicode_filename = tornado.escape.url_unescape(file_name, 'latin-1') except UnicodeError as e: # Highly unexpected error decoding, as latin-1 should always decode. # Log an error and fall back to 'default' filename. unicode_filename = default log.error({'file_name': file_name, 'type': type(file_name), 'error': e}, event='filename-decode-failure')
# Further sanitize the filename - removing any slashes and odd cases unicode_filename = unicode_filename.replace('/', '-') if unicode_filename in ('', '.', '..', '/', '-'): unicode_filename = default
return unicode_filename
"""Gzip compress and base64 encode the preview_data to send to pnr-e.""" return ''
accept_language='en', mod_type='request'): """Prepare and return block page contents
Fetch the block page from safeview if available; otherwise fall back to the standard block page from the local file system. """ assert all( x in block_info for x in ['user', 'tid', 'url', 'categories', 'result'])
url_parts = ('https', SAFEVIEW_HOST, SAFEVIEW_BLOCK_PATH, '', '') block_url = urlparse.urlunsplit(url_parts) html_content = get_localized_page(local_block_page, accept_language) log_dict = { 'block_url': block_url, 'mod_type': mod_type, 'accept_language': accept_language } try: http_client = httpclient.AsyncHTTPClient() response = yield http_client.fetch( block_url, method='POST', headers=JSON_HEADER, body=json.dumps(block_info), request_timeout=SAFEVIEW_TIMEOUT, validate_cert=VERIFY_SAFEVIEW_CERT) if response.code == 200: response_data = json.loads(response.body) # Q: Is this needed since we already have a 200 response code if 'html' in response_data: html_content = response_data['html'].encode('utf-8') except httpclient.HTTPError as ex: log_dict['details'] = ex log.error(log_dict, event='prepare-block-page') except Exception as ex: log_dict['details'] = ex log.exception(log_dict, event='prepare-block-page') raise tornado.gen.Return(html_content)
"""Determine the browser locale from the Accept-language header.
Returns a tornado.locale.Locale object using the languages in the accept_language_header, before defaulting to 'default' if a locale cannot be found.
This function is based on tornado's web.RequestHandler.get_browser_locale. """ # Check translation setting is enabled and accept_language is not empty if (config.getboolean('service', 'enable_translations') and accept_language_header): languages = accept_language_header.split(",") locales = [] for language in languages: parts = language.strip().split(";") if len(parts) > 1 and parts[1].startswith("q="): try: score = float(parts[1][2:]) except (ValueError, TypeError): score = 0.0 else: score = 1.0 locales.append((parts[0], score)) if locales: locales.sort(key=lambda pair: pair[1], reverse=True) codes = [l[0] for l in locales] return tornado.locale.get(*codes) return tornado.locale.get(default)
"""Create a localized version of the local page_name template.
Uses the accept_language_header to determine an appropriate language. The page_name argument should be the name of the template file, preferably using the constants defined in this file. """ html_content = ''
try: template = TEMPLATE_LOADER.load(page_name) locale = get_browser_locale(accept_language_header)
# The _() function is not provided automatically when invoking generate # directly, so add it as done by tornado's render_string method. html_content = template.generate(_=locale.translate, **kwargs) except Exception as e: log.exception({'error': str(e), 'error_type': type(e).__name__, 'page': page_name, 'accept-language': accept_language_header}, event='error-generating-local-page')
return html_content |