
Commit 2487a72

Author: Tyson Condie
Commit message: address comments from @zsxwing
1 parent b597cf1 commit 2487a72

File tree: 7 files changed (+20, -14 lines)

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

Lines changed: 3 additions & 1 deletion
@@ -117,7 +117,9 @@ private[kafka010] class KafkaOffsetReaderImpl(
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     // Poll to get the latest assigned partitions
     consumer.poll(0)
-    consumer.assignment().asScala.toSet
+    val partitions = consumer.assignment()
+    consumer.pause(partitions)
+    partitions.asScala.toSet
   }

   override def fetchSpecificStartingOffsets(
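For context, the new lines follow the usual poll(0)/assignment/pause pattern: the driver-side consumer only needs partition metadata, never records, so pausing the assignment keeps any later poll() from fetching data. A self-contained sketch of that pattern (the helper name and byte-array value types are illustrative, not from the commit):

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper showing the pattern used above.
def fetchAssignedPartitions(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Set[TopicPartition] = {
  consumer.poll(0)            // triggers partition assignment; records are discarded
  val partitions = consumer.assignment()
  consumer.pause(partitions)  // assignment stays live, but later polls fetch no records
  partitions.asScala.toSet
}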

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala

Lines changed: 2 additions & 2 deletions
@@ -111,10 +111,10 @@ private[kafka010] class KafkaRelation(
     // Obtain TopicPartition offsets with late binding support
     kafkaOffsets match {
       case EarliestOffsets => partitions.map {
-        case tp => tp -> -2L
+        case tp => tp -> KafkaUtils.EARLIEST
       }.toMap
       case LatestOffsets => partitions.map {
-        case tp => tp -> -1L
+        case tp => tp -> KafkaUtils.LATEST
       }.toMap
       case SpecificOffsets(partitionOffsets) =>
         validateTopicPartitions(partitions, partitionOffsets)

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ private[kafka010] class KafkaSource(
   private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = {
     val result = kafkaReader.fetchSpecificStartingOffsets(specificOffsets)
     specificOffsets.foreach {
-      case (tp, off) if off != -1 && off != -2 =>
+      case (tp, off) if off != KafkaUtils.LATEST && off != KafkaUtils.EARLIEST =>
         if (result(tp) != off) {
           reportDataLoss(
             s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 2 additions & 2 deletions
@@ -327,7 +327,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with Stre
       throw new IllegalArgumentException("starting relation offset can't be latest")
     case Some(json) => (SpecificOffsets(JsonUtils.partitionOffsets(json)))
       .partitionOffsets.foreach {
-        case (tp, off) if off == -1 =>
+        case (tp, off) if off == KafkaUtils.LATEST =>
           throw new IllegalArgumentException(s"startingOffsets for $tp can't be latest")
         case _ => // ignore
       }
@@ -340,7 +340,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with Stre
     case Some("latest") => // good to go
     case Some(json) => (SpecificOffsets(JsonUtils.partitionOffsets(json)))
       .partitionOffsets.foreach {
-        case (tp, off) if off == -2 =>
+        case (tp, off) if off == KafkaUtils.EARLIEST =>
           throw new IllegalArgumentException(s"ending offset for $tp can't be earliest")
         case _ => // ignore
       }
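To make the first check concrete, here is an illustrative input it would reject: a startingOffsets JSON that binds one partition to latest (-1). The JSON shape follows what JsonUtils.partitionOffsets parses in this module; the topic name and offsets are made up.

val json = """{"topicA": {"0": 23, "1": -1}}"""
JsonUtils.partitionOffsets(json).foreach {
  case (tp, off) if off == KafkaUtils.LATEST =>
    throw new IllegalArgumentException(s"startingOffsets for $tp can't be latest")
  case _ => // a concrete or earliest offset is a valid starting point
}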

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala

Lines changed: 4 additions & 0 deletions
@@ -140,12 +140,16 @@ private[kafka010] class KafkaSourceRDD(
       if (range.fromOffset < 0 || range.untilOffset < 0) {
         // Late bind the offset range
         val fromOffset = if (range.fromOffset < 0) {
+          assert(range.fromOffset == KafkaUtils.EARLIEST,
+            s"earliest offset ${range.fromOffset} does not equal ${KafkaUtils.EARLIEST}")
           consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition))
           consumer.rawConsumer.position(range.topicPartition)
         } else {
           range.fromOffset
         }
         val untilOffset = if (range.untilOffset < 0) {
+          assert(range.untilOffset == KafkaUtils.LATEST,
+            s"latest offset ${range.untilOffset} does not equal ${KafkaUtils.LATEST}")
           consumer.rawConsumer.seekToEnd(ju.Arrays.asList(range.topicPartition))
           consumer.rawConsumer.position(range.topicPartition)
         } else {
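The late binding itself is just a seek followed by position(). A self-contained sketch of that resolution step against a plain Kafka Consumer (resolveOffset is a hypothetical helper, not part of the commit):

import java.{util => ju}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Turn a possibly-sentinel offset into a concrete broker position.
def resolveOffset(consumer: Consumer[_, _], tp: TopicPartition, offset: Long): Long =
  offset match {
    case KafkaUtils.EARLIEST =>
      consumer.seekToBeginning(ju.Arrays.asList(tp))
      consumer.position(tp)
    case KafkaUtils.LATEST =>
      consumer.seekToEnd(ju.Arrays.asList(tp))
      consumer.position(tp)
    case concrete => concrete // already bound; use as-is
  }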

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaUtils.scala

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,10 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation

 private[kafka010] object KafkaUtils {

+  // Used to denote unbounded offset positions
+  val LATEST = -1L
+  val EARLIEST = -2L
+
   def getSortedExecutorList(sc: SparkContext): Array[String] = {
     val bm = sc.env.blockManager
     bm.master.getPeers(bm.blockManagerId).toArray
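The chosen values line up with Kafka's own special offset constants (-1 for latest, -2 for earliest), so a sentinel can live in the same Long-valued offset maps as concrete positions. A hypothetical call site (topic name made up):

import org.apache.kafka.common.TopicPartition

// Named sentinels replace the bare -1L / -2L magic numbers at call sites.
val startingOffsets = Map(new TopicPartition("topicA", 0) -> KafkaUtils.EARLIEST)
val endingOffsets = Map(new TopicPartition("topicA", 0) -> KafkaUtils.LATEST)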

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 4 additions & 8 deletions
@@ -50,7 +50,7 @@ import org.apache.spark.SparkConf
  *
  * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
  */
-class KafkaTestUtils(withBrokerProps: Option[Map[String, Object]] = None) extends Logging {
+class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {

   // Zookeeper related configurations
   private val zkHost = "localhost"
@@ -255,15 +255,15 @@ class KafkaTestUtils(withBrokerProps: Option[Map[String, Object]] = None) extend

   def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
     val kc = new KafkaConsumer[String, String](consumerConfiguration)
-    logInfo("Created consumer to get latest offsets")
+    logInfo("Created consumer to get earliest offsets")
     kc.subscribe(topics.asJavaCollection)
     kc.poll(0)
     val partitions = kc.assignment()
     kc.pause(partitions)
     kc.seekToBeginning(partitions)
     val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
     kc.close()
-    logInfo("Closed consumer to get latest offsets")
+    logInfo("Closed consumer to get earliest offsets")
     offsets
   }

@@ -292,11 +292,7 @@ class KafkaTestUtils(withBrokerProps: Option[Map[String, Object]] = None) extend
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
     props.put("delete.topic.enable", "true")
-    withBrokerProps.map { p =>
-      p.foreach {
-        case (key, value) => props.put(key, value)
-      }
-    }
+    props.putAll(withBrokerProps.asJava)
     props
   }
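With the Option wrapper gone, call sites pass broker overrides directly. A sketch of the new constructor shape (the property shown is a standard Kafka broker setting; the variable names are illustrative):

// Default broker config.
val testUtils = new KafkaTestUtils()

// Overrides, with no Some(...) wrapping needed anymore.
val tunedUtils = new KafkaTestUtils(Map("auto.create.topics.enable" -> "false"))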
