Merge pull request #410 from s1ck/408_store_graph_metadata
Support reading/writing graph tags in CSV based data sources
DarthMax authored Apr 27, 2018
2 parents cc52ed5 + dddcc7b commit 9ad1855
Showing 18 changed files with 286 additions and 88 deletions.
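In substance, this commit persists a graph's integer tags alongside its CSV data (a `metadata.json` file for the CSV data source) and threads them back into graph construction on load. A minimal sketch of the store/load round trip that the re-enabled acceptance test below pins down, reusing the `cypherSession` and source namespace `ns` from that test trait (both are assumed here):

```scala
import org.opencypher.okapi.api.graph.GraphName

// Two single-node graphs, created as in the acceptance test below.
cypherSession.cypher("CREATE GRAPH g1 { CONSTRUCT NEW () RETURN GRAPH }")
cypherSession.cypher("CREATE GRAPH g2 { CONSTRUCT NEW () RETURN GRAPH }")

val union = cypherSession.catalog.graph("g1").unionAll(cypherSession.catalog.graph("g2"))

// Storing the union now also records its graph tags, so after a reload the two
// nodes keep distinct identities instead of collapsing onto one id space.
cypherSession.catalog.source(ns).store(GraphName("union"), union)
cypherSession.catalog.source(ns).graph(GraphName("union")).nodes("n").size // expected: 2
```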
@@ -35,11 +35,11 @@ import org.opencypher.okapi.impl.exception.GraphNotFoundException
import org.opencypher.okapi.impl.schema.SchemaImpl
import org.opencypher.okapi.testing.Bag._
import org.opencypher.okapi.testing.propertygraph.{TestGraph, TestGraphFactory}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach

import scala.util.{Failure, Success, Try}

trait PGDSAcceptance[Session <: CypherSession] extends BeforeAndAfterAll {
trait PGDSAcceptance[Session <: CypherSession] extends BeforeAndAfterEach {
self: BaseTestSuite =>

val createStatements =
@@ -59,13 +59,16 @@ trait PGDSAcceptance[Session <: CypherSession] extends BeforeAndAfterAll {

val cypherSession: Session = initSession()

override def beforeAll(): Unit = {
super.beforeAll()
override protected def beforeEach(): Unit = {
super.beforeEach()
val ds = create(gn, testGraph, createStatements)
cypherSession.registerSource(ns, ds)
}

override def afterAll(): Unit = super.afterAll()
override protected def afterEach(): Unit = {
cypherSession.deregisterSource(ns)
super.afterEach()
}

def initSession(): Session

@@ -170,30 +173,33 @@ trait PGDSAcceptance[Session <: CypherSession] extends BeforeAndAfterAll {
}
}

// TODO: unionAll with same graph requires fixing https://github.com/opencypher/cypher-for-apache-spark/issues/402
ignore("supports UNION ALL (requires storing/loading graph tags for CAPS)") {
it("supports storing a union graph") {
cypherSession.cypher("CREATE GRAPH g1 { CONSTRUCT NEW () RETURN GRAPH }")
cypherSession.cypher("CREATE GRAPH g2 { CONSTRUCT NEW () RETURN GRAPH }")
val unionGraphName = GraphName("union")

val graph = cypherSession.catalog.source(ns).graph(gn)
graph.nodes("n").size shouldBe 3
val g1 = cypherSession.catalog.graph("g1")
val g2 = cypherSession.catalog.graph("g2")

g1.nodes("n").size shouldBe 1
g2.nodes("n").size shouldBe 1

val unionGraph = graph.unionAll(graph)
unionGraph.nodes("n").size shouldBe 6
val unionGraph = g1.unionAll(g2)
unionGraph.nodes("n").size shouldBe 2

Try {
cypherSession.catalog.source(ns).store(unionGraphName, unionGraph)
} match {
case Success(_) =>
withClue("`graph` needs to return graph with correct node size after storing a union graph") {
cypherSession.catalog.source(ns).graph(unionGraphName).nodes("n").size shouldBe 6
cypherSession.catalog.source(ns).graph(unionGraphName).nodes("n").size shouldBe 2
}
case Failure(_: UnsupportedOperationException) =>
case Failure(t) => badFailure(t)
}
}

// TODO: https://github.com/opencypher/cypher-for-apache-spark/issues/408
it("supports repeated CONSTRUCT ON (requires storing/loading graph tags for CAPS)") {
it("supports repeated CONSTRUCT ON") {
val firstConstructedGraphName = GraphName("first")
val secondConstructedGraphName = GraphName("second")
val graph = cypherSession.catalog.source(ns).graph(gn)
@@ -221,7 +227,7 @@ trait PGDSAcceptance[Session <: CypherSession] extends BeforeAndAfterAll {
| RETURN GRAPH
""".stripMargin).getGraph
secondConstructedGraph.nodes("n").size shouldBe 5
cypherSession.catalog.source(ns).store(firstConstructedGraphName, secondConstructedGraph)
cypherSession.catalog.source(ns).store(secondConstructedGraphName, secondConstructedGraph)
val retrievedSecondConstructedGraph = cypherSession.catalog.source(ns).graph(secondConstructedGraphName)
retrievedSecondConstructedGraph.nodes("n").size shouldBe 5
}
@@ -33,7 +33,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.opencypher.okapi.api.graph._
import org.opencypher.okapi.api.table.CypherRecords
import org.opencypher.okapi.api.value.CypherValue.CypherMap
import org.opencypher.okapi.impl.exception.{IllegalArgumentException, UnsupportedOperationException}
import org.opencypher.okapi.impl.exception.UnsupportedOperationException
import org.opencypher.okapi.impl.graph.CypherCatalog
import org.opencypher.okapi.relational.impl.table.ColumnName
import org.opencypher.spark.api.io._
@@ -66,25 +66,26 @@ trait CAPSSession {
}

/**
* Reads a graph from a sequence of entity tables and expects that the first table is a node table.
* Reads a graph from a sequence of entity tables that contains at least one node table.
*
* @param nodeTable first parameter to guarantee there is at least one node table
* @param entityTables sequence of node and relationship tables defining the graph
* @return property graph
*/
def readFrom(entityTables: CAPSEntityTable*): PropertyGraph = entityTables.head match {
case h: CAPSNodeTable => readFrom(h, entityTables.tail: _*)
case _ => throw IllegalArgumentException("first argument of type NodeTable", "RelationshipTable")
def readFrom(nodeTable: CAPSNodeTable, entityTables: CAPSEntityTable*): PropertyGraph = {
CAPSGraph.create(nodeTable, entityTables: _*)(this)
}

/**
* Reads a graph from a sequence of entity tables that contains at least one node table.
*
* @param tags tags that are used by graph entities
* @param nodeTable first parameter to guarantee there is at least one node table
* @param entityTables sequence of node and relationship tables defining the graph
* @return property graph
*/
def readFrom(nodeTable: CAPSNodeTable, entityTables: CAPSEntityTable*): PropertyGraph = {
CAPSGraph.create(nodeTable, entityTables: _*)(this)
def readFrom(tags: Set[Int], nodeTable: CAPSNodeTable, entityTables: CAPSEntityTable*): PropertyGraph = {
CAPSGraph.create(tags, nodeTable, entityTables: _*)(this)
}

private[opencypher] val emptyGraphQgn = QualifiedGraphName(catalog.sessionNamespace, GraphName("emptyGraph"))
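Put together, the session now exposes a tag-aware read path next to the existing one; a hypothetical helper illustrating how a data source might forward the tags it read back from storage (the signatures are the ones shown above, the helper itself is an assumption):

```scala
import org.opencypher.okapi.api.graph.PropertyGraph
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSEntityTable, CAPSNodeTable}

// Forwards persisted tags when present; without them the graph falls back to the
// default tag set Set(0) via the original readFrom overload.
def readWithTags(tags: Set[Int], nodeTable: CAPSNodeTable, rest: CAPSEntityTable*)
                (implicit session: CAPSSession): PropertyGraph =
  if (tags.isEmpty) session.readFrom(nodeTable, rest: _*)
  else session.readFrom(tags, nodeTable, rest: _*)
```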
@@ -138,9 +138,13 @@ object CAPSGraph {
}

def create(nodeTable: CAPSNodeTable, entityTables: CAPSEntityTable*)(implicit caps: CAPSSession): CAPSGraph = {
create(Set(0), nodeTable, entityTables: _*)
}

def create(tags: Set[Int], nodeTable: CAPSNodeTable, entityTables: CAPSEntityTable*)(implicit caps: CAPSSession): CAPSGraph = {
val allTables = nodeTable +: entityTables
val schema = allTables.map(_.schema).reduce[Schema](_ ++ _).asCaps
new CAPSScanGraph(allTables, schema, Set(0))
new CAPSScanGraph(allTables, schema, tags)
}

def create(records: CypherRecords, schema: CAPSSchema, tags: Set[Int] = Set(0))(implicit caps: CAPSSession): CAPSGraph = {
@@ -31,12 +31,15 @@ import java.net.URI
import java.nio.file.{Files, Paths}
import java.util.stream.Collectors

import io.circe.generic.auto._
import io.circe.syntax._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.opencypher.okapi.impl.exception.{GraphNotFoundException, IllegalArgumentException, InvalidGraphException}
import org.opencypher.okapi.impl.exception.IllegalArgumentException
import org.opencypher.spark.impl.io.hdfs.CsvGraphLoader._

trait CsvFileHandler {

def graphLocation: URI

def listNodeFiles: Array[URI] = listDataFiles(NODES_DIRECTORY)
@@ -47,70 +50,81 @@ trait CsvFileHandler {

def readSchemaFile(path: URI): String

def readMetaData(fileName: String): CsvGraphMetaData

def writeSchemaFile(directory: String, filename: String, jsonSchema: String): Unit

def writeMetaData(fileName: String, metaData: CsvGraphMetaData): Unit

def exists(path: String): Boolean
}

final class HadoopFileHandler(override val graphLocation: URI, private val hadoopConfig: Configuration)
extends CsvFileHandler {

private val fs: FileSystem = FileSystem.get(graphLocation, hadoopConfig)

override def listDataFiles(directory: String): Array[URI] = {
val graphDirectory = graphLocation.getPath
if (!fs.exists(new Path(graphDirectory))) {
throw GraphNotFoundException(s"CSV graph with name '$graphDirectory'")
}
val entitiesDirectory = new Path(graphDirectory, directory)
if (!fs.exists(entitiesDirectory)) {
throw InvalidGraphException(s"CSV graph is missing required directory '$directory'")
}
fs.listStatus(entitiesDirectory)
override def exists(path: String): Boolean =
fs.exists(new Path(path))

override def listDataFiles(directory: String): Array[URI] =
fs.listStatus(new Path(graphLocation.getPath, directory))
.filter(p => p.getPath.toString.toUpperCase.endsWith(CSV_SUFFIX))
.map(_.getPath.toUri)
}

override def readSchemaFile(path: URI): String = {
val hdfsPath = new Path(path)
val schemaPaths = Seq(hdfsPath.suffix(SCHEMA_SUFFIX.toLowerCase), hdfsPath.suffix(SCHEMA_SUFFIX))
val optSchemaPath = schemaPaths.find(fs.exists)
val schemaPath = optSchemaPath.getOrElse(throw IllegalArgumentException(s"to find a schema file at $path"))
val stream = new BufferedReader(new InputStreamReader(fs.open(schemaPath)))

def readLines = Stream.cons(stream.readLine(), Stream.continually(stream.readLine))
readFile(schemaPath)
}

readLines.takeWhile(_ != null).mkString
override def readMetaData(fileName: String): CsvGraphMetaData = {
val graphDirectory = graphLocation.getPath
val metaDataPath = new Path(graphDirectory, fileName)
CsvGraphMetaData(readFile(metaDataPath))
}

override def writeSchemaFile(directory: String, filename: String, jsonSchema: String): Unit = {
val hdfsPath = new Path(new Path(graphLocation.getPath, directory), filename)
val outputStream = fs.create(hdfsPath)
override def writeSchemaFile(directory: String, filename: String, jsonSchema: String): Unit =
writeFile(new Path(new Path(graphLocation.getPath, directory), filename), jsonSchema)

override def writeMetaData(fileName: String, metaData: CsvGraphMetaData): Unit =
writeFile(new Path(graphLocation.getPath, fileName), metaData.asJson.toString())

private def writeFile(path: Path, content: String): Unit = {
val outputStream = fs.create(path)
val bw = new BufferedWriter(new OutputStreamWriter(outputStream, "UTF-8"))
bw.write(jsonSchema)
bw.write(content)
bw.close()
}

private def readFile(path: Path): String = {
val stream = new BufferedReader(new InputStreamReader(fs.open(path)))

def readLines = Stream.cons(stream.readLine(), Stream.continually(stream.readLine))

readLines.takeWhile(_ != null).mkString
}

}

final class LocalFileHandler(override val graphLocation: URI) extends CsvFileHandler {

import scala.collection.JavaConverters._

override def listDataFiles(directory: String): Array[URI] = {
val graphDirectory = graphLocation.getPath
if (Files.notExists(Paths.get(graphDirectory))) {
throw GraphNotFoundException(s"CSV graph with name '$graphDirectory'")
}
val entitiesDirectory = Paths.get(graphDirectory, directory)
if (Files.notExists(entitiesDirectory)) {
throw InvalidGraphException(s"CSV graph is missing required directory '$directory'")
}
override def exists(path: String): Boolean =
Files.exists(Paths.get(path))

override def listDataFiles(directory: String): Array[URI] =
Files
.list(entitiesDirectory)
.list(Paths.get(graphLocation.getPath, directory))
.collect(Collectors.toList())
.asScala
.filter(p => p.toString.toUpperCase.endsWith(CSV_SUFFIX))
.toArray
.map(_.toUri)
}


override def readSchemaFile(csvUri: URI): String = {
val path = Paths.get(csvUri)
Expand All @@ -125,10 +139,23 @@ final class LocalFileHandler(override val graphLocation: URI) extends CsvFileHan
new String(Files.readAllBytes(schemaPath))
}

override def writeSchemaFile(directory: String, filename: String, jsonSchema: String): Unit = {
val file = new File(Paths.get(graphLocation.getPath, directory, filename).toString)
override def readMetaData(fileName: String): CsvGraphMetaData = {
val graphDirectory = graphLocation.getPath
val metaDataPath = Paths.get(graphDirectory, fileName)
CsvGraphMetaData(new String(Files.readAllBytes(metaDataPath)))
}

override def writeSchemaFile(directory: String, filename: String, jsonSchema: String): Unit =
writeFile(Paths.get(graphLocation.getPath, directory, filename), jsonSchema)

override def writeMetaData(fileName: String, metaData: CsvGraphMetaData): Unit =
writeFile(Paths.get(graphLocation.getPath, fileName), metaData.asJson.toString())

private def writeFile(path: java.nio.file.Path, content: String): Unit = {
val file = new File(path.toString)
val bw = new BufferedWriter(new FileWriter(file))
bw.write(jsonSchema)
bw.write(content)
bw.close()
}

}
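The `CsvGraphMetaData` type read and written above is defined in one of the changed files not expanded on this page; the following is a hypothetical reconstruction consistent with the circe imports and the `CsvGraphMetaData(readFile(path))` / `metaData.asJson` call sites. The field name `tags` matches its use in `CsvGraphLoader` below, while the `empty` default is an assumption:

```scala
import io.circe.generic.auto._
import io.circe.parser.decode
import io.circe.syntax._

case class CsvGraphMetaData(tags: Set[Int])

object CsvGraphMetaData {

  // Assumed fallback for graphs written before this change (no metadata.json on disk).
  val empty: CsvGraphMetaData = CsvGraphMetaData(Set(0))

  // Mirrors the CsvGraphMetaData(jsonString) calls in the file handlers above.
  def apply(json: String): CsvGraphMetaData =
    decode[CsvGraphMetaData](json).fold(err => throw err, identity)
}

// Write side, as used by writeMetaData above: CsvGraphMetaData(Set(0, 1)).asJson.toString()
```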
@@ -27,15 +27,18 @@
package org.opencypher.spark.impl.io.hdfs

import java.net.URI
import java.nio.file.Paths

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
import org.opencypher.okapi.api.graph.PropertyGraph
import org.opencypher.okapi.api.io.conversion.{NodeMapping, RelationshipMapping}
import org.opencypher.okapi.impl.exception.{GraphNotFoundException, InvalidGraphException}
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}
import org.opencypher.spark.impl.DataFrameOps._
import CsvGraphLoader._

/**
* Loads a graph stored in indexed CSV format from HDFS or the local file system
@@ -60,7 +63,33 @@ class CsvGraphLoader(fileHandler: CsvFileHandler)(implicit capsSession: CAPSSess

private val sparkSession: SparkSession = capsSession.sparkSession

def load: PropertyGraph = capsSession.readFrom(loadNodes ++ loadRels: _*)
def load: PropertyGraph = {

if (!fileHandler.exists(fileHandler.graphLocation.getPath)) {
throw GraphNotFoundException(s"CSV graph with name '${fileHandler.graphLocation.getPath}'")
}
if (!fileHandler.exists(Paths.get(fileHandler.graphLocation.getPath, NODES_DIRECTORY).toString)) {
throw InvalidGraphException(s"CSV graph is missing required directory '$NODES_DIRECTORY'")
}

val nodeTables = loadNodes

val relTables = if (fileHandler.exists(Paths.get(fileHandler.graphLocation.getPath, RELS_DIRECTORY).toString)) {
loadRels
} else {
List.empty[CAPSRelationshipTable]
}

val metaData = if (fileHandler.exists(Paths.get(fileHandler.graphLocation.getPath, METADATA_FILE).toString)) {
loadMetaData
} else {
CsvGraphMetaData.empty
}
capsSession.readFrom(metaData.tags, nodeTables.head, nodeTables.tail ++ relTables: _*)
}

private def loadMetaData: CsvGraphMetaData =
fileHandler.readMetaData(METADATA_FILE)

private def loadNodes: List[CAPSNodeTable] = {
val csvFiles = fileHandler.listNodeFiles.toList
@@ -156,6 +185,8 @@ object CsvGraphLoader {

val SCHEMA_SUFFIX = ".SCHEMA"

val METADATA_FILE = "metadata.json"

def apply(location: URI, hadoopConfig: Configuration)(implicit caps: CAPSSession): CsvGraphLoader = {
new CsvGraphLoader(new HadoopFileHandler(location, hadoopConfig))
}
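A minimal usage sketch of the loader after this change; the path is illustrative, and `CAPSSession.local()` is assumed to be available as in other CAPS examples:

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.impl.io.hdfs.CsvGraphLoader

object LoadCsvGraphExample extends App {
  implicit val caps: CAPSSession = CAPSSession.local()

  // Expects <path>/nodes, and optionally <path>/rels and <path>/metadata.json next to it.
  val graph = CsvGraphLoader(new URI("file:///tmp/csv-graphs/friends"), new Configuration()).load

  println(graph.nodes("n").size)
}
```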