diff --git a/dlt/destinations/impl/synapse/__init__.py b/dlt/destinations/impl/synapse/__init__.py index 53dbabc090..f6ad7369c1 100644 --- a/dlt/destinations/impl/synapse/__init__.py +++ b/dlt/destinations/impl/synapse/__init__.py @@ -41,6 +41,12 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supports_transactions = True caps.supports_ddl_transactions = False + # Synapse throws "Some part of your SQL statement is nested too deeply. Rewrite the query or break it up into smaller queries." + # if number of records exceeds a certain number. Which exact number that is seems not deterministic: + # in tests, I've seen a query with 12230 records run successfully on one run, but fail on a subsequent run, while the query remained exactly the same. + # 10,000 records is a "safe" amount that always seems to work. + caps.max_rows_per_insert = 10000 + # datetimeoffset can store 7 digits for fractional seconds # https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetimeoffset-transact-sql?view=sql-server-ver16 caps.timestamp_precision = 7 diff --git a/dlt/destinations/insert_job_client.py b/dlt/destinations/insert_job_client.py index c25e8b9384..e3ce7265d1 100644 --- a/dlt/destinations/insert_job_client.py +++ b/dlt/destinations/insert_job_client.py @@ -38,9 +38,12 @@ def _insert(self, qualified_table_name: str, file_path: str) -> Iterator[List[st header = f.readline() writer_type = self._sql_client.capabilities.insert_values_writer_type if writer_type == "default": + sep = "," # properly formatted file has a values marker at the beginning values_mark = f.readline() assert values_mark == "VALUES\n" + elif writer_type == "select_union": + sep = " UNION ALL" max_rows = self._sql_client.capabilities.max_rows_per_insert @@ -58,10 +61,6 @@ def _insert(self, qualified_table_name: str, file_path: str) -> Iterator[List[st # if there was anything left, until_nl contains the last line is_eof = len(until_nl) == 0 or until_nl[-1] == ";" if not is_eof: - if writer_type == 
"default": - sep = "," - elif writer_type == "select_union": - sep = " UNION ALL" until_nl = until_nl[: -len(sep)] + ";" # replace the separator with ";" if max_rows is not None: # mssql has a limit of 1000 rows per INSERT, so we need to split into separate statements @@ -79,7 +78,7 @@ def _insert(self, qualified_table_name: str, file_path: str) -> Iterator[List[st insert_sql.append("".join(chunk) + until_nl) else: # Replace the , with ; - insert_sql.append("".join(chunk).strip()[:-1] + ";\n") + insert_sql.append("".join(chunk).strip()[: -len(sep)] + ";\n") else: # otherwise write all content in a single INSERT INTO if writer_type == "default": diff --git a/tests/load/test_insert_job_client.py b/tests/load/test_insert_job_client.py index 75c8440672..bd20ea9930 100644 --- a/tests/load/test_insert_job_client.py +++ b/tests/load/test_insert_job_client.py @@ -190,7 +190,7 @@ def test_query_split(client: InsertValuesJobClient, file_storage: FileStorage) - start = f"'{idx}'" end = ");" elif writer_type == "select_union": - start = f"ELECT '{idx}'" + start = f"SELECT '{idx}'" end = ";" assert fragment[-1].startswith(start) assert fragment[-1].endswith(end)