SPARK-14675: ClassFormatError in codegen when using Aggregator


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: SQL
    • Labels: None
    • Environment: spark 2.0.0-SNAPSHOT

    Description

      Code:

        import org.apache.spark.sql.Encoder
        import org.apache.spark.sql.expressions.Aggregator
        import spark.implicits._ // or sqlContext.implicits._, depending on the snapshot; provides toDS and the Seq[Int] encoder

        val toList = new Aggregator[(String, Int), Seq[Int], Seq[Int]] {
          def bufferEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
          def finish(reduction: Seq[Int]): Seq[Int] = reduction
          def merge(b1: Seq[Int], b2: Seq[Int]): Seq[Int] = b1 ++ b2
          def outputEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
          def reduce(b: Seq[Int], a: (String, Int)): Seq[Int] = b :+ a._2
          def zero: Seq[Int] = Seq.empty[Int]
        }
      
        val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
        val ds2 = ds1.groupByKey(_._1).agg(toList.toColumn)
        ds2.show
      

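      For reference, the expected result can be checked by replaying the
      aggregator on plain Scala collections (derived from the repro code
      above, not actual Spark output):

        val expected = List(("a", 1), ("a", 2), ("a", 3))
          .groupBy(_._1)
          .mapValues(_.foldLeft(toList.zero)(toList.reduce))
        // expected: Map("a" -> List(1, 2, 3)), i.e. one row ("a", Seq(1, 2, 3))
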
      Instead, this gives me:

      16/04/15 18:31:22 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, localhost): java.lang.ClassFormatError: Duplicate field name&signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificMutableProjection
      	at java.lang.ClassLoader.defineClass1(Native Method)
      	at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
      	at org.codehaus.janino.ByteArrayClassLoader.findClass(ByteArrayClassLoader.java:66)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass.generate(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:140)
      	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:139)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:178)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:197)
      	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:39)
      	at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:80)
      	at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:71)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:72)
      	at org.apache.spark.scheduler.Task.run(Task.scala:86)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      
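      The error means the class file emitted for SpecificMutableProjection
      declares two fields with the same name and signature: javac rejects such
      source, but Janino compiles it, and the JVM verifier then refuses the
      class at load time (defineClass in the stack above). A minimal sketch of
      this failure mode outside Spark, assuming Janino's SimpleCompiler, like
      the compiler Spark embeds, lets the duplicate through to the class
      loader:

        import org.codehaus.janino.SimpleCompiler

        val compiler = new SimpleCompiler()
        // two fields named `value` with the same signature
        compiler.cook("public class Dup { private int value; private int value; }")
        compiler.getClassLoader.loadClass("Dup") // throws java.lang.ClassFormatError
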

      When I run:

       ds2.queryExecution.debug.codegen()
      
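      (For reference, the same dump should also be obtainable via the debug
      implicits, assuming they are present in this snapshot:

        import org.apache.spark.sql.execution.debug._
        ds2.debugCodegen()
      )
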

      I get:

      Found 2 WholeStageCodegen subtrees.
      == Subtree 1 / 2 ==
      WholeStageCodegen
      :  +- Sort [value#6 ASC], false, 0
      :     +- INPUT
      +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
         +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]
      
      Generated code:
      /* 001 */ public Object generate(Object[] references) {
      /* 002 */   return new GeneratedIterator(references);
      /* 003 */ }
      /* 004 */ 
      /* 005 */ /** Codegened pipeline for:
      /* 006 */ * Sort [value#6 ASC], false, 0
      /* 007 */ +- INPUT
      /* 008 */ */
      /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 010 */   private Object[] references;
      /* 011 */   private boolean sort_needToSort;
      /* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
      /* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
      /* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
      /* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
      /* 016 */   private scala.collection.Iterator inputadapter_input;
      /* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
      /* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
      /* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
      /* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
      /* 021 */   
      /* 022 */   public GeneratedIterator(Object[] references) {
      /* 023 */     this.references = references;
      /* 024 */   }
      /* 025 */   
      /* 026 */   public void init(int index, scala.collection.Iterator inputs[]) {
      /* 027 */     partitionIndex = index;
      /* 028 */     sort_needToSort = true;
      /* 029 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
      /* 030 */     sort_sorter = sort_plan.createSorter();
      /* 031 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
      /* 032 */     
      /* 033 */     inputadapter_input = inputs[0];
      /* 034 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
      /* 035 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
      /* 036 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
      /* 037 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
      /* 038 */   }
      /* 039 */   
      /* 040 */   private void sort_addToSorter() throws java.io.IOException {
      /* 041 */     /*** PRODUCE: INPUT */
      /* 042 */     
      /* 043 */     while (inputadapter_input.hasNext()) {
      /* 044 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 045 */       /*** CONSUME: Sort [value#6 ASC], false, 0 */
      /* 046 */       
      /* 047 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
      /* 048 */       if (shouldStop()) return;
      /* 049 */     }
      /* 050 */     
      /* 051 */   }
      /* 052 */   
      /* 053 */   protected void processNext() throws java.io.IOException {
      /* 054 */     /*** PRODUCE: Sort [value#6 ASC], false, 0 */
      /* 055 */     if (sort_needToSort) {
      /* 056 */       sort_addToSorter();
      /* 057 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
      /* 058 */       sort_sortedIter = sort_sorter.sort();
      /* 059 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
      /* 060 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
      /* 061 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
      /* 062 */       sort_needToSort = false;
      /* 063 */     }
      /* 064 */     
      /* 065 */     while (sort_sortedIter.hasNext()) {
      /* 066 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
      /* 067 */       
      /* 068 */       /*** CONSUME: WholeStageCodegen */
      /* 069 */       
      /* 070 */       append(sort_outputRow);
      /* 071 */       
      /* 072 */       if (shouldStop()) return;
      /* 073 */     }
      /* 074 */   }
      /* 075 */ }
      
      == Subtree 2 / 2 ==
      WholeStageCodegen
      :  +- Sort [value#6 ASC], false, 0
      :     +- INPUT
      +- Exchange hashpartitioning(value#6, 4), None
         +- SortBasedAggregate(key=[value#6], functions=[(anon$1(scala.Tuple2),mode=Partial,isDistinct=false)], output=[value#6,value#15])
            +- WholeStageCodegen
               :  +- Sort [value#6 ASC], false, 0
               :     +- INPUT
               +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
                  +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]
      
      Generated code:
      /* 001 */ public Object generate(Object[] references) {
      /* 002 */   return new GeneratedIterator(references);
      /* 003 */ }
      /* 004 */ 
      /* 005 */ /** Codegened pipeline for:
      /* 006 */ * Sort [value#6 ASC], false, 0
      /* 007 */ +- INPUT
      /* 008 */ */
      /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 010 */   private Object[] references;
      /* 011 */   private boolean sort_needToSort;
      /* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
      /* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
      /* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
      /* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
      /* 016 */   private scala.collection.Iterator inputadapter_input;
      /* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
      /* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
      /* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
      /* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
      /* 021 */   
      /* 022 */   public GeneratedIterator(Object[] references) {
      /* 023 */     this.references = references;
      /* 024 */   }
      /* 025 */   
      /* 026 */   public void init(int index, scala.collection.Iterator inputs[]) {
      /* 027 */     partitionIndex = index;
      /* 028 */     sort_needToSort = true;
      /* 029 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
      /* 030 */     sort_sorter = sort_plan.createSorter();
      /* 031 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
      /* 032 */     
      /* 033 */     inputadapter_input = inputs[0];
      /* 034 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
      /* 035 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
      /* 036 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
      /* 037 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
      /* 038 */   }
      /* 039 */   
      /* 040 */   private void sort_addToSorter() throws java.io.IOException {
      /* 041 */     /*** PRODUCE: INPUT */
      /* 042 */     
      /* 043 */     while (inputadapter_input.hasNext()) {
      /* 044 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 045 */       /*** CONSUME: Sort [value#6 ASC], false, 0 */
      /* 046 */       
      /* 047 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
      /* 048 */       if (shouldStop()) return;
      /* 049 */     }
      /* 050 */     
      /* 051 */   }
      /* 052 */   
      /* 053 */   protected void processNext() throws java.io.IOException {
      /* 054 */     /*** PRODUCE: Sort [value#6 ASC], false, 0 */
      /* 055 */     if (sort_needToSort) {
      /* 056 */       sort_addToSorter();
      /* 057 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
      /* 058 */       sort_sortedIter = sort_sorter.sort();
      /* 059 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
      /* 060 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
      /* 061 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
      /* 062 */       sort_needToSort = false;
      /* 063 */     }
      /* 064 */     
      /* 065 */     while (sort_sortedIter.hasNext()) {
      /* 066 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
      /* 067 */       
      /* 068 */       /*** CONSUME: WholeStageCodegen */
      /* 069 */       
      /* 070 */       append(sort_outputRow);
      /* 071 */       
      /* 072 */       if (shouldStop()) return;
      /* 073 */     }
      /* 074 */   }
      /* 075 */ }
      

          People

            Assignee: Wenchen Fan (cloud_fan)
            Reporter: koert kuipers (koert)
            Votes: 0
            Watchers: 5
