| # Copyright (C) 2020 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| r"""Read a MultiCarrierSettings file and update CarrierSettings data. |
| |
| For APNs in the input file, they are simply appended to the apn list of the |
| corresponding carrier in CarrierSettings data. If a new carrier (identified by |
| canonical_name) appears in input, the other_carriers.textpb will be updated. |
| |
| How to run: |
| |
| update_carrier_data.par \ |
| --in_file=/tmp/tmpapns.textpb \ |
| --data_dir=/tmp/carrier/data |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| import argparse |
| import copy |
| import os |
| import compare |
| from google.protobuf import text_format |
| import carrier_list_pb2 |
| import carrier_settings_pb2 |
| |
# Command-line flags. NOTE: parse_args() is called at module top level, so the
# flags are parsed as soon as this module is imported or executed.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--data_dir', default='./data', help='Folder path for CarrierSettings data')
parser.add_argument(
    '--in_file', default='./tmpapns.textpb', help='Temp APN file')
FLAGS = parser.parse_args()

# Paths of the two carrier list files, both located directly in --data_dir.
TIER1_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'tier1_carriers.textpb')
OTHER_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'other_carriers.textpb')
| |
| |
def equals_apn(a, b):
  """Return whether two ApnItem protos match, disregarding their 'name'.

  Both arguments are deep-copied before being normalized, so neither input is
  modified by the comparison.
  """
  normalized = []
  for item in (a, b):
    clone = compare.NormalizeRepeatedFields(copy.deepcopy(item))
    # The 'name' field does not take part in the comparison.
    clone.ClearField('name')
    normalized.append(clone)
  return compare.Proto2Equals(normalized[0], normalized[1])
| |
| |
def find_apn(apn, apns):
  """Return True if some item of apns equals apn according to equals_apn."""
  return any(equals_apn(apn, candidate) for candidate in apns)
| |
| |
def merge_mms_apn(a, b):
  """Copy the MMS proxy settings of b onto a when nothing else differs.

  The two protos are compared on normalized deep copies with 'name',
  'mmsc_proxy' and 'mmsc_proxy_port' cleared. Only if those copies are equal
  are the proxy fields that b has set written into a (in place).

  Args:
    a: the ApnItem to update
    b: the ApnItem providing the mmsc proxy fields

  Returns:
    None
  """
  mms_fields = ('mmsc_proxy', 'mmsc_proxy_port')
  ignored_fields = ('name',) + mms_fields

  stripped_a = compare.NormalizeRepeatedFields(copy.deepcopy(a))
  stripped_b = compare.NormalizeRepeatedFields(copy.deepcopy(b))
  for stripped in (stripped_a, stripped_b):
    for field in ignored_fields:
      stripped.ClearField(field)

  # Any difference outside the ignored fields means the APNs are unrelated.
  if not compare.Proto2Equals(stripped_a, stripped_b):
    return

  for field in mms_fields:
    if b.HasField(field):
      setattr(a, field, getattr(b, field))
| |
| |
def clean_apn(setting):
  """Drop repeated ApnItems from a CarrierSettings proto, in place.

  An ApnItem is dropped when find_apn reports that an equal item appears
  earlier in the list, so the first occurrence of each APN is the one kept.

  Args:
    setting: a CarrierSettings proto

  Returns:
    None
  """
  if not setting.HasField('apns'):
    return
  if len(setting.apns.apn) <= 1:
    # Zero or one APN: nothing can be duplicated.
    return

  original = setting.apns.apn[:]
  unique = []
  for index, candidate in enumerate(original):
    if find_apn(candidate, original[:index]):
      continue
    unique.append(candidate)

  del setting.apns.apn[:]
  setting.apns.apn.extend(unique)
| |
| |
def merge_apns(dest_apns, source_apns):
  """Combine source_apns with dest_apns and return the result as a new list.

  Every destination APN is first given the chance to pick up MMS proxy fields
  from every source APN (see merge_mms_apn). Afterwards each source APN that
  find_apn does not locate in the result so far is appended to it.

  Args:
    dest_apns: the ApnItems to merge into; items may be updated in place
    source_apns: the ApnItems to merge from

  Returns:
    A new list holding the items of dest_apns followed by the added sources.
  """
  for existing in dest_apns:
    for incoming in source_apns:
      merge_mms_apn(existing, incoming)

  merged = list(dest_apns)
  for incoming in source_apns:
    if find_apn(incoming, merged):
      continue
    merged.append(incoming)
  return merged
| |
| |
def to_string(cid):
  """Build the string form of a CarrierId.

  The result is mcc_mnc followed by 'SPN=', 'IMSI=' and/or 'GID1=' plus the
  upper-cased value, for each of those fields that is set on cid.
  """
  tagged_fields = (('spn', 'SPN='), ('imsi', 'IMSI='), ('gid1', 'GID1='))
  pieces = [cid.mcc_mnc]
  for field, tag in tagged_fields:
    if cid.HasField(field):
      pieces.append(tag + getattr(cid, field).upper())
  return ''.join(pieces)
| |
| |
def to_carrier_id(cid_string):
  """Parse the string form of a CarrierId back into a CarrierId proto.

  The tags 'SPN=', 'IMSI=' and 'GID1=' are tried in that order. For the first
  tag present, the text before its first occurrence becomes mcc_mnc and the
  text after it becomes the matching field. Without any tag, the whole string
  is used as mcc_mnc.
  """
  cid = carrier_list_pb2.CarrierId()
  for tag, field in (('SPN=', 'spn'), ('IMSI=', 'imsi'), ('GID1=', 'gid1')):
    mcc_mnc, found, mvno_value = cid_string.partition(tag)
    if found:
      cid.mcc_mnc = mcc_mnc
      setattr(cid, field, mvno_value)
      return cid
  cid.mcc_mnc = cid_string
  return cid
| |
| |
def get_input(path):
  """Parse a MultiCarrierSettings textpb file.

  Args:
    path: the path to the input MultiCarrierSettings textpb file

  Returns:
    The parsed MultiCarrierSettings message. Nothing is caught here, so errors
    from opening or parsing the file propagate to the caller.
  """
  with open(path, 'r', encoding='utf-8') as textpb_file:
    content = textpb_file.read()
  settings = carrier_settings_pb2.MultiCarrierSettings()
  text_format.Merge(content, settings)
  return settings
| |
| |
def get_knowncarriers(files):
  """Map every known carrier id (as a string) to its canonical name.

  Args:
    files: list of paths to carrier list (CarrierList) textpb files

  Returns:
    A dict whose keys are to_string(carrier_id) and whose values are the
    canonical_name of the entry owning that id. When the same key shows up
    more than once, the entry read last wins.
  """
  known = {}
  for list_path in files:
    with open(list_path, 'r', encoding='utf-8') as list_file:
      content = list_file.read()
    carrier_list = carrier_list_pb2.CarrierList()
    text_format.Merge(content, carrier_list)
    for entry in carrier_list.entry:
      known.update(
          (to_string(cid), entry.canonical_name) for cid in entry.carrier_id)
  return known
| |
| |
def clear_apn_fields_in_default_value(carrier_settings):
  """Clear bearer_bitmask on every APN where it is set to the string '0'.

  Args:
    carrier_settings: a CarrierSettings proto; its APNs are edited in place

  Returns:
    The same carrier_settings object that was passed in.
  """
  for apn in carrier_settings.apns.apn:
    if not apn.HasField('bearer_bitmask'):
      continue
    if apn.bearer_bitmask == '0':
      apn.ClearField('bearer_bitmask')
  return carrier_settings
| |
| |
def merge_carrier_settings(patch, carrier_file):
  """Merge the APNs of a CarrierSettings into a CarrierSettings textpb file.

  Only APNs are merged. The patch and the base stored in carrier_file are
  assumed to share the same canonical_name; this is not checked here.

  Args:
    patch: the carrier_settings to be merged; its duplicate APNs are removed
      in place
    carrier_file: the path to the base carrier_settings file, which is
      rewritten with the merged result
  """
  # Read the base settings.
  with open(carrier_file, 'r', encoding='utf-8') as base_file:
    base_setting = text_format.ParseLines(
        base_file, carrier_settings_pb2.CarrierSettings())

  # De-duplicate both sides before merging.
  for setting in (patch, base_setting):
    clean_apn(setting)

  merged_apns = merge_apns(base_setting.apns.apn[:], patch.apns.apn[:])
  del base_setting.apns.apn[:]
  base_setting.apns.apn.extend(merged_apns)

  # Persist the merged settings over the original file.
  with open(carrier_file, 'w', encoding='utf-8') as base_file:
    text_format.PrintMessage(base_setting, base_file, as_utf8=True)
| |
| |
def merge_multi_carrier_settings(patch_list, carrier_file):
  """Merge CarrierSettings patches into a MultiCarrierSettings textpb file.

  Only APNs are merged. For each patch, the first base entry with the same
  canonical_name receives the patch's APNs; when there is no such entry the
  whole patch is appended to the base.

  Args:
    patch_list: a list of CarrierSettings to be merged; duplicate APNs of each
      patch are removed in place
    carrier_file: the path to the base MultiCarrierSettings file, which is
      rewritten with the merged result
  """
  # Read the base settings.
  with open(carrier_file, 'r', encoding='utf-8') as base_file:
    base_settings = text_format.ParseLines(
        base_file, carrier_settings_pb2.MultiCarrierSettings())

  for patch in patch_list:
    clean_apn(patch)
    target = next(
        (candidate for candidate in base_settings.setting
         if candidate.canonical_name == patch.canonical_name),
        None)
    if target is None:
      # Unknown carrier: take the patch over as a new entry.
      base_settings.setting.extend([patch])
      continue
    clean_apn(target)
    merged_apns = merge_apns(target.apns.apn[:], patch.apns.apn[:])
    del target.apns.apn[:]
    target.apns.apn.extend(merged_apns)

  # Persist the merged settings over the original file.
  with open(carrier_file, 'w', encoding='utf-8') as base_file:
    text_format.PrintMessage(base_settings, base_file, as_utf8=True)
| |
| |
def add_new_carriers(cnames, carrier_list_file):
  """Append new carriers to a CarrierList textpb file and re-sort it.

  Each new entry gets a single carrier_id derived from its cname through
  to_carrier_id, i.e. the cname is assumed to have been built by to_string.
  The file is rewritten with all entries ordered by canonical_name.

  Args:
    cnames: a list of canonical_name of new carriers
    carrier_list_file: the path to the CarrierList textpb file

  Returns:
    None
  """
  with open(carrier_list_file, 'r', encoding='utf-8') as list_file:
    carriers = text_format.ParseLines(list_file, carrier_list_pb2.CarrierList())

  for cname in cnames:
    entry = carriers.entry.add()
    entry.canonical_name = cname
    entry.carrier_id.add().CopyFrom(to_carrier_id(cname))

  ordered = list(carriers.entry)
  ordered.sort(key=lambda entry: entry.canonical_name)
  del carriers.entry[:]
  carriers.entry.extend(ordered)

  with open(carrier_list_file, 'w', encoding='utf-8') as list_file:
    text_format.PrintMessage(carriers, list_file, as_utf8=True)
| |
| |
def add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers):
  """Give APN-less MVNO carriers in others.textpb the APNs of their MNO.

  A carrier defined as mccmnc + mvno data that has no APNs of its own receives
  the APNs of the carrier defined by the same mccmnc alone.

  A carrier whose configs contain carriersettingstool.no_apn_for_mvno_bool
  set to true is skipped and keeps no APNs.

  Before writing, all carriersettingstool.* config items are stripped via
  sanitise_carrier_config. Modifies the others.textpb file in place.

  Args:
    apns: a list of CarrierSettings message with APNs only.
    tier1_carriers: parsed tier-1 carriers list; must not contain new carriers.
      A dict, key is to_string(carrier_id), value is cname.
    other_carriers: parsed other carriers list; must not contain new carriers.
      A dict, key is to_string(carrier_id), value is cname.
  """
  # Index the input settings by canonical_name.
  apns_by_cname = {}
  for carrier_settings in apns:
    apns_by_cname[carrier_settings.canonical_name] = carrier_settings

  others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir
  with open(others_textpb, 'r', encoding='utf-8') as others_file:
    others = text_format.ParseLines(
        others_file, carrier_settings_pb2.MultiCarrierSettings())

  for setting in others.setting:
    if setting.HasField('apns'):
      continue
    carrier_id = to_carrier_id(setting.canonical_name)
    if not carrier_id.HasField('mvno_data'):
      continue

    # The carrier may opt out of inheriting the MNO's APNs.
    opt_out = False
    if setting.HasField('configs'):
      opt_out = next(
          (conf.bool_value for conf in setting.configs.config
           if conf.key == 'carriersettingstool.no_apn_for_mvno_bool'),
          False)
    if opt_out:
      continue

    # Reduce the id to its mccmnc and look up the carrier it names.
    carrier_id.ClearField('mvno_data')
    mccmnc_key = to_string(carrier_id)
    mno_cname = (
        tier1_carriers.get(mccmnc_key) or other_carriers.get(mccmnc_key))
    if not mno_cname:
      continue
    mno_settings = apns_by_cname.get(mno_cname)
    if mno_settings:
      setting.apns.CopyFrom(mno_settings.apns)

  sanitise_carrier_config(others.setting)

  with open(others_textpb, 'w', encoding='utf-8') as others_file:
    text_format.PrintMessage(others, others_file, as_utf8=True)
| |
def sanitise_carrier_config(setting):
  """Remove temporary config items that only the conversion tool uses.

  Every config whose key starts with 'carriersettingstool.' is dropped from
  each carrier setting that has a configs field; the remaining configs keep
  their order.

  Args:
    setting: an iterable of CarrierSettings protos, edited in place

  Returns:
    None
  """
  for carrier_setting in setting:
    if not carrier_setting.HasField('configs'):
      continue
    kept = [
        config for config in carrier_setting.configs.config[:]
        if not config.key.startswith('carriersettingstool.')
    ]
    del carrier_setting.configs.config[:]
    carrier_setting.configs.config.extend(kept)
| |
def add_carrierconfig_for_new_carriers(cnames, tier1_carriers, other_carriers):
  """Add carrier configs for new (non-tier-1) carriers.

  For new carriers, ie. carriers existing in APN but not CarrierConfig:
     - for <mccmnc>: copy carrier config of <mcc>.
     - for <mccmnc>(GID1|SPN|IMSI)=<mvnodata>: copy carrier config of <mccmnc>,
     or <mcc>.

  Modifies others.textpb file in-place.

  Args:
    cnames: a list of canonical_name of new carriers.
    tier1_carriers: parsed tier-1 carriers list; must not contain new carriers.
      A dict, key is to_string(carrier_id), value is cname.
    other_carriers: parsed other carriers list; must not contain new carriers. A
      dict, key is to_string(carrier_id), value is cname.
  """
  # Membership is tested once per setting below; a set keeps that constant-time.
  new_cnames = set(cnames)
  carrier_configs_map = {}

  others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir
  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
    others = text_format.ParseLines(others_textpb_file,
                                    carrier_settings_pb2.MultiCarrierSettings())
  for setting in others.setting:
    if setting.canonical_name in other_carriers:
      carrier_configs_map[setting.canonical_name] = setting.configs

  # Several carrier ids can map to the same tier-1 cname; parse each tier-1
  # settings file only once and share its configs between those ids.
  tier1_configs_by_cname = {}
  for cid_str, cname in tier1_carriers.items():
    if cname not in tier1_configs_by_cname:
      tier1_textpb = '%s/setting/%s.textpb' % (FLAGS.data_dir, cname)
      with open(tier1_textpb, 'r', encoding='utf-8') as tier1_textpb_file:
        tier1 = text_format.ParseLines(tier1_textpb_file,
                                       carrier_settings_pb2.CarrierSettings())
      tier1_configs_by_cname[cname] = tier1.configs
    carrier_configs_map[cid_str] = tier1_configs_by_cname[cname]

  for setting in others.setting:
    if setting.canonical_name not in new_cnames:
      continue
    carrier_id = to_carrier_id(setting.canonical_name)
    mccmnc = carrier_id.mcc_mnc
    mcc = mccmnc[:3]
    # Prefer the configs of the exact mccmnc; fall back to the mcc-only ones.
    if mccmnc in carrier_configs_map:
      setting.configs.config.extend(carrier_configs_map[mccmnc].config[:])
    elif mcc in carrier_configs_map:
      setting.configs.config.extend(carrier_configs_map[mcc].config[:])

  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)
| |
| |
def cleanup_mcc_only_carriers():
  """Removes mcc-only carriers from other_carriers.textpb & others.textpb.

  A carrier id counts as mcc-only when its mcc_mnc is exactly 3 characters
  long. Carriers with at least one longer mcc_mnc stay in the carrier list;
  the settings of every carrier owning an mcc-only id are removed from
  others.textpb.

  Modifies other_carriers.textpb file & others.textpb file in-place. Neither
  file is touched when no mcc-only carrier is found.
  """
  with open(
      OTHER_CARRIERS_TEXTPB, 'r',
      encoding='utf-8') as other_carriers_textpb_file:
    other_carriers = text_format.ParseLines(other_carriers_textpb_file,
                                            carrier_list_pb2.CarrierList())

  mcc_only_carriers = set()
  other_carriers_entry_with_mccmnc = []
  for carrier in other_carriers.entry:
    has_mccmnc_id = False
    for carrier_id in carrier.carrier_id:
      if len(carrier_id.mcc_mnc) == 3:
        mcc_only_carriers.add(carrier.canonical_name)
      else:
        has_mccmnc_id = True
    # Keep the carrier once, however many full mccmnc ids it has; appending
    # per id would write duplicate entries for multi-id carriers.
    if has_mccmnc_id:
      other_carriers_entry_with_mccmnc.append(carrier)

  # Finish early if no mcc_only_carriers; that means no file modification
  # required.
  if not mcc_only_carriers:
    return

  del other_carriers.entry[:]
  other_carriers.entry.extend(other_carriers_entry_with_mccmnc)
  with open(
      OTHER_CARRIERS_TEXTPB, 'w',
      encoding='utf-8') as other_carriers_textpb_file:
    text_format.PrintMessage(
        other_carriers, other_carriers_textpb_file, as_utf8=True)

  others_textpb = os.path.join(FLAGS.data_dir, 'setting', 'others.textpb')
  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
    others = text_format.ParseLines(others_textpb_file,
                                    carrier_settings_pb2.MultiCarrierSettings())
  copy_others_setting = others.setting[:]
  del others.setting[:]
  others.setting.extend([
      setting for setting in copy_others_setting
      if setting.canonical_name not in mcc_only_carriers
  ])

  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)
| |
| |
def main():
  """Merge the APNs of --in_file into the CarrierSettings data in --data_dir.

  Runs the whole update: merges APNs by canonical name, registers carriers
  that were not known before, fills in their carrier configs, lets APN-less
  MVNO carriers inherit APNs by mccmnc, and finally removes mcc-only carriers.
  """
  apns = get_input(FLAGS.in_file).setting
  tier1_carriers = get_knowncarriers([TIER1_CARRIERS_TEXTPB])
  other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB])
  # Canonical names are looked up once per input setting; build the sets once
  # instead of scanning dict.values() every time.
  tier1_cnames = set(tier1_carriers.values())
  other_cnames = set(other_carriers.values())
  new_carriers = []
  seen_new_carriers = set()

  # Step 1a: merge APNs into CarrierConfigs by canonical name.
  # Also find out "new carriers" existing in APNs but not in CarrierConfigs.
  other_carriers_patch = []
  for carrier_settings in apns:
    carrier_settings = clear_apn_fields_in_default_value(carrier_settings)

    cname = carrier_settings.canonical_name
    if cname in tier1_cnames:
      merge_carrier_settings(carrier_settings,
                             '%s/setting/%s.textpb' % (FLAGS.data_dir, cname))
    else:
      other_carriers_patch.append(carrier_settings)
      # Record each new carrier once, even if the input repeats its name, so
      # that the carrier list does not receive duplicate entries.
      if cname not in other_cnames and cname not in seen_new_carriers:
        seen_new_carriers.add(cname)
        new_carriers.append(cname)

  merge_multi_carrier_settings(other_carriers_patch,
                               '%s/setting/others.textpb' % FLAGS.data_dir)

  # Step 1b: populate carrier configs for new carriers.
  add_carrierconfig_for_new_carriers(new_carriers, tier1_carriers,
                                     other_carriers)

  # Step 2: merge new carriers into non-tier1 carrier list.
  add_new_carriers(new_carriers, OTHER_CARRIERS_TEXTPB)
  # Update other_carriers map
  other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB])

  # Step 3: merge APNs into CarrierConfigs by mccmnc: for a carrier defined
  # as mccmnc + gid/spn/imsi, if it doesn't have any APNs, it should use APNs
  # from carrier defined as mccmnc only.
  # Only handle non-tier1 carriers, as tier1 carriers are assumed to be better
  # maintained and are already having APNs defined.
  add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers)

  # Step 4: clean up mcc-only carriers; they're used in step 3 but should not
  # be in final carrier settings to avoid confusing CarrierSettings app.
  cleanup_mcc_only_carriers()
| |
| |
# Run the update only when this file is executed as a script.
if __name__ == '__main__':
  main()