RecordTooLargeException: When producing message to Kafka control topic #297

ArkaSarkar19 opened this issue Sep 23, 2024 · 4 comments

@ArkaSarkar19

ArkaSarkar19 commented Sep 23, 2024

Hi Team

We are getting a RecordTooLargeException when the connector tries to produce a message to the control topic.

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 7222162 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

Can you suggest some ways to reduce the size of the messages that the connector produces to the Kafka control topic? We have tried the following configuration in order to reduce the message size, but it doesn't seem to help:

{
  "iceberg.kafka.producer.override.max.request.size": 1048576,
  "iceberg.kafka.write.metadata.metrics.default": "none",
  "iceberg.kafka.write.metadata.metrics.max-inferred-column-defaults": "1",
  "iceberg.kafka.write.metadata.compression-codec": "gzip",
  "iceberg.catalog.table-override.write.metadata.previous-versions-max": 5,
  "iceberg.catalog.table-override.write.parquet.row-group-size-bytes": 1048576,
  "iceberg.kafka.producer.override.buffer.memory": 524288,
  "iceberg.kafka.producer.override.compression.type": "snappy"
}

This has been a huge blocker for us, as we cannot increase these limits on the Kafka broker end. Can you please suggest a fix?

QQ: Is there an enterprise version of this connector?

Here is the connector config:


{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "iceberg.catalog.table-override.write.data.path": "<REDACTED>",
  "iceberg.catalog.table-override.write.parquet.compression-codec": "snappy",
  "errors.log.include.messages": "true",
  "iceberg.tables.cdc-field": "operationType",
  "iceberg.catalog.s3.region": "us-east-1",
  "iceberg.catalog.client.region": "us-east-1",
  "iceberg.table.dummy_table_name.partition-by": "checkIn, checkOut",
  "iceberg.catalog.table-override.write.metadata.path": "<REDACTED>",
  "errors.log.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "consumer.override.bootstrap.servers": "<REDACTED>",
  "value.converter.schema.registry.url": "<REDACTED>",
  "iceberg.partition": "part1, part2",
  "name": "dummy_connector_name",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.catalog.table-override.write.metadata.previous-versions-max": "5",
  "tasks.max": "9",
  "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
  "iceberg.catalog.s3.sse.key": "AES256",
  "iceberg.tables.upsert-mode-enabled": "true",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables": "demo.dummy_table_name",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "iceberg.catalog.s3.sse.type": "s3",
  "iceberg.catalog.table-override.write.metadata.delete-after-commit.enabled": "true",
  "iceberg.table.dummy_table_name.id-columns": "id_column",
  "topics": "dummy_topic_name",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.kafka.write.metadata.metrics.default": "none",
  "iceberg.control.commit.interval-ms": "900000",
  "iceberg.catalog.uri": "<REDACTED>",
  "key.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
  "iceberg.catalog": "spark_catalog",
  "iceberg.kafka.write.metadata.compression-codec": "gzip",
  "consumer.override.auto.offset.reset": "latest",
  "iceberg.catalog.warehouse": "<REDACTED>",
  "iceberg.control.topic": "control_dummy_topic_name",
  "iceberg.kafka.write.metadata.metrics.max-inferred-column-defaults": "1",
  "key.converter.schema.registry.url": "<REDACTED>",
  "iceberg.catalog.type": "hive",
  "iceberg.catalog.s3.path-style-access": "true"
}

@whisssky

Hi ArkaSarkar19, I am running into the same issue. Have you found a solution yet? Thanks

@Sharu95

Sharu95 commented Jan 16, 2025

This is not an Iceberg connector issue, but you can increase the max.request.size producer config, although that is usually not recommended.

Are you getting the exception for messages produced to the control topic or your own topic?

@annurahar

It is not possible to increase the max.request.size producer config, as it is controlled by a different team. This exception is occurring for the control topic.
I would like to understand why the control topic message is becoming bigger than 1 MB.

@Sharu95

Sharu95 commented Jan 16, 2025

The schema can also be embedded in the record, depending on the converter config, so check that. If not, it might just be that you have a lot of data, and you need to dig into the data payload.
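
(For reference, schema embedding is controlled by the Kafka Connect converter settings rather than by the sink itself. A minimal sketch of the properties involved, assuming the plain JsonConverter; the AvroConverter in the config above registers schemas in Schema Registry and only embeds a schema ID in each record, so it does not carry the full schema:)

{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}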

max.request.size is a producer config, so unless another team controls your application producer, iceberg.kafka.producer.override.max.request.size should solve that issue 👀
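
(A minimal sketch of that override, assuming the iceberg.kafka.producer.override.* prefix from the config above reaches the control-topic producer; the 8388608 value is illustrative and simply needs to exceed the 7222162-byte record reported in the exception. Note that the earlier attempt set this property to 1048576, which is the same value the exception reports as the current limit, so it did not actually raise anything. Broker- and topic-side limits such as message.max.bytes are a separate check enforced on the broker:)

{
  "iceberg.kafka.producer.override.max.request.size": "8388608"
}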
