add token support to script to support ssh forced commands with tokens

This commit is contained in:
nd 2020-06-27 04:54:17 +02:00
parent cd1e83b833
commit 7dcf71c3ab
No known key found for this signature in database
GPG key ID: 21B5CD4DEE3670E9
4 changed files with 108 additions and 20 deletions

View file

@ -1,8 +1,10 @@
#!/usr/bin/env python3
import sys
from sys import stdin, stdout
from sys import stdin, stdout, stderr
import os
import socket
import sqlite3
import click
def setupdb():
conn = sqlite3.connect('{{ powerdns.letsencrypthandler.dbpath }}', isolation_level=None)
@ -13,24 +15,58 @@ def setupdb():
timestamp DEFAULT (strftime('%s','now'))
)
""")
conn.executescript("""
CREATE TABLE IF NOT EXISTS tokens (
token TEXT NOT NULL,
record TEXT NOT NULL,
last_used DEFAULT (strftime('%s','now')),
added DEFAULT (strftime('%s','now')),
UNIQUE(token, record)
)
""")
conn.commit()
return conn
def get_challenge(db, path):
def cleanup_db(db):
c = db.cursor()
c.execute('SELECT value FROM challenges WHERE q = ?', (path,))
c.execute("DELETE FROM challenges WHERE timestamp < strftime('%s', datetime('now','-2 day'))")
db.commit()
def get_challenge(db, record):
c = db.cursor()
c.execute('SELECT value FROM challenges WHERE q = ?', [record])
result = c.fetchall()
if result:
return result
else:
return ['NO DATA - ' + socket.gethostname()]
return [['NO DATA -' + socket.gethostname()]]
def add_challenge(db, path, value):
def add_challenge(db, record, value):
c = db.cursor()
c.execute('INSERT INTO challenges (q, value) VALUES(?, ?)', (path,value,))
c.execute("DELETE FROM challenges WHERE timestamp < strftime('%s', datetime('now','-2 day'))")
c.execute('INSERT INTO challenges (q, value) VALUES(?, ?)', [record, value])
db.commit()
def add_token(db, token, record):
c = db.cursor()
c.execute('INSERT OR IGNORE INTO tokens (token, record, last_used) VALUES(?, ?, NULL)', [token, record])
db.commit()
def token_has_access(db, token, record):
c = db.cursor()
c.execute('SELECT * FROM tokens WHERE token = ? AND record = ?', [token, record])
result = c.fetchall()
if result:
c.execute("UPDATE tokens SET last_used = strftime('%s','now') WHERE token = ? AND record = ?", [token, record])
db.commit()
return True
else:
return False
@click.group()
def cli():
pass
@cli.command(name="pdns_backend")
def main_query():
db = setupdb()
data = stdin.readline()
@ -38,6 +74,8 @@ def main_query():
stdout.flush()
while True:
data = stdin.readline().strip()
if not data:
continue
kind, qname, qclass, qtype, id, ip = data.split("\t")
if qtype == "SOA":
stdout.write("DATA\t" + qname + "\t" + qclass + "\t" + qtype + "\t300\t" + id + "\t")
@ -49,15 +87,50 @@ def main_query():
stdout.write("END\n")
stdout.flush()
def main_add_challenge():
@cli.command(name="add_challenge")
@click.argument('record')
@click.argument('challenge')
def main_add_challenge(record, challenge):
db = setupdb()
add_challenge(db ,sys.argv[1], sys.argv[2])
add_challenge(db, record, challenge)
cleanup_db(db)
def main():
if len(sys.argv) == 3:
main_add_challenge()
@cli.command(name="add_challenge_with_token")
@click.argument('token')
@click.argument('record')
@click.argument('challenge')
def main_add_challenge_with_token(token, record, challenge):
db = setupdb()
if token_has_access(db, token, record):
add_challenge(db, record, challenge)
cleanup_db(db)
exit(0)
else:
main_query()
cleanup_db(db)
print("Token does not have access to this record")
exit(1)
@cli.command(name="add_challenge_with_token_ssh")
@click.pass_context
def main_add_challenge_with_token_ssh(ctx):
# ssh passes the SSH_ORIGINAL_COMMAND environment variable if you use a forced command
# using this we can support auto renew hooks like this:
# ssh letsencrypt@server <token> <record> <challenge>
cmd = os.environ.get('SSH_ORIGINAL_COMMAND')
if not cmd:
exit(2)
cmd = cmd.strip().split(' ')
if len(cmd) != 3:
exit(3)
ctx.invoke(main_add_challenge_with_token, token=cmd[0], record=cmd[1], challenge= cmd[2])
@cli.command(name="add_token")
@click.argument('token')
@click.argument('record')
def main_add_token(token, record):
db = setupdb()
add_token(db, token, record)
cleanup_db(db)
if __name__ == '__main__':
main()
cli()