Commit

initial commit
timvw committed Nov 6, 2023
1 parent c4f4516 commit 6faa73e
Showing 35 changed files with 1,189 additions and 0 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,29 @@
name: workflow

on: push

jobs:
  build:

    runs-on: ubuntu-latest

    steps:

      - uses: actions/checkout@v2

      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8

      - name: Test
        run: sbt '+clean; +cleanFiles; +compile; +test'

      - name: Release
        if: startsWith(github.ref, 'refs/tags/v')
        run: sbt 'ci-release'
        env:
          PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }}
          PGP_SECRET: ${{ secrets.PGP_SECRET }}
          SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
          SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
23 changes: 23 additions & 0 deletions .gitignore
@@ -0,0 +1,23 @@
metastore_db/
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
*.iml
.idea/*
derby.log
.project
.classpath
.settings
.vscode
project/target
.bloop
.metals
metals.sbt
null
.bsp/
Empty file added .scalafmt.conf
Empty file.
44 changes: 44 additions & 0 deletions README.md
@@ -0,0 +1,44 @@
# Datasource for Adobe Analytics Data Feed

Adobe Analytics [Data feeds](https://experienceleague.adobe.com/docs/analytics/export/analytics-data-feed/data-feed-contents/datafeeds-contents.html?lang=en) are a means to get raw data out of Adobe Analytics.

This project implements an [Apache Spark data source](https://spark.apache.org/docs/latest/sql-data-sources.html) leveraging the [uniVocity TSV parser](https://github.com/uniVocity/univocity-parsers/tree/master), and does not suffer from the flaw found in many online examples that treat the [(hit) data files](https://experienceleague.adobe.com/docs/analytics/export/analytics-data-feed/data-feed-contents/datafeeds-contents.html?lang=en#hit-data-files) as CSV.
Concretely, a CSV parser does not handle escaped values correctly, due to inherent [differences between CSV and TSV](https://github.com/eBay/tsv-utils/blob/master/docs/comparing-tsv-and-csv.md).
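
As a minimal sketch of the difference (with a made-up record; the actual escape handling is done by the uniVocity parser, configured the way this datasource configures it):

```scala
import com.univocity.parsers.tsv.{TsvParser, TsvParserSettings}

// Made-up hit-data line: the middle field contains an escaped tab (backslash + "t" in the raw file)
val rawLine = "20231106\tpage title with\\ta tab inside\t1"

// CSV-style handling: split on the tab delimiter; the backslash escape is left untouched
val naive = rawLine.split("\t").toSeq

// TSV-aware handling, set up the way this datasource reads the hit-data files
val settings = new TsvParserSettings
settings.setLineJoiningEnabled(true)
val parsed = new TsvParser(settings).parseLine(rawLine).toSeq

println(naive)  // middle value still contains the literal "\t" escape sequence
println(parsed) // the escape is expected to be resolved by the TSV parser
```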

## Usage

Make sure the package is on the classpath, e.g. by using the `--packages` option:

```bash
spark-shell --packages "be.timvw:adobe-analytics-datafeed-datasource_2.12:0.1.0"
```

Then you can read the feed as follows:

```scala
val df = spark.read
  .format("be.timvw.adobe.analytics.datafeed")
  .load("./src/test/resources/randyzwitch")
```

## Features

* Correct handling of records which contain [special characters](https://experienceleague.adobe.com/docs/analytics/export/analytics-data-feed/data-feed-contents/datafeeds-spec-chars.html?lang=en)
* Capability to translate lookup columns to their actual values, as specified in the [Lookup files](https://experienceleague.adobe.com/docs/analytics/export/analytics-data-feed/data-feed-contents/datafeeds-contents.html?lang=en#lookup-files)
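
For example, lookup translation can be switched off to keep the raw IDs (a sketch; `browser` is a hypothetical lookup column used purely for illustration):

```scala
// With EnableLookups left at its default (true), lookup columns are translated via the
// lookup files shipped with the feed; disabling it keeps the raw IDs from the hit data.
val raw = spark.read
  .format("be.timvw.adobe.analytics.datafeed")
  .option("enableLookups", "false")
  .load("./src/test/resources/randyzwitch")
  .select("browser") // hypothetical column: contains the untranslated lookup ID
```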

## Options

* FileEncoding (default: ISO-8859-1)
* MaxCharsPerColumn (default: -1)
* EnableLookups (default: true)
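
These are passed like any other Spark read option, for example (a minimal sketch that spells out the defaults):

```scala
val df = spark.read
  .format("be.timvw.adobe.analytics.datafeed")
  .option("fileEncoding", "ISO-8859-1") // default
  .option("maxCharsPerColumn", "-1")    // default: let the parser buffer auto-expand
  .load("./src/test/resources/randyzwitch")
```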

We also support the Generic file source options:
* [Path Glob Filter](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html#path-glob-filter) for manifest files (so the pattern should end with `*.txt`)
* [Modification Time Path Filters](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html#modification-time-path-filters) for manifest files

```scala
val df = spark.read
  .format("be.timvw.adobe.analytics.datafeed")
  .option("modifiedAfter", "2023-11-01T00:00:00")
  .load("./src/test/resources/randyzwitch")
```
1 change: 1 addition & 0 deletions build.properties
@@ -0,0 +1 @@
sbt.version=1.9.4
26 changes: 26 additions & 0 deletions build.sbt
@@ -0,0 +1,26 @@
lazy val scala212 = "2.12.18"

lazy val supportedScalaVersions = List(scala212)

ThisBuild / organization := "be.timvw"
ThisBuild / name := "adobe-analytics-datafeed-datasource"

ThisBuild / homepage := Some(url("https://github.com/timvw/adobe-analytics-datafeed-datasource"))
ThisBuild / licenses := List("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0"))
ThisBuild / developers := List(Developer("timvw", "Tim Van Wassenhove", "[email protected]", url("https://timvw.be")))

ThisBuild / javacOptions ++= Seq("-source", "1.8", "-target", "1.8")

ThisBuild / scalaVersion := scala212

ThisBuild / crossScalaVersions := supportedScalaVersions

val sparkVersion = "3.3.0"
ThisBuild / libraryDependencies ++= List(
  "org.apache.spark" %% "spark-sql" % sparkVersion
)

val scalaTestVersion = "3.1.2"
ThisBuild / libraryDependencies ++= List(
  "org.scalatest" %% "scalatest" % scalaTestVersion % Test
)
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=1.9.4
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.12")
1 change: 1 addition & 0 deletions src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
be.timvw.adobe.analytics.datafeed.DefaultSource
13 changes: 13 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DataFile.scala
@@ -0,0 +1,13 @@
package be.timvw.adobe.analytics.datafeed

import org.apache.hadoop.fs.Path

case class DataFile(path: Path, md5: String, size: String, recordCount: Long = 1) {
  def filesetName: String = DataFile.filesetName(this)
}

case object DataFile {
  def filesetName(dataFile: DataFile): String = {
    dataFile.path.getName.substring(dataFile.path.getName.indexOf("-") + 1).replace(".tsv.gz", "")
  }
}
50 changes: 50 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DatafeedOptions.scala
@@ -0,0 +1,50 @@
package be.timvw.adobe.analytics.datafeed

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}

import java.nio.charset.Charset

trait DataSourceOptions {
  def newOption(name: String): String = name
}

/***
 * Represents reading options which can be specified for the Datafeed datasource
 * @param parameters
 */
case class DatafeedOptions(parameters: CaseInsensitiveMap[String]) extends Logging with Serializable {

  import implicits._
  import DatafeedOptions._

  /***
   * Defines the maximum number of characters allowed for any given value being read (the underlying parser defaults to 4096).
   * Used to avoid OutOfMemoryErrors. This datasource defaults to -1, which enables auto-expansion of the internal array.
   */
  val maxCharsPerColumn = parameters.getInt(MAX_CHARS_PER_COLUMN, -1)

  /***
   * Defines the file encoding
   */
  def fileEncoding = Charset.forName(parameters.getOrElse(FILE_ENCODING, "ISO-8859-1"))

  val enableLookups = parameters.getBool(ENABLE_LOOKUPS, true)
}

object DatafeedOptions extends DataSourceOptions {

  // source name
  val SOURCE_NAME = newOption(classOf[DefaultSource].getPackage.getName)

  // generic file data source options
  val TIME_ZONE = newOption(DateTimeUtils.TIMEZONE_OPTION)
  val MODIFIED_BEFORE = newOption("modifiedBefore")
  val MODIFIED_AFTER = newOption("modifiedAfter")
  val PATH_GLOB_FILTER = newOption("pathGlobFilter")

  // Datafeed specific options
  val MAX_CHARS_PER_COLUMN = newOption("maxCharsPerColumn")
  val FILE_ENCODING = newOption("fileEncoding")
  val ENABLE_LOOKUPS = newOption("enableLookups")
}
5 changes: 5 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DatafeedPartition.scala
@@ -0,0 +1,5 @@
package be.timvw.adobe.analytics.datafeed

import org.apache.spark.sql.connector.read.InputPartition

case class DatafeedPartition(dataFile: DataFile, manifestFile: ManifestFile, options: DatafeedOptions) extends InputPartition
40 changes: 40 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DatafeedPartitionReader.scala
@@ -0,0 +1,40 @@
package be.timvw.adobe.analytics.datafeed

import com.univocity.parsers.tsv.{TsvParser, TsvParserSettings}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.datasources.CodecStreams
import org.apache.spark.sql.types.StructType

case class DatafeedPartitionReader(conf: Configuration, partition: DatafeedPartition, requestedSchema: StructType, options: DatafeedOptions) extends PartitionReader[InternalRow] {

  val lookupFilesByName = partition.manifestFile.extractLookupFiles(conf)
  val schemaForDatafile = LookupFile.getSchemaForDataFiles(lookupFilesByName, options)

  // https://experienceleague.adobe.com/docs/analytics/export/analytics-data-feed/data-feed-contents/datafeeds-spec-chars.html?lang=en
  val dataFileInputStream = CodecStreams.createInputStreamWithCloseResource(conf, partition.dataFile.path)
  val tsvParserSettings = new TsvParserSettings
  tsvParserSettings.setLineJoiningEnabled(true)
  tsvParserSettings.setMaxColumns(schemaForDatafile.length + 1)
  tsvParserSettings.setMaxCharsPerColumn(options.maxCharsPerColumn)
  val tokenizer = new TsvParser(tsvParserSettings)
  val iter = tokenizer.iterate(dataFileInputStream).iterator()

  val valuesContributor = ValuesContributor(options.enableLookups, lookupFilesByName, schemaForDatafile)
  val contributor = valuesContributor.getContributor(List.empty, requestedSchema)

  override def next(): Boolean = iter.hasNext

  override def get(): InternalRow = {
    val row = new GenericInternalRow(requestedSchema.length)
    val columns = iter.next()
    contributor.contributeFunction(row, columns)
    row
  }

  override def close(): Unit = {
    dataFileInputStream.close()
  }
}
14 changes: 14 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DatafeedPartitionReaderFactory.scala
@@ -0,0 +1,14 @@
package be.timvw.adobe.analytics.datafeed

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

case class DatafeedPartitionReaderFactory(broadcastedConf: Broadcast[SerializableConfiguration], requestedSchema: StructType, options: DatafeedOptions) extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val conf = broadcastedConf.value.value
    DatafeedPartitionReader(conf, partition.asInstanceOf[DatafeedPartition], requestedSchema, options)
  }
}
27 changes: 27 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DatafeedScan.scala
@@ -0,0 +1,27 @@
package be.timvw.adobe.analytics.datafeed

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

case class DatafeedScan(sparkSession: SparkSession, manifestFiles: Seq[ManifestFile], requestedSchema: StructType, options: DatafeedOptions)
  extends Scan with Batch {

  override def readSchema(): StructType = requestedSchema

  override def planInputPartitions(): Array[InputPartition] = {
    manifestFiles
      .flatMap(manifestFile => manifestFile.dataFiles.map(dataFile => DatafeedPartition(dataFile, manifestFile, options)))
      .toArray
  }

  override def toBatch: Batch = this

  override def createReaderFactory(): PartitionReaderFactory = {
    val caseSensitiveMap = Map.empty[String, String]
    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
    val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
    DatafeedPartitionReaderFactory(broadcastedConf, requestedSchema, options)
  }
}
22 changes: 22 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DatafeedScanBuilder.scala
@@ -0,0 +1,22 @@
package be.timvw.adobe.analytics.datafeed

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

case class DatafeedScanBuilder(sparkSession: SparkSession,
                               options: DatafeedOptions,
                               manifestFiles: List[ManifestFile],
                               schema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {

  var maybeRequestedSchema: Option[StructType] = None

  override def build(): Scan = {
    val requestedSchema = maybeRequestedSchema.getOrElse(schema)
    DatafeedScan(sparkSession, manifestFiles, requestedSchema, options)
  }

  override def pruneColumns(requiredSchema: StructType): Unit = {
    maybeRequestedSchema = Some(requiredSchema)
  }
}
47 changes: 47 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DatafeedTable.scala
@@ -0,0 +1,47 @@
package be.timvw.adobe.analytics.datafeed

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.execution.datasources.PathFilterFactory
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.collection.JavaConverters._

case class DatafeedTable(name: String,
                         sparkSession: SparkSession,
                         options: DatafeedOptions,
                         paths: Seq[String],
                         maybeUserSpecifiedSchema: Option[StructType]) extends Table with SupportsRead {

  val conf = sparkSession.sparkContext.hadoopConfiguration

  def findManifestFilesInPaths(): List[ManifestFile] = {
    val pathFilters = PathFilterFactory.create(options.parameters)
    paths.flatMap(x => ManifestFile.getManifestFilesInPath(conf, new Path(x), pathFilters)).toList
  }

  lazy val manifestFiles = findManifestFilesInPaths()

  override def schema(): StructType = maybeUserSpecifiedSchema.getOrElse(inferSchema())

  def inferSchema(): StructType = {
    // in case no manifests are found, we blow up...
    val manifestFile = manifestFiles.head
    val lookupFilesByName = manifestFile.extractLookupFiles(conf)
    val schemaForDataFiles = LookupFile.getSchemaForDataFiles(lookupFilesByName, options)

    val valuesContributor = ValuesContributor(options.enableLookups, lookupFilesByName, schemaForDataFiles)
    val fieldsWhichCanBeContributed = valuesContributor.getFieldsWhichCanBeContributed()
    StructType(fieldsWhichCanBeContributed)
  }

  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    DatafeedScanBuilder(sparkSession, this.options, manifestFiles, schema)
  }
}
28 changes: 28 additions & 0 deletions src/main/scala/be/timvw/adobe/analytics/datafeed/DefaultSource.scala
@@ -0,0 +1,28 @@
package be.timvw.adobe.analytics.datafeed

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._

class DefaultSource extends FileDataSourceV2 {
  override def shortName(): String = "datafeed"

  override def fallbackFileFormat: Class[_ <: FileFormat] = ???

  override def getTable(options: CaseInsensitiveStringMap): Table = makeTable(options, None)

  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = makeTable(options, Some(schema))

  def makeTable(options: CaseInsensitiveStringMap, maybeSchema: Option[StructType]): Table = {
    val paths = getPaths(options)
    val tableName = getTableName(options, paths)
    val optionsWithoutPaths = getOptionsWithoutPaths(options)
    val datafeedOptions = new DatafeedOptions(CaseInsensitiveMap(optionsWithoutPaths.asScala.toMap))
    DatafeedTable(tableName, sparkSession, datafeedOptions, paths, maybeSchema)
  }
}