12 changes: 8 additions & 4 deletions src/Storages/IStorageCluster.cpp
@@ -214,6 +214,10 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
{
case ObjectStorageClusterJoinMode::LOCAL:
{
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

auto info = getQueryTreeInfo(query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
@@ -324,16 +328,16 @@ void IStorageCluster::read(
{
if (settings[Setting::object_storage_remote_initiator])
{
auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value;
if (remote_initiator_cluster_name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster' or 'object_storage_cluster'");

/// rewrite the query to execute `remote('remote_host', s3(...))`
/// remote_host can execute the query itself or make an on-cluster query, depending on its own `object_storage_cluster` setting
updateConfigurationIfNeeded(context);
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context, /*make_cluster_function*/ false);

auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value;
if (remote_initiator_cluster_name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster' or 'object_storage_cluster'");

auto remote_initiator_cluster = getClusterImpl(context, remote_initiator_cluster_name);
auto storage_and_context = convertToRemote(remote_initiator_cluster, context, remote_initiator_cluster_name, query_to_send);
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
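As a usage illustration of the BAD_ARGUMENTS guard in IStorageCluster::read above, a hedged test-style sketch; it borrows the instance and TABLE_NAME fixtures from the integration tests at the end of this diff, so it is an assumption for illustration, not part of the change:

# Hedged sketch: enabling the remote initiator without naming any cluster
# should fail fast with the BAD_ARGUMENTS error thrown above.
error = instance.query_and_get_error(f"""
    SELECT *
    FROM {TABLE_NAME}
    WHERE number = 1
    SETTINGS object_storage_remote_initiator=1
""")
assert "'object_storage_remote_initiator_cluster' or 'object_storage_cluster'" in error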
62 changes: 31 additions & 31 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -289,7 +289,7 @@ std::optional<UInt64> StorageObjectStorageCluster::totalBytes(ContextPtr query_c
return configuration->totalBytes(query_context);
}

void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context, bool make_cluster_function)
bool StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context, bool make_cluster_function)
{
// Change the table engine to a table function for the distributed request
// CREATE TABLE t (...) ENGINE=IcebergS3(...)
@@ -300,7 +300,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr

auto * select_query = query->as<ASTSelectQuery>();
if (!select_query || !select_query->tables())
return;
return false;

auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();

@@ -313,10 +313,10 @@
auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();

if (!table_expression)
return;
return false;

if (!table_expression->database_and_table_name)
return;
return false;

auto & table_identifier_typed = table_expression->database_and_table_name->as<ASTTableIdentifier &>();

@@ -389,34 +389,34 @@
table_expression->table_function = function_ast_ptr;
table_expression->children[0] = function_ast_ptr;

if (make_cluster_function)
{
auto cluster_name = getClusterName(context);
if (!make_cluster_function)
return false;

if (cluster_name.empty())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Can't be here without cluster name, no cluster name in query {}",
query->formatForLogging());
}
auto cluster_name = getClusterName(context);

auto settings = select_query->settings();
if (settings)
{
auto & settings_ast = settings->as<ASTSetQuery &>();
settings_ast.changes.insertSetting("object_storage_cluster", cluster_name);
}
else
{
auto settings_ast_ptr = make_intrusive<ASTSetQuery>();
settings_ast_ptr->is_standalone = false;
settings_ast_ptr->changes.setSetting("object_storage_cluster", cluster_name);
select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings_ast_ptr));
}
if (cluster_name.empty())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Can't be here without cluster name, no cluster name in query {}",
query->formatForLogging());
}

cluster_name_in_settings = true;
auto settings = select_query->settings();
if (settings)
{
auto & settings_ast = settings->as<ASTSetQuery &>();
settings_ast.changes.insertSetting("object_storage_cluster", cluster_name);
}
else
{
auto settings_ast_ptr = make_intrusive<ASTSetQuery>();
settings_ast_ptr->is_standalone = false;
settings_ast_ptr->changes.setSetting("object_storage_cluster", cluster_name);
select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings_ast_ptr));
}

return true;
}

void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
@@ -425,7 +425,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
const ContextPtr & context,
bool make_cluster_function)
{
updateQueryForDistributedEngineIfNeeded(query, context, make_cluster_function);
bool cluster_name_added_to_settings = updateQueryForDistributedEngineIfNeeded(query, context, make_cluster_function);

auto * table_function = extractTableFunctionFromSelectQuery(query);
if (!table_function)
@@ -450,7 +450,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
}

ASTPtr object_storage_type_arg;
configuration->extractDynamicStorageType(args, context, &object_storage_type_arg, !cluster_name_in_settings);
configuration->extractDynamicStorageType(args, context, &object_storage_type_arg, !cluster_name_in_settings && !cluster_name_added_to_settings);

ASTPtr settings_temporary_storage = nullptr;
for (auto it = args.begin(); it != args.end(); ++it)
@@ -464,7 +464,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
}
}

if (cluster_name_in_settings || !endsWith(table_function->name, "Cluster"))
if (cluster_name_in_settings || cluster_name_added_to_settings || !endsWith(table_function->name, "Cluster"))
{
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->getFormat(), context, /*with_structure=*/true);

3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -207,8 +207,9 @@ class StorageObjectStorageCluster : public IStorageCluster
to
SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster'
to make a distributed request over cluster 'cluster'.
Returns true if the cluster name was added to the settings.
*/
void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context, bool make_cluster_function);
bool updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context, bool make_cluster_function);

const String engine_name;
StorageObjectStorageConfigurationPtr configuration;
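To make the new bool contract concrete, a toy, self-contained Python model of the two methods above; every name is a hypothetical stand-in for the C++, and the query is just a dict, so this sketches the control flow only, not the real API:

def update_query_for_distributed_engine_if_needed(query, cluster_name, make_cluster_function):
    # Early exits (no SELECT, no plain table expression, ...) now return False.
    if "table_function" in query:
        return False
    # Rewrite ENGINE=IcebergS3(...)-style table access into a table function.
    query["table_function"] = "icebergS3(...)"
    if not make_cluster_function:
        return False
    # Carry the cluster name in the query-level SETTINGS clause.
    query.setdefault("settings", {})["object_storage_cluster"] = cluster_name
    return True

def update_query_to_send_if_needeed_caller(query, cluster_name, cluster_name_in_settings, make_cluster_function):
    added = update_query_for_distributed_engine_if_needed(query, cluster_name, make_cluster_function)
    # Both flags now gate the same two decisions, mirroring the C++ call sites.
    extract_type_arg = not cluster_name_in_settings and not added
    add_structure_and_format = (cluster_name_in_settings or added
                                or not query["table_function"].endswith("Cluster"))
    return extract_type_arg, add_structure_and_format

print(update_query_to_send_if_needeed_caller({}, "cluster_simple", False, True))
# -> (False, True): the cluster name already travels in SETTINGS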
154 changes: 154 additions & 0 deletions
@@ -0,0 +1,154 @@
import pytest
import uuid

from helpers.iceberg_utils import (
get_uuid_str,
create_iceberg_table,
execute_spark_query_general,
)


@pytest.mark.parametrize("storage_type", ["s3"])
def test_remote_initiator_after_non_remote(started_cluster_iceberg_with_spark, storage_type):
instance = started_cluster_iceberg_with_spark.instances["node1"]
spark = started_cluster_iceberg_with_spark.spark_session
TABLE_NAME = "test_remote_initiator_after_non_remote_table_" + get_uuid_str()

def execute_spark_query(query: str):
return execute_spark_query_general(
spark,
started_cluster_iceberg_with_spark,
storage_type,
TABLE_NAME,
query,
)

execute_spark_query(
f"""
CREATE TABLE {TABLE_NAME} (
tag INT,
number INT
)
USING iceberg
PARTITIONED BY (identity(tag))
OPTIONS('format-version'='2')
"""
)

execute_spark_query(
f"""
INSERT INTO {TABLE_NAME} VALUES
(1, 1)
"""
)

create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark)

def flush_logs():
for node in started_cluster_iceberg_with_spark.instances.values():
node.query("SYSTEM FLUSH LOGS")

query_id = uuid.uuid4().hex
res = instance.query(f"""
SELECT *
FROM {TABLE_NAME}
WHERE number=1
SETTINGS
object_storage_cluster='cluster_simple'
""",
query_id=query_id)
assert res == "1\t1\n"
flush_logs()
queries = instance.query(f"""
SELECT count()
FROM clusterAllReplicas('cluster_simple', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
""")
assert queries == "4\n"

query_id = uuid.uuid4().hex
res = instance.query(f"""
SELECT *
FROM {TABLE_NAME}
WHERE number=1
SETTINGS
object_storage_remote_initiator=1,
object_storage_cluster='cluster_simple'
""",
query_id=query_id)
assert res == "1\t1\n"
flush_logs()
queries = instance.query(f"""
SELECT count()
FROM clusterAllReplicas('cluster_simple', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
""")
assert queries == "6\n"

query_id = uuid.uuid4().hex
res = instance.query(f"""
SELECT *
FROM {TABLE_NAME}
WHERE number=1
""",
query_id=query_id)
assert res == "1\t1\n"
flush_logs()
queries = instance.query(f"""
SELECT count()
FROM clusterAllReplicas('cluster_simple', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
""")
assert queries == "1\n"


@pytest.mark.parametrize("storage_type", ["s3"])
def test_remote_initiator_after_with_join_old_analyzer(started_cluster_iceberg_with_spark, storage_type):
instance = started_cluster_iceberg_with_spark.instances["node1"]
spark = started_cluster_iceberg_with_spark.spark_session
TABLE_NAME = "test_remote_initiator_after_with_join_old_analyzer_table_" + get_uuid_str()
TABLE2_NAME = "test_remote_initiator_after_with_join_old_analyzer_table_2_" + get_uuid_str()

def execute_spark_query(query: str):
return execute_spark_query_general(
spark,
started_cluster_iceberg_with_spark,
storage_type,
TABLE_NAME,
query,
)

execute_spark_query(
f"""
CREATE TABLE {TABLE_NAME} (
tag INT,
number INT
)
USING iceberg
PARTITIONED BY (identity(tag))
OPTIONS('format-version'='2')
"""
)

execute_spark_query(
f"""
INSERT INTO {TABLE_NAME} VALUES
(1, 1)
"""
)

create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark)

instance.query(f"CREATE TABLE {TABLE2_NAME} (tag INT, number2 INT) ENGINE=Memory")
instance.query(f"INSERT INTO {TABLE2_NAME} VALUES (1, 2)")

assert "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true" in instance.query_and_get_error(f"""
SELECT *
FROM {TABLE_NAME} AS t1
JOIN {TABLE2_NAME} AS t2 USING (tag)
SETTINGS
object_storage_remote_initiator=1,
object_storage_remote_initiator_cluster='cluster_simple',
object_storage_cluster_join_mode='local',
allow_experimental_analyzer=0
""")