Description
This issue is very interesting. Some background:
On Databricks, users can create several types of clusters, among them "single node" and "standard". When running on a single node cluster, Databricks sets the Spark context to local mode.
That is, executing this line in a Databricks notebook:
spark.sparkContext.isLocal
returns True on a single node cluster and False on a standard cluster.
This causes a problem: when running Feathr on a single node cluster, Feathr may throw this error:
: java.lang.NullPointerException
at com.linkedin.feathr.offline.util.SourceUtils$.getLocalPath(SourceUtils.scala:690)
at com.linkedin.feathr.offline.util.SourceUtils$.getLocalDF(SourceUtils.scala:300)
at com.linkedin.feathr.offline.util.SourceUtils$.loadObservationAsDF(SourceUtils.scala:681)
at com.linkedin.feathr.offline.job.FeatureJoinJob$.feathrJoinRun(FeatureJoinJob.scala:161)
at com.linkedin.feathr.offline.job.FeatureJoinJob$.run(FeatureJoinJob.scala:68)
at com.linkedin.feathr.offline.job.FeatureJoinJob$.main(FeatureJoinJob.scala:314)
at com.linkedin.feathr.offline.job.FeatureJoinJob$.mainWithPreprocessedDataFrame(FeatureJoinJob.scala:307)
at com.linkedin.feathr.offline.job.FeatureJoinJob.mainWithPreprocessedDataFrame(FeatureJoinJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
The reason is in the lines below:
If the context is local (i.e. single node) and the path cannot be parsed correctly (say, a delta table whose file name does not end with .delta), the code treats the input as local and tries to load the dataframe from local paths. But those files actually live in dbfs/wasb/etc., so this is not the expected behavior.
case _ => {
  if (ss.sparkContext.isLocal) {
    getLocalDF(ss, inputData.inputPath, dataLoaderHandlers)
  } else {
    loadAsDataFrame(ss, SimplePath(inputData.inputPath), dataLoaderHandlers)
  }
}
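Given the branch above, one possible fix direction is to decide local vs. remote by the path's URI scheme rather than by sparkContext.isLocal. A minimal sketch in Python, where the helper name and scheme list are illustrative assumptions, not Feathr's actual API:

```python
from urllib.parse import urlparse

# Hypothetical helper: classify a path as remote storage by its URI
# scheme instead of relying on sparkContext.isLocal. The scheme list
# is illustrative and would need to match the stores Feathr supports.
REMOTE_SCHEMES = {"dbfs", "wasb", "wasbs", "abfs", "abfss",
                  "s3", "s3a", "s3n", "hdfs", "gs"}

def is_remote_path(path: str) -> bool:
    """Return True if the path's scheme names a distributed/cloud store."""
    scheme = urlparse(path).scheme.lower()
    return scheme in REMOTE_SCHEMES

print(is_remote_path("dbfs:/mnt/data/observations"))  # True
print(is_remote_path("/tmp/local/observations"))      # False
```

With a check like this, a single node cluster reading from dbfs/wasb would still take the remote-load branch, and only genuinely local filesystem paths would go through getLocalDF.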