Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: inclusion of kafka_settings as part of KafkaOptions to allow for pysp… #4901

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ def __init__(
message_format: StreamFormat,
topic: str,
watermark_delay_threshold: Optional[timedelta] = None,
kafka_settings: Optional[Dict[str, str]] = None,
):
self.kafka_bootstrap_servers = kafka_bootstrap_servers
self.message_format = message_format
self.topic = topic
self.watermark_delay_threshold = watermark_delay_threshold or None
self.kafka_settings = kafka_settings or None

@classmethod
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
Expand All @@ -70,6 +72,7 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
topic=kafka_options_proto.topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=kafka_settings,
)

return kafka_options
Expand All @@ -91,6 +94,7 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
message_format=self.message_format.to_proto(),
topic=self.topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=self.kafka_settings,
)

return kafka_options_proto
Expand Down
44 changes: 19 additions & 25 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,52 +80,46 @@ def ingest_stream_feature_view(
@no_type_check
def _ingest_stream_data(self) -> StreamTable:
    """Read the Kafka topic into a streaming DataFrame of parsed records.

    Builds a Spark structured-streaming reader for the configured Kafka
    source, applies any user-supplied ``kafka_settings`` as extra reader
    options, then deserializes the message value according to the source's
    declared message format. Only JSON and Avro formats are supported.

    Returns:
        StreamTable: a streaming DataFrame with one column per field of the
        message schema (the parsed ``table.*`` projection).

    Raises:
        ValueError: if ``self.format`` does not match the data source's
            message format, or the format is neither JSON nor Avro.
    """
    stream_reader = (
        self.spark.readStream.format("kafka")
        .option(
            "kafka.bootstrap.servers",
            self.data_source.kafka_options.kafka_bootstrap_servers,
        )
        .option("subscribe", self.data_source.kafka_options.topic)
        .option("startingOffsets", "latest")  # Query start
    )

    # kafka_settings is Optional: KafkaOptions.__init__ stores
    # `kafka_settings or None`, so it is None when not provided. Guard
    # before iterating to avoid AttributeError on sources without settings.
    kafka_settings = self.data_source.kafka_options.kafka_settings
    if kafka_settings:
        for k, v in kafka_settings.items():
            stream_reader = stream_reader.option(k, v)

    stream_df = stream_reader.load().selectExpr("CAST(value AS STRING)")

    if self.format == "json":
        if not isinstance(self.data_source.kafka_options.message_format, JsonFormat):
            raise ValueError("kafka source message format is not jsonformat")
        stream_df = stream_df.select(
            from_json(
                col("value"),
                self.data_source.kafka_options.message_format.schema_json,
            ).alias("table")
        )
    elif self.format == "avro":
        if not isinstance(
            self.data_source.kafka_options.message_format, AvroFormat
        ):
            raise ValueError("kafka source message format is not avro format")
        stream_df = stream_df.select(
            from_avro(
                col("value"),
                self.data_source.kafka_options.message_format.schema_json,
            ).alias("table")
        )
    else:
        raise ValueError("kafka source message format is not currently supported")

    # Flatten the parsed struct into top-level columns.
    stream_df = stream_df.select("table.*")

    return stream_df

def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
Expand Down