-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsnowflake_manager.py
196 lines (168 loc) · 7.6 KB
/
snowflake_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import logging
import os
from collections import OrderedDict
from logging import getLogger
from urllib.parse import quote_plus
import streamlit as st
from sqlalchemy import create_engine, text
from sqlalchemy.dialects import registry
from sqlalchemy.exc import DatabaseError, InterfaceError
# Directory containing this script; course-resources.md is read from here.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

# UI status label for each named SQL section of course-resources.md.
# Keys are the `{#...}` ids of the fenced SQL blocks extracted by get_sql_commands().
sql_sections = {
    "snowflake_setup": "Setting up the dbt User and Roles",
    "snowflake_import": "Importing Raw Tables",
    "snowflake_reporter": "Creating Reporter Role",
}
def get_snowflake_connection(
    account,
    username,
    password,
    *,
    database="AIRBNB",
    schema="DEV",
    warehouse="COMPUTE_WH",
    role="ACCOUNTADMIN",
):
    """Open a SQLAlchemy connection to Snowflake.

    Args:
        account: Snowflake account identifier (e.g. ``frgcsyo-ie17820``).
            Leading/trailing whitespace (typical after copy/paste) is ignored.
        username: Snowflake login name.
        password: Snowflake password.
        database: Default database for the session (keyword-only).
        schema: Default schema for the session (keyword-only).
        warehouse: Warehouse used to run queries (keyword-only).
        role: Role the session assumes (keyword-only).

    Returns:
        An open SQLAlchemy ``Connection``; the caller is responsible for closing it.

    Raises:
        Errors from ``create_engine`` / ``Engine.connect`` propagate unchanged.
    """
    account = account.strip()
    # URL-encode the credentials so characters such as "@", ":" or "/" inside them
    # cannot break the structure of the connection URL.
    encoded_username = quote_plus(username)
    encoded_password = quote_plus(password)
    engine = create_engine(
        f"snowflake://{encoded_username}:{encoded_password}@{account}/{database}/{schema}"
        f"?warehouse={warehouse}&role={role}&account_identifier={account}"
    )
    return engine.connect()
def streamlit_session_id():
    """Return the id of the current Streamlit browser session, for log messages.

    Relies on Streamlit internals (``Runtime._session_mgr``), so it may need
    updating across Streamlit versions.

    Returns:
        The session id string, or ``"nosession"`` when there is no script-run
        context (e.g. the file is executed with plain ``python``) or the session
        manager does not know the session.
    """
    from streamlit.runtime import get_instance
    from streamlit.runtime.scriptrunner import get_script_run_ctx

    ctx = get_script_run_ctx()
    # Outside a Streamlit script run there is no context; the original code
    # crashed here with AttributeError on `None.session_id`.
    if ctx is None:
        return "nosession"
    runtime = get_instance()
    session_info = runtime._session_mgr.get_session_info(ctx.session_id)
    if session_info is None:
        return "nosession"
    return session_info.session.id
def get_sql_commands(md):
    """Extract the named SQL statements from the course-resources markdown.

    Only fenced blocks whose opening line starts with ```sql {#name} are used;
    ``name`` becomes the section key. Inside such a block, blank lines and
    full-line ``--`` comments (indented or not) are dropped. The remaining text
    is split on ``;`` into individual statements.

    Args:
        md: The markdown document as a single string (``\\n`` or ``\\r\\n`` line endings).

    Returns:
        A dict mapping each section name, in order of first appearance, to the
        list of its SQL statements. Each statement is stripped of surrounding
        whitespace and has no trailing ``;``.
    """
    section_lines = OrderedDict()
    current_section = None
    in_named_sql = False
    for line in md.splitlines():
        if in_named_sql:
            if line.startswith("```"):
                in_named_sql = False
                continue
            stripped = line.strip()
            # Drop blanks and whole-line comments so a comment can neither become
            # a statement of its own nor inject a stray ";" into the split below.
            if not stripped or stripped.startswith("--"):
                continue
            section_lines.setdefault(current_section, []).append(line)
        elif line.startswith("```sql {#"):
            in_named_sql = True
            current_section = line.split("{#")[1].split("}")[0]
    return {
        section: [
            statement.strip()
            for statement in "\n".join(lines).split(";")
            if statement.strip()
        ]
        for section, lines in section_lines.items()
    }
# Markdown introduction rendered at the top of the page (passed to st.markdown in main()).
hello_msg = """
# dbt Zero to Hero Snowflake Importer (beta 2)

Hi,
this webapp helps you set up your Snowflake account
and import the course resources, such as raw tables and user roles,
into your Snowflake account. Simply add your Snowflake account, username and password
and then click **Start Setup**.

**This app will do the following**:

* Create the `dbt` user using the password `dbtPassword123`
* Import the raw AirBnB tables
* Create the REPORTER role and grant it to the `dbt` user. _We'll use this role later in the course when we build a dashboard._

Please keep in mind that this is a beta version implemented in late October 2024 and it may have some rough edges.
I'd be very happy if you could provide feedback on whether the tool works and how to improve it. Just send me a message on Udemy.
"""
# Install a timestamped handler on the root logger (only if it has none yet), so both
# `logger` below and module-level `logging.*` calls are formatted consistently.
# NOTE: the previous `logger.Formatter = logging.Formatter(...)` merely set an unused
# attribute on the Logger object and never affected any output.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# basicConfig() is a no-op when the root logger already has handlers, so set the
# level explicitly to guarantee INFO messages are emitted.
logging.root.setLevel(logging.INFO)
logger = getLogger(__name__)
def _error_detail(exc):
    """Return the most useful description of *exc* to show the user.

    SQLAlchemy's DBAPIError family (``InterfaceError``, ``DatabaseError``, ...)
    keeps the driver's original exception in ``.orig``. Other exceptions have no
    such attribute, so fall back to *exc* itself instead of raising
    ``AttributeError`` from inside an ``except`` handler.
    """
    orig = getattr(exc, "orig", None)
    return exc if orig is None else orig


def main():
    """Render the importer page and, when requested, run the Snowflake setup.

    Reads ``course-resources.md`` from this file's directory and extracts its named
    SQL sections with ``get_sql_commands``, then shows the credential form. After
    "Start Setup" is clicked, it:

    * connects to Snowflake;
    * executes every section's statements in order, committing after each one;
    * checks that RAW_LISTINGS, RAW_HOSTS and RAW_REVIEWS contain rows.

    Failures are shown in the UI and logged. The connection is always closed
    before returning.
    """
    session_id = streamlit_session_id()
    logger.info("Starting Streamlit app")

    resources_path = os.path.join(CURRENT_DIR, "course-resources.md")
    with open(resources_path, "r", encoding="utf-8") as file:
        md = file.read().rstrip()
    sql_commands = get_sql_commands(md)
    logger.debug("Parsed SQL commands: %s", sql_commands)

    # Make the "snowflake://" URL scheme resolvable by create_engine().
    registry.register("snowflake", "snowflake.sqlalchemy", "dialect")

    # Pre-fill the password field from the environment when it is set.
    pw = os.environ.get("SNOWFLAKE_PASSWORD") or ""

    st.markdown(hello_msg)
    # NOTE(review): the default below looks like one specific real account;
    # consider defaulting to "" so users must enter their own.
    hostname = st.text_input(
        "Snowflake account (this looks like `frgcsyo-ie17820` or `frgcsyo-ie17820.aws`, check your snowflake registration email).\n\n_**This is not your Snowflake username**, but the first part of the snowflake url you received in your snowflake registration email_:",
        "jdehewj-vmb00970",
    )
    username = st.text_input(
        "Snowflake username (change this if you didn't set it to `admin` at registration):",
        "admin",
    )
    password = st.text_input("Snowflake Password:", pw, type="password")

    if not st.button("Start Setup"):
        return
    if not password:
        st.error("Please provide a password")
        return

    try:
        with st.status("Connecting to Snowflake"):
            connection = get_snowflake_connection(hostname, username, password)
    except InterfaceError as e:
        st.error(
            "Error connecting to Snowflake. This usually means that the snowflake account is invalid.\n"
            "Please verify the snowflake account and try again.\n\n"
            f"Original Error: \n\n{_error_detail(e)}"
        )
        logger.warning(
            "%s: Error connecting to Snowflake. Account: %s, Username: %s: %s",
            session_id,
            hostname,
            username,
            e,
        )
        return
    except DatabaseError as e:
        st.error(
            "Error connecting to Snowflake. This usually means that the snowflake username or password you provided is not valid. Please correct them and retry by pressing the Start Setup button."
            f"\n\nOriginal Error:\n\n{_error_detail(e)}"
        )
        logger.warning(
            "%s: Error connecting to Snowflake. Account name: %s\n Original Error: %s",
            session_id,
            hostname,
            e,
        )
        return
    except Exception as e:
        st.error(f"Error connecting to Snowflake.\n\nOriginal Error:\n\n{_error_detail(e)}")
        logger.warning(
            "%s: Error connecting to Snowflake. Account name: %s\n Original Error: %s",
            session_id,
            hostname,
            e,
        )
        return

    st.success("Connected to Snowflake successfully!")
    # Last SQL statement attempted; reported if anything below fails.
    command = None
    try:
        for section, commands in sql_commands.items():
            with st.status(sql_sections[section]):
                for command in commands:
                    if command.startswith(
                        "GRANT USAGE ON SCHEMA AIRBNB.DEV TO ROLE REPORTER"
                    ):
                        # Swapped for a FUTURE SCHEMAS grant on the AIRBNB database
                        # (presumably so REPORTER also covers schemas created later — confirm).
                        command = "GRANT USAGE ON FUTURE SCHEMAS IN DATABASE AIRBNB TO ROLE REPORTER;"
                        st.write(f"Patching Reporter command: `{command}`")
                    else:
                        st.write(f"Executing command: `{command}`")
                    connection.execute(text(command))
                    connection.commit()

        for table in ["RAW_LISTINGS", "RAW_HOSTS", "RAW_REVIEWS"]:
            with st.status(
                f"Checking if data was imported for table {table} correctly"
            ):
                command = f"SELECT COUNT(*) FROM {table}"
                count = connection.execute(text(command)).fetchone()[0]
                st.write(f"Table {table} has {count} rows")
                if count == 0:
                    st.error(
                        f"Table {table} has no rows. This is unexpected. Please check the logs and try again."
                    )
                    return

        st.toast("Setup complete! You can now go back to the course!", icon="🔥")
        st.success("Setup complete! You can now go back to the course!", icon="🔥")
    except Exception as e:
        failed = f"command {command}" if command is not None else "setup"
        st.error(f"Error executing {failed}.\n\nOriginal Error:\n\n{_error_detail(e)}")
        logger.warning(
            "%s: Error executing %s. Account name: %s\n Original Error: %s",
            session_id,
            failed,
            hostname,
            e,
        )
    finally:
        # Release the Snowflake session on every exit path (success, early return, error).
        connection.close()
if __name__ == "__main__":
    # Script entry point: runs when the file is executed directly
    # (presumably also when launched via `streamlit run` — confirm).
    print("Starting Streamlit app")
    main()