2121
2222import org .apache .eventmesh .connector .http .sink .HttpSinkConnector ;
2323import org .apache .eventmesh .connector .http .sink .config .HttpSinkConfig ;
24+ import org .apache .eventmesh .connector .http .sink .config .HttpWebhookConfig ;
2425import org .apache .eventmesh .openconnect .offsetmgmt .api .data .ConnectRecord ;
2526import org .apache .eventmesh .openconnect .offsetmgmt .api .data .RecordOffset ;
2627import org .apache .eventmesh .openconnect .offsetmgmt .api .data .RecordPartition ;
3435import org .junit .jupiter .api .AfterEach ;
3536import org .junit .jupiter .api .BeforeEach ;
3637import org .junit .jupiter .api .Test ;
37- import org .mockserver .client .MockServerClient ;
3838import org .mockserver .integration .ClientAndServer ;
3939import org .mockserver .model .HttpRequest ;
4040import org .mockserver .model .HttpResponse ;
4141import org .mockserver .model .MediaType ;
42- import org .mockserver .verify .VerificationTimes ;
43-
44- import io .vertx .core .http .HttpMethod ;
4542
4643import com .alibaba .fastjson2 .JSON ;
44+ import com .alibaba .fastjson2 .JSONArray ;
4745import com .alibaba .fastjson2 .JSONObject ;
4846
47+ import okhttp3 .HttpUrl ;
48+ import okhttp3 .OkHttpClient ;
49+ import okhttp3 .Request ;
50+ import okhttp3 .Response ;
51+ import okhttp3 .ResponseBody ;
52+
4953public class HttpSinkConnectorTest {
5054
5155 private HttpSinkConnector sinkConnector ;
@@ -68,8 +72,7 @@ void before() throws Exception {
6872 this .severUri = URI .create (sinkConfig .connectorConfig .getUrls ()[0 ]);
6973 // start mockServer
7074 mockServer = ClientAndServer .startClientAndServer (severUri .getPort ());
71- // mockServer response
72- new MockServerClient (severUri .getHost (), severUri .getPort ())
75+ mockServer .reset ()
7376 .when (
7477 request ()
7578 .withMethod ("POST" )
@@ -113,46 +116,49 @@ void testPut() throws Exception {
113116 Thread .sleep (5000 );
114117
115118 // verify request
116- new MockServerClient (severUri .getHost (), severUri .getPort ())
117- .verify (
118- HttpRequest .request ()
119- .withMethod (HttpMethod .POST .name ())
120- .withPath (severUri .getPath ()),
121- VerificationTimes .exactly (times ));
122-
123- /*
124- **The following code is only required in webhook mode**
119+ HttpRequest [] recordedRequests = mockServer .retrieveRecordedRequests (null );
120+ assert recordedRequests .length == times ;
125121
126122 // verify response
127123 HttpWebhookConfig webhookConfig = sinkConfig .connectorConfig .getWebhookConfig ();
128- URI uri = new URIBuilder()
129- .setScheme("http")
130- .setHost(severUri.getHost())
131- .setPort(webhookConfig.getPort())
132- .setPath(webhookConfig.getExportPath())
133- .addParameter("pageNum", "1")
134- .addParameter("pageSize", "10")
135- .addParameter("type", "poll")
124+ String url = new HttpUrl .Builder ()
125+ .scheme ("http" )
126+ .host (severUri .getHost ())
127+ .port (webhookConfig .getPort ())
128+ .addPathSegments (webhookConfig .getExportPath ())
129+ .addQueryParameter ("pageNum" , "1" )
130+ .addQueryParameter ("pageSize" , "10" )
131+ .addQueryParameter ("type" , "poll" )
132+ .build ().toString ();
133+
134+ // build request
135+ Request request = new Request .Builder ()
136+ .url (url )
137+ .addHeader ("Content-Type" , "application/json" )
136138 .build ();
137139
138- CloseableHttpClient httpClient = HttpClients.createDefault();
139- HttpGet httpGet = new HttpGet(uri);
140- httpGet.setHeader("Content-Type", "application/json");
141- CloseableHttpResponse response = httpClient.execute(httpGet);
142- String body = EntityUtils.toString(response.getEntity());
143- assert body != null;
144- JSONArray pageItems = JSON.parseObject(body).getJSONArray("pageItems");
145- assert pageItems != null && pageItems.size() == times;
146- for (int i = 0; i < times; i++) {
147- JSONObject pageItem = pageItems.getJSONObject(i);
148- assert pageItem != null;
149- assert pageItem.getJSONObject("data") != null;
150- assert pageItem.getJSONObject("metadata") != null;
140+ OkHttpClient client = new OkHttpClient ();
141+ try (Response response = client .newCall (request ).execute ()) {
142+ // check response code
143+ if (!response .isSuccessful ()) {
144+ throw new RuntimeException ("Unexpected response code: " + response );
145+ }
146+ // check response body
147+ ResponseBody responseBody = response .body ();
148+ if (responseBody != null ) {
149+ JSONObject jsonObject = JSON .parseObject (responseBody .string ());
150+ JSONArray pageItems = jsonObject .getJSONArray ("pageItems" );
151+
152+ assert pageItems != null && pageItems .size () == times ;
153+
154+ for (int i = 0 ; i < times ; i ++) {
155+ JSONObject pageItem = pageItems .getJSONObject (i );
156+ assert pageItem != null ;
157+ assert pageItem .getJSONObject ("data" ) != null ;
158+ assert pageItem .getJSONObject ("metadata" ) != null ;
159+ }
160+ }
151161 }
152-
153- httpClient.close();
154-
155- */
156162 }
157163
158164 private ConnectRecord createConnectRecord () {
0 commit comments