Showing 3 changed files with 18 additions and 21 deletions.
@@ -20,13 +20,13 @@ def read_sql_x(
 def genome_resource():
     # create genome resource with merge on `upid` primary key
     genome = dlt.resource(
-        name="acanthochromis_polyacanthus",
+        name="genome",
         write_disposition="merge",
-        primary_key="analysis_id",
+        primary_key="upid",
         standalone=True,
     )(read_sql_x)(
-        "mysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1",  # type: ignore[arg-type]
-        "SELECT * FROM analysis LIMIT 20",
+        "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
+        "SELECT * FROM genome ORDER BY created LIMIT 1000",
     )
     # add incremental on created at
     genome.apply_hints(incremental=dlt.sources.incremental("created"))
@@ -47,6 +47,6 @@ def genome_resource():
     # check that stuff was loaded  # @@@DLT_REMOVE
     row_counts = pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
-    assert row_counts["acanthochromis_polyacanthus"] == 20  # @@@DLT_REMOVE
+    assert row_counts["genome"] == 1000  # @@@DLT_REMOVE

 # @@@DLT_SNIPPET_END example
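
For reference, the resource after this change assembles as below. This is a sketch, not the repository's actual code: `read_sql_x` is only named in the hunk above, so the generator here is a hypothetical stand-in that streams rows as dicts; the resource wiring (merge on `upid`, incremental cursor on `created`) is taken directly from the diff.

from typing import Any, Dict, Iterator

import dlt
from sqlalchemy import create_engine


def read_sql_x(conn_str: str, query: str) -> Iterator[Dict[str, Any]]:
    # Hypothetical stand-in: the real read_sql_x is only named in this diff.
    # Stream query results and yield each row as a plain dict.
    engine = create_engine(conn_str.replace("mysql://", "mysql+pymysql://"))
    with engine.connect() as conn:
        for row in conn.execution_options(yield_per=100).exec_driver_sql(query):
            yield dict(row._mapping)


# Resource wiring as introduced by this commit: merge on the `upid`
# primary key, with an incremental cursor on the `created` column.
genome = dlt.resource(
    name="genome",
    write_disposition="merge",
    primary_key="upid",
    standalone=True,
)(read_sql_x)(
    "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    "SELECT * FROM genome ORDER BY created LIMIT 1000",
)
genome.apply_hints(incremental=dlt.sources.incremental("created"))

Because the cursor sits on `created`, subsequent runs only fetch rows with a newer `created` value, and the merge disposition deduplicates on `upid`.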
@@ -99,25 +99,21 @@ def db_snippet() -> None:
     # use any sql database supported by SQLAlchemy, below we use a public mysql instance to get data
     # NOTE: you'll need to install pymysql with "pip install pymysql"
     # NOTE: loading data from public mysql instance may take several seconds
-    engine = create_engine(
-        "mysql+pymysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1"
-    )
+    engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
     with engine.connect() as conn:
         # select genome table, stream data in batches of 100 elements
         rows = conn.execution_options(yield_per=100).exec_driver_sql(
-            "SELECT * FROM analysis LIMIT 1000"
+            "SELECT * FROM genome LIMIT 1000"
         )

         pipeline = dlt.pipeline(
             pipeline_name="from_database",
             destination="duckdb",
-            dataset_name="acanthochromis_polyacanthus_data",
+            dataset_name="genome_data",
         )

         # here we convert the rows into dictionaries on the fly with a map function
-        load_info = pipeline.run(
-            map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus"
-        )
+        load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome")

         print(load_info)
         # @@@DLT_SNIPPET_END db
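
To confirm what actually landed, dlt records per-table row counts in the pipeline trace after normalization; the test hunk in the first file asserts on exactly this. A minimal sketch, assuming `pipeline` has just completed the run shown in this snippet:

# assumes pipeline.run(...) above has just completed
row_counts = pipeline.last_trace.last_normalize_info.row_counts
print(row_counts)  # e.g. {"genome": 1000, ...}
assert row_counts["genome"] == 1000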
@@ -18,13 +18,14 @@ def intro_snippet() -> None:
         response.raise_for_status()
         data.append(response.json())
     # Extract, normalize, and load the data
-    load_info = pipeline.run(data, table_name="player")
+    load_info = pipeline.run(data, table_name='player')
     # @@@DLT_SNIPPET_END api

     assert_load_info(load_info)

+
 def csv_snippet() -> None:

     # @@@DLT_SNIPPET_START csv
     import dlt
     import pandas as pd
@@ -49,8 +50,8 @@ def csv_snippet() -> None:
     assert_load_info(load_info)


 def db_snippet() -> None:

     # @@@DLT_SNIPPET_START db
     import dlt
     from sqlalchemy import create_engine
@@ -59,27 +60,27 @@ def db_snippet() -> None:
     # MySQL instance to get data.
     # NOTE: you'll need to install pymysql with `pip install pymysql`
     # NOTE: loading data from public mysql instance may take several seconds
-    engine = create_engine(
-        "mysql+pymysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1"
-    )
+    engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")

     with engine.connect() as conn:
         # Select genome table, stream data in batches of 100 elements
-        query = "SELECT * FROM analysis LIMIT 1000"
+        query = "SELECT * FROM genome LIMIT 1000"
         rows = conn.execution_options(yield_per=100).exec_driver_sql(query)

         pipeline = dlt.pipeline(
             pipeline_name="from_database",
             destination="duckdb",
-            dataset_name="acanthochromis_polyacanthus_data",
+            dataset_name="genome_data",
         )

         # Convert the rows into dictionaries on the fly with a map function
         load_info = pipeline.run(
-            map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus"
+            map(lambda row: dict(row._mapping), rows),
+            table_name="genome"
         )

         print(load_info)
         # @@@DLT_SNIPPET_END db

     assert_load_info(load_info)
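
One note on the `dict(row._mapping)` conversion used in both db snippets: SQLAlchemy 1.4+ returns tuple-like `Row` objects, and `._mapping` exposes a read-only, dict-like view keyed by column name, which is the record shape `pipeline.run` expects. A standalone sketch against the same public Rfam instance (the column names `upid` and `created` are taken from the hunks above):

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
with engine.connect() as conn:
    # stream a few rows; yield_per keeps the server-side cursor batched
    rows = conn.execution_options(yield_per=100).exec_driver_sql(
        "SELECT upid, created FROM genome LIMIT 3"
    )
    for row in rows:
        # Row is tuple-like; ._mapping is a dict view keyed by column name
        print(dict(row._mapping))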