-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsn_workflow_listener.py
296 lines (259 loc) · 12 KB
/
sn_workflow_listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
"""Performs actions based on ServiceNow workflow inputs."""
import logging as log
import re
import urllib
import handlers.aws_assume_role as aws_assume_role
import handlers.config as config
import handlers.git_handler as git_handler
import handlers.tfe_handler as tfe_handler
import handlers.tfe_run_handler as tfe_run_handler
import handlers.tfe_var_handler as tfe_var_handler
import handlers.tfe_workspace_handler as tfe_workspace_handler
from glom import glom
FORMAT = ("[%(asctime)s][%(levelname)s]" +
"[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s")
log.basicConfig(filename='terrasnow_enterprise.log', level=log.INFO,
format=FORMAT)
# TFE workspace
def workspace_event_listener(request):
"""Process workflow request."""
log.debug('workspace event: {} recieved'.format(request))
region = glom(request, 'data.0.region', default=False)
org_name = glom(request, 'data.0.org_name', default=False)
workspace_name = glom(request, 'data.0.workspace_name', default=False)
repo_id = glom(request, 'data.0.repo_id', default=False)
repo_version = glom(request, 'data.0.repo_version', default=False)
action = glom(request, 'data.0.action', default=False)
if action == 'CREATE':
log.debug('Recieved workspace CREATE event.')
return (
tfe_workspace_handler.create_workspace(region=region,
org_name=org_name,
workspace_name=workspace_name,
repo_id=repo_id,
repo_version=repo_version)
)
elif action == 'DELETE':
log.debug('Recieved workspace DELETE event.')
return tfe_workspace_handler.delete_workspace(region, workspace_name)
else:
return 'ERROR: workspace action not specified.'
def response_handler(record):
"""Evaulate response."""
try:
response = record.make_request()
log.debug('Create TFE workspace response: {}'.format(response))
return response
except urllib.error.HTTPError as e:
if e.code == 422:
return "ERROR: Workspace already exists"
else:
return "ERROR"
# TFE runs
def TFE_run_listener(request):
"""Process TFE Run requests."""
project_name = glom(request, 'data.0.project_name', default=False).replace(
'/', '-')
repo_url = glom(request, 'data.0.repo_url', default=False)
branch = glom(request, 'data.0.module_version', default=False)
workspace_id = glom(request, 'data.0.workspace_id', default=False)
region = glom(request, 'data.0.region', default=False)
# get the configuraiton version upload url
upload_url = (
tfe_run_handler.get_upload_url(region, workspace_id))
log.info('configuration version response: {}'.format(upload_url))
# download the source module
git_handler.clone_repo(repo_url, project_name, branch)
# zip the module
# adding extra steps wenbhook side (clone then zip vice just download the
# 'pre-zipped' repo) to avoid having to work with gitlab api tokens
tar_path = git_handler.file_check(
git_handler.compress_project(project_name))
response = tfe_run_handler.upload_configuration_files(upload_url,
tar_path)
git_handler.cleanup(project_name)
if not response:
response = '{"Status": "SUCCESS"}'
return response
# TFE variables
def variables_event_listener(request):
"""Process workflow request."""
region = glom(request, 'data.0.region', default=False)
cat_vars = glom(request, 'data.0.cat_vars.0', default=False)
org_name = glom(request, 'data.0.org_name', default=False)
workspace_id = glom(request, 'data.0.workspace_id', default=False)
role = glom(request, 'data.0.role', default=False)
return populate_tfe_vars(region, cat_vars, org_name, workspace_id, role)
def populate_tfe_vars(region, cat_vars, org_name, workspace_id, role):
"""Create TFE vars and return results."""
result = []
create_tf_vars_response = {}
create_env_vars_response = {}
create_tf_vars_response['tf_vars'] = (
create_tfe_tf_vars(region, cat_vars, org_name, workspace_id))
create_env_vars_response['env_vars'] = (
create_tfe_env_vars(region, role, org_name, workspace_id))
result.append(create_tf_vars_response)
result.append(create_env_vars_response)
log.info('returning result: {}'.format(result))
return result
def remove_prefix(text, prefix):
"""Remove string prefix."""
if text.startswith(prefix):
return text[len(prefix):]
return text
def parse_vars(json_obj):
"""Convert SN variables to terraform variables."""
raw_vars = {}
# 1. remove unnecessary vars (gen_)
for var in json_obj:
if var.startswith('tfv_'):
rm_prefix = remove_prefix(var, 'tfv_')
raw_vars[rm_prefix] = json_obj[var]
return raw_vars
def create_tfe_tf_vars(region, json_obj, org, workspace_id):
"""Populate TFE vars with raw var data."""
configFromS3 = config.ConfigFromS3("tfsh-config", "config.ini",
region)
conf = configFromS3.config
raw_vars = parse_vars(json_obj)
response = {}
if raw_vars:
for var in raw_vars:
var_value = raw_vars[var]
# fix map values
if "[{" in var_value:
var_value = re.sub('\[|\]', '', var_value)
new_var = tfe_var_handler.TFEVars(var, var_value, org=org,
workspace_id=workspace_id,
is_hcl="true"
).get_var()
# check if the variable is a map or list
elif "{" in var_value or "[" in var_value:
new_var = tfe_var_handler.TFEVars(var, var_value, org=org,
workspace_id=workspace_id,
is_hcl="true"
).get_var()
else:
new_var = tfe_var_handler.TFEVars(var, var_value, org=org,
workspace_id=workspace_id
).get_var()
print("var payload: {}".format(new_var))
api_endpoint = "/vars"
record = tfe_handler.TFERequest(api_endpoint, new_var, conf)
var_response = record.make_request()
log.debug('Create tfe_var response: {}'.format(response))
response[var] = var_response
return response
else:
log.error("Input json object was empty.")
return "ERROR: vars json object cannot be empty"
# AWS assume role
def assume_role_listener(request):
"""Process assume role request."""
log.debug('Recieved assume role request: {}'.format(request))
region = glom(request, 'data.0.region', default=False)
org_name = glom(request, 'data.0.org_name', default=False)
workspace_id = glom(request, 'data.0.workspace_id', default=False)
role = glom(request, 'data.0.assume_role.0.role', default=False)
duration = 900
temp_creds = aws_assume_role.get_assumed_role_credentials(role, duration)
aws_access_key_id = glom(temp_creds, 'aws_access_key_id', default=False)
if aws_access_key_id:
return create_cred_env_vars(region, workspace_id, org_name,
temp_creds)
else:
log.error("Assume role request failed: {}".format(temp_creds))
return "ERROR: Assume role request failed"
def create_tfe_env_vars(region, role, org_name, workspace_id):
"""Create TFE envirornment variables."""
log.debug('Creating tfe env vars.')
duration = 7200
temp_creds = aws_assume_role.get_assumed_role_credentials(role, duration)
aws_access_key_id = glom(temp_creds, 'aws_access_key_id', default=False)
if aws_access_key_id:
return create_cred_env_vars(region, workspace_id, org_name,
temp_creds)
else:
log.error("Assume role request failed: {}".format(temp_creds))
return "ERROR: Assume role request failed"
def create_cred_env_vars(region, workspace_id, org, temp_creds):
"""Create TFE Credential Environment Variables."""
# pull the configuration
configFromS3 = config.ConfigFromS3("tfsh-config", "config.ini",
region)
conf = configFromS3.config
# get properly formatted json objs for variable creation
access_key_id = create_access_key_id_var(
glom(temp_creds, 'aws_access_key_id', default=False),
workspace_id, org)
secret_access_key = create_secret_access_key_id_var(
glom(temp_creds, 'aws_secret_access_key', default=False),
workspace_id, org)
aws_session_token = create_session_token_var(
glom(temp_creds, 'aws_session_token', default=False),
workspace_id, org)
tfe_region = create_region_var(region, workspace_id, org)
# check to see if the temp creds exist
if access_key_id and secret_access_key:
api_endpoint = "/vars"
response = {}
# create the access key id tfe env var
record = (
tfe_handler.TFERequest(api_endpoint, access_key_id, conf))
response['access_key_id'] = record.make_request()
# create the secret access key tfe env var
record = (
tfe_handler.TFERequest(api_endpoint, secret_access_key, conf))
response['secret_access_key'] = record.make_request()
# create the region tfe env var
# hardcoding this for now
record = (
tfe_handler.TFERequest(api_endpoint, tfe_region, conf))
response['region'] = record.make_request()
# create the aws session token env var
record = (
tfe_handler.TFERequest(api_endpoint, aws_session_token, conf))
response['aws_session_token'] = record.make_request()
log.debug('Create tfe_var response: {}'.format(response))
return response
else:
log.error("Access Key Id or Secret Access Key not found.")
return "ERROR: Access Key Id or Secret Access Key not found"
def create_access_key_id_var(access_key_id, workspace_id, org):
"""Return env var object."""
if access_key_id:
return tfe_var_handler.TFEVars(key="AWS_ACCESS_KEY_ID",
value=access_key_id,
category='env',
org=org,
workspace_id=workspace_id).get_var()
else:
log.error("Access Key Id not found.")
return False
def create_secret_access_key_id_var(secret_access_key, workspace_id, org):
"""Return env var object."""
if secret_access_key:
return tfe_var_handler.TFEVars(key="AWS_SECRET_ACCESS_KEY",
value=secret_access_key,
category='env',
org=org,
workspace_id=workspace_id,
is_senative=True).get_var()
else:
log.error("Secret Access Key not found.")
return False
def create_region_var(region, workspace_id, org):
"""Return the region env var object."""
return tfe_var_handler.TFEVars(key="AWS_DEFAULT_REGION",
value=region,
category='env',
org=org,
workspace_id=workspace_id).get_var()
def create_session_token_var(aws_session_token, workspace_id, org):
"""Return the session token evn var object."""
return tfe_var_handler.TFEVars(key="AWS_SESSION_TOKEN",
value=aws_session_token,
category='env',
org=org,
workspace_id=workspace_id).get_var()