Commit e603fa5

Add test.
Signed-off-by: Aloysius Lim <[email protected]>
aloysius-lim committed Jan 3, 2025
1 parent d164130 commit e603fa5
Showing 1 changed file with 98 additions and 1 deletion.
@@ -14,7 +14,7 @@
from feast.infra.offline_stores.offline_utils import (
    DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
)
-from feast.types import Float32, Int32
+from feast.types import Float32, Int32, String
from feast.utils import _utc_now
from tests.integration.feature_repos.repo_configuration import (
    construct_universal_feature_views,
@@ -639,3 +639,100 @@ def test_historical_features_containing_backfills(environment):
        actual_df,
        sort_by=["driver_id"],
    )


@pytest.mark.integration
@pytest.mark.universal_offline_stores
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_historical_features_field_mapping(
    environment, universal_data_sources, full_feature_names
):
    store = environment.feature_store

    # Timestamps are rounded down to the hour so the test data is deterministic.
    now = datetime.now().replace(microsecond=0, second=0, minute=0)
    tomorrow = now + timedelta(days=1)
    day_after_tomorrow = now + timedelta(days=2)

    entity_df = pd.DataFrame(
        data=[
            {"driver_id": 1001, "event_timestamp": day_after_tomorrow},
            {"driver_id": 1002, "event_timestamp": day_after_tomorrow},
        ]
    )

    # The source data deliberately uses "id" instead of the "driver_id" join
    # key; the field mapping below is what this test exercises.
    driver_stats_df = pd.DataFrame(
        data=[
            {
                "id": 1001,
                "avg_daily_trips": 20,
                "event_timestamp": now,
                "created": tomorrow,
            },
            {
                "id": 1002,
                "avg_daily_trips": 40,
                "event_timestamp": tomorrow,
                "created": now,
            },
        ]
    )

    expected_df = pd.DataFrame(
        data=[
            {
                "driver_id": 1001,
                "event_timestamp": day_after_tomorrow,
                "avg_daily_trips": 20,
            },
            {
                "driver_id": 1002,
                "event_timestamp": day_after_tomorrow,
                "avg_daily_trips": 40,
            },
        ]
    )
    # With full feature names enabled, retrieved feature columns are prefixed
    # with the feature view name.
    if full_feature_names:
        expected_df = expected_df.rename(
            columns={"avg_daily_trips": "driver_stats__avg_daily_trips"}
        )

    driver_stats_data_source = environment.data_source_creator.create_data_source(
        df=driver_stats_df,
        destination_name=f"test_driver_stats_{int(time.time_ns())}_{random.randint(1000, 9999)}",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
        # Map the original "id" column to the "driver_id" join key.
        field_mapping={"id": "driver_id"},
    )

    driver = Entity(name="driver", join_keys=["driver_id"])
    driver_fv = FeatureView(
        name="driver_stats",
        entities=[driver],
        schema=[
            Field(name="driver_id", dtype=String),
            Field(name="avg_daily_trips", dtype=Int32),
        ],
        source=driver_stats_data_source,
    )

    store.apply([driver, driver_fv])

    offline_job = store.get_historical_features(
        entity_df=entity_df,
        features=["driver_stats:avg_daily_trips"],
        full_feature_names=full_feature_names,
    )

    start_time = _utc_now()
    actual_df = offline_job.to_df()
    end_time = _utc_now()
    print(f"Time to execute job_from_df.to_df() = '{end_time - start_time}'\n")

    assert sorted(expected_df.columns) == sorted(actual_df.columns)
    validate_dataframes(
        expected_df,
        actual_df,
        sort_by=["driver_id"],
    )
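
Note: conceptually, the field_mapping={"id": "driver_id"} under test is just a
column rename applied when the offline store reads the source, before the
point-in-time join. A minimal sketch of that semantics in plain pandas (the
data and names here are illustrative, not part of this commit):

    import pandas as pd

    # The source table uses "id"; field_mapping renames it to the join key
    # that the entity dataframe and FeatureView schema expect.
    raw = pd.DataFrame({"id": [1001, 1002], "avg_daily_trips": [20, 40]})
    mapped = raw.rename(columns={"id": "driver_id"})
    assert list(mapped.columns) == ["driver_id", "avg_daily_trips"]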
