Distributed JOIN over engine Merge fails with 'Missing columns' errors #11755
Description
When I run this query:
SELECT * FROM products_d AS products GLOBAL ALL LEFT JOIN categories_d USING (categoryId);
ClickHouse throws the following exception:
[2020-06-18 17:20:01] Code: 47, e.displayText() = DB::Exception: Missing columns: 'categories_d.dateTime' 'categories_d.errorCode' 'categories_d.processingTimestampNs' 'categoryName' 'productIds' while processing query: 'SELECT dateTime, categoryId, productId, name, errorCode, processingTimestampNs, productIds, categoryName, categories_d.dateTime, categories_d.errorCode, categories_d.processingTimestampNs FROM products_d AS products GLOBAL ALL LEFT JOIN categories_d AS categories_d USING (categoryId)', required columns: 'dateTime' 'errorCode' 'categoryId' 'productId' 'productIds' 'name' 'categoryName' 'categories_d.processingTimestampNs' 'processingTimestampNs' 'categories_d.errorCode' 'categories_d.dateTime', source columns: 'processingTimestampNs' 'name' 'productId' 'categoryId' 'errorCode' 'dateTime', joined columns: (version 20.4.5.36 (official build))
Here are the queries that create the two Merge tables and their underlying tables:
CREATE TABLE IF NOT EXISTS products_data_hist (
dateTime DateTime('UTC'),
categoryId LowCardinality(String),
productId UUID,
name LowCardinality(String),
errorCode UInt8,
processingTimestampNs UInt64
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%%DATABASE%%/products_data_hist', '{replica}', processingTimestampNs)
PARTITION BY toYYYYMM(dateTime)
ORDER BY (cityHash64(name), toDate(dateTime), productId)
PRIMARY KEY (cityHash64(name), toDate(dateTime));
CREATE TABLE IF NOT EXISTS products_data_d_hist AS products_data_hist ENGINE = Distributed('kafka-cluster', %%DATABASE%%, products_data_hist, sipHash64(toString(productId)));
CREATE TABLE IF NOT EXISTS products_data_d_daily AS products_data_hist ENGINE = Distributed('kafka-cluster', %%DATABASE%%, products_data_hist, sipHash64(toString(productId)));
CREATE TABLE products_d AS products_data_d_hist ENGINE=Merge('%%DATABASE%%', '^products_data_d_');

CREATE TABLE IF NOT EXISTS categories_data_hist (
productIds Array(String),
categoryId UUID,
categoryName String,
dateTime DateTime('UTC'),
errorCode UInt8,
processingTimestampNs UInt64
) ENGINE ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%%DATABASE%%/categories_data_hist', '{replica}', processingTimestampNs)
PARTITION BY toYYYYMM(dateTime)
ORDER BY (cityHash64(categoryName), toDate(dateTime))
PRIMARY KEY (cityHash64(categoryName), toDate(dateTime));
CREATE TABLE IF NOT EXISTS categories_data_d_hist AS categories_data_hist ENGINE = Distributed('kafka-cluster', %%DATABASE%%, categories_data_hist, sipHash64(toString(categoryId)));
CREATE TABLE IF NOT EXISTS categories_data_d_daily AS categories_data_hist ENGINE = Distributed('kafka-cluster', %%DATABASE%%, categories_data_hist, sipHash64(toString(categoryId)));
CREATE TABLE categories_d AS categories_data_hist ENGINE=Merge('%%DATABASE%%', '^categories_data_d_');

I think the issue is that the columns from the right table are not added to the joined table when the query is executed. Note that the 'joined columns:' list in the error message above is empty.