KAFKA-2983: Remove Scala consumers and related code (#5230)
ijuma merged 16 commits into apache:trunk
Conversation
@lindong28, do you have time to take a look? I started a system test run, but maybe you could take an initial pass, if you have time.
@ijuma Sure! I will review it.
Force-pushed 81407be to 92da9bd.
@lindong28 I rebased as a PR had been merged that had broken this branch. Also added a description to the pull request of what is included in the PR.
@rajinisivaram, @lindong28 said that he will be able to review this PR.
lindong28 left a comment:
Thanks for the patch! Just have some minor comments. LGTM. I think we can commit the patch after it passes all system tests.
Originally we needed two while loops because mirrorMakerConsumer.receive would block indefinitely for the old consumer if there was no data. I think it should be safe to just remove the second while loop.
Thanks for the explanation. I'll keep the existing code (will just update the comment), but will file a JIRA for simplifying MirrorMaker.
Thanks. What I was wondering about is whether there are cases where TimeoutException is thrown that we don't want to catch and continue. ConsumerTimeoutException is only thrown by NewShinyConsumer if poll doesn't return any records. Maybe to be safe, I should maintain the exact behaviour and then we can do a separate PR to simplify MirrorMaker (instead of doing it in the middle of this huge one).
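The distinction under discussion can be sketched in a few lines. This is a simplified Python model, not Kafka's actual code; `ShinyConsumerSketch` and `ConsumerTimeout` are invented names for illustration. The point is that a poll-based consumer surfaces "no records within the timeout" as an exception the caller can catch and continue on, instead of blocking indefinitely the way the old consumer's receive did:

```python
# Illustrative sketch only; names are invented, this is not Kafka code.

class ConsumerTimeout(Exception):
    """Raised when a poll returns no records within the timeout."""

class ShinyConsumerSketch:
    def __init__(self, poll_fn, timeout_ms):
        self.poll_fn = poll_fn        # returns a (possibly empty) list of records
        self.timeout_ms = timeout_ms

    def receive(self):
        records = self.poll_fn(self.timeout_ms)
        if not records:
            # An empty poll becomes a timeout exception, so callers can
            # catch it and keep looping rather than block forever.
            raise ConsumerTimeout(f"no records after {self.timeout_ms} ms")
        return records

# A caller that catches the timeout and continues, MirrorMaker-style:
consumer = ShinyConsumerSketch(poll_fn=lambda timeout: [], timeout_ms=100)
try:
    consumer.receive()
except ConsumerTimeout:
    pass  # no data yet; loop again instead of treating this as fatal
```

The hazard discussed above is that catching every timeout this way could also swallow a timeout the caller did want to treat as fatal, which is why preserving the exact behaviour here and simplifying later is the safer path.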
It seems that Blacklist is only used in test. Do we still need this?
Good point, we only need to keep Whitelist.
Since the broker is still using ZooKeeper, would this test class still be useful?
The broker doesn't use ZkUtils anymore, which is why I removed the ZKCheckedEphemeral code and tests. There is a KafkaZkClient.CheckedEphemeral that hasn't been removed and is still used.
@ijuma nit: could you please also remove ZookeeperConsumerConnector related code?
Can we remove options/code related to old metrics reporter?
docs/upgrade.html (outdated)
We can also remove the "Legacy APIs" and "Old Consumer Configs" sections from the docs.
Yeah, as I said in the PR description, the documentation hasn't been updated and we can do that separately.
nit: can remove "New" from the comment. Also, the ProducerConfig constructor is public now.
Can we also remove ImportZkOffsets?
Also, we have not removed ZK-based offset management. Does that mean we are going to maintain ZK-based offset management for some more time, and that existing users can continue to use the Scala consumers from older releases if they want?
In that case, can we deprecate ZK-based offset management and mention it in the docs?
Good point, removed ImportZkOffsets. What do you mean by "we have not removed ZK based offset management"?
I was referring to the code segments below:
https://github.com/omkreddy/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L322
https://github.com/omkreddy/kafka/blob/trunk/core/src/main/scala/kafka/zk/ZkData.scala#L428
Yeah, we want the broker to still support everything for now. We need a KIP to discuss removal and deprecation of broker functionality related to the old consumer. I restored a change in ZkData as I hadn't realised that the broker was relying on that functionality itself.
docs/upgrade.html (outdated)
nit: can we include ExportZkOffsets?
Force-pushed 92da9bd to 6f10455.
Rebased and addressed the comments so far. Filed KAFKA-7062 and KAFKA-7063 for follow-ups. If anyone wants to pick them up, feel free. :)
Force-pushed aceb212 to dabfa0d.
@lindong28, I fixed a few system test issues. One of the remaining ones is:

```python
# Verify the following:
# 1) The broker with offline directory is the only in-sync broker of the partition of topic2
# 2) Messages can still be produced and consumed from topic2
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic2,
                                   throughput=self.producer_throughput, offline_nodes=offline_nodes)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic2,
                                group_id="test-consumer-group-2",
                                consumer_timeout_ms=90000, message_validator=is_int)
self.consumer_start_timeout_sec = 90
self.start_producer_and_consumer()
```

Since you wrote this test, is there a chance you could take a look? It's a bit worrying that this doesn't work with the Java consumer.
@ijuma Sure, I will look into this issue.
I'm running the broker system tests in Jenkins again (https://jenkins.confluent.io/view/All/job/system-test-kafka-branch-builder/1803/console), but I believe the only remaining failing test is
Force-pushed dabfa0d to 7a8493f.
@lindong28 do you think you'll have time to look into this today? If not, I'll partially disable the test, file a JIRA and merge this PR. These changes don't introduce the issue after all, they are just exposing the fact that the test as it stands doesn't work with the new consumer.
@ijuma Yes! I am looking into this now. Sorry for the delay.
@lindong28 Thanks! To be clear, there was no delay on your part, it's just that the timeline for this PR to make it into 2.0 is very short. :) So, I was trying to understand your availability to figure out the best path forward. I will wait in case there's an easy tweak to the test to make it pass with the new consumer.
@lindong28 interestingly, only 1 of 4 log dir failure tests failed in the run I started last night. So, it looks flaky instead of failing deterministically.
@lindong28 Looking at the test a bit more, when we "Shutdown all other brokers so that the broker with offline log dir is the only online broker", how do we ensure that there are no consumer_offsets partitions that become offline?
@ijuma Sure thing. I would be more than happy to get this into 2.0 and make it shiny instead of disabling the test :) Most likely I will be able to fix it today.

That is a very good point. The default offsets.topic.replication.factor in the Kafka server is 3. Since we start three brokers in the test before we start the producer and consumer, the offsets topic will be created with RF=3. And I verified that the consumer offsets topic is replicated across all three brokers and the test still failed.
Right. Do I understand correctly that we have 2 out of 3 brokers down and the remaining online broker has one log dir offline? It seems like some consumer offset partitions could be in the offline log dir, which would cause the coordinator to never become available again.
@ijuma Ah, I see your point. Yes, you are right. If a consumer offsets partition is in the offline log directory of the only online broker, then it will cause a CoordinatorNotAvailableException. Let me try to fix this problem. Thanks!
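The failure condition described above can be modelled with a small availability check. This is a hedged Python sketch, not Kafka code; `coordinator_available` and the replica-placement model are invented for illustration. A partition stays reachable only if at least one replica sits on an online broker in a healthy log dir:

```python
# Illustrative model of the discussed scenario; names and placement
# representation are made up, this is not Kafka's actual logic.

def coordinator_available(partition_replicas, online_brokers, offline_dirs):
    """A partition's coordinator can become available if some replica is on
    an online broker whose log dir holding that replica is not offline."""
    return any(
        broker in online_brokers and (broker, log_dir) not in offline_dirs
        for broker, log_dir in partition_replicas
    )

# A __consumer_offsets partition replicated on brokers 1, 2, 3 (RF=3),
# each replica in log dir "d1" on its broker.
replicas = [(1, "d1"), (2, "d1"), (3, "d1")]

# Brokers 2 and 3 shut down; broker 1's log dir "d1" goes offline.
# RF=3 did not help: the only online replica is in the offline dir.
assert not coordinator_available(replicas, online_brokers={1},
                                 offline_dirs={(1, "d1")})

# Had the partition landed in broker 1's healthy dir, it would survive.
healthy = [(1, "d2"), (2, "d1"), (3, "d1")]
assert coordinator_available(healthy, online_brokers={1},
                             offline_dirs={(1, "d1")})
```

This is why replicating the offsets topic across all three brokers was not enough: with two brokers down, availability hinges entirely on which log dir the surviving replica happens to be in.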
@lindong28 yes :)
Force-pushed 7a8493f to 2e94ea1.
Thanks @lindong28. The log dir failure tests pass after that patch is applied:
@ijuma Great! The patch looked good overall when I reviewed it a few days ago. A few minor commits have been made since then to ensure that we pass integration tests and system tests. Do you need me to go over this again?
@lindong28, it's enough for you to review the commits added since your last review.
lindong28 left a comment:
LGTM. Left only one minor comment.
```scala
  .ofType(classOf[java.lang.Integer])
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
  "skip it instead of halt.")
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
```
Is csvMetricsReporterEnabledOpt only used with the Scala consumer? It seems that KafkaCSVMetricsReporter is not deprecated or removed.
KafkaCSVMetricsReporter is based on Yammer Metrics. This is still used by the broker, which is why it's not deprecated. The Java clients use Kafka Metrics instead. So, it seems to me that no metrics would be reported if this option is enabled with the Java consumer, making it useless. Am I missing something?
Ah, I see. I didn't know that KafkaCSVMetricsReporter is based on Yammer Metrics. Then it makes sense to remove it since only the Scala clients use Yammer Metrics. Thanks for the explanation. I have no further comments.
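To make the registry mismatch above concrete, here is a toy Python model (purely illustrative; the `Registry` class and metric name are made up and are not Yammer or Kafka Metrics APIs). A reporter that reads one metrics registry never sees what a client records into a different registry, which is why enabling the CSV reporter alongside the Java consumer would report nothing:

```python
# Toy illustration only; not Kafka or Yammer code.

class Registry:
    """A minimal stand-in for a metrics registry."""
    def __init__(self):
        self.metrics = {}

    def record(self, name, value):
        self.metrics[name] = value

yammer_like = Registry()  # what the CSV-style reporter reads (broker / Scala clients)
kafka_like = Registry()   # what a Java client records into (Kafka Metrics)

# The Java-consumer-like client records into its own registry.
kafka_like.record("records-consumed-rate", 42.0)

# The CSV-style reporter polls the Yammer-like registry and finds nothing.
assert yammer_like.metrics == {}
assert kafka_like.metrics["records-consumed-rate"] == 42.0
```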
Thanks for the reviews. Merging to trunk and 2.0.
- Removed Scala consumers (`SimpleConsumer` and `ZooKeeperConsumerConnector`) and their tests.
- Removed Scala request/response/message classes.
- Removed any mention of new consumer or new producer in the code with the exception of MirrorMaker where the new.consumer option was never deprecated so we have to keep it for now. The non-code documentation has not been updated either, that will be done separately.
- Removed a number of tools that only made sense in the context of the Scala consumers (see upgrade notes).
- Updated some tools that worked with both Scala and Java consumers so that they only support the latter (see upgrade notes).
- Removed `BaseConsumer` and related classes apart from `BaseRecord`, which is used in `MirrorMakerMessageHandler`. The latter is a pluggable interface so effectively public API.
- Removed `ZkUtils` methods that were only used by the old consumers.
- Removed `ZkUtils.registerBroker` and `ZKCheckedEphemeral` since the broker now uses the methods in `KafkaZkClient` and no-one else should be using that method.
- Updated system tests so that they don't use the Scala consumers except for multi-version tests.
- Updated LogDirFailureTest so that the consumer offsets topic would continue to be available after all the failures. This was necessary for it to work with the Java consumer.
- Some multi-version system tests had not been updated to include recently released Kafka versions, fixed it.
- Updated findBugs and checkstyle configs not to refer to deleted classes and packages.

Reviewers: Dong Lin <[email protected]>, Manikumar Reddy <[email protected]>