-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy patharchivebot-jobs
executable file
·288 lines (257 loc) · 13.5 KB
/
archivebot-jobs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
#!/usr/bin/env python3
import argparse
import datetime
import errno
import itertools
import json
import math
import os
import re
import sys
import time
import urllib.request
# Column definitions
#
# Each entry maps a column name to a pair (extractor, attributes):
#  - extractor is called as extractor(job, pipelines) and returns the raw cell value
#  - attributes is a tuple of flags ('truncatable', 'date', 'coloured', 'numeric', 'size', 'hidden')
# The insertion order of this dict is the column order.
def _job_data(key, convert = None):
	# Build an extractor for job["job_data"][key], optionally passing the value through convert
	if convert is None:
		return lambda job, pipelines: job["job_data"][key]
	return lambda job, pipelines: convert(job["job_data"][key])

def _pipeline_nick(job, pipelines):
	# Nickname of the pipeline running the job, or "unknown" if the pipeline ID is not in the mapping
	pipelineId = job["job_data"]["pipeline_id"]
	return pipelines[pipelineId] if pipelineId in pipelines else "unknown"

def _remaining_queue(job, pipelines):
	# Items queued but not yet downloaded
	data = job["job_data"]
	return data["items_queued"] - data["items_downloaded"]

def _delay_range(job, pipelines):
	# 'MIN-MAX', or just the single value when both delays are equal
	data = job["job_data"]
	low = data["delay_min"]
	high = data["delay_max"]
	if low == high:
		return str(int(low))
	return f"{int(low)}-{int(high)}"

columns = {
	'jobid': (_job_data("ident"), ()),
	'url': (_job_data("url"), ('truncatable',)),
	'user': (_job_data("started_by"), ()),
	'pipenick': (_pipeline_nick, ()),
	'queued': (_job_data("queued_at"), ('date', 'numeric')),
	'started': (_job_data("started_at"), ('date', 'numeric')),
	'last active': (lambda job, pipelines: int(job["ts"]), ('date', 'coloured', 'numeric')),
	'dl urls': (_job_data("items_downloaded"), ('numeric',)),
	'dl size': (_job_data("bytes_downloaded"), ('size', 'numeric')),
	'queue': (_remaining_queue, ('numeric',)),
	'con': (_job_data("concurrency"), ('numeric',)),
	'delay min': (_job_data("delay_min", int), ('hidden', 'numeric')),
	'delay max': (_job_data("delay_max", int), ('hidden', 'numeric')),
	'delay': (_delay_range, ()),
}
defaultSort = 'jobid'

# Validate
# Truncation code can't handle renderers, so a truncatable column must not also carry a rendered attribute
if any('truncatable' in attrs and not {'date', 'coloured', 'size'}.isdisjoint(attrs) for _, attrs in columns.values()):
	raise RuntimeError('Invalid column definitions: cannot combine date/coloured/size with truncatable')
# Filter function
def make_field_filter(column, op, value, caseSensitive = True):
	"""
	Build a predicate that tests one field of a job row against a value.

	column: key of the job dict whose value is tested
	op: '=' (equal), '<', '>', '^' (starts with), '*' (contains), '$' (ends with) or '~' (regex search)
	value: the operand to compare against; for '~' this is the regular expression
	caseSensitive: if false, string operands are compared case-insensitively; non-strings are unaffected

	Returns a function that takes a job dict and returns a bool.
	Raises KeyError on an unknown operator and re.error if the regex of a '~' filter does not compile.
	"""
	if op == "~":
		# Case-insensitivity must come from the regex flag. Lower-casing the pattern itself
		# would corrupt escapes, e.g. turn \S (non-whitespace) into \s (whitespace).
		pattern = re.compile(value, 0 if caseSensitive else re.IGNORECASE)
		return (lambda job: pattern.search(job[column]) is not None)

	compFunc = {
		"=": lambda a, b: a == b,
		"<": lambda a, b: a < b,
		">": lambda a, b: a > b,
		"^": lambda a, b: a.startswith(b),
		"*": lambda a, b: b in a,
		"$": lambda a, b: a.endswith(b),
	}[op]

	if caseSensitive:
		return (lambda job: compFunc(job[column], value))

	def fold(x):
		# Only strings have a case; numbers etc. pass through unchanged
		return x.lower() if isinstance(x, str) else x

	# The operand never changes, so fold it once instead of on every job
	foldedValue = fold(value)
	return (lambda job: compFunc(fold(job[column]), foldedValue))
# Parse arguments
class FilterAction(argparse.Action):
	"""argparse action turning a --filter/--ifilter/--pyfilter argument into a predicate over job rows."""

	def __call__(self, parser, namespace, values, optionString = None):
		"""
		Parse values[0] and store the resulting filter function under self.dest on the namespace.

		For --pyfilter, the value is compiled as a Python expression that is evaluated with the name `job` bound to the job row.
		For the other options, the value has the form COLUMN<op>VALUE and is handed to make_field_filter;
		only the option strings '--filter' and '-f' produce a case-sensitive filter.
		Any invalid input is reported through parser.error.
		"""
		if optionString == '--pyfilter':
			try:
				func = compile(values[0], '<pyfilter>', 'eval')
			except Exception as e:
				parser.error(f'Could not compile filter expression: {type(e).__module__}.{type(e).__name__}: {e!s}')
			# NOTE: this evaluates an arbitrary expression taken from the command line; never feed it untrusted input.
			# `job` is passed in the globals dict rather than as a separate locals dict because nested scopes in the
			# expression (generator expressions, lambdas) resolve free names through globals only.
			setattr(namespace, self.dest, lambda job: eval(func, {'job': job}))
			return

		match = re.match(r"^(?P<column>[A-Za-z ]+)(?P<op>[=<>^*$~])(?P<value>.*)$", values[0])
		if not match:
			parser.error('Invalid filter')
		column = match.group('column').lower()
		op = match.group('op')
		value = match.group('value')

		# Reject unknown columns explicitly; an assert would vanish under `python -O`
		if column not in columns:
			parser.error(f'Invalid filter: unknown column {column!r}')
		columnAttr = columns[column][1]

		if 'numeric' in columnAttr:
			try:
				value = float(value)
			except ValueError:
				parser.error(f'Invalid filter: column {column!r} requires a numeric value')
			if 'date' in columnAttr and value < 0:
				# A negative value on a date column is an offset in seconds relative to now
				value = time.time() + value

		setattr(namespace, self.dest, make_field_filter(column, op, value, caseSensitive = (optionString in ('--filter', '-f'))))
def parse_sort(value):
	"""
	Split a sort specification into (column name, descending flag).

	A leading '-' requests descending order; the rest is lower-cased and must be a key of `columns`,
	otherwise the problem is reported through parser.error.
	"""
	descending = value.startswith('-')
	name = (value[1:] if descending else value).lower()
	if name not in columns:
		parser.error('Invalid column name')
	return (name, descending)
class SortAction(argparse.Action):
	"""argparse action collecting every --sort occurrence into a list of parse_sort results."""

	def __call__(self, parser, namespace, values, optionString = None):
		"""Parse values[0] with parse_sort and append the result to the list stored under self.dest."""
		sortKey = parse_sort(values[0])
		collected = getattr(namespace, self.dest, None)
		if collected is None:
			# First occurrence of the option: create the list to accumulate into
			collected = []
			setattr(namespace, self.dest, collected)
		collected.append(sortKey)
# Command-line interface. RawTextHelpFormatter keeps the explicit line breaks of the multi-line help texts.
parser = argparse.ArgumentParser(formatter_class = argparse.RawTextHelpFormatter)

# The three filter options all write to the same destination, so the last one given wins.
parser.add_argument('--filter', '-f', nargs = 1, type = str, action = FilterAction, help = '\n'.join([
		'Filter the table for rows where a COLUMN has a certain VALUE. If specified multiple times, only the last value is used.',
		'FILTER has the format COLUMN{=|<|>|^|*|$|~}VALUE',
		' = means the value must be exactly as specified.',
		' < and > mean it must be less/greater than the specified.',
		' ^ and $ mean it must start/end with the specified.',
		' * means it must contain the specified.',
		' ~ means it must match the specified regex.',
	]))
parser.add_argument('--ifilter', '-i', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'Like --filter but case-insensitive')
parser.add_argument('--pyfilter', nargs = 1, type = str, action = FilterAction, dest = 'filter', help = 'A Python expression for filtering using the local variable `job`')

parser.add_argument('--sort', '-s', nargs = 1, type = str, action = SortAction, help = "Sort the table by a COLUMN (descending if preceded by '-'). This can be used multiple times to refine the sorting.")
parser.add_argument('--mode', choices = ('table', 'dashboard-regex', 'con-d-commands', 'format'), default = 'table', help = '\n'.join([
		'Output modes:',
		' table: print a table of the matched jobs',
		' dashboard-regex: compose a regular expression that can be used on the dashboard to actively watch the jobs matched by the filter',
		' con-d-commands: print !con and !d commands for the current settings',
		' format: print some output for each job, separated by newlines; this requires the --format option',
	]))

# Table mode options
parser.add_argument('--no-colours', '--no-colors', action = 'store_true', help = "Don't colourise the last activity column if it's been a while. (Table mode only)")
parser.add_argument('--no-table', action = 'store_true', help = 'Raw output without feeding through column(1); columns are separated by tabs. (Table mode only)')
parser.add_argument('--no-truncate', action = 'store_true', help = 'Disable truncating long values if the terminal width would be exceeded. (Table mode without --no-table only)')
parser.add_argument('--dates', action = 'store_true', help = 'Print dates instead of elapsed times for queued/started/last active columns. (Table mode only)')

# con-d-commands mode options
parser.add_argument('--replace-concurrency', nargs = 1, metavar = 'CON', type = int, help = 'Replace the concurrency value with the specified one. (con-d-commands mode only)')
parser.add_argument('--replace-delay', nargs = 2, metavar = ('MIN', 'MAX'), type = int, help = 'Replace the delay values with the specified ones. (con-d-commands mode only)')

# Format mode options
parser.add_argument('--format', help = 'Output format for the format mode; this must be a Python format string and can use any column name in lower-case with spaces replaced by underscores; e.g. "{url} {last_active}". (Format mode only)')

args = parser.parse_args()
# Format mode is useless without a template
formatMissing = args.mode == 'format' and not args.format
if formatMissing:
	print('Error: when using format mode, --format is required.', file = sys.stderr)
	sys.exit(1)

# Fall back to the default sort column when no --sort was given
args.sort = args.sort or [parse_sort(defaultSort)]

if args.mode != 'con-d-commands':
	# The replacement options only apply to con-d-commands mode; discard them everywhere else
	args.replace_concurrency = None
	args.replace_delay = None
else:
	# con-d-commands is implemented as format mode with a fixed template
	args.mode = 'format'
	args.format = '!con {jobid} {con}\n!d {jobid} {delay_min} {delay_max}'
# Retrieve
def fetch(url):
	"""
	Request `url` with an 'Accept: application/json' header and return the decoded JSON body.

	Raises RuntimeError if the response code is not 200.
	"""
	request = urllib.request.Request(url, headers = {'Accept': 'application/json'})
	with urllib.request.urlopen(request) as response:
		if response.getcode() == 200:
			return json.load(response)
		raise RuntimeError('Could not fetch job data')
jobdata = fetch('http://dashboard.at.ninjawedding.org/logs/recent?count=1')
pipelinedata = fetch('http://dashboard.at.ninjawedding.org/pipelines')
# Reference point for all elapsed-time calculations
currentTime = time.time()

# Process
# Pipeline ID -> nickname lookup
pipelines = dict((pipeline["id"], pipeline["nickname"]) for pipeline in pipelinedata["pipelines"])
# One dict per job, mapping every column name to its extracted raw value
jobs = [{column: columnFunc(job, pipelines) for column, (columnFunc, _) in columns.items()} for job in jobdata]
if not jobs:
	# Nothing to do
	sys.exit(0)

# Filter
if args.filter:
	jobs = list(filter(args.filter, jobs))
if not jobs:
	# Everything was filtered out
	sys.exit(0)
# Sort
class reversor: # https://stackoverflow.com/a/56842689
	"""Wrapper that inverts the ordering of the wrapped object, for descending sorts on individual key elements."""

	def __init__(self, wrapped):
		# The wrapped value; comparisons are delegated to it
		self.obj = wrapped

	def __eq__(self, rhs):
		"""Equal exactly when the wrapped objects are equal."""
		return rhs.obj == self.obj

	def __lt__(self, rhs):
		"""Less-than with the operands swapped, which reverses the sort order."""
		return rhs.obj < self.obj
# Resolve every requested sort key to (column, descending, column definition).
# Unless --dates is given, date columns are displayed as elapsed time, so their sort direction is flipped.
flipDates = not args.dates
sortColumns = tuple(
	(column, (not descending) if (flipDates and 'date' in columns[column][1]) else descending, columns[column])
	for column, descending in args.sort
)

def _sort_key(job):
	# One element per sort column; descending columns are wrapped so that they compare in reverse
	return tuple(reversor(job[column]) if descending else job[column] for column, descending, _ in sortColumns)

jobs = sorted(jobs, key = _sort_key)
# Concurrency and delay overrides if specified and relevant
concurrencyOverride = args.replace_concurrency
delayOverride = args.replace_delay
if concurrencyOverride is not None:
	for job in jobs:
		job['con'] = concurrencyOverride[0]
if delayOverride is not None:
	for job in jobs:
		job['delay min'] = delayOverride[0]
		job['delay max'] = delayOverride[1]
# Non-table output modes
if args.mode == 'dashboard-regex':
	# A single anchored alternation of all (escaped) job URLs
	alternatives = '|'.join(re.escape(job['url']) for job in jobs)
	print(f'^({alternatives})$')
	sys.exit(0)
if args.mode == 'format':
	for job in jobs:
		# Format fields cannot contain spaces, so column names are exposed with underscores instead
		formatFields = {key.replace(' ', '_'): value for key, value in job.items()}
		print(args.format.format(**formatFields))
	sys.exit(0)
# Renderers
def render_date(ts, coloured = False):
	"""
	Render a timestamp either as a date (with --dates) or as the time elapsed since then.

	Returns the plain strings "now" or "<1 min ago" for elapsed times below a minute (when --dates is not used),
	otherwise a tuple (colour start sequence, text, colour end sequence).
	The colour sequences are only non-empty if `coloured` is true and at least 300 seconds have elapsed.
	"""
	diff = currentTime - ts
	if coloured and diff >= 300:
		# Plain red below six hours, reverse video red from there on
		attribute = 0 if diff < 6 * 3600 else 7
		colourStr = f"\x1b[{attribute};31m"
		colourEndStr = "\x1b[0m"
	else:
		colourStr = ""
		colourEndStr = ""

	if args.dates:
		return (colourStr, datetime.datetime.fromtimestamp(ts).isoformat(sep = " "), colourEndStr)

	if diff <= 0:
		return "now"
	if diff < 60:
		return "<1 min ago"
	if diff < 86400:
		# Below a day: hours (only if at least one) and minutes
		hoursPart = f"{diff // 3600:.0f}h " if diff >= 3600 else ""
		text = hoursPart + f"{(diff % 3600) // 60:.0f}mn ago"
	else:
		# A day or more: days and hours
		text = f"{diff // 86400:.0f}d {(diff % 86400) // 3600:.0f}h ago"
	return (colourStr, text, colourEndStr)
def render_size(size):
	"""
	Render a byte count with a binary unit suffix.

	Values below 1 KiB are printed as-is followed by ' B'; larger ones with one decimal place in KiB, MiB, GiB or TiB (TiB being the largest unit used).
	"""
	suffixes = ('B', 'KiB', 'MiB', 'GiB', 'TiB')
	if size >= 1:
		# Order of magnitude in base 1024, capped at the largest known unit
		magnitude = min(int(math.log(size, 1024)), len(suffixes) - 1)
	else:
		magnitude = 0
	if magnitude != 0:
		return f'{size / 1024 ** magnitude:.1f} {suffixes[magnitude]}'
	return f'{size} B' # No decimal places
def _render_coloured_date(ts):
	# Date renderer for 'coloured' columns; honours --no-colours
	return render_date(ts, coloured = not args.no_colours)

# Pick a renderer for every column that needs one; columns without an entry are left as they are
renderers = {}
for column, (_, columnAttr) in columns.items():
	isDate = "date" in columnAttr
	if isDate and "coloured" in columnAttr:
		renderers[column] = _render_coloured_date
	elif isDate:
		renderers[column] = render_date
	elif "size" in columnAttr:
		renderers[column] = render_size
	elif isinstance(jobs[0][column], (int, float)):
		# Plain numbers only need to be converted to text; the first job decides for the whole column
		renderers[column] = str

# Replace the raw values by their rendered form
for job in jobs:
	for column, render in renderers.items():
		job[column] = render(job[column])
# Truncate if applicable
# Columns that actually appear in the table, i.e. everything not marked 'hidden'
printableColumns = {column: colDef for column, colDef in columns.items() if 'hidden' not in colDef[1]}
if not args.no_table and not args.no_truncate:
	# Rendered cells are either plain strings or (colour start, text, colour end) tuples; only the text takes up space.
	# A column is at least as wide as its header.
	widthsD = {column: max(itertools.chain((len(column),), (len(job[column]) if isinstance(job[column], str) else len(job[column][1]) for job in jobs))) for column in printableColumns}
	minWidthsD = {column: len(column) for column in printableColumns}
	try:
		termWidth = os.get_terminal_size().columns
	except OSError as e:
		if e.errno != errno.ENOTTY:
			raise
		# Inappropriate ioctl for device (stdout not a terminal, happens e.g. when redirecting or piping)
		# Silently ignore this and don't truncate
		termWidth = float('Inf')
	# Two characters of separator are budgeted between adjacent columns, hence the - 2 for the last column
	overage = sum(x + 2 for x in widthsD.values()) - 2 - termWidth
	if overage > 0:
		if sum((widthsD[column] if 'truncatable' not in colDef[1] else minWidthsD[column]) + 2 for column, colDef in printableColumns.items()) - 2 > termWidth:
			# Even truncating all truncatable columns to the minimum width is not sufficient, i.e. can't match this terminal width. Print a warning and proceed normally
			print('Sorry, cannot truncate columns to terminal width', file = sys.stderr)
		else:
			# Distribute overage to truncatable columns proportionally to each column's length over the minimum
			# Only printable columns have widths, so hidden columns must never be considered here
			truncatableColumns = {column: colDef for column, colDef in printableColumns.items() if 'truncatable' in colDef[1]}
			totalOverMin = sum(widthsD[column] - minWidthsD[column] for column in truncatableColumns)
			trWidthsD = {column: math.floor(widthsD[column] - (widthsD[column] - minWidthsD[column]) / totalOverMin * overage) for column in truncatableColumns}
			if sum(widthsD[column] - trWidthsD[column] for column in truncatableColumns) - overage == 1:
				# Truncated one more character than necessary due to the flooring; add it again to the shortest column
				trWidthsD[min(trWidthsD, key = trWidthsD.get)] += 1
			for job in jobs:
				for column in truncatableColumns:
					if len(job[column]) > trWidthsD[column]:
						# The ellipsis takes the place of the last character that still fits
						job[column] = job[column][:trWidthsD[column] - 1] + '…'
# Print
# Two characters between columns: this has to match the truncation code, which budgets 2 characters per column gap
columnSeparator = ' ' * 2
visibleColumns = [column for column in columns if "hidden" not in columns[column][1]]
# Header row followed by one row per job
output = [tuple(column.upper() for column in visibleColumns)]
output.extend(tuple(job[column] for column in visibleColumns) for job in jobs)
if not args.no_table:
	# Cells are plain strings or (colour start, text, colour end) tuples; only the text counts towards the width
	widths = tuple(max(len(field) if isinstance(field, str) else len(field[1]) for field in column) for column in zip(*output))
	for row in output:
		cells = []
		for value, width in zip(row, widths):
			if isinstance(value, str):
				cells.append(value.ljust(width))
			else:
				# Pad after the colour end sequence, based on the visible text only, so escape codes don't skew the alignment
				cells.append(''.join((value[0], value[1], value[2], ' ' * (width - len(value[1])))))
		print(columnSeparator.join(cells))
else:
	# Raw mode: tab-separated, coloured cells flattened to a single string
	for row in output:
		print('\t'.join(field if isinstance(field, str) else ''.join(field) for field in row))