
AIP-82: Add KafkaMessageQueueProvider#49938

Merged
vincbeck merged 12 commits into apache:main from
jason810496:feature/AIP-82/add-kafka-message-queue-provider
May 12, 2025

Conversation

@jason810496
Member

Why

Several users have pointed out that the Event-Driven Scheduling feature introduced in Airflow 3 currently supports only Amazon SQS.

For example, in Airflow 3.0: The Good, The DAG, and the Ugly (and the Future!), the author notes:

“AssetWatchers, in my opinion, is where TRUE data-aware scheduling comes into play. However, Amazon SQS is the only message queue currently supported. While a great start, I’m excited to see how Airflow’s massive community builds on this, supporting various systems like Snowflake staging tables.”

What

This PR introduces support for Apache Kafka as a MessageQueueProvider, expanding the capabilities of Event-Driven Scheduling.

@vincbeck
Contributor

Nice! ❤️

Contributor

@vincbeck vincbeck left a comment

LGTM. Very happy to see Kafka getting onboard :)

jason810496 force-pushed the feature/AIP-82/add-kafka-message-queue-provider branch from 4cc1e33 to abbeb4c, April 29, 2025 10:15
jason810496 added provider:apache-kafka AIP-82 External event driven scheduling in Airflow labels, Apr 29, 2025
jason810496 force-pushed the feature/AIP-82/add-kafka-message-queue-provider branch from abbeb4c to 5dcd611, April 29, 2025 13:02
jason810496 marked this pull request as ready for review, April 29, 2025 16:30
jason810496 force-pushed the feature/AIP-82/add-kafka-message-queue-provider branch from 5dcd611 to 703dcd7, April 29, 2025 16:30
jason810496 changed the title from "[WIP] AIP-82: Add KafkaMessageQueueProvider" to "AIP-82: Add KafkaMessageQueueProvider", Apr 29, 2025
jason810496 marked this pull request as draft, April 30, 2025 07:21
jason810496 force-pushed the feature/AIP-82/add-kafka-message-queue-provider branch from 703dcd7 to 96f68f3, April 30, 2025 12:49
jason810496 marked this pull request as ready for review, April 30, 2025 14:29
Member

@potiuk potiuk left a comment

Requesting changes until we solve the "coupling" issue

@vikramkoka
Contributor

Thank you for pointing that out @potiuk

I intended for it to be decoupled with the original messaging abstraction.
I didn't notice the "coupling issue" earlier. That was definitely unintentional.

@potiuk
Member

potiuk commented Apr 30, 2025

BTW. We can even make it backwards compatible when we switch to the "in-provider" abstraction

(I.e. we could dynamically load SQS from common.messaging for backward compatibility.)
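
A rough sketch of what such a dynamic, backward-compatible load could look like, assuming a plain "try the new location, fall back to the old one" import strategy. The helper name and the module paths in the usage comment are hypothetical and are not taken from this PR or from #50057:

```python
import importlib
from types import ModuleType


def import_first_available(candidates: list[str]) -> ModuleType:
    """Import and return the first module in `candidates` that is installed."""
    for name in candidates:
        try:
            return importlib.import_module(name)
        except ModuleNotFoundError:
            # This location is not installed; try the next candidate.
            continue
    raise ImportError(f"None of these modules could be imported: {candidates}")


# Hypothetical usage: prefer the provider-owned location, fall back to the legacy one.
# module = import_first_available([
#     "new_provider_pkg.queues.sqs",   # placeholder for the in-provider path
#     "old_common_pkg.providers.sqs",  # placeholder for the legacy path
# ])
```

The import only happens when the helper is called, so the generic package would not need a hard dependency on any particular provider.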

@potiuk
Member

potiuk commented Apr 30, 2025

There you go: #50057. I think it's already a pretty complete change.

@jason810496
Member Author

Thanks for pointing that out @potiuk, and for the PR to address it.

Moving the provider-specific message queue implementations to the providers directory definitely makes more sense!
I'll rebase and relocate my changes to the appropriate provider path once your PR is merged.

@jason810496
Member Author

I'd also like to open a discussion about the end-user interface for MessageQueueTrigger. While working on the KafkaMessageQueueProvider, I noticed that the constructor arguments for different provider-specific trigger classes vary significantly.

e.g. Kafka: AwaitMessageTrigger, Google Pub/Sub: PubsubPullTrigger

# Kafka: AwaitMessageTrigger
def __init__(
    self,
    topics: Sequence[str],
    apply_function: str,
    kafka_config_id: str = "kafka_default",
    apply_function_args: Sequence[Any] | None = None,
    apply_function_kwargs: dict[Any, Any] | None = None,
    poll_timeout: float = 1,
    poll_interval: float = 5,
) -> None: ...

# Google Pub/Sub: PubsubPullTrigger
def __init__(
    self,
    project_id: str,
    subscription: str,
    max_messages: int,
    ack_messages: bool,
    gcp_conn_id: str,
    poke_interval: float = 10.0,
    impersonation_chain: str | Sequence[str] | None = None,
): ...

Additionally, the queue URI argument in the current interface is used solely to match the appropriate provider, and not necessarily tied to an Airflow Connection. This might be confusing to users.

Should we consider replacing the queue argument in the MessageQueueTrigger constructor with something more explicit, like type or provider?

For example:

AVAILABLE_MESSAGE_QUEUE_PROVIDERS = Literal[
    "sqs",
    "kafka",
]

class MessageQueueTrigger(BaseEventTrigger):

    def __init__(self, *, provider: AVAILABLE_MESSAGE_QUEUE_PROVIDERS, **kwargs: Any) -> None:
        self.provider = provider
        self.kwargs = kwargs

    @cached_property
    def trigger(self) -> BaseEventTrigger:
        # get trigger_class based on provider attribute
        ...

cc @potiuk, @vincbeck, @vikramkoka

@vincbeck
Contributor

vincbeck commented May 1, 2025

Good question. I really wanted to avoid that and completely hide the notion of provider from the user; that's why I came up with the queue parameter. But I don't think it is possible to infer the provider from a Kafka topic name or a Google Pub/Sub project_id alone. One solution would be to "create" a format/protocol from which we could identify the provider and extract the queue identifier:

  • Kafka: kafka://{topic_name}
  • Google Pub/Sub: google://{project_id}

But this would create new complexity specific to Airflow, so I don't think this is a good idea. To be honest, I am not a big fan of introducing this new provider parameter, but I do not see other solutions. WDYT?
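
For illustration only, the scheme-based format above could be split into provider and queue identifier roughly as follows. This is a minimal sketch using the standard library; the function name is hypothetical and nothing here is part of the PR:

```python
from urllib.parse import urlsplit


def split_queue_uri(queue: str) -> tuple[str, str]:
    """Return (provider_scheme, queue_identifier) for URIs such as kafka://my_topic."""
    parts = urlsplit(queue)
    if not parts.scheme or not parts.netloc:
        raise ValueError(f"Queue {queue!r} is not of the form <scheme>://<identifier>")
    # Keep any path component so identifiers containing slashes survive.
    return parts.scheme, parts.netloc + parts.path
```

A bare topic name such as "orders" has no scheme, so it is rejected instead of being guessed at, which is exactly the ambiguity described above.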

@potiuk
Member

potiuk commented May 1, 2025

Why don't we add additional *trigger_kwargs that we could pass to the message queue and use as constructor args?

@potiuk
Member

potiuk commented May 1, 2025

We already have a very similar approach for connections, where we pass hook kwargs.

@potiuk
Member

potiuk commented May 1, 2025

Ah, we already have the kwargs... my bad. I see the point now.

First of all: no, we absolutely should not have a list of available providers in common.messaging. Keeping such a list in common.messaging completely defeats its purpose as a generic interface.

What would happen if we want to add a new provider to support common.messaging? Should we release a new common.messaging provider version? That makes no sense and is completely against the "genericness" of it - also it prevents others from developing their common.messaging capable provider independently.

Other than optional extras that indicate the list of providers that we have in community that support common.messaging - there should be ABSOLUTELY NO DIFFERENCE between our providers, and 3rd-party providers implementing common.messaging. So list of "possible" providers in the code of common.messaging is generally out-of-the-question.
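
A minimal sketch of the idea that the generic layer names no concrete provider: providers register themselves, and the generic code only asks each one whether it matches. All class and function names below are hypothetical and only loosely modeled on the discussion; this is not the actual common.messaging code:

```python
from abc import ABC, abstractmethod


class QueueProvider(ABC):
    """Hypothetical interface; each provider package ships its own subclass."""

    @abstractmethod
    def queue_matches(self, queue: str) -> bool:
        """Return True if this provider can handle the given queue string."""


# The generic layer holds only an open registry; it lists no concrete provider.
_REGISTRY: list[QueueProvider] = []


def register(provider: QueueProvider) -> None:
    _REGISTRY.append(provider)


def find_provider(queue: str) -> QueueProvider:
    matches = [p for p in _REGISTRY if p.queue_matches(queue)]
    if len(matches) != 1:
        raise ValueError(f"Expected exactly one provider for {queue!r}, found {len(matches)}")
    return matches[0]


class PrefixProvider(QueueProvider):
    """Toy provider that claims every queue starting with a given prefix."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def queue_matches(self, queue: str) -> bool:
        return queue.startswith(self.prefix)


register(PrefixProvider("kafka://"))    # could be shipped by a community provider
register(PrefixProvider("acme-mq://"))  # could be shipped by a third party
```

Adding the third-party provider required no change to the registry or to find_provider, which is the property argued for above.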

I think the way it works now is not really too confusing and is pretty good.

The queue should be enough and should be interpreted by the particular MessageQueue implementation, i.e. if the queue matches SQS, it's up to SQSMessageQueue to interpret the queue + kwargs. Ideally it will derive some basic parameters that can be passed to the Trigger without passing extra args. But yes, there might be other MessageQueue implementations that expect more. And it should be up to the implementation how to interpret both queue and kwargs.

For example, I can imagine a PubSubMessageQueue implementation that takes a queue like this (not sure what the right form is; it's more of a logical representation):

queue="pubsub://project:subscription", kwargs={"gcp_conn_id": "connection"}

In this case the queue could be parsed by the PubSubMessageQueue and converted into appropriate trigger parameters, while allowing additional trigger parameters to be passed by kwargs if they cannot be easily translated into queue URI or where we would like to use some semi-standardized form of the queue format (where some of the parameters of trigger cannot be added to)
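
A sketch of how such a hypothetical PubSubMessageQueue might turn the queue and kwargs into trigger parameters. The pubsub://project:subscription form is only the logical example from this comment, and the function name is an assumption:

```python
from typing import Any


def pubsub_trigger_kwargs(queue: str, **kwargs: Any) -> dict[str, Any]:
    """Derive trigger parameters from a pubsub://<project>:<subscription> queue."""
    scheme, sep, rest = queue.partition("://")
    if scheme != "pubsub" or not sep:
        raise ValueError(f"Not a pubsub queue: {queue!r}")
    project_id, colon, subscription = rest.partition(":")
    if not (project_id and colon and subscription):
        raise ValueError(f"Expected pubsub://<project>:<subscription>, got {queue!r}")
    # Explicit kwargs are merged last, so they can supply what the URI cannot express.
    return {"project_id": project_id, "subscription": subscription, **kwargs}
```

Calling it with the example above yields project_id, subscription and gcp_conn_id in a single dictionary that could be passed to a trigger constructor.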

We just need to be very clear, for each implementation, what the format of the queue URI is. I don't think this will be universally standard, because there is basically no globally recognized URI format for message queues. Or maybe there are 5 competing standards or something. I think this should be done the same way as for connections (where we have the AIRFLOW connection URI form, for good or better).

Two lessons learned from connections, though:

  • we have to be VERY clear that queue is not universally standard and each message queue should precisely describe what kind of URI and kwargs it expects
  • we might consider supporting a "json" form of the queue on top of the URI. URIs have certain limitations (especially around encoding), and for that reason we eventually settled on supporting both URI and JSON formats for connections.
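
To make the second point concrete, here is a small sketch of accepting either form. The JSON keys ("scheme", "identifier") and the function name are assumptions made up for this example, not an agreed format:

```python
import json
from typing import Any


def parse_queue(queue: str) -> dict[str, Any]:
    """Parse a queue given either as a JSON object or as <scheme>://<identifier>."""
    text = queue.strip()
    if text.startswith("{"):
        # JSON form: no URI encoding rules to worry about.
        return json.loads(text)
    scheme, sep, identifier = text.partition("://")
    if not (scheme and sep and identifier):
        raise ValueError(f"Queue {queue!r} is neither JSON nor <scheme>://<identifier>")
    return {"scheme": scheme, "identifier": identifier}
```

The JSON form can carry identifiers with spaces, slashes or "#" characters unchanged, whereas the URI form would need them percent-encoded.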

@potiuk potiuk closed this May 1, 2025
@potiuk potiuk reopened this May 1, 2025
Member

@potiuk potiuk left a comment

Nice!

jason810496 force-pushed the feature/AIP-82/add-kafka-message-queue-provider branch from ffc5cff to cc37548, May 5, 2025 11:49
jason810496 marked this pull request as ready for review, May 5, 2025 11:49
@jason810496
Member Author

Just moved the Kafka Queue Provider to the Apache Kafka-specific directory and it's ready for review.

jason810496 force-pushed the feature/AIP-82/add-kafka-message-queue-provider branch from cc37548 to ae19216, May 6, 2025 03:08
jason810496 requested review from vikramkoka and vincbeck, May 7, 2025 03:42
@jason810496
Member Author

Hi @vincbeck, would you mind taking a look at the PR when you are free, now that it has been moved to the Kafka-specific directory? Thanks.
If there are no other concerns, I will keep working on other common message queue providers to enrich the Event-Driven Scheduling feature.

@vincbeck
Contributor

Very nice!

@vincbeck vincbeck merged commit 11376a7 into apache:main May 12, 2025
64 checks passed
@cmarteepants
Collaborator

cmarteepants commented May 21, 2025

This is awesome! Which version are we targeting for this? 3.0.x or 3.1?

@jason810496
Member Author

jason810496 commented May 22, 2025

This targets 3.0.x and will be included in the next provider release (UPDATE: it was released 4 days ago, #50599 (comment)).
However, I haven't tested the RC version yet 😅

#50599

sanederchik pushed a commit to sanederchik/airflow that referenced this pull request Jun 7, 2025
* AIP-82: Add KafkaMessageQueueProvider

- Add Apache kafka to index.rst

- Fix mypy static check

* Add TestKafkaMessageQueueProvider unittest

* Fix kafka_message_queue_trigger system test

* Refine docstring of KafkaMessageQueueProvider

* Fix common-messaging docs

* Move Kafka message queue to Apache Kafka provider

* Fix common messaging dev dependency-group

* Fix kafka queue test

* Fix compact test

* Refine doc for KafkaMessageQueueProvider

* Move system test to provider-specific directory