Skip to content

Commit

Permalink
Postgres Destination Schema
Browse files Browse the repository at this point in the history
  • Loading branch information
bh2smith committed Dec 3, 2024
1 parent 2f965c2 commit d13edfe
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 24 deletions.
4 changes: 3 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
[MASTER]
disable=fixme,too-few-public-methods
disable=fixme,too-few-public-methods,too-many-positional-arguments
# Maximum number of arguments for function / method
max-args=6
2 changes: 1 addition & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ jobs:
poll_frequency: 5
destination:
ref: PG
table_name: app_data
table_name: cow.solvers
if_exists: replace
38 changes: 35 additions & 3 deletions src/destinations/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,23 @@ def __init__(
self,
db_url: str,
table_name: str,
schema: str = "public",
if_exists: TableExistsPolicy = "append",
index_columns: list[str] | None = None,
):
if index_columns is None:
index_columns = []
self.engine: sqlalchemy.engine.Engine = create_engine(db_url)
self.table_name: str = table_name
self.schema: str | None = schema

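# A schema-qualified table_name (e.g. "cow.solvers") is split on the first "."
# into schema and table; the qualifier overrides the `schema` argument.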
if "." in table_name:
self.schema, self.table_name = table_name.split(".", 1)
else:
self.schema = schema
self.table_name = table_name

self.if_exists: TableExistsPolicy = if_exists
# List of columns forming the ON CONFLICT condition.
# Only relevant for "upsert" TableExistsPolicy
Expand All @@ -58,6 +68,19 @@ def __init__(

def validate(self) -> bool:
"""Validate the destination setup."""
# Check if schema exists
inspector = inspect(self.engine)
available_schemas = inspector.get_schema_names()
if self.schema not in available_schemas:
log.error(
"Schema '%s' does not exist. Available schemas: %s\n"
"To create this schema, run the following SQL command:\n"
"CREATE SCHEMA %s;",
self.schema,
", ".join(available_schemas),
self.schema,
)
return False
if self.if_exists == "upsert" and len(self.index_columns) == 0:
log.error("Upsert without index columns.")
return False
Expand All @@ -66,7 +89,9 @@ def validate(self) -> bool:
def validate_unique_constraints(self) -> None:
"""Validate table has unique or exclusion constraint for index columns."""
inspector = inspect(self.engine)
constraints = inspector.get_unique_constraints(self.table_name)
constraints = inspector.get_unique_constraints(
self.table_name, schema=self.schema
)
index_columns_set = set(self.index_columns)

for constraint in constraints:
Expand Down Expand Up @@ -96,7 +121,7 @@ def table_exists(self) -> bool:
:return: True if the table exists, False otherwise.
"""
inspector = inspect(self.engine)
tables = inspector.get_table_names()
tables = inspector.get_table_names(schema=self.schema)
return self.table_name in tables

def save(
Expand Down Expand Up @@ -146,6 +171,7 @@ def replace(
df.to_sql(
self.table_name,
connection,
schema=self.schema,
if_exists="replace",
index=False,
dtype=dtypes,
Expand All @@ -161,6 +187,7 @@ def append(
df.to_sql(
self.table_name,
connection,
schema=self.schema,
if_exists="append",
index=False,
dtype=dtypes,
Expand All @@ -185,7 +212,12 @@ def insert(
columns = df.columns.tolist()

metadata = MetaData()
table = Table(self.table_name, metadata, autoload_with=self.engine)
table = Table(
self.table_name,
metadata,
autoload_with=self.engine,
schema=self.schema,
)
statement = insert(table).values(**{col: df[col] for col in columns})

if on_conflict == "update":
Expand Down
5 changes: 4 additions & 1 deletion src/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ class Validate(ABC):

def __init__(self) -> None:
if not self.validate():
raise ValueError(f"Config for {self.__class__.__name__} is invalid")
raise ValueError(
f"Config for {self.__class__.__name__} is invalid. "
"See ERROR log for details."
)

@abstractmethod
def validate(self) -> bool:
Expand Down
28 changes: 10 additions & 18 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,22 @@

from src.args import Args
from src.config import RuntimeConfig
from src.job import Job


async def main() -> None:
"""Load configuration and execute jobs asynchronously.
The function:
1. Parses command line arguments
2. Loads the configuration from the specified config file (defaults to config.yaml)
3. Executes each configured job
4. Logs the completion of each job
async def main(jobs: list[Job]) -> None:
"""Asynchronously execute a list of jobs.
Raises:
FileNotFoundError: If config file is not found
yaml.YAMLError: If config file is invalid
Various exceptions depending on job configuration and execution
"""
tasks = [job.run() for job in jobs]
for completed_task in asyncio.as_completed(tasks):
await completed_task


if __name__ == "__main__":
args = Args.from_command_line()
config = RuntimeConfig.load(args.config)

Expand All @@ -51,11 +50,4 @@ async def main() -> None:
if args.jobs is not None
else config.jobs
)

tasks = [job.run() for job in jobs_to_run]
for completed_task in asyncio.as_completed(tasks):
await completed_task


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main(jobs_to_run))

0 comments on commit d13edfe

Please sign in to comment.