From 89a443be1b25493360609db7c85b5d9d3bbf0fde Mon Sep 17 00:00:00 2001 From: Julian Rother Date: Sun, 25 May 2025 03:55:34 +0200 Subject: [PATCH] Initial commit --- netbox-aruba-instant-sync.py | 549 +++++++++++++++++++++++++++++++++++ 1 file changed, 549 insertions(+) create mode 100755 netbox-aruba-instant-sync.py diff --git a/netbox-aruba-instant-sync.py b/netbox-aruba-instant-sync.py new file mode 100755 index 0000000..35830bf --- /dev/null +++ b/netbox-aruba-instant-sync.py @@ -0,0 +1,549 @@ +#!/usr/bin/python3 + +# Aruba Instant Netbox Sync +# +# Syncs APs from the Aruba Instant VC to Netbox. Syncs AP-specific config +# (hostname, zone, ...) and ssid-profiles from Netbox to the Aruba Instant +# VC and the individual APs. +# +# Requirements in Netbox: +# - Platform with slug "aruba-instant" +# - Device Types for each model with slug "aruba-instant-", e.g. "aruba-instant-515" +# - Device Types need to have "aruba-instant" as the default platform +# - All created AP Devices need to keep "aruba-instant" as their platform +# - Device Types and AP Devices need an interface with name "E0" (it links them to APs in the VC) +# - Config Template AP Devices with "show ap-env" syntax: +# name:my-ap-123 +# uplink_vlan:23 +# iap_zone:foobar +# - Platform "aruba-instant" must have such a Config Template as the default template +# - Export template for Devices with name "device_configs.json" and content type "application/json": +# [ +# {%- for device in queryset if device.get_config_template() %} +# {%- set config_template = device.get_config_template() %} +# {%- set tmp = dict(id=device.id, name=device.name, config=device.get_config_template().render(context=dict(device.get_config_context(), device=device))) %} +# {%- set mac_addresses = device.interfaces.all()|map(attribute='mac_address')|select|list %} +# {%- set tmp = dict(tmp, **({'mac_address': mac_addresses[0]|string} if mac_addresses else {})) %} +# {{ tmp|tojson }}{{ "," if not loop.last }} +# {%- endfor %} +# ] +# - Export template for Wireless LANs with name "aruba_instant_vc_wlans.conf" +# that generates ssid-profile blocks with "netbox-"-prefixed profile names. +# SSID profiles have mandatory commands. Commands inside the ssid-profile +# blocks need to be indented. Do not include the "index" command. Secrets +# (like wpa-passphrase) must not be encrypted (as it would be in +# "show running-config" output or the config backups). +# Example: +# wlan ssid-profile netbox-test +# enable +# type employee +# essid test +# wpa-passphrase foobar +# opmode wpa2-psk-aes +# max-authentication-failures 0 +# rf-band all +# captive-portal disable +# dtim-period 1 +# broadcast-filter arp +# dmo-channel-utilization-threshold 90 +# local-probe-req-thresh 0 +# max-clients-threshold 64 +# +# Limitations: +# - Incorrectly handles sequence-sensitive VC config commands +# - Supports only a few VC config commands (see CONFIG_COMMANDS) +# - Supports only a few ap-env variables (see AP_ENV_VARS) +# +# Run with the environment variables: +# ARUBA_INSTANT_CONDUCTOR_HOST=192.168.1.2 +# ARUBA_INSTANT_CONDUCTOR_PASSWORD=vcsecret +# NETBOX_URL=https://netbox.example.com +# NETBOX_TOKEN=netboxapitoken +# NETBOX_SITE=default-site-slug + +import re +import os +import sys +import logging +logger = logging.getLogger(__name__) + +import netmiko +import pynetbox +from netmiko.aruba import ArubaSSH +from cryptography.hazmat.primitives.ciphers import Cipher +from cryptography.hazmat.primitives.ciphers.modes import CBC +try: + from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES +except ImportError: + from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES + +# Secrets in the VC config as returned by "show running-config" and similar +# means Triple DES encrypted. While applying config commands, secrets must be +# unencrypted (even for "no ..." commands), so this complicates both change +# detection and generation. Luckily the key static and publicly known (making +# the encryption pointless from a security perspective). We transparently +# decrypt parameters for specific commands when reading the config and +# otherwise pretend this encryption thing does not exist. +# +# See also https://seclists.org/fulldisclosure/2016/May/19 + +CONFIG_SECRET_PREFIX_REGEX = re.compile(r'^ *(snmp-server community|wpa-passphrase) +') + +def decrypt_config_secret(hex_value): + key = b'\x32\x74\x10\x84\x91\x17\x75\x46\x14\x75\x82\x92\x43\x49\x04\x59\x18\x69\x15\x94\x27\x84\x30\x03' + value = bytes.fromhex(hex_value) + iv = value[:8] + ct = value[8:] + decryptor = Cipher(TripleDES(key), CBC(iv)).decryptor() + pt = (decryptor.update(ct) + decryptor.finalize()).rstrip(b'\x00') + return pt.decode() + +def decrypt_config(text): + lines = [] + for line in text.split('\n'): + if m := CONFIG_SECRET_PREFIX_REGEX.match(line): + line = line[:m.end()] + decrypt_config_secret(line[m.end():]) + lines.append(line) + return '\n'.join(lines) + +# The VC config commands have somewhat inconsistent syntax: +# - "virtual-controller-country " has a parameter and executing it with a +# different value overwrites the old setting, "no virtual-controller-country" +# clears the setting +# - "rf-band " is always present, can be updated but not cleared with "no ..." +# - "allowed-ap " has a parameter but it is list-like, executing it with a +# different value adds it, "no allowed-ap " removes it. +# - "wlan ssid-profile " starts a block, the whole block can be cleared +# with "no wlan ssid-profile " +# - "rf dot11g-radio-profile " starts a block, as does +# "rf dot11g-radio-profile" (without a parameter) +# - "allow-new-aps" is flag-like, it is either set or missing and can be +# cleared with "no allow-new-aps" +# - "disable" (in an ssid-profile block) is also flag-like, but either +# "disable" or "enable" is always present and either command overwrites +# the other +# +# We can detect from the prompt if a command started a block. We can +# generically compare two configs. However, there is no generic way to clear +# or update a setting without knowing the specific command syntax. +# So this gets messy ... + +CONFIG_COMMANDS = [ + # regex, key expr, clear expr + (r'(wlan ssid-profile .+)', r'\1', r'no \1'), + (r'enable|disable', r'enable', None), + (r'(index|type|opmode|max-authentication-failures|rf-band|captive-portal|dmo-channel-utilization-threshold|local-probe-req-thresh|max-clients-threshold) .+', r'\1', None), + (r'(zone|essid|wpa-passphrase|vlan|dtim-period|broadcast-filter) .+', r'\1', r'no \1'), + + (r'(wlan access-rule .+)', r'\1', r'no \1'), + (r'(rule .+)', r'\1', None), # TODO: Sequence Sensitive Command + + (r'(.+)', r'\1', None), +] + +CONFIG_COMMANDS = [ + (re.compile(regex), key_expr, clear_expr) + for regex, key_expr, clear_expr in CONFIG_COMMANDS +] + +# TODO: Correct character escaping +CONFIG_QUOTE_REGEX = re.compile(r'^ *(essid|wpa-passphrase) +') + +def parse_config(text): + result = {} + current_block = None + for line in text.split('\n'): + indented = line.startswith(' ') + line = line.strip() + if not line: + continue + if m := CONFIG_QUOTE_REGEX.match(line): + arg = line[m.end():] + if not arg.startswith('"') and not arg.endswith('"'): + line = line[:m.end()] + '"' + line[m.end():] + '"' + matching_cmd = None + for regex, key_expr, clear_expr in CONFIG_COMMANDS: + if m := regex.fullmatch(line): + matching_cmd = m.expand(key_expr), m.expand(clear_expr) if clear_expr is not None else None + break + assert matching_cmd + key, clear_cmd = matching_cmd + if not indented: + assert key not in result + result[key] = (line, clear_cmd, {}) + # key-value command cannot start a block + current_block = result[key] if key == line else None + else: + assert key not in current_block + current_block[2][key] = (line, clear_cmd) + return result + +# Most "show ..." commands return one or more ASCII tables preceded by a +# heading and followed by other stuff. E.g. "show upgrade info": +# +# swarm upgrade status +# -------------------- +# Mac IP Address Seed AP AP Class Status Image Info Error Detail +# --- ---------- ------- -------- ------ ---------- ------------ +# 12:34:56:78:9a:bc 192.168.12.34 No Draco image-ok From Seed none +# 12:34:56:78:9a:bd 192.168.12.35 No Draco image-ok From Seed none +# Auto reboot :enable +# Use external URL :disable +# Conductor wait Time :0 secs 0 count +# Switch Partition :enable +# Upgrade in process :No +# UAP convert process :No +# Cloud cert verify :disable +# Cloud cert check in process :No + +def parse_tables(text): + i = 0 + lines = text.split('\n') + tables = {} + + while i < len(lines) - 4: + # Check for title + if len(lines[i]) != len(lines[i+1]) or not re.match('^-+$', lines[i+1]): + i += 1 + continue + title = lines[i] + # Pre-check for columns + if len(lines[i+2]) != len(lines[i+3]) or not re.match('^-+ +-+[- ]*$', lines[i+3]): + i += 1 + continue + # Extract colums and column positions + column_line = lines[i+2] + dash_line = lines[i+3] + dash_line_i = 0 + column_regex = '^' + row_regex = '^' + while dash_line_i < len(dash_line): + m = re.match('^(-+)( *)', dash_line[dash_line_i:]) + assert m + dashes, spaces = m.groups() + if spaces: + # Most tables are fully right-padded with spaces. In those the last column is like any other column. + assert len(spaces) >= 2 + column_regex += f'(.{{{len(dashes)}}}) {{{len(spaces)}}}' + row_regex += f'(.{{{len(dashes) + len(spaces)}}})' + else: + # In tables that are not right-padded, the last column ends up here. + column_regex += '(.*)' + row_regex += '(.*)' + dash_line_i += len(dashes) + len(spaces) + column_regex += '$' + row_regex += '$' + column_names = list(re.match(column_regex, column_line).groups()) + assert column_names == [name.strip() for name in column_names] + assert column_names == [name.strip() for name in re.match(row_regex, column_line).groups()] + # At this point, we know for sure we have a table (possibly without any rows) + i += 4 + rows = [] + while i < len(lines): + m = re.match(row_regex, lines[i]) + if not m: + break + rows.append(dict(zip(column_names, [group.rstrip() for group in m.groups()]))) + i += 1 + tables[title] = rows + return tables + +# AP-specific config is stored as bootloader variables (as returned by +# "show ap-env" but updated with commands. Command names and behavior are +# inconsistent and cannot be derived from the bootloader variables. There is +# also no consistent way to get the current status in command form. Some +# variables are always present, some can be cleared with "no ...", some can +# be cleared with a special value. +# +# There is the command "ap-env ", but it always requires +# a reboot to apply the change. +# +# Again, pretty messy ... we maintain a list of known variables. +# +# We ignore unknown variables from APs, and reject them from Netbox configs. + +AP_ENV_VARS = { + # no unset_command = mandatory var + 'name': { + 'set_command': 'hostname {value}', + }, + 'uplink_vlan': { + 'set_command': 'uplink-vlan {value}', + 'unset_command': 'uplink-vlan 0', + }, + 'iap_zone': { + 'set_command': 'zonename {value}', + 'unset_command': 'no zonename', + }, + 'iap_conductor': { + 'set_command': 'iap-conductor', + 'unset_command': 'no iap-conductor', + }, +} + +class ArubaInstantSSH(ArubaSSH): + def __init__(self, dry_run=False, **kwargs): + super().__init__(**kwargs) + self.dry_run = dry_run + self.config_command_count = 0 + + def config_mode(self, **kwargs): + super().config_mode(**kwargs) + self.config_command_count = 0 + + def fetch_aps(self): + aps = {} # ip addr -> {...} + ap_table = list(parse_tables(self.send_command('show aps')).values())[0] + upgrade_table = parse_tables(self.send_command('show upgrade info'))['swarm upgrade status'] + for ap in ap_table: + ip_addr = ap['IP Address'].rstrip('*') + aps[ip_addr] = { + 'name': ap['Name'], + 'ip_address': ip_addr, + 'type': ap['Type'].split('(', 1)[0], + 'serial': ap['Serial #'], + 'mac_address': None, + } + for ap in upgrade_table: + aps[ap['IP Address']]['mac_address'] = ap['Mac'].upper() + return aps + + def fetch_ap_env(self, ip_address=None): + output = self.send_command('show ap-env' if not ip_address else f'show ap-env {ip_address}') + env = dict(line.split(':', 1) for line in output.split('\n') if ':' in line) + return {key: value for key, value in env.items() if key in AP_ENV_VARS} + + def update_ap_env(self, env, log_prefix=''): + logger.info(f'{log_prefix}Updating ap-env variables') + new_env = {key: value for key, value in env.items() if key in AP_ENV_VARS} + assert env == new_env + old_env = self.fetch_ap_env() + for key in new_env.keys() | old_env.keys(): + value = new_env.get(key) + if value == old_env.get(key): + continue + if not value: + command = AP_ENV_VARS[key]['unset_command'] + else: + command = AP_ENV_VARS[key]['set_command'].format(value=value) + logger.info(f'{log_prefix}CMD: {command}') + if self.dry_run: + continue + if value and key == 'name': + # "hostname " command changes the prompt + output = self.send_command_timing(command, delay_factor=5) + self.set_base_prompt() + else: + output = self.send_command(command) + if '%' in output or 'ERROR' in output: + logger.error(f'{log_prefix}Updating ap-env variable {key} failed: {output!r}') + if not self.dry_run and self.fetch_ap_env() != new_env: + logger.warning(f'{log_prefix}ap-env variables still differ after update') + + def fetch_vc_config(self): + return parse_config(decrypt_config(self.send_command('show running-config'))) + + def send_config_command(self, command, expect_inside_block=False): + self.config_command_count += 1 + command_without_secret = command + if m := CONFIG_SECRET_PREFIX_REGEX.match(command): + command_without_secret = command[:m.end()] + '' + logger.info(f'CMD: {command_without_secret}') + output = self.send_command(command, expect_string='\\) #', strip_prompt=False) + if '%' in output or 'ERROR' in output: + logger.critical(f'Config command error: {output!r}') + exit(2) + if '(config) #' in output: + if expect_inside_block: + logger.critical(f'Unexpected top-level config prompt after command') + exit(2) + elif not expect_inside_block: + logger.info('CMD: exit') + output2 = self.send_command('exit', expect_string='\\) #', strip_prompt=False) + if '(config) #' not in output2: + logger.critical(f'Unexpected prompt after command') + exit(2) + + def flush_config_if_needed(self, exit_config_mode=False): + if self.config_command_count >= 20 or exit_config_mode: + if not exit_config_mode: + logger.info('Committing config changes to avoid exceeding the CLI buffer size') + else: + logger.info('Committing config changes') + self.exit_config_mode() + logger.debug('Output of "show uncommitted-config":\n' + self.send_command('show uncommitted-config')) + if not self.dry_run: + output = self.send_command('commit apply') + if '%' in output or 'ERROR' in output: + logger.critical(f'Commit failed: {output!r}') + exit(2) + else: + self.send_command('commit revert') + if not exit_config_mode: + self.config_mode() + + def update_vc_config(self, new, old=None): + if old is None: + old = self.fetch_vc_config() + if new == old: + return False + + logger.info('Updating virtual controller config') + self.config_mode() + for block_key in old.keys() - new.keys(): + line, clear_cmd, block = old[block_key] + if clear_cmd is None: + logger.critical(f'Unable to clear command {line}') + exit(2) + self.send_config_command(clear_cmd) + self.flush_config_if_needed() + for block_key in new.keys() - old.keys(): + line, clear_cmd, block = new[block_key] + self.send_config_command(line, expect_inside_block=bool(block)) + for key in block: + line, clear_cmd = block[key] + self.send_config_command(line, expect_inside_block=True) + if block: + self.send_config_command('exit') + self.flush_config_if_needed() + for block_key in new.keys() & old.keys(): + if new[block_key] == old[block_key]: + continue + old_line, old_clear_cmd, old_block = old[block_key] + new_line, new_clear_cmd, new_block = new[block_key] + has_non_clearable = any(old_block[key][1] is None for key in old_block.keys() - new_block.keys()) + if has_non_clearable or old_line != new_line: + if old_clear_cmd is None: + logger.critical(f'Unable to clear command {old_line}') + exit(2) + self.send_config_command(old_clear_cmd) + self.send_config_command(new_line, expect_inside_block=bool(new_block)) + for key in new_block: + line, clear_cmd = new_block[key] + self.send_config_command(line, expect_inside_block=True) + if new_block: + self.send_config_command('exit') + self.flush_config_if_needed() + continue + self.send_config_command(new_line, expect_inside_block=True) + for key in old_block.keys() | new_block.keys(): + if old_block.get(key) == new_block.get(key): + continue + if key in old_block and key not in new_block: + self.send_config_command(old_block[key][1], expect_inside_block=True) + else: + self.send_config_command(new_block[key][0], expect_inside_block=True) + self.send_config_command('exit') + self.flush_config_if_needed() + self.flush_config_if_needed(exit_config_mode=True) + return True + +def run_sync(conductor_host, conductor_password, netbox_url, netbox_token, netbox_site, dry_run=False): + if dry_run: + logger.warning('Running in dry-run mode') + + conductor = ArubaInstantSSH(host=conductor_host, username='admin', password=conductor_password, dry_run=dry_run) + netbox = pynetbox.api(netbox_url, token=netbox_token) + + aps_by_mac = {ap['mac_address']: ap for ap in conductor.fetch_aps().values()} + device_config_by_mac = { + device_config.mac_address: device_config.config + for device_config in netbox.dcim.devices.filter(platform='aruba-instant', export='device_configs.json') + } + + device_types = { + device_type.slug.split('-', 2)[-1]: device_type + for device_type in netbox.dcim.device_types.filter(default_platform='aruba-instant') + if device_type.slug.startswith('aruba-ap-') + } + site_id = netbox.dcim.sites.get(slug=netbox_site).id + ap_role_id = netbox.dcim.device_roles.get(slug='access-point').id + + devices_created = False + for mac in aps_by_mac.keys() - device_config_by_mac.keys(): + ap = aps_by_mac[mac] + logger.info(f'Creating Netbox device for new AP {mac}') + ap_type = ap['type'] + if ap_type not in device_types: + logger.error(f'Cannot create AP: Netbox device type with slug "aruba-instant-{ap_type}" not found') + logger.info(f'Skipping new AP {mac}') + continue + device_type = device_types[ap_type] + if dry_run: + continue + device = netbox.dcim.devices.create( + serial=ap['serial'], + device_type=device_type.id, + site=site_id, + role=ap_role_id, + status='inventory', + ) + e0_interface = netbox.dcim.interfaces.get(device_id=device.id, name='E0') + e0_interface.primary_mac_address = netbox.dcim.mac_addresses.create( + mac_address=mac, + assigned_object_type='dcim.interface', + assigned_object_id=e0_interface.id + ) + e0_interface.save() + devices_created = True + if devices_created: + device_config_by_mac = { + device_config.mac_address: device_config.config + for device_config in netbox.dcim.devices.filter(platform='aruba-instant', export='device_configs.json') + } + + for mac, ap in aps_by_mac.items(): + old_env = conductor.fetch_ap_env(ap['ip_address']) + new_env = dict(line.split(':', 1) for line in device_config_by_mac[mac].split('\n') if ':' in line) + if unsupported_vars := ', '.join(new_env.keys() - AP_ENV_VARS.keys()): + logger.error(f'[{mac}] Unsupported ap-env variables in generated config: {unsupported_vars}') + logger.info(f'[{mac}] Skipping ap-env update') + continue + for key, args in AP_ENV_VARS.items(): + if 'unset_command' not in args and key not in new_env: + logger.error(f'[{mac}] Mandatory ap-env variable missing in generated config: {key}') + logger.info(f'[{mac}] Skipping ap-env update') + continue + if old_env != new_env: + with ArubaInstantSSH(host=ap['ip_address'], username='admin', password=conductor_password, dry_run=dry_run) as iap: + iap.update_ap_env(new_env, log_prefix=f'[{mac}] ') + + req = netbox.http_session.get( + f'{netbox.wireless.wireless_lans.url}/?export=aruba_instant_vc_wlans.conf', + headers={'authorization': f'Token {netbox.token}'} + ) + req.raise_for_status() + new_wlan_config = parse_config(req.text) + old_config = conductor.fetch_vc_config() + old_wlan_config = { + key: data + for key, data in old_config.items() + if key.startswith('wlan ssid-profile netbox-') or key.startswith('wlan access-rule netbox-') + } + for key, data in old_wlan_config.items(): + data[2].pop('index', None) + conductor.update_vc_config(old=old_wlan_config, new=new_wlan_config) + conductor.disconnect() + +if __name__ == '__main__': + class StdoutFilter(logging.Filter): + def filter(self, record): + return record.levelno <= logging.INFO + + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setLevel(logging.DEBUG) + stdout_handler.addFilter(StdoutFilter()) + stderr_handler = logging.StreamHandler(sys.stderr) + stderr_handler.setLevel(logging.WARNING) + logger.setLevel(logging.DEBUG) + logger.addHandler(stdout_handler) + logger.addHandler(stderr_handler) + + run_sync( + conductor_host=os.environ['ARUBA_INSTANT_CONDUCTOR_HOST'], + conductor_password=os.environ['ARUBA_INSTANT_CONDUCTOR_PASSWORD'], + netbox_url=os.environ['NETBOX_URL'], + netbox_token=os.environ['NETBOX_TOKEN'], + netbox_site=os.environ['NETBOX_SITE'], + dry_run=False + )