Skip to content

Commit 8e70e48

Browse files
rakeshkashyap123 and Rakesh Kashyap Hanasoge Padmanabha authored
Fix bug when SWA feature datapath does not end in daily/hourly (#1027)
Co-authored-by: Rakesh Kashyap Hanasoge Padmanabha <[email protected]>
1 parent a2e4525 commit 8e70e48

File tree

2 files changed

+80
-6
lines changed

2 files changed

+80
-6
lines changed

feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import java.time.Duration
44
import com.linkedin.feathr.common.{DateParam, DateTimeResolution}
55
import com.linkedin.feathr.offline.source.SourceFormatType._
66
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
7-
import com.linkedin.feathr.offline.config.location.{PathList, SimplePath}
7+
import com.linkedin.feathr.offline.config.location.{DataLocation, PathList, SimplePath}
88
import com.linkedin.feathr.offline.generation.IncrementalAggContext
99
import com.linkedin.feathr.offline.source.DataSource
1010
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor
@@ -96,16 +96,19 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
9696
val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler)
9797

9898
// Only file-based source has real "path", others are just single dataset
99-
val adjustedObsTimeRange = if (factDataSource.location.isFileBasedLocation()) {
99+
val (adjustedObsTimeRange, dataSourcePath) = if (factDataSource.location.isFileBasedLocation()) {
100100
val pathChecker = PathChecker(ss, dataLoaderHandlers)
101101
val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)
102102
val pathInfo = pathAnalyzer.analyze(factDataSource.path)
103103
if (pathInfo.dateTimeResolution == DateTimeResolution.DAILY) {
104-
obsTimeRange.adjustWithDateTimeResolution(DateTimeResolution.DAILY)
105-
} else obsTimeRange
104+
(obsTimeRange.adjustWithDateTimeResolution(DateTimeResolution.DAILY), pathInfo.basePath)
105+
} else (obsTimeRange, pathInfo.basePath)
106106
} else {
107-
obsTimeRange
107+
(obsTimeRange, factDataSource.path)
108108
}
109+
// Use the analyzed pathInfo's base path as the data source path: the path analyzer adds the daily/hourly keyword when it is missing from the configured path
110+
val updatedFactDataSource = DataSource(dataSourcePath, factDataSource.sourceType, factDataSource.timeWindowParams,
111+
factDataSource.timePartitionPattern, factDataSource.postfixPath)
109112

110113
val timeInterval = OfflineDateTimeUtils.getFactDataTimeRange(adjustedObsTimeRange, window, timeDelays)
111114
val needCreateTimestampColumn = SlidingWindowFeatureUtils.needCreateTimestampColumnFromPartition(factDataSource)
@@ -115,7 +118,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
115118
val timeSeriesSource =
116119
DataSourceAccessor(
117120
ss = ss,
118-
source = factDataSource,
121+
source = updatedFactDataSource,
119122
dateIntervalOpt = Some(timeInterval),
120123
expectDatumType = None,
121124
failOnMissingPartition = failOnMissingPartition,

feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,77 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
315315
assertEquals(df.getAs[Float]("simpleFeature"), 20f)
316316
}
317317

318+
/**
319+
* SWA test where the source path does not end in daily/hourly. The join should still succeed and produce the expected feature values.
320+
*/
321+
@Test
322+
def testSwaWithMalformedPath(): Unit = {
323+
val joinConfigAsString =
324+
"""
325+
| settings: {
326+
| observationDataTimeSettings: {
327+
| absoluteTimeRange: {
328+
| timeFormat: yyyy-MM-dd
329+
| startTime: "2018-05-01"
330+
| endTime: "2018-05-03"
331+
| }
332+
| }
333+
| joinTimeSettings: {
334+
| timestampColumn: {
335+
| def: timestamp
336+
| format: yyyy-MM-dd
337+
| }
338+
| }
339+
|}
340+
|
341+
|features: [
342+
| {
343+
| key: [x],
344+
| featureList: ["simplePageViewCount", "simpleFeature"]
345+
| }
346+
|]
347+
""".stripMargin
348+
val featureDefAsString =
349+
"""
350+
|sources: {
351+
| swaSource: {
352+
| location: { path: "slidingWindowAgg/localSWADefaultTest/" }
353+
| timePartitionPattern: "yyyy/MM/dd"
354+
| timeWindowParameters: {
355+
| timestampColumn: "timestamp"
356+
| timestampColumnFormat: "yyyy-MM-dd"
357+
| }
358+
| }
359+
|}
360+
|
361+
|anchors: {
362+
| swaAnchor: {
363+
| source: "swaSource"
364+
| key: "x"
365+
| features: {
366+
| simplePageViewCount: {
367+
| def: "aggregationWindow"
368+
| aggregation: COUNT
369+
| window: 3d
370+
| default: 10
371+
| }
372+
| simpleFeature: {
373+
| def: "aggregationWindow"
374+
| aggregation: COUNT
375+
| window: 3d
376+
| default: 20
377+
| }
378+
| }
379+
| }
380+
|}
381+
""".stripMargin
382+
val res = runLocalFeatureJoinForTest(joinConfigAsString, featureDefAsString, observationDataPath = "slidingWindowAgg/localAnchorTestObsData.avro.json").data
383+
res.show()
384+
val df = res.collect()(0)
385+
assertEquals(df.getAs[Float]("simplePageViewCount"), 10f)
386+
assertEquals(df.getAs[Float]("simpleFeature"), 20f)
387+
}
388+
318389
/**
319390
* SWA test with missing features. To enable this test, set the value of FeatureUtils.SKIP_MISSING_FEATURE to True. From
320391
* Spark 3.1, SparkContext.updateConf() is not supported.

0 commit comments

Comments
 (0)