5656 * Bytes 4 to 8: len(k)
5757 * Bytes 8 to 8 + len(k): key data
5858 * Bytes 8 + len(k) to 8 + len(k) + len(v): value data
59+ * Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair
5960 *
6061 * This means that the first four bytes store the record length, 4 + len(k) + len(v), which excludes the trailing pointer. This format
61- * is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
62+ * is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
6263 * so we can pass records from this map directly into the sorter to sort records in place.
6364 */
6465public final class BytesToBytesMap extends MemoryConsumer {
@@ -132,7 +133,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
132133 /**
133134 * Number of keys defined in the map.
134135 */
135- private int numElements ;
136+ private int numKeys ;
137+
138+ /**
139+ * Number of values defined in the map. A key could have multiple values.
140+ */
141+ private int numValues ;
136142
137143 /**
138144 * The map will be expanded once the number of keys exceeds this threshold.
@@ -223,7 +229,12 @@ public BytesToBytesMap(
223229 /**
224230 * Returns the number of keys defined in the map.
225231 */
226- public int numElements () { return numElements ; }
232+ public int numKeys () { return numKeys ; }
233+
234+ /**
235+ * Returns the number of values defined in the map. A key could have multiple values.
236+ */
237+ public int numValues () { return numValues ; }
227238
228239 public final class MapIterator implements Iterator <Location > {
229240
@@ -311,7 +322,8 @@ public Location next() {
311322 if (currentPage != null ) {
312323 int totalLength = Platform .getInt (pageBaseObject , offsetInPage );
313324 loc .with (currentPage , offsetInPage );
314- offsetInPage += 4 + totalLength ;
325+ // [total size] [key size] [key] [value] [pointer to next]
326+ offsetInPage += 4 + totalLength + 8 ;
315327 recordsInPage --;
316328 return loc ;
317329 } else {
@@ -361,7 +373,7 @@ public long spill(long numBytes) throws IOException {
361373 while (numRecords > 0 ) {
362374 int length = Platform .getInt (base , offset );
363375 writer .write (base , offset + 4 , length , 0 );
364- offset += 4 + length ;
376+ offset += 4 + length + 8 ;
365377 numRecords --;
366378 }
367379 writer .close ();
@@ -395,7 +407,7 @@ public void remove() {
395407 * `lookup()`, the behavior of the returned iterator is undefined.
396408 */
397409 public MapIterator iterator () {
398- return new MapIterator (numElements , loc , false );
410+ return new MapIterator (numValues , loc , false );
399411 }
400412
401413 /**
@@ -409,7 +421,7 @@ public MapIterator iterator() {
409421 * `lookup()`, the behavior of the returned iterator is undefined.
410422 */
411423 public MapIterator destructiveIterator () {
412- return new MapIterator (numElements , loc , true );
424+ return new MapIterator (numValues , loc , true );
413425 }
414426
415427 /**
@@ -559,6 +571,20 @@ private Location with(Object base, long offset, int length) {
559571 return this ;
560572 }
561573
574+ /**
575+ * Moves to the next pair that has the same key as the current one; returns false if there is none.
576+ */
577+ public boolean nextValue () {
578+ assert isDefined ;
579+ long nextAddr = Platform .getLong (baseObject , valueOffset + valueLength );
580+ if (nextAddr == 0 ) {
581+ return false ;
582+ } else {
583+ updateAddressesAndSizes (nextAddr );
584+ return true ;
585+ }
586+ }
587+
562588 /**
563589 * Returns the memory page that contains the current record.
564590 * This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
@@ -625,10 +651,9 @@ public int getValueLength() {
625651 }
626652
627653 /**
628- * Store a new key and value. This method may only be called once for a given key; if you want
629- * to update the value associated with a key, then you can directly manipulate the bytes stored
630- * at the value address. The return value indicates whether the put succeeded or whether it
631- * failed because additional memory could not be acquired.
654+ * Append a new value for the key. This method can be called multiple times for a given key.
655+ * The return value indicates whether the append succeeded or whether it failed because additional
656+ * memory could not be acquired.
632657 * <p>
633658 * It is only valid to call this method immediately after calling `lookup()` using the same key.
634659 * </p>
@@ -637,15 +662,15 @@ public int getValueLength() {
637662 * </p>
638663 * <p>
639664 * After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
640- * will return information on the data stored by this `putNewKey ` call.
665+ * will return information on the data stored by this `append` call.
641666 * </p>
642667 * <p>
643668 * As an example usage, here's the proper way to store a new key:
644669 * </p>
645670 * <pre>
646671 * Location loc = map.lookup(keyBase, keyOffset, keyLength);
647672 * if (!loc.isDefined()) {
648- * if (!loc.putNewKey (keyBase, keyOffset, keyLength, ...)) {
673+ * if (!loc.append (keyBase, keyOffset, keyLength, ...)) {
649674 * // handle failure to grow map (by spilling, for example)
650675 * }
651676 * }
@@ -657,26 +682,23 @@ public int getValueLength() {
657682 * @return true if the append was successful and false if it failed because memory could
658683 * not be acquired.
659684 */
660- public boolean putNewKey (Object keyBase , long keyOffset , int keyLength ,
661- Object valueBase , long valueOffset , int valueLength ) {
662- assert (!isDefined ) : "Can only set value once for a key" ;
663- assert (keyLength % 8 == 0 );
664- assert (valueLength % 8 == 0 );
665- assert (longArray != null );
666-
685+ public boolean append (Object kbase , long koff , int klen , Object vbase , long voff , int vlen ) {
686+ assert (klen % 8 == 0 );
687+ assert (vlen % 8 == 0 );
688+ assert (longArray != null );
667689
668- if (numElements == MAX_CAPACITY
690+ if (numKeys == MAX_CAPACITY
669691 // The map could be reused from the last spill (because there was not enough memory to grow);
670692 // in that case we don't try to grow again when we hit the `growthThreshold`.
671- || !canGrowArray && numElements > growthThreshold ) {
693+ || !canGrowArray && numKeys > growthThreshold ) {
672694 return false ;
673695 }
674696
675697 // Here, we'll copy the data into our data pages. Because we only store a relative offset from
676698 // the key address instead of storing the absolute address of the value, the key and value
677699 // must be stored in the same memory page.
678- // (8 byte key length) (key) (value)
679- final long recordLength = 8 + keyLength + valueLength ;
700+ // (4 byte record length) (4 byte key length) (key) (value) (8 byte pointer to next value)
701+ final long recordLength = 8 + klen + vlen + 8 ;
680702 if (currentPage == null || currentPage .size () - pageCursor < recordLength ) {
681703 if (!acquireNewPage (recordLength + 4L )) {
682704 return false ;
@@ -687,30 +709,40 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
687709 final Object base = currentPage .getBaseObject ();
688710 long offset = currentPage .getBaseOffset () + pageCursor ;
689711 final long recordOffset = offset ;
690- Platform .putInt (base , offset , keyLength + valueLength + 4 );
691- Platform .putInt (base , offset + 4 , keyLength );
712+ Platform .putInt (base , offset , klen + vlen + 4 );
713+ Platform .putInt (base , offset + 4 , klen );
692714 offset += 8 ;
693- Platform .copyMemory (keyBase , keyOffset , base , offset , keyLength );
694- offset += keyLength ;
695- Platform .copyMemory (valueBase , valueOffset , base , offset , valueLength );
715+ Platform .copyMemory (kbase , koff , base , offset , klen );
716+ offset += klen ;
717+ Platform .copyMemory (vbase , voff , base , offset , vlen );
718+ offset += vlen ;
719+ Platform .putLong (base , offset , 0 );
696720
697721 // --- Update bookkeeping data structures ----------------------------------------------------
698722 offset = currentPage .getBaseOffset ();
699723 Platform .putInt (base , offset , Platform .getInt (base , offset ) + 1 );
700724 pageCursor += recordLength ;
701- numElements ++;
702725 final long storedKeyAddress = taskMemoryManager .encodePageNumberAndOffset (
703726 currentPage , recordOffset );
704- longArray .set (pos * 2 , storedKeyAddress );
705- longArray .set (pos * 2 + 1 , keyHashcode );
706- updateAddressesAndSizes (storedKeyAddress );
707- isDefined = true ;
727+ numValues ++;
728+ if (isDefined ) {
729+ // put this pair at the end of chain
730+ while (nextValue ()) { /* do nothing */ }
731+ Platform .putLong (baseObject , valueOffset + valueLength , storedKeyAddress );
732+ nextValue (); // point to new added value
733+ } else {
734+ numKeys ++;
735+ longArray .set (pos * 2 , storedKeyAddress );
736+ longArray .set (pos * 2 + 1 , keyHashcode );
737+ updateAddressesAndSizes (storedKeyAddress );
738+ isDefined = true ;
708739
709- if (numElements > growthThreshold && longArray .size () < MAX_CAPACITY ) {
710- try {
711- growAndRehash ();
712- } catch (OutOfMemoryError oom ) {
713- canGrowArray = false ;
740+ if (numKeys > growthThreshold && longArray .size () < MAX_CAPACITY ) {
741+ try {
742+ growAndRehash ();
743+ } catch (OutOfMemoryError oom ) {
744+ canGrowArray = false ;
745+ }
714746 }
715747 }
716748 return true ;
@@ -866,7 +898,8 @@ public LongArray getArray() {
866898 * Reset this map to initialized state.
867899 */
868900 public void reset () {
869- numElements = 0 ;
901+ numKeys = 0 ;
902+ numValues = 0 ;
870903 longArray .zeroOut ();
871904
872905 while (dataPages .size () > 0 ) {
0 commit comments