Skip to content

Two anchors cannot use the same source #492

@xiaoyongzhu

Description

@xiaoyongzhu

Running the following code:


# Reproduction script: two FeatureAnchor objects (request_anchor and
# agg_anchor) are built on the very same source object, batch_source.

# Entity key used by every feature below: the DOLocationID column, typed INT32.
location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32,
                       description="location id in NYC",
                       full_name="nyc_taxi.location_id")

# The single source shared by both anchors. It is given a preprocessing
# function, feathr_udf_day_calc, which is defined outside this snippet.
# NOTE(review): "[email protected]" in the path looks like an email-obfuscation
# artifact of the page export rather than the real container/account host;
# confirm the actual path against the original issue.
batch_source = HdfsSource(name="nycTaxiBatchSource1",
                          path="wasbs://[email protected]/sample_data/green_tripdata_2020-04_with_index.csv",
                          event_timestamp_column="lpep_dropoff_datetime",
                          preprocessing=feathr_udf_day_calc,
                          timestamp_format="yyyy-MM-dd HH:mm:ss")

# Features defined by expression strings, all keyed by location_id.
f_trip_distance = Feature(name="f_trip_distance",
                          feature_type=FLOAT, 
                          key=location_id,
                          transform="trip_distance")
# Difference between drop-off and pick-up unix timestamps, divided by 60.
f_trip_time_duration = Feature(name="f_trip_time_duration",
                               feature_type=INT32,
                               key=location_id,
                               transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60")

# Feature list for the first anchor: the two features above plus two
# features declared inline.
features = [
    f_trip_distance,
    f_trip_time_duration,
    Feature(name="f_is_long_trip_distance",
            feature_type=BOOLEAN,
            key=location_id,
            transform="cast_float(trip_distance)>30"),
    Feature(name="f_day_of_week",
            feature_type=INT32,
            key=location_id,
            transform="dayofweek(lpep_dropoff_datetime)"),
]

# First anchor using batch_source.
request_anchor = FeatureAnchor(name="request_features",
                               source=batch_source,
                               features=features)


# Window-aggregation features: AVG, MAX and SUM, each over a "7d" window.
# NOTE(review): fare_amount_cents is not mentioned anywhere else in this
# snippet; presumably it is a column added by feathr_udf_day_calc - confirm.
agg_features = [Feature(name="f_location_avg_fare",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                          agg_func="AVG",
                                                          window="7d")),
                Feature(name="f_location_max_fare",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                          agg_func="MAX",
                                                          window="7d")),
                Feature(name="f_location_total_fare_cents",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="fare_amount_cents",
                                                          agg_func="SUM",
                                                          window="7d")),
                ]

# Second anchor, built on the SAME batch_source object as request_anchor.
agg_anchor = FeatureAnchor(name="aggregationFeatures",
                           source=batch_source,
                           features=agg_features)

# Derived features below were left commented out by the reporter.
# f_trip_time_distance = DerivedFeature(name="f_trip_time_distance",
#                                       feature_type=FLOAT,
#                                       input_features=[
#                                           f_trip_distance, f_trip_time_duration],
#                                       transform="f_trip_distance * f_trip_time_duration")

# f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded",
#                                      feature_type=INT32,
#                                      input_features=[f_trip_time_duration],
#                                      transform="f_trip_time_duration % 10")
client.build_features(anchor_list=[agg_anchor, request_anchor], 

It will yield this error:

Traceback (most recent call last):
  File "/Users/xiazhu/Desktop/FeathrDemoHDFSSink.py", line 312, in <module>
    client.build_features(anchor_list=[agg_anchor, request_anchor], 
  File "/Users/xiazhu/Documents/GitHub/feathr/feathr_project/feathr/client.py", line 227, in build_features
    raise RuntimeError(f"Source name should be unique but there are duplicate source names in your source "
RuntimeError: Source name should be unique but there are duplicate source names in your source definitions. Source name of <function feathr_udf_day_calc at 0x7fbff1856f70>

Two different anchors should be able to use the same source, but there seems to be a small bug in the code that prevents us from doing so.

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions