Skip to content

Commit

Permalink
Merge pull request #820 from s1ck/align_columns_returnitems
Browse files Browse the repository at this point in the history
Implement canonical physical record representation for Cypher results
  • Loading branch information
s1ck authored Feb 22, 2019
2 parents d5c7ef5 + a7b532e commit 395cc12
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 146 deletions.
36 changes: 9 additions & 27 deletions dev-support/caps-style.xml
Original file line number Diff line number Diff line change
@@ -1,33 +1,15 @@
<code_scheme name="caps-style" version="173">
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99" />
<option name="IMPORT_LAYOUT_TABLE">
<value>
<package name="" withSubpackages="true" static="false" />
<emptyLine />
<package name="java" withSubpackages="true" static="false" />
<package name="javax" withSubpackages="true" static="false" />
<emptyLine />
<package name="org.neo4j" withSubpackages="true" static="false" />
<package name="com.neotechnology" withSubpackages="true" static="false" />
<emptyLine />
<package name="" withSubpackages="true" static="true" />
<emptyLine />
<package name="java" withSubpackages="true" static="false" />
<package name="javax" withSubpackages="true" static="false" />
<emptyLine />
<package name="org.neo4j" withSubpackages="true" static="false" />
<package name="com.neotechnology" withSubpackages="true" static="false" />
</value>
</option>
<option name="JD_ALIGN_PARAM_COMMENTS" value="false" />
<option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false" />
<option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true" />
<option name="JD_PRESERVE_LINE_FEEDS" value="true" />
<option name="WRAP_COMMENTS" value="true" />
<option name="RIGHT_MARGIN" value="160" />
<MarkdownNavigatorCodeStyleSettings>
<option name="RIGHT_MARGIN" value="72" />
</MarkdownNavigatorCodeStyleSettings>
<ScalaCodeStyleSettings>
<option name="alwaysUsedImports">
<array>
<option value="org.opencypher.caps.benchmarks.ConfigurationParsing._" />
</array>
</option>
</ScalaCodeStyleSettings>
<XML>
<option name="XML_LEGACY_SETTINGS_IMPORTED" value="true" />
</XML>
Expand Down Expand Up @@ -105,4 +87,4 @@
<option name="TAB_SIZE" value="2" />
</indentOptions>
</codeStyleSettings>
</code_scheme>
</code_scheme>
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,6 @@ trait CypherTable {
*/
def logicalColumns: Option[Seq[String]] = None

/**
* Get the names of the physical columns that hold the values of the given return item.
* A return item is given by an identifier or alias of a return clause. For example,
* in the query 'MATCH (n) RETURN n.foo' the only return item would be 'n.foo'.
*
* It returns a set with a single value if the return item is a primitive. It will return
* a set of several column names if the return item is an entity, such as a node. The returned columns
* hold the members of the entity (ids, label/type, properties) using an internal naming scheme.
*
* @param returnItem name of one of the return items represented in this table.
* @return a set of names of the physical columns that hold the data for the return item.
*/
def columnsFor(returnItem: String): Set[String]

/**
* CypherType of columns stored in this table.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ final case class Param(name: String)(val cypherType: CypherType = CTWildcard) ex
sealed trait Var extends Expr {
def name: String

override def withoutType: String = s"`$name`"
override def withoutType: String = name
}

object Var {
Expand Down Expand Up @@ -127,9 +127,6 @@ final case class NodeVar(name: String)(val cypherType: CypherType = CTNode) exte
case o => throw IllegalArgumentException(CTNode, o)
}
}

override def withoutType: String = s"$name"

}

final case class RelationshipVar(name: String)(val cypherType: CypherType = CTRelationship) extends ReturnItem {
Expand All @@ -143,8 +140,6 @@ final case class RelationshipVar(name: String)(val cypherType: CypherType = CTRe
case o => throw IllegalArgumentException(CTRelationship, o)
}
}
override def withoutType: String = s"$name"

}

final case class SimpleVar(name: String)(val cypherType: CypherType) extends ReturnItem {
Expand All @@ -153,8 +148,6 @@ final case class SimpleVar(name: String)(val cypherType: CypherType) extends Ret

override def withOwner(expr: Var): SimpleVar = SimpleVar(expr.name)(expr.cypherType)

override def withoutType: String = s"$name"

}

final case class StartNode(rel: Expr)(val cypherType: CypherType = CTWildcard) extends Expr {
Expand Down Expand Up @@ -265,7 +258,7 @@ final case class Not(expr: Expr)(val cypherType: CypherType = CTWildcard) extend

def inner: Expr = expr

override def withoutType = s"NOT ${expr.withoutType}"
override def withoutType = s"NOT(${expr.withoutType})"

}

Expand Down Expand Up @@ -297,7 +290,7 @@ final case class HasType(rel: Expr, relType: RelType)

override def withOwner(v: Var): HasType = HasType(v, relType)(cypherType)

override def withoutType: String = s"type(${rel.withoutType}) = '${relType.name}'"
override def withoutType: String = s"${rel.withoutType}:${relType.name}"

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import org.opencypher.okapi.impl.util.PrintOptions
import org.opencypher.okapi.logical.impl.LogicalOperator
import org.opencypher.okapi.relational.api.graph.{RelationalCypherGraph, RelationalCypherSession}
import org.opencypher.okapi.relational.api.table.{RelationalCypherRecords, Table}
import org.opencypher.okapi.relational.impl.operators.{RelationalOperator, ReturnGraph}
import org.opencypher.okapi.relational.impl.operators.{AlignColumnsWithReturnItems, RelationalOperator, ReturnGraph}

case class RelationalCypherResult[T <: Table[T]](
import scala.reflect.runtime.universe.TypeTag

case class RelationalCypherResult[T <: Table[T] : TypeTag](
maybeLogical: Option[LogicalOperator],
maybeRelational: Option[RelationalOperator[T]]
)(implicit session: RelationalCypherSession[T]) extends CypherResult {
Expand All @@ -47,11 +49,27 @@ case class RelationalCypherResult[T <: Table[T]](
case _ => None
}

override def getRecords: Option[Records] =
maybeRelational.flatMap {
case _: ReturnGraph[T] => None
case other => Some(session.records.from(other.header, other.table, other.returnItems.map(_.map(_.name))))
}
/**
  * Returns records with minimal number of columns and arbitrary column names.
  * The column structure is reflected in the RecordHeader.
  *
  * @return `None` if there is no relational plan or if the plan ends in a `ReturnGraph`;
  *         otherwise the records built from the plan's own header, table and return item names
  */
def getInternalRecords: Option[Records] = maybeRelational match {
  // Plans that return a graph (or no plan at all) carry no tabular result.
  case None | Some(_: ReturnGraph[T]) => None
  case Some(plan) =>
    Some(session.records.from(plan.header, plan.table, plan.maybeReturnItems.map(_.map(_.name))))
}

/**
  * Returns the result records after wrapping the relational plan in `AlignColumnsWithReturnItems`,
  * so that header, table and return item names are taken from the aligned operator.
  *
  * @return `None` if there is no relational plan or if the plan ends in a `ReturnGraph`
  */
override def getRecords: Option[Records] = maybeRelational match {
  // Plans that return a graph (or no plan at all) carry no tabular result.
  case None | Some(_: ReturnGraph[T]) => None
  case Some(plan) =>
    val aligned = AlignColumnsWithReturnItems[T](plan)
    Some(session.records.from(aligned.header, aligned.table, aligned.maybeReturnItems.map(_.map(_.name))))
}

override def show(implicit options: PrintOptions): Unit = getRecords match {
case Some(r) => r.show
Expand All @@ -63,10 +81,10 @@ case class RelationalCypherResult[T <: Table[T]](

object RelationalCypherResult {

def empty[T <: Table[T]](implicit session: RelationalCypherSession[T]): RelationalCypherResult[T] =
def empty[T <: Table[T] : TypeTag](implicit session: RelationalCypherSession[T]): RelationalCypherResult[T] =
RelationalCypherResult(None, None)

def apply[T <: Table[T]](
def apply[T <: Table[T] : TypeTag](
logical: LogicalOperator,
relational: RelationalOperator[T]
)(implicit session: RelationalCypherSession[T]): RelationalCypherResult[T] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,6 @@ trait RelationalCypherRecords[T <: Table[T]] extends CypherRecords {

override def physicalColumns: Seq[String] = table.physicalColumns

override def columnsFor(returnItem: String): Set[String] = {
val returnExpressions = header.returnItems.filter(_.name == returnItem)

if(returnExpressions.isEmpty)
throw IllegalArgumentException(s"A return item in this table, which contains: $header", returnItem)

val withChildExpressions = returnExpressions.foldLeft(Set.empty[Expr]) {
case (acc, expr) =>
acc ++ header.expressionsFor(expr)
}
withChildExpressions.map(header.column)
}

override def show(implicit options: PrintOptions): Unit =
RecordsPrinter.print(this)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
package org.opencypher.okapi.relational.api.table

import org.opencypher.okapi.api.table.CypherTable
import org.opencypher.okapi.api.types.{CTNull, CypherType}
import org.opencypher.okapi.api.types.CypherType
import org.opencypher.okapi.api.value.CypherValue.CypherMap
import org.opencypher.okapi.ir.api.expr.{Aggregator, Expr, Var}
import org.opencypher.okapi.relational.impl.planning.{JoinType, Order}
Expand Down Expand Up @@ -57,7 +57,18 @@ trait Table[T <: Table[T]] extends CypherTable {
* @param cols columns to select
* @return table containing only requested columns
*/
def select(cols: String*): T
def select(cols: String*): T = {
  // The aliasing overload needs at least one column; reject an empty selection up front
  // with a descriptive message instead of failing on `head` of an empty sequence.
  require(cols.nonEmpty, "select requires at least one column name")
  // Selecting without renaming is expressed as aliasing every column to its own name.
  val first +: rest = cols.map(name => name -> name)
  select(first, rest: _*)
}

/**
* Returns a table containing only the given columns. The column order within the table is aligned with the argument.
* Each column is given as a pair of the column to select and the alias it is selected under.
*
* @param col first column to select and its alias
* @param cols further columns to select and their alias
* @return table containing only requested aliased columns
*/
def select(col: (String, String), cols: (String, String)*): T

/**
* Returns a table containing only rows where the given expression evaluates to true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ abstract class RelationalOperator[T <: Table[T] : TypeTag] extends AbstractTreeN

def graphName: QualifiedGraphName = children.head.graphName

def returnItems: Option[Seq[Var]] = children.head.returnItems
def maybeReturnItems: Option[Seq[Var]] = children.head.maybeReturnItems

protected def resolve(qualifiedGraphName: QualifiedGraphName)
(implicit context: RelationalRuntimeContext[T]): RelationalCypherGraph[T] =
Expand Down Expand Up @@ -165,7 +165,7 @@ final case class Start[T <: Table[T] : TypeTag](

override lazy val graphName: QualifiedGraphName = qgn

override lazy val returnItems: Option[Seq[Var]] = None
override lazy val maybeReturnItems: Option[Seq[Var]] = None

override def toString: String = {
val graphArg = qgn.toString
Expand Down Expand Up @@ -196,7 +196,7 @@ final case class PrefixGraph[T <: Table[T] : TypeTag](
* Cache is a marker operator that indicates that its child operator is used multiple times within the query.
*/
final case class Cache[T <: Table[T] : TypeTag](in: RelationalOperator[T])
extends RelationalOperator[T] {
extends RelationalOperator[T] {

override lazy val _table: T = in._table.cache()

Expand Down Expand Up @@ -306,7 +306,7 @@ final case class Filter[T <: Table[T] : TypeTag](
}

final case class ReturnGraph[T <: Table[T] : TypeTag](in: RelationalOperator[T])
extends RelationalOperator[T] {
extends RelationalOperator[T] {

override lazy val header: RecordHeader = RecordHeader.empty

Expand All @@ -330,10 +330,33 @@ final case class Select[T <: Table[T] : TypeTag](
in.table.select(selectExpressions.map(header.column).distinct: _*)
}

override lazy val returnItems: Option[Seq[Var]] =
override lazy val maybeReturnItems: Option[Seq[Var]] =
Some(returnExpressions.flatMap(_.owner).collect { case e: Var => e }.distinct)
}

/**
  * Renames physical columns to given header expression names.
  * Ensures that there is a physical column for each return item, i.e. aliases lead to duplicate physical columns.
  *
  * Evaluating the table fails with a `NoSuchElementException` if the input operator provides no
  * return item expressions, because there is then no column to select.
  *
  * @param in operator whose return items determine the columns of the aligned result
  */
final case class AlignColumnsWithReturnItems[T <: Table[T] : TypeTag](
  in: RelationalOperator[T]
) extends RelationalOperator[T] {

  // Every header expression belonging to a return item of `in`, paired with the
  // column name it receives in the aligned result (its untyped string representation).
  private lazy val logicalColumns = in.maybeReturnItems
    .getOrElse(List.empty)
    .flatMap(in.header.expressionsFor)
    .map(expr => expr -> expr.withoutType.toString)
    .toList

  override lazy val header: RecordHeader = RecordHeader(logicalColumns.toMap)

  override lazy val _table: T = {
    // Pair each physical input column with the name it is selected under.
    val columnsWithAliases = logicalColumns.map { case (expr, alias) => in.header.column(expr) -> alias }
    columnsWithAliases match {
      case first :: rest =>
        in.table.select(first, rest: _*)
      case Nil =>
        // Same exception type as the former `head` call, but with an actionable message.
        throw new NoSuchElementException(
          s"Cannot align columns with return items: ${in.getClass.getSimpleName} provides no return items")
    }
  }

}

final case class Distinct[T <: Table[T] : TypeTag](
in: RelationalOperator[T],
fields: Set[Var]
Expand All @@ -349,7 +372,7 @@ final case class Aggregate[T <: Table[T] : TypeTag](
aggregations: Set[(Var, Aggregator)]
) extends RelationalOperator[T] {

override lazy val header: RecordHeader = in.header.select(group).withExprs(aggregations.map(_._1))
override lazy val header: RecordHeader = in.header.select(group).withExprs(aggregations.map { case (v, _) => v })

override lazy val _table: T = {
val preparedAggregations = aggregations.map { case (v, agg) => agg -> (header.column(v) -> v.cypherType) }
Expand Down Expand Up @@ -505,7 +528,7 @@ final case class ConstructGraph[T <: Table[T] : TypeTag](

override lazy val _table: T = session.records.unit().table

override def returnItems: Option[Seq[Var]] = None
override def maybeReturnItems: Option[Seq[Var]] = None

override lazy val graph: RelationalCypherGraph[T] = constructedGraph

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ object DataFrameOutputExample extends ConsoleApp {
val df: DataFrame = results.records.asDataFrame

// 5) Select specific return items from the query result
val cols = results.records.columnsFor("a.name") ++ results.records.columnsFor("b.name")
val projection: DataFrame = df.select(cols.head, cols.tail.toSeq: _*)
val projection: DataFrame = df.select("`a.name`", "`b.name`")

projection.show()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
+----------------+-----------------+
|_name __ STRING?|b_name __ STRING?|
+----------------+-----------------+
| Alice| Bob|
| Bob| Carol|
+----------------+-----------------+
+------+------+
|a.name|b.name|
+------+------+
| Alice| Bob|
| Bob| Carol|
+------+------+

Loading

0 comments on commit 395cc12

Please sign in to comment.