
[Bug] [SQL] PL/pgSQL scripts are split incorrectly in 3.x, so they fail to run #17040

@chaz6chez

Description


Search before asking

  • I have searched the issues and found no similar issues.

What happened

In a workflow SQL task, I select a PostgreSQL data source, set the SQL type to non-query, and execute the following PL/pgSQL script (an anonymous DO block):

DO 
$DO$
    DECLARE
        offset_value INTEGER := 0;
        fetch_size INTEGER := 10000;
        cur RECORD;
    BEGIN
        LOOP
            FOR cur IN
                SELECT
                    (adsp::json ->> 'merchantid')                                              as merchantid,
                    COUNT(DISTINCT CASE WHEN event_code = 'merchant_pvv' THEN utdid END)       AS exposure_num,
                    COUNT(CASE WHEN event_code = 'merchant_pvv' THEN __time__ END)             AS exposure_cnt,
                    COUNT(DISTINCT CASE WHEN event_code = 'merchant_click' THEN utdid END)     AS visit_num,
                    COUNT(CASE WHEN event_code = 'merchant_click' THEN __time__ END)           AS visit_cnt,
                    COUNT(DISTINCT CASE WHEN event_code = 'merchant_collect' THEN utdid END)   AS collect_num,
                    COUNT(DISTINCT CASE WHEN event_code = 'merchant_intention' THEN utdid END) AS intention_num,
                    DATE_TRUNC('day', TO_TIMESTAMP(__time__))                                  AS tag,
                    'daily'                                                                    AS type
                FROM public.djdj_qt_log_holo
                WHERE
                    TO_TIMESTAMP(__time__) >= TO_TIMESTAMP(EXTRACT(EPOCH FROM CURRENT_DATE - INTERVAL '1 day')) AND
                    TO_TIMESTAMP(__time__) < TO_TIMESTAMP(EXTRACT(EPOCH FROM CURRENT_DATE))

                GROUP BY merchantid, tag
                ORDER BY merchantid, tag
                LIMIT fetch_size OFFSET offset_value
                LOOP
                    IF cur.merchantid IS NOT NULL THEN
                        INSERT INTO public.multi_app_statistics (
                            merchantid,
                            exposure_num,
                            exposure_cnt,
                            visit_num,
                            visit_cnt,
                            collect_num,
                            intention_num,
                            tag,
                            type
                        )
                        VALUES (
                                   cur.merchantid,
                                   cur.exposure_num,
                                   cur.exposure_cnt,
                                   cur.visit_num,
                                   cur.visit_cnt,
                                   cur.collect_num,
                                   cur.intention_num,
                                   cur.tag,
                                   cur.type
                               )
                        ON CONFLICT (merchantid, type, tag) DO UPDATE
                            SET
                                exposure_num = EXCLUDED.exposure_num,
                                exposure_cnt = EXCLUDED.exposure_cnt,
                                visit_num = EXCLUDED.visit_num,
                                visit_cnt = EXCLUDED.visit_cnt,
                                collect_num = EXCLUDED.collect_num,
                                intention_num = EXCLUDED.intention_num,
                                type = EXCLUDED.type;
                    END IF;
                END LOOP;

            offset_value := offset_value + fetch_size;
            IF NOT FOUND THEN
                EXIT;
            END IF;
        END LOOP;
    END 
$DO$;

It fails to run.

What you expected to happen

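I expected the whole DO block to be sent to PostgreSQL as a single statement. According to the logs below, the SQL task instead splits the PL/pgSQL block at every semicolon during SQL parsing, including the semicolons inside the $DO$ ... $DO$ body, and prepares each fragment as a separate statement. The PostgreSQL JDBC driver then rejects the first fragment ("DO $DO$ DECLARE offset_value INTEGER := 0") because its dollar quote is never closed.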

[INFO] 2025-03-03 13:27:12.221 +0800 - sql type : POSTGRESQL, datasource : 2, sql : DO
$DO$
    DECLARE
        offset_value INTEGER := 0; 
        fetch_size   INTEGER := 10000; 
        cur          RECORD; 
    BEGIN
        LOOP
            FOR cur IN
                SELECT (adsp::json ->> 'merchantid')                                              as merchantid,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_pvv' THEN utdid END)       AS exposure_num,
                       COUNT(CASE WHEN event_code = 'merchant_pvv' THEN __time__ END)             AS exposure_cnt,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_click' THEN utdid END)     AS visit_num,
                       COUNT(CASE WHEN event_code = 'merchant_click' THEN __time__ END)           AS visit_cnt,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_collect' THEN utdid END)   AS collect_num,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_intention' THEN utdid END) AS intention_num,
                       DATE_TRUNC('day', TO_TIMESTAMP(__time__))                                  AS tag,
                       'daily'                                                                    AS type
                FROM public.djdj_qt_log_holo
                GROUP BY merchantid, tag
                ORDER BY merchantid, tag
                LIMIT fetch_size OFFSET offset_value
                LOOP
                    IF cur.merchantid IS NOT NULL THEN
                        INSERT INTO public.multi_app_statistics (merchantid,
                                                                 exposure_num,
                                                                 exposure_cnt,
                                                                 visit_num,
                                                                 visit_cnt,
                                                                 collect_num,
                                                                 intention_num,
                                                                 tag,
                                                                 type)
                        VALUES (cur.merchantid,
                                cur.exposure_num,
                                cur.exposure_cnt,
                                cur.visit_num,
                                cur.visit_cnt,
                                cur.collect_num,
                                cur.intention_num,
                                cur.tag,
                                cur.type)
                        ON CONFLICT (merchantid, type, tag) DO UPDATE
                            SET exposure_num  = EXCLUDED.exposure_num,
                                exposure_cnt  = EXCLUDED.exposure_cnt,
                                visit_num     = EXCLUDED.visit_num,
                                visit_cnt     = EXCLUDED.visit_cnt,
                                collect_num   = EXCLUDED.collect_num,
                                intention_num = EXCLUDED.intention_num,
                                type          = EXCLUDED.type; 
                    END IF; 
                END LOOP; 
            offset_value := offset_value + fetch_size; 
            IF NOT FOUND THEN
                EXIT; 
            END IF; 
        END LOOP; 
    END
$DO$;  , localParams : [],udfs : null,showType : null,connParams : null,varPool : [] ,query max result limit  0
[INFO] 2025-03-03 13:27:12.222 +0800 - after replace sql , preparing : DO
$DO$
    DECLARE
        offset_value INTEGER := 0
[INFO] 2025-03-03 13:27:12.222 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.222 +0800 - after replace sql , preparing : fetch_size   INTEGER := 10000
[INFO] 2025-03-03 13:27:12.222 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.222 +0800 - after replace sql , preparing : cur          RECORD
[INFO] 2025-03-03 13:27:12.222 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.223 +0800 - after replace sql , preparing : BEGIN
        LOOP
            FOR cur IN
                SELECT (adsp::json ->> 'merchantid')                                              as merchantid,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_pvv' THEN utdid END)       AS exposure_num,
                       COUNT(CASE WHEN event_code = 'merchant_pvv' THEN __time__ END)             AS exposure_cnt,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_click' THEN utdid END)     AS visit_num,
                       COUNT(CASE WHEN event_code = 'merchant_click' THEN __time__ END)           AS visit_cnt,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_collect' THEN utdid END)   AS collect_num,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_intention' THEN utdid END) AS intention_num,
                       DATE_TRUNC('day', TO_TIMESTAMP(__time__))                                  AS tag,
                       'daily'                                                                    AS type
                FROM public.djdj_qt_log_holo
                GROUP BY merchantid, tag
                ORDER BY merchantid, tag
                LIMIT fetch_size OFFSET offset_value
                LOOP
                    IF cur.merchantid IS NOT NULL THEN
                        INSERT INTO public.multi_app_statistics (merchantid,
                                                                 exposure_num,
                                                                 exposure_cnt,
                                                                 visit_num,
                                                                 visit_cnt,
                                                                 collect_num,
                                                                 intention_num,
                                                                 tag,
                                                                 type)
                        VALUES (cur.merchantid,
                                cur.exposure_num,
                                cur.exposure_cnt,
                                cur.visit_num,
                                cur.visit_cnt,
                                cur.collect_num,
                                cur.intention_num,
                                cur.tag,
                                cur.type)
                        ON CONFLICT (merchantid, type, tag) DO UPDATE
                            SET exposure_num  = EXCLUDED.exposure_num,
                                exposure_cnt  = EXCLUDED.exposure_cnt,
                                visit_num     = EXCLUDED.visit_num,
                                visit_cnt     = EXCLUDED.visit_cnt,
                                collect_num   = EXCLUDED.collect_num,
                                intention_num = EXCLUDED.intention_num,
                                type          = EXCLUDED.type
[INFO] 2025-03-03 13:27:12.223 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.223 +0800 - after replace sql , preparing : END IF
[INFO] 2025-03-03 13:27:12.223 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.223 +0800 - after replace sql , preparing : END LOOP
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : offset_value := offset_value + fetch_size
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : IF NOT FOUND THEN
                EXIT
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : END IF
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : END LOOP
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : END
$DO$
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - can't find udf function resource
[WARN] 2025-03-03 13:27:12.225 +0800 - Connect strings must start with jdbc:snowflake://
[ERROR] 2025-03-03 13:27:12.325 +0800 - execute sql error: SQL task prepareStatementAndBind error
[ERROR] 2025-03-03 13:27:12.325 +0800 - sql task error
org.apache.dolphinscheduler.plugin.task.api.TaskException: SQL task prepareStatementAndBind error
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.prepareStatementAndBind(SqlTask.java:396)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.executeQuery(SqlTask.java:315)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.executeFuncAndSql(SqlTask.java:205)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.handle(SqlTask.java:159)
	at org.apache.dolphinscheduler.server.worker.runner.DefaultWorkerTaskExecutor.executeTask(DefaultWorkerTaskExecutor.java:54)
	at org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor.run(WorkerTaskExecutor.java:175)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.postgresql.util.PSQLException: Unterminated dollar quote started at position 4 in SQL DO
$DO$
    DECLARE
        offset_value INTEGER := 0. Expected terminating $$
	at org.postgresql.core.Parser.checkParsePosition(Parser.java:1380)
	at org.postgresql.core.Parser.parseSql(Parser.java:1279)
	at org.postgresql.core.Parser.replaceProcessing(Parser.java:1231)
	at org.postgresql.core.CachedQueryCreateAction.create(CachedQueryCreateAction.java:43)
	at org.postgresql.core.CachedQueryCreateAction.create(CachedQueryCreateAction.java:19)
	at org.postgresql.util.LruCache.borrow(LruCache.java:123)
	at org.postgresql.core.QueryExecutorBase.borrowQuery(QueryExecutorBase.java:296)
	at org.postgresql.jdbc.PgConnection.borrowQuery(PgConnection.java:196)
	at org.postgresql.jdbc.PgPreparedStatement.<init>(PgPreparedStatement.java:88)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:1325)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:1779)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:454)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.prepareStatementAndBind(SqlTask.java:380)
	... 8 common frames omitted
[ERROR] 2025-03-03 13:27:12.325 +0800 - Task execute failed, due to meet an exception
org.apache.dolphinscheduler.plugin.task.api.TaskException: Execute sql task failed
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.handle(SqlTask.java:166)
	at org.apache.dolphinscheduler.server.worker.runner.DefaultWorkerTaskExecutor.executeTask(DefaultWorkerTaskExecutor.java:54)
	at org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor.run(WorkerTaskExecutor.java:175)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.dolphinscheduler.plugin.task.api.TaskException: SQL task prepareStatementAndBind error
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.prepareStatementAndBind(SqlTask.java:396)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.executeQuery(SqlTask.java:315)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.executeFuncAndSql(SqlTask.java:205)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.handle(SqlTask.java:159)
	... 5 common frames omitted
Caused by: org.postgresql.util.PSQLException: Unterminated dollar quote started at position 4 in SQL DO
$DO$
    DECLARE
        offset_value INTEGER := 0. Expected terminating $$
	at org.postgresql.core.Parser.checkParsePosition(Parser.java:1380)
	at org.postgresql.core.Parser.parseSql(Parser.java:1279)
	at org.postgresql.core.Parser.replaceProcessing(Parser.java:1231)
	at org.postgresql.core.CachedQueryCreateAction.create(CachedQueryCreateAction.java:43)
	at org.postgresql.core.CachedQueryCreateAction.create(CachedQueryCreateAction.java:19)
	at org.postgresql.util.LruCache.borrow(LruCache.java:123)
	at org.postgresql.core.QueryExecutorBase.borrowQuery(QueryExecutorBase.java:296)
	at org.postgresql.jdbc.PgConnection.borrowQuery(PgConnection.java:196)
	at org.postgresql.jdbc.PgPreparedStatement.<init>(PgPreparedStatement.java:88)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:1325)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:1779)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:454)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.prepareStatementAndBind(SqlTask.java:380)
	... 8 common frames omitted
[INFO] 2025-03-03 13:27:12.325 +0800 - Get appIds from worker dolphinscheduler-worker-2.dolphinscheduler-worker-headless:1234, taskLogPath: /opt/dolphinscheduler/logs/20250303/16708144210528/8/31/43.log
[INFO] 2025-03-03 13:27:12.326 +0800 - Start finding appId in /opt/dolphinscheduler/logs/20250303/16708144210528/8/31/43.log, fetch way: log 
[INFO] 2025-03-03 13:27:12.326 +0800 - The appId is empty
[INFO] 2025-03-03 13:27:12.326 +0800 - Cancel the task successfully
[INFO] 2025-03-03 13:27:12.327 +0800 - Get a exception when execute the task, will send the task status: FAILURE to master: dolphinscheduler-worker-2.dolphinscheduler-worker-headless:1234
[INFO] 2025-03-03 13:27:12.327 +0800 - FINALIZE_SESSION
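The fragments logged after "after replace sql , preparing :" look like what a splitter produces if it cuts the script at every semicolon, trims each piece and drops empty ones, without recognizing that the semicolons between $DO$ and $DO$ belong to one dollar-quoted body. The sketch below is a hypothetical stand-in (NaiveSplitDemo and naiveSplit are illustrative names, not DolphinScheduler code, and I have not confirmed how SqlTask actually splits) that reproduces the same kind of fragments for a shorter DO block:

```java
import java.util.ArrayList;
import java.util.List;

public class NaiveSplitDemo {

    // Hypothetical stand-in for the splitting step visible in the log:
    // cut on every ';', trim each piece, and drop pieces that are empty.
    // It has no notion of dollar quotes, so a DO body is torn apart.
    static List<String> naiveSplit(String script) {
        List<String> parts = new ArrayList<>();
        for (String piece : script.split(";")) {
            String trimmed = piece.trim();
            if (!trimmed.isEmpty()) {
                parts.add(trimmed);
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        String script = String.join("\n",
                "DO",
                "$DO$",
                "    DECLARE",
                "        offset_value INTEGER := 0;",
                "        fetch_size   INTEGER := 10000;",
                "    BEGIN",
                "        offset_value := offset_value + fetch_size;",
                "    END",
                "$DO$;");
        // Each printed fragment mirrors one "preparing :" line in the task log.
        for (String part : naiveSplit(script)) {
            System.out.println("preparing : " + part);
        }
    }
}
```

The first fragment opens $DO$ and never closes it, which matches the "Unterminated dollar quote" error the driver raises in prepareStatement.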

How to reproduce

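The issue can be reproduced with any non-query SQL task whose script contains more than one semicolon, for example a PL/pgSQL DO block with several statements in its body: the script is split at every semicolon, even when the semicolon is inside a dollar-quoted string.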
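A fix would need the splitter to ignore semicolons inside dollar-quoted bodies. A minimal sketch, assuming a standalone helper (DollarQuoteAwareSplitter and split are hypothetical names, not part of DolphinScheduler or the PostgreSQL JDBC driver): it copies $tag$ ... $tag$ regions, single-quoted literals and "--" line comments verbatim and only splits on semicolons outside them. It deliberately does not handle double-quoted identifiers, /* */ comments, or backslash escapes in E'...' strings.

```java
import java.util.ArrayList;
import java.util.List;

public final class DollarQuoteAwareSplitter {

    // Splits a script on ';' only when the ';' is outside single-quoted
    // literals, dollar-quoted bodies and "--" line comments.
    static List<String> split(String sql) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int n = sql.length();
        int i = 0;
        while (i < n) {
            char c = sql.charAt(i);
            int end = -1; // exclusive end of a protected region starting at i, if any
            if (c == '\'') {
                // An escaped '' simply closes and reopens a literal, so this stays correct.
                int close = sql.indexOf('\'', i + 1);
                end = close < 0 ? n : close + 1;
            } else if (c == '-' && i + 1 < n && sql.charAt(i + 1) == '-') {
                int newline = sql.indexOf('\n', i);
                end = newline < 0 ? n : newline;
            } else if (c == '$' && (i == 0 || !isIdentifierChar(sql.charAt(i - 1)))) {
                // A '$' glued to an identifier (e.g. a$b) does not open a dollar quote.
                String tag = readDollarTag(sql, i);
                if (tag != null) {
                    int close = sql.indexOf(tag, i + tag.length());
                    end = close < 0 ? n : close + tag.length();
                }
            }
            if (end >= 0) {
                current.append(sql, i, end); // copy the protected region verbatim
                i = end;
            } else if (c == ';') {
                flush(statements, current);
                i++;
            } else {
                current.append(c);
                i++;
            }
        }
        flush(statements, current);
        return statements;
    }

    // Returns "$$" or "$tag$" if a dollar-quote opener starts at index i, else null.
    // Tags start with a letter or '_', so positional parameters like $1 are skipped.
    private static String readDollarTag(String sql, int i) {
        int j = i + 1;
        if (j < sql.length() && (Character.isLetter(sql.charAt(j)) || sql.charAt(j) == '_')) {
            while (j < sql.length()
                    && (Character.isLetterOrDigit(sql.charAt(j)) || sql.charAt(j) == '_')) {
                j++;
            }
        }
        return j < sql.length() && sql.charAt(j) == '$' ? sql.substring(i, j + 1) : null;
    }

    private static boolean isIdentifierChar(char ch) {
        return Character.isLetterOrDigit(ch) || ch == '_' || ch == '$';
    }

    // Adds the trimmed statement if it is not blank, then resets the buffer.
    private static void flush(List<String> statements, StringBuilder current) {
        String statement = current.toString().trim();
        if (!statement.isEmpty()) {
            statements.add(statement);
        }
        current.setLength(0);
    }

    public static void main(String[] args) {
        String script = String.join("\n",
                "DO",
                "$DO$",
                "    DECLARE",
                "        offset_value INTEGER := 0;",
                "    BEGIN",
                "        offset_value := offset_value + 10000;",
                "    END",
                "$DO$;",
                "SELECT 'a;b';");
        for (String statement : split(script)) {
            System.out.println("--- statement ---");
            System.out.println(statement);
        }
    }
}
```

With this approach the DO block stays one statement and the semicolon inside 'a;b' is not treated as a separator, while ordinary multi-statement scripts are still split as before.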

Anything else

No response

Version

3.2.x

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
