-
Notifications
You must be signed in to change notification settings - Fork 238
Closed
Description
Running the following code:
# Entity key shared by every feature in this repro: the drop-off location id.
location_id = TypedKey(
    key_column="DOLocationID",
    key_column_type=ValueType.INT32,
    description="location id in NYC",
    full_name="nyc_taxi.location_id",
)

# One batch source that is reused by both anchors defined below. According to
# the report, passing two anchors that share this source to build_features is
# what raises the "duplicate source names" RuntimeError.
batch_source = HdfsSource(
    name="nycTaxiBatchSource1",
    # NOTE(review): the scraped page had mangled this URL into
    # "[email protected]" (e-mail obfuscation); restored to the public Feathr
    # sample-data location — confirm account/container if reproducing.
    path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04_with_index.csv",
    event_timestamp_column="lpep_dropoff_datetime",
    preprocessing=feathr_udf_day_calc,
    timestamp_format="yyyy-MM-dd HH:mm:ss",
)
# Row-level (non-windowed) features read straight from the batch source.
f_trip_distance = Feature(
    name="f_trip_distance",
    feature_type=FLOAT,
    key=location_id,
    transform="trip_distance",
)

f_trip_time_duration = Feature(
    name="f_trip_time_duration",
    feature_type=INT32,
    key=location_id,
    # Difference of the two timestamps divided by 60 (presumably seconds to
    # minutes — confirm against the engine's to_unix_timestamp semantics).
    transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60",
)

# All row-level features grouped under a single anchor.
features = [
    f_trip_distance,
    f_trip_time_duration,
    Feature(
        name="f_is_long_trip_distance",
        feature_type=BOOLEAN,
        key=location_id,
        transform="cast_float(trip_distance)>30",
    ),
    Feature(
        name="f_day_of_week",
        feature_type=INT32,
        key=location_id,
        transform="dayofweek(lpep_dropoff_datetime)",
    ),
]

# First of the two anchors built on the shared batch_source.
request_anchor = FeatureAnchor(
    name="request_features",
    source=batch_source,
    features=features,
)
# 7-day window aggregations over the fare columns, all keyed by location id.
agg_features = [
    Feature(
        name="f_location_avg_fare",
        key=location_id,
        feature_type=FLOAT,
        transform=WindowAggTransformation(
            agg_expr="cast_float(fare_amount)",
            agg_func="AVG",
            window="7d",
        ),
    ),
    Feature(
        name="f_location_max_fare",
        key=location_id,
        feature_type=FLOAT,
        transform=WindowAggTransformation(
            agg_expr="cast_float(fare_amount)",
            agg_func="MAX",
            window="7d",
        ),
    ),
    Feature(
        name="f_location_total_fare_cents",
        key=location_id,
        feature_type=FLOAT,
        transform=WindowAggTransformation(
            agg_expr="fare_amount_cents",
            agg_func="SUM",
            window="7d",
        ),
    ),
]

# Second anchor on the very same batch_source object as request_anchor.
agg_anchor = FeatureAnchor(
    name="aggregationFeatures",
    source=batch_source,
    features=agg_features,
)

# Derived features, disabled (commented out) in this repro:
# f_trip_time_distance = DerivedFeature(
#     name="f_trip_time_distance",
#     feature_type=FLOAT,
#     input_features=[f_trip_distance, f_trip_time_duration],
#     transform="f_trip_distance * f_trip_time_duration",
# )
# f_trip_time_rounded = DerivedFeature(
#     name="f_trip_time_rounded",
#     feature_type=INT32,
#     input_features=[f_trip_time_duration],
#     transform="f_trip_time_duration % 10",
# )
client.build_features(anchor_list=[agg_anchor, request_anchor],
It yields the following error:
Traceback (most recent call last):
File "/Users/xiazhu/Desktop/FeathrDemoHDFSSink.py", line 312, in <module>
client.build_features(anchor_list=[agg_anchor, request_anchor],
File "/Users/xiazhu/Documents/GitHub/feathr/feathr_project/feathr/client.py", line 227, in build_features
raise RuntimeError(f"Source name should be unique but there are duplicate source names in your source "
RuntimeError: Source name should be unique but there are duplicate source names in your source definitions. Source name of <function feathr_udf_day_calc at 0x7fbff1856f70>
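Two different anchors should be able to use the same source, but there seems to be a small bug in the code that prevents us from doing so. Note also that the error message reports `<function feathr_udf_day_calc ...>` (the source's preprocessing function) where a source name such as `nycTaxiBatchSource1` would be expected, which suggests the duplicate-name check may be reading the wrong attribute.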
Metadata
Metadata
Assignees
Labels
No labels