
Error sending Delta files to Azure Gen2 Storage if over a certain size. #2086

Open
pwr-philarmstrong opened this issue Nov 22, 2024 · 4 comments
Labels: question Further information is requested

@pwr-philarmstrong

dlt version

1.3.0

Describe the problem

I have a pipeline that copies a table from SQL Server to Azure Gen2 storage. It creates Delta files and works fine when the Parquet files are small; however, when they get larger, the upload fails and goes into a retry loop.

In the Azure Storage logs I can see error details like this:

StatusCode         500
StatusText         InternalError
DurationMs         29870
ServerLatencyMs	   379
Uri                https://playgrounddatapstorage.blob.core.windows.net:443/bronze/risk360_us_sql_server_test/elapsedtime/part-00001-151d1409-68ce-4e58-8065-f6c535dfdd38-c000.snappy.parquet
ServiceType        blob
ObjectKey          /playgrounddatapstorage/bronze/risk360_us_sql_server_test/elapsedtime/part-00001-151d1409-68ce-4e58-8065-f6c535dfdd38-c000.snappy.parquet
RequestHeaderSize  452
RequestBodySize    58179144
ResponseHeaderSize 173
Category           StorageWrite
TlsVersion         TLS 1.2
MetricResponseType NetworkError
SourceAccessTier   Invalid

The pipeline part looks like this:

# Create a dlt pipeline object
pipeline = dlt.pipeline(
    pipeline_name="client_tables_to_delta_az",
    destination=filesystem(bucket_url="az://bronze/"),  # abfss
    dataset_name=folder_dataset_name,
)

# Run the pipeline
load_info = pipeline.run(
    all_data(connection_string, database_item),
    write_disposition="append",
    table_format="delta",
)

And this is a chunk of the log file around where it fails, though it only shows that it's waiting and that it fails after 5 retries:

2024-11-21 17:34:11 - azure.core.pipeline.policies.http_logging_policy - INFO - Request URL: 'https://playgrounddatapstorage.blob.core.windows.net/bronze/risk360_us_sql_server_test/init'
Request method: 'DELETE'
Request headers:
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'd22291ad-a82e-11ef-9aac-8c3b4a05b5bc'
    'User-Agent': 'azsdk-python-storage-blob/12.23.1 Python/3.12.6 (Windows-11-10.0.22631-SP0)'
    'Authorization': 'REDACTED'
No body was attached to the request
2024-11-21 17:34:12 - azure.core.pipeline.policies.http_logging_policy - INFO - Response status: 202
Response headers:
    'Content-Length': '0'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': '6b951afc-201e-0061-763b-3cd464000000'
    'x-ms-client-request-id': 'd22291ad-a82e-11ef-9aac-8c3b4a05b5bc'
    'x-ms-version': 'REDACTED'
    'x-ms-delete-type-permanent': 'REDACTED'
    'x-ms-deletion-id': 'REDACTED'
    'Date': 'Thu, 21 Nov 2024 17:34:11 GMT'
2024-11-21 17:34:12 - azure.core.pipeline.policies.http_logging_policy - INFO - Request URL: 'https://playgrounddatapstorage.blob.core.windows.net/bronze/risk360_us_sql_server_test/init?comp=REDACTED'
Request method: 'PUT'
Request headers:
    'Content-Length': '52'
    'x-ms-meta-is_directory': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'Content-Type': 'application/xml'
    'Accept': 'application/xml'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'd237588e-a82e-11ef-9e2e-8c3b4a05b5bc'
    'User-Agent': 'azsdk-python-storage-blob/12.23.1 Python/3.12.6 (Windows-11-10.0.22631-SP0)'
    'Authorization': 'REDACTED'
A body is sent with the request
2024-11-21 17:34:12 - azure.core.pipeline.policies.http_logging_policy - INFO - Response status: 201
Response headers:
    'Content-Length': '0'
    'Last-Modified': 'Thu, 21 Nov 2024 17:34:12 GMT'
    'Etag': '"0x8DD0A52B69D7BFC"'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': '6b951b43-201e-0061-333b-3cd464000000'
    'x-ms-client-request-id': 'd237588e-a82e-11ef-9e2e-8c3b4a05b5bc'
    'x-ms-version': 'REDACTED'
    'x-ms-content-crc64': 'REDACTED'
    'x-ms-request-server-encrypted': 'REDACTED'
    'Date': 'Thu, 21 Nov 2024 17:34:11 GMT'
2024-11-21 17:34:12,193|[INFO]|54140|53952|dlt|load.py|resume_started_jobs:291|Found 0 that are already started and should be continued
2024-11-21 17:34:12,193|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 0 for 1732210401.2547312
2024-11-21 17:34:12,195|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 2, creating jobs
2024-11-21 17:34:12,196|[INFO]|54140|53952|dlt|load.py|submit_job:169|Will load file 1732210401.2547312\new_jobs\elapsedtime.ea8c3353ec.0.parquet with table name elapsedtime
2024-11-21 17:34:12,669|[INFO]|54140|53952|dlt|load.py|submit_job:169|Will load file 1732210401.2547312\new_jobs\_dlt_pipeline_state.4857574e24.0.jsonl with table name _dlt_pipeline_state
2024-11-21 17:34:13,686|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 2 for 1732210401.2547312
2024-11-21 17:34:13,687|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.parquet
2024-11-21 17:34:13,694|[INFO]|54140|53952|dlt|load.py|create_followup_jobs:369|Job elapsedtime.ea8c3353ec.parquet CREATED a new FOLLOWUP JOB C:\Users\PHILAR~1\AppData\Local\Temp\elapsedtime.ea8c3353ec.0.reference placed in new_jobs
2024-11-21 17:34:13,696|[INFO]|54140|53952|dlt|load.py|complete_jobs:459|Job for elapsedtime.ea8c3353ec.parquet completed in load 1732210401.2547312
--------------------- Load all_data in 1732210401.2547312 ----------------------
Jobs: 1/2 (50.0%) | Time: 4.19s | Rate: 0.24/s
Memory usage: 266.70 MB (83.70%) | CPU usage: 0.00%

2024-11-21 17:34:13,717|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job _dlt_pipeline_state.4857574e24.jsonl
2024-11-21 17:34:13,719|[INFO]|54140|53952|dlt|load.py|complete_jobs:459|Job for _dlt_pipeline_state.4857574e24.jsonl completed in load 1732210401.2547312
2024-11-21 17:34:13,720|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 1, creating jobs
2024-11-21 17:34:13,721|[INFO]|54140|53952|dlt|load.py|submit_job:169|Will load file 1732210401.2547312\new_jobs\elapsedtime.ea8c3353ec.0.reference with table name elapsedtime
2024-11-21 17:34:13,751|[INFO]|54140|53632|dlt|filesystem.py|run:138|Will copy file(s) ['C:\\Users\\PhilArmstrong\\.dlt\\pipelines\\client_tables_to_delta_az\\load\\normalized\\1732210401.2547312\\completed_jobs\\elapsedtime.ea8c3353ec.0.parquet'] to delta table az://bronze/risk360_us_sql_server_test/elapsedtime [arrow buffer: 0]
2024-11-21 17:34:14,764|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:34:14,765|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:34:14,766|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:34:14,768|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:34:15,777|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:34:15,778|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:34:15,778|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:34:15,779|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:34:16,790|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:34:16,791|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:34:16,792|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:34:16,793|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:34:17,802|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:44:33,778|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:44:33,780|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:44:33,781|[INFO]|36652|42288|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:44:34,788|[INFO]|36652|42288|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:44:34,789|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:44:34,790|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:44:34,792|[INFO]|36652|42288|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:44:35,586|[ERROR]|36652|17540|dlt|load.py|w_run_job:248|Transient exception in job elapsedtime.ea8c3353ec.reference in file C:\Users\PhilArmstrong\.dlt\pipelines\client_tables_to_delta_az\load\normalized\1732210401.2547312\started_jobs\elapsedtime.ea8c3353ec.1.reference
Traceback (most recent call last):
  File "C:\Users\PhilArmstrong\OneDrive - POWWR\Dev\venv\Lib\site-packages\dlt\common\destination\reference.py", line 420, in run_managed
    self.run()
  File "C:\Users\PhilArmstrong\OneDrive - POWWR\Dev\venv\Lib\site-packages\dlt\destinations\impl\filesystem\filesystem.py", line 161, in run
    write_delta_table(
  File "C:\Users\PhilArmstrong\OneDrive - POWWR\Dev\venv\Lib\site-packages\dlt\common\libs\deltalake.py", line 88, in write_delta_table
    write_deltalake(  # type: ignore[call-overload]
  File "C:\Users\PhilArmstrong\OneDrive - POWWR\Dev\venv\Lib\site-packages\deltalake\writer.py", line 323, in write_deltalake
    write_deltalake_rust(
OSError: Generic MicrosoftAzure error: Error after 5 retries in 180.8821565s, max_retries:10, retry_timeout:180s, source:error sending request for url (https://playgrounddatapstorage.blob.core.windows.net/bronze/risk360_us_sql_server_test/elapsedtime/part-00001-36118c25-ac6d-43b3-aad3-90a6131126f2-c000.snappy.parquet)
2024-11-21 17:44:35,801|[INFO]|36652|42288|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:44:35,802|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:44:35,805|[WARNING]|36652|42288|dlt|load.py|complete_jobs:430|Job for elapsedtime.ea8c3353ec.reference retried in load 1732210401.2547312 with message Generic MicrosoftAzure error: Error after 5 retries in 180.8821565s, max_retries:10, retry_timeout:180s, source:error sending request for url (https://playgrounddatapstorage.blob.core.windows.net/bronze/risk360_us_sql_server_test/elapsedtime/part-00001-36118c25-ac6d-43b3-aad3-90a6131126f2-c000.snappy.parquet)
2024-11-21 17:44:35,805|[INFO]|36652|42288|dlt|load.py|start_new_jobs:274|Will load additional 1, creating jobs
2024-11-21 17:44:35,806|[INFO]|36652|42288|dlt|load.py|submit_job:169|Will load file 1732210401.2547312\new_jobs\elapsedtime.ea8c3353ec.2.reference with table name elapsedtime
2024-11-21 17:44:35,852|[INFO]|36652|17540|dlt|filesystem.py|run:138|Will copy file(s) ['C:\\Users\\PhilArmstrong\\.dlt\\pipelines\\client_tables_to_delta_az\\load\\normalized\\1732210401.2547312\\completed_jobs\\elapsedtime.ea8c3353ec.0.parquet'] to delta table az://bronze/risk360_us_sql_server_test/elapsedtime [arrow buffer: 0]
2024-11-21 17:44:36,870|[INFO]|36652|42288|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:44:36,924|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:44:36,928|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:44:36,947|[INFO]|36652|42288|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:44:37,962|[INFO]|36652|42288|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312

Expected behavior

I would expect the files to be successfully sent to Azure storage, either as a single file (as with the smaller files) or broken into blocks.

Steps to reproduce

I've managed to create some code to reproduce the issue.
It works if the file is sent as Parquet, but if the format is Delta it fails.

import dlt
from dlt.destinations import filesystem as fs_destination
import pandas as pd
import numpy as np

dlt.config["normalize.data_writer.file_max_bytes"] = 5000000
dlt.config["runtime.log_level"] = "DEBUG"
dlt.config["progress"] = "log"

# Generate a dataset of roughly 100 MB
def generate_data(num_rows=4000000):
    data = {
        'id': np.arange(num_rows),
        'value': np.random.rand(num_rows),
        'timestamp': pd.date_range(start='1/1/2022', periods=num_rows, freq='min')
    }
    return pd.DataFrame(data)

@dlt.resource(table_name='data3')
def reader():
    yield generate_data()

# Define the destination (Azure Blob Storage)
mydestination = fs_destination(
    bucket_url='az://bronze/',
    #credentials={
    #    "account_name": "your-azure-account-name",
    #    "account_key": "your-azure-account-key",
    #},
)

# Create a pipeline
pipeline = dlt.pipeline(
    pipeline_name="dummy_to_delta",
    dataset_name="dummy_to_delta",
    destination=mydestination,
)

info = pipeline.run(
    reader,
    write_disposition="append",
    #loader_file_format="parquet",  # plain parquet works
    table_format="delta",           # delta fails for large files
)
print(info)

Operating system

Windows

Runtime environment

Local

Python version

3.11

dlt data source

Microsoft SQL Server, but the problem also happens with a DataFrame data source

dlt destination

Filesystem & buckets

Other deployment details

No response

Additional information


Looking at the Azure logs for smaller files, they look like this:

StatusCode         201
StatusText         Success
DurationMs         10
ServerLatencyMs    10
Uri                https://playgrounddatapstorage2.blob.core.windows.net:443/bronze/risk360_us_sql_server/customercountlog/database_name=Dfsc_Clearview/year=2028/part-00001-38473a92-4ebc-4991-bb54-381028d88834-c000.snappy.parquet
ServiceType        blob
ObjectKey          /playgrounddatapstorage2/bronze/risk360_us_sql_server/customercountlog/database_name=Dfsc_Clearview/year=2028/part-00001-38473a92-4ebc-4991-bb54-381028d88834-c000.snappy.parquet
RequestHeaderSize  489
RequestBodySize    4581
ResponseHeaderSize 279
Category           StorageRead
TlsVersion         TLS 1.2
MetricResponseType Success
SourceAccessTier   Invalid
SourceSystem       Azure

I also tried sending the larger Parquet file using a standalone Python script and the azure.storage.blob package:

from azure.storage.blob import BlobServiceClient

# Create a BlobServiceClient
blob_service_client = BlobServiceClient(
    account_url="https://playgrounddatapstorage.blob.core.windows.net/",
    credential="Mykey",
)

# Upload a large file
blob_client = blob_service_client.get_blob_client(container="bronze", blob="test_load_large_file.parquet")
with open("C:/data/dlt/load_data_delta/dbo_elapsedtime/risk360_us_sql_server/elapsedtime/database_name=PJM_Settlements/year=2018/part-00001-8d376a4b-848b-4f27-84bc-34b70f9999b7-c000.snappy.parquet", "rb") as data:
    blob_client.upload_blob(data, overwrite=True)

This worked fine and seemed to send the file in blocks.
The logs for one block look like this:

StatusCode         201
StatusText         Success
DurationMs         6812
ServerLatencyMs    205
Uri                https://playgrounddatapstorage.blob.core.windows.net:443/bronze/test_load_large_file.parquet?comp=block&blockid=TURBd01EQXdNREF3TURBd01EQXdNREF3TURBd01EQXdNakE1TnpFMU1qQSUzRA%3D%3D
ServiceType        blob
ObjectKey          /playgrounddatapstorage/bronze/test_load_large_file.parquet
RequestHeaderSize  652
RequestBodySize    4194304
ResponseHeaderSize 237
Category           StorageWrite
TlsVersion         TLS 1.3
MetricResponseType Success
SourceAccessTier   Invalid
SourceSystem       Azure

I was also able to send pure Parquet files to Azure without an issue; however, Delta seems to create larger Parquet files.
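For reference, the numbers in the logs above show the difference between the two upload paths: the working SDK upload staged the file in blocks of RequestBodySize 4194304 (4 MiB), while the failing Delta write attempted the whole file in one request with RequestBodySize 58179144. A small sketch of the arithmetic (block size taken from the logs; the SDK's actual block size is configurable):

```python
import math

SINGLE_PUT_BYTES = 58_179_144  # RequestBodySize of the failing single-shot upload
BLOCK_BYTES = 4_194_304        # RequestBodySize of one staged block (4 MiB)

# Number of Put Block requests needed to stage the same payload in blocks
num_blocks = math.ceil(SINGLE_PUT_BYTES / BLOCK_BYTES)
print(num_blocks)  # 14 blocks of up to 4 MiB each
```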

I also tried adjusting a number of the dlt config items, e.g.:

# Set the dlt configuration
dlt.config["buffer_max_items"] = 25000
dlt.config["sources.sql_database.buffer_max_items"] = 25000
dlt.config["extract.workers"] = 1
dlt.config["normalize.workers"] = 1
dlt.config["load.workers"] = 1
dlt.config["normalize.data_writer.file_max_bytes"] = 5000000
dlt.config["data_writer.file_max_bytes"] = 10000
dlt.config["sources.data_writer.file_max_bytes"] = 10000
dlt.config["data_writer.file_max_items"] = 15000
dlt.config["file_max_items"] = 15000
dlt.config["sources.data_writer.file_max_items"] = 25000
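The same writer and worker settings can also be placed in .dlt/config.toml instead of being set via dlt.config at runtime. A sketch, assuming dlt's standard config section layout:

```toml
[extract]
workers = 1

[normalize]
workers = 1

[normalize.data_writer]
file_max_bytes = 5000000

[load]
workers = 1
```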
@rudolfix
Collaborator

I think that your problem is related to this: #2030, which in turn unfortunately waits for a Delta "bug" to be fixed.

We'll probably merge #2030 to give our users a workaround (merge many files instead of a single big dataset).

@rudolfix rudolfix added the question Further information is requested label Nov 22, 2024
@rudolfix rudolfix moved this from Todo to In Progress in dlt core library Nov 22, 2024
@pwr-philarmstrong
Author

OK, not sure of the details of that issue. It's worth pointing out that the file is created correctly in the normalized completed_jobs folder, but when it sends to Azure it tries to do it in a single block. If I use Parquet and not Delta, the file is sent using small blocks. Also, Delta files are created fine on a local filesystem; it's just when I want to use Azure that it fails. Not sure if other cloud services have a similar issue.

@sh-rp
Collaborator

sh-rp commented Nov 25, 2024

@pwr-philarmstrong yes, if you set it to parquet, we use our own upload code and send the files one by one (parallelized, but limited to the number of load step workers). For delta tables we use Python deltalake, which loads all files into memory before sending them. Like @rudolfix pointed out, we are waiting for a bug to be fixed in the deltalake Rust backend.

@pwr-philarmstrong
Author

Thanks for the clarification. I'll watch out for that bug being fixed.
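One possible mitigation while waiting for the upstream fix (an assumption, not something confirmed in this thread): dlt's filesystem destination can pass storage options through to deltalake's Rust object store, where the per-request timeout is configurable. Raising it may let the large single-shot upload complete instead of timing out into the retry loop. A config sketch:

```toml
[destination.filesystem]
bucket_url = "az://bronze/"

# forwarded to deltalake / the underlying object store
[destination.filesystem.deltalake_storage_options]
timeout = "600s"
```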
