[autotest] Log forwarding for shards

tko/retrieve_logs.cgi reads the value of "[SERVER] shards" in config file
to forward logs relevant to a shard to the shard.
Whenever a shard node is added/deleted/updated, we currently have to create a CL
that updates the file
chromeos-admin/puppet/modules/lab/templates/cautotest_shadow_config.ini.erb,
merge it into the tree, and push it to prod.
This process is not scalable.
This CL makes retrieve_logs.cgi refer to server db to get the list
of shard hostnames.

BUG=chromium:467104
TEST=In a local autotest setup, click 'debug log' on a completed job
and check that the shard hostnames are retrieved from the server db.

Change-Id: If4dcd0516107577375f2fda9497a1c939ac8775c
Reviewed-on: https://chromium-review.googlesource.com/285315
Reviewed-by: Dan Shi <[email protected]>
Commit-Queue: Mungyung Ryu <[email protected]>
Tested-by: Mungyung Ryu <[email protected]>
diff --git a/site_utils/server_manager_utils.py b/site_utils/server_manager_utils.py
index da93add..9a1d4e3 100644
--- a/site_utils/server_manager_utils.py
+++ b/site_utils/server_manager_utils.py
@@ -283,6 +283,16 @@
     return [s.hostname for s in servers]
 
 
+def get_shards():
+    """Get a list of shards in status primary.
+
+    @return: A list of shards in status primary.
+    """
+    servers = get_servers(role=server_models.ServerRole.ROLE.SHARD,
+                          status=server_models.Server.STATUS.PRIMARY)
+    return [s.hostname for s in servers]
+
+
 def confirm_server_has_role(hostname, role):
     """Confirm a given server has the given role, and its status is primary.
 
diff --git a/tko/retrieve_logs.cgi b/tko/retrieve_logs.cgi
index a4597a8..9ce6143 100755
--- a/tko/retrieve_logs.cgi
+++ b/tko/retrieve_logs.cgi
@@ -2,11 +2,13 @@
 
 import cgi, os, sys, urllib2
 import common
+from multiprocessing import pool
 from autotest_lib.frontend import setup_django_environment
 
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.bin import utils
 from autotest_lib.frontend.afe.json_rpc import serviceHandler
+from autotest_lib.server import utils as server_utils
 from autotest_lib.site_utils import server_manager_utils
 
 _PAGE = """\
@@ -29,13 +31,17 @@
     "autotest_lib.tko.site_retrieve_logs", "site_find_repository_host",
     _retrieve_logs_dummy)
 
-
 form = cgi.FieldStorage(keep_blank_values=True)
-# determine if this is a JSON-RPC request.  we support both so that the new TKO
+# determine if this is a JSON-RPC request. we support both so that the new TKO
 # client can use its RPC client code, but the old TKO can still use simple GET
 # params.
 _is_json_request = form.has_key('callback')
 
+# if this key exists, we check if requested log exists in local machine,
+# and do not return Google Storage URL when the log doesn't exist.
+_local_only = form.has_key('localonly')
+
+
 def _get_requested_path():
     if _is_json_request:
         request_data = form['request'].value
@@ -46,62 +52,90 @@
     return form['job'].value
 
 
+def _check_result(args):
+    host = args['host']
+    job_path = args['job_path']
+    shard = args['shard']
+    if shard:
+        http_path = 'http://%s/tko/retrieve_logs.cgi?localonly&job=%s' % (
+                host, job_path)
+    else:
+        http_path = 'http://%s%s' % (host, job_path)
+
+    try:
+        utils.urlopen(http_path)
+
+        # On Vms the shard name is set to the default gateway but the
+        # browser used to navigate frontends (that runs on the host of
+        # the vms) is immune to the same NAT routing the vms have, so we
+        # need to replace the gateway with 'localhost'.
+        if utils.DEFAULT_VM_GATEWAY in host:
+            normalized_host = host.replace(utils.DEFAULT_VM_GATEWAY, 'localhost')
+        else:
+            normalized_host = utils.normalize_hostname(host)
+        return 'http', normalized_host, job_path
+    except urllib2.URLError:
+        return None
+
+
+def _get_tpool_args(hosts, job_path, is_shard, host_set):
+    """Get a list of arguments to be passed to multiprocessing.pool.ThreadPool.
+
+    @param hosts: a list of host names.
+    @param job_path: a requested job path.
+    @param is_shard: True if hosts are shards, False otherwise.
+    @param host_set: a Set to filter out duplicated hosts.
+
+    @return: a list of dictionaries to be used as input of _check_result().
+    """
+    args = []
+    for host in hosts:
+        host = host.strip()
+        if host and host != 'localhost' and host not in host_set:
+            host_set.add(host)
+            arg = {'host': host, 'job_path': job_path, 'shard': is_shard}
+            args.append(arg)
+    return args
+
+
 def find_repository_host(job_path):
     """Find the machine holding the given logs and return a URL to the logs"""
     site_repo_info = site_find_repository_host(job_path)
     if site_repo_info is not None:
         return site_repo_info
 
-    config = global_config.global_config
-    if server_manager_utils.use_server_db():
-        drones = server_manager_utils.get_drones()
-    else:
-        drones = config.get_config_value('SCHEDULER', 'drones').split(',')
+    # This cgi script is run only in master (cautotest) and shards.
+    # Drones do not run this script when receiving '/results/...' request.
+    # Only master should check drones and shards for the requested log.
+    if not server_utils.is_shard():
+        shards = []
+        if server_manager_utils.use_server_db():
+            drones = server_manager_utils.get_drones()
+            shards = server_manager_utils.get_shards()
+        else:
+            config = global_config.global_config
+            drones = config.get_config_value(
+                    'SCHEDULER', 'drones', default='').split(',')
+            shards = config.get_config_value(
+                    'SERVER', 'shards', default='').split(',')
 
-    # TODO: This won't scale as we add more shards. Ideally the frontend would
-    # pipe the shard_hostname with the job_id but there are helper scripts like
-    # dut_history that hit the main cautotest frontend for logs. For these, it
-    # is easier to handle the shard translation internally just like we do with
-    # drones.
-    shards = config.get_config_value('SERVER', 'shards', default='')
-    results_host = config.get_config_value('SCHEDULER', 'results_host')
-    archive_host = config.get_config_value('SCHEDULER', 'archive_host',
-                                            default='')
-    results_repos = [results_host]
-    for host in drones + shards.split(','):
-        host = host.strip()
-        if host and host not in results_repos:
-            results_repos.append(host)
+        host_set = set()
+        tpool_args = _get_tpool_args(drones, job_path, False, host_set)
+        tpool_args += _get_tpool_args(shards, job_path, True, host_set)
 
-    if archive_host and archive_host not in results_repos:
-        results_repos.append(archive_host)
-
-    for drone in results_repos:
-        if drone == 'localhost':
-            continue
-        http_path = 'http://%s%s' % (drone, job_path)
-        try:
-            utils.urlopen(http_path)
-
-            # On Vms the shard name is set to the default gateway but the
-            # browser used to navigate frontends (that runs on the host of
-            # the vms) is immune to the same NAT routing the vms have, so we
-            # need to replace the gateway with 'localhost'.
-            if utils.DEFAULT_VM_GATEWAY in drone:
-                drone = drone.replace(utils.DEFAULT_VM_GATEWAY, 'localhost')
-            else:
-                drone = utils.normalize_hostname(drone)
-            return 'http', drone, job_path
-        except urllib2.URLError:
-            pass
+        tpool = pool.ThreadPool()
+        for result_path in tpool.imap_unordered(_check_result, tpool_args):
+            if result_path:
+                return result_path
 
     # If the URL requested is a test result, it is now either on the local
     # host or in Google Storage.
     if job_path.startswith('/results/'):
         # We only care about the path after '/results/'.
         job_relative_path = job_path[9:]
-        if not os.path.exists(os.path.join('/usr/local/autotest/results',
-                                           job_relative_path)):
+        if not _local_only and not os.path.exists(
+                    os.path.join('/usr/local/autotest/results',
+                                 job_relative_path)):
             gsuri = utils.get_offload_gsuri().split('gs://')[1]
             return ['https', GOOGLE_STORAGE_PATTERN, gsuri + job_relative_path]