[autotest] Log forwarding for shards
tko/retrieve_logs.cgi reads the value of "[SERVER] shards" in the config
file to forward log requests relevant to a shard to that shard.
Whenever a shard node is added, deleted, or updated, we must create a CL
that updates the file
chromeos-admin/puppet/modules/lab/templates/cautotest_shadow_config.ini.erb,
merge it into the tree, and push it to prod. This is not scalable.
This CL makes retrieve_logs.cgi query the server db for the list of
shard hostnames instead.
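
In short, the shard lookup moves from a static config read to a live
server db query (a sketch only; the actual change is in the diff below):

    # before: shard list baked into shadow_config.ini
    shards = config.get_config_value('SERVER', 'shards', default='').split(',')
    # after: primary shards read straight from the server db
    shards = server_manager_utils.get_shards()
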
BUG=chromium:467104
TEST=In a local autotest setup, click 'debug log' on a completed job
and check that shard hostnames are retrieved from the server db.
Change-Id: If4dcd0516107577375f2fda9497a1c939ac8775c
Reviewed-on: https://chromium-review.googlesource.com/285315
Reviewed-by: Dan Shi <[email protected]>
Commit-Queue: Mungyung Ryu <[email protected]>
Tested-by: Mungyung Ryu <[email protected]>
diff --git a/site_utils/server_manager_utils.py b/site_utils/server_manager_utils.py
index da93add..9a1d4e3 100644
--- a/site_utils/server_manager_utils.py
+++ b/site_utils/server_manager_utils.py
@@ -283,6 +283,16 @@
return [s.hostname for s in servers]
+def get_shards():
+ """Get a list of shards in status primary.
+
+    @return: A list of hostnames of shards in status primary.
+ """
+ servers = get_servers(role=server_models.ServerRole.ROLE.SHARD,
+ status=server_models.Server.STATUS.PRIMARY)
+ return [s.hostname for s in servers]
+
+
def confirm_server_has_role(hostname, role):
"""Confirm a given server has the given role, and its status is primary.
diff --git a/tko/retrieve_logs.cgi b/tko/retrieve_logs.cgi
index a4597a8..9ce6143 100755
--- a/tko/retrieve_logs.cgi
+++ b/tko/retrieve_logs.cgi
@@ -2,11 +2,13 @@
import cgi, os, sys, urllib2
import common
+from multiprocessing import pool
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.bin import utils
from autotest_lib.frontend.afe.json_rpc import serviceHandler
+from autotest_lib.server import utils as server_utils
from autotest_lib.site_utils import server_manager_utils
_PAGE = """\
@@ -29,13 +31,17 @@
"autotest_lib.tko.site_retrieve_logs", "site_find_repository_host",
_retrieve_logs_dummy)
-
form = cgi.FieldStorage(keep_blank_values=True)
-# determine if this is a JSON-RPC request. we support both so that the new TKO
+# determine if this is a JSON-RPC request. we support both so that the new TKO
# client can use its RPC client code, but the old TKO can still use simple GET
# params.
_is_json_request = form.has_key('callback')
+# If this key exists, we check whether the requested log exists on the local
+# machine, and do not return a Google Storage URL when it doesn't.
+_local_only = form.has_key('localonly')
+
+
def _get_requested_path():
if _is_json_request:
request_data = form['request'].value
@@ -46,62 +52,90 @@
return form['job'].value
+def _check_result(args):
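+    """Check whether the requested log exists on the given host.
+
+    @param args: a dict with keys 'host', 'job_path' and 'shard', as built
+                 by _get_tpool_args().
+
+    @return: a ('http', hostname, path) tuple if the host serves the log,
+             None otherwise.
+    """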
+ host = args['host']
+ job_path = args['job_path']
+ shard = args['shard']
+ if shard:
+ http_path = 'http://%s/tko/retrieve_logs.cgi?localonly&job=%s' % (
+ host, job_path)
+ else:
+ http_path = 'http://%s%s' % (host, job_path)
+
+ try:
+ utils.urlopen(http_path)
+
+        # On VMs the shard name is set to the default gateway, but the
+        # browser used to navigate frontends (which runs on the host of
+        # the VMs) is immune to the NAT routing the VMs have, so we
+        # need to replace the gateway with 'localhost'.
+ if utils.DEFAULT_VM_GATEWAY in host:
+ normalized_host = host.replace(utils.DEFAULT_VM_GATEWAY, 'localhost')
+ else:
+ normalized_host = utils.normalize_hostname(host)
+ return 'http', normalized_host, job_path
+ except urllib2.URLError:
+ return None
+
+
+def _get_tpool_args(hosts, job_path, is_shard, host_set):
+ """Get a list of arguments to be passed to multiprocessing.pool.ThreadPool.
+
+ @param hosts: a list of host names.
+ @param job_path: a requested job path.
+ @param is_shard: True if hosts are shards, False otherwise.
+    @param host_set: a set used to filter out duplicate hosts.
+
+    @return: a list of dictionaries to be used as inputs to _check_result().
+ """
+ args = []
+ for host in hosts:
+ host = host.strip()
+ if host and host != 'localhost' and host not in host_set:
+ host_set.add(host)
+ arg = {'host': host, 'job_path': job_path, 'shard': is_shard}
+ args.append(arg)
+ return args
+
+
def find_repository_host(job_path):
"""Find the machine holding the given logs and return a URL to the logs"""
site_repo_info = site_find_repository_host(job_path)
if site_repo_info is not None:
return site_repo_info
- config = global_config.global_config
- if server_manager_utils.use_server_db():
- drones = server_manager_utils.get_drones()
- else:
- drones = config.get_config_value('SCHEDULER', 'drones').split(',')
+    # This cgi script runs only on the master (cautotest) and on shards.
+    # Drones do not run it when receiving a '/results/...' request.
+    # Only the master should check drones and shards for the requested log.
+ if not server_utils.is_shard():
+ shards = []
+ if server_manager_utils.use_server_db():
+ drones = server_manager_utils.get_drones()
+ shards = server_manager_utils.get_shards()
+ else:
+ config = global_config.global_config
+ drones = config.get_config_value(
+ 'SCHEDULER', 'drones', default='').split(',')
+ shards = config.get_config_value(
+ 'SERVER', 'shards', default='').split(',')
- # TODO: This won't scale as we add more shards. Ideally the frontend would
- # pipe the shard_hostname with the job_id but there are helper scripts like
- # dut_history that hit the main cautotest frontend for logs. For these, it
- # is easier to handle the shard translation internally just like we do with
- # drones.
- shards = config.get_config_value('SERVER', 'shards', default='')
- results_host = config.get_config_value('SCHEDULER', 'results_host')
- archive_host = config.get_config_value('SCHEDULER', 'archive_host',
- default='')
- results_repos = [results_host]
- for host in drones + shards.split(','):
- host = host.strip()
- if host and host not in results_repos:
- results_repos.append(host)
+ host_set = set()
+ tpool_args = _get_tpool_args(drones, job_path, False, host_set)
+ tpool_args += _get_tpool_args(shards, job_path, True, host_set)
- if archive_host and archive_host not in results_repos:
- results_repos.append(archive_host)
-
- for drone in results_repos:
- if drone == 'localhost':
- continue
- http_path = 'http://%s%s' % (drone, job_path)
- try:
- utils.urlopen(http_path)
-
- # On Vms the shard name is set to the default gateway but the
- # browser used to navigate frontends (that runs on the host of
- # the vms) is immune to the same NAT routing the vms have, so we
- # need to replace the gateway with 'localhost'.
- if utils.DEFAULT_VM_GATEWAY in drone:
- drone = drone.replace(utils.DEFAULT_VM_GATEWAY, 'localhost')
- else:
- drone = utils.normalize_hostname(drone)
- return 'http', drone, job_path
- except urllib2.URLError:
- pass
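+    # Probe all candidate hosts in parallel and return the first one that
+    # is found to serve the requested log.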
+ tpool = pool.ThreadPool()
+ for result_path in tpool.imap_unordered(_check_result, tpool_args):
+ if result_path:
+ return result_path
# If the URL requested is a test result, it is now either on the local
# host or in Google Storage.
if job_path.startswith('/results/'):
# We only care about the path after '/results/'.
job_relative_path = job_path[9:]
- if not os.path.exists(os.path.join('/usr/local/autotest/results',
- job_relative_path)):
+ if not _local_only and not os.path.exists(
+ os.path.join('/usr/local/autotest/results',
+ job_relative_path)):
gsuri = utils.get_offload_gsuri().split('gs://')[1]
return ['https', GOOGLE_STORAGE_PATTERN, gsuri + job_relative_path]