Skip to content

Commit

Permalink
[EXPB-2111] MergeJoin removes null values in front of non null values
Browse files Browse the repository at this point in the history
Signed-off-by: Corvin Kuebler <[email protected]>
  • Loading branch information
CoKueb committed Apr 16, 2024
1 parent 3ad25b5 commit 206fb2b
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,12 @@ public static EnumerableMergeJoin create(RelNode left, RelNode right,
Expressions.call(
BuiltInMethod.MERGE_JOIN.method,
Expressions.list(
leftExpression,
rightExpression,
Expressions.call(BuiltInMethod.MERGE_JOIN_NOT_NULL_ENUMERABLE.method, leftExpression,
Expressions.lambda(
leftKeyPhysType.record(leftExpressions), left_)),
Expressions.call(BuiltInMethod.MERGE_JOIN_NOT_NULL_ENUMERABLE.method, rightExpression,
Expressions.lambda(
rightKeyPhysType.record(rightExpressions), right_)),
Expressions.lambda(
leftKeyPhysType.record(leftExpressions), left_),
Expressions.lambda(
Expand Down
13 changes: 3 additions & 10 deletions core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,7 @@
import org.apache.calcite.interpreter.Context;
import org.apache.calcite.interpreter.Row;
import org.apache.calcite.interpreter.Scalar;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.EnumerableDefaults;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.ExtendedEnumerable;
import org.apache.calcite.linq4j.JoinType;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.linq4j.MemoryFactory;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.*;
import org.apache.calcite.linq4j.function.EqualityComparer;
import org.apache.calcite.linq4j.function.Function0;
import org.apache.calcite.linq4j.function.Function1;
Expand Down Expand Up @@ -213,6 +204,8 @@ public enum BuiltInMethod {
MERGE_JOIN(EnumerableDefaults.class, "mergeJoin", Enumerable.class,
Enumerable.class, Function1.class, Function1.class, Predicate2.class, Function2.class,
JoinType.class, Comparator.class, EqualityComparer.class),
MERGE_JOIN_NOT_NULL_ENUMERABLE(
MergeJoinNotNullEnumerable.class, "create", Enumerable.class, Function1.class),
SLICE0(Enumerables.class, "slice0", Enumerable.class),
SEMI_JOIN(EnumerableDefaults.class, "semiJoin", Enumerable.class,
Enumerable.class, Function1.class, Function1.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.calcite.linq4j;

import org.apache.calcite.linq4j.function.Function1;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class MergeJoinNotNullEnumerable<T, K> extends AbstractEnumerable<T> {
private final Enumerable<T> enumerable;
private final Function1<T, K> keySelector;
private boolean skipNulls = true;

private MergeJoinNotNullEnumerable(Enumerable<T> enumerable, Function1<T, K> keySelector) {
this.enumerable = enumerable;
this.keySelector = keySelector;
}

public static <T, K> MergeJoinNotNullEnumerable<T, K> create(Enumerable<T> enumerable, Function1<T, K> keySelector) {
return new MergeJoinNotNullEnumerable<>(enumerable, keySelector);
}

@Override
public Enumerator<T> enumerator() {
Enumerator<T> enumerator = enumerable.enumerator();
return new Enumerator<T>() {
@Override
public T current() {
return enumerator.current();
}

@Override
public boolean moveNext() {
boolean next = enumerator.moveNext();
if (!skipNulls) {
return next;
}
while (next) {
K key = keySelector.apply(enumerator.current());
if (key != null) {
if (key instanceof Object[]) {
if(Arrays.stream((Object[]) key).noneMatch(Objects::isNull)) {
break;
}
} else if (key instanceof List) {
if (((List<?>) key).stream().noneMatch(Objects::isNull)) {
break;
}
} else {
break;
}
}
next = enumerator.moveNext();
}
skipNulls = false;
return next;
}

@Override
public void reset() {
enumerator.reset();
}

@Override
public void close() {
enumerator.close();
}
};
}
}

0 comments on commit 206fb2b

Please sign in to comment.