How to avoid small files accumulation and uneven partitioning #323

Open
santurini opened this issue Dec 19, 2024 · 0 comments
@santurini

Hello,
I am using Kafka Connect with a MySQL source connector (Debezium) and the Iceberg sink connector to create Iceberg tables in AWS S3 and keep them in sync with the tables in my MySQL database.

My goals are:

  • Sync the tables once a day instead of in real time (possibly by setting iceberg.control.commit.interval-ms to one day in milliseconds)
  • Keep only the most recent files in the table's data and metadata folders, rather than the files from every commit
  • Partition the table into evenly sized Parquet files (for instance, when exporting with Spark I split the table into 20 roughly equal partitions based on the id column)
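For the first point, a minimal sketch of the single setting involved, assuming the sink's `iceberg.control.commit.interval-ms` (default 300000 ms, i.e. a commit every 5 minutes) is simply raised to one day: 24 × 60 × 60 × 1000 = 86400000 ms. The key would be merged into the `config` object of the sink configuration; like the other values there, it is passed as a string.

```json
{
  "iceberg.control.commit.interval-ms": "86400000"
}
```

With this interval, new rows only become visible in the Iceberg table at each commit, so the table would lag MySQL by up to a day.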

For now I only have an idea for the first point (and I'm not sure about it), and I have no idea how to change my sink configuration for the other two. In this example I was syncing a single table, and after 2 hours there were already 80 small files in both the data and the metadata folders, because every commit adds 3 more files:

```json
{
  "name": "IcebergSinkConnector",
  "config": {
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.table.mytable.id-columns": "ID",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.warehouse": "s3a://datalake/iceberg",
    "name": "IcebergSinkConnector",
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "unwrap",
    "topics": "mysql.mydb.mytable",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.drop.tombstones": "false",
    "iceberg.tables": "iceberg.mytable",
    "iceberg.tables.default-id-columns": "ID",
    "iceberg.tables.upsert-mode-enabled": "true",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.schema-case-insensitive": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.catalog": "iceberg",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
  }
}
```
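For the metadata folder, one option to try is setting Iceberg table properties on the table the connector auto-creates, assuming the sink's `iceberg.tables.auto-create-props.*` prefix, which passes the remaining key and value to the new table at creation time. `write.metadata.delete-after-commit.enabled` (default `false`) makes Iceberg delete the oldest `metadata.json` files after each commit, keeping at most `write.metadata.previous-versions-max` of them (default 100). The value 10 below is only illustrative. These keys would be merged into the existing `config` object.

```json
{
  "iceberg.tables.auto-create-props.write.metadata.delete-after-commit.enabled": "true",
  "iceberg.tables.auto-create-props.write.metadata.previous-versions-max": "10"
}
```

These properties only apply when the table is created, so an already existing table would need them set separately. They also prune only `metadata.json` files: old snapshots, manifests and data files remain until snapshots are expired, and small data files are merged into larger, more evenly sized ones only by compaction. In Iceberg both are usually run as periodic maintenance outside the sink connector, for example with the Spark procedures `expire_snapshots` and `rewrite_data_files`.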