Skip to content

Commit

Permalink
f-string-ify
Browse files Browse the repository at this point in the history
Mostly-ish automated with https://github.com/ikamensh/flynt
  • Loading branch information
dlenski committed Apr 10, 2024
1 parent b0140e9 commit 36b706c
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 68 deletions.
96 changes: 48 additions & 48 deletions vpn_slice/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def get_default_providers():
)
else:
return dict(
platform = OSError('Your platform, {}, is unsupported'.format(platform))
platform = OSError(f'Your platform, {platform}, is unsupported')
)


Expand Down Expand Up @@ -118,19 +118,19 @@ def do_disconnect(env, args):
try:
pid = int(open(pidfile).read())
except (OSError, ValueError):
print("WARNING: could not read pid from %s" % pidfile, file=stderr)
print(f"WARNING: could not read pid from {pidfile}", file=stderr)
else:
try: providers.process.kill(pid)
except OSError as e:
print("WARNING: could not kill pid %d from %s: %s" % (pid, pidfile, str(e)), file=stderr)
print(f"WARNING: could not kill pid {pid} from {pidfile}: {str(e)}", file=stderr)
else:
if args.verbose:
print("Killed pid %d from %s" % (pid, pidfile), file=stderr)
print(f"Killed pid {pid} from {pidfile}", file=stderr)

if 'hosts' in providers:
removed = providers.hosts.write_hosts({}, args.name)
if args.verbose:
print("Removed %d hosts from /etc/hosts" % removed, file=stderr)
print(f"Removed {removed} hosts from /etc/hosts", file=stderr)

# delete all explicitly created routes:
# 1. Explicit route to gateway (won't get deleted automatically because
Expand All @@ -144,7 +144,7 @@ def do_disconnect(env, args):
try:
providers.route.remove_route(dest)
except CalledProcessError:
print("WARNING: could not delete %s (%s)" % (desc, dest), file=stderr)
print(f"WARNING: could not delete {desc} ({dest})", file=stderr)
else:
providers.route.flush_cache()

Expand All @@ -153,7 +153,7 @@ def do_disconnect(env, args):
try:
providers.firewall.deconfigure_firewall(env.tundev)
except CalledProcessError:
print("WARNING: failed to deconfigure firewall for VPN interface (%s)" % env.tundev, file=stderr)
print(f"WARNING: failed to deconfigure firewall for VPN interface ({env.tundev})", file=stderr)

if args.vpn_domains is not None:
try:
Expand All @@ -178,7 +178,7 @@ def do_connect(env, args):
if args.verbose > 1:
print("Set explicit route to VPN gateway %s (%s)" % (env.gateway, ', '.join('%s %s' % kv for kv in gwr.items())), file=stderr)
else:
print("WARNING: no route to VPN gateway found %s; cannot set explicit route to it." % env.gateway)
print(f"WARNING: no route to VPN gateway found {env.gateway}; cannot set explicit route to it.")

# drop incoming traffic from VPN
if not args.incoming:
Expand All @@ -205,10 +205,10 @@ def do_connect(env, args):
if dev_mtu:
mtu = int(dev_mtu) - 88
if mtu:
print("WARNING: guessing MTU is %d (the MTU of %s - 88)" % (mtu, dev), file=stderr)
print(f"WARNING: guessing MTU is {mtu} (the MTU of {dev} - 88)", file=stderr)
else:
mtu = 1412
print("WARNING: guessing default MTU of %d (couldn't determine MTU of %s)" % (mtu, dev), file=stderr)
print(f"WARNING: guessing default MTU of {mtu} (couldn't determine MTU of {dev})", file=stderr)
providers.route.set_link_info(env.tundev, state='up', mtu=mtu)

# set IPv4, IPv6 addresses for tunnel device
Expand All @@ -224,18 +224,18 @@ def do_connect(env, args):
if r:
exc_subnets.append((dest, r))
else:
print("WARNING: Ignoring unroutable split-exclude %s" % dest, file=stderr)
print(f"WARNING: Ignoring unroutable split-exclude {dest}", file=stderr)

# set up routes to the DNS and Windows name servers, subnets, and local aliases
ns = env.dns + env.dns6 + (env.nbns if args.nbns else [])
for dest, tag in chain(tagged(ns, "nameserver"), tagged(args.subnets, "subnet"), tagged(args.aliases, "alias")):
if args.verbose > 1:
print("Adding route to %s %s through %s." % (tag, dest, env.tundev), file=stderr)
print(f"Adding route to {tag} {dest} through {env.tundev}.", file=stderr)
providers.route.replace_route(dest, dev=env.tundev)
else:
providers.route.flush_cache()
if args.verbose:
print("Added routes for %d nameservers, %d subnets, %d aliases." % (len(ns), len(args.subnets), len(args.aliases)), file=stderr)
print(f"Added routes for {len(ns)} nameservers, {len(args.subnets)} subnets, {len(args.aliases)} aliases.", file=stderr)

# restore routes to excluded subnets
for dest, exc_route in exc_subnets:
Expand All @@ -245,7 +245,7 @@ def do_connect(env, args):
else:
providers.route.flush_cache()
if args.verbose:
print("Restored routes for %d excluded subnets." % len(exc_subnets), file=stderr)
print(f"Restored routes for {len(exc_subnets)} excluded subnets.", file=stderr)

# Use vpn dns for provided domains
if args.vpn_domains is not None:
Expand All @@ -263,45 +263,45 @@ def do_post_connect(env, args):
host_map = []

if args.ns_hosts:
ns_names = [ (ip, ('dns%d.%s' % (ii, args.name),)) for ii, ip in enumerate(env.dns + env.dns6) ]
ns_names = [ (ip, (f'dns{ii}.{args.name}',)) for ii, ip in enumerate(env.dns + env.dns6) ]
if args.nbns:
ns_names += [ (ip, ('nbns%d.%s' % (ii, args.name),)) for ii, ip in enumerate(env.nbns) ]
ns_names += [ (ip, (f'nbns{ii}.{args.name}',)) for ii, ip in enumerate(env.nbns) ]
host_map += ns_names
if args.verbose:
print("Adding /etc/hosts entries for %d nameservers..." % len(ns_names), file=stderr)
print(f"Adding /etc/hosts entries for {len(ns_names)} nameservers...", file=stderr)
for ip, names in ns_names:
print(" %s = %s" % (ip, ', '.join(map(str, names))), file=stderr)
print(f" {ip} = {', '.join(map(str, names))}", file=stderr)

if args.hosts or args.prevent_idle_timeout or args.kerberos_dc:
providers.dns.configure(dns_servers=(env.dns + env.dns6), search_domains=args.domain, bind_addresses=env.myaddrs)

kdc_hosts = []
if args.kerberos_dc:
if args.verbose:
print("Looking up Kerberos5 DC hosts for realm %r using VPN DNS servers..." % args.kerberos_dc, file=stderr)
print(f"Looking up Kerberos5 DC hosts for realm {args.kerberos_dc!r} using VPN DNS servers...", file=stderr)
try:
kdc_hosts = providers.dns.lookup_srv('_kerberos._tcp.%s' % args.kerberos_dc)
kdc_hosts = providers.dns.lookup_srv(f'_kerberos._tcp.{args.kerberos_dc}')
except Exception as e:
print("WARNING: Lookup for Kerberos5 DC hosts for realm %r on VPN DNS servers failed:\n\t%s" % (args.kerberos_dc, e), file=stderr)
print(f"WARNING: Lookup for Kerberos5 DC hosts for realm {args.kerberos_dc!r} on VPN DNS servers failed:\n\t{e}", file=stderr)
else:
if args.verbose:
print("Got %d Kerberos5 DC hosts." % len(kdc_hosts), file=stderr)
print(f"Got {len(kdc_hosts)} Kerberos5 DC hosts.", file=stderr)

hosts_to_lookup = list(chain(tagged(args.hosts, 'host'), tagged(kdc_hosts, 'kdc')))
if hosts_to_lookup:
if args.verbose:
print("Looking up %d hosts using VPN DNS servers..." % len(hosts_to_lookup), file=stderr)
print(f"Looking up {len(hosts_to_lookup)} hosts using VPN DNS servers...", file=stderr)
for host, why in hosts_to_lookup:
try:
ips = providers.dns.lookup_host(host)
except Exception as e:
print("WARNING: Lookup for %s on VPN DNS servers failed:\n\t%s" % (host, e), file=stderr)
print(f"WARNING: Lookup for {host} on VPN DNS servers failed:\n\t{e}", file=stderr)
else:
if ips is None:
print("WARNING: Lookup for %s on VPN DNS servers returned nothing." % host, file=stderr)
print(f"WARNING: Lookup for {host} on VPN DNS servers returned nothing.", file=stderr)
else:
if args.verbose:
print(" %s = %s" % (host, ', '.join(map(str, ips))), file=stderr)
print(f" {host} = {', '.join(map(str, ips))}", file=stderr)
ip_routes.update(ips)
if why == 'kdc':
host_map.extend((ip, [host]) for ip in ips)
Expand All @@ -315,35 +315,35 @@ def do_post_connect(env, args):
if host_map:
providers.hosts.write_hosts(host_map, args.name)
if args.verbose:
print("Added hostnames and aliases for %d addresses to /etc/hosts." % len(host_map), file=stderr)
print(f"Added hostnames and aliases for {len(host_map)} addresses to /etc/hosts.", file=stderr)

# add routes to hosts
for ip in ip_routes:
if args.verbose > 1:
print("Adding route to %s (for named hosts) through %s." % (ip, env.tundev), file=stderr)
print(f"Adding route to {ip} (for named hosts) through {env.tundev}.", file=stderr)
providers.route.replace_route(ip, dev=env.tundev)
else:
providers.route.flush_cache()
if args.verbose:
print("Added %d routes for named hosts." % len(ip_routes), file=stderr)
print(f"Added {len(ip_routes)} routes for named hosts.", file=stderr)

# run DNS queries in background to prevent idle timeout
if args.prevent_idle_timeout:
dns = env.dns + env.dns6
idle_timeout = env.idle_timeout
setproctitle('vpn-slice --prevent-idle-timeout --name %s' % args.name)
setproctitle(f'vpn-slice --prevent-idle-timeout --name {args.name}')
if args.verbose:
print("Continuing in background as PID %d, attempting to prevent idle timeout every %d seconds." % (providers.process.pid(), idle_timeout))
print(f"Continuing in background as PID {providers.process.pid()}, attempting to prevent idle timeout every {idle_timeout} seconds.")

while True:
delay = randint(2 * idle_timeout // 3, 9 * idle_timeout // 10)
if args.verbose > 1:
print("Sleeping %d seconds until we issue a DNS query to prevent idle timeout..." % delay, file=stderr)
print(f"Sleeping {delay} seconds until we issue a DNS query to prevent idle timeout...", file=stderr)
sleep(delay)

# FIXME: netlink(7) may be a much better way to poll here
if not providers.process.is_alive(args.ppid):
print("Caller (PID %d) has terminated; idle preventer exiting." % args.ppid, file=stderr)
print(f"Caller (PID {args.ppid}) has terminated; idle preventer exiting.", file=stderr)
break

# pick random host or IP to look up without leaking any new information
Expand All @@ -353,11 +353,11 @@ def do_post_connect(env, args):
dummy = choice(pool)
shuffle(dns)
if args.verbose > 1:
print("Issuing DNS lookup of %s to prevent idle timeout..." % dummy, file=stderr)
print(f"Issuing DNS lookup of {dummy} to prevent idle timeout...", file=stderr)
providers.dns.lookup_host(dummy, keep_going=False)

elif args.verbose:
print("Connection setup done, child process %d exiting." % providers.process.pid())
print(f"Connection setup done, child process {providers.process.pid()} exiting.")

########################################

Expand Down Expand Up @@ -397,7 +397,7 @@ def parse_env(environ=os.environ):
if envar in environ:
try: val = maker(environ[envar])
except Exception as e:
print('Exception while setting %s from environment variable %s=%r' % (var, envar, environ[envar]), file=stderr)
print(f'Exception while setting {var} from environment variable {envar}={environ[envar]!r}', file=stderr)
raise
elif default: val, = default
else: val = None
Expand All @@ -408,7 +408,7 @@ def parse_env(environ=os.environ):
orig_netaddr = env.network
env.network = IPv4Network(env.network).supernet(new_prefix=env.netmasklen)
if env.network.network_address != orig_netaddr:
print("WARNING: IPv4 network %s/%d has host bits set, replacing with %s" % (orig_netaddr, env.netmasklen, env.network), file=stderr)
print(f"WARNING: IPv4 network {orig_netaddr}/{env.netmasklen} has host bits set, replacing with {env.network}", file=stderr)
assert env.network.netmask == env.netmask, \
"IPv4 network (INTERNAL_IP4_{{NETADDR,NETMASK}}) {ad}/{nm} does not match INTERNAL_IP4_NETMASKLEN={nml} (implies /{nmi})".format(
ad=orig_netaddr, nm=env.netmask, nml=env.netmasklen, nmi=env.network.netmask)
Expand All @@ -435,9 +435,9 @@ def parse_env(environ=os.environ):
env.splitexc = []
for pfx, n in chain((('INC', n) for n in range(env.nsplitinc)),
(('EXC', n) for n in range(env.nsplitexc))):
ad = IPv4Address(environ['CISCO_SPLIT_%s_%d_ADDR' % (pfx, n)])
nm = IPv4Address(environ['CISCO_SPLIT_%s_%d_MASK' % (pfx, n)])
nml = int(environ['CISCO_SPLIT_%s_%d_MASKLEN' % (pfx, n)])
ad = IPv4Address(environ[f'CISCO_SPLIT_{pfx}_{n}_ADDR'])
nm = IPv4Address(environ[f'CISCO_SPLIT_{pfx}_{n}_MASK'])
nml = int(environ[f'CISCO_SPLIT_{pfx}_{n}_MASKLEN'])
net = IPv4Network(ad).supernet(new_prefix=nml)
if net.network_address != ad:
print("WARNING: IPv4 split network (CISCO_SPLIT_%s_%d_{ADDR,MASK}) %s/%d has host bits set, replacing with %s" % (pfx, n, ad, nml, net), file=stderr)
Expand All @@ -448,8 +448,8 @@ def parse_env(environ=os.environ):

for pfx, n in chain((('INC', n) for n in range(env.nsplitinc6)),
(('EXC', n) for n in range(env.nsplitexc6))):
ad = IPv6Address(environ['CISCO_IPV6_SPLIT_%s_%d_ADDR' % (pfx, n)])
nml = int(environ['CISCO_IPV6_SPLIT_%s_%d_MASKLEN' % (pfx, n)])
ad = IPv6Address(environ[f'CISCO_IPV6_SPLIT_{pfx}_{n}_ADDR'])
nml = int(environ[f'CISCO_IPV6_SPLIT_{pfx}_{n}_MASKLEN'])
net = IPv6Network(ad).supernet(new_prefix=nml)
if net.network_address != ad:
print("WARNING: IPv6 split network (CISCO_IPV6_SPLIT_%s_%d_{ADDR,MASKLEN}) %s/%d has host bits set, replacing with %s" % (pfx, n, ad, nml, net), file=stderr)
Expand Down Expand Up @@ -552,7 +552,7 @@ def main(args=None, environ=os.environ):
raise pv
providers[pn] = pv()
except Exception as e:
print("WARNING: Couldn't configure {} provider: {}".format(pn, e), file=stderr)
print(f"WARNING: Couldn't configure {pn} provider: {e}", file=stderr)

# Fail if necessary providers are missing
required = {'route', 'process'}
Expand All @@ -572,7 +572,7 @@ def main(args=None, environ=os.environ):
required.add('dns')
missing_required = {p for p in required if p not in providers}
if missing_required:
raise RuntimeError("Aborting because providers for %s are required; use --help for more information" % ' '.join(missing_required))
raise RuntimeError(f"Aborting because providers for {' '.join(missing_required)} are required; use --help for more information")

# Finalize arguments that depend on providers
finalize_args_and_env(args, env)
Expand All @@ -592,12 +592,12 @@ def main(args=None, environ=os.environ):
raise SystemExit()

if env.myaddr6 or env.netmask6:
print('WARNING: IPv6 address or netmask set. Support for IPv6 in %s should be considered BETA-QUALITY.' % p.prog, file=stderr)
print(f'WARNING: IPv6 address or netmask set. Support for IPv6 in {p.prog} should be considered BETA-QUALITY.', file=stderr)
if args.dump:
exe = providers.process.pid2exe(args.ppid)
caller = '%s (PID %d)' % (exe, args.ppid) if exe else 'PID %d' % args.ppid
caller = f'{exe} (PID {args.ppid})' if exe else f'PID {args.ppid}'

print('Called by %s with environment variables for vpnc-script:' % caller, file=stderr)
print(f'Called by {caller} with environment variables for vpnc-script:', file=stderr)
width = max((len(envar) for var, envar, *rest in vpncenv if envar in environ), default=0)
for var, envar, *rest in vpncenv:
if envar in environ:
Expand Down Expand Up @@ -640,7 +640,7 @@ def main(args=None, environ=os.environ):
# https://github.com/dlenski/vpn-slice/pull/14#issuecomment-488129621

if args.verbose:
print('WARNING: %s ignores reason=%s' % (p.prog, env.reason.name), file=stderr)
print(f'WARNING: {p.prog} ignores reason={env.reason.name}', file=stderr)
elif env.reason == reasons.connect:
do_connect(env, args)

Expand Down
4 changes: 2 additions & 2 deletions vpn_slice/freebsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
class ProcfsProvider(PosixProcessProvider):
def pid2exe(self, pid):
try:
return os.readlink('/proc/%d/file' % pid)
return os.readlink(f'/proc/{pid}/file')
except OSError:
return None

def ppid_of(self, pid=None):
if pid is None:
return os.getppid()
try:
return int(next(open('/proc/%d/status' % pid)).split()[3])
return int(next(open(f'/proc/{pid}/status')).split()[3])
except (OSError, ValueError):
return None
4 changes: 2 additions & 2 deletions vpn_slice/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
class ProcfsProvider(PosixProcessProvider):
def pid2exe(self, pid):
try:
return os.readlink('/proc/%d/exe' % pid)
return os.readlink(f'/proc/{pid}/exe')
except OSError:
return None

def ppid_of(self, pid=None):
if pid is None:
return os.getppid()
try:
return int(next(open('/proc/%d/stat' % pid)).split()[3])
return int(next(open(f'/proc/{pid}/stat')).split()[3])
except (OSError, ValueError):
return None

Expand Down
12 changes: 6 additions & 6 deletions vpn_slice/mac.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ def configure_domain_vpn_dns(self, domains, nameservers):
if not os.path.exists('/etc/resolver'):
os.makedirs('/etc/resolver')
for domain in domains:
resolver_file_name = "/etc/resolver/{0}".format(domain)
resolver_file_name = f"/etc/resolver/{domain}"
with open(resolver_file_name, "w") as resolver_file:
for nameserver in nameservers:
resolver_file.write("nameserver {}\n".format(nameserver))
resolver_file.write(f"nameserver {nameserver}\n")

def deconfigure_domain_vpn_dns(self, domains, nameservers):
for domain in domains:
resolver_file_name = "/etc/resolver/{0}".format(domain)
resolver_file_name = f"/etc/resolver/{domain}"
if os.path.exists(resolver_file_name):
os.remove(resolver_file_name)
if not len(os.listdir('/etc/resolver')):
Expand Down Expand Up @@ -167,10 +167,10 @@ def configure_firewall(self, device):
if not enable_token:
print("WARNING: failed to get pf enable reference token, packet filter might not shutdown correctly")

anchor = '{}/{}'.format(self._PF_ANCHOR, device)
anchor = f'{self._PF_ANCHOR}/{device}'
# add anchor to generate rules with
with open(self._PF_CONF_FILE, 'a') as file:
file.write('anchor "{}" # vpn-slice-{} AUTOCREATED {}\n'.format(anchor, device, enable_token))
file.write(f'anchor "{anchor}" # vpn-slice-{device} AUTOCREATED {enable_token}\n')

# reload config file
self._reload_conf()
Expand All @@ -191,7 +191,7 @@ def configure_firewall(self, device):

def deconfigure_firewall(self, device):
# disable anchor
anchor = '{}/{}'.format(self._PF_ANCHOR, device)
anchor = f'{self._PF_ANCHOR}/{device}'
subprocess.check_call([self.pfctl, '-a', anchor, '-F', 'all'])

with open(self._PF_CONF_FILE, 'r') as file:
Expand Down
Loading

0 comments on commit 36b706c

Please sign in to comment.