Skip to content

Latest commit

 

History

History
259 lines (203 loc) · 8.7 KB

File metadata and controls

259 lines (203 loc) · 8.7 KB
 
1
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2020-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
28
29
#include "test.h"
30
31
#include "../src/rdkafka_proto.h"
32
33
34
/**
 * @name Verify that the producer and consumer resume operation after
 *       a topic has been deleted and recreated.
 */
38
39
/**
40
* The message value to produce, one of:
41
* "before" - before topic deletion
42
* "during" - during topic deletion
43
* "after" - after topic has been re-created
44
* "end" - stop producing
45
*/
46
static mtx_t value_mtx;
47
static char *value;
48
Oct 28, 2021
Oct 28, 2021
49
static const int msg_rate = 10; /**< Messages produced per second */
Oct 28, 2021
Oct 28, 2021
51
static struct test *this_test; /**< Exposes current test struct (in TLS) to
52
* producer thread. */
53
54
55
/**
56
* @brief Treat all error_cb as non-test-fatal.
57
*/
Oct 28, 2021
Oct 28, 2021
58
static int
59
is_error_fatal(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
60
return rd_false;
61
}
62
63
/**
64
* @brief Producing thread
65
*/
Oct 28, 2021
Oct 28, 2021
66
static int run_producer(void *arg) {
67
const char *topic = arg;
68
rd_kafka_t *producer = test_create_producer();
Oct 28, 2021
Oct 28, 2021
69
int ret = 0;
70
71
test_curr = this_test;
72
73
/* Don't check message status */
74
test_curr->exp_dr_status = (rd_kafka_msg_status_t)-1;
75
76
while (1) {
77
rd_kafka_resp_err_t err;
78
79
mtx_lock(&value_mtx);
80
if (!strcmp(value, "end")) {
81
mtx_unlock(&value_mtx);
82
break;
83
} else if (strcmp(value, "before")) {
84
/* Ignore Delivery report errors after topic
85
* has been deleted and eventually re-created,
86
* we rely on the consumer to verify that
87
* messages are produced. */
88
test_curr->ignore_dr_err = rd_true;
89
}
90
91
err = rd_kafka_producev(
Oct 28, 2021
Oct 28, 2021
92
producer, RD_KAFKA_V_TOPIC(topic),
93
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
94
RD_KAFKA_V_VALUE(value, strlen(value)), RD_KAFKA_V_END);
95
96
if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
97
err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
98
TEST_SAY("Produce failed (expectedly): %s\n",
99
rd_kafka_err2name(err));
100
else
101
TEST_ASSERT(!err, "producev() failed: %s",
102
rd_kafka_err2name(err));
103
104
mtx_unlock(&value_mtx);
105
106
rd_usleep(1000000 / msg_rate, NULL);
107
108
rd_kafka_poll(producer, 0);
109
}
110
111
if (rd_kafka_flush(producer, 5000)) {
112
TEST_WARN("Failed to flush all message(s), %d remain\n",
113
rd_kafka_outq_len(producer));
114
/* Purge the messages to see which partition they were for */
Oct 28, 2021
Oct 28, 2021
115
rd_kafka_purge(producer, RD_KAFKA_PURGE_F_QUEUE |
116
RD_KAFKA_PURGE_F_INFLIGHT);
117
rd_kafka_flush(producer, 5000);
118
TEST_SAY("%d message(s) in queue after purge\n",
119
rd_kafka_outq_len(producer));
120
121
ret = 1; /* Fail test from main thread */
122
}
123
124
rd_kafka_destroy(producer);
125
126
return ret;
127
}
128
129
130
/**
131
* @brief Expect at least \p cnt messages with value matching \p exp_value,
132
* else fail the current test.
133
*/
Oct 28, 2021
Oct 28, 2021
134
static void
135
expect_messages(rd_kafka_t *consumer, int cnt, const char *exp_value) {
136
int match_cnt = 0, other_cnt = 0, err_cnt = 0;
137
size_t exp_len = strlen(exp_value);
138
Oct 28, 2021
Oct 28, 2021
139
TEST_SAY("Expecting >= %d messages with value \"%s\"...\n", cnt,
140
exp_value);
141
142
while (match_cnt < cnt) {
143
rd_kafka_message_t *rkmessage;
144
145
rkmessage = rd_kafka_consumer_poll(consumer, 1000);
146
if (!rkmessage)
147
continue;
148
149
if (rkmessage->err) {
150
TEST_SAY("Consume error: %s\n",
151
rd_kafka_message_errstr(rkmessage));
152
err_cnt++;
153
} else if (rkmessage->len == exp_len &&
154
!memcmp(rkmessage->payload, exp_value, exp_len)) {
155
match_cnt++;
156
} else {
Oct 28, 2021
Oct 28, 2021
157
TEST_SAYL(3,
158
"Received \"%.*s\", expected \"%s\": "
159
"ignored\n",
160
(int)rkmessage->len,
Oct 28, 2021
Oct 28, 2021
161
(const char *)rkmessage->payload, exp_value);
162
other_cnt++;
163
}
164
165
rd_kafka_message_destroy(rkmessage);
166
}
167
Oct 28, 2021
Oct 28, 2021
168
TEST_SAY(
169
"Consumed %d messages matching \"%s\", "
170
"ignored %d others, saw %d error(s)\n",
171
match_cnt, exp_value, other_cnt, err_cnt);
172
}
173
174
175
/**
176
* @brief Test topic create + delete + create with first topic having
177
* \p part_cnt_1 partitions and second topic having \p part_cnt_2 .
178
*/
Oct 28, 2021
Oct 28, 2021
179
static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) {
180
rd_kafka_t *consumer;
181
thrd_t producer_thread;
182
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
Oct 28, 2021
Oct 28, 2021
183
int ret = 0;
184
185
TEST_SAY(_C_MAG
186
"[ Test topic create(%d parts)+delete+create(%d parts) ]\n",
187
part_cnt_1, part_cnt_2);
188
189
consumer = test_create_consumer(topic, NULL, NULL, NULL);
190
191
/* Create topic */
Dec 13, 2024
Dec 13, 2024
192
test_create_topic_wait_exists(consumer, topic, part_cnt_1, 3, 5000);
193
194
/* Start consumer */
195
test_consumer_subscribe(consumer, topic);
Feb 19, 2021
Feb 19, 2021
196
test_consumer_wait_assignment(consumer, rd_true);
197
198
mtx_lock(&value_mtx);
199
value = "before";
200
mtx_unlock(&value_mtx);
201
202
/* Create producer thread */
Oct 28, 2021
Oct 28, 2021
203
if (thrd_create(&producer_thread, run_producer, (void *)topic) !=
204
thrd_success)
205
TEST_FAIL("thrd_create failed");
206
207
/* Consume messages for 5s */
208
expect_messages(consumer, msg_rate * 5, value);
209
210
/* Delete topic */
211
mtx_lock(&value_mtx);
212
value = "during";
213
mtx_unlock(&value_mtx);
214
215
test_delete_topic(consumer, topic);
216
rd_sleep(5);
217
218
/* Re-create topic */
Dec 13, 2024
Dec 13, 2024
219
test_create_topic_wait_exists(consumer, topic, part_cnt_2, 3, 5000);
220
221
mtx_lock(&value_mtx);
222
value = "after";
223
mtx_unlock(&value_mtx);
224
225
/* Consume for 5 more seconds, should see new messages */
226
expect_messages(consumer, msg_rate * 5, value);
227
228
rd_kafka_destroy(consumer);
229
230
/* Wait for producer to exit */
231
mtx_lock(&value_mtx);
232
value = "end";
233
mtx_unlock(&value_mtx);
234
235
if (thrd_join(producer_thread, &ret) != thrd_success || ret != 0)
236
TEST_FAIL("Producer failed: see previous errors");
237
238
TEST_SAY(_C_GRN
239
"[ Test topic create(%d parts)+delete+create(%d parts): "
240
"PASS ]\n",
241
part_cnt_1, part_cnt_2);
242
}
243
244
Oct 28, 2021
Oct 28, 2021
245
int main_0107_topic_recreate(int argc, char **argv) {
246
this_test = test_curr; /* Need to expose current test struct (in TLS)
247
* to producer thread. */
248
249
this_test->is_fatal_cb = is_error_fatal;
250
251
mtx_init(&value_mtx, mtx_plain);
252
253
test_conf_init(NULL, NULL, 60);
254
255
do_test_create_delete_create(10, 3);
256
do_test_create_delete_create(3, 6);
257
258
return 0;
259
}