From 02baf4a2f49b2d977c3f5853f11b032f7d66c718 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 12 May 2026 12:50:30 +0200 Subject: [PATCH 1/2] Fix stuck 'cluster_name_in_settings' --- .../StorageObjectStorageCluster.cpp | 62 +++++------ .../StorageObjectStorageCluster.h | 3 +- .../test_remote_initiator.py | 104 ++++++++++++++++++ 3 files changed, 137 insertions(+), 32 deletions(-) create mode 100644 tests/integration/test_storage_iceberg_with_spark/test_remote_initiator.py diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 8828cf70f1df..e5131d06ae2e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -289,7 +289,7 @@ std::optional StorageObjectStorageCluster::totalBytes(ContextPtr query_c return configuration->totalBytes(query_context); } -void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context, bool make_cluster_function) +bool StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context, bool make_cluster_function) { // Change table engine on table function for distributed request // CREATE TABLE t (...) ENGINE=IcebergS3(...) @@ -300,7 +300,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr auto * select_query = query->as(); if (!select_query || !select_query->tables()) - return; + return false; auto * tables = select_query->tables()->as(); @@ -313,10 +313,10 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr auto * table_expression = tables->children[0]->as()->table_expression->as(); if (!table_expression) - return; + return false; if (!table_expression->database_and_table_name) - return; + return false; auto & table_identifier_typed = table_expression->database_and_table_name->as(); @@ -389,34 +389,34 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr table_expression->table_function = function_ast_ptr; table_expression->children[0] = function_ast_ptr; - if (make_cluster_function) - { - auto cluster_name = getClusterName(context); + if (!make_cluster_function) + return false; - if (cluster_name.empty()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Can't be here without cluster name, no cluster name in query {}", - query->formatForLogging()); - } + auto cluster_name = getClusterName(context); - auto settings = select_query->settings(); - if (settings) - { - auto & settings_ast = settings->as(); - settings_ast.changes.insertSetting("object_storage_cluster", cluster_name); - } - else - { - auto settings_ast_ptr = make_intrusive(); - settings_ast_ptr->is_standalone = false; - settings_ast_ptr->changes.setSetting("object_storage_cluster", cluster_name); - select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings_ast_ptr)); - } + if (cluster_name.empty()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't be here without cluster name, no cluster name in query {}", + query->formatForLogging()); + } - cluster_name_in_settings = true; + auto settings = select_query->settings(); + if (settings) + { + auto & settings_ast = settings->as(); + settings_ast.changes.insertSetting("object_storage_cluster", cluster_name); } + else + { + auto settings_ast_ptr = make_intrusive(); + settings_ast_ptr->is_standalone = false; + settings_ast_ptr->changes.setSetting("object_storage_cluster", cluster_name); + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings_ast_ptr)); + } + + return true; } void StorageObjectStorageCluster::updateQueryToSendIfNeeded( @@ -425,7 +425,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( const ContextPtr & context, bool make_cluster_function) { - updateQueryForDistributedEngineIfNeeded(query, context, make_cluster_function); + bool cluster_name_added_to_settings = updateQueryForDistributedEngineIfNeeded(query, context, make_cluster_function); auto * table_function = extractTableFunctionFromSelectQuery(query); if (!table_function) @@ -450,7 +450,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } ASTPtr object_storage_type_arg; - configuration->extractDynamicStorageType(args, context, &object_storage_type_arg, !cluster_name_in_settings); + configuration->extractDynamicStorageType(args, context, &object_storage_type_arg, !cluster_name_in_settings && !cluster_name_added_to_settings); ASTPtr settings_temporary_storage = nullptr; for (auto it = args.begin(); it != args.end(); ++it) @@ -464,7 +464,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } } - if (cluster_name_in_settings || !endsWith(table_function->name, "Cluster")) + if (cluster_name_in_settings || cluster_name_added_to_settings || !endsWith(table_function->name, "Cluster")) { configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->getFormat(), context, /*with_structure=*/true); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index c168af53c28b..52c7d5951855 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -207,8 +207,9 @@ class StorageObjectStorageCluster : public IStorageCluster to SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster' to make distributed request over cluster 'cluster'. + Returns true if cluster name was added to settings. */ - void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context, bool make_cluster_function); + bool updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context, bool make_cluster_function); const String engine_name; StorageObjectStorageConfigurationPtr configuration; diff --git a/tests/integration/test_storage_iceberg_with_spark/test_remote_initiator.py b/tests/integration/test_storage_iceberg_with_spark/test_remote_initiator.py new file mode 100644 index 000000000000..ce02c0cb9e53 --- /dev/null +++ b/tests/integration/test_storage_iceberg_with_spark/test_remote_initiator.py @@ -0,0 +1,104 @@ +import pytest +import uuid + +from helpers.iceberg_utils import ( + get_uuid_str, + create_iceberg_table, + execute_spark_query_general, +) + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_remote_initiator_after_non_remote(started_cluster_iceberg_with_spark, storage_type): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = "test_remote_initiator_after_non_remote_table_" + get_uuid_str() + VIEW_NAME = TABLE_NAME + "_view" + + def execute_spark_query(query: str): + return execute_spark_query_general( + spark, + started_cluster_iceberg_with_spark, + storage_type, + TABLE_NAME, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + number INT + ) + USING iceberg + PARTITIONED BY (identity(tag)) + OPTIONS('format-version'='2') + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, 1) + """ + ) + + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark) + + def flush_logs(): + for node in started_cluster_iceberg_with_spark.instances.values(): + node.query("SYSTEM FLUSH LOGS") + + query_id = uuid.uuid4().hex + res = instance.query(f""" + SELECT * + FROM {TABLE_NAME} + WHERE number=1 + SETTINGS + object_storage_cluster='cluster_simple' + """, + query_id = query_id) + assert res == "1\t1\n" + flush_logs() + queries = instance.query(f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + """) + assert queries == "4\n" + + query_id = uuid.uuid4().hex + res = instance.query(f""" + SELECT * + FROM {TABLE_NAME} + WHERE number=1 + SETTINGS + object_storage_remote_initiator=1, + object_storage_cluster='cluster_simple' + """, + query_id = query_id) + assert res == "1\t1\n" + flush_logs() + queries = instance.query(f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + """) + assert queries == "6\n" + + query_id = uuid.uuid4().hex + res = instance.query(f""" + SELECT * + FROM {TABLE_NAME} + WHERE number=1 + """, + query_id = query_id) + assert res == "1\t1\n" + flush_logs() + queries = instance.query(f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + """) + assert queries == "1\n" + From eb71a124244da5522415ea6ada87a9e4d1b8bc25 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 12 May 2026 13:39:44 +0200 Subject: [PATCH 2/2] Fix object_storage_remote_initiator+object_storage_cluster_join_mode=local+allow_experimental_analyzer=0 --- src/Storages/IStorageCluster.cpp | 12 +++-- .../test_remote_initiator.py | 52 ++++++++++++++++++- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 7ae4d85d92f2..962b123234bb 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -214,6 +214,10 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { + if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); + auto info = getQueryTreeInfo(query_tree, context); if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) @@ -324,16 +328,16 @@ void IStorageCluster::read( { if (settings[Setting::object_storage_remote_initiator]) { + auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value; + if (remote_initiator_cluster_name.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster' or 'object_storage_cluster'"); + /// rewrite query to execute `remote('remote_host', s3(...))` /// remote_host can execute query itself or make on-cluster query depends on own `object_storage_cluster` setting updateConfigurationIfNeeded(context); updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context, /*make_cluster_function*/ false); - auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value; - if (remote_initiator_cluster_name.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster' or 'object_storage_cluster'"); - auto remote_initiator_cluster = getClusterImpl(context, remote_initiator_cluster_name); auto storage_and_context = convertToRemote(remote_initiator_cluster, context, remote_initiator_cluster_name, query_to_send); auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage); diff --git a/tests/integration/test_storage_iceberg_with_spark/test_remote_initiator.py b/tests/integration/test_storage_iceberg_with_spark/test_remote_initiator.py index ce02c0cb9e53..ba0a61f9a998 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_remote_initiator.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_remote_initiator.py @@ -13,7 +13,6 @@ def test_remote_initiator_after_non_remote(started_cluster_iceberg_with_spark, s instance = started_cluster_iceberg_with_spark.instances["node1"] spark = started_cluster_iceberg_with_spark.spark_session TABLE_NAME = "test_remote_initiator_after_non_remote_table_" + get_uuid_str() - VIEW_NAME = TABLE_NAME + "_view" def execute_spark_query(query: str): return execute_spark_query_general( @@ -102,3 +101,54 @@ def flush_logs(): """) assert queries == "1\n" + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_remote_initiator_after_with_join_old_analyzer(started_cluster_iceberg_with_spark, storage_type): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = "test_remote_initiator_after_with_join_old_analyzer_table_" + get_uuid_str() + TABLE2_NAME = "test_remote_initiator_after_with_join_old_analyzer_table_2_" + get_uuid_str() + + def execute_spark_query(query: str): + return execute_spark_query_general( + spark, + started_cluster_iceberg_with_spark, + storage_type, + TABLE_NAME, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + number INT + ) + USING iceberg + PARTITIONED BY (identity(tag)) + OPTIONS('format-version'='2') + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, 1) + """ + ) + + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark) + + instance.query(f"CREATE TABLE {TABLE2_NAME} (tag INT, number2 INT) ENGINE=Memory") + instance.query(f"INSERT INTO {TABLE2_NAME} VALUES (1, 2)") + + assert "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true" in instance.query_and_get_error(f""" + SELECT * + FROM {TABLE_NAME} AS t1 + JOIN {TABLE2_NAME} AS t2 USING (tag) + SETTINGS + object_storage_remote_initiator=1, + object_storage_remote_initiator_cluster='cluster_simple', + object_storage_cluster_join_mode='local', + allow_experimental_analyzer=0 + """)