Description
Apache Airflow version
2.2.0 (latest released)
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
apache-airflow-providers-jdbc==1!2.0.1
Deployment
Astronomer
Deployment details
Astro CLI Version: 0.25.4, Git Commit: e95011fbf800fda9fdef1dc1d5149d62bc017aed
What happened
The JdbcOperator execute method calls the hook's run method without passing the optional handler parameter through to DbApiHook, the parent class of JdbcHook.
What you expected to happen
Without a handler, the result of DbApiHook.run, and therefore of the JdbcOperator, is always an empty list. For the JdbcOperator, the underlying JayDeBeApi connection needs a handler of the form lambda x: x.fetchall() to return results.
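To make the failure mode concrete, here is a minimal self-contained sketch (a stand-in, not the real DbApiHook or JayDeBeApi code) of the handler logic described above: run() only returns query results when a handler is supplied to consume the cursor.

```python
# Hypothetical, simplified stand-in for DbApiHook.run's handler behavior.
# FakeCursor and run() are illustration-only names, not Airflow APIs.

class FakeCursor:
    """Mimics a DB-API cursor holding one result row."""
    def execute(self, sql):
        self._rows = [("example_dag", True)]
    def fetchall(self):
        return self._rows

def run(sql, handler=None):
    """Execute sql; return results only if a handler consumes the cursor."""
    cursor = FakeCursor()
    cursor.execute(sql)
    if handler is not None:
        return handler(cursor)  # results flow back only through the handler
    return None                 # no handler: nothing is returned to the caller

print(run("SELECT dag_id, is_active FROM dag"))
# no handler -> None, so the operator pushes nothing useful to XCom
print(run("SELECT dag_id, is_active FROM dag", handler=lambda c: c.fetchall()))
# with handler -> the fetched rows
```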
How to reproduce
- Using the astro cli
- Download postgresql-42.3.0.jar into the working directory
- Download zulu11.52.13-ca-jdk11.0.13-linux_x64.tar.gz to the working directory
- Copy the following to the Dockerfile
FROM quay.io/astronomer/ap-airflow:2.2.0-buster-onbuild
COPY postgresql-42.3.0.jar /usr/local/airflow/.
USER root
RUN cd /opt && mkdir java
COPY zulu11.52.13-ca-jdk11.0.13-linux_x64.tar.gz /opt/java
RUN cd /opt/java && pwd && ls && tar xfvz ./zulu11.52.13-ca-jdk11.0.13-linux_x64.tar.gz
ENV JAVA_HOME /opt/java/zulu11.52.13-ca-jdk11.0.13-linux_x64
RUN export JAVA_HOME
- Copy the following into the airflow_settings.yaml file
airflow:
  connections:
    - conn_id: my_jdbc_connection
      conn_type: jdbc
      conn_host: "jdbc:postgresql://postgres:5432/"
      conn_schema:
      conn_login: postgres
      conn_password: postgres
      conn_port:
      conn_extra: '{"extra__jdbc__drv_clsname":"org.postgresql.Driver", "extra__jdbc__drv_path":"/usr/local/airflow/postgresql-42.3.0.jar"}'
- Copy the following DAG and run it. No data will be passed to XCom.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.jdbc.operators.jdbc import JdbcOperator

with DAG(
    dag_id='example_jdbc_operator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    catchup=False,
) as dag:
    run_this_last = DummyOperator(task_id='run_this_last')

    query_dags_data = JdbcOperator(
        task_id='query_dags_data',
        sql='SELECT dag_id, is_active FROM dag',
        jdbc_conn_id='my_jdbc_connection',
        autocommit=True,
    )

    query_dags_data >> run_this_last
- Copy the following DAG and run it. All DAGs and their active status will be put onto XCom.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.jdbc.hooks.jdbc import JdbcHook
from airflow.providers.jdbc.operators.jdbc import JdbcOperator

class JdbcHandlerOperator(JdbcOperator):
    """Workaround operator that passes a handler so results reach XCom."""

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
        return hook.run(
            self.sql,
            self.autocommit,
            parameters=self.parameters,
            # Defined by how JayDeBeApi operates
            handler=lambda x: x.fetchall(),
        )

with DAG(
    dag_id='example_jdbc_operator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    catchup=False,
) as dag:
    run_this_last = DummyOperator(task_id='run_this_last')

    query_dags_data = JdbcHandlerOperator(
        task_id='query_dags_data',
        sql='SELECT dag_id, is_active FROM dag',
        jdbc_conn_id='my_jdbc_connection',
        autocommit=True,
    )

    query_dags_data >> run_this_last

Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct