| #!/usr/bin/env python3 |
| |
| # Copyright (c) Facebook, Inc. and its affiliates. |
| # All rights reserved. |
| # |
| # This source code is licensed under the BSD-style license found in the |
| # LICENSE file in the root directory of this source tree. |
| |
| import json |
| from dataclasses import asdict, dataclass, field |
| from enum import Enum |
| from typing import Dict, Union, Optional |
| |
| __all__ = ['EventSource', 'Event', 'NodeState', 'RdzvEvent'] |
| |
| EventMetadataValue = Union[str, int, float, bool, None] |
| |
| |
| class EventSource(str, Enum): |
| """ |
| Known identifiers of the event producers. |
| """ |
| |
| AGENT = "AGENT" |
| WORKER = "WORKER" |
| |
| |
| @dataclass |
| class Event: |
| """ |
| The class represents the generic event that occurs during the torchelastic |
| job execution. The event can be any kind of meaningful action. |
| |
| Args: |
| name: event name. |
| source: the event producer, e.g. agent or worker |
| timestamp: timestamp in milliseconds when event occured. |
| metadata: additional data that is associated with the event. |
| """ |
| |
| name: str |
| source: EventSource |
| timestamp: int = 0 |
| metadata: Dict[str, EventMetadataValue] = field(default_factory=dict) |
| |
| def __str__(self): |
| return self.serialize() |
| |
| @staticmethod |
| def deserialize(data: Union[str, "Event"]) -> "Event": |
| if isinstance(data, Event): |
| return data |
| if isinstance(data, str): |
| data_dict = json.loads(data) |
| data_dict["source"] = EventSource[data_dict["source"]] |
| return Event(**data_dict) |
| |
| def serialize(self) -> str: |
| return json.dumps(asdict(self)) |
| |
| |
| class NodeState(str, Enum): |
| """ |
| The states that a node can be in rendezvous. |
| """ |
| |
| INIT = "INIT" |
| RUNNING = "RUNNING" |
| SUCCEEDED = "SUCCEEDED" |
| FAILED = "FAILED" |
| |
| |
| @dataclass |
| class RdzvEvent: |
| """ |
| Dataclass to represent any rendezvous event. |
| |
| Args: |
| name: Event name. (E.g. Current action being performed) |
| run_id: The run id of the rendezvous |
| message: The message describing the event |
| hostname: Hostname of the node |
| pid: The process id of the node |
| node_state: The state of the node (INIT, RUNNING, SUCCEEDED, FAILED) |
| master_endpoint: The master endpoint for the rendezvous store, if known |
| rank: The rank of the node, if known |
| local_id: The local_id of the node, if defined in dynamic_rendezvous.py |
| error_trace: Error stack trace, if this is an error event. |
| """ |
| |
| name: str |
| run_id: str |
| message: str |
| hostname: str |
| pid: int |
| node_state: NodeState |
| master_endpoint: str = "" |
| rank: Optional[int] = None |
| local_id: Optional[int] = None |
| error_trace: str = "" |
| |
| def __str__(self): |
| return self.serialize() |
| |
| @staticmethod |
| def deserialize(data: Union[str, "RdzvEvent"]) -> "RdzvEvent": |
| if isinstance(data, RdzvEvent): |
| return data |
| if isinstance(data, str): |
| data_dict = json.loads(data) |
| data_dict["node_state"] = NodeState[data_dict["node_state"]] |
| return RdzvEvent(**data_dict) |
| |
| def serialize(self) -> str: |
| return json.dumps(asdict(self)) |