64 lines
2.5 KiB
Python
64 lines
2.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
def validate_aliases(aliases):
|
|
for alias in aliases:
|
|
unknown_attributes = alias.keys() - {'addresses', 'senders', 'receivers', 'senders_and_receivers'}
|
|
if unknown_attributes:
|
|
raise ValueError('Unknown alias attributes: ' + ', '.join(unknown_attributes))
|
|
for address in alias['addresses']:
|
|
if not address.islower():
|
|
raise ValueError(f'Aliases are not case-sensitive. Make "{address}" all-lowercase!')
|
|
if address.strip() != address:
|
|
raise ValueError(f'Remove surrounding whitespace in "{address}"!')
|
|
|
|
class FilterModule(object):
|
|
def filters(self):
|
|
return {
|
|
'postfix_resolve_senders': self.postfix_resolve_senders,
|
|
'postfix_resolve_receivers': self.postfix_resolve_receivers,
|
|
}
|
|
|
|
def postfix_resolve_senders(self, aliases, match_sender_domain, allow_sender_domains=tuple()):
|
|
validate_aliases(aliases)
|
|
sender_suffix = '@' + match_sender_domain
|
|
address_sender_address_map = {}
|
|
for alias in aliases:
|
|
for address in alias['addresses']:
|
|
address_sender_address_map.setdefault(address, set())
|
|
address_sender_address_map[address] |= set(alias.get('senders', []))
|
|
address_sender_address_map[address] |= set(alias.get('senders_and_receivers', []))
|
|
for _ in range(100):
|
|
done = True
|
|
for address, senders in address_sender_address_map.items():
|
|
for sender in set(senders):
|
|
if sender.endswith(sender_suffix):
|
|
continue
|
|
done = False
|
|
if sender not in address_sender_address_map:
|
|
raise Exception(f'Address does not resolve to sender address: {sender}')
|
|
senders.remove(sender)
|
|
senders |= address_sender_address_map[sender]
|
|
if done:
|
|
break
|
|
if not done:
|
|
raise Exception('Recursion limit reached')
|
|
|
|
address_sender_name_map = {}
|
|
for address, senders in address_sender_address_map.items():
|
|
address_sender_name_map[address] = set()
|
|
for sender in senders:
|
|
username = sender[:-len(sender_suffix)]
|
|
address_sender_name_map[address].add(username)
|
|
for domain in allow_sender_domains:
|
|
address_sender_name_map[address].add(f'{username}@{domain}')
|
|
return address_sender_name_map
|
|
|
|
def postfix_resolve_receivers(self, aliases):
|
|
validate_aliases(aliases)
|
|
address_receiver_address_map = {}
|
|
for alias in aliases:
|
|
for address in alias['addresses']:
|
|
address_receiver_address_map.setdefault(address, set())
|
|
address_receiver_address_map[address] |= set(alias.get('receivers', []))
|
|
address_receiver_address_map[address] |= set(alias.get('senders_and_receivers', []))
|
|
return address_receiver_address_map
|