Skip to content

Commit

Permalink
add support for migrating from kvv1 to kvv2 and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
drewmullen committed Jan 7, 2025
1 parent e0de0b1 commit 8ebe62d
Showing 1 changed file with 58 additions and 63 deletions.
121 changes: 58 additions & 63 deletions kv_recursive.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,132 +1,124 @@
#!/usr/bin/python3

import hvac
import requests
import urllib3
import argparse
import logging
import os

# Configure logging

# Wrapper methods
LOG_LEVELS = {
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}

log_level_name = os.getenv("KV_MIGRATE_LOG", "INFO").upper() # Default to "INFO"
log_level = LOG_LEVELS.get(log_level_name, logging.INFO) # Use INFO if invalid level

logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s')

# Wrapper methods
def list_recursive(client, path, kv_version, source_mount):
logging.debug(f"Listing secrets recursively at path: {path}, kv_version: {kv_version}, mount: {source_mount}")
seed_list = list_path(client, path, kv_version, source_mount)
for i, li in enumerate(seed_list):
seed_list[i] = (path + li)
final_list = recursive_path_builder(client, seed_list, kv_version, source_mount)
logging.debug(f"Final list of secrets: {final_list}")
return final_list


def delete_recursive(client, path, kv_version, source_mount):
logging.info(f"Deleting secrets recursively from path: {path}, kv_version: {kv_version}, mount: {source_mount}")
kv_list = list_recursive(client, path, kv_version, source_mount)
delete_secrets_from_list(client, kv_list, kv_version, source_mount)
logging.info(f"Deleted secrets: {kv_list}")


def read_recursive(client, path, kv_version, source_mount):
kv_list = list_recursive(client, path, kv_version, source_mount)
secrets_list = read_secrets_from_list(client, kv_list, kv_version, source_mount)
def read_recursive(client, path, source_kv_version, source_mount):
logging.info(f"Reading secrets recursively from path: {path}, kv_version: {source_kv_version}, mount: {source_mount}")
kv_list = list_recursive(client, path, source_kv_version, source_mount)
secrets_list = read_secrets_from_list(client, kv_list, source_kv_version, source_mount)
logging.debug(f"Secrets read: {secrets_list}")
return secrets_list


def migrate_secrets(src_client, dest_client, src_path, source_mount, dest_mount, dest_path='', kv_version=1):
kv_list = read_recursive(src_client, src_path, kv_version, source_mount)
write_secrets_from_list(dest_client, kv_list, dest_path, src_path, kv_version, dest_mount)
print("Secrets copied: ", len(kv_list))

def migrate_secrets(src_client, dest_client, src_path, source_mount, dest_mount, dest_path='', source_kv_version=1, destination_kv_version=1):
logging.info(f"Copying secrets from {src_path} to {dest_path} (source_mount: {source_mount}, dest_mount: {dest_mount})")
kv_list = read_recursive(src_client, src_path, source_kv_version, source_mount)
write_secrets_from_list(dest_client, kv_list, dest_path, src_path, destination_kv_version, dest_mount)
logging.info(f"Copied {len(kv_list)} secrets successfully.")

# Construction Methods

def recursive_path_builder(client, kv_list, kv_version, source_mount):
logging.debug(f"Building recursive path for kv_version: {kv_version}, mount: {source_mount}")
change = 0
# if any list items end in '/' return 1
for li in kv_list[:]:
if li[-1] == '/':
append_list = list_path(client, li, kv_version, source_mount)
for new_item in append_list:
kv_list.append(li + new_item)
# remove list item ending in '/'
kv_list.remove(li)
change = 1
# new list items added, rerun search
if change == 1:
recursive_path_builder(client, kv_list, kv_version, source_mount)

return kv_list


def list_path(client, path, kv_version, source_mount):
logging.debug(f"Listing path: {path}, kv_version: {kv_version}, mount: {source_mount}")
if kv_version == 2:
return client.secrets.kv.v2.list_secrets(path, mount_point=source_mount)['data']['keys']
elif kv_version == 1:
return client.secrets.kv.v1.list_secrets(path, mount_point=source_mount)['data']['keys']


def read_secrets_from_list(client, kv_list, kv_version, source_mount):
def read_secrets_from_list(client, kv_list, source_kv_version, source_mount):
logging.debug(f"Reading secrets from list for kv_version: {source_kv_version}, mount: {source_mount}")
for i, li in enumerate(kv_list[:]):
k = kv_list[i]
if kv_version == 2:
v = client.secrets.kv.v2.read_secret_version(k, mount_point=source_mount,
raise_on_deleted_version=True)['data']['data']
elif kv_version == 1:
if source_kv_version == 2:
v = client.secrets.kv.v2.read_secret_version(k, mount_point=source_mount, raise_on_deleted_version=True)['data']['data']
elif source_kv_version == 1:
v = client.secrets.kv.v1.read_secret(k, mount_point=source_mount)['data']
kv_list[i] = {k: v}

return kv_list


def write_secrets_from_list(client, kv_list, dest_path, src_path, kv_version, dest_mount):
def write_secrets_from_list(client, kv_list, dest_path, src_path, destination_kv_version, dest_mount):
logging.debug(f"Writing secrets to destination: {dest_path}, kv_version: {destination_kv_version}, mount: {dest_mount}")
for li in kv_list:
sname = list(li)[0]
short_name = sname.replace(src_path, '')

if kv_version == 2:
if destination_kv_version == 2:
client.secrets.kv.v2.create_or_update_secret(
path=(dest_path + short_name),
secret=li[sname],
mount_point=dest_mount
)
elif kv_version == 1:
path=(dest_path + short_name),
secret=li[sname],
mount_point=dest_mount
)
elif destination_kv_version == 1:
client.secrets.kv.v1.create_or_update_secret(
path=(dest_path + short_name),
secret=li[sname],
mount_point=dest_mount
)
logging.info(f"Secrets written to destination: {len(kv_list)} items.")


# this expects the secret to be in the json blob - need to fix
def delete_secrets_from_list(client, kv_list, kv_version, source_mount):
logging.debug(f"Deleting secrets from list for kv_version: {kv_version}, mount: {source_mount}")
for li in kv_list:
if kv_version == 2:
client.secrets.kv.v2.delete_metadata_and_all_versions(path=li, mount_point=source_mount)
if kv_version == 1:
elif kv_version == 1:
client.secrets.kv.v1.delete_secret(path=li, mount_point=source_mount)


def ensure_trailing_slash(s):
if s != '':
if s[-1] != '/':
s += '/'
return s


def main():
pass
# example run vars
# path = 'drew/' #must end in /
# path2 = 'nested/'
# client = hvac.Client(url='https://127.0.0.1:8200', token='<redacted>', verify=False, namespace="ns1")
# client2 = hvac.Client(url='https://127.0.0.1:8200', token='<redacted>', verify=False, namespace="kv1")

# #client = hvac.Client(url='https://vault.example.com', token='abc123', verify=False,
# namespace='namespace/child_namespace/sub_child_namespace')
# migrate_secrets(client, client2, path, path2, kv_version=1)


if __name__ == '__main__':

parser = argparse.ArgumentParser(description='Recursively interact with Hashicorp Vault KV mount')
parser.add_argument('action', choices=['copy', 'move', 'delete', 'list', 'read',
'count'], default='list', metavar='ACTION')
parser.add_argument('action', choices=['copy', 'move', 'delete', 'list', 'read', 'count'], default='list', metavar='ACTION')
parser.add_argument('--tls-skip-verify', action='store_false')
parser.add_argument('--source-path', '-s', default='')
parser.add_argument('--source-url', '-su', required=True)
Expand All @@ -137,11 +129,12 @@ def main():
parser.add_argument('--destination-url', '-du')
parser.add_argument('--destination-token', '-dt')
parser.add_argument('--destination-namespace', '-dns', default='')
parser.add_argument('--kv-version', '-kvv', type=int, default=2, choices=[1, 2])
parser.add_argument('--source-kv-version', '-skv', type=int, default=2, choices=[1, 2])
parser.add_argument('--destination-kv-version', '-dkv', type=int, default=2, choices=[1, 2])
parser.add_argument('--destination-mount', '-dm', default='kv-v2')

args = parser.parse_args()

logging.debug("Script started with arguments: %s", args)

if not args.destination_path:
args.destination_path = args.source_path
if not args.destination_url:
Expand Down Expand Up @@ -178,16 +171,17 @@ def main():
args.source_mount,
args.destination_mount,
args.destination_path,
kv_version=args.kv_version
)
args.source_kv_version,
args.destination_kv_version
)
elif args.action == 'list':
print(list_recursive(source_client, args.source_path, args.kv_version, args.source_mount))
print(list_recursive(source_client, args.source_path, args.source_kv_version, args.source_mount))
elif args.action == 'count':
print(len(list_recursive(source_client, args.source_path, args.kv_version, args.source_mount)))
print(len(list_recursive(source_client, args.source_path, args.source_kv_version, args.source_mount)))
elif args.action == 'read':
print(read_recursive(source_client, args.source_path, args.kv_version, args.source_mount))
print(read_recursive(source_client, args.source_path, args.source_kv_version, args.source_mount))
elif args.action == 'delete':
delete_recursive(source_client, args.source_path, args.kv_version, args.source_mount)
delete_recursive(source_client, args.source_path, args.source_kv_version, args.source_mount)
elif args.action == 'move':
migrate_secrets(
source_client,
Expand All @@ -196,6 +190,7 @@ def main():
args.source_mount,
args.destination_mount,
args.destination_path,
kv_version=args.kv_version
args.source_kv_version,
args.destination_kv_version
)
delete_recursive(source_client, args.source_path, args.kv_version, args.source_mount)

0 comments on commit 8ebe62d

Please sign in to comment.