
Feathr throws an error when running the job in a "single node" cluster and reading files such as Parquet or Delta Lake #474

@xiaoyongzhu

Description


This issue is very interesting. Some background:

On Databricks, users can create several types of clusters, one of them being "single node" and another being "standard". When running in a single node cluster, Databricks sets the Spark context to "local".

That is, executing this line in a Databricks notebook:

spark.sparkContext.isLocal

returns True on a single node cluster and False on a standard cluster.

This causes a problem: when running in a single node cluster, Feathr may throw this error:

: java.lang.NullPointerException
	at com.linkedin.feathr.offline.util.SourceUtils$.getLocalPath(SourceUtils.scala:690)
	at com.linkedin.feathr.offline.util.SourceUtils$.getLocalDF(SourceUtils.scala:300)
	at com.linkedin.feathr.offline.util.SourceUtils$.loadObservationAsDF(SourceUtils.scala:681)
	at com.linkedin.feathr.offline.job.FeatureJoinJob$.feathrJoinRun(FeatureJoinJob.scala:161)
	at com.linkedin.feathr.offline.job.FeatureJoinJob$.run(FeatureJoinJob.scala:68)
	at com.linkedin.feathr.offline.job.FeatureJoinJob$.main(FeatureJoinJob.scala:314)
	at com.linkedin.feathr.offline.job.FeatureJoinJob$.mainWithPreprocessedDataFrame(FeatureJoinJob.scala:307)
	at com.linkedin.feathr.offline.job.FeatureJoinJob.mainWithPreprocessedDataFrame(FeatureJoinJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)

The reason lies in these lines:

https://github.com/linkedin/feathr/blob/xiaoyzhu/kafka_udf/src/main/scala/com/linkedin/feathr/offline/util/SourceUtils.scala#L694-L700

If the context is local (i.e. single node) and the path cannot be matched to a known format (say, a Delta table whose path does not end with .delta), the code treats the input as local and tries to load the DataFrame from the local filesystem. But those files actually live in DBFS/WASB/etc., so this is not the expected behavior.

case _ =>
  if (ss.sparkContext.isLocal) {
    getLocalDF(ss, inputData.inputPath, dataLoaderHandlers)
  } else {
    loadAsDataFrame(ss, SimplePath(inputData.inputPath), dataLoaderHandlers)
  }
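One possible direction (just a sketch, not Feathr's actual API): decide local vs. remote by inspecting the path's URI scheme rather than `sparkContext.isLocal`, since a single node cluster can still read remote storage. The object and method names below are hypothetical.

```scala
// Hypothetical helper: classify a path as local-filesystem or remote by
// its URI scheme, instead of trusting sparkContext.isLocal. Paths like
// dbfs:/..., wasbs://..., abfss://... are remote even on a single node
// cluster, so they should go through the distributed reader.
object PathScheme {
  // Returns true only when the path has no scheme (a bare filesystem
  // path) or the explicit "file" scheme.
  def isLocalPath(path: String): Boolean = {
    val scheme = new java.net.URI(path).getScheme
    scheme == null || scheme == "file"
  }
}
```

With such a check, the `case _` branch above could call `getLocalDF` only when `isLocalPath(inputData.inputPath)` is true, and fall back to `loadAsDataFrame` for dbfs/wasb/abfs paths even when the Spark context is local.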
