
Commit

closes writers on exceptions, passes metrics on exceptions, fixes some edge cases with empty arrow files
rudolfix committed Apr 4, 2024
1 parent 3bef4da commit 912fb82
Showing 23 changed files with 275 additions and 134 deletions.
35 changes: 22 additions & 13 deletions dlt/common/data_writers/buffered.py
@@ -164,9 +164,10 @@ def import_file(self, file_path: str, metrics: DataWriterMetrics) -> DataWriterM
self._rotate_file()
return metrics

def close(self) -> None:
def close(self, skip_flush: bool = False) -> None:
"""Flushes the data, writes footer (skip_flush is True), collects metrics and closes the underlying file."""
self._ensure_open()
self._flush_and_close_file()
self._flush_and_close_file(skip_flush=skip_flush)
self._closed = True

@property
@@ -177,7 +178,8 @@ def __enter__(self) -> "BufferedDataWriter[TWriter]":
return self

def __exit__(self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: Any) -> None:
self.close()
# skip flush if we had an exception
self.close(skip_flush=exc_val is not None)

def _rotate_file(self, allow_empty_file: bool = False) -> DataWriterMetrics:
metrics = self._flush_and_close_file(allow_empty_file)
@@ -188,7 +190,7 @@ def _rotate_file(self, allow_empty_file: bool = False) -> DataWriterMetrics:
return metrics

def _flush_items(self, allow_empty_file: bool = False) -> None:
if self._buffered_items_count > 0 or allow_empty_file:
if self._buffered_items or allow_empty_file:
# we only open a writer when there are any items in the buffer and first flush is requested
if not self._writer:
# create new writer and write header
@@ -205,15 +207,22 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
self._buffered_items.clear()
self._buffered_items_count = 0

def _flush_and_close_file(self, allow_empty_file: bool = False) -> DataWriterMetrics:
# if any buffered items exist, flush them
self._flush_items(allow_empty_file)
# if writer exists then close it
if not self._writer:
return None
# write the footer of a file
self._writer.write_footer()
self._file.flush()
def _flush_and_close_file(
self, allow_empty_file: bool = False, skip_flush: bool = False
) -> DataWriterMetrics:
if not skip_flush:
# if any buffered items exist, flush them
self._flush_items(allow_empty_file)
# if writer exists then close it
if not self._writer:
return None
# write the footer of a file
self._writer.write_footer()
self._file.flush()
else:
if not self._writer:
return None
self._writer.close()
# add file written to the list so we can commit all the files later
metrics = DataWriterMetrics(
self._file_name,
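
The net effect of the buffered-writer changes above: leaving the with block through an exception now closes the underlying file without flushing buffered items or writing a footer, while the dlt code still records metrics for the partial file. A standalone sketch of the same pattern in plain Python (illustrative only, not the dlt classes; metrics collection is omitted from the sketch):

from types import TracebackType
from typing import List, Optional, Type

class SketchBufferedWriter:
    def __init__(self, path: str) -> None:
        self._file = open(path, "w", encoding="utf-8")
        self._buffer: List[str] = []

    def write(self, line: str) -> None:
        self._buffer.append(line)

    def close(self, skip_flush: bool = False) -> None:
        if not skip_flush:
            # normal path: flush buffered lines and write the footer
            self._file.writelines(line + "\n" for line in self._buffer)
            self._file.write("-- footer --\n")
        # in both paths the underlying file handle is released
        self._file.close()

    def __enter__(self) -> "SketchBufferedWriter":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # skip flushing when the with block raised, mirroring the change above
        self.close(skip_flush=exc_val is not None)

A consumer that raises mid-write therefore never leaves a half-flushed footer behind, but the file handle is always released.
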
29 changes: 20 additions & 9 deletions dlt/common/data_writers/writers.py
@@ -84,6 +84,9 @@ def write_data(self, rows: Sequence[Any]) -> None:
def write_footer(self) -> None: # noqa
pass

def close(self) -> None: # noqa
pass

def write_all(self, columns_schema: TTableSchemaColumns, rows: Sequence[Any]) -> None:
self.write_header(columns_schema)
self.write_data(rows)
@@ -321,9 +324,10 @@ def write_data(self, rows: Sequence[Any]) -> None:
# Write
self.writer.write_table(table, row_group_size=self.parquet_row_group_size)

def write_footer(self) -> None:
self.writer.close()
self.writer = None
def close(self) -> None: # noqa
if self.writer:
self.writer.close()
self.writer = None

@classmethod
def writer_spec(cls) -> FileWriterSpec:
@@ -362,10 +366,9 @@ def write_data(self, rows: Sequence[Any]) -> None:
# count rows that got written
self.items_count += sum(len(row) for row in rows)

def write_footer(self) -> None:
if self.writer is None:
self.writer = None
self._first_schema = None
def close(self) -> None:
self.writer = None
self._first_schema = None

@classmethod
def writer_spec(cls) -> FileWriterSpec:
@@ -405,6 +408,9 @@ def write_footer(self) -> None:
raise NotImplementedError("Arrow Writer does not support writing empty files")
return super().write_footer()

def close(self) -> None:
return super().close()

@classmethod
def writer_spec(cls) -> FileWriterSpec:
return FileWriterSpec(
@@ -488,10 +494,15 @@ def write_footer(self) -> None:
# write empty file
self._f.write(
self.delimiter.join(
[col["name"].encode("utf-8") for col in self._columns_schema.values()]
[
b'"' + col["name"].encode("utf-8") + b'"'
for col in self._columns_schema.values()
]
)
)
else:

def close(self) -> None:
if self.writer:
self.writer.close()
self.writer = None
self._first_schema = None
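
The writer classes above move cleanup out of write_footer() into an explicit, idempotent close(). A standalone sketch of the same guard around pyarrow.parquet.ParquetWriter (illustrative; not the dlt ParquetDataWriter):

import pyarrow as pa
import pyarrow.parquet as pq

class SketchParquetSink:
    def __init__(self, path: str, schema: pa.Schema) -> None:
        self.writer = pq.ParquetWriter(path, schema)

    def write(self, table: pa.Table) -> None:
        self.writer.write_table(table)

    def close(self) -> None:
        # the guard makes close() safe to call more than once, e.g. on file
        # rotation and again from an exception handler: the inner writer is
        # released exactly once and the reference is dropped
        if self.writer:
            self.writer.close()
            self.writer = None

schema = pa.schema([("id", pa.int64())])
sink = SketchParquetSink("out.parquet", schema)
sink.write(pa.table({"id": [1, 2, 3]}))
sink.close()
sink.close()  # second call is a no-op
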
8 changes: 5 additions & 3 deletions dlt/common/storages/data_item_storage.py
@@ -72,15 +72,17 @@ def import_items_file(
writer = self._get_writer(load_id, schema_name, table_name)
return writer.import_file(file_path, metrics)

def close_writers(self, load_id: str) -> None:
# flush and close all files
def close_writers(self, load_id: str, skip_flush: bool = False) -> None:
"""Flush, write footers (skip_flush), write metrics and close files in all
writers belonging to `load_id` package
"""
for name, writer in self.buffered_writers.items():
if name.startswith(load_id) and not writer.closed:
logger.debug(
f"Closing writer for {name} with file {writer._file} and actual name"
f" {writer._file_name}"
)
writer.close()
writer.close(skip_flush=skip_flush)

def closed_files(self, load_id: str) -> List[DataWriterMetrics]:
"""Return metrics for all fully processed (closed) files"""
11 changes: 6 additions & 5 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -232,10 +232,9 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
if insert_api == "streaming":
if table["write_disposition"] != "append":
raise DestinationTerminalException(
(
"BigQuery streaming insert can only be used with `append` write_disposition, while "
f'the given resource has `{table["write_disposition"]}`.'
)
"BigQuery streaming insert can only be used with `append`"
" write_disposition, while the given resource has"
f" `{table['write_disposition']}`."
)
if file_path.endswith(".jsonl"):
job_cls = DestinationJsonlLoadJob
@@ -364,7 +363,9 @@ def prepare_load_table(

def _get_column_def_sql(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:
name = self.capabilities.escape_identifier(column["name"])
column_def_sql = f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}"
column_def_sql = (
f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}"
)
if column.get(ROUND_HALF_EVEN_HINT, False):
column_def_sql += " OPTIONS (rounding_mode='ROUND_HALF_EVEN')"
if column.get(ROUND_HALF_AWAY_FROM_ZERO_HINT, False):
6 changes: 2 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery_adapter.py
@@ -153,10 +153,8 @@ def bigquery_adapter(
if insert_api is not None:
if insert_api == "streaming" and data.write_disposition != "append":
raise ValueError(
(
"BigQuery streaming insert can only be used with `append` write_disposition, while "
f"the given resource has `{data.write_disposition}`."
)
"BigQuery streaming insert can only be used with `append` write_disposition, while "
f"the given resource has `{data.write_disposition}`."
)
additional_table_hints |= {"x-insert-api": insert_api} # type: ignore[operator]

85 changes: 51 additions & 34 deletions dlt/extract/extract.py
@@ -2,7 +2,7 @@
from collections.abc import Sequence as C_Sequence
from copy import copy
import itertools
from typing import List, Dict, Any
from typing import Iterator, List, Dict, Any
import yaml

from dlt.common.configuration.container import Container
@@ -304,41 +304,58 @@ def _extract_single_source(
load_id, self.extract_storage.item_storages["arrow"], schema, collector=collector
),
}

# make sure we close storage on exception
with collector(f"Extract {source.name}"):
self._step_info_start_load_id(load_id)
# yield from all selected pipes
with PipeIterator.from_pipes(
source.resources.selected_pipes,
max_parallel_items=max_parallel_items,
workers=workers,
futures_poll_interval=futures_poll_interval,
) as pipes:
left_gens = total_gens = len(pipes._sources)
collector.update("Resources", 0, total_gens)
for pipe_item in pipes:
curr_gens = len(pipes._sources)
if left_gens > curr_gens:
delta = left_gens - curr_gens
left_gens -= delta
collector.update("Resources", delta)
signals.raise_if_signalled()
resource = source.resources[pipe_item.pipe.name]
item_format = get_data_item_format(pipe_item.item)
extractors[item_format].write_items(resource, pipe_item.item, pipe_item.meta)

self._write_empty_files(source, extractors)
if left_gens > 0:
# go to 100%
collector.update("Resources", left_gens)

# flush all buffered writers
with self.manage_writers(load_id, source):
# yield from all selected pipes
with PipeIterator.from_pipes(
source.resources.selected_pipes,
max_parallel_items=max_parallel_items,
workers=workers,
futures_poll_interval=futures_poll_interval,
) as pipes:
left_gens = total_gens = len(pipes._sources)
collector.update("Resources", 0, total_gens)
for pipe_item in pipes:
curr_gens = len(pipes._sources)
if left_gens > curr_gens:
delta = left_gens - curr_gens
left_gens -= delta
collector.update("Resources", delta)
signals.raise_if_signalled()
resource = source.resources[pipe_item.pipe.name]
item_format = get_data_item_format(pipe_item.item)
extractors[item_format].write_items(
resource, pipe_item.item, pipe_item.meta
)

self._write_empty_files(source, extractors)
if left_gens > 0:
# go to 100%
collector.update("Resources", left_gens)

@contextlib.contextmanager
def manage_writers(self, load_id: str, source: DltSource) -> Iterator[ExtractStorage]:
self._step_info_start_load_id(load_id)
# self.current_source = source
try:
yield self.extract_storage
except Exception:
# kill writers without flushing the content
self.extract_storage.close_writers(load_id, skip_flush=True)
raise
else:
self.extract_storage.close_writers(load_id)
# gather metrics
self._step_info_complete_load_id(load_id, self._compute_metrics(load_id, source))
# remove the metrics of files processed in this extract run
# NOTE: there may be more than one extract run per load id: ie. the resource and then dlt state
self.extract_storage.remove_closed_files(load_id)
finally:
# gather metrics when storage is closed
self.gather_metrics(load_id, source)

def gather_metrics(self, load_id: str, source: DltSource) -> None:
# gather metrics
self._step_info_complete_load_id(load_id, self._compute_metrics(load_id, source))
# remove the metrics of files processed in this extract run
# NOTE: there may be more than one extract run per load id: ie. the resource and then dlt state
self.extract_storage.remove_closed_files(load_id)

def extract(
self,
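
The manage_writers() context manager above centralizes the writer lifecycle: close and flush on success, close with skip_flush=True on failure, and gather metrics in both cases. Gathering metrics in the finally block is what lets the commit "pass metrics on exceptions". A stripped-down version of that contract (plain Python; storage is a hypothetical object with the two methods used below, not the real ExtractStorage API):

import contextlib
from typing import Any, Iterator

@contextlib.contextmanager
def managed_writers(storage: Any, load_id: str) -> Iterator[None]:
    try:
        yield
    except Exception:
        # kill writers without flushing partially written content
        storage.close_writers(load_id, skip_flush=True)
        raise
    else:
        # happy path: flush buffers, write footers, close files
        storage.close_writers(load_id)
    finally:
        # metrics are collected whether or not the body raised
        storage.gather_metrics(load_id)
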
5 changes: 3 additions & 2 deletions dlt/extract/extractors.py
@@ -122,10 +122,11 @@ def _write_item(
self.load_id, self.schema.name, table_name, items, columns
)
self.collector.update(table_name, inc=new_rows_count)
if new_rows_count > 0:
# if there were rows or item was empty arrow table
if new_rows_count > 0 or self.__class__ is ArrowExtractor:
self.resources_with_items.add(resource_name)
else:
if isinstance(items, MaterializedEmptyList) or self.__class__ is ArrowExtractor:
if isinstance(items, MaterializedEmptyList):
self.resources_with_empty.add(resource_name)

def _write_to_dynamic_table(self, resource: DltResource, items: TDataItems) -> None:
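
The extractor tweak above means an empty Arrow table still marks its resource as having produced items, so a file carrying the schema is written instead of the resource looking empty. A small illustration of the kind of input this covers (assumes pyarrow is installed; not dlt code):

import pyarrow as pa

# an Arrow table with a real schema but zero rows
empty = pa.table({
    "id": pa.array([], type=pa.int64()),
    "name": pa.array([], type=pa.string()),
})
assert empty.num_rows == 0
# with the change above, the ArrowExtractor still records the resource as having
# items for such a table, so downstream steps see an (empty) file with the schema
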
4 changes: 2 additions & 2 deletions dlt/extract/storage.py
@@ -74,9 +74,9 @@ def create_load_package(self, schema: Schema, reuse_exiting_package: bool = True
self.new_packages.save_schema(load_id, schema)
return load_id

def close_writers(self, load_id: str) -> None:
def close_writers(self, load_id: str, skip_flush: bool = False) -> None:
for storage in self.item_storages.values():
storage.close_writers(load_id)
storage.close_writers(load_id, skip_flush=skip_flush)

def closed_files(self, load_id: str) -> List[DataWriterMetrics]:
files = []
6 changes: 5 additions & 1 deletion dlt/normalize/exceptions.py
@@ -1,3 +1,4 @@
from typing import Any, List
from dlt.common.exceptions import DltException


@@ -7,10 +8,13 @@ def __init__(self, msg: str) -> None:


class NormalizeJobFailed(NormalizeException):
def __init__(self, load_id: str, job_id: str, failed_message: str) -> None:
def __init__(
self, load_id: str, job_id: str, failed_message: str, writer_metrics: List[Any]
) -> None:
self.load_id = load_id
self.job_id = job_id
self.failed_message = failed_message
self.writer_metrics = writer_metrics
super().__init__(
f"Job for {job_id} failed terminally in load {load_id} with message {failed_message}."
)
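
With the extra writer_metrics argument, a failed normalize job can carry metrics for the files written before the failure. A hedged example of constructing and inspecting the extended exception (the load id, job id and empty metrics list are illustrative; the real raise happens in the normalize worker, which is not part of this hunk):

from dlt.normalize.exceptions import NormalizeJobFailed

try:
    raise NormalizeJobFailed("1712221234.1", "items.abc123.parquet", "schema mismatch", [])
except NormalizeJobFailed as job_exc:
    # callers can now inspect what was written before the job failed
    print(job_exc.writer_metrics)
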
7 changes: 4 additions & 3 deletions dlt/normalize/items_normalizers.py
@@ -283,7 +283,7 @@ def _write_with_dlt_columns(
items_count = 0
columns_schema = schema.get_table_columns(root_table_name)
# if we use adapter to convert arrow to dicts, then normalization is not necessary
may_normalize = not issubclass(self.item_storage.writer_cls, ArrowToObjectAdapter)
is_native_arrow_writer = not issubclass(self.item_storage.writer_cls, ArrowToObjectAdapter)
should_normalize: bool = None
with self.normalize_storage.extracted_packages.storage.open_file(
extracted_items_file, "rb"
@@ -293,7 +293,7 @@
):
items_count += batch.num_rows
# we may need to normalize
if may_normalize and should_normalize is None:
if is_native_arrow_writer and should_normalize is None:
should_normalize, _, _, _ = pyarrow.should_normalize_arrow_schema(
batch.schema, columns_schema, schema.naming
)
@@ -315,7 +315,8 @@
batch,
columns_schema,
)
if items_count == 0:
# TODO: better to check if anything is in the buffer and skip writing file
if items_count == 0 and not is_native_arrow_writer:
self.item_storage.write_empty_items_file(
load_id,
schema.name,