| # Copyright 2018 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import fnmatch |
| import locale |
| import os |
| import pathlib |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import threading |
| import time |
| from typing import Dict, Iterable, List |
| |
| import google.protobuf.json_format |
| import watchdog.events |
| import watchdog.observers |
| |
| from synthtool.log import logger |
| from synthtool.protos import metadata_pb2 |
| |
# Module-wide metadata message; the add_*() helpers in this module append
# sources and destinations to it, and write() serializes it.
_metadata = metadata_pb2.Metadata()
| |
| |
def get_environment_bool(var_name: str) -> bool:
    """Interpret an environment variable as a boolean flag.

    Parameters:
        var_name: name of the environment variable to look up.

    Returns:
        False when the variable is unset, empty, or equal to "false"
        (compared case-insensitively); True for any other value.
    """
    raw_value = os.environ.get(var_name)
    if not raw_value:
        return False
    return raw_value.lower() != "false"
| |
| |
# Whether obsolete-file tracking is enabled. Seeded once, at import time,
# from the SYNTHTOOL_TRACK_OBSOLETE_FILES environment variable.
_track_obsolete_files = get_environment_bool("SYNTHTOOL_TRACK_OBSOLETE_FILES")

# The list of file patterns excluded during a copy() or move() operation.
_excluded_patterns: List[str] = []
| |
| |
def reset() -> None:
    """Discard the metadata gathered so far and the recorded exclusion patterns."""
    global _metadata, _excluded_patterns
    _metadata = metadata_pb2.Metadata()
    _excluded_patterns = []
| |
| |
def get():
    """Returns the module-level metadata message accumulated so far."""
    return _metadata
| |
| |
def add_git_source(**kwargs) -> None:
    """Records a git source in the current metadata.

    All keyword arguments are forwarded to ``metadata_pb2.GitSource``.
    """
    git_source = metadata_pb2.GitSource(**kwargs)
    _metadata.sources.add(git=git_source)
| |
| |
def add_pattern_excluded_during_copy(glob_pattern: str) -> None:
    """Adds a file pattern excluded during copy.

    Used to avoid deleting an obsolete file that is excluded.

    Parameters:
        glob_pattern: pattern appended to the module-level exclusion list;
            obsolete file paths are later tested against it with
            fnmatch.fnmatch().
    """
    _excluded_patterns.append(glob_pattern)
| |
| |
def add_generator_source(**kwargs) -> None:
    """Records a generator source in the current metadata.

    All keyword arguments are forwarded to ``metadata_pb2.GeneratorSource``.
    """
    generator_source = metadata_pb2.GeneratorSource(**kwargs)
    _metadata.sources.add(generator=generator_source)
| |
| |
def add_template_source(**kwargs) -> None:
    """Records a template source in the current metadata.

    All keyword arguments are forwarded to ``metadata_pb2.TemplateSource``.
    """
    template_source = metadata_pb2.TemplateSource(**kwargs)
    _metadata.sources.add(template=template_source)
| |
| |
def add_client_destination(**kwargs) -> None:
    """Records a client library destination in the current metadata.

    All keyword arguments are forwarded to ``metadata_pb2.ClientDestination``.
    """
    client_destination = metadata_pb2.ClientDestination(**kwargs)
    _metadata.destinations.add(client=client_destination)
| |
| |
def _git_slashes(path: str):
    """Returns *path* in the form git expects: forward slashes only.

    Backslashes are converted only when running on Windows; on every other
    platform the path is returned untouched.
    """
    if sys.platform != "win32":
        return path
    return path.replace("\\", "/")
| |
| |
def _read_or_empty(path: str = "synth.metadata"):
    """Loads a metadata JSON file.

    Parameters:
        path: location of the JSON file to load.

    Returns:
        The parsed metadata_pb2.Metadata, or an empty Metadata message when
        no file exists at *path*.
    """
    try:
        with open(path, "rt") as metadata_file:
            json_text = metadata_file.read()
    except FileNotFoundError:
        # A missing file simply means there is no previous metadata.
        return metadata_pb2.Metadata()
    return google.protobuf.json_format.Parse(json_text, metadata_pb2.Metadata())
| |
| |
def write(outfile: str = "synth.metadata") -> None:
    """Serializes the current metadata to JSON and saves it to *outfile*."""
    # Serialize before opening so a serialization failure cannot truncate an
    # existing file.
    payload = google.protobuf.json_format.MessageToJson(_metadata)
    with open(outfile, "w") as metadata_file:
        metadata_file.write(payload)
    logger.debug(f"Wrote metadata to {outfile}.")
| |
| |
def _remove_obsolete_files(old_metadata):
    """Deletes files that the previous run generated but this run did not.

    The current metadata's generated_files must already be populated before
    this is called; otherwise every previously generated file is considered
    obsolete and removed.

    Files ignored by git are left alone, as are files matching a pattern
    registered through add_pattern_excluded_during_copy().

    Parameters:
        old_metadata: metadata from the previous run, as returned by
            _read_or_empty().
    """
    previously_generated = set(old_metadata.generated_files)
    currently_generated = set(_metadata.generated_files)
    patterns = set(_excluded_patterns)
    stale_paths = previously_generated - currently_generated
    for file_path in git_ignore(stale_paths):
        # First exclusion pattern matching this path, or None.
        pattern = next(
            (
                candidate
                for candidate in patterns
                if fnmatch.fnmatch(file_path, candidate)
            ),
            None,
        )
        if pattern is not None:
            logger.info(
                f"Leaving obsolete file {file_path} because it matched excluded pattern {pattern} during copy."
            )
            continue
        logger.info(f"Removing obsolete file {file_path}...")
        try:
            os.unlink(file_path)
        except FileNotFoundError:
            pass  # Already deleted.  That's OK.
| |
| |
def git_ignore(file_paths: Iterable[str]):
    """Returns a new list of the same files, with ignored files removed.

    Paths inside a ``.git`` directory are always dropped. The remaining paths
    are passed to ``git check-ignore --stdin``; those git reports as ignored
    are dropped as well. Input order is preserved.

    Parameters:
        file_paths: the file paths to filter.

    Returns:
        A list with the paths that are neither inside a .git directory nor
        ignored by git.

    Raises:
        FileNotFoundError: if there are paths to check and git is not in PATH.
    """
    # Surprisingly, git check-ignore doesn't ignore .git directories, take those
    # files out manually.
    nongit_file_paths = [
        file_path
        for file_path in file_paths
        if ".git" not in pathlib.Path(file_path).parts
    ]
    if not nongit_file_paths:
        # Nothing to ask git about, so don't spawn a process (or require git).
        return []

    git = shutil.which("git")
    if not git:
        raise FileNotFoundError("Could not find git in PATH.")

    encoding = locale.getpreferredencoding(False)
    # One path per line, with the forward slashes git expects.
    stdin_text = "".join(
        f"{_git_slashes(file_path)}\n" for file_path in nongit_file_paths
    )
    completed_process = subprocess.run(
        [git, "check-ignore", "--stdin"],
        input=stdin_text.encode(encoding),
        stdout=subprocess.PIPE,
    )
    # Digest git output.  Blank lines (such as the one after the final
    # newline) are skipped: os.path.normpath("") is "." and must not be
    # treated as an ignored path.
    output_text = completed_process.stdout.decode(encoding)
    ignored_file_paths = {
        os.path.normpath(line.strip())
        for line in output_text.split("\n")
        if line.strip()
    }
    # Filter the ignored paths from the file_paths.
    return [
        path
        for path in nongit_file_paths
        if os.path.normpath(path) not in ignored_file_paths
    ]
| |
| |
def set_track_obsolete_files(track_obsolete_files=True):
    """Instructs synthtool to track and remove obsolete files.

    Parameters:
        track_obsolete_files: new value for the module-level flag returned
            by should_track_obsolete_files().
    """
    global _track_obsolete_files
    _track_obsolete_files = track_obsolete_files
| |
| |
def should_track_obsolete_files():
    """Returns the current value of the obsolete-file tracking flag."""
    return _track_obsolete_files
| |
| |
class FileSystemEventHandler(watchdog.events.FileSystemEventHandler):
    """Remembers every file that was created, modified, or moved into place.

    Paths are stored relative to the watched directory.
    """

    def __init__(self, watch_dir: pathlib.Path):
        super().__init__()
        self._watch_dir = watch_dir
        # Guards _touched_file_paths, which is appended to under this lock
        # from on_any_event() and read under it by get_touched_file_paths().
        self._touched_lock = threading.Lock()
        self._touched_file_paths: List[str] = []

    def on_any_event(self, event):
        """Records the file affected by a created, modified, or moved event."""
        if event.is_directory:
            return
        kind = event.event_type
        if kind == watchdog.events.EVENT_TYPE_MOVED:
            # For a move, the destination is the file that now exists.
            absolute_path = event.dest_path
        elif kind in (
            watchdog.events.EVENT_TYPE_MODIFIED,
            watchdog.events.EVENT_TYPE_CREATED,
        ):
            absolute_path = event.src_path
        else:
            return
        relative_path = pathlib.Path(absolute_path).relative_to(self._watch_dir)
        with self._touched_lock:
            self._touched_file_paths.append(str(relative_path))

    def get_touched_file_paths(self) -> List[str]:
        """Returns the recorded paths, deduplicated and sorted."""
        with self._touched_lock:
            unique_paths = set(self._touched_file_paths)
        return sorted(unique_paths)
| |
| |
class MetadataTrackerAndWriter:
    """Writes metadata file upon exiting scope.

    On entry, the previous metadata is loaded and the current working
    directory is recorded as a git source. When obsolete file tracking is
    enabled, a file system observer is also started on the directory that
    holds the metadata file.

    On a clean exit, the files touched while the observer ran are recorded as
    generated files, obsolete files are removed, and the metadata file is
    written (unless writing was disabled). If the scope exits with an
    exception, nothing is written or removed.

    The observer is always stopped on exit, whether or not an exception was
    raised.
    """

    def __init__(self, metadata_file_path: str):
        self.metadata_file_path = metadata_file_path
        # Populated by __enter__().  handler/observer stay None unless an
        # observer was actually started.
        self.old_metadata = None
        self.handler = None
        self.observer = None

    def __enter__(self):
        self.old_metadata = _read_or_empty(self.metadata_file_path)
        _add_self_git_source()
        watch_dir = pathlib.Path(self.metadata_file_path).parent
        os.makedirs(watch_dir, exist_ok=True)
        # Create an observer only if obsolete file tracking is enabled.
        # This prevents inotify errors in synth jobs that may delete the watch
        # dir.  Such synth jobs should leave obsolete file tracking disabled.
        if should_track_obsolete_files():
            handler = FileSystemEventHandler(watch_dir)
            observer = watchdog.observers.Observer()
            observer.schedule(handler, str(watch_dir), recursive=True)
            observer.start()
            # Publish only once the observer is really running, so __exit__
            # never tries to join a thread that was not started.
            self.handler = handler
            self.observer = observer
        return self

    def __exit__(self, type, value, traceback):
        if value:
            # An exception was raised.  Don't write metadata or clean up
            # files, but don't leave the observer thread running either.
            self._stop_observer()
            return
        if should_track_obsolete_files():
            if self.observer is not None:
                time.sleep(2)  # Finish collecting observations about modified files.
                self._stop_observer()
                for path in git_ignore(self.handler.get_touched_file_paths()):
                    _metadata.generated_files.append(path)
                _remove_obsolete_files(self.old_metadata)
            else:
                # Tracking was switched on after __enter__, so nothing was
                # observed.  Removing "obsolete" files now would delete every
                # previously generated file, so skip it.
                logger.warning(
                    "Obsolete file tracking was enabled after the metadata scope "
                    "was entered; no files were observed, so obsolete files "
                    "are left in place."
                )
        else:
            # Tracking may have been switched off inside the scope.
            self._stop_observer()
        _clear_local_paths(get())
        _metadata.sources.sort(key=_source_key)
        if _enable_write_metadata:
            write(self.metadata_file_path)

    def _stop_observer(self):
        """Stops and joins the observer if one is running; otherwise a no-op."""
        observer, self.observer = self.observer, None
        if observer is not None:
            observer.stop()
            observer.join()
| |
| |
def _get_git_source_map(metadata) -> Dict[str, object]:
    """Indexes the git sources of a metadata message by name.

    Parameters:
        metadata: an instance of metadata_pb2.Metadata.

    Returns:
        A dict mapping git source name to metadata_pb2.GitSource instance.
        When several git sources share a name, the last one wins.
    """
    git_sources = (
        entry.git for entry in metadata.sources if entry.HasField("git")
    )
    return {git_source.name: git_source for git_source in git_sources}
| |
| |
def _clear_local_paths(metadata):
    """Strips the local_path field from every git source in *metadata*.

    There's no reason to preserve it, and it may leak some info we don't
    want to leak in the path.
    """
    for entry in metadata.sources:
        if not entry.HasField("git"):
            continue
        entry.git.ClearField("local_path")
| |
| |
def _add_self_git_source():
    """Registers the git repo containing the current working directory.

    The source is recorded under the name ".".

    Returns:
        The number of git sources added to metadata.
    """
    working_dir = os.getcwd()
    return _add_git_source_from_directory(".", working_dir)
| |
| |
def _add_git_source_from_directory(name: str, dir_path: str) -> int:
    """Registers the git repo that contains *dir_path* as a git source.

    The source records the URL of the ``origin`` remote and the SHA of the
    latest commit.

    Parameters:
        name: name to record for the git source.
        dir_path: a directory expected to be inside a git repository.

    Returns:
        The number of git sources added to metadata: 1 on success, 0 when
        ``git status`` fails for *dir_path*.
    """
    git_prefix = ["git", "-C", dir_path]

    def capture(*git_args: str) -> str:
        # Run a git subcommand in dir_path and return its stripped stdout.
        finished = subprocess.run(
            [*git_prefix, *git_args],
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        return finished.stdout.strip()

    # "git status" exits non-zero when dir_path is not inside a repository.
    probe = subprocess.run([*git_prefix, "status"], universal_newlines=True)
    if probe.returncode != 0:
        logger.warning("%s is not directory in a git repo.", dir_path)
        return 0

    remote_url = capture("remote", "get-url", "origin")
    head_sha = capture("log", "--no-decorate", "-1", "--pretty=format:%H")
    add_git_source(name=name, remote=remote_url, sha=head_sha)
    return 1
| |
| |
def _source_key(source):
    """Creates a key to use to sort a list of sources.

    Sources are grouped by kind ("generator", "git", "template") and then
    ordered by their identifying fields.

    Arguments:
        source {metadata_pb2.Source} -- the Source for which to formulate a sort key

    Returns:
        tuple -- A key to use to sort a list of sources. A source with none
        of the known fields set gets the key ``("",)``, which sorts before
        every other source.
    """
    if source.HasField("git"):
        return ("git", source.git.name, source.git.remote, source.git.sha)
    if source.HasField("generator"):
        return (
            "generator",
            source.generator.name,
            source.generator.version,
            source.generator.docker_image,
        )
    if source.HasField("template"):
        return (
            "template",
            source.template.name,
            source.template.origin,
            source.template.version,
        )
    # Always return a tuple: falling through with None would make sorting
    # raise TypeError as soon as it is compared against another key.
    return ("",)
| |
| |
# Whether MetadataTrackerAndWriter writes the metadata file when its scope
# exits; toggled through enable_write_metadata().
_enable_write_metadata = True
| |
| |
def enable_write_metadata(enable: bool = True) -> None:
    """Control whether synthtool writes synth.metadata file.

    Parameters:
        enable: new value for the module-level flag that
            MetadataTrackerAndWriter checks before writing the file.
    """
    global _enable_write_metadata
    _enable_write_metadata = enable