Description
The Trino Python client does not detect external (server-side) errors and exits successfully. It looks like it neither waits for the end of the transaction nor checks the transaction status before returning.
Connector: Hive
Trino Version: 356
Trino-python Client: 0.305.0
Steps to Reproduce:
- Create a stage table without partitions
CREATE TABLE dl_prod.prod_tmp.trino_python_issue_stg (
id int,
c_name varchar,
file_date varchar
)
WITH (
format = 'ORC',
external_location='hdfs://datalakeprod/data/dl/test/trino_python_issue_stg/'
)
- Insert data into the stage table
INSERT INTO dl_prod.prod_tmp.trino_python_issue_stg (id, c_name, file_date)
VALUES (1, 'subba', '20210811'), (2, 'subba', '20210812')
- Create another table with partitions
CREATE TABLE dl_prod.prod_tmp.trino_python_issue (
id int,
c_name varchar,
file_date varchar
)
WITH (
format = 'ORC',
external_location='hdfs://datalakeprod/data/dl/test/trino_python_issue/',
partitioned_by = ARRAY['file_date']
)
- Enable overwrite for existing partitions
set session dl_prod.insert_existing_partitions_behavior = 'overwrite'
- Insert data into the final table
INSERT INTO dl_prod.prod_tmp.trino_python_issue
SELECT * FROM dl_prod.prod_tmp.trino_python_issue_stg
- Drop the final table
DROP TABLE prod_tmp.trino_python_issue
- Re-create the final table
CREATE TABLE dl_prod.prod_tmp.trino_python_issue (
id int,
c_name varchar,
file_date varchar
)
WITH (
format = 'ORC',
external_location='hdfs://datalakeprod/data/dl/test/trino_python_issue/',
partitioned_by = ARRAY['file_date']
)
- Run the insert statement below. It fails with an external error:
INSERT INTO dl_prod.prod_tmp.trino_python_issue
SELECT * FROM dl_prod.prod_tmp.trino_python_issue_stg
Error in DBeaver:
SQL Error [16777231]: Query failed (#20210812_000114_00000_q5wrz): Unable to rename from hdfs://datalakeprod/tmp/presto/914b6fd7-f658-4a27-a69d-918c74a1bf28/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists
Trino UI:
io.trino.spi.TrinoException: Unable to rename from hdfs://datalakeprod/tmp/presto/36f2a894-77db-4abd-ac01-aa9e9cb2b5da/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.renameDirectory(SemiTransactionalHiveMetastore.java:2530)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore$Committer.prepareAddPartition(SemiTransactionalHiveMetastore.java:1858)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commitShared(SemiTransactionalHiveMetastore.java:1384)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commit(SemiTransactionalHiveMetastore.java:1162)
at io.trino.plugin.hive.HiveMetadata.commit(HiveMetadata.java:2988)
at io.trino.plugin.hive.HiveConnector.commit(HiveConnector.java:225)
at io.trino.transaction.InMemoryTransactionManager$TransactionMetadata$ConnectorTransactionMetadata.commit(InMemoryTransactionManager.java:594)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:80)
at io.trino.$gen.Trino_356____20210811_064245_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
- Run the same insert statement using the Python script below. It returns the output 2 and exits successfully, even though the Trino UI shows the external error and no data is loaded into the final table.
Output:
[2]
Load Time: --- 6.178645610809326 seconds ---
Trino UI:
io.trino.spi.TrinoException: Unable to rename from hdfs://datalakeprod/tmp/presto/c0a35086-3750-4b4d-9f28-d3b92734303d/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.renameDirectory(SemiTransactionalHiveMetastore.java:2530)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore$Committer.prepareAddPartition(SemiTransactionalHiveMetastore.java:1858)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commitShared(SemiTransactionalHiveMetastore.java:1384)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commit(SemiTransactionalHiveMetastore.java:1162)
at io.trino.plugin.hive.HiveMetadata.commit(HiveMetadata.java:2988)
at io.trino.plugin.hive.HiveConnector.commit(HiveConnector.java:225)
at io.trino.transaction.InMemoryTransactionManager$TransactionMetadata$ConnectorTransactionMetadata.commit(InMemoryTransactionManager.java:594)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:80)
at io.trino.$gen.Trino_356____20210811_064245_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Python Code:
import sys, os, time, datetime, platform
from datetime import date
from requests.auth import HTTPBasicAuth
import requests.packages.urllib3 as urllib3
urllib3.disable_warnings()
import trino

def trino_query_runner(query):
    TRINO_CONN._http_session.verify = False
    cd = date.today()
    start_time = time.time()
    cur_trino = TRINO_CONN.cursor()
    cur_trino.execute("set session dl_prod.insert_existing_partitions_behavior = 'overwrite'")
    rows = cur_trino.fetchone()
    cur_trino.execute(query)
    rows = cur_trino.fetchone()
    print(rows)
    print("Load Time: --- %s seconds ---" % (time.time() - start_time))

def trino_python_connector_issue():
    INST_SQL = """
    INSERT INTO dl_prod.prod_tmp.trino_python_issue
    SELECT * FROM dl_prod.prod_tmp.trino_python_issue_stg
    """
    trino_query_runner(INST_SQL)

if __name__ == "__main__":
    if platform.system() == 'Linux':
        trino_python_connector_issue()
    elif platform.system() == 'Windows':
        TRINO_CONN = trino.dbapi.connect(
            host='trino.dl.corp.test.com',
            port=443,
            user='svc-dl-db-full',
            catalog='dl_prod',
            schema='dbo',
            http_scheme='https',
            http_headers={'X-Trino-Time-Zone': 'America/New_York'},
            auth=trino.auth.BasicAuthentication("svc-dl-db-full", ""),
        )
        trino_python_connector_issue()
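As a possible workaround (my assumption from the behavior above, not verified against the client internals): fetchone() returns as soon as the first result page with the inserted-row count arrives, while the Hive commit is still running. Draining the cursor with fetchall() keeps polling the server until the query reaches a terminal state, so a commit-time failure like the rename error above should surface as a trino.exceptions.TrinoQueryError instead of a silent success. The helper run_insert_checked below is only illustrative, not part of trino-python-client:

import trino.exceptions

def run_insert_checked(conn, query):
    # Sketch: run an INSERT and treat it as successful only after the query
    # has reached a terminal state on the server.
    cur = conn.cursor()
    try:
        cur.execute(query)
        rows = cur.fetchall()  # keeps fetching pages until FINISHED or FAILED
    except trino.exceptions.TrinoQueryError as e:
        print("Insert failed:", e)
        raise
    print(rows)
    return rows

# e.g. inside trino_query_runner: run_insert_checked(TRINO_CONN, query)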