Skip to content

Commit

Permalink
Profiles have associated signers
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdfrench committed Jun 16, 2024
1 parent 1d13256 commit 9237119
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 177 deletions.
19 changes: 19 additions & 0 deletions tests/test_00_parse_pubkeys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from . import wmap

def test_parse_algorithm():
key = wmap.AuthorizedKey.parse("ssh-rsa abc123 blah blah blah")
assert(key.algorithm == wmap.Algorithm.RSA)

def test_parse_material():
key = wmap.AuthorizedKey.parse("ssh-rsa abc123 blah blah blah")
assert(key.material == "abc123")

def test_parse_comment():
key = wmap.AuthorizedKey.parse("ssh-rsa abc123 blah blah blah")
assert(key.comment == "blah blah blah")

def test_into_allowed_signer():
key = wmap.AuthorizedKey.parse("ssh-rsa abc123 blah blah blah")
profile = wmap.Profile("example")
signer = key.into_allowed_signer(profile)
assert(signer == "example namespaces=\"[email protected]\" ssh-rsa abc123")
14 changes: 14 additions & 0 deletions tests/test_01_parse_ssh_algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pytest
from . import wmap

def test_rsa():
rsa = wmap.Algorithm.parse("ssh-rsa")
assert(rsa == wmap.Algorithm.RSA)

def test_ed25519():
ed25519 = wmap.Algorithm.parse("ssh-ed25519")
assert(ed25519 == wmap.Algorithm.ED25519)

def test_bogus():
with pytest.raises(Exception):
wmap.Algorithm.parse("ssh-junk")
17 changes: 17 additions & 0 deletions tests/test_02_profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from . import wmap

def test_key_url():
profile_url = "https://github.com/robertdfrench"
authorized_keys_url = "https://github.com/robertdfrench.keys"
profile = wmap.Profile(profile_url)
assert(profile.authorized_keys_url() == authorized_keys_url)

def test_fetch_authorized_keys_text():
profile = wmap.Profile("https://github.com/robertdfrench")
authorized_keys = profile.authorized_keys()
assert(len(authorized_keys) > 0)

def test_allowed_signers():
profile = wmap.Profile("https://github.com/robertdfrench")
for signer in profile.allowed_signers():
assert(signer.startswith("https://github.com/robertdfrench"))
4 changes: 4 additions & 0 deletions tests/test_03_private_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from . import wmap

def test_blah():
pass
4 changes: 0 additions & 4 deletions tests/test_one.py

This file was deleted.

233 changes: 60 additions & 173 deletions wmap
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,176 +1,63 @@
#!/usr/bin/env python3

import argparse
import subprocess
import os
import urllib.request
import tempfile
import json
import base64

# Define the namespace
NAMESPACE = 'namespaces="[email protected]"'

# Function to sign a message
def sign_message(private_key_path, message):
with tempfile.NamedTemporaryFile(delete=False) as message_file:
message_file.write(message)
message_file_path = message_file.name

signature_file_path = f"{message_file_path}.sig"

print(f"[DEBUG] Signing message:\n{message.decode('utf-8')}")
print(f"[DEBUG] Using message file: {message_file_path}")
print(f"[DEBUG] Signature file will be: {signature_file_path}")

# Sign the message using ssh-keygen
subprocess.run([
'ssh-keygen',
'-Y', 'sign',
'-f', private_key_path,
'-n', '[email protected]',
message_file_path
], check=True)

# Read the signature from the file
with open(signature_file_path, 'rb') as f:
signature = f.read().decode().strip()

print(f"[DEBUG] Signature:\n{signature}")

# Clean up the temporary files
os.remove(message_file_path)
os.remove(signature_file_path)

return signature

# Function to fetch public keys from a URL
def fetch_public_keys(url):
with urllib.request.urlopen(url) as response:
public_keys = response.read().decode()
return public_keys

# Function to create allowed signers file
def create_allowed_signers_file(principal, public_keys):
allowed_signers_file = tempfile.NamedTemporaryFile(delete=False)
with allowed_signers_file as f:
for key in public_keys.splitlines():
f.write(f"{principal} {NAMESPACE} {key}\n".encode())
print(f"[DEBUG] Allowed signers file content:\n{open(allowed_signers_file.name).read()}")
return allowed_signers_file.name

# Function to verify a signature
def verify_signature(allowed_signers_file, principal, message, signature):
with tempfile.NamedTemporaryFile(delete=False) as message_file:
message_file.write(message)
message_file_path = message_file.name

with tempfile.NamedTemporaryFile(delete=False) as signature_file:
signature_file.write(signature.encode())
signature_file_path = signature_file.name

print(f"[DEBUG] Verifying message:\n{message.decode('utf-8')}")
print(f"[DEBUG] Using message file: {message_file_path}")
print(f"[DEBUG] Using signature file: {signature_file_path}")

# Verify the signature using ssh-keygen
result = subprocess.run([
'ssh-keygen',
'-Y', 'verify',
'-f', allowed_signers_file,
'-I', principal,
'-n', 'file',
'-s', signature_file_path,
'-m', message_file_path
], capture_output=True, text=True)

print(f"[DEBUG] Verification result: {result.stdout.strip()}")
print(f"[DEBUG] Verification error (if any): {result.stderr.strip()}")

# Clean up the temporary files
os.remove(message_file_path)
# os.remove(signature_file_path)

return result.returncode == 0

# Function to create JSON document
def create_json_document(principal, message, signature):
message_base64 = base64.b64encode(message).decode('utf-8')

document = {
"principal": principal,
"message": message_base64,
"signature": signature
}

return json.dumps(document, indent=2)

# Subcommand to sign a file
def sign_command(args):
with open(args.file, 'rb') as f:
message = f.read()

signature = sign_message(args.private_key, message)

json_document = create_json_document(args.principal, message, signature)
print("JSON Document:")
print(json_document)

# Subcommand to verify a JSON document
def verify_command(args):
with open(args.json_file, 'r') as f:
document = json.load(f)

principal = document['principal']
message = base64.b64decode(document['message'])
signature = document['signature']

keys_url = f"{principal}.keys"
public_keys = fetch_public_keys(keys_url)
allowed_signers_file_path = create_allowed_signers_file(principal, public_keys)

is_valid = verify_signature(allowed_signers_file_path, principal, message, signature)
print(f"Signature valid: {is_valid}")

os.remove(allowed_signers_file_path)

# Subcommand to show allowed signers file
def diagnostic_command(args):
keys_url = f"{args.principal}.keys"
public_keys = fetch_public_keys(keys_url)

print(f"Allowed Signers File for {args.principal}:\n")
for key in public_keys.splitlines():
print(f"{args.principal} {NAMESPACE} {key}")

# Main function to handle command-line arguments
def main():
parser = argparse.ArgumentParser(description="Sign and verify messages using ssh-keygen")
subparsers = parser.add_subparsers(title="subcommands", description="valid subcommands", help="sub-command help", dest="subcommand")

# Subcommand to sign a file
parser_sign = subparsers.add_parser('sign', help="Sign a file")
parser_sign.add_argument('file', type=str, help="The file to sign")
parser_sign.add_argument('private_key', type=str, help="Path to the private key")
parser_sign.add_argument('principal', type=str, help="Principal (e.g., URL to GitHub profile)")
parser_sign.set_defaults(func=sign_command)

# Subcommand to verify a JSON document
parser_verify = subparsers.add_parser('verify', help="Verify a JSON document")
parser_verify.add_argument('json_file', type=str, help="The JSON file to verify")
parser_verify.set_defaults(func=verify_command)

# Subcommand to show allowed signers file
parser_diagnostic = subparsers.add_parser('diagnostic', help="Show allowed signers file for a principal")
parser_diagnostic.add_argument('principal', type=str, help="Principal (e.g., URL to GitHub profile)")
parser_diagnostic.set_defaults(func=diagnostic_command)

args = parser.parse_args()

if args.subcommand is None:
parser.print_help()
else:
args.func(args)
from dataclasses import dataclass
from enum import Enum, auto
from urllib import request

NAMESPACE = "[email protected]"

class Algorithm(Enum):
RSA = auto()
ED25519 = auto()

@classmethod
def parse(cls, string):
if string == "ssh-rsa":
return cls.RSA
elif string == "ssh-ed25519":
return cls.ED25519
else:
raise "Unknown ssh key algorithm"

def __str__(self):
if self == Algorithm.RSA:
return "ssh-rsa"
if self == Algorithm.ED25519:
return "ssh-ed25519"


@dataclass
class AuthorizedKey:
algorithm: Algorithm
material: str
comment: str

@classmethod
def parse(cls, string):
parts = string.split()
algorithm = Algorithm.parse(parts[0])
return cls(algorithm, parts[1], " ".join(parts[2:]))

def into_allowed_signer(self, profile):
principal = profile.url
algorithm = str(self.algorithm)
material = self.material
return f"{principal} namespaces=\"{NAMESPACE}\" {algorithm} {material}"


@dataclass
class Profile:
url: str

def authorized_keys_url(self):
return self.url + ".keys"

def authorized_keys(self):
with request.urlopen(self.authorized_keys_url()) as response:
lines = response.read().decode().splitlines()
return [AuthorizedKey.parse(line) for line in lines]

def allowed_signers(self):
return [k.into_allowed_signer(self) for k in self.authorized_keys()]

if __name__ == "__main__":
main()
print("Hello")

0 comments on commit 9237119

Please sign in to comment.