blob: 3383ad4b16bfb072771dafe9e73cef2984ca23e4 [file] [log] [blame]
# Lint as: python2, python3
# Copyright (c) 2022 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Expects to be run in an environment with sudo and no interactive password
# prompt, such as within the Chromium OS development chroot.
"""Host class for GSC devboard connected host."""
import contextlib
import logging
try:
import docker
except ImportError:
logging.info("Docker API is not installed in this environment")
DOCKER_IMAGE = "gcr.io/satlab-images/gsc_dev_board:release"
SATLAB_DOCKER_HOST = 'tcp://192.168.231.1:2375'
LOCAL_DOCKER_HOST = 'tcp://127.0.0.1:2375'
DEFAULT_DOCKER_HOST = 'unix:///var/run/docker.sock'
DEFAULT_SERVICE_PORT = 39999
ULTRADEBUG = '18d1:0304'
class GSCDevboardHost(object):
"""
A host that is physically connected to a GSC devboard.
It could either be a SDK workstation (chroot) or a SatLab box.
"""
def _initialize(self,
hostname,
service_debugger_serial=None,
service_ip=None,
service_port=DEFAULT_SERVICE_PORT,
*args,
**dargs):
"""Construct a GSCDevboardHost object.
@hostname: Name of the devboard host, will be used in future to look up
the debugger serial, not currently used.
@service_debugger_serial: debugger connected to devboard, defaults to
the first one found on the container.
@service_ip: devboard service ip, default is to start a new container.
@service_port: devboard service port, defaults to 39999.
"""
# Use docker host from environment or by probing a list of candidates.
self._client = None
try:
self._client = docker.from_env()
logging.info("Created docker host from env")
except NameError:
raise NameError('Please install docker using '
'"autotest/files/utils/install_docker_chroot.sh"')
except docker.errors.DockerException:
docker_host = None
candidate_hosts = [
SATLAB_DOCKER_HOST, DEFAULT_DOCKER_HOST, LOCAL_DOCKER_HOST
]
for h in candidate_hosts:
try:
c = docker.DockerClient(base_url=h, timeout=2)
c.close()
docker_host = h
break
except docker.errors.DockerException:
pass
if docker_host is not None:
self._client = docker.DockerClient(base_url=docker_host,
timeout=300)
else:
raise ValueError('Invalid DOCKER_HOST, ensure dockerd is'
' running.')
logging.info("Using docker host at %s", docker_host)
self._satlab = False
# GSCDevboardHost should only be created on Satlab or localhost, so
# assume Satlab if a drone container is running.
if len(self._client.containers.list(filters={'name': 'drone'})) > 0:
logging.info("In Satlab")
self._satlab = True
self._service_debugger_serial = service_debugger_serial
self._service_ip = service_ip
self._service_port = service_port
logging.info("Using service port %s", self._service_port)
self._docker_network = 'default_satlab' if self._satlab else 'host'
self._docker_container = None
serials = self._list_debugger_serials()
if len(serials) == 0:
raise ValueError('No debuggers found')
logging.info("Available debuggers: [%s]", ', '.join(serials))
if self._service_debugger_serial is None:
self._service_debugger_serial = serials[0]
else:
if self._service_debugger_serial not in serials:
raise ValueError(
'%s debugger not found in [%s]' %
(self._service_debugger_serial, ', '.join(serials)))
logging.info("Using debugger %s", self._service_debugger_serial)
self._docker_container_name = "gsc_dev_board_{}".format(
self._service_debugger_serial)
def _list_debugger_serials(self):
"""List all attached debuggers."""
c = self._client.containers.run(DOCKER_IMAGE,
remove=True,
privileged=True,
name='list_debugger_serial',
hostname='list_debugger_serial',
detach=True,
volumes=["/dev:/hostdev"],
command=['sleep', '5'])
res, output = c.exec_run(['lsusb', '-v', '-d', ULTRADEBUG],
stderr=False,
privileged=True)
c.kill()
if res != 0:
return []
output = output.decode("utf-8").split('\n')
serials = [
l.strip().split(' ')[-1] for l in output
if l.strip()[:7] == 'iSerial'
]
return serials
@contextlib.contextmanager
def service_context(self):
"""Service context manager that provides the service endpoint."""
self.start_service()
try:
yield "{}:{}".format(self.service_ip, self.service_port)
finally:
self.stop_service()
def start_service(self):
"""Starts service if needed."""
if self._docker_container is not None:
return
if self._service_ip:
# Assume container was manually started if service_ip was set
logging.info("Skip start_service due to set service_ip")
return
#TODO(b/215767105): Pull image onto Satlab box if not present.
environment = {
'DEVBOARDSVC_PORT': self._service_port,
'DEBUGGER_SERIAL': self._service_debugger_serial
}
start_cmd = ['/opt/gscdevboard/start_devboardsvc.sh']
# Stop any leftover containers
try:
c = self._client.containers.get(self._docker_container_name)
c.kill()
except docker.errors.NotFound:
pass
self._client.containers.run(DOCKER_IMAGE,
remove=True,
privileged=True,
name=self._docker_container_name,
hostname=self._docker_container_name,
network=self._docker_network,
cap_add=["NET_ADMIN"],
detach=True,
volumes=["/dev:/hostdev"],
environment=environment,
command=start_cmd)
# A separate containers.get call is needed to capture network attributes
self._docker_container = self._client.containers.get(
self._docker_container_name)
def stop_service(self):
"""Stops service by killing the container."""
if self._docker_container is None:
return
self._docker_container.kill()
self._docker_container = None
@property
def service_port(self):
"""Return service port (local to the container host)."""
return self._service_port
@property
def service_ip(self):
"""Return service ip (local to the container host)."""
if self._service_ip is not None:
return self._service_ip
if self._docker_network == 'host':
return '127.0.0.1'
else:
if self._docker_container is None:
return ''
else:
settings = self._docker_container.attrs['NetworkSettings']
return settings['Networks'][self._docker_network]['IPAddress']