Skip to content

Commit cc4dce9

Browse files
authored
KAFKA-2983: Remove Scala consumers and related code (#5230)
- Removed Scala consumers (`SimpleConsumer` and `ZooKeeperConsumerConnector`) and their tests. - Removed Scala request/response/message classes. - Removed any mention of new consumer or new producer in the code with the exception of MirrorMaker where the new.consumer option was never deprecated so we have to keep it for now. The non-code documentation has not been updated either, that will be done separately. - Removed a number of tools that only made sense in the context of the Scala consumers (see upgrade notes). - Updated some tools that worked with both Scala and Java consumers so that they only support the latter (see upgrade notes). - Removed `BaseConsumer` and related classes apart from `BaseRecord` which is used in `MirrorMakerMessageHandler`. The latter is a pluggable interface so effectively public API. - Removed `ZkUtils` methods that were only used by the old consumers. - Removed `ZkUtils.registerBroker` and `ZKCheckedEphemeral` since the broker now uses the methods in `KafkaZkClient` and no-one else should be using that method. - Updated system tests so that they don't use the Scala consumers except for multi-version tests. - Updated LogDirFailureTest so that the consumer offsets topic would continue to be available after all the failures. This was necessary for it to work with the Java consumer. - Some multi-version system tests had not been updated to include recently released Kafka versions, fixed it. - Updated findBugs and checkstyle configs not to refer to deleted classes and packages. Reviewers: Dong Lin <[email protected]>, Manikumar Reddy <[email protected]>
1 parent e8955f7 commit cc4dce9

File tree

207 files changed

+677
-14931
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

207 files changed

+677
-14931
lines changed

bin/kafka-simple-consumer-shell.sh

Lines changed: 0 additions & 17 deletions
This file was deleted.

bin/windows/kafka-simple-consumer-shell.bat

Lines changed: 0 additions & 17 deletions
This file was deleted.

checkstyle/import-control-core.xml

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,9 @@
3838
<allow pkg="kafka.serializer" />
3939
<allow pkg="org.apache.kafka.common" />
4040

41-
<subpackage name="javaapi">
42-
<subpackage name="consumer">
43-
<allow pkg="kafka.consumer" />
44-
</subpackage>
45-
46-
<subpackage name="message">
47-
<allow pkg="kafka.message" />
48-
</subpackage>
49-
50-
<subpackage name="producer">
51-
<allow pkg="kafka.producer" />
52-
</subpackage>
53-
</subpackage>
54-
5541
<subpackage name="tools">
5642
<allow pkg="org.apache.kafka.clients.admin" />
5743
<allow pkg="kafka.admin" />
58-
<allow pkg="kafka.javaapi" />
59-
<allow pkg="kafka.producer" />
60-
<allow pkg="kafka.consumer" />
6144
<allow pkg="joptsimple" />
6245
<allow pkg="org.apache.kafka.clients.consumer" />
6346
<allow class="javax.xml.datatype.Duration" />
@@ -71,9 +54,6 @@
7154

7255
<subpackage name="examples">
7356
<allow pkg="org.apache.kafka.clients" />
74-
<allow pkg="kafka.api" />
75-
<allow pkg="kafka.javaapi" />
76-
<allow pkg="kafka.message" />
7757
</subpackage>
7858

7959
</import-control>

clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.io.EOFException;
2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
22-
import java.nio.channels.ReadableByteChannel;
2322
import java.nio.channels.ScatteringByteChannel;
2423
import org.apache.kafka.common.memory.MemoryPool;
2524
import org.slf4j.Logger;
@@ -90,33 +89,6 @@ public boolean complete() {
9089
}
9190

9291
public long readFrom(ScatteringByteChannel channel) throws IOException {
93-
return readFromReadableChannel(channel);
94-
}
95-
96-
@Override
97-
public boolean requiredMemoryAmountKnown() {
98-
return requestedBufferSize != -1;
99-
}
100-
101-
@Override
102-
public boolean memoryAllocated() {
103-
return buffer != null;
104-
}
105-
106-
107-
@Override
108-
public void close() throws IOException {
109-
if (buffer != null && buffer != EMPTY_BUFFER) {
110-
memoryPool.release(buffer);
111-
buffer = null;
112-
}
113-
}
114-
115-
// Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
116-
// See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
117-
// This can go away after we get rid of BlockingChannel
118-
@Deprecated
119-
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
12092
int read = 0;
12193
if (size.hasRemaining()) {
12294
int bytesRead = channel.read(size);
@@ -151,6 +123,25 @@ public long readFromReadableChannel(ReadableByteChannel channel) throws IOExcept
151123
return read;
152124
}
153125

126+
@Override
127+
public boolean requiredMemoryAmountKnown() {
128+
return requestedBufferSize != -1;
129+
}
130+
131+
@Override
132+
public boolean memoryAllocated() {
133+
return buffer != null;
134+
}
135+
136+
137+
@Override
138+
public void close() throws IOException {
139+
if (buffer != null && buffer != EMPTY_BUFFER) {
140+
memoryPool.release(buffer);
141+
buffer = null;
142+
}
143+
}
144+
154145
public ByteBuffer payload() {
155146
return this.buffer;
156147
}

core/src/main/scala/kafka/admin/AdminUtils.scala

Lines changed: 9 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -363,76 +363,17 @@ object AdminUtils extends Logging with AdminUtilities {
363363

364364
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
365365
def deleteTopic(zkUtils: ZkUtils, topic: String) {
366-
if (topicExists(zkUtils, topic)) {
367-
try {
368-
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
369-
} catch {
370-
case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
371-
"topic %s is already marked for deletion".format(topic))
372-
case e2: Throwable => throw new AdminOperationException(e2)
373-
}
374-
} else {
375-
throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
366+
if (topicExists(zkUtils, topic)) {
367+
try {
368+
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
369+
} catch {
370+
case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
371+
"topic %s is already marked for deletion".format(topic))
372+
case e2: Throwable => throw new AdminOperationException(e2)
376373
}
374+
} else {
375+
throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
377376
}
378-
379-
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
380-
def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
381-
zkUtils.getConsumersInGroup(group).nonEmpty
382-
}
383-
384-
/**
385-
* Delete the whole directory of the given consumer group if the group is inactive.
386-
*
387-
* @param zkUtils Zookeeper utilities
388-
* @param group Consumer group
389-
* @return whether or not we deleted the consumer group information
390-
*/
391-
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
392-
def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
393-
if (!isConsumerGroupActive(zkUtils, group)) {
394-
val dir = new ZKGroupDirs(group)
395-
zkUtils.deletePathRecursive(dir.consumerGroupDir)
396-
true
397-
}
398-
else false
399-
}
400-
401-
/**
402-
* Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
403-
* If the consumer group consumes no other topics, delete the whole consumer group directory.
404-
*
405-
* @param zkUtils Zookeeper utilities
406-
* @param group Consumer group
407-
* @param topic Topic of the consumer group information we wish to delete
408-
* @return whether or not we deleted the consumer group information for the given topic
409-
*/
410-
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
411-
def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
412-
val topics = zkUtils.getTopicsByConsumerGroup(group)
413-
if (topics == Seq(topic)) {
414-
deleteConsumerGroupInZK(zkUtils, group)
415-
}
416-
else if (!isConsumerGroupActive(zkUtils, group)) {
417-
val dir = new ZKGroupTopicDirs(group, topic)
418-
zkUtils.deletePathRecursive(dir.consumerOwnerDir)
419-
zkUtils.deletePathRecursive(dir.consumerOffsetDir)
420-
true
421-
}
422-
else false
423-
}
424-
425-
/**
426-
* Delete every inactive consumer group's information about the given topic in Zookeeper.
427-
*
428-
* @param zkUtils Zookeeper utilities
429-
* @param topic Topic of the consumer group information we wish to delete
430-
*/
431-
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
432-
def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String): Set[String] = {
433-
val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
434-
groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
435-
groups
436377
}
437378

438379
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =

0 commit comments

Comments
 (0)