add verbose param to upload_columns
aspeddro committed Jun 21, 2024
1 parent 19bf958 commit d674f04
Showing 2 changed files with 71 additions and 37 deletions.
14 changes: 11 additions & 3 deletions src/databasers_utils/table_architecture.py
@@ -83,14 +83,22 @@ def create_sql_files(
     def update_dbt_project(self) -> None:
         return update_dbt_project(self.dataset_id, dir=os.getcwd())
 
-    def upload_columns(self, replace_all_schema: bool = True) -> None:
+    def upload_columns(
+        self,
+        if_column_exists: str = "pass",
+        replace_all_schema: bool = True,
+        verbose: bool = False,
+    ) -> None:
         backend = Backend(graphql_url=constants.API_URL.value["prod"])
         for table_id, url in self.__tables.items():
             upload_columns_from_architecture(
                 dataset_id=self.dataset_id,
-                table_slug=table_id,
+                table_id=table_id,
                 url_architecture=url,
-                replace_all_schema=replace_all_schema,
+                backend=backend,
+                if_column_exists=if_column_exists,
+                replace_all_schema=replace_all_schema,
+                verbose=verbose,
             )
 
         return None
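
For orientation, a caller-side sketch of the new method signature. The enclosing class name and constructor below are assumptions (they are not shown in this hunk); only the upload_columns keywords come from the diff:

    # Hypothetical class name, constructor, and values; illustrative only.
    arch = TableArchitecture(
        dataset_id="example_dataset",
        tables={"example_table": "https://example.com/architecture.csv"},
    )
    arch.upload_columns(
        if_column_exists="pass",    # new: skip columns that already exist
        replace_all_schema=True,    # unchanged default: drop existing columns first
        verbose=False,              # new: opt in to progress printing
    )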
94 changes: 60 additions & 34 deletions src/databasers_utils/upload_columns.py
@@ -6,7 +6,10 @@


 def get_directory_column_id(
-    directory_column_name: str, directory_table_name: str, backend: b.Backend
+    directory_column_name: str,
+    directory_table_name: str,
+    backend: b.Backend,
+    verbose: bool = False,
 ) -> str:
     """
     Get the directory id from that column
@@ -39,23 +42,24 @@ def get_directory_column_id(
     )
     for _, coluna in df[colunas_de_diretorio].iterrows():
         if coluna["table.slug"] == directory_table_name:
-            print(
-                f"\nConnecting to the directory column: \n\t"
-                f"{coluna['table.dataset.fullSlug']}.{coluna['table.slug']}:{coluna['name']} "
-            )
+            if verbose:
+                print(
+                    f"Connecting to the directory column: {coluna['table.dataset.fullSlug']}.{coluna['table.slug']}:{coluna['name']}"
+                )
+
             return coluna["_id"]  # type: ignore
 
     raise (
         ValueError(
-            "\nWARNING - Unable to find the directory column with the following information: "
-            "\n\tcolumn_name: {directory_column_name} \n\ttable: {directory_table_name}"
+            f"WARNING - Unable to find the directory column with the following information: Column name: {directory_column_name}. Table: {directory_table_name}"
         )
     )
 
 
 def create_column(
     backend: b.Backend,
     mutation_parameters: Optional[dict[str, str]] = None,
+    verbose: bool = False,
 ) -> bool:
     ## this had to be create or replace; for now it duplicates the data
     ## if it runs twice, so pay attention when running!
@@ -81,25 +85,32 @@ def create_column(

     # Print the mutation parameters for debugging purposes
     pretty_json = json.dumps(mutation_parameters, indent=4)
-    print(pretty_json)
+
+    if verbose:
+        print(pretty_json)
 
     # Execute the GraphQL query with the provided mutation parameters and headers
     response = backend._execute_query(
         query=mutation,
-        variables={"input": mutation_parameters},
+        variables={"input": mutation_parameters},  # type: ignore
         headers=headers,
     )
 
     # Print the response for debugging purposes
     if response["CreateUpdateColumn"]["errors"] != []:
         pretty_json = json.dumps(response, indent=4)
-        print(pretty_json)
+
+        if verbose:
+            print(pretty_json)
+
         return False
 
     return True
 
 
-def get_column_id(table_id: str, column_name: str, backend: b.Backend):
+def get_column_id(
+    table_id: str, column_name: str, backend: b.Backend
+) -> Optional[str]:
     query = f"""{{
         allColumn(table_Id:"{table_id}", name:"{column_name}"){{
             edges{{
@@ -115,7 +126,7 @@ def get_column_id(table_id: str, column_name: str, backend: b.Backend):
     if data:
         return data[0]["_id"]
     else:
-        print("column does not exists")
+        return None
 
 
 def get_n_columns(table_id, backend: b.Backend):
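
Since get_column_id now returns Optional[str] and no longer prints when a column is missing, callers branch on None explicitly. A minimal sketch reusing this module's names (the backend object and the column name are illustrative):

    # Assumes `backend` (a b.Backend) and `table_id` are set up as elsewhere in this file.
    column_id = get_column_id(table_id=table_id, column_name="ano", backend=backend)
    if column_id is None:
        pass  # column absent from the metadata: create it
    else:
        pass  # column present: update it or skip, depending on if_column_exists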
@@ -164,21 +175,25 @@ def get_bqtype_dict(backend: b.Backend):


 def check_metadata_columns(
-    dataset_id: str, table_slug: str, backend: b.Backend, url_architecture: str
-):
+    dataset_id: str,
+    table_slug: str,
+    backend: b.Backend,
+    architecture: pd.DataFrame,
+) -> None:
     # Get the table ID using the dataset ID and table ID
     table_id = backend._get_table_id_from_name(
         gcp_dataset_id=dataset_id, gcp_table_id=table_slug
     )
 
-    # Read the architecture table
-    architecture = read_architecture_table(url=url_architecture)
-
     n_columns_metadata = get_n_columns(table_id=table_id, backend=backend)
     n_columns_architecture = architecture.shape[0]
 
-    print(f"\nn_columns_metadata: {n_columns_metadata}")
-    print(f"n_columns_architecture: {n_columns_architecture}")
+    if n_columns_metadata == n_columns_architecture:
+        print("Upload done! Columns metadata matches the columns architecture.")
+    else:
+        print(
+            f"Something is wrong: the number of metadata columns does not match the architecture, {n_columns_metadata} != {n_columns_architecture}"
+        )
 
 
 def get_all_columns_id(table_id: str, backend: b.Backend):
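
check_metadata_columns now receives the already-loaded architecture DataFrame instead of re-reading it from the URL, so a caller loads the table once and reuses it. A sketch with placeholder values:

    # Placeholder URL and slugs; read_architecture_table is defined in this module,
    # and `backend` is assumed to be an existing b.Backend instance.
    architecture = read_architecture_table(url="https://example.com/architecture.csv")
    check_metadata_columns(
        dataset_id="example_dataset",
        table_slug="example_table",
        backend=backend,
        architecture=architecture,
    )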
@@ -239,11 +254,12 @@ def delete_all_columns(table_id: str, backend: b.Backend):

 def upload_columns_from_architecture(
     dataset_id: str,
-    table_slug: str,
+    table_id: str,
     url_architecture: str,
     backend: b.Backend,
     if_column_exists: str = "pass",
     replace_all_schema: bool = True,
+    verbose: bool = False,
 ):
     """
     Uploads columns from an architecture table to the specified dataset and table in platform.
@@ -263,10 +279,9 @@
     )
 
     # Get the table ID using the dataset ID and table ID
-    table_id = backend._get_table_id_from_name(
-        gcp_dataset_id=dataset_id, gcp_table_id=table_slug
+    table_slug = backend._get_table_id_from_name(
+        gcp_dataset_id=dataset_id, gcp_table_id=table_id
     )
-    print(f"table_id: {table_id}\n")
 
     # Read the architecture table
     architecture = read_architecture_table(url=url_architecture)
@@ -275,18 +290,27 @@
     bqtype_dict = get_bqtype_dict(backend)
 
     if replace_all_schema:
-        delete_all_columns(table_id, backend)
+        delete_all_columns(table_slug, backend)
 
     # Iterate over each row in the 'architecture' DataFrame
     for _, row in architecture.iterrows():
-        print(f"\nColumn: {row['name']}")
+        column_name = row["name"]
+
+        if verbose:
+            print(f"\nColumn: {column_name}")
 
         column_id = get_column_id(
-            table_id=table_id, column_name=row["name"], backend=backend
+            table_id=table_slug,
+            column_name=column_name,
+            backend=backend,
         )
 
-        if column_id and if_column_exists == "pass":
-            print("row already exists")
+        if column_id is None and verbose:
+            print(f"{column_name} does not exist")
+
+        if column_id is not None and if_column_exists == "pass":
+            if verbose:
+                print("row already exists")
             continue
 
         # Define the mutation parameters for creating a new column
@@ -297,11 +321,11 @@
         )
         directory_column_name = row["directory_column"].split(":")[1]
         directory_column_id = get_directory_column_id(
-            directory_column_name, directory_table_slug, backend
+            directory_column_name, directory_table_slug, backend, verbose
         )
 
         mutation_parameters = {
-            "table": table_id,
+            "table": table_slug,
             "bigqueryType": bqtype_dict[row["bigquery_type"].upper()],
             "name": row["name"],
             "description": row["description"],
@@ -312,14 +336,16 @@
"directoryPrimaryKey": directory_column_id,
}

if column_id:
if column_id is not None:
mutation_parameters["id"] = column_id

create_column(backend, mutation_parameters=mutation_parameters)
create_column(
backend, mutation_parameters=mutation_parameters, verbose=verbose
)

check_metadata_columns(
dataset_id=dataset_id,
table_slug=table_slug,
table_slug=table_id,
backend=backend,
url_architecture=url_architecture,
architecture=architecture,
)
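
Putting the renamed and added parameters together, a minimal end-to-end sketch of the updated entry point. The dataset, table, and URL values are placeholders; the Backend construction is copied from the table_architecture.py hunk above:

    # Illustrative values; only the parameter names come from this diff.
    backend = Backend(graphql_url=constants.API_URL.value["prod"])
    upload_columns_from_architecture(
        dataset_id="example_dataset",
        table_id="example_table",    # renamed from table_slug in this commit
        url_architecture="https://example.com/architecture.csv",
        backend=backend,
        if_column_exists="pass",     # leave existing columns untouched
        replace_all_schema=False,    # True would delete all columns first
        verbose=True,                # the new flag: print progress and payloads
    )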
