set max_rows_per_insert to prevent error on larger queries in synapse
Jorrit Sandbrink committed Apr 5, 2024
1 parent 2fed10a commit 7ebb64d
Showing 3 changed files with 11 additions and 6 deletions.
6 changes: 6 additions & 0 deletions dlt/destinations/impl/synapse/__init__.py
@@ -41,6 +41,12 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps.supports_transactions = True
     caps.supports_ddl_transactions = False
 
+    # Synapse throws "Some part of your SQL statement is nested too deeply. Rewrite the query or break it up into smaller queries."
+    # if the number of records exceeds a certain threshold. The exact threshold does not seem deterministic:
+    # in tests, I've seen a query with 12230 records run successfully on one run but fail on a subsequent run,
+    # while the query remained exactly the same. 10,000 records is a "safe" amount that always seems to work.
+    caps.max_rows_per_insert = 10000
+
     # datetimeoffset can store 7 digits for fractional seconds
     # https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetimeoffset-transact-sql?view=sql-server-ver16
     caps.timestamp_precision = 7
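This capability is what the insert job client reads when deciding whether to break a load file into multiple INSERT statements. A minimal sketch of the chunking arithmetic the cap implies, with illustrative names (chunk_rows is not dlt's actual API):

from typing import Iterator, List, Sequence

MAX_ROWS_PER_INSERT = 10000  # the "safe" Synapse threshold set above

def chunk_rows(rows: Sequence[str], max_rows: int = MAX_ROWS_PER_INSERT) -> Iterator[List[str]]:
    # yield batches of at most max_rows rows; each batch becomes one INSERT statement
    for i in range(0, len(rows), max_rows):
        yield list(rows[i : i + max_rows])

# 25000 value tuples split into statements of 10000, 10000 and 5000 rows
rows = [f"('{i}')" for i in range(25000)]
assert [len(batch) for batch in chunk_rows(rows)] == [10000, 10000, 5000]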
9 changes: 4 additions & 5 deletions dlt/destinations/insert_job_client.py
@@ -38,9 +38,12 @@ def _insert(self, qualified_table_name: str, file_path: str) -> Iterator[List[str]]:
         header = f.readline()
         writer_type = self._sql_client.capabilities.insert_values_writer_type
         if writer_type == "default":
+            sep = ","
             # properly formatted file has a values marker at the beginning
             values_mark = f.readline()
             assert values_mark == "VALUES\n"
+        elif writer_type == "select_union":
+            sep = " UNION ALL"
 
         max_rows = self._sql_client.capabilities.max_rows_per_insert
 
@@ -58,10 +61,6 @@ def _insert(self, qualified_table_name: str, file_path: str) -> Iterator[List[str]]:
             # if there was anything left, until_nl contains the last line
             is_eof = len(until_nl) == 0 or until_nl[-1] == ";"
             if not is_eof:
-                if writer_type == "default":
-                    sep = ","
-                elif writer_type == "select_union":
-                    sep = " UNION ALL"
                 until_nl = until_nl[: -len(sep)] + ";"  # replace the separator with ";"
             if max_rows is not None:
                 # mssql has a limit of 1000 rows per INSERT, so we need to split into separate statements
@@ -79,7 +78,7 @@ def _insert(self, qualified_table_name: str, file_path: str) -> Iterator[List[str]]:
insert_sql.append("".join(chunk) + until_nl)
else:
# Replace the , with ;
insert_sql.append("".join(chunk).strip()[:-1] + ";\n")
insert_sql.append("".join(chunk).strip()[: -len(sep)] + ";\n")
else:
# otherwise write all content in a single INSERT INTO
if writer_type == "default":
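The substantive fix is in this chunked branch: when a chunk is closed early, the trailing separator on its last line has to be stripped before the ";" is appended, and with the select_union writer that separator is " UNION ALL" rather than a single comma, so the old [:-1] slice left a corrupted "UNION AL" behind. A self-contained sketch of the corrected logic, simplified from _insert (split_statements and its inputs are illustrative, not dlt's real API):

from typing import Iterator, List

def split_statements(lines: List[str], sep: str, max_rows: int) -> Iterator[str]:
    # every line ends with the separator (',' or ' UNION ALL') except the
    # last, which already ends with ';'
    for i in range(0, len(lines), max_rows):
        chunk = "".join(lines[i : i + max_rows]).strip()
        if not chunk.endswith(";"):
            # the old code sliced [:-1] here, which only works when the
            # separator is a single comma
            chunk = chunk[: -len(sep)] + ";"
        yield chunk + "\n"

rows = [f"SELECT '{i}' UNION ALL\n" for i in range(4)] + ["SELECT '4';\n"]
stmts = list(split_statements(rows, sep=" UNION ALL", max_rows=2))
assert all(s.rstrip().endswith(";") for s in stmts)
assert "UNION AL;" not in "".join(stmts)  # the corruption [:-1] would leave

With sep == "," the old slice happened to be equivalent, which is why the bug only surfaced for the select_union writer.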
2 changes: 1 addition & 1 deletion tests/load/test_insert_job_client.py
@@ -190,7 +190,7 @@ def test_query_split(client: InsertValuesJobClient, file_storage: FileStorage) -> None:
start = f"'{idx}'"
end = ");"
elif writer_type == "select_union":
start = f"ELECT '{idx}'"
start = f"SELECT '{idx}'"
end = ";"
assert fragment[-1].startswith(start)
assert fragment[-1].endswith(end)
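The test expectation mirrors those statement shapes: for each fragment produced by the split, it pins down how the last line must begin and end per writer type. Roughly, with illustrative last lines rather than the actual test fixture:

# default writer: rows are value tuples separated by commas; the last
# line of the final fragment ends with ");"
default_last = "'2');"
assert default_last.startswith("'2'") and default_last.endswith(");")

# select_union writer: rows are SELECTs chained with " UNION ALL"; the
# last line is the final SELECT, terminated by ";"
union_last = "SELECT '2';"
assert union_last.startswith("SELECT '2'") and union_last.endswith(";")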
