#!/usr/bin/python3 # Aruba Instant Netbox Sync # # Syncs APs from the Aruba Instant VC to Netbox. Syncs AP-specific config # (hostname, zone, ...) and ssid-profiles from Netbox to the Aruba Instant # VC and the individual APs. # # Requirements in Netbox: # - Platform with slug "aruba-instant" # - Device Types for each model with slug "aruba-instant-", e.g. "aruba-instant-515" # - Device Types need to have "aruba-instant" as the default platform # - All created AP Devices need to keep "aruba-instant" as their platform # - Device Types and AP Devices need an interface with name "E0" (it links them to APs in the VC) # - Config Template AP Devices with "show ap-env" syntax: # name:my-ap-123 # uplink_vlan:23 # iap_zone:foobar # - Platform "aruba-instant" must have such a Config Template as the default template # - Export template for Devices with name "device_configs.json" and content type "application/json": # [ # {%- for device in queryset if device.get_config_template() %} # {%- set config_template = device.get_config_template() %} # {%- set tmp = dict(id=device.id, name=device.name, config=device.get_config_template().render(context=dict(device.get_config_context(), device=device))) %} # {%- set mac_addresses = device.interfaces.all()|map(attribute='mac_address')|select|list %} # {%- set tmp = dict(tmp, **({'mac_address': mac_addresses[0]|string} if mac_addresses else {})) %} # {{ tmp|tojson }}{{ "," if not loop.last }} # {%- endfor %} # ] # - Export template for Wireless LANs with name "aruba_instant_vc_wlans.conf" # that generates ssid-profile blocks with "netbox-"-prefixed profile names. # SSID profiles have mandatory commands. Commands inside the ssid-profile # blocks need to be indented. Do not include the "index" command. Secrets # (like wpa-passphrase) must not be encrypted (as it would be in # "show running-config" output or the config backups). # Example: # wlan ssid-profile netbox-test # enable # type employee # essid test # wpa-passphrase foobar # opmode wpa2-psk-aes # max-authentication-failures 0 # rf-band all # captive-portal disable # dtim-period 1 # broadcast-filter arp # dmo-channel-utilization-threshold 90 # local-probe-req-thresh 0 # max-clients-threshold 64 # # Limitations: # - Incorrectly handles sequence-sensitive VC config commands # - Supports only a few VC config commands (see CONFIG_COMMANDS) # - Supports only a few ap-env variables (see AP_ENV_VARS) # # Run with the environment variables: # ARUBA_INSTANT_CONDUCTOR_HOST=192.168.1.2 # ARUBA_INSTANT_CONDUCTOR_PASSWORD=vcsecret # NETBOX_URL=https://netbox.example.com # NETBOX_TOKEN=netboxapitoken # NETBOX_SITE=default-site-slug import re import os import sys import logging logger = logging.getLogger(__name__) import netmiko import pynetbox from netmiko.aruba import ArubaSSH from cryptography.hazmat.primitives.ciphers import Cipher from cryptography.hazmat.primitives.ciphers.modes import CBC try: from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES except ImportError: from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES # Secrets in the VC config as returned by "show running-config" and similar # means Triple DES encrypted. While applying config commands, secrets must be # unencrypted (even for "no ..." commands), so this complicates both change # detection and generation. Luckily the key static and publicly known (making # the encryption pointless from a security perspective). We transparently # decrypt parameters for specific commands when reading the config and # otherwise pretend this encryption thing does not exist. # # See also https://seclists.org/fulldisclosure/2016/May/19 CONFIG_SECRET_PREFIX_REGEX = re.compile(r'^ *(snmp-server community|wpa-passphrase) +') def decrypt_config_secret(hex_value): key = b'\x32\x74\x10\x84\x91\x17\x75\x46\x14\x75\x82\x92\x43\x49\x04\x59\x18\x69\x15\x94\x27\x84\x30\x03' value = bytes.fromhex(hex_value) iv = value[:8] ct = value[8:] decryptor = Cipher(TripleDES(key), CBC(iv)).decryptor() pt = (decryptor.update(ct) + decryptor.finalize()).rstrip(b'\x00') return pt.decode() def decrypt_config(text): lines = [] for line in text.split('\n'): if m := CONFIG_SECRET_PREFIX_REGEX.match(line): line = line[:m.end()] + decrypt_config_secret(line[m.end():]) lines.append(line) return '\n'.join(lines) # The VC config commands have somewhat inconsistent syntax: # - "virtual-controller-country " has a parameter and executing it with a # different value overwrites the old setting, "no virtual-controller-country" # clears the setting # - "rf-band " is always present, can be updated but not cleared with "no ..." # - "allowed-ap " has a parameter but it is list-like, executing it with a # different value adds it, "no allowed-ap " removes it. # - "wlan ssid-profile " starts a block, the whole block can be cleared # with "no wlan ssid-profile " # - "rf dot11g-radio-profile " starts a block, as does # "rf dot11g-radio-profile" (without a parameter) # - "allow-new-aps" is flag-like, it is either set or missing and can be # cleared with "no allow-new-aps" # - "disable" (in an ssid-profile block) is also flag-like, but either # "disable" or "enable" is always present and either command overwrites # the other # # We can detect from the prompt if a command started a block. We can # generically compare two configs. However, there is no generic way to clear # or update a setting without knowing the specific command syntax. # So this gets messy ... CONFIG_COMMANDS = [ # regex, key expr, clear expr (r'(wlan ssid-profile .+)', r'\1', r'no \1'), (r'enable|disable', r'enable', None), (r'(index|type|opmode|max-authentication-failures|rf-band|captive-portal|dmo-channel-utilization-threshold|local-probe-req-thresh|max-clients-threshold) .+', r'\1', None), (r'(zone|essid|wpa-passphrase|vlan|dtim-period|broadcast-filter) .+', r'\1', r'no \1'), (r'(wlan access-rule .+)', r'\1', r'no \1'), (r'(rule .+)', r'\1', None), # TODO: Sequence Sensitive Command (r'(.+)', r'\1', None), ] CONFIG_COMMANDS = [ (re.compile(regex), key_expr, clear_expr) for regex, key_expr, clear_expr in CONFIG_COMMANDS ] # TODO: Correct character escaping CONFIG_QUOTE_REGEX = re.compile(r'^ *(essid|wpa-passphrase) +') def parse_config(text): result = {} current_block = None for line in text.split('\n'): indented = line.startswith(' ') line = line.strip() if not line: continue if m := CONFIG_QUOTE_REGEX.match(line): arg = line[m.end():] if not arg.startswith('"') and not arg.endswith('"'): line = line[:m.end()] + '"' + line[m.end():] + '"' matching_cmd = None for regex, key_expr, clear_expr in CONFIG_COMMANDS: if m := regex.fullmatch(line): matching_cmd = m.expand(key_expr), m.expand(clear_expr) if clear_expr is not None else None break assert matching_cmd key, clear_cmd = matching_cmd if not indented: assert key not in result result[key] = (line, clear_cmd, {}) # key-value command cannot start a block current_block = result[key] if key == line else None else: assert key not in current_block current_block[2][key] = (line, clear_cmd) return result # Most "show ..." commands return one or more ASCII tables preceded by a # heading and followed by other stuff. E.g. "show upgrade info": # # swarm upgrade status # -------------------- # Mac IP Address Seed AP AP Class Status Image Info Error Detail # --- ---------- ------- -------- ------ ---------- ------------ # 12:34:56:78:9a:bc 192.168.12.34 No Draco image-ok From Seed none # 12:34:56:78:9a:bd 192.168.12.35 No Draco image-ok From Seed none # Auto reboot :enable # Use external URL :disable # Conductor wait Time :0 secs 0 count # Switch Partition :enable # Upgrade in process :No # UAP convert process :No # Cloud cert verify :disable # Cloud cert check in process :No def parse_tables(text): i = 0 lines = text.split('\n') tables = {} while i < len(lines) - 4: # Check for title if len(lines[i]) != len(lines[i+1]) or not re.match('^-+$', lines[i+1]): i += 1 continue title = lines[i] # Pre-check for columns if len(lines[i+2]) != len(lines[i+3]) or not re.match('^-+ +-+[- ]*$', lines[i+3]): i += 1 continue # Extract colums and column positions column_line = lines[i+2] dash_line = lines[i+3] dash_line_i = 0 column_regex = '^' row_regex = '^' while dash_line_i < len(dash_line): m = re.match('^(-+)( *)', dash_line[dash_line_i:]) assert m dashes, spaces = m.groups() if spaces: # Most tables are fully right-padded with spaces. In those the last column is like any other column. assert len(spaces) >= 2 column_regex += f'(.{{{len(dashes)}}}) {{{len(spaces)}}}' row_regex += f'(.{{{len(dashes) + len(spaces)}}})' else: # In tables that are not right-padded, the last column ends up here. column_regex += '(.*)' row_regex += '(.*)' dash_line_i += len(dashes) + len(spaces) column_regex += '$' row_regex += '$' column_names = list(re.match(column_regex, column_line).groups()) assert column_names == [name.strip() for name in column_names] assert column_names == [name.strip() for name in re.match(row_regex, column_line).groups()] # At this point, we know for sure we have a table (possibly without any rows) i += 4 rows = [] while i < len(lines): m = re.match(row_regex, lines[i]) if not m: break rows.append(dict(zip(column_names, [group.rstrip() for group in m.groups()]))) i += 1 tables[title] = rows return tables # AP-specific config is stored as bootloader variables (as returned by # "show ap-env" but updated with commands. Command names and behavior are # inconsistent and cannot be derived from the bootloader variables. There is # also no consistent way to get the current status in command form. Some # variables are always present, some can be cleared with "no ...", some can # be cleared with a special value. # # There is the command "ap-env ", but it always requires # a reboot to apply the change. # # Again, pretty messy ... we maintain a list of known variables. # # We ignore unknown variables from APs, and reject them from Netbox configs. AP_ENV_VARS = { # no unset_command = mandatory var 'name': { 'set_command': 'hostname {value}', }, 'uplink_vlan': { 'set_command': 'uplink-vlan {value}', 'unset_command': 'uplink-vlan 0', }, 'iap_zone': { 'set_command': 'zonename {value}', 'unset_command': 'no zonename', }, 'iap_conductor': { 'set_command': 'iap-conductor', 'unset_command': 'no iap-conductor', }, } class ArubaInstantSSH(ArubaSSH): def __init__(self, dry_run=False, **kwargs): super().__init__(**kwargs) self.dry_run = dry_run self.config_command_count = 0 def config_mode(self, **kwargs): super().config_mode(**kwargs) self.config_command_count = 0 def fetch_aps(self): aps = {} # ip addr -> {...} ap_table = list(parse_tables(self.send_command('show aps')).values())[0] upgrade_table = parse_tables(self.send_command('show upgrade info'))['swarm upgrade status'] for ap in ap_table: ip_addr = ap['IP Address'].rstrip('*') aps[ip_addr] = { 'name': ap['Name'], 'ip_address': ip_addr, 'type': ap['Type'].split('(', 1)[0], 'serial': ap['Serial #'], 'mac_address': None, } for ap in upgrade_table: aps[ap['IP Address']]['mac_address'] = ap['Mac'].upper() return aps def fetch_ap_env(self, ip_address=None): output = self.send_command('show ap-env' if not ip_address else f'show ap-env {ip_address}') env = dict(line.split(':', 1) for line in output.split('\n') if ':' in line) return {key: value for key, value in env.items() if key in AP_ENV_VARS} def update_ap_env(self, env, log_prefix=''): logger.info(f'{log_prefix}Updating ap-env variables') new_env = {key: value for key, value in env.items() if key in AP_ENV_VARS} assert env == new_env old_env = self.fetch_ap_env() for key in new_env.keys() | old_env.keys(): value = new_env.get(key) if value == old_env.get(key): continue if not value: command = AP_ENV_VARS[key]['unset_command'] else: command = AP_ENV_VARS[key]['set_command'].format(value=value) logger.info(f'{log_prefix}CMD: {command}') if self.dry_run: continue if value and key == 'name': # "hostname " command changes the prompt output = self.send_command_timing(command, delay_factor=5) self.set_base_prompt() else: output = self.send_command(command) if '%' in output or 'ERROR' in output: logger.error(f'{log_prefix}Updating ap-env variable {key} failed: {output!r}') if not self.dry_run and self.fetch_ap_env() != new_env: logger.warning(f'{log_prefix}ap-env variables still differ after update') def fetch_vc_config(self): return parse_config(decrypt_config(self.send_command('show running-config'))) def send_config_command(self, command, expect_inside_block=False): self.config_command_count += 1 command_without_secret = command if m := CONFIG_SECRET_PREFIX_REGEX.match(command): command_without_secret = command[:m.end()] + '' logger.info(f'CMD: {command_without_secret}') output = self.send_command(command, expect_string='\\) #', strip_prompt=False) if '%' in output or 'ERROR' in output: logger.critical(f'Config command error: {output!r}') exit(2) if '(config) #' in output: if expect_inside_block: logger.critical(f'Unexpected top-level config prompt after command') exit(2) elif not expect_inside_block: logger.info('CMD: exit') output2 = self.send_command('exit', expect_string='\\) #', strip_prompt=False) if '(config) #' not in output2: logger.critical(f'Unexpected prompt after command') exit(2) def flush_config_if_needed(self, exit_config_mode=False): if self.config_command_count >= 20 or exit_config_mode: if not exit_config_mode: logger.info('Committing config changes to avoid exceeding the CLI buffer size') else: logger.info('Committing config changes') self.exit_config_mode() logger.debug('Output of "show uncommitted-config":\n' + self.send_command('show uncommitted-config')) if not self.dry_run: output = self.send_command('commit apply') if '%' in output or 'ERROR' in output: logger.critical(f'Commit failed: {output!r}') exit(2) else: self.send_command('commit revert') if not exit_config_mode: self.config_mode() def update_vc_config(self, new, old=None): if old is None: old = self.fetch_vc_config() if new == old: return False logger.info('Updating virtual controller config') self.config_mode() for block_key in old.keys() - new.keys(): line, clear_cmd, block = old[block_key] if clear_cmd is None: logger.critical(f'Unable to clear command {line}') exit(2) self.send_config_command(clear_cmd) self.flush_config_if_needed() for block_key in new.keys() - old.keys(): line, clear_cmd, block = new[block_key] self.send_config_command(line, expect_inside_block=bool(block)) for key in block: line, clear_cmd = block[key] self.send_config_command(line, expect_inside_block=True) if block: self.send_config_command('exit') self.flush_config_if_needed() for block_key in new.keys() & old.keys(): if new[block_key] == old[block_key]: continue old_line, old_clear_cmd, old_block = old[block_key] new_line, new_clear_cmd, new_block = new[block_key] has_non_clearable = any(old_block[key][1] is None for key in old_block.keys() - new_block.keys()) if has_non_clearable or old_line != new_line: if old_clear_cmd is None: logger.critical(f'Unable to clear command {old_line}') exit(2) self.send_config_command(old_clear_cmd) self.send_config_command(new_line, expect_inside_block=bool(new_block)) for key in new_block: line, clear_cmd = new_block[key] self.send_config_command(line, expect_inside_block=True) if new_block: self.send_config_command('exit') self.flush_config_if_needed() continue self.send_config_command(new_line, expect_inside_block=True) for key in old_block.keys() | new_block.keys(): if old_block.get(key) == new_block.get(key): continue if key in old_block and key not in new_block: self.send_config_command(old_block[key][1], expect_inside_block=True) else: self.send_config_command(new_block[key][0], expect_inside_block=True) self.send_config_command('exit') self.flush_config_if_needed() self.flush_config_if_needed(exit_config_mode=True) return True def run_sync(conductor_host, conductor_password, netbox_url, netbox_token, netbox_site, dry_run=False): if dry_run: logger.warning('Running in dry-run mode') conductor = ArubaInstantSSH(host=conductor_host, username='admin', password=conductor_password, dry_run=dry_run) netbox = pynetbox.api(netbox_url, token=netbox_token) aps_by_mac = {ap['mac_address']: ap for ap in conductor.fetch_aps().values()} device_config_by_mac = { device_config.mac_address: device_config.config for device_config in netbox.dcim.devices.filter(platform='aruba-instant', export='device_configs.json') } device_types = { device_type.slug.split('-', 2)[-1]: device_type for device_type in netbox.dcim.device_types.filter(default_platform='aruba-instant') if device_type.slug.startswith('aruba-ap-') } site_id = netbox.dcim.sites.get(slug=netbox_site).id ap_role_id = netbox.dcim.device_roles.get(slug='access-point').id devices_created = False for mac in aps_by_mac.keys() - device_config_by_mac.keys(): ap = aps_by_mac[mac] logger.info(f'Creating Netbox device for new AP {mac}') ap_type = ap['type'] if ap_type not in device_types: logger.error(f'Cannot create AP: Netbox device type with slug "aruba-instant-{ap_type}" not found') logger.info(f'Skipping new AP {mac}') continue device_type = device_types[ap_type] if dry_run: continue device = netbox.dcim.devices.create( serial=ap['serial'], device_type=device_type.id, site=site_id, role=ap_role_id, status='inventory', ) e0_interface = netbox.dcim.interfaces.get(device_id=device.id, name='E0') e0_interface.primary_mac_address = netbox.dcim.mac_addresses.create( mac_address=mac, assigned_object_type='dcim.interface', assigned_object_id=e0_interface.id ) e0_interface.save() devices_created = True if devices_created: device_config_by_mac = { device_config.mac_address: device_config.config for device_config in netbox.dcim.devices.filter(platform='aruba-instant', export='device_configs.json') } for mac, ap in aps_by_mac.items(): old_env = conductor.fetch_ap_env(ap['ip_address']) new_env = dict(line.split(':', 1) for line in device_config_by_mac[mac].split('\n') if ':' in line) if unsupported_vars := ', '.join(new_env.keys() - AP_ENV_VARS.keys()): logger.error(f'[{mac}] Unsupported ap-env variables in generated config: {unsupported_vars}') logger.info(f'[{mac}] Skipping ap-env update') continue for key, args in AP_ENV_VARS.items(): if 'unset_command' not in args and key not in new_env: logger.error(f'[{mac}] Mandatory ap-env variable missing in generated config: {key}') logger.info(f'[{mac}] Skipping ap-env update') continue if old_env != new_env: with ArubaInstantSSH(host=ap['ip_address'], username='admin', password=conductor_password, dry_run=dry_run) as iap: iap.update_ap_env(new_env, log_prefix=f'[{mac}] ') req = netbox.http_session.get( f'{netbox.wireless.wireless_lans.url}/?export=aruba_instant_vc_wlans.conf', headers={'authorization': f'Token {netbox.token}'} ) req.raise_for_status() new_wlan_config = parse_config(req.text) old_config = conductor.fetch_vc_config() old_wlan_config = { key: data for key, data in old_config.items() if key.startswith('wlan ssid-profile netbox-') or key.startswith('wlan access-rule netbox-') } for key, data in old_wlan_config.items(): data[2].pop('index', None) conductor.update_vc_config(old=old_wlan_config, new=new_wlan_config) conductor.disconnect() if __name__ == '__main__': class StdoutFilter(logging.Filter): def filter(self, record): return record.levelno <= logging.INFO stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setLevel(logging.DEBUG) stdout_handler.addFilter(StdoutFilter()) stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.setLevel(logging.WARNING) logger.setLevel(logging.DEBUG) logger.addHandler(stdout_handler) logger.addHandler(stderr_handler) run_sync( conductor_host=os.environ['ARUBA_INSTANT_CONDUCTOR_HOST'], conductor_password=os.environ['ARUBA_INSTANT_CONDUCTOR_PASSWORD'], netbox_url=os.environ['NETBOX_URL'], netbox_token=os.environ['NETBOX_TOKEN'], netbox_site=os.environ['NETBOX_SITE'], dry_run=False )