| # Lint as: python2, python3 |
| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import collections |
| import dbus |
| import logging |
| import pipes |
| import re |
| import shlex |
| import six |
| |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error |
| |
| |
| # Represents the result of a dbus-send call. |sender| refers to the temporary |
| # bus name of dbus-send, |responder| to the remote process, and |response| |
| # contains the parsed response. |
| DBusSendResult = collections.namedtuple('DBusSendResult', ['sender', |
| 'responder', |
| 'response']) |
| # Used internally. |
| DictEntry = collections.namedtuple('DictEntry', ['key', 'value']) |
| |
| |
| def _build_token_stream(headerless_dbus_send_output): |
| """A tokenizer for dbus-send output. |
| |
| The output is basically just like splitting on whitespace, except that |
| strings are kept together by " characters. |
| |
| @param headerless_dbus_send_output: list of lines of dbus-send output |
| without the meta-information prefix. |
| @return list of tokens in dbus-send output. |
| """ |
| return shlex.split(' '.join(headerless_dbus_send_output)) |
| |
| |
| def _parse_value(token_stream): |
| """Turn a stream of tokens from dbus-send output into builtin python types. |
| |
| @param token_stream: output from _build_token_stream() above. |
| |
| """ |
| if len(token_stream) == 0: |
| # Return None for dbus-send output with no return values. |
| return None |
| # Assumes properly tokenized output (strings with spaces handled). |
| # Assumes tokens are pre-stripped |
| token_type = token_stream.pop(0) |
| if token_type == 'variant': |
| token_type = token_stream.pop(0) |
| if token_type == 'object': |
| token_type = token_stream.pop(0) # Should be 'path' |
| token_value = token_stream.pop(0) |
| INT_TYPES = ('int16', 'uint16', 'int32', 'uint32', |
| 'int64', 'uint64', 'byte') |
| if token_type in INT_TYPES: |
| return int(token_value) |
| if token_type == 'string' or token_type == 'path': |
| return token_value # shlex removed surrounding " chars. |
| if token_type == 'boolean': |
| return token_value == 'true' |
| if token_type == 'double': |
| return float(token_value) |
| if token_type == 'array': |
| values = [] |
| while token_stream[0] != ']': |
| values.append(_parse_value(token_stream)) |
| token_stream.pop(0) |
| if values and all([isinstance(x, DictEntry) for x in values]): |
| values = dict(values) |
| return values |
| if token_type == 'dict': |
| assert token_value == 'entry(' |
| key = _parse_value(token_stream) |
| value = _parse_value(token_stream) |
| assert token_stream.pop(0) == ')' |
| return DictEntry(key=key, value=value) |
| raise error.TestError('Unhandled DBus type found: %s' % token_type) |
| |
| |
| def _parse_dbus_send_output(dbus_send_stdout): |
| """Turn dbus-send output into usable Python types. |
| |
| This looks like: |
| |
| localhost ~ # dbus-send --system --dest=org.chromium.flimflam \ |
| --print-reply --reply-timeout=2000 / \ |
| org.chromium.flimflam.Manager.GetProperties |
| method return time=1490931987.170070 sender=org.chromium.flimflam -> \ |
| destination=:1.37 serial=6 reply_serial=2 |
| array [ |
| dict entry( |
| string "ActiveProfile" |
| variant string "/profile/default" |
| ) |
| dict entry( |
| string "ArpGateway" |
| variant boolean true |
| ) |
| ... |
| ] |
| |
| @param dbus_send_output: string stdout from dbus-send |
| @return a DBusSendResult. |
| |
| """ |
| lines = dbus_send_stdout.strip().splitlines() |
| # The first line contains meta-information about the response |
| header = lines[0] |
| lines = lines[1:] |
| dbus_address_pattern = r'[:\d\\.]+|[a-zA-Z.]+' |
| # The header may or may not have a time= field. |
| match = re.match(r'method return (time=[\d\\.]+ )?sender=(%s) -> ' |
| r'destination=(%s) serial=\d+ reply_serial=\d+' % |
| (dbus_address_pattern, dbus_address_pattern), header) |
| |
| if match is None: |
| raise error.TestError('Could not parse dbus-send header: %s' % header) |
| |
| sender = match.group(2) |
| responder = match.group(3) |
| token_stream = _build_token_stream(lines) |
| ret_val = _parse_value(token_stream) |
| # Note that DBus permits multiple response values, and this is not handled. |
| logging.debug('Got DBus response: %r', ret_val) |
| return DBusSendResult(sender=sender, responder=responder, response=ret_val) |
| |
| |
| def _dbus2string(raw_arg): |
| """Turn a dbus.* type object into a string that dbus-send expects. |
| |
| @param raw_dbus dbus.* type object to stringify. |
| @return string suitable for dbus-send. |
| |
| """ |
| int_map = { |
| dbus.Int16: 'int16:', |
| dbus.Int32: 'int32:', |
| dbus.Int64: 'int64:', |
| dbus.UInt16: 'uint16:', |
| dbus.UInt32: 'uint32:', |
| dbus.UInt64: 'uint64:', |
| dbus.Double: 'double:', |
| dbus.Byte: 'byte:', |
| } |
| |
| if isinstance(raw_arg, dbus.String): |
| return pipes.quote('string:%s' % raw_arg.replace('"', r'\"')) |
| |
| if isinstance(raw_arg, dbus.Boolean): |
| if raw_arg: |
| return 'boolean:true' |
| else: |
| return 'boolean:false' |
| |
| for prim_type, prefix in six.iteritems(int_map): |
| if isinstance(raw_arg, prim_type): |
| return prefix + str(raw_arg) |
| |
| raise error.TestError('No support for serializing %r' % raw_arg) |
| |
| |
| def _build_arg_string(raw_args): |
| """Construct a string of arguments to a DBus method as dbus-send expects. |
| |
| @param raw_args list of dbus.* type objects to seriallize. |
| @return string suitable for dbus-send. |
| |
| """ |
| return ' '.join([_dbus2string(arg) for arg in raw_args]) |
| |
| |
| def dbus_send(bus_name, interface, object_path, method_name, args=None, |
| host=None, timeout_seconds=2, tolerate_failures=False, user=None): |
| """Call dbus-send without arguments. |
| |
| @param bus_name: string identifier of DBus connection to send a message to. |
| @param interface: string DBus interface of object to call method on. |
| @param object_path: string DBus path of remote object to call method on. |
| @param method_name: string name of method to call. |
| @param args: optional list of arguments. Arguments must be of types |
| from the python dbus module. |
| @param host: An optional host object if running against a remote host. |
| @param timeout_seconds: number of seconds to wait for a response. |
| @param tolerate_failures: boolean True to ignore problems receiving a |
| response. |
| @param user: An option argument to run dbus-send as a given user. |
| |
| """ |
| run = utils.run if host is None else host.run |
| cmd = ('dbus-send --system --print-reply --reply-timeout=%d --dest=%s ' |
| '%s %s.%s' % (int(timeout_seconds * 1000), bus_name, |
| object_path, interface, method_name)) |
| |
| if user is not None: |
| cmd = ('sudo -u %s %s' % (user, cmd)) |
| if args is not None: |
| cmd = cmd + ' ' + _build_arg_string(args) |
| result = run(cmd, ignore_status=tolerate_failures) |
| if result.exit_status != 0: |
| logging.debug('%r', result.stdout) |
| return None |
| return _parse_dbus_send_output(result.stdout) |
| |
| |
| def get_property(bus_name, interface, object_path, property_name, host=None): |
| """A helpful wrapper that extracts the value of a DBus property. |
| |
| @param bus_name: string identifier of DBus connection to send a message to. |
| @param interface: string DBus interface exposing the property. |
| @param object_path: string DBus path of remote object to call method on. |
| @param property_name: string name of property to get. |
| @param host: An optional host object if running against a remote host. |
| |
| """ |
| return dbus_send(bus_name, dbus.PROPERTIES_IFACE, object_path, 'Get', |
| args=[dbus.String(interface), dbus.String(property_name)], |
| host=host) |