| # Copyright 2021 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Utility module for HTTP.""" |
| import json |
| import logging |
| import os |
| import sys |
| import tempfile |
| import zipfile |
| |
| import requests |
| |
| # pylint: disable=wrong-import-position,import-error |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| import retry |
| |
| _DOWNLOAD_URL_RETRIES = 3 |
| _DOWNLOAD_URL_BACKOFF = 1 |
| |
| |
| def download_and_unpack_zip(url, extract_directory, headers=None): |
| """Downloads and unpacks a zip file from an HTTP URL. |
| |
| Args: |
| url: A url to the zip file to be downloaded and unpacked. |
| extract_directory: The path where the zip file should be extracted to. |
| headers: (Optional) HTTP headers to send with the download request. |
| |
| Returns: |
| True on success. |
| """ |
| if headers is None: |
| headers = {} |
| |
| if not os.path.exists(extract_directory): |
| logging.error('Extract directory: %s does not exist.', extract_directory) |
| return False |
| |
| # Gives the temporary zip file a unique identifier in the case that |
| # that download_and_unpack_zip is done in parallel. |
| with tempfile.NamedTemporaryFile(suffix='.zip') as tmp_file: |
| if not download_url(url, tmp_file.name, headers=headers): |
| return False |
| |
| try: |
| with zipfile.ZipFile(tmp_file.name, 'r') as zip_file: |
| zip_file.extractall(extract_directory) |
| except zipfile.BadZipFile: |
| logging.error('Error unpacking zip from %s. Bad Zipfile.', url) |
| return False |
| |
| return True |
| |
| |
| def download_url(*args, **kwargs): |
| """Wrapper around _download_url that returns False if _download_url |
| exceptions.""" |
| try: |
| return _download_url(*args, **kwargs) |
| except Exception: # pylint: disable=broad-except |
| return False |
| |
| |
| def get_json_from_url(url): |
| """Gets a json object from a specified HTTP URL. |
| |
| Args: |
| url: The url of the json to be downloaded. |
| |
| Returns: |
| A dictionary deserialized from JSON or None on failure. |
| """ |
| response = requests.get(url) |
| try: |
| return response.json() |
| except (ValueError, TypeError, json.JSONDecodeError) as err: |
| logging.error('Loading json from url %s failed with: %s.', url, str(err)) |
| return None |
| |
| |
| @retry.wrap(_DOWNLOAD_URL_RETRIES, _DOWNLOAD_URL_BACKOFF) |
| def _download_url(url, filename, headers=None): |
| """Downloads the file located at |url|, using HTTP to |filename|. |
| |
| Args: |
| url: A url to a file to download. |
| filename: The path the file should be downloaded to. |
| headers: (Optional) HTTP headers to send with the download request. |
| |
| Returns: |
| True on success. |
| """ |
| if headers is None: |
| headers = {} |
| |
| response = requests.get(url, headers=headers) |
| |
| if response.status_code != 200: |
| logging.error('Unable to download from: %s. Code: %d. Content: %s.', url, |
| response.status_code, response.content) |
| return False |
| |
| with open(filename, 'wb') as file_handle: |
| file_handle.write(response.content) |
| |
| return True |