Wrap pubsub code into a reusable class.
BUG=chromium:724523
TEST=Unittest and manually.
Change-Id: Icfa4fa9c1c515945f18a309ef0cc43ed623775be
Reviewed-on: https://chromium-review.googlesource.com/510348
Commit-Ready: Michael Tang <[email protected]>
Tested-by: Michael Tang <[email protected]>
Reviewed-by: Keith Haddow <[email protected]>
Reviewed-by: Michael Tang <[email protected]>
diff --git a/site_utils/pubsub_utils.py b/site_utils/pubsub_utils.py
index f62a31d..2f6c71d 100755
--- a/site_utils/pubsub_utils.py
+++ b/site_utils/pubsub_utils.py
@@ -27,37 +27,61 @@
# number of retry to publish an event.
_PUBSUB_NUM_RETRIES = 3
-
-def _get_pubsub_service():
- """Gets the pubsub service api handle."""
- if not os.path.isfile(CLOUD_SERVICE_ACCOUNT_FILE):
- logging.error('No credential file found')
- return None
-
- try:
- credentials = GoogleCredentials.from_stream(CLOUD_SERVICE_ACCOUNT_FILE)
- if credentials.create_scoped_required():
- credentials = credentials.create_scoped(PUBSUB_SCOPES)
- return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
- credentials=credentials)
- except ApplicationDefaultCredentialsError as ex:
- logging.error('Failed to get credential.')
- except:
- logging.error('Failed to get the pubsub service handle.')
-
- return None
+class PubSubException(Exception):
+ """Exception raised on pubsub credential, service or publish failures."""
+ pass
-def publish_notifications(topic, messages=[]):
- """Publishes a test result notification to a given pubsub topic.
+class PubSubClient(object):
+ """A generic pubsub client. """
+ def __init__(self, credential_file=CLOUD_SERVICE_ACCOUNT_FILE):
+ """Constructor for PubSubClient.
- @param topic: The Cloud pubsub topic.
- @param messages: A list of notification messages.
+ @param credential_file: The credential filename.
+ @raises PubSubException if the credential file does not exist or
+ is corrupted.
+ """
+ self.credential_file = credential_file
+ self.credential = self._get_credential()
- @returns A list of pubsub message ids, and empty if fails.
- """
- pubsub = _get_pubsub_service()
- if pubsub:
+ def _get_credential(self):
+ """Gets the Google credential loaded from the credential file."""
+ if not os.path.isfile(self.credential_file):
+ logging.error('No credential file found')
+ raise PubSubException("Credential file does not exists:"
+ + self.credential_file)
+ try:
+ credential = GoogleCredentials.from_stream(self.credential_file)
+ if credential.create_scoped_required():
+ credential = credential.create_scoped(PUBSUB_SCOPES)
+ return credential
+ except ApplicationDefaultCredentialsError as ex:
+ logging.error('Failed to get credential.')
+ except:
+ logging.error('Failed to get the pubsub service handle.')
+
+ raise PubSubException("Credential file does not exists:"
+ + self.credential_file)
+
+ def _get_pubsub_service(self):
+ try:
+ return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
+ credentials=self.credential)
+ except:
+ logging.error('Failed to get pubsub resource object.')
+ raise PubSubException("Failed to get pubsub resource object")
+
+ def publish_notifications(self, topic, messages=[]):
+ """Publishes a test result notification to a given pubsub topic.
+
+ @param topic: The Cloud pubsub topic.
+ @param messages: A list of notification messages.
+
+ @returns A list of pubsub message ids, and empty if fails.
+
+ @raises PubSubException if failed to publish the notification.
+ """
+ pubsub = self._get_pubsub_service()
try:
body = {'messages': messages}
resp = pubsub.projects().topics().publish(topic=topic,
@@ -68,6 +92,5 @@
logging.debug('Published notification message')
return msgIds
except:
- pass
- logging.error('Failed to publish test result notifiation.')
- return []
+ logging.error('Failed to publish test result notifiation.')
+ raise PubSubException("Failed to publish the notifiation.")