Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-16223

Codegen failure with a Dataframe program using an array

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Duplicate
    • None
    • None
    • SQL
    • None

    Description

      When we compile a Dataframe program with an operation on a large array, a compilation failure occurs. This is because a local variable inputadapter_value cannot be referenced in the apply() method that is generated by CodegenContext.splitExpressions(). The local variable is defined in the processNext() method.

      What is a better approach to resolve this? Is it better to pass inputadapter_value to the apply() method?

      Example program

      val n = 500
      val statement = (0 to n - 1).map(i => s"value + 1.0d")
            .mkString("Array(", ",", ")")
      sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF
        .selectExpr(statement).showString(1)
      

      Generated code and stack trace

      23:10:45.801 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 30, Column 36: Expression "inputadapter_value" is not an rvalue
      /* 001 */ public Object generate(Object[] references) {
      /* 002 */   return new GeneratedIterator(references);
      /* 003 */ }
      /* 004 */
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator inputadapter_input;
      /* 008 */   private Object[] project_values;
      /* 009 */   private UnsafeRow project_result;
      /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
      /* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
      /* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter;
      /* 013 */
      /* 014 */   public GeneratedIterator(Object[] references) {
      /* 015 */     this.references = references;
      /* 016 */   }
      /* 017 */
      /* 018 */   public void init(int index, scala.collection.Iterator inputs[]) {
      /* 019 */     partitionIndex = index;
      /* 020 */     inputadapter_input = inputs[0];
      /* 021 */     this.project_values = null;
      /* 022 */     project_result = new UnsafeRow(1);
      /* 023 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
      /* 024 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
      /* 025 */     this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
      /* 026 */   }
      /* 027 */
      /* 028 */   private void project_apply_0(InternalRow inputadapter_row) {
      /* 029 */     double project_value1 = -1.0;
      /* 030 */     project_value1 = inputadapter_value + 1.0D;
      /* 031 */     if (false) {
      /* 032 */       project_values[0] = null;
      /* 033 */     } else {
      /* 034 */       project_values[0] = project_value1;
      /* 035 */     }
      /* 036 */
      /* 037 */     double project_value4 = -1.0;
      /* 038 */     project_value4 = inputadapter_value + 1.0D;
      /* 039 */     if (false) {
      /* 040 */       project_values[1] = null;
      /* 041 */     } else {
      /* 042 */       project_values[1] = project_value4;
      /* 043 */     }
      ...
      /* 4032 */   }
      /* 4033 */
      /* 4034 */   protected void processNext() throws java.io.IOException {
      /* 4035 */     while (inputadapter_input.hasNext()) {
      /* 4036 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 4037 */       System.out.println("row: " + inputadapter_row.getClass() + ", " + inputadapter_row);
      /* 4038 */       double inputadapter_value = inputadapter_row.getDouble(0);
      /* 4039 */
      /* 4040 */       final boolean project_isNull = false;
      /* 4041 */       this.project_values = new Object[500];project_apply_0(inputadapter_row);
      /* 4042 */       project_apply_1(inputadapter_row);
      /* 4043 */       /* final ArrayData project_value = org.apache.spark.sql.catalyst.util.GenericArrayData.allocate(project_values); */
      /* 4044 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
      /* 4045 */       this.project_values = null;
      /* 4046 */       project_holder.reset();
      /* 4047 */
      /* 4048 */       project_rowWriter.zeroOutNullBytes();
      /* 4049 */
      /* 4050 */       if (project_isNull) {
      /* 4051 */         project_rowWriter.setNullAt(0);
      /* 4052 */       } else {
      /* 4053 */         // Remember the current cursor so that we can calculate how many bytes are
      /* 4054 */         // written later.
      /* 4055 */         final int project_tmpCursor = project_holder.cursor;
      /* 4056 */
      /* 4057 */         if (project_value instanceof UnsafeArrayData) {
      /* 4058 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
      /* 4059 */           // grow the global buffer before writing data.
      /* 4060 */           project_holder.grow(project_sizeInBytes);
      /* 4061 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
      /* 4062 */           project_holder.cursor += project_sizeInBytes;
      /* 4063 */
      /* 4064 */         } else {
      /* 4065 */           final int project_numElements = project_value.numElements();
      /* 4066 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
      /* 4067 */
      /* 4068 */           for (int project_index = 0; project_index < project_numElements; project_index++) {
      /* 4069 */             if (project_value.isNullAt(project_index)) {
      /* 4070 */               project_arrayWriter.setNullAt(project_index);
      /* 4071 */             } else {
      /* 4072 */               final double project_element = project_value.getDouble(project_index);
      /* 4073 */               project_arrayWriter.write(project_index, project_element);
      /* 4074 */             }
      /* 4075 */           }
      /* 4076 */         }
      /* 4077 */
      /* 4078 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
      /* 4079 */         project_rowWriter.alignToWords(project_holder.cursor - project_tmpCursor);
      /* 4080 */       }
      /* 4081 */       project_result.setTotalSize(project_holder.totalSize());
      /* 4082 */       append(project_result);
      /* 4083 */       if (shouldStop()) return;
      /* 4084 */     }
      /* 4085 */   }
      /* 4086 */ }
      
      org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 30, Column 36: Expression "inputadapter_value" is not an rvalue
      	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10174)
      	at org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6036)
      	at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4440)
      	at org.codehaus.janino.UnitCompiler.access$9900(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$11.visitAmbiguousName(UnitCompiler.java:4417)
      	at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3138)
      	at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4427)
      	at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4498)
      	at org.codehaus.janino.UnitCompiler.access$8900(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$11.visitBinaryOperation(UnitCompiler.java:4394)
      	at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:3768)
      	at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4427)
      	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4360)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2669)
      	at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
      	at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
      	at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
      	at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
      	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
      	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:822)
      	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
      	at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
      	at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
      	at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
      	at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
      	at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
      	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
      	at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
      	at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
      	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
      	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
      	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:878)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:903)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:900)
      	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
      	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
      	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
      	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
      	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
      	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
      	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:832)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:351)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
      	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
      	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2176)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2525)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2175)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2182)
      	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1918)
      	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1917)
      	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2555)
      	at org.apache.spark.sql.Dataset.head(Dataset.scala:1917)
      	at org.apache.spark.sql.Dataset.take(Dataset.scala:2132)
      	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
      	at org.apache.spark.sql.MySuite$$anonfun$1.apply$mcV$sp(MySuite.scala:254)
      	at org.apache.spark.sql.MySuite$$anonfun$1.apply(MySuite.scala:29)
      	at org.apache.spark.sql.MySuite$$anonfun$1.apply(MySuite.scala:29)
      	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      	at org.scalatest.Transformer.apply(Transformer.scala:22)
      	at org.scalatest.Transformer.apply(Transformer.scala:20)
      	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
      	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:57)
      	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
      	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
      	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
      	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
      	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
      	at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
      	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
      	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
      	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
      	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
      	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
      	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
      	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
      	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
      	at org.scalatest.Suite$class.run(Suite.scala:1424)
      	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
      	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
      	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
      	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
      	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
      	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:29)
      	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
      	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
      	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:29)
      	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
      	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
      	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
      	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
      	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
      	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
      	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
      	at org.scalatest.tools.Runner$.run(Runner.scala:883)
      	at org.scalatest.tools.Runner.run(Runner.scala)
      	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
      	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              kiszk Kazuaki Ishizaki
              Votes:
              2 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: