Skip to content

Commit c24740c

Browse files
committed
KAFKA-1328 New consumer APIs; reviewed by Jun Rao and Guozhang Wang
1 parent bf7fb63 commit c24740c

File tree

11 files changed

+1720
-24
lines changed

11 files changed

+1720
-24
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
3+
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
4+
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
5+
* License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package org.apache.kafka.clients.consumer;
14+
15+
import java.io.Closeable;
16+
import java.util.Collection;
17+
import java.util.Map;
18+
19+
import org.apache.kafka.common.Metric;
20+
import org.apache.kafka.common.TopicPartition;
21+
22+
/**
23+
* @see KafkaConsumer
24+
* @see MockConsumer
25+
*/
26+
public interface Consumer extends Closeable {
27+
28+
/**
29+
* Incrementally subscribe to the given list of topics. This API is mutually exclusive to
30+
* {@link #subscribe(TopicPartition...) subscribe(partitions)}
31+
* @param topics A variable list of topics that the consumer subscribes to
32+
*/
33+
public void subscribe(String...topics);
34+
35+
/**
36+
* Incrementally subscribes to a specific topic and partition. This API is mutually exclusive to
37+
* {@link #subscribe(String...) subscribe(topics)}
38+
* @param partitions Partitions to subscribe to
39+
*/
40+
public void subscribe(TopicPartition... partitions);
41+
42+
/**
43+
* Unsubscribe from the specific topics. Messages for this topic will not be returned from the next {@link #poll(long) poll()}
44+
* onwards. This should be used in conjunction with {@link #subscribe(String...) subscribe(topics)}. It is an error to
45+
* unsubscribe from a topic that was never subscribed to using {@link #subscribe(String...) subscribe(topics)}
46+
* @param topics Topics to unsubscribe from
47+
*/
48+
public void unsubscribe(String... topics);
49+
50+
/**
51+
* Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next
52+
* {@link #poll(long) poll()} onwards. This should be used in conjunction with
53+
* {@link #subscribe(TopicPartition...) subscribe(topic, partitions)}. It is an error to
54+
* unsubscribe from a partition that was never subscribed to using {@link #subscribe(TopicPartition...) subscribe(partitions)}
55+
* @param partitions Partitions to unsubscribe from
56+
*/
57+
public void unsubscribe(TopicPartition... partitions);
58+
59+
/**
60+
* Fetches data for the subscribed list of topics and partitions
61+
* @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative
62+
* @return Map of topic to records for the subscribed topics and partitions as soon as data is available for a topic partition. Availability
63+
* of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}.
64+
* If no data is available for timeout ms, returns an empty list
65+
*/
66+
public Map<String, ConsumerRecords> poll(long timeout);
67+
68+
/**
69+
* Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
70+
* @param sync If true, the commit should block until the consumer receives an acknowledgment
71+
* @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
72+
* if the sync flag is set to false
73+
*/
74+
public OffsetMetadata commit(boolean sync);
75+
76+
/**
77+
* Commits the specified offsets for the specified list of topics and partitions to Kafka.
78+
* @param offsets The map of offsets to commit for the given topic partitions
79+
* @param sync If true, commit will block until the consumer receives an acknowledgment
80+
* @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
81+
* if the sync flag is set to false.
82+
*/
83+
public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync);
84+
85+
/**
86+
* Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to a list of topics
87+
* using {@link #subscribe(String...) subscribe(topics)}, an exception will be thrown if the specified topic partition is not owned by
88+
* the consumer.
89+
* @param offsets The map of fetch positions per topic and partition
90+
*/
91+
public void seek(Map<TopicPartition, Long> offsets);
92+
93+
/**
94+
* Returns the fetch position of the <i>next message</i> for the specified topic partition to be used on the next {@link #poll(long) poll()}
95+
* @param partitions Partitions for which the fetch position will be returned
96+
* @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()}
97+
*/
98+
public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions);
99+
100+
/**
101+
* Fetches the last committed offsets for the input list of partitions
102+
* @param partitions The list of partitions to return the last committed offset for
103+
* @return The list of offsets for the specified list of partitions
104+
*/
105+
public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions);
106+
107+
/**
108+
* Fetches offsets before a certain timestamp
109+
* @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp.
110+
* @param partitions The list of partitions for which the offsets are returned
111+
* @return The offsets for messages that were written to the server before the specified timestamp.
112+
*/
113+
public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions);
114+
115+
/**
116+
* Return a map of metrics maintained by the consumer
117+
*/
118+
public Map<String, ? extends Metric> metrics();
119+
120+
/**
121+
* Close this consumer
122+
*/
123+
public void close();
124+
125+
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
3+
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
4+
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
5+
* License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package org.apache.kafka.clients.consumer;
14+
15+
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
16+
17+
import java.util.Map;
18+
19+
import org.apache.kafka.common.config.AbstractConfig;
20+
import org.apache.kafka.common.config.ConfigDef;
21+
import org.apache.kafka.common.config.ConfigDef.Importance;
22+
import org.apache.kafka.common.config.ConfigDef.Type;
23+
24+
/**
25+
* The consumer configuration keys
26+
*/
27+
public class ConsumerConfig extends AbstractConfig {
28+
private static final ConfigDef config;
29+
30+
/**
31+
* The identifier of the group this consumer belongs to. This is required if the consumer uses either the
32+
* group management functionality by using {@link Consumer#subscribe(String...) subscribe(topics)}. This is also required
33+
* if the consumer uses the default Kafka based offset management strategy.
34+
*/
35+
public static final String GROUP_ID_CONFIG = "group.id";
36+
37+
/**
38+
* The timeout after which, if the {@link Consumer#poll(long) poll(timeout)} is not invoked, the consumer is
39+
* marked dead and a rebalance operation is triggered for the group identified by {@link #GROUP_ID_CONFIG}. Relevant
40+
* if the consumer uses the group management functionality by invoking {@link Consumer#subscribe(String...) subscribe(topics)}
41+
*/
42+
public static final String SESSION_TIMEOUT_MS = "session.timeout.ms";
43+
44+
/**
45+
* The number of times a consumer sends a heartbeat to the co-ordinator broker within a {@link #SESSION_TIMEOUT_MS} time window.
46+
* This frequency affects the latency of a rebalance operation since the co-ordinator broker notifies a consumer of a rebalance
47+
* in the heartbeat response. Relevant if the consumer uses the group management functionality by invoking
48+
* {@link Consumer#subscribe(String...) subscribe(topics)}
49+
*/
50+
public static final String HEARTBEAT_FREQUENCY = "heartbeat.frequency";
51+
52+
/**
53+
* A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form
54+
* <code>host1:port1,host2:port2,...</code>. These urls are just used for the initial connection to discover the
55+
* full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you
56+
* may want more than one, though, in case a server is down).
57+
*/
58+
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
59+
60+
/**
61+
* If true, periodically commit to Kafka the offsets of messages already returned by the consumer. This committed
62+
* offset will be used when the process fails as the position from which the consumption will begin.
63+
*/
64+
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
65+
66+
/**
67+
* The friendly name of the partition assignment strategy that the server will use to distribute partition ownership
68+
* amongst consumer instances when group management is used
69+
*/
70+
public static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
71+
72+
/**
73+
* The frequency in milliseconds that the consumer offsets are committed to Kafka. Relevant if {@link #ENABLE_AUTO_COMMIT_CONFIG}
74+
* is turned on.
75+
*/
76+
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
77+
78+
/**
79+
* What to do when there is no initial offset in Kafka or if an offset is out of range:
80+
* <ul>
81+
* <li> smallest: automatically reset the offset to the smallest offset
82+
* <li> largest: automatically reset the offset to the largest offset
83+
* <li> disable: throw exception to the consumer if no previous offset is found for the consumer's group
84+
* <li> anything else: throw exception to the consumer.
85+
* </ul>
86+
*/
87+
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
88+
89+
/**
90+
* The minimum amount of data the server should return for a fetch request. If insufficient data is available the
91+
* request will wait for that much data to accumulate before answering the request.
92+
*/
93+
public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
94+
95+
/**
96+
* The maximum amount of time the server will block before answering the fetch request if there isn't sufficient
97+
* data to immediately satisfy {@link #FETCH_MIN_BYTES_CONFIG}. This should be less than or equal to the timeout used in
98+
* {@link KafkaConsumer#poll(long) poll(timeout)}
99+
*/
100+
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
101+
102+
/**
103+
* The maximum amount of time to block waiting to fetch metadata about a topic the first time a record is received
104+
* from that topic. The consumer will throw a TimeoutException if it could not successfully fetch metadata within
105+
* this timeout.
106+
*/
107+
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
108+
109+
/**
110+
* The total memory used by the consumer to buffer records received from the server. This config is meant to control
111+
* the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions.
112+
*/
113+
public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
114+
115+
/**
116+
* The minimum amount of memory that should be used to fetch at least one message for a partition. This puts a lower
117+
* bound on the consumer's memory utilization when there is at least one message for a partition available on the server.
118+
* This size must be at least as large as the maximum message size the server allows or else it is possible for the producer
119+
* to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large
120+
* message on a certain partition.
121+
*/
122+
public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes";
123+
124+
/**
125+
* The id string to pass to the server when making requests. The purpose of this is to be able to track the source
126+
* of requests beyond just ip/port by allowing a logical application name to be included.
127+
*/
128+
public static final String CLIENT_ID_CONFIG = "client.id";
129+
130+
/**
131+
* The size of the TCP send buffer to use when fetching data
132+
*/
133+
public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes";
134+
135+
/**
136+
* The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a
137+
* host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
138+
*/
139+
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
140+
141+
/** <code>metrics.sample.window.ms</code> */
142+
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
143+
private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. "
144+
+ "When a window expires we erase and overwrite the oldest window.";
145+
146+
/** <code>metrics.num.samples</code> */
147+
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
148+
private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
149+
150+
/** <code>metric.reporters</code> */
151+
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
152+
private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
153+
154+
static {
155+
/* TODO: add config docs */
156+
config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah")
157+
.define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah")
158+
.define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah")
159+
.define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah")
160+
.define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, Importance.MEDIUM, "blah blah")
161+
.define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah")
162+
.define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah")
163+
.define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah")
164+
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah")
165+
.define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah")
166+
.define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah")
167+
.define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah")
168+
.define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah")
169+
.define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah")
170+
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah")
171+
.define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah")
172+
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
173+
Type.LONG,
174+
30000,
175+
atLeast(0),
176+
Importance.LOW,
177+
METRICS_SAMPLE_WINDOW_MS_DOC)
178+
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
179+
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
180+
181+
}
182+
183+
ConsumerConfig(Map<? extends Object, ? extends Object> props) {
184+
super(config, props);
185+
}
186+
187+
}

0 commit comments

Comments
 (0)