Skip to content

[Feature] Add HTTP source Connector #4618

@Fungx

Description

@Fungx

Search before asking

  • I had searched in the issues and found no similar issues.

Feature Request

According to eventmesh-connectors, the Http connector is still not implemented. I would like to work on the Http source connector and here are some ideas.

In a nutshell, the goal is to implement a http source that receives http requests, converts them into cloud events, and then responds to the client.

Conversion

This specification defines three content modes for transferring events: binary, structured and batched. Every compliant implementation SHOULD support the structured and binary modes.

According to http-protocol-binding, and java-sdk only supports binary and structured, this time we also only support these two modes.

Http Server

The source connector uses Vert.x, a lightweight http server, to handle requests. Because the cloudevents java-sdk has good support for Vert.x, and Vert.x is also an efficient and stable http server.

The Vert.x http server is initialised in the connector and waits to handle http requests.

    private void doInit() throws Exception {
        this.queue = new LinkedBlockingQueue<>(1000);

        final Vertx vertx = Vertx.vertx();
        final Router router = Router.router(vertx);
        router.route()
            .path(this.sourceConfig.connectorConfig.getPath())
            .method(HttpMethod.POST)
            .handler(ctx -> {
                VertxMessageFactory.createReader(ctx.request())
                    // converts requests into cloud events and supports both binary mode and structured mode
                    .map(MessageReader::toEvent) 
                    .onSuccess(event -> {
                        queue.add(event);
                        ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
                    })
                    .onFailure(t -> {
                        ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).setStatusMessage(t.getMessage()).end();
                    });
            });
        this.server = vertx.createHttpServer(new HttpServerOptions()
            .setPort(this.sourceConfig.connectorConfig.getPort())
        ).requestHandler(router);
    }

Here are two valid examples for each of binary and structured.

curl --location --request POST 'http://localhost:3755/test' \
--header 'ce-id: 11' \
--header 'ce-specversion: 1.0' \
--header 'ce-type: com.example.someevent' \
--header 'ce-source: /mycontext' \
--header 'ce-subject: test' \
--header 'Content-Type: text/plain' \
--data-raw 'aaa'
curl --location --request POST 'http://localhost:3755/test' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
    "specversion": "1.0",
    "type": "com.example.someevent",
    "source": "/mycontext",
    "subject":"test_topic",
    "datacontenttype":"text/plain",
    "id": "A234-1234-1234",
    "data": "aa"
}'

Configuration

The URI can be configured in source-config.yml. For example, the following configuration creates an API POST http://127.0.0.1:3755/test. We can call it in various ways (e.g. curl, GitHub Webhooks, etc.) to send messages.

connectorConfig:
    connectorName: httpSource
    path: /test
    port: 3755

@pandaapo Hi, what do you think?

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions