# Copyright (C) 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

r"""Read a MultiCarrierSettings file and update CarrierSettings data.

For APNs in the input file, they are simply appended to the apn list of the
corresponding carrier in CarrierSettings data. If a new carrier (identified by
canonical_name) appears in input, the other_carriers.textpb will be updated.

How to run:

update_carrier_data.par \
--in_file=/tmp/tmpapns.textpb \
--data_dir=/tmp/carrier/data
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import copy
import os
import compare
from google.protobuf import text_format
import carrier_list_pb2
import carrier_settings_pb2

# Command-line flags. NOTE: the arguments are parsed right here at module
# level, so merely importing this file reads the process's command line.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--data_dir', default='./data', help='Folder path for CarrierSettings data')
parser.add_argument(
    '--in_file', default='./tmpapns.textpb', help='Temp APN file')
FLAGS = parser.parse_args()

# Paths of the two CarrierList textpb files located directly under --data_dir.
TIER1_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'tier1_carriers.textpb')
OTHER_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'other_carriers.textpb')


def equals_apn(a, b):
  """Return whether two ApnItem protos match, disregarding their 'name'.

  Both arguments are compared on normalized deep copies, so neither input is
  modified.
  """
  stripped = []
  for item in (a, b):
    clone = compare.NormalizeRepeatedFields(copy.deepcopy(item))
    # 'name' does not take part in the comparison.
    clone.ClearField('name')
    stripped.append(clone)
  return compare.Proto2Equals(stripped[0], stripped[1])


def find_apn(apn, apns):
  """Return True when at least one item of apns equals apn (see equals_apn)."""
  return any(equals_apn(apn, candidate) for candidate in apns)


def merge_mms_apn(a, b):
  """Copy the MMS proxy fields of b into a when nothing else differs.

  The two ApnItems are compared on normalized deep copies from which 'name',
  'mmsc_proxy' and 'mmsc_proxy_port' have been cleared. Only when those copies
  are equal, each MMS proxy field that is set on b overwrites the one on a.

  Args:
    a: the ApnItem proto to update in place
    b: the ApnItem proto providing the MMS proxy fields

  Returns:
    None
  """
  mms_fields = ('mmsc_proxy', 'mmsc_proxy_port')
  stripped_a = compare.NormalizeRepeatedFields(copy.deepcopy(a))
  stripped_b = compare.NormalizeRepeatedFields(copy.deepcopy(b))
  for stripped in (stripped_a, stripped_b):
    for field in ('name',) + mms_fields:
      stripped.ClearField(field)

  # Any difference outside of the name / MMS proxy fields: leave a untouched.
  if not compare.Proto2Equals(stripped_a, stripped_b):
    return

  for field in mms_fields:
    if b.HasField(field):
      setattr(a, field, getattr(b, field))


def clean_apn(setting):
  """Drop repeated ApnItems from a CarrierSettings proto, in place.

  An ApnItem is dropped when an earlier item of the list equals it (see
  find_apn); the remaining items keep their relative order.

  Args:
    setting: a CarrierSettings proto

  Returns:
    None
  """
  if not setting.HasField('apns'):
    return
  if len(setting.apns.apn) <= 1:
    return

  originals = setting.apns.apn[:]
  unique = []
  for position, item in enumerate(originals):
    # Compare against everything that came before this item.
    if find_apn(item, originals[:position]):
      continue
    unique.append(item)

  del setting.apns.apn[:]
  setting.apns.apn.extend(unique)


def merge_apns(dest_apns, source_apns):
  """Combine source_apns with dest_apns and return the result as a new list.

  Each item of dest_apns is first offered the MMS proxy fields of every source
  item (see merge_mms_apn; this updates the dest items in place). Then every
  source item that has no equal in the result so far is appended.

  Args:
    dest_apns: the ApnItems to merge into
    source_apns: the ApnItems to merge from

  Returns:
    A list holding the items of dest_apns followed by the added source items.
  """
  for existing in dest_apns:
    for incoming in source_apns:
      merge_mms_apn(existing, incoming)

  merged = list(dest_apns)
  for incoming in source_apns:
    if find_apn(incoming, merged):
      continue
    merged.append(incoming)
  return merged


def to_string(cid):
  """Build the string form of a CarrierId.

  The result is mcc_mnc followed by 'SPN=', 'IMSI=' and 'GID1=' sections (in
  that order) for whichever of spn, imsi and gid1 are set, with their values
  upper-cased.
  """
  tagged_fields = (('SPN=', 'spn'), ('IMSI=', 'imsi'), ('GID1=', 'gid1'))
  pieces = [cid.mcc_mnc]
  for tag, field in tagged_fields:
    if cid.HasField(field):
      pieces.append(tag + getattr(cid, field).upper())
  return ''.join(pieces)


def to_carrier_id(cid_string):
  """Build a CarrierId from its string form.

  The string is split at the first occurrence of 'SPN=', 'IMSI=' or 'GID1='
  (looked for in that order): the text before the marker becomes mcc_mnc and
  the text after it becomes spn, imsi or gid1 respectively. Without any
  marker the whole string is the mcc_mnc.
  """
  markers = (('SPN=', 'spn'), ('IMSI=', 'imsi'), ('GID1=', 'gid1'))
  cid = carrier_list_pb2.CarrierId()
  for marker, field in markers:
    prefix, found, suffix = cid_string.partition(marker)
    if found:
      cid.mcc_mnc = prefix
      setattr(cid, field, suffix)
      return cid
  cid.mcc_mnc = cid_string
  return cid


def get_input(path):
  """Parse the input MultiCarrierSettings textpb file.

  Errors raised while opening or parsing the file are not handled here and
  propagate to the caller.

  Args:
    path: the path to input MultiCarrierSettings textpb file

  Returns:
    The parsed MultiCarrierSettings proto.
  """
  with open(path, 'r', encoding='utf-8') as textpb:
    settings = carrier_settings_pb2.MultiCarrierSettings()
    text_format.Merge(textpb.read(), settings)
  return settings


def get_knowncarriers(files):
  """Map every known carrier id to the canonical name of its carrier.

  Args:
    files: list of paths to carrier list textpb files

  Returns:
    A dict, key is to_string(carrier_id), value is cname. When the same key
    shows up more than once, the last one read wins.
  """
  cname_by_cid = {}
  for textpb_path in files:
    carrier_list = carrier_list_pb2.CarrierList()
    with open(textpb_path, 'r', encoding='utf-8') as textpb:
      text_format.Merge(textpb.read(), carrier_list)
    for entry in carrier_list.entry:
      cname_by_cid.update(
          (to_string(cid), entry.canonical_name) for cid in entry.carrier_id)
  return cname_by_cid


def clear_apn_fields_in_default_value(carrier_settings):
  """Unset 'bearer_bitmask' on every APN where it is explicitly set to '0'.

  NOTE(review): '0' is presumably the default value of bearer_bitmask, as the
  function name suggests -- confirm against the proto definition.

  Args:
    carrier_settings: a CarrierSettings proto, modified in place

  Returns:
    The same carrier_settings object.
  """
  for apn in carrier_settings.apns.apn:
    if not apn.HasField('bearer_bitmask'):
      continue
    if apn.bearer_bitmask == '0':
      apn.ClearField('bearer_bitmask')
  return carrier_settings


def merge_carrier_settings(patch, carrier_file):
  """Merge the APNs of a CarrierSettings into a CarrierSettings textpb file.

  Only APNs are merged. The patch and the file content are assumed to have the
  same canonical_name; this is not checked. Duplicated APNs are removed from
  both the patch (in place) and the file content before merging, and the file
  is rewritten with the result.

  Args:
    patch: the carrier_settings to be merged
    carrier_file: the path to the base carrier_settings file
  """
  base = carrier_settings_pb2.CarrierSettings()
  with open(carrier_file, 'r', encoding='utf-8') as textpb:
    text_format.ParseLines(textpb, base)

  for setting in (patch, base):
    clean_apn(setting)

  # Replace the base APN list with the merged one.
  merged = merge_apns(base.apns.apn[:], patch.apns.apn[:])
  del base.apns.apn[:]
  base.apns.apn.extend(merged)

  with open(carrier_file, 'w', encoding='utf-8') as textpb:
    text_format.PrintMessage(base, textpb, as_utf8=True)


def merge_multi_carrier_settings(patch_list, carrier_file):
  """Merge CarrierSettings into a MultiCarrierSettings textpb file.

  For each patch, the first setting in the file with the same canonical_name
  gets the patch's APNs merged in (APNs only). A patch without a matching
  setting is appended to the file content as a whole. Duplicated APNs are
  removed from every patch (in place) and from every setting that is merged
  into. The file is rewritten with the result.

  Args:
    patch_list: a list of CarrierSettings to be merged
    carrier_file: the path to the base MultiCarrierSettings file
  """
  base = carrier_settings_pb2.MultiCarrierSettings()
  with open(carrier_file, 'r', encoding='utf-8') as textpb:
    text_format.ParseLines(textpb, base)

  for patch in patch_list:
    clean_apn(patch)
    target = next(
        (candidate for candidate in base.setting
         if candidate.canonical_name == patch.canonical_name), None)
    if target is None:
      # Unknown canonical_name: the patch becomes a new setting.
      base.setting.extend([patch])
      continue
    clean_apn(target)
    merged = merge_apns(target.apns.apn[:], patch.apns.apn[:])
    del target.apns.apn[:]
    target.apns.apn.extend(merged)

  with open(carrier_file, 'w', encoding='utf-8') as textpb:
    text_format.PrintMessage(base, textpb, as_utf8=True)


def add_new_carriers(cnames, carrier_list_file):
  """Add new carriers to a CarrierList textpb file.

  Each new carrier gets a single carrier_id derived from its cname with
  to_carrier_id, assuming that the cname was constructed by to_string. All
  entries of the list are then sorted by canonical_name and the file is
  rewritten.

  Args:
    cnames: a list of canonical_name of new carriers
    carrier_list_file: the path to the CarrierList textpb file

  Returns:
    None
  """
  carrier_list = carrier_list_pb2.CarrierList()
  with open(carrier_list_file, 'r', encoding='utf-8') as textpb:
    text_format.ParseLines(textpb, carrier_list)

  for cname in cnames:
    entry = carrier_list.entry.add(canonical_name=cname)
    entry.carrier_id.extend([to_carrier_id(cname)])

  by_name = sorted(carrier_list.entry, key=lambda entry: entry.canonical_name)
  del carrier_list.entry[:]
  carrier_list.entry.extend(by_name)

  with open(carrier_list_file, 'w', encoding='utf-8') as textpb:
    text_format.PrintMessage(carrier_list, textpb, as_utf8=True)


def add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers):
  """Add APNs for carriers in others.textpb that don't have APNs, by mccmnc.

  If a carrier defined as mccmnc + mvno_data doesn't have APNs, it gets a copy
  of the APNs of the carrier defined as mccmnc only, provided such a carrier is
  known and has an entry in apns.

  If carriersettingstool.no_apn_for_mvno_bool is defined as true for a MVNO,
  the APNs from the corresponding MNO (by MCC/MNC) will not be used.

  Before the file is written back, config items that only serve the conversion
  tool are removed from every setting (see sanitise_carrier_config).

  Modifies others.textpb file in-place.

  Args:
    apns: a list of CarrierSettings message with APNs only.
    tier1_carriers: parsed tier-1 carriers list; must not contain new carriers.
      A dict, key is to_string(carrier_id), value is cname.
    other_carriers: parsed other carriers list; must not contain new carriers. A
      dict, key is to_string(carrier_id), value is cname.
  """
  no_mno_apn_key = 'carriersettingstool.no_apn_for_mvno_bool'

  # Index the input by canonical_name for direct lookup.
  apns_by_cname = {}
  for carrier_settings in apns:
    apns_by_cname[carrier_settings.canonical_name] = carrier_settings

  others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir
  others = carrier_settings_pb2.MultiCarrierSettings()
  with open(others_textpb, 'r', encoding='utf-8') as textpb:
    text_format.ParseLines(textpb, others)

  for setting in others.setting:
    if setting.HasField('apns'):
      continue
    carrier_id = to_carrier_id(setting.canonical_name)
    if not carrier_id.HasField('mvno_data'):
      continue

    # The first config item with the opt-out key decides; absent means False.
    opted_out = False
    if setting.HasField('configs'):
      opted_out = next(
          (conf.bool_value for conf in setting.configs.config
           if conf.key == no_mno_apn_key), False)
    if opted_out:
      continue

    # Look up the carrier that is identified by the bare mccmnc.
    carrier_id.ClearField('mvno_data')
    mccmnc_key = to_string(carrier_id)
    mno_cname = (
        tier1_carriers.get(mccmnc_key) or other_carriers.get(mccmnc_key))
    if not mno_cname:
      continue
    mno_settings = apns_by_cname.get(mno_cname)
    if mno_settings:
      setting.apns.CopyFrom(mno_settings.apns)

  sanitise_carrier_config(others.setting)

  with open(others_textpb, 'w', encoding='utf-8') as textpb:
    text_format.PrintMessage(others, textpb, as_utf8=True)

def sanitise_carrier_config(setting):
  """Remove temporary config items that are only used by the conversion tool.

  Config items whose key starts with 'carriersettingstool.' are dropped from
  every carrier setting that has configs; the other items keep their order.

  Args:
    setting: an iterable of CarrierSettings protos, modified in place

  Returns:
    None
  """
  tool_prefix = 'carriersettingstool.'
  for carrier_setting in setting:
    if not carrier_setting.HasField('configs'):
      continue
    snapshot = carrier_setting.configs.config[:]
    del carrier_setting.configs.config[:]
    carrier_setting.configs.config.extend(
        [item for item in snapshot if not item.key.startswith(tool_prefix)])

def add_carrierconfig_for_new_carriers(cnames, tier1_carriers, other_carriers):
  """Add carrier configs for new (non-tier-1) carriers.

  For new carriers, i.e. carriers existing in APN but not CarrierConfig:
    - for <mccmnc>: copy carrier config of <mcc>.
    - for <mccmnc>(GID1|SPN|IMSI)=<mvnodata>: copy carrier config of <mccmnc>,
      or <mcc>.

  The configs to copy from are looked up by the canonical_name of settings in
  others.textpb (only those names that are also keys of other_carriers) and by
  the carrier id string of every tier-1 carrier, whose settings are read from
  <data_dir>/setting/<cname>.textpb.

  Modifies others.textpb file in-place.

  Args:
    cnames: a list of canonical_name of new carriers.
    tier1_carriers: parsed tier-1 carriers list; must not contain new carriers.
      A dict, key is to_string(carrier_id), value is cname.
    other_carriers: parsed other carriers list; must not contain new carriers. A
      dict, key is to_string(carrier_id), value is cname.
  """
  others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir
  others = carrier_settings_pb2.MultiCarrierSettings()
  with open(others_textpb, 'r', encoding='utf-8') as textpb:
    text_format.ParseLines(textpb, others)

  configs_by_key = {}
  for setting in others.setting:
    if setting.canonical_name in other_carriers:
      configs_by_key[setting.canonical_name] = setting.configs

  for cid_str, cname in tier1_carriers.items():
    tier1_textpb = '%s/setting/%s.textpb' % (FLAGS.data_dir, cname)
    tier1 = carrier_settings_pb2.CarrierSettings()
    with open(tier1_textpb, 'r', encoding='utf-8') as textpb:
      text_format.ParseLines(textpb, tier1)
    configs_by_key[cid_str] = tier1.configs

  for setting in others.setting:
    if setting.canonical_name not in cnames:
      continue
    mccmnc = to_carrier_id(setting.canonical_name).mcc_mnc
    # Prefer the configs of the full mccmnc, fall back to those of the mcc.
    for key in (mccmnc, mccmnc[:3]):
      if key in configs_by_key:
        setting.configs.config.extend(configs_by_key[key].config[:])
        break

  with open(others_textpb, 'w', encoding='utf-8') as textpb:
    text_format.PrintMessage(others, textpb, as_utf8=True)


def cleanup_mcc_only_carriers():
  """Removes mcc-only carriers from other_carriers.textpb & others.textpb.

  A carrier counts as mcc-only when at least one of its carrier ids has a
  3-character mcc_mnc. Its setting is removed from others.textpb. Its entry is
  removed from other_carriers.textpb unless it also has a carrier id with a
  full mccmnc, in which case the entry is kept unchanged. Entries without any
  carrier id are not kept either.

  Every kept entry is written back exactly once, no matter how many carrier
  ids it lists.

  Modifies other_carriers.textpb file & others.textpb file in-place. Neither
  file is touched when there are no mcc-only carriers.
  """
  with open(
      OTHER_CARRIERS_TEXTPB, 'r',
      encoding='utf-8') as other_carriers_textpb_file:
    other_carriers = text_format.ParseLines(other_carriers_textpb_file,
                                            carrier_list_pb2.CarrierList())

  mcc_only_carriers = set()
  other_carriers_entry_with_mccmnc = []
  for carrier in other_carriers.entry:
    is_mcc_only = [len(cid.mcc_mnc) == 3 for cid in carrier.carrier_id]
    if any(is_mcc_only):
      mcc_only_carriers.add(carrier.canonical_name)
    # Decide once per entry rather than once per carrier id, so an entry with
    # several carrier ids is not duplicated in the rewritten file.
    if not all(is_mcc_only):
      other_carriers_entry_with_mccmnc.append(carrier)

  # Finish early if no mcc_only_carriers; that means no file modification
  # required.
  if not mcc_only_carriers:
    return

  del other_carriers.entry[:]
  other_carriers.entry.extend(other_carriers_entry_with_mccmnc)
  with open(
      OTHER_CARRIERS_TEXTPB, 'w',
      encoding='utf-8') as other_carriers_textpb_file:
    text_format.PrintMessage(
        other_carriers, other_carriers_textpb_file, as_utf8=True)

  others_textpb = os.path.join(FLAGS.data_dir, 'setting', 'others.textpb')
  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
    others = text_format.ParseLines(others_textpb_file,
                                    carrier_settings_pb2.MultiCarrierSettings())
  remaining_settings = [
      setting for setting in others.setting
      if setting.canonical_name not in mcc_only_carriers
  ]
  del others.setting[:]
  others.setting.extend(remaining_settings)

  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)


def main():
  """Merge the APNs of --in_file into the CarrierSettings data of --data_dir."""
  apns = get_input(FLAGS.in_file).setting
  tier1_carriers = get_knowncarriers([TIER1_CARRIERS_TEXTPB])
  other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB])
  tier1_cnames = set(tier1_carriers.values())
  other_cnames = set(other_carriers.values())

  # Step 1a: merge APNs into CarrierConfigs by canonical name. Tier-1 carriers
  # are merged into their own file right away; everything else is collected
  # and merged into others.textpb in one go. Carriers that exist in APNs but
  # in neither carrier list are recorded as "new carriers".
  new_carriers = []
  other_carriers_patch = []
  for carrier_settings in apns:
    carrier_settings = clear_apn_fields_in_default_value(carrier_settings)
    cname = carrier_settings.canonical_name
    if cname in tier1_cnames:
      tier1_textpb = '%s/setting/%s.textpb' % (FLAGS.data_dir, cname)
      merge_carrier_settings(carrier_settings, tier1_textpb)
      continue
    other_carriers_patch.append(carrier_settings)
    if cname not in other_cnames:
      new_carriers.append(cname)

  others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir
  merge_multi_carrier_settings(other_carriers_patch, others_textpb)

  # Step 1b: populate carrier configs for new carriers.
  add_carrierconfig_for_new_carriers(new_carriers, tier1_carriers,
                                     other_carriers)

  # Step 2: merge new carriers into the non-tier1 carrier list, then reload
  # the map so that it includes them.
  add_new_carriers(new_carriers, OTHER_CARRIERS_TEXTPB)
  other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB])

  # Step 3: merge APNs into CarrierConfigs by mccmnc: a carrier defined as
  # mccmnc + gid/spn/imsi that doesn't have any APNs uses the APNs of the
  # carrier defined as mccmnc only.
  # Only non-tier1 carriers are handled, as tier1 carriers are assumed to be
  # better maintained and to have their APNs defined already.
  add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers)

  # Step 4: clean up mcc-only carriers; they're used in step 3 but should not
  # be in final carrier settings to avoid confusing CarrierSettings app.
  cleanup_mcc_only_carriers()


if __name__ == '__main__':
  main()
