| |
| # Lint as: python2, python3 |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import cmd |
| import dbus |
| import dbus.types |
| import dbus.exceptions |
| |
| import six |
| |
| from six.moves import input |
| |
| import common |
| |
| from autotest_lib.client.cros.cellular.pseudomodem import pm_constants |
| |
| from autotest_lib.client.cros.cellular import mm1_constants |
| |
class PseudoModemClient(cmd.Cmd):
    """
    Interactive client for PseudoModemManager.

    Every do_<name> method implements the shell command <name> and every
    help_<name> method prints the text shown by 'help <name>'. cmd.Cmd stops
    its command loop as soon as a do_* method returns a true value, so all
    handlers except a confirmed 'exit' return a false value.

    """
    def __init__(self):
        # Explicit base-class call: cmd.Cmd is an old-style class on Python 2,
        # where super() is not usable.
        cmd.Cmd.__init__(self)
        self.prompt = '> '
        self._bus = dbus.SystemBus()


    def _get_proxy(self, path=pm_constants.TESTING_PATH):
        """
        Gets a DBus proxy exported under the modem manager bus name.

        @param path: DBus object path of the object to get a proxy for.
        @return: The proxy object for |path|.

        """
        return self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)


    def _get_ism_proxy(self, state_machine):
        """
        Gets the proxy of an interactive state machine.

        @param state_machine: Name of the state machine. It is appended to
                the testing object path to build the object path.
        @return: The proxy object for the state machine.

        """
        return self._get_proxy('/'.join([pm_constants.TESTING_PATH,
                                         state_machine]))


    @staticmethod
    def _print_dbus_error(error):
        """
        Prints a DBus communication failure in a uniform format.

        @param error: The dbus.exceptions.DBusException that was caught.

        """
        # Exceptions have no |message| attribute on Python 3, so ask the
        # DBusException for its message. %-formatting also copes with a DBus
        # error name of None, which string concatenation would not.
        print('\nAn error occurred while communicating with '
              'PseudoModemManager: %s - %s\n' %
              (error.get_dbus_name(), error.get_dbus_message()))


    def Begin(self):
        """
        Starts the interactive shell.

        """
        print('\nWelcome to the PseudoModemManager shell!\n')
        self.cmdloop()


    def can_exit(self):
        """Override"""
        return True


    def do_is_alive(self, args):
        """
        Handles the 'is_alive' command.

        @param args: Arguments to the command. Must be empty.
        @return: False, so that the shell keeps running.

        """
        if args:
            print('\nCommand "is_alive" expects no arguments.\n')
            return False
        try:
            print(self._get_proxy().IsAlive(
                    dbus_interface=pm_constants.I_TESTING))
        except dbus.exceptions.DBusException as e:
            # Report the failure like the other commands do instead of
            # letting the exception terminate the shell.
            self._print_dbus_error(e)
        return False


    def help_is_alive(self):
        """ Handles the 'help is_alive' command. """
        print('\nChecks that pseudomodem child process is alive.\n')


    def do_properties(self, args):
        """
        Handles the 'properties' command.

        @param args: Arguments to the command. Must be empty.
        @return: False, so that the shell keeps running.

        """
        if args:
            print('\nCommand "properties" expects no arguments.\n')
            return False
        try:
            props = self._get_proxy().GetAll(
                    pm_constants.I_TESTING,
                    dbus_interface=mm1_constants.I_PROPERTIES)
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
            return False
        print('\nProperties: ')
        for key, value in six.iteritems(props):
            print(' %s: %s' % (key, value))
        print()
        return False


    def help_properties(self):
        """Handles the 'help properties' command."""
        print('\nReturns the properties under the testing interface.\n')


    def do_sms(self, args):
        """
        Simulates a received SMS.

        @param args: A string containing the sender and the text message
                content, in which everything before the first ' ' character
                belongs to the sender and everything else belongs to the
                message content. For example "Gandalf You shall not pass!"
                will be parsed into:

                    sender="Gandalf"
                    content="You shall not pass!"

                Pseudomodem doesn't distinguish between phone numbers and
                strings containing non-numeric characters for the sender field
                so args can contain pretty much anything.
        @return: False, so that the shell keeps running.

        """
        arglist = args.split(' ', 1)
        if len(arglist) != 2:
            print('\nMalformed SMS args: ' + args + '\n')
            return False
        sender, content = arglist
        try:
            self._get_proxy().ReceiveSms(
                    sender, content,
                    dbus_interface=pm_constants.I_TESTING)
            print('\nSMS sent!\n')
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
        return False


    def help_sms(self):
        """Handles the 'help sms' command."""
        print('\nUsage: sms <sender phone #> <message text>\n')


    def do_set(self, args):
        """
        Handles various commands that start with 'set'.

        @param args: Defines the set command to be issued and its
                arguments. Currently supported commands are:

                    set pco <pco-value>

        @return: False, so that the shell keeps running.

        """
        # split() without a separator tolerates repeated blanks between the
        # sub-command and its value and yields [] for an empty argument.
        arglist = args.split()
        if not arglist or arglist[0] != 'pco':
            print('\nUnknown command: set ' + args + '\n')
            return False
        if len(arglist) > 2:
            print('\nExpected: pco <pco-value>. Found: ' + args + '\n')
            return False

        # A missing value means "set the PCO value to an empty list".
        pco_value = arglist[1] if len(arglist) == 2 else ''
        # ByteArray is a byte-string type: on Python 3 it cannot be built
        # from text, so encode first. Python 2 str is left untouched.
        if isinstance(pco_value, six.text_type):
            pco_value = pco_value.encode('utf-8')
        try:
            pco_list = [dbus.types.Struct(
                    [dbus.types.UInt32(1),
                     dbus.types.Boolean(True),
                     dbus.types.ByteArray(pco_value)],
                    signature='ubay')]
            self._get_proxy().UpdatePco(
                    pco_list, dbus_interface=pm_constants.I_TESTING)
            print('\nPCO value updated!\n')
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
        return False


    def help_set(self):
        """Handles the 'help set' command."""
        print('\nUsage: set pco <pco-value>\n<pco-value> can be empty to set'
              ' the PCO value to an empty list.\n')


    def _get_state_machine(self, args):
        """
        Gets the proxy of the state machine named by a command argument.

        @param args: Argument string of the command. Must consist of exactly
                one word, the name of the state machine.
        @return: The state machine proxy, or None (after printing the reason)
                if |args| is malformed or the proxy could not be obtained.

        """
        arglist = args.split()
        if len(arglist) != 1:
            print('\nExpected one argument: Name of state machine\n')
            return None
        try:
            return self._get_ism_proxy(arglist[0])
        except dbus.exceptions.DBusException as e:
            print('\nNo such interactive state machine.\n')
            print('Error obtained: |%s|\n' % repr(e))
            return None


    def do_is_waiting(self, machine):
        """
        Determine if a machine is waiting for an advance call.

        The answer is printed rather than returned, because a true return
        value would make cmd.Cmd leave the command loop.

        @param machine: Case sensitive name of the machine.
        @return: False, so that the shell keeps running.

        """
        ism = self._get_state_machine(machine)
        if not ism:
            return False

        try:
            is_waiting = ism.IsWaiting(
                    dbus_interface=pm_constants.I_TESTING_ISM)
            print('\nState machine is %swaiting.\n' %
                  ('' if is_waiting else 'not '))
        except dbus.exceptions.DBusException as e:
            print('\nCould not determine if |%s| is waiting: |%s|\n' %
                  (machine, repr(e)))
        return False


    def help_is_waiting(self):
        """Handles the 'help is_waiting' command"""
        print('\nUsage: is_waiting <state-machine-name>\n'
              'Check whether a state machine is waiting for user action. The '
              'waiting machine can be advanced using the |advance| command.\n'
              'state-machine-name is the case sensitive name of the machine '
              'whose status is to be queried.\n')


    def do_advance(self, machine):
        """
        Advance the given state machine.

        The outcome is printed rather than returned, because a true return
        value would make cmd.Cmd leave the command loop.

        @param machine: Case sensitive name of the state machine to advance.
        @return: False, so that the shell keeps running.

        """
        ism = self._get_state_machine(machine)
        if not ism:
            return False

        try:
            success = ism.Advance(dbus_interface=pm_constants.I_TESTING_ISM)
            print('\nAdvanced!\n' if success else '\nCould not advance.\n')
        except dbus.exceptions.DBusException as e:
            print('\nError while advancing state machine: |%s|\n' % repr(e))
        return False


    def help_advance(self):
        """Handles the 'help advance' command"""
        print('\nUsage: advance <state-machine-name>\n'
              'Advance a waiting state machine to the next step.\n'
              'state-machine-name is the case sensitive name of the machine '
              'to advance.\n')


    def do_exit(self, args):
        """
        Handles the 'exit' command.

        @param args: Arguments to the command. Must be empty.
        @return: True if the shell should terminate, False otherwise.

        """
        if args:
            print('\nCommand "exit" expects no arguments.\n')
            return False
        try:
            resp = input('Are you sure? (yes/no): ')
        except EOFError:
            # Standard input is exhausted, so no confirmation can be read.
            # Leave the shell: staying would only hit end-of-file again.
            print('\nGoodbye!\n')
            return True
        if resp == 'yes':
            print('\nGoodbye!\n')
            return True
        if resp != 'no':
            print('\nDid not understand: ' + resp + '\n')
        return False


    def help_exit(self):
        """Handles the 'help exit' command."""
        print('\nExits the interpreter. Shuts down the pseudo modem manager '
              'if the interpreter was launched by running pseudomodem.py')


    # cmd.Cmd dispatches end-of-file on input to do_EOF; treat it as 'exit'.
    do_EOF = do_exit
    help_EOF = help_exit
| |
| |
def main():
    """Create a PseudoModemClient and run its interactive shell."""
    PseudoModemClient().Begin()


# Only start the shell when this module is executed as a script.
if __name__ == '__main__':
    main()