JdbcOperator should pass handler parameter to JdbcHook.run #19313

@cdabella

Description

Apache Airflow version

2.2.0 (latest released)

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-jdbc==1!2.0.1

Deployment

Astronomer

Deployment details

Astro CLI Version: 0.25.4, Git Commit: e95011fbf800fda9fdef1dc1d5149d62bc017aed

What happened

The JdbcOperator.execute method calls the hook's run method without passing the optional handler parameter accepted by DbApiHook, the parent class of JdbcHook.

What you expected to happen

JdbcOperator should pass a handler so that query results are returned. Without one, DbApiHook.run, and therefore JdbcOperator, always returns an empty result. For the underlying JayDeBeApi connection used by JdbcOperator, the handler takes the form lambda x: x.fetchall().
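To illustrate the mechanics, here is a minimal sketch of how the handler parameter determines whether results come back. This is not the real Airflow implementation: FakeCursor, the default rows, and the simplified run function are all hypothetical stand-ins for DbApiHook.run and a JayDeBeApi cursor.

```python
class FakeCursor:
    """Hypothetical stand-in for a JayDeBeApi cursor."""

    def __init__(self, rows):
        self._rows = rows

    def execute(self, sql):
        # A real cursor would run the SQL; here we just accept it.
        pass

    def fetchall(self):
        return self._rows


def run(sql, handler=None, rows=(("dag_a", True), ("dag_b", False))):
    """Simplified analogue of DbApiHook.run: execute sql, apply handler if given."""
    cursor = FakeCursor(list(rows))
    cursor.execute(sql)
    if handler is not None:
        # The handler decides what to extract from the cursor.
        return handler(cursor)
    # No handler: nothing is returned, so the operator has nothing to push to XCom.
    return None


# Without a handler the caller gets nothing back:
print(run("SELECT dag_id, is_active FROM dag"))
# With the JayDeBeApi-style handler, the rows are returned:
print(run("SELECT dag_id, is_active FROM dag", handler=lambda cur: cur.fetchall()))
```

This mirrors why the first reproduction DAG below pushes no data to XCom while the subclassed operator, which passes handler=lambda x: x.fetchall(), does.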

How to reproduce

  1. Create a project using the Astro CLI
  2. Download postgresql-42.3.0.jar into the working directory
  3. Download zulu11.52.13-ca-jdk11.0.13-linux_x64.tar.gz to the working directory
  4. Copy the following to the Dockerfile
FROM quay.io/astronomer/ap-airflow:2.2.0-buster-onbuild

COPY postgresql-42.3.0.jar /usr/local/airflow/.

USER root
RUN cd /opt && mkdir java
COPY zulu11.52.13-ca-jdk11.0.13-linux_x64.tar.gz /opt/java
RUN cd /opt/java && pwd && ls && tar xfvz ./zulu11.52.13-ca-jdk11.0.13-linux_x64.tar.gz

ENV JAVA_HOME /opt/java/zulu11.52.13-ca-jdk11.0.13-linux_x64
RUN export JAVA_HOME
  5. Copy the following into the airflow_settings.yaml file
airflow:
  connections:
    - conn_id: my_jdbc_connection
      conn_type: jdbc
      conn_host: "jdbc:postgresql://postgres:5432/"
      conn_schema:
      conn_login: postgres
      conn_password: postgres
      conn_port: 
      conn_extra: '{"extra__jdbc__drv_clsname":"org.postgresql.Driver", "extra__jdbc__drv_path":"/usr/local/airflow/postgresql-42.3.0.jar"}'
  6. Copy the following DAG and run it. No data will be passed to XCom
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.jdbc.operators.jdbc import JdbcOperator

with DAG(
    dag_id='example_jdbc_operator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    catchup=False,
) as dag:

    run_this_last = DummyOperator(task_id='run_this_last')

    query_dags_data = JdbcOperator(
        task_id='query_dags_data',
        sql='SELECT dag_id, is_active FROM dag',
        jdbc_conn_id='my_jdbc_connection',
        autocommit=True,
    )

    query_dags_data >> run_this_last 
  7. Copy the following DAG and run it. All DAGs and their active status will be pushed to XCom
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.jdbc.operators.jdbc import JdbcOperator
from airflow.providers.jdbc.hooks.jdbc import JdbcHook


class JdbcHandlerOperator(JdbcOperator):
    def execute(self, context) -> list:
        self.log.info('Executing: %s', self.sql)
        hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
        return hook.run(
            self.sql,
            self.autocommit,
            parameters=self.parameters,
            # Defined by how JayDeBeApi operates 
            handler=lambda x: x.fetchall()
        )


with DAG(
    dag_id='example_jdbc_operator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    catchup=False,
) as dag:

    run_this_last = DummyOperator(task_id='run_this_last')

    query_dags_data = JdbcHandlerOperator(
        task_id='query_dags_data',
        sql='SELECT dag_id, is_active FROM dag',
        jdbc_conn_id='my_jdbc_connection',
        autocommit=True,
    )

    query_dags_data >> run_this_last 

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
