[Bug] [SQL] PL/pgSQL scripts in version 3.x were incorrectly parsed, causing them to not run properly. #17040

Open
2 of 3 tasks
chaz6chez opened this issue Mar 4, 2025 · 5 comments
Labels: bug, priority:middle

Comments

@chaz6chez

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When I select the PostgreSQL data source in my workflow and use the non-query type to execute the following PL/pgSQL script:

DO 
$DO$
    DECLARE
        offset_value INTEGER := 0;
        fetch_size INTEGER := 10000;
        cur RECORD;
    BEGIN
        LOOP
            FOR cur IN
                SELECT
                    (adsp::json ->> 'merchantid')                                              as merchantid,
                    COUNT(DISTINCT CASE WHEN event_code = 'merchant_pvv' THEN utdid END)       AS exposure_num,
                    COUNT(CASE WHEN event_code = 'merchant_pvv' THEN __time__ END)             AS exposure_cnt,
                    COUNT(DISTINCT CASE WHEN event_code = 'merchant_click' THEN utdid END)     AS visit_num,
                    COUNT(CASE WHEN event_code = 'merchant_click' THEN __time__ END)           AS visit_cnt,
                    COUNT(DISTINCT CASE WHEN event_code = 'merchant_collect' THEN utdid END)   AS collect_num,
                    COUNT(DISTINCT CASE WHEN event_code = 'merchant_intention' THEN utdid END) AS intention_num,
                    DATE_TRUNC('day', TO_TIMESTAMP(__time__))                                  AS tag,
                    'daily'                                                                    AS type
                FROM public.djdj_qt_log_holo
                WHERE
                    TO_TIMESTAMP(__time__) >= TO_TIMESTAMP(EXTRACT(EPOCH FROM CURRENT_DATE - INTERVAL '1 day')) AND
                    TO_TIMESTAMP(__time__) < TO_TIMESTAMP(EXTRACT(EPOCH FROM CURRENT_DATE))

                GROUP BY merchantid, tag
                ORDER BY merchantid, tag
                LIMIT fetch_size OFFSET offset_value
                LOOP
                    IF cur.merchantid IS NOT NULL THEN
                        INSERT INTO public.multi_app_statistics (
                            merchantid,
                            exposure_num,
                            exposure_cnt,
                            visit_num,
                            visit_cnt,
                            collect_num,
                            intention_num,
                            tag,
                            type
                        )
                        VALUES (
                                   cur.merchantid,
                                   cur.exposure_num,
                                   cur.exposure_cnt,
                                   cur.visit_num,
                                   cur.visit_cnt,
                                   cur.collect_num,
                                   cur.intention_num,
                                   cur.tag,
                                   cur.type
                               )
                        ON CONFLICT (merchantid, type, tag) DO UPDATE
                            SET
                                exposure_num = EXCLUDED.exposure_num,
                                exposure_cnt = EXCLUDED.exposure_cnt,
                                visit_num = EXCLUDED.visit_num,
                                visit_cnt = EXCLUDED.visit_cnt,
                                collect_num = EXCLUDED.collect_num,
                                intention_num = EXCLUDED.intention_num,
                                type = EXCLUDED.type;
                    END IF;
                END LOOP;

            offset_value := offset_value + fetch_size;
            IF NOT FOUND THEN
                EXIT;
            END IF;
        END LOOP;
    END 
$DO$;

It fails to run.

What you expected to happen

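I expected the DO block to run as a single statement. According to the logs, I think the failure is caused by the SQL parser splitting the complete PL/pgSQL block into separate statements at each semicolon, so the first fragment contains an opening $DO$ with no closing tag.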

[INFO] 2025-03-03 13:27:12.221 +0800 - sql type : POSTGRESQL, datasource : 2, sql : DO
$DO$
    DECLARE
        offset_value INTEGER := 0; 
        fetch_size   INTEGER := 10000; 
        cur          RECORD; 
    BEGIN
        LOOP
            FOR cur IN
                SELECT (adsp::json ->> 'merchantid')                                              as merchantid,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_pvv' THEN utdid END)       AS exposure_num,
                       COUNT(CASE WHEN event_code = 'merchant_pvv' THEN __time__ END)             AS exposure_cnt,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_click' THEN utdid END)     AS visit_num,
                       COUNT(CASE WHEN event_code = 'merchant_click' THEN __time__ END)           AS visit_cnt,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_collect' THEN utdid END)   AS collect_num,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_intention' THEN utdid END) AS intention_num,
                       DATE_TRUNC('day', TO_TIMESTAMP(__time__))                                  AS tag,
                       'daily'                                                                    AS type
                FROM public.djdj_qt_log_holo
                GROUP BY merchantid, tag
                ORDER BY merchantid, tag
                LIMIT fetch_size OFFSET offset_value
                LOOP
                    IF cur.merchantid IS NOT NULL THEN
                        INSERT INTO public.multi_app_statistics (merchantid,
                                                                 exposure_num,
                                                                 exposure_cnt,
                                                                 visit_num,
                                                                 visit_cnt,
                                                                 collect_num,
                                                                 intention_num,
                                                                 tag,
                                                                 type)
                        VALUES (cur.merchantid,
                                cur.exposure_num,
                                cur.exposure_cnt,
                                cur.visit_num,
                                cur.visit_cnt,
                                cur.collect_num,
                                cur.intention_num,
                                cur.tag,
                                cur.type)
                        ON CONFLICT (merchantid, type, tag) DO UPDATE
                            SET exposure_num  = EXCLUDED.exposure_num,
                                exposure_cnt  = EXCLUDED.exposure_cnt,
                                visit_num     = EXCLUDED.visit_num,
                                visit_cnt     = EXCLUDED.visit_cnt,
                                collect_num   = EXCLUDED.collect_num,
                                intention_num = EXCLUDED.intention_num,
                                type          = EXCLUDED.type; 
                    END IF; 
                END LOOP; 
            offset_value := offset_value + fetch_size; 
            IF NOT FOUND THEN
                EXIT; 
            END IF; 
        END LOOP; 
    END
$DO$;  , localParams : [],udfs : null,showType : null,connParams : null,varPool : [] ,query max result limit  0
[INFO] 2025-03-03 13:27:12.222 +0800 - after replace sql , preparing : DO
$DO$
    DECLARE
        offset_value INTEGER := 0
[INFO] 2025-03-03 13:27:12.222 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.222 +0800 - after replace sql , preparing : fetch_size   INTEGER := 10000
[INFO] 2025-03-03 13:27:12.222 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.222 +0800 - after replace sql , preparing : cur          RECORD
[INFO] 2025-03-03 13:27:12.222 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.223 +0800 - after replace sql , preparing : BEGIN
        LOOP
            FOR cur IN
                SELECT (adsp::json ->> 'merchantid')                                              as merchantid,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_pvv' THEN utdid END)       AS exposure_num,
                       COUNT(CASE WHEN event_code = 'merchant_pvv' THEN __time__ END)             AS exposure_cnt,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_click' THEN utdid END)     AS visit_num,
                       COUNT(CASE WHEN event_code = 'merchant_click' THEN __time__ END)           AS visit_cnt,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_collect' THEN utdid END)   AS collect_num,
                       COUNT(DISTINCT CASE WHEN event_code = 'merchant_intention' THEN utdid END) AS intention_num,
                       DATE_TRUNC('day', TO_TIMESTAMP(__time__))                                  AS tag,
                       'daily'                                                                    AS type
                FROM public.djdj_qt_log_holo
                GROUP BY merchantid, tag
                ORDER BY merchantid, tag
                LIMIT fetch_size OFFSET offset_value
                LOOP
                    IF cur.merchantid IS NOT NULL THEN
                        INSERT INTO public.multi_app_statistics (merchantid,
                                                                 exposure_num,
                                                                 exposure_cnt,
                                                                 visit_num,
                                                                 visit_cnt,
                                                                 collect_num,
                                                                 intention_num,
                                                                 tag,
                                                                 type)
                        VALUES (cur.merchantid,
                                cur.exposure_num,
                                cur.exposure_cnt,
                                cur.visit_num,
                                cur.visit_cnt,
                                cur.collect_num,
                                cur.intention_num,
                                cur.tag,
                                cur.type)
                        ON CONFLICT (merchantid, type, tag) DO UPDATE
                            SET exposure_num  = EXCLUDED.exposure_num,
                                exposure_cnt  = EXCLUDED.exposure_cnt,
                                visit_num     = EXCLUDED.visit_num,
                                visit_cnt     = EXCLUDED.visit_cnt,
                                collect_num   = EXCLUDED.collect_num,
                                intention_num = EXCLUDED.intention_num,
                                type          = EXCLUDED.type
[INFO] 2025-03-03 13:27:12.223 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.223 +0800 - after replace sql , preparing : END IF
[INFO] 2025-03-03 13:27:12.223 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.223 +0800 - after replace sql , preparing : END LOOP
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : offset_value := offset_value + fetch_size
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : IF NOT FOUND THEN
                EXIT
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : END IF
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : END LOOP
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - after replace sql , preparing : END
$DO$
[INFO] 2025-03-03 13:27:12.224 +0800 - Sql Params are replaced sql , parameters:
[INFO] 2025-03-03 13:27:12.224 +0800 - can't find udf function resource
[WARN] 2025-03-03 13:27:12.225 +0800 - Connect strings must start with jdbc:snowflake://
[ERROR] 2025-03-03 13:27:12.325 +0800 - execute sql error: SQL task prepareStatementAndBind error
[ERROR] 2025-03-03 13:27:12.325 +0800 - sql task error
org.apache.dolphinscheduler.plugin.task.api.TaskException: SQL task prepareStatementAndBind error
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.prepareStatementAndBind(SqlTask.java:396)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.executeQuery(SqlTask.java:315)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.executeFuncAndSql(SqlTask.java:205)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.handle(SqlTask.java:159)
	at org.apache.dolphinscheduler.server.worker.runner.DefaultWorkerTaskExecutor.executeTask(DefaultWorkerTaskExecutor.java:54)
	at org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor.run(WorkerTaskExecutor.java:175)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.postgresql.util.PSQLException: Unterminated dollar quote started at position 4 in SQL DO
$DO$
    DECLARE
        offset_value INTEGER := 0. Expected terminating $$
	at org.postgresql.core.Parser.checkParsePosition(Parser.java:1380)
	at org.postgresql.core.Parser.parseSql(Parser.java:1279)
	at org.postgresql.core.Parser.replaceProcessing(Parser.java:1231)
	at org.postgresql.core.CachedQueryCreateAction.create(CachedQueryCreateAction.java:43)
	at org.postgresql.core.CachedQueryCreateAction.create(CachedQueryCreateAction.java:19)
	at org.postgresql.util.LruCache.borrow(LruCache.java:123)
	at org.postgresql.core.QueryExecutorBase.borrowQuery(QueryExecutorBase.java:296)
	at org.postgresql.jdbc.PgConnection.borrowQuery(PgConnection.java:196)
	at org.postgresql.jdbc.PgPreparedStatement.<init>(PgPreparedStatement.java:88)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:1325)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:1779)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:454)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.prepareStatementAndBind(SqlTask.java:380)
	... 8 common frames omitted
[ERROR] 2025-03-03 13:27:12.325 +0800 - Task execute failed, due to meet an exception
org.apache.dolphinscheduler.plugin.task.api.TaskException: Execute sql task failed
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.handle(SqlTask.java:166)
	at org.apache.dolphinscheduler.server.worker.runner.DefaultWorkerTaskExecutor.executeTask(DefaultWorkerTaskExecutor.java:54)
	at org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor.run(WorkerTaskExecutor.java:175)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.dolphinscheduler.plugin.task.api.TaskException: SQL task prepareStatementAndBind error
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.prepareStatementAndBind(SqlTask.java:396)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.executeQuery(SqlTask.java:315)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.executeFuncAndSql(SqlTask.java:205)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.handle(SqlTask.java:159)
	... 5 common frames omitted
Caused by: org.postgresql.util.PSQLException: Unterminated dollar quote started at position 4 in SQL DO
$DO$
    DECLARE
        offset_value INTEGER := 0. Expected terminating $$
	at org.postgresql.core.Parser.checkParsePosition(Parser.java:1380)
	at org.postgresql.core.Parser.parseSql(Parser.java:1279)
	at org.postgresql.core.Parser.replaceProcessing(Parser.java:1231)
	at org.postgresql.core.CachedQueryCreateAction.create(CachedQueryCreateAction.java:43)
	at org.postgresql.core.CachedQueryCreateAction.create(CachedQueryCreateAction.java:19)
	at org.postgresql.util.LruCache.borrow(LruCache.java:123)
	at org.postgresql.core.QueryExecutorBase.borrowQuery(QueryExecutorBase.java:296)
	at org.postgresql.jdbc.PgConnection.borrowQuery(PgConnection.java:196)
	at org.postgresql.jdbc.PgPreparedStatement.<init>(PgPreparedStatement.java:88)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:1325)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:1779)
	at org.postgresql.jdbc.PgConnection.prepareStatement(PgConnection.java:454)
	at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.prepareStatementAndBind(SqlTask.java:380)
	... 8 common frames omitted
[INFO] 2025-03-03 13:27:12.325 +0800 - Get appIds from worker dolphinscheduler-worker-2.dolphinscheduler-worker-headless:1234, taskLogPath: /opt/dolphinscheduler/logs/20250303/16708144210528/8/31/43.log
[INFO] 2025-03-03 13:27:12.326 +0800 - Start finding appId in /opt/dolphinscheduler/logs/20250303/16708144210528/8/31/43.log, fetch way: log 
[INFO] 2025-03-03 13:27:12.326 +0800 - The appId is empty
[INFO] 2025-03-03 13:27:12.326 +0800 - Cancel the task successfully
[INFO] 2025-03-03 13:27:12.327 +0800 - Get a exception when execute the task, will send the task status: FAILURE to master: dolphinscheduler-worker-2.dolphinscheduler-worker-headless:1234
[INFO] 2025-03-03 13:27:12.327 +0800 - FINALIZE_SESSION
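
The "after replace sql , preparing" fragments in the log above are consistent with the script being split on every semicolon before each piece is prepared. The following is a minimal Python sketch of that behavior, assuming a plain split on `;`. The function name `naive_split` and the shortened script are hypothetical, and this is not DolphinScheduler's actual code:

```python
# Hypothetical illustration only: mimics splitting a script on every ';'
# without regard to dollar-quoted bodies. Not DolphinScheduler's real code.
def naive_split(sql):
    """Split on every semicolon and drop empty fragments."""
    return [part.strip() for part in sql.split(";") if part.strip()]


# Shortened stand-in for the DO block from the report.
script = """DO
$DO$
    DECLARE
        offset_value INTEGER := 0;
        fetch_size INTEGER := 10000;
    BEGIN
        offset_value := offset_value + fetch_size;
    END
$DO$;"""

fragments = naive_split(script)
for fragment in fragments:
    # Each fragment would be prepared as its own statement.
    print(repr(fragment))

# Count how many times the dollar-quote tag appears in the first fragment.
# A single occurrence means the quote is opened but never closed.
print(fragments[0].count("$DO$"))
```

The first fragment ends right after `offset_value INTEGER := 0`, which matches the text quoted in the `Unterminated dollar quote` exception.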

How to reproduce

The issue can be reproduced with any SQL task whose statement contains multiple semicolons inside a dollar-quoted body, such as a DO block.
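
For comparison, here is a rough sketch of a splitter that treats semicolons inside single-quoted strings and dollar-quoted bodies as part of the statement. It is only an illustration of the expected behavior, not the fix that was merged. The name `split_statements` is hypothetical, and the sketch deliberately ignores SQL comments and identifiers that contain `$`:

```python
import re

# Matches an opening dollar-quote tag such as $$ or $DO$.
_DOLLAR_TAG = re.compile(r"\$([A-Za-z_][A-Za-z_0-9]*)?\$")


def split_statements(sql):
    """Split on ';' only outside single-quoted strings and dollar-quoted bodies."""
    statements = []
    start = 0  # index where the current statement begins
    i = 0
    n = len(sql)
    while i < n:
        ch = sql[i]
        if ch == "'":
            # Skip a single-quoted literal; '' is an escaped quote.
            i += 1
            while i < n:
                if sql[i] == "'":
                    if i + 1 < n and sql[i + 1] == "'":
                        i += 2
                        continue
                    break
                i += 1
            i += 1  # step past the closing quote
        elif ch == "$":
            match = _DOLLAR_TAG.match(sql, i)
            if match:
                tag = match.group(0)
                close = sql.find(tag, match.end())
                # An unterminated body swallows the rest of the input.
                i = n if close == -1 else close + len(tag)
            else:
                # Not a tag (for example $1 or ${var}); treat as plain text.
                i += 1
        elif ch == ";":
            piece = sql[start:i].strip()
            if piece:
                statements.append(piece)
            i += 1
            start = i
        else:
            i += 1
    tail = sql[start:].strip()
    if tail:
        statements.append(tail)
    return statements


demo = "SELECT 'a;b';\nDO $DO$ BEGIN x := 1; y := 2; END $DO$;\nSELECT 2"
result = split_statements(demo)
for statement in result:
    print(repr(statement))
```

With this approach the whole DO block reaches the driver as one statement, which is the behavior the report expects.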

Anything else

No response

Version

3.2.x

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@chaz6chez added the bug and Waiting for reply labels on Mar 4, 2025
@chaz6chez
Author

The same failure occurs with query-type SQL, too.

@chaz6chez
Author

The same script runs properly on 3.1.9.

@SbloodyS added the help wanted label and removed the Waiting for reply label on Mar 14, 2025
@DhiyaneshwaranR
Contributor

Hi @SbloodyS, I can help fix this issue.

@SbloodyS
Member

SbloodyS commented Apr 4, 2025

> Hi @SbloodyS, I can help fix this issue.

I've assigned this to you. Looking forward to your contribution.

DhiyaneshwaranR added a commit to DhiyaneshwaranR/dolphinscheduler that referenced this issue Apr 18, 2025
DhiyaneshwaranR added a commit to DhiyaneshwaranR/dolphinscheduler that referenced this issue Apr 19, 2025
@ruanwenjun added this to the 3.3.1 milestone on Apr 21, 2025
DhiyaneshwaranR added a commit to DhiyaneshwaranR/dolphinscheduler that referenced this issue Apr 21, 2025
DhiyaneshwaranR added a commit to DhiyaneshwaranR/dolphinscheduler that referenced this issue Apr 21, 2025
DhiyaneshwaranR added a commit to DhiyaneshwaranR/dolphinscheduler that referenced this issue Apr 21, 2025
@DhiyaneshwaranR
Contributor

Can we close this issue, @SbloodyS?
