Skip to content

Commit

Permalink
More single-pass analyzer functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimirg-db committed Jan 24, 2025
1 parent 44966c9 commit 2d496ab
Show file tree
Hide file tree
Showing 60 changed files with 5,143 additions and 1,655 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

package object resolver {

type LogicalPlanResolver = TreeNodeResolver[LogicalPlan, LogicalPlan]

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis.resolver

import org.apache.spark.sql.catalyst.analysis.{
AnalysisErrorAt,
AnsiTypeCoercion,
CollationTypeCoercion,
TypeCoercion
}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Project}

/**
* A resolver for [[AggregateExpression]]s which are introduced while resolving an
* [[UnresolvedFunction]]. It is responsible for the following:
* - Handling of the exceptions related to [[AggregateExpressions]].
* - Updating the [[ExpressionResolver.expressionResolutionContextStack]].
* - Applying type coercion rules to the [[AggregateExpressions]]s children. This is the only
* resolution that we apply here as we already resolved the children of [[AggregateExpression]]
* in the [[FunctionResolver]].
*/
class AggregateExpressionResolver(
expressionResolver: ExpressionResolver,
timezoneAwareExpressionResolver: TimezoneAwareExpressionResolver)
extends TreeNodeResolver[AggregateExpression, Expression]
with ResolvesExpressionChildren {
private val typeCoercionTransformations: Seq[Expression => Expression] =
if (conf.ansiEnabled) {
AggregateExpressionResolver.ANSI_TYPE_COERCION_TRANSFORMATIONS
} else {
AggregateExpressionResolver.TYPE_COERCION_TRANSFORMATIONS
}

private val typeCoercionResolver: TypeCoercionResolver =
new TypeCoercionResolver(timezoneAwareExpressionResolver, typeCoercionTransformations)

private val expressionResolutionContextStack =
expressionResolver.getExpressionResolutionContextStack

/**
* Resolves the given [[AggregateExpression]] by applying:
* - Type coercion rules
* - Validity checks. Those include:
* - Whether the [[AggregateExpression]] is under a valid operator.
* - Whether there is a nested [[AggregateExpression]].
* - Whether there is a nondeterministic child.
* - Updates to the [[ExpressionResolver.expressionResolutionContextStack]]
*/
override def resolve(aggregateExpression: AggregateExpression): Expression = {
val aggregateExpressionWithTypeCoercion =
withResolvedChildren(aggregateExpression, typeCoercionResolver.resolve)

throwIfNotUnderValidOperator(aggregateExpression)
throwIfNestedAggregateExists(aggregateExpressionWithTypeCoercion)
throwIfHasNondeterministicChildren(aggregateExpressionWithTypeCoercion)

expressionResolutionContextStack
.peek()
.hasAggregateExpressionsInASubtree = true

// There are two different cases that we handle regarding the value of the flag:
//
// - We have an attribute under an `AggregateExpression`:
// {{{ SELECT COUNT(col1) FROM VALUES (1); }}}
// In this case, value of the `hasAttributeInASubtree` flag should be `false` as it
// indicates whether there is an attribute in the subtree that's not `AggregateExpression`
// so we can throw the `MISSING_GROUP_BY` exception appropriately.
//
// - In the following example:
// {{{ SELECT COUNT(*), col1 + 1 FROM VALUES (1); }}}
// It would be `true` as described above.
expressionResolutionContextStack.peek().hasAttributeInASubtree = false

aggregateExpressionWithTypeCoercion
}

private def throwIfNotUnderValidOperator(aggregateExpression: AggregateExpression): Unit = {
expressionResolver.getParentOperator.get match {
case _: Aggregate | _: Project =>
case filter: Filter =>
filter.failAnalysis(
errorClass = "INVALID_WHERE_CONDITION",
messageParameters = Map(
"condition" -> toSQLExpr(filter.condition),
"expressionList" -> Seq(aggregateExpression).mkString(", ")
)
)
case other =>
other.failAnalysis(
errorClass = "UNSUPPORTED_EXPR_FOR_OPERATOR",
messageParameters = Map(
"invalidExprSqls" -> Seq(aggregateExpression).mkString(", ")
)
)
}
}

private def throwIfNestedAggregateExists(aggregateExpression: AggregateExpression): Unit = {
if (expressionResolutionContextStack
.peek()
.hasAggregateExpressionsInASubtree) {
aggregateExpression.failAnalysis(
errorClass = "NESTED_AGGREGATE_FUNCTION",
messageParameters = Map.empty
)
}
}

private def throwIfHasNondeterministicChildren(aggregateExpression: AggregateExpression): Unit = {
aggregateExpression.aggregateFunction.children.foreach(child => {
if (!child.deterministic) {
child.failAnalysis(
errorClass = "AGGREGATE_FUNCTION_WITH_NONDETERMINISTIC_EXPRESSION",
messageParameters = Map("sqlExpr" -> toSQLExpr(aggregateExpression))
)
}
})
}
}

object AggregateExpressionResolver {
// Ordering in the list of type coercions should be in sync with the list in [[TypeCoercion]].
private val TYPE_COERCION_TRANSFORMATIONS: Seq[Expression => Expression] = Seq(
CollationTypeCoercion.apply,
TypeCoercion.InTypeCoercion.apply,
TypeCoercion.FunctionArgumentTypeCoercion.apply,
TypeCoercion.IfTypeCoercion.apply,
TypeCoercion.ImplicitTypeCoercion.apply
)

// Ordering in the list of type coercions should be in sync with the list in [[AnsiTypeCoercion]].
private val ANSI_TYPE_COERCION_TRANSFORMATIONS: Seq[Expression => Expression] = Seq(
CollationTypeCoercion.apply,
AnsiTypeCoercion.InTypeCoercion.apply,
AnsiTypeCoercion.FunctionArgumentTypeCoercion.apply,
AnsiTypeCoercion.IfTypeCoercion.apply,
AnsiTypeCoercion.ImplicitTypeCoercion.apply
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@

package org.apache.spark.sql.catalyst.analysis.resolver

import org.apache.spark.sql.catalyst.analysis.{AliasResolution, UnresolvedAlias}
import org.apache.spark.sql.catalyst.expressions.{
Alias,
Cast,
CreateNamedStruct,
Expression,
NamedExpression
}
import org.apache.spark.sql.catalyst.analysis.{AliasResolution, MultiAlias, UnresolvedAlias}
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}

/**
* Resolver class that resolves unresolved aliases and handles user-specified aliases.
Expand All @@ -34,109 +28,41 @@ class AliasResolver(expressionResolver: ExpressionResolver, scopes: NameScopeSta
with ResolvesExpressionChildren {

/**
* Resolves [[UnresolvedAlias]] by handling two specific cases:
* - Alias(CreateNamedStruct(...)) - instead of calling [[CreateNamedStructResolver]] which will
* clean up its inner aliases, we manually resolve [[CreateNamedStruct]]'s children, because we
* need to preserve inner aliases until after the alias name is computed. This is a hack because
* fixed-point analyzer computes [[Alias]] name before removing inner aliases.
* - Alias(...) - recursively call [[ExpressionResolver]] to resolve the child expression.
*
* After the children are resolved, call [[AliasResolution]] to compute the alias name. Finally,
* clean up inner aliases from [[CreateNamedStruct]].
* Resolves [[UnresolvedAlias]] by resolving its child and computing the alias name by calling
* [[AliasResolution]] on the result. After resolving it, we assign a correct exprId to the
* resulting [[Alias]]. Here we allow inner aliases to persist until the end of single-pass
* resolution, after which they will be removed in the post-processing phase.
*/
override def resolve(unresolvedAlias: UnresolvedAlias): NamedExpression = {
val aliasWithResolvedChildren = withResolvedChildren(
unresolvedAlias, {
case createNamedStruct: CreateNamedStruct =>
withResolvedChildren(createNamedStruct, expressionResolver.resolve)
case other => expressionResolver.resolve(other)
}
)
override def resolve(unresolvedAlias: UnresolvedAlias): NamedExpression =
scopes.top.lcaRegistry.withNewLcaScope {
val aliasWithResolvedChildren =
withResolvedChildren(unresolvedAlias, expressionResolver.resolve)

val resolvedAlias =
AliasResolution.resolve(aliasWithResolvedChildren).asInstanceOf[NamedExpression]
val resolvedAlias =
AliasResolution.resolve(aliasWithResolvedChildren).asInstanceOf[NamedExpression]

scopes.top.addAlias(resolvedAlias.name)
AliasResolver.cleanupAliases(resolvedAlias)
}
resolvedAlias match {
case multiAlias: MultiAlias =>
throw new ExplicitlyUnsupportedResolverFeature(
s"unsupported expression: ${multiAlias.getClass.getName}"
)
case alias: Alias =>
expressionResolver.getExpressionIdAssigner
.mapExpression(alias)
.asInstanceOf[Alias]
}
}

/**
* Handle already resolved [[Alias]] nodes, i.e. user-specified aliases. We disallow stacking
* of [[Alias]] nodes by collapsing them so that only the top node remains.
*
* For an example query like:
*
* {{{ SELECT 1 AS a }}}
*
* parsed plan will be:
*
* Project [Alias(1, a)]
* +- OneRowRelation
*
* Handle already resolved [[Alias]] nodes, i.e. user-specified aliases. Here we only need to
* resolve its children and afterwards reassign exprId to the resulting [[Alias]].
*/
def handleResolvedAlias(alias: Alias): Alias = {
val aliasWithResolvedChildren = withResolvedChildren(alias, expressionResolver.resolve)
scopes.top.addAlias(aliasWithResolvedChildren.name)
AliasResolver.collapseAlias(aliasWithResolvedChildren)
}
}

object AliasResolver {

/**
* For a query like:
*
* {{{ SELECT STRUCT(1 AS a, 2 AS b) AS st }}}
*
* After resolving [[CreateNamedStruct]] the plan will be:
* CreateNamedStruct(Seq("a", Alias(1, "a"), "b", Alias(2, "b")))
*
* For a query like:
*
* {{{ df.select($"col1".cast("int").cast("double")) }}}
*
* After resolving top-most [[Alias]] the plan will be:
* Alias(Cast(Alias(Cast(col1, int), col1)), double), col1)
*
* Both examples contain inner aliases that are not expected in the analyzed logical plan,
* therefore need to be removed. However, in both examples inner aliases are necessary in order
* for the outer alias to compute its name. To achieve this, we delay removal of inner aliases
* until after the outer alias name is computed.
*
* For cases where there are no dependencies on inner alias, inner alias should be removed by the
* resolver that produces it.
*/
private def cleanupAliases(namedExpression: NamedExpression): NamedExpression =
namedExpression
.withNewChildren(namedExpression.children.map {
case cast @ Cast(alias: Alias, _, _, _) =>
cast.copy(child = alias.child)
case createNamedStruct: CreateNamedStruct =>
CreateNamedStructResolver.cleanupAliases(createNamedStruct)
case other => other
})
.asInstanceOf[NamedExpression]

/**
* If an [[Alias]] node appears on top of another [[Alias]], remove the bottom one. Here we don't
* handle a case where a node of different type appears between two [[Alias]] nodes: in this
* case, removal of inner alias (if it is unnecessary) should be handled by respective node's
* resolver, in order to preserve the bottom-up contract.
*/
private def collapseAlias(alias: Alias): Alias =
alias.child match {
case innerAlias: Alias =>
val metadata = if (alias.metadata.isEmpty) {
None
} else {
Some(alias.metadata)
}
alias.copy(child = innerAlias.child)(
exprId = alias.exprId,
qualifier = alias.qualifier,
explicitMetadata = metadata,
nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
)
case _ => alias
scopes.top.lcaRegistry.withNewLcaScope {
val aliasWithResolvedChildren = withResolvedChildren(alias, expressionResolver.resolve)
expressionResolver.getExpressionIdAssigner
.mapExpression(aliasWithResolvedChildren)
.asInstanceOf[Alias]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class AttributeScopeStack {
/**
* Overwrite current relevant scope with a sequence of attributes which is an output of some
* operator. `attributes` can have duplicate IDs if the output of the operator contains multiple
* occurrences of the same attribute.
* occurencies of the same attribute.
*/
def overwriteTop(attributes: Seq[Attribute]): Unit = {
stack.pop()
Expand Down
Loading

0 comments on commit 2d496ab

Please sign in to comment.