-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpush_file_to_lz.py
145 lines (125 loc) · 5.47 KB
/
push_file_to_lz.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
from dotenv import load_dotenv
import requests
import json
import logging
import utils
import constants
# Module-level logger, named after this module so log output can be filtered per module.
logger = logging.getLogger(__name__)
def push_file_to_lz(
    filepath: str,
    table_name: str,
):
    """Upload a local file to the Landing Zone folder of ``table_name``.

    Credentials come from the APP_ID / SECRET / TENANT_ID environment variables
    and the destination base URL from LZ_URL. Setting DEBUG__SKIP_PUSH_TO_LZ to
    any non-empty value skips the upload (only an info message is logged).

    Afterwards, older numerically-named parquet files next to ``filepath`` are
    removed locally (see ``__clean_up_old_parquet_files``, which has its own
    DEBUG__SKIP_PARQUET_FILES_CLEAN_UP switch).

    Args:
        filepath: Path of the local file to upload.
        table_name: Landing Zone folder (table) the file is uploaded into.
    """
    logger.info(f"pushing file to lz. table_name={table_name}, filepath={filepath}")

    skip_push = os.getenv("DEBUG__SKIP_PUSH_TO_LZ")
    if not skip_push:
        token = __get_access_token(
            os.getenv("APP_ID"), os.getenv("SECRET"), os.getenv("TENANT_ID")
        )
        __patch_file(token, filepath, os.getenv("LZ_URL"), table_name)
    else:
        logger.info("Push to LZ skipped by environment variable DEBUG__SKIP_PUSH_TO_LZ")

    # Remove older numbered parquet files in the same directory so that only
    # the file just handled is kept locally.
    __clean_up_old_parquet_files(filepath)
def __clean_up_old_parquet_files(filepath: str):
    """Delete older numerically-named parquet files that live next to ``filepath``.

    Only acts when ``filepath`` itself is named ``<digits>.parquet``. In that
    case every other regular file ``<digits>.parquet`` in the same directory
    whose number is smaller than the current one is removed, so the current
    file remains as the newest. Files with any other name or extension, and
    sub-directories, are left untouched.

    Setting the environment variable DEBUG__SKIP_PARQUET_FILES_CLEAN_UP to any
    non-empty value disables the clean-up.

    Args:
        filepath: Path of the parquet file that was just produced/pushed.
    """
    if os.getenv("DEBUG__SKIP_PARQUET_FILES_CLEAN_UP"):
        return

    current_stem, current_ext = os.path.splitext(os.path.basename(filepath))
    # Do nothing if it's a parquet file with a prefix, or not a parquet file.
    # isdecimal() (not isnumeric()) accepts exactly the digit strings int()
    # can parse; isnumeric() is also true for e.g. "½" or "²", which would
    # make int() raise ValueError.
    if current_ext != ".parquet" or not current_stem.isdecimal():
        return
    current_number = int(current_stem)

    logger.debug(f"Cleaning up old parquet files. Current file: {filepath}")
    # dirname() is "" for a bare filename and os.listdir("") raises, so fall
    # back to the current directory.
    directory = os.path.dirname(filepath) or "."

    for filename in os.listdir(directory):
        stem, ext = os.path.splitext(filename)
        if ext != ".parquet" or not stem.isdecimal() or int(stem) >= current_number:
            continue
        old_path = os.path.join(directory, filename)
        # A directory that happens to be named "<n>.parquet" cannot be removed
        # with os.remove; skip it instead of aborting the whole clean-up.
        if not os.path.isfile(old_path):
            continue
        logger.debug(f"Deleting old parquet file {filename}")
        os.remove(old_path)
def __get_access_token(app_id, client_secret, directory_id):
    """Obtain an Azure Storage access token via the OAuth2 client-credentials flow.

    Posts to the Microsoft identity platform v2.0 token endpoint of the given
    tenant, requesting the ``https://storage.azure.com/.default`` scope.

    Args:
        app_id: Application (client) id - shown on the Azure app overview page.
        client_secret: Client secret of that app registration.
        directory_id: Directory (tenant) id.

    Returns:
        The ``access_token`` string from the token endpoint's JSON response.

    Raises:
        Exception: if the response body is not a JSON object or contains no
            ``access_token`` (e.g. wrong credentials, endpoint error page).
    """
    token_url = (
        "https://login.microsoftonline.com/" + directory_id + "/oauth2/v2.0/token"
    )
    token_data = {
        "grant_type": "client_credentials",
        "client_id": app_id,
        "client_secret": client_secret,
        "scope": "https://storage.azure.com/.default",
    }
    token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    token_response = requests.post(token_url, data=token_data, headers=token_headers)

    try:
        token_response_dict = json.loads(token_response.text)
    except ValueError as err:
        # Error pages (e.g. gateway 5xx HTML) are not JSON; report them as a
        # token failure instead of leaking a bare JSONDecodeError.
        logger.error("Unable to get access token")
        logger.debug(token_response.text)
        raise Exception("Error in getting in access token") from err

    # Guard against valid JSON that is not an object (no .get on lists etc.).
    token = (
        token_response_dict.get("access_token")
        if isinstance(token_response_dict, dict)
        else None
    )
    if token is None:
        # Logged at error level: this path always ends in an exception, so the
        # reason should be visible without enabling debug logging.
        logger.error("Unable to get access token")
        logger.debug(str(token_response_dict))
        raise Exception("Error in getting in access token")
    return token
def __patch_file(access_token, file_path, lz_url, table_name):
    """Upload a local file to ``<lz_url><table_name>/<file name>``.

    Two requests are issued, using the query parameters of the Data Lake
    Storage Gen2 path API:

    1. PUT ``?resource=file`` creates an (empty) file at the destination.
    2. PATCH ``?position=0&action=append&flush=true`` uploads the whole local
       file content at offset 0 and flushes it in the same call.

    Args:
        access_token: Bearer token for the storage endpoint.
        file_path: Path of the local file; its basename is used as remote name.
        lz_url: Landing Zone base URL, expected to end with "/".
        table_name: Remote folder the file is placed in.

    Raises:
        requests.HTTPError: if either request returns a 4xx/5xx status, so a
            failed upload is no longer silently treated as success.
    """
    file_name = os.path.basename(file_path)
    file_url = lz_url + table_name + "/" + file_name

    create_url = file_url + "?resource=file"
    create_headers = {"Authorization": "Bearer " + access_token, "content-length": "0"}
    logger.debug("creating file in lake")
    response = requests.put(create_url, data={}, headers=create_headers)
    logger.debug(response)
    response.raise_for_status()

    append_url = file_url + "?position=0&action=append&flush=true"
    append_headers = {
        "Authorization": "Bearer " + access_token,
        "x-ms-file-name": file_name,
    }
    logger.debug(append_url)
    logger.debug("pushing data to file in lake")
    with open(file_path, "rb") as file:
        file_contents = file.read()
    response = requests.patch(append_url, data=file_contents, headers=append_headers)
    logger.debug(response)
    response.raise_for_status()
def get_file_from_lz(table_name, file_name):
    """Download ``file_name`` of ``table_name`` from the Landing Zone.

    The file is written to ``utils.get_table_dir(table_name)`` under the same
    name. Credentials come from APP_ID / SECRET / TENANT_ID and the base URL
    from LZ_URL.

    Args:
        table_name: Landing Zone folder (table) to read from.
        file_name: Name of the file inside that folder.

    Returns:
        200 when the file was downloaded, otherwise None (a warning with the
        server's status code is logged and nothing is written locally).
    """
    logger.info(
        f"trying to get file from lz. table_name={table_name}, file_name={file_name}"
    )
    access_token = __get_access_token(
        os.getenv("APP_ID"), os.getenv("SECRET"), os.getenv("TENANT_ID")
    )
    token_headers = {"Authorization": "Bearer " + access_token, "content-length": "0"}
    url = os.getenv("LZ_URL") + table_name + "/" + file_name

    # stream=True writes the body to disk as it arrives instead of buffering
    # the whole file in memory; the with-block releases the connection on
    # every exit path.
    with requests.get(url, headers=token_headers, stream=True) as response:
        response_status_code = response.status_code
        if response_status_code != 200:
            logger.warning(
                f"failed to get file from Landing Zone. Server responded with code {response_status_code}"
            )
            return None

        local_file_path = os.path.join(utils.get_table_dir(table_name), file_name)
        with open(local_file_path, "wb") as local_file:
            # iter_content() defaults to 1-byte chunks; use 1 MiB chunks so the
            # copy loop does not run once per byte.
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                local_file.write(chunk)
    return response_status_code
def delete_file_from_lz(table_name, file_name):
    """Delete ``file_name`` of ``table_name`` from the Landing Zone.

    Credentials come from APP_ID / SECRET / TENANT_ID and the base URL from
    LZ_URL.

    Args:
        table_name: Landing Zone folder (table) containing the file.
        file_name: Name of the file to delete.

    Returns:
        200 when the server answered the DELETE with HTTP 200, otherwise None.
    """
    logger.info(
        f"trying to delete file from lz. table_name={table_name}, file_name={file_name}"
    )
    credentials = (os.getenv("APP_ID"), os.getenv("SECRET"), os.getenv("TENANT_ID"))
    access_token = __get_access_token(*credentials)
    headers = {"Authorization": "Bearer " + access_token, "content-length": "0"}
    target_url = os.getenv("LZ_URL") + table_name + "/" + file_name

    response = requests.delete(target_url, headers=headers)
    logger.debug(f"delete response: {response}")

    status = response.status_code
    if status != 200:
        return None
    return status