Conversation
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
- only run herders for enabled flows
- don't proactively create topics -- wait for topic to be created by producer
- pluggable filters for topics, groups, config properties
- topic-level metrics and other improvements to metrics
- drop temporary "monitor" connector
...ct/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
    .filter(x -> x != -1)
    .mapToInt(x -> x)
    .min()
    .orElse(-1);
I'm wondering: what is the advantage of using -1 over say null to represent no value?
That would work too. Whatever is more conventional in Kafka is fine with me.
There seems to be null in some places and -1 in others. If possible, can we standardize or leave a comment?
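For reference, the trade-off discussed in this thread can be sketched as follows. This is a minimal illustration, not code from the PR: the class and method names are hypothetical, and it works on a plain `int[]` rather than the stream in the snippet under review. It contrasts the `-1` sentinel with making "no value" explicit through `OptionalInt`.

```java
import java.util.Arrays;
import java.util.OptionalInt;

// Hypothetical helper for illustration only; not part of MirrorMaker.
final class MinOffsetSketch {
    static final int NO_VALUE = -1;

    // Sentinel style, as in the snippet under review: -1 means "no value".
    static int minOrSentinel(int[] offsets) {
        return Arrays.stream(offsets)
                .filter(x -> x != NO_VALUE)
                .min()
                .orElse(NO_VALUE);
    }

    // Alternative: absence is part of the return type, so callers cannot
    // accidentally treat the sentinel as a real offset.
    static OptionalInt minOrEmpty(int[] offsets) {
        return Arrays.stream(offsets)
                .filter(x -> x != NO_VALUE)
                .min();
    }

    public static void main(String[] args) {
        System.out.println(minOrSentinel(new int[] {5, -1, 3}));
        System.out.println(minOrEmpty(new int[] {-1, -1}).isPresent());
    }
}
```

Whichever convention is chosen, a one-line comment on the constant (or on the nullable return) would address the request to standardize or document it.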
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@harshach: There are still a couple of comments that haven't been addressed. The biggest one is on dealing with compacted topics for offset translation.
assertTrue("always emit offset sync on first update",
    partitionState.update(0, 100));
assertTrue("upstream offset skipped -> resync",
@junrao holes in a compacted topic are handled, as shown here.
@junrao sorry, I didn't see a few comments that were folded/hidden for some reason. Hopefully I've addressed everything that would otherwise delay the merge.
junrao left a comment
@ryannedolan : Thanks for the explanation. It all makes sense to me now. So, LGTM.
A few limitations of MM2 that I saw: (1) If MM2 starts with an existing compacted topic with lots of holes, there could be more overhead for writing the offsetSync data. In the worst case, every record requires a checkpoint of offsetSync data. (2) If a consumer starts consuming from the beginning, the offsets won't be translated to the target cluster until the consumer catches up. (3) Prefix ACLs are not propagated. These may not be common issues. However, it would be useful to document them in the docs.
Also, we will need a few new message formatters to read the new internal topics. Do you plan to add that before code freeze?
@omkreddy and @harshach : Once the minor issues are addressed, perhaps you could take another look and merge the PR?
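To make limitation (1) concrete, here is a rough sketch of per-partition sync state consistent with the assertions quoted above ("always emit offset sync on first update", "upstream offset skipped -> resync"). The class name, field names, and the `maxOffsetLag` parameter are assumptions made for illustration; this is not the PR's actual implementation.

```java
// Hypothetical stand-in for per-partition offset-sync state.
final class PartitionSyncSketch {
    private final long maxOffsetLag;          // assumed drift threshold
    private long previousUpstreamOffset = -1L;
    private long lastSyncUpstreamOffset = -1L;
    private long lastSyncDownstreamOffset = -1L;

    PartitionSyncSketch(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Returns true when a new offset sync should be emitted for this record.
    boolean update(long upstreamOffset, long downstreamOffset) {
        boolean firstUpdate = lastSyncDownstreamOffset == -1L;
        // A gap in upstream offsets (e.g. a hole left by compaction) forces a resync.
        boolean upstreamSkipped = upstreamOffset - previousUpstreamOffset != 1L;
        // Where the downstream offset would be if both sides advanced in lockstep.
        long expectedDownstream =
                lastSyncDownstreamOffset + (upstreamOffset - lastSyncUpstreamOffset);
        boolean drifted = downstreamOffset - expectedDownstream >= maxOffsetLag;

        boolean sync = firstUpdate || upstreamSkipped || drifted;
        if (sync) {
            lastSyncUpstreamOffset = upstreamOffset;
            lastSyncDownstreamOffset = downstreamOffset;
        }
        previousUpstreamOffset = upstreamOffset;
        return sync;
    }
}
```

Under these assumptions, a topic whose surviving records are all separated by holes triggers a sync on every call to `update`, which is the worst case described in (1).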
 * @param metadata {@link RecordMetadata} record metadata returned from the broker
 * @throws InterruptedException
 */
public void commitRecord(SourceRecord record, RecordMetadata metadata)
Could we add some comments to make it clear that one only needs to implement one of the commitRecord() overloads?
Added a couple lines locally. Will hold on to the commit for now -- I don't want to trigger another build at the moment.
...ct/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
    testRuntime libs.slf4jlog4j
  }

  javadoc {
@ryannedolan We need to include a javadoc section for the newly added public interfaces/classes.
Example: https://github.com/apache/kafka/blob/trunk/build.gradle#L1539
I assume we will be adding Kafka website documentation as part of KAFKA-8930.
http://kafka.apache.org/documentation/#basic_ops_mirror_maker
Also, it looks like the test failures are related.
Ah thanks @omkreddy I'll fix the javadocs this morning. For the website documentation, we'll need to keep the existing mirror-maker section for now, but I'll add a section re MM2, probably in a separate PR.
The failing tests seem to be related to flakiness in the Connect integration test framework. I'll see what I can do.
retest this please
@ryannedolan Any update on the test failures? @harshach Please merge the PR once Jun's comments are addressed and we have green builds.
@omkreddy I traced the build failures to an NPE from KIP-507 committed yesterday. It is breaking MM2's and other Connect integration tests. I'll fix here I guess.
I fixed the NPE from KIP-507 -- let's see if we can get a green build now.
@harshach good to go!
rhauch left a comment
Thanks for updating KIP-416 so that the framework only calls the newer SourceTask.commitRecord(SourceRecord, RecordMetadata).
I have one request to improve the JavaDoc for the new method to make it easier for developers writing their own SourceTask to know whether to override those methods, without having to dive into the Connect runtime to figure that out.
 * </p>
 *
 * @param record {@link SourceRecord} that was successfully sent via the producer
 * @param metadata {@link RecordMetadata} record metadata returned from the broker
@ryannedolan, we need to clearly specify that the metadata parameter can be null, and then in the JavaDoc above specify what this means for the source task, namely that a transform dropped/skipped the record and it was not written to Kafka.
IMO this is necessary so that developers of connector implementations know what the behavior is and can properly implement their task. (The JavaDoc was not in KIP-416.)
I also think it's worth mentioning here that SourceTask implementations need only implement this method, the older commitRecord(SourceRecord), or neither, but that generally they do not need to implement both, since Connect will only call this method. Again, this will help developers who are implementing their own SourceTask know what they need to do.
@rhauch I improved the javadocs further. Should be clear now.
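The contract requested in this thread could look roughly like the sketch below. To keep it compilable without Kafka on the classpath, `SourceRecordStub` and `RecordMetadataStub` are hypothetical stand-ins for `SourceRecord` and `RecordMetadata`, and the task class is illustrative only. It overrides just the two-argument callback and treats a null metadata argument as "a transform dropped or skipped the record", as described in the review.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for SourceRecord; hypothetical, for illustration only.
final class SourceRecordStub {
    final String key;
    SourceRecordStub(String key) { this.key = key; }
}

// Stand-in for RecordMetadata; hypothetical, for illustration only.
final class RecordMetadataStub {
    final long offset;
    RecordMetadataStub(long offset) { this.offset = offset; }
}

// Hypothetical task that implements only the newer two-argument callback.
class CommitTrackingTaskSketch {
    final List<String> acked = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();

    // metadata may be null: per the review, that means the record was
    // filtered out by a transform and never written to Kafka.
    void commitRecord(SourceRecordStub record, RecordMetadataStub metadata) {
        if (metadata == null) {
            dropped.add(record.key);
        } else {
            acked.add(record.key + "@" + metadata.offset);
        }
    }
}
```

A task written this way does not need the older single-argument `commitRecord`, since the framework is said to call only the newer method.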
@ryannedolan, @junrao, @harshach: As mentioned above in one of the comments: according to the vote thread, KIP-416 has not yet been approved, and per the AK 2.4.0 Release Plan the KIP deadline was on Sept 25. @junrao, what's required here?
@rhauch: We can fold the changes in WorkerSourceTask into this KIP. Ryan, could you update your KIP wiki and send an email to the voting thread about the additional changes to make sure that no one objects to this? If KIP-416 is completely subsumed by this, we can just cancel it. Otherwise, KIP-416 can cover the remaining part.
Thanks, @junrao. KIP-416 is straightforward, limited to the one new method change we discussed (and agreed to), and required for KIP-382, so pulling those changes into KIP-382 makes sense to me.
 */
public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
        String remoteClusterAlias, Duration timeout) {
    long deadline = System.currentTimeMillis() + timeout.toMillis();
Is there a reason why this is not using the Time interface? We generally never use System.currentTimeMillis() in Kafka.
Good idea, we should replace these in a subsequent PR.
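A minimal sketch of what the suggested follow-up could look like, assuming the clock is injected rather than read from `System.currentTimeMillis()` directly. `ClockSketch` is a hypothetical stand-in with a single `milliseconds()` method; it is not Kafka's `Time` interface, and `DeadlineSketch` is not part of the PR.

```java
import java.time.Duration;

// Hypothetical clock abstraction; a stand-in, not Kafka's Time interface.
interface ClockSketch {
    long milliseconds();
}

// Computes and tracks a deadline against an injected clock.
final class DeadlineSketch {
    private final ClockSketch clock;
    private final long deadlineMs;

    DeadlineSketch(ClockSketch clock, Duration timeout) {
        this.clock = clock;
        this.deadlineMs = clock.milliseconds() + timeout.toMillis();
    }

    // Time left before the deadline, never negative.
    Duration remaining() {
        return Duration.ofMillis(Math.max(0L, deadlineMs - clock.milliseconds()));
    }

    boolean expired() {
        return clock.milliseconds() >= deadlineMs;
    }
}
```

Production code could pass `System::currentTimeMillis` as the clock, while a test can pass a lambda over a mutable value and advance time without sleeping.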
@junrao some of the comments mentioned here can be handled in a follow-up patch and can be part of the minor release that will follow.
@ryannedolan Thanks for the PR. Merging the PR. Let's address any issues in follow-up PRs. Please raise JIRAs for any pending work (MirrorSinkConnector, legacy mode, etc.) for upcoming releases.
blacklisted ones), across all enabled replication flows. Each
replication flow must be explicitly enabled to begin replication:

    A->B.enabled = true
This directive (A->B.enabled) is missing from the "Running MirrorMaker in production" section of the KIP. I got the cluster up, but no replication happened until I saw this and fixed my configuration. I am new to the MirrorMaker/Connect framework, so forgive me if this is well known.
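For anyone hitting the same problem, the rule that a flow replicates only when it is explicitly enabled can be sketched as below. This is a hypothetical helper, not MirrorMaker's own configuration parser; it only assumes the `clusters` list and the `source->target.enabled` keys shown in the sample configuration, with `false` as the default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not MirrorMaker's config handling.
final class EnabledFlowsSketch {
    // Lists "source->target" pairs whose ".enabled" property is true.
    static List<String> enabledFlows(Properties props) {
        List<String> clusters = new ArrayList<>();
        for (String name : props.getProperty("clusters", "").split(",")) {
            if (!name.trim().isEmpty()) {
                clusters.add(name.trim());
            }
        }
        List<String> flows = new ArrayList<>();
        for (String source : clusters) {
            for (String target : clusters) {
                if (source.equals(target)) {
                    continue;
                }
                String flow = source + "->" + target;
                // Flows are disabled unless explicitly enabled.
                String enabled = props.getProperty(flow + ".enabled", "false").trim();
                if (Boolean.parseBoolean(enabled)) {
                    flows.add(flow);
                }
            }
        }
        return flows;
    }
}
```

With only `clusters = A, B` set, this returns an empty list, which matches the "cluster is up but nothing replicates" symptom; adding `A->B.enabled = true` yields a single `A->B` flow.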
@ryannedolan I tried to find the JIRA for MirrorSinkConnector, but my search returned nothing. Can you please point me to where I can find the status of this work? Thanks!
@bpux https://issues.apache.org/jira/browse/KAFKA-7500, the primary JIRA for each KIP is linked from the KIP document itself: https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
@ewencp thanks! will comment in there
Hi @ryannedolan -
I just got done setting up Mirror Maker V2 where I work. It was successful, but the documentation doesn't match the code in various places. A couple of things that "got me" -
It should actually be something like (pardon me, I'm using the JMX-prometheus exporter):
In the code, it is
All in all a great product and vastly superior to Mirror Maker V1. Thanks!
I can't seem to find the new metrics mentioned in the KIP. I installed JMXTERM on my mirror-maker pod and this is what I got: Domains: Beans for connect: As you can see, no mentions of "mirror" of any kind anywhere. Anyone able to help me understand what's going on?
Implementation of KIP-382 "MirrorMaker 2.0" (approved) and KIP-416 "Notify SourceTask of ACK'd offsets, metadata" (not yet approved).
Depends on #6171
Quick start:
./bin/connect-mirror-maker.sh mm2.properties
Sample configuration file:
clusters = one, two, three, four
one.bootstrap.servers = ...
two.bootstrap.servers = ...
...
one->two.enabled = true # false by default
two->one.enabled = true
...
three->four.topics = topic1, topic2 # .* by default
...
sync.topic.acls.enabled = false # disable for non-secure clusters
The following features of the KIP are deferred for now:
- MirrorSinkConnector/Task -- not used by the MirrorMaker driver, but may be useful to run on a Connect cluster
- "legacy mode" script -- per KIP, not part of first release