AIP-82: Add KafkaMessageQueueProvider#49938
Conversation
|
Nice! ❤️ |
vincbeck
left a comment
There was a problem hiding this comment.
LGTM. Very happy to see Kafka getting onboard :)
4cc1e33 to
abbeb4c
Compare
abbeb4c to
5dcd611
Compare
5dcd611 to
703dcd7
Compare
703dcd7 to
96f68f3
Compare
providers/common/messaging/src/airflow/providers/common/messaging/providers/kafka.py
Show resolved
Hide resolved
providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
Outdated
Show resolved
Hide resolved
potiuk
left a comment
There was a problem hiding this comment.
Requesting changes until we solve the "coupling" issue
providers/common/messaging/src/airflow/providers/common/messaging/providers/kafka.py
Show resolved
Hide resolved
|
Thank you for pointing that out @potiuk I intended for it to be decoupled with the original messaging abstraction. |
BTW. We can even make it backwards compatible when we switch to the "in-provider" abstraction (I.e we could dynamically load sqs as back-compatibility from common.messaging) |
|
There you go - #50057 I think it's already pretty complete change. |
|
Thanks for pointing that out @potiuk, and for the PR to address it. Moving the provider-specific message queue implementations to the |
|
I'd also like to open a discussion about the end-user interface for e.g. Kafka: Additionally, the Should we consider replacing the For example: AVAILABLE_MESSAGE_QUEUE_PROVIDERS = Literal[
"sqs",
"kafka",
]
class MessageQueueTrigger(BaseEventTrigger):
def __init__(self, *, provider: AVAILABLE_MESSAGE_QUEUE_PROVIDERS, **kwargs: Any) -> None:
self.provider = provider
self.kwargs = kwargs
@cached_property
def trigger(self) -> BaseEventTrigger:
# get trigger_class based on provider attributecc @potiuk, @vincbeck, @vikramkoka |
Good question. I really wanted to avoid that and totally abstract the notion of provider to the user, that's why I came up with the parameter
But this would create a new complexity specific to Airflow so I dont think this is a good idea. To be honest I am not a big fan of introducing this new parameter |
|
Why don't we add additonal |
|
We have very similar approach for connections already where we pass hook kwargs |
|
Ah we already have the kwargs..... ... bad me .. I see the point now. First of all: - no, absolutely we should not have list of avaliable providers in What would happen if we want to add a new provider to support common.messaging? Should we release a new common.messaging provider version? That makes no sense and is completely against the "genericness" of it - also it prevents others from developing their Other than optional extras that indicate the list of providers that we have in community that support common.messaging - there should be ABSOLUTELY NO DIFFERENCE between our providers, and 3rd-party providers implementing common.messaging. So list of "possible" providers in the code of common.messaging is generally out-of-the-question. I think the way it works now is not really too confusing and pretty good The For example I can imagine a PubSubMessageQueue implementation that takes such a queue (not sure what is the right form - it's more of a logical representation In this case the queue could be parsed by the PubSubMessageQueue and converted into appropriate trigger parameters, while allowing additional trigger parameters to be passed by kwargs if they cannot be easily translated into We just need to be very clear - for each implementation - what is the format of the Two learning from connections though:
|
ffc5cff to
cc37548
Compare
|
Just moved the Kafka Queue Provider to the Apache Kafka-specific directory and it's ready for review. |
providers/common/messaging/tests/system/common/messaging/kafka_message_queue_trigger.py
Show resolved
Hide resolved
- Add Apache kafka to index.rst - Fix mypy static check
cc37548 to
ae19216
Compare
|
Hi @vincbeck do you mind take a look at PR after moving to Kafka-specific directory when you are free? Thanks. |
|
Very nice! |
|
This is awesome! Which version are we targeting for this? 3.0.x or 3.1? |
This targets 3.0.x and will be included in the |
* AIP-82: Add KafkaMessageQueueProvider - Add Apache kafka to index.rst - Fix mypy static check * Add TestKafkaMessageQueueProvider unittest * Fix kafka_message_queue_trigger system test * Refine docstring of KafkaMessageQueueProvider * Fix common-messaging docs * Move Kafka message queue to Apache Kafka provider * Fix common messaging dev dependency-group * Fix kafka queue test * Fix compact test * Refine doc for KafkaMessageQueueProvider * Move system test to provider-specific directory
Why
Several users have pointed out that the Event-Driven Scheduling feature introduced in Airflow 3 currently supports only Amazon SQS.
For example, in Airflow 3.0: The Good, The DAG, and the Ugly (and the Future!), the author notes:
What
This PR introduces support for Apache Kafka as a
MessageQueueProvider, expanding the capabilities of Event-Driven Scheduling.