From fac80724eeb94d0ff10959768a767eb29d6c0caf Mon Sep 17 00:00:00 2001 From: huangguasky <358012348@qq.com> Date: Sun, 21 Jun 2026 21:09:17 +0800 Subject: [PATCH] Add bit and bool aggregate functions --- .../function/BuildInSqlFunctionTable.java | 6 ++ .../dsl/udf/table/agg/BitAndInteger.java | 72 +++++++++++++++++++ .../dsl/udf/table/agg/BitOrInteger.java | 72 +++++++++++++++++++ .../geaflow/dsl/udf/table/agg/BoolAnd.java | 72 +++++++++++++++++++ .../apache/geaflow/dsl/udf/agg/UDAFTest.java | 59 +++++++++++++++ .../runtime/plan/PhysicAggregateRelNode.java | 18 +++++ .../dsl/runtime/query/AggregateTest.java | 9 +++ .../test/resources/expect/aggregate_013.txt | 1 + .../test/resources/query/aggregate_013.sql | 46 ++++++++++++ 9 files changed, 355 insertions(+) create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BitAndInteger.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BitOrInteger.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BoolAnd.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/aggregate_013.txt create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/aggregate_013.sql diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java index 47addc84a..ed58b3d15 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java @@ -52,6 +52,9 @@ import org.apache.geaflow.dsl.udf.graph.SingleSourceShortestPath; import org.apache.geaflow.dsl.udf.graph.TriangleCount; import org.apache.geaflow.dsl.udf.graph.WeakConnectedComponents; +import org.apache.geaflow.dsl.udf.table.agg.BitAndInteger; +import org.apache.geaflow.dsl.udf.table.agg.BitOrInteger; +import org.apache.geaflow.dsl.udf.table.agg.BoolAnd; import org.apache.geaflow.dsl.udf.table.agg.PercentileDouble; import org.apache.geaflow.dsl.udf.table.agg.PercentileInteger; import org.apache.geaflow.dsl.udf.table.agg.PercentileLong; @@ -222,6 +225,9 @@ public class BuildInSqlFunctionTable extends ListSqlOperatorTable { .add(GeaFlowFunction.of(PercentileLong.class)) .add(GeaFlowFunction.of(PercentileInteger.class)) .add(GeaFlowFunction.of(PercentileDouble.class)) + .add(GeaFlowFunction.of(BitAndInteger.class)) + .add(GeaFlowFunction.of(BitOrInteger.class)) + .add(GeaFlowFunction.of(BoolAnd.class)) // UDGA .add(GeaFlowFunction.of(SingleSourceShortestPath.class)) .add(GeaFlowFunction.of(AllSourceShortestPath.class)) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BitAndInteger.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BitAndInteger.java new file mode 100644 index 000000000..570c535c8 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BitAndInteger.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.udf.table.agg; + +import java.io.Serializable; +import org.apache.geaflow.dsl.common.function.Description; +import org.apache.geaflow.dsl.common.function.UDAF; +import org.apache.geaflow.dsl.udf.table.agg.BitAndInteger.Accumulator; + +@Description(name = "bit_and", description = "The bitwise AND aggregate function for int.") +public class BitAndInteger extends UDAF { + + @Override + public Accumulator createAccumulator() { + return new Accumulator(null); + } + + @Override + public void accumulate(Accumulator accumulator, Integer input) { + if (input != null) { + accumulator.value = accumulator.value == null ? input : accumulator.value & input; + } + } + + @Override + public void merge(Accumulator accumulator, Iterable its) { + for (Accumulator toMerge : its) { + if (toMerge.value != null) { + accumulate(accumulator, toMerge.value); + } + } + } + + @Override + public void resetAccumulator(Accumulator accumulator) { + accumulator.value = null; + } + + @Override + public Integer getValue(Accumulator accumulator) { + return accumulator.value; + } + + public static class Accumulator implements Serializable { + + public Accumulator() { + } + + public Integer value; + + public Accumulator(Integer value) { + this.value = value; + } + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BitOrInteger.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BitOrInteger.java new file mode 100644 index 000000000..fdeaf178e --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BitOrInteger.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.udf.table.agg; + +import java.io.Serializable; +import org.apache.geaflow.dsl.common.function.Description; +import org.apache.geaflow.dsl.common.function.UDAF; +import org.apache.geaflow.dsl.udf.table.agg.BitOrInteger.Accumulator; + +@Description(name = "bit_or", description = "The bitwise OR aggregate function for int.") +public class BitOrInteger extends UDAF { + + @Override + public Accumulator createAccumulator() { + return new Accumulator(null); + } + + @Override + public void accumulate(Accumulator accumulator, Integer input) { + if (input != null) { + accumulator.value = accumulator.value == null ? input : accumulator.value | input; + } + } + + @Override + public void merge(Accumulator accumulator, Iterable its) { + for (Accumulator toMerge : its) { + if (toMerge.value != null) { + accumulate(accumulator, toMerge.value); + } + } + } + + @Override + public void resetAccumulator(Accumulator accumulator) { + accumulator.value = null; + } + + @Override + public Integer getValue(Accumulator accumulator) { + return accumulator.value; + } + + public static class Accumulator implements Serializable { + + public Accumulator() { + } + + public Integer value; + + public Accumulator(Integer value) { + this.value = value; + } + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BoolAnd.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BoolAnd.java new file mode 100644 index 000000000..f6992748d --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/table/agg/BoolAnd.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.udf.table.agg; + +import java.io.Serializable; +import org.apache.geaflow.dsl.common.function.Description; +import org.apache.geaflow.dsl.common.function.UDAF; +import org.apache.geaflow.dsl.udf.table.agg.BoolAnd.Accumulator; + +@Description(name = "bool_and", description = "The boolean AND aggregate function.") +public class BoolAnd extends UDAF { + + @Override + public Accumulator createAccumulator() { + return new Accumulator(null); + } + + @Override + public void accumulate(Accumulator accumulator, Boolean input) { + if (input != null) { + accumulator.value = accumulator.value == null ? input : accumulator.value && input; + } + } + + @Override + public void merge(Accumulator accumulator, Iterable its) { + for (Accumulator toMerge : its) { + if (toMerge.value != null) { + accumulate(accumulator, toMerge.value); + } + } + } + + @Override + public void resetAccumulator(Accumulator accumulator) { + accumulator.value = null; + } + + @Override + public Boolean getValue(Accumulator accumulator) { + return accumulator.value; + } + + public static class Accumulator implements Serializable { + + public Accumulator() { + } + + public Boolean value; + + public Accumulator(Boolean value) { + this.value = value; + } + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/udf/agg/UDAFTest.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/udf/agg/UDAFTest.java index 87397aff6..b5f805580 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/udf/agg/UDAFTest.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/udf/agg/UDAFTest.java @@ -23,6 +23,9 @@ import org.apache.geaflow.dsl.udf.table.agg.AvgDouble; import org.apache.geaflow.dsl.udf.table.agg.AvgInteger; import org.apache.geaflow.dsl.udf.table.agg.AvgLong; +import org.apache.geaflow.dsl.udf.table.agg.BitAndInteger; +import org.apache.geaflow.dsl.udf.table.agg.BitOrInteger; +import org.apache.geaflow.dsl.udf.table.agg.BoolAnd; import org.apache.geaflow.dsl.udf.table.agg.Count; import org.apache.geaflow.dsl.udf.table.agg.MaxDouble; import org.apache.geaflow.dsl.udf.table.agg.MaxInteger; @@ -86,6 +89,62 @@ public void testCount() { Assert.assertEquals((long) af.getValue(accumulator), 0); } + @Test + public void testBitAndInteger() { + BitAndInteger af = new BitAndInteger(); + BitAndInteger.Accumulator accumulator = af.createAccumulator(); + af.accumulate(accumulator, null); + Assert.assertNull(af.getValue(accumulator)); + af.accumulate(accumulator, 7); + af.accumulate(accumulator, 3); + Assert.assertEquals((int) af.getValue(accumulator), 3); + + BitAndInteger.Accumulator toMerge = af.createAccumulator(); + af.accumulate(toMerge, 1); + af.merge(accumulator, Lists.newArrayList(toMerge)); + Assert.assertEquals((int) af.getValue(accumulator), 1); + + af.resetAccumulator(accumulator); + Assert.assertNull(af.getValue(accumulator)); + } + + @Test + public void testBitOrInteger() { + BitOrInteger af = new BitOrInteger(); + BitOrInteger.Accumulator accumulator = af.createAccumulator(); + af.accumulate(accumulator, null); + Assert.assertNull(af.getValue(accumulator)); + af.accumulate(accumulator, 4); + af.accumulate(accumulator, 2); + Assert.assertEquals((int) af.getValue(accumulator), 6); + + BitOrInteger.Accumulator toMerge = af.createAccumulator(); + af.accumulate(toMerge, 1); + af.merge(accumulator, Lists.newArrayList(toMerge)); + Assert.assertEquals((int) af.getValue(accumulator), 7); + + af.resetAccumulator(accumulator); + Assert.assertNull(af.getValue(accumulator)); + } + + @Test + public void testBoolAnd() { + BoolAnd af = new BoolAnd(); + BoolAnd.Accumulator accumulator = af.createAccumulator(); + af.accumulate(accumulator, null); + Assert.assertNull(af.getValue(accumulator)); + af.accumulate(accumulator, true); + Assert.assertTrue(af.getValue(accumulator)); + + BoolAnd.Accumulator toMerge = af.createAccumulator(); + af.accumulate(toMerge, false); + af.merge(accumulator, Lists.newArrayList(toMerge)); + Assert.assertFalse(af.getValue(accumulator)); + + af.resetAccumulator(accumulator); + Assert.assertNull(af.getValue(accumulator)); + } + @Test public void testMaxDouble() { MaxDouble af = new MaxDouble(); diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicAggregateRelNode.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicAggregateRelNode.java index 0d88993e9..a2a382fd5 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicAggregateRelNode.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicAggregateRelNode.java @@ -66,6 +66,9 @@ import org.apache.geaflow.dsl.udf.table.agg.AvgDouble; import org.apache.geaflow.dsl.udf.table.agg.AvgInteger; import org.apache.geaflow.dsl.udf.table.agg.AvgLong; +import org.apache.geaflow.dsl.udf.table.agg.BitAndInteger; +import org.apache.geaflow.dsl.udf.table.agg.BitOrInteger; +import org.apache.geaflow.dsl.udf.table.agg.BoolAnd; import org.apache.geaflow.dsl.udf.table.agg.Count; import org.apache.geaflow.dsl.udf.table.agg.MaxBinaryString; import org.apache.geaflow.dsl.udf.table.agg.MaxDouble; @@ -102,6 +105,12 @@ public class PhysicAggregateRelNode extends Aggregate implements PhysicRelNode buildAggFunctionCalls() { aggClasses.add(PercentileInteger.class); aggClasses.add(PercentileDouble.class); break; + case UDAF_BIT_AND: + aggClasses.add(BitAndInteger.class); + break; + case UDAF_BIT_OR: + aggClasses.add(BitOrInteger.class); + break; + case UDAF_BOOL_AND: + aggClasses.add(BoolAnd.class); + break; default: throw new GeaFlowDSLException("Not support aggregate function " + aggName); } diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/AggregateTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/AggregateTest.java index 7e8ea37fa..f06c56762 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/AggregateTest.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/AggregateTest.java @@ -131,6 +131,15 @@ public void testAggregate_012() throws Exception { .checkSinkResult(); } + @Test + public void testAggregate_013() throws Exception { + QueryTester + .build() + .withQueryPath("/query/aggregate_013.sql") + .execute() + .checkSinkResult(); + } + @Test public void testStreamAggregate_001() throws Exception { QueryTester diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/aggregate_013.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/aggregate_013.txt new file mode 100644 index 000000000..39d8cc35a --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/aggregate_013.txt @@ -0,0 +1 @@ +0,63,false diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/aggregate_013.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/aggregate_013.sql new file mode 100644 index 000000000..63667287f --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/aggregate_013.sql @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +CREATE TABLE users ( + rt bigint, + f1 bigint, + f2 double, + f3 varchar, + f4 boolean, + f5 varchar +) WITH ( + type='file', + geaflow.dsl.file.path = 'resource:///data/users3.txt' +); + +CREATE TABLE tbl_result ( + bit_and_value int, + bit_or_value int, + bool_and_value boolean +) WITH ( + type='file', + geaflow.dsl.file.path='${target}' +); + +INSERT INTO tbl_result +SELECT + BIT_AND(cast(f1 as int)) AS bit_and_value, + BIT_OR(cast(f1 as int)) AS bit_or_value, + BOOL_AND(f4) AS bool_and_value +FROM users