Skip to content

Commit

Permalink
add beacon integration
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Oct 30, 2023
1 parent cbd9d89 commit e5dd4d6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
4 changes: 3 additions & 1 deletion dlt/common/storages/load_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class LoadPackageInfo(NamedTuple):
package_path: str
state: TLoadPackageState
schema_name: str
schema: Schema
schema_update: TSchemaTables
completed_at: datetime.datetime
jobs: Dict[TJobState, List[LoadJobInfo]]
Expand All @@ -110,6 +111,7 @@ def asdict(self) -> DictStrAny:
table["columns"] = columns
d.pop("schema_update")
d["tables"] = tables
d["schema"] = self.schema.to_dict()
return d

def asstr(self, verbosity: int = 0) -> str:
Expand Down Expand Up @@ -288,7 +290,7 @@ def get_load_package_info(self, load_id: str) -> LoadPackageInfo:
jobs.append(self._read_job_file_info(state, file, package_created_at))
all_jobs[state] = jobs

return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, applied_update, package_created_at, all_jobs)
return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, schema, applied_update, package_created_at, all_jobs)

def begin_schema_update(self, load_id: str) -> Optional[TSchemaTables]:
package_path = self.get_package_path(load_id)
Expand Down
25 changes: 17 additions & 8 deletions dlt/pipeline/platform.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
"""Implements SupportsTracking"""
from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline
from typing import Any

from dlt.sources.helpers import requests
from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline
from dlt.common import json

count = 0

def _send_to_beacon(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline):
if not pipeline.runtime_config.beacon_token or not pipeline.runtime_config.beacon_url:
return
def _send_to_beacon(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, some):
if pipeline.runtime_config.beacon_token and pipeline.runtime_config.beacon_url:
trace_dump = json.dumps(trace.asdict())
url = f"{pipeline.runtime_config.beacon_url}/pipeline/{pipeline.runtime_config.beacon_token}/traces"
requests.put(url, json=trace_dump)

def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
_send_to_beacon(trace, step, pipeline, None)
# _send_to_beacon(trace, step, pipeline, None)
pass

def on_start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
_send_to_beacon(trace, step, pipeline, None)
# _send_to_beacon(trace, step, pipeline, None)
pass

def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any) -> None:
_send_to_beacon(trace, step, pipeline, step_info)
# _send_to_beacon(trace, step, pipeline, step_info)
pass

def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None:
_send_to_beacon(trace, None, pipeline, None)

pass

2 changes: 1 addition & 1 deletion dlt/pipeline/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def start_trace(step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineTrac

def start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineStepTrace:
trace_step = PipelineStepTrace(uniq_id(), step, pendulum.now())
trace.steps.append(trace_step)
for module in TRACKING_MODULES:
with suppress_and_warn():
module.on_start_trace_step(trace, step, pipeline)
Expand Down Expand Up @@ -215,7 +216,6 @@ def end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: Supp
) , _RESOLVED_TRACES.values())

trace.resolved_config_values = list(resolved_values)
trace.steps.append(step)
for module in TRACKING_MODULES:
with suppress_and_warn():
module.on_end_trace_step(trace, step, pipeline, step_info)
Expand Down

0 comments on commit e5dd4d6

Please sign in to comment.