Trino-python-client not returning/capturing External Errors when using fetchone #95

Description

@subbareddydagumati

The Trino Python client does not surface external errors and exits successfully. It looks like the client is not waiting until the end of the transaction, or is not checking the final query/transaction status.

Connector: Hive
Trino Version: 356
Trino-python Client: 0.305.0
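My working assumption (not verified against the client source) is that the client fetches result pages lazily: fetchone() returns the single row of the INSERT result while the server is still committing, so a failure during commit is never polled. A minimal sketch of the difference, reusing the TRINO_CONN connection from the script below:

cur = TRINO_CONN.cursor()
cur.execute("INSERT INTO dl_prod.prod_tmp.trino_python_issue "
            "SELECT * FROM dl_prod.prod_tmp.trino_python_issue_stg")

# Observed behaviour: prints [2] immediately; no error is ever raised.
print(cur.fetchone())

# Assumption: draining the cursor keeps polling the server until the query
# reaches a terminal state, so the commit-time failure should surface as an
# exception here.
cur.fetchall()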

Steps to Reproduce:

  1. Create stage Table without partition
CREATE TABLE dl_prod.prod_tmp.trino_python_issue_stg (
id int,
c_name varchar,
file_date varchar
)
WITH (
    format = 'ORC',
    external_location = 'hdfs://datalakeprod/data/dl/test/trino_python_issue_stg/'
)
  2. Insert Data into Stage Table
INSERT INTO dl_prod.prod_tmp.trino_python_issue_stg (id, c_name, file_date)
VALUES (1, 'subba', '20210811'),
       (2, 'subba', '20210812')

  3. Create another table with partitions
CREATE TABLE dl_prod.prod_tmp.trino_python_issue (
id int,
c_name varchar,
file_date varchar
)
WITH (
    format = 'ORC',
    external_location = 'hdfs://datalakeprod/data/dl/test/trino_python_issue/',
    partitioned_by = ARRAY['file_date']
)
  4. Enable overwrite of existing partitions
SET SESSION dl_prod.insert_existing_partitions_behavior = 'overwrite'

  5. Insert Data into Final Table

INSERT INTO  dl_prod.prod_tmp.trino_python_issue
SELECT * FROM  dl_prod.prod_tmp.trino_python_issue_stg 

  6. Drop Final Table
DROP TABLE prod_tmp.trino_python_issue

  7. Re-create the final table
CREATE TABLE dl_prod.prod_tmp.trino_python_issue (
id int,
c_name varchar,
file_date varchar
)
WITH (
    format = 'ORC',
    external_location = 'hdfs://datalakeprod/data/dl/test/trino_python_issue/',
    partitioned_by = ARRAY['file_date']
)
  8. Run the insert statement below. It will fail with an external error.
INSERT INTO  dl_prod.prod_tmp.trino_python_issue
SELECT * FROM  dl_prod.prod_tmp.trino_python_issue_stg 

Error in DBeaver:
SQL Error [16777231]: Query failed (#20210812_000114_00000_q5wrz): Unable to rename from hdfs://datalakeprod/tmp/presto/914b6fd7-f658-4a27-a69d-918c74a1bf28/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists

Trino UI:
io.trino.spi.TrinoException: Unable to rename from hdfs://datalakeprod/tmp/presto/36f2a894-77db-4abd-ac01-aa9e9cb2b5da/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists
	at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.renameDirectory(SemiTransactionalHiveMetastore.java:2530)
	at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore$Committer.prepareAddPartition(SemiTransactionalHiveMetastore.java:1858)
	at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commitShared(SemiTransactionalHiveMetastore.java:1384)
	at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commit(SemiTransactionalHiveMetastore.java:1162)
	at io.trino.plugin.hive.HiveMetadata.commit(HiveMetadata.java:2988)
	at io.trino.plugin.hive.HiveConnector.commit(HiveConnector.java:225)
	at io.trino.transaction.InMemoryTransactionManager$TransactionMetadata$ConnectorTransactionMetadata.commit(InMemoryTransactionManager.java:594)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:80)
	at io.trino.$gen.Trino_356____20210811_064245_2.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
  9. Run the same insert statement using the Python script below. It prints the row count ([2]) and exits successfully. In the Trino UI you can see the external error, and no data is loaded into the final table (a diagnostic check is sketched after the stack trace below).

Output:

[2]
Load Time: --- 6.178645610809326 seconds ---

Trino UI:

io.trino.spi.TrinoException: Unable to rename from hdfs://datalakeprod/tmp/presto/c0a35086-3750-4b4d-9f28-d3b92734303d/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists
	at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.renameDirectory(SemiTransactionalHiveMetastore.java:2530)
	at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore$Committer.prepareAddPartition(SemiTransactionalHiveMetastore.java:1858)
	at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commitShared(SemiTransactionalHiveMetastore.java:1384)
	at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commit(SemiTransactionalHiveMetastore.java:1162)
	at io.trino.plugin.hive.HiveMetadata.commit(HiveMetadata.java:2988)
	at io.trino.plugin.hive.HiveConnector.commit(HiveConnector.java:225)
	at io.trino.transaction.InMemoryTransactionManager$TransactionMetadata$ConnectorTransactionMetadata.commit(InMemoryTransactionManager.java:594)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:80)
	at io.trino.$gen.Trino_356____20210811_064245_2.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
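One way to confirm from the client side that the insert actually failed (a sketch; it assumes the connected user can read system.runtime.queries and that the state/error_type/error_code columns are available in Trino 356):

def check_recent_failures(cursor):
    # Hypothetical diagnostic helper: list recent queries touching the final
    # table together with their terminal state.
    cursor.execute("""
        SELECT query_id, state, error_type, error_code
        FROM system.runtime.queries
        WHERE query LIKE '%trino_python_issue%'
        ORDER BY created DESC
        LIMIT 5
    """)
    for row in cursor.fetchall():
        print(row)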

Python Code:

import sys, os, time, datetime, platform
from datetime import date
from requests.auth import HTTPBasicAuth
import requests.packages.urllib3 as urllib3
urllib3.disable_warnings()
import trino

def trino_query_runner(query):
    # Disable TLS certificate verification on the underlying requests session.
    TRINO_CONN._http_session.verify = False
    start_time = time.time()
    cur_trino = TRINO_CONN.cursor()
    # Enable partition overwrite for this session before running the insert.
    cur_trino.execute("set session dl_prod.insert_existing_partitions_behavior = 'overwrite'")
    rows = cur_trino.fetchone()
    cur_trino.execute(query)
    # fetchone() returns the inserted row count (here [2]) and the script
    # exits successfully, even though the query later fails at commit time.
    rows = cur_trino.fetchone()
    print(rows)
    print("Load Time: --- %s seconds ---" % (time.time() - start_time))

def trino_python_connector_issue():
    INST_SQL = """
    INSERT INTO  dl_prod.prod_tmp.trino_python_issue
    SELECT * FROM  dl_prod.prod_tmp.trino_python_issue_stg 
    """
    trino_query_runner(INST_SQL)


if __name__ == "__main__":
    # Connection used by trino_query_runner (password elided); created
    # unconditionally so the script runs on both Linux and Windows.
    TRINO_CONN = trino.dbapi.connect(
        host='trino.dl.corp.test.com',
        port=443,
        user='svc-dl-db-full',
        catalog='dl_prod',
        schema='dbo',
        http_scheme='https',
        http_headers={'X-Trino-Time-Zone': 'America/New_York'},
        auth=trino.auth.BasicAuthentication("svc-dl-db-full", ""),
    )
    trino_python_connector_issue()
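A possible client-side workaround under the same assumption (that the error only surfaces once every result page has been requested): drain the cursor instead of reading a single row, and catch the query error. This relies on trino.exceptions.TrinoQueryError being the exception type raised here, which I have not verified on client version 0.305.0.

def trino_query_runner_checked(query):
    # Variant of trino_query_runner that drains the cursor so the client keeps
    # polling until the query reaches a terminal state (sketch, not verified).
    TRINO_CONN._http_session.verify = False
    start_time = time.time()
    cur_trino = TRINO_CONN.cursor()
    cur_trino.execute("set session dl_prod.insert_existing_partitions_behavior = 'overwrite'")
    cur_trino.fetchall()
    cur_trino.execute(query)
    try:
        rows = cur_trino.fetchall()
    except trino.exceptions.TrinoQueryError as exc:
        # Expected to carry the EXTERNAL error ("Unable to rename from ...").
        print("Query failed:", exc)
        raise
    print(rows)
    print("Load Time: --- %s seconds ---" % (time.time() - start_time))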
