From 7867ae4fb206290be2ab360718f5498a456e5bba Mon Sep 17 00:00:00 2001 From: zerafachris PERSONAL Date: Mon, 6 Jan 2025 16:50:24 +0100 Subject: [PATCH] inclusion of kafka_settings as part of KafkaOptions to allow for pyspark kafka settings Signed-off-by: zerafachris PERSONAL --- sdk/python/feast/data_source.py | 4 ++ .../infra/contrib/spark_kafka_processor.py | 44 ++++++++----------- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 25475fcb4c..171ce40c23 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -41,11 +41,13 @@ def __init__( message_format: StreamFormat, topic: str, watermark_delay_threshold: Optional[timedelta] = None, + kafka_settings: Optional[Dict[str, str]] = None, ): self.kafka_bootstrap_servers = kafka_bootstrap_servers self.message_format = message_format self.topic = topic self.watermark_delay_threshold = watermark_delay_threshold or None + self.kafka_settings = kafka_settings or None @classmethod def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions): @@ -70,6 +72,7 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions): message_format=StreamFormat.from_proto(kafka_options_proto.message_format), topic=kafka_options_proto.topic, watermark_delay_threshold=watermark_delay_threshold, + kafka_settings=kafka_settings, ) return kafka_options @@ -91,6 +94,7 @@ def to_proto(self) -> DataSourceProto.KafkaOptions: message_format=self.message_format.to_proto(), topic=self.topic, watermark_delay_threshold=watermark_delay_threshold, + kafka_settings=self.kafka_settings, ) return kafka_options_proto diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index e148000bc9..536271ad6c 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -80,12 +80,7 @@ def ingest_stream_feature_view( @no_type_check def _ingest_stream_data(self) -> StreamTable: """Only supports json and avro formats currently.""" - if self.format == "json": - if not isinstance( - self.data_source.kafka_options.message_format, JsonFormat - ): - raise ValueError("kafka source message format is not jsonformat") - stream_df = ( + stream_df = ( self.spark.readStream.format("kafka") .option( "kafka.bootstrap.servers", @@ -93,39 +88,38 @@ def _ingest_stream_data(self) -> StreamTable: ) .option("subscribe", self.data_source.kafka_options.topic) .option("startingOffsets", "latest") # Query start - .load() - .selectExpr("CAST(value AS STRING)") - .select( + ) + + for k,v in self.data_source.kafka_options.kafka_settings.items(): + stream_df = stream_df.option(k,v) + + stream_df = stream_df.load().selectExpr("CAST(value AS STRING)") + + if self.format == "json": + if not isinstance(self.data_source.kafka_options.message_format, JsonFormat): + raise ValueError("kafka source message format is not jsonformat") + stream_df = stream_df.select( from_json( col("value"), self.data_source.kafka_options.message_format.schema_json, ).alias("table") ) - .select("table.*") - ) - else: + elif self.format == "avro": if not isinstance( self.data_source.kafka_options.message_format, AvroFormat ): raise ValueError("kafka source message format is not avro format") - stream_df = ( - self.spark.readStream.format("kafka") - .option( - "kafka.bootstrap.servers", - self.data_source.kafka_options.kafka_bootstrap_servers, - ) - .option("subscribe", self.data_source.kafka_options.topic) - .option("startingOffsets", "latest") # Query start - .load() - .selectExpr("CAST(value AS STRING)") - .select( + stream_df = stream_df.select( from_avro( col("value"), self.data_source.kafka_options.message_format.schema_json, ).alias("table") ) - .select("table.*") - ) + else: + raise ValueError("kafka source message format is not currently supported") + + stream_df = stream_df.select("table.*") + return stream_df def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: