Skip to content

Commit

Permalink
Refactor Broom and Shovel, add tests for HtmlSanitizer and JsonErrorR…
Browse files Browse the repository at this point in the history
…epresenter
  • Loading branch information
makiftutuncu committed Aug 24, 2016
1 parent e0ecb20 commit e0607c4
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 147 deletions.
170 changes: 95 additions & 75 deletions app/com/mehmetakiftutuncu/muezzinapi/broom/BroomActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.time.{Duration, LocalDate, LocalDateTime}
import java.util.concurrent.TimeUnit

import akka.actor.Actor
import com.github.mehmetakiftutuncu.errors.{CommonError, Errors}
import com.github.mehmetakiftutuncu.errors.{CommonError, Errors, Maybe}
import com.google.firebase.database.DatabaseReference.CompletionListener
import com.google.firebase.database._
import com.mehmetakiftutuncu.muezzinapi.broom.BroomActor.Wipe
Expand All @@ -16,7 +16,7 @@ import com.mehmetakiftutuncu.muezzinapi.utilities.{AbstractConf, Log, Logging, T
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

class BroomActor(Conf: AbstractConf,
FirebaseRealtimeDatabase: AbstractFirebaseRealtimeDatabase) extends Actor with Logging {
Expand All @@ -28,105 +28,125 @@ class BroomActor(Conf: AbstractConf,

Timer.start("broom")

val countryReference: DatabaseReference = FirebaseRealtimeDatabase.root / "prayerTimes" / "country"
collectReferencesToWipe(startDate).foreach {
maybeReferencesToWipe: Maybe[List[DatabaseReference]] =>
if (maybeReferencesToWipe.hasErrors) {
val duration: Duration = Timer.stop("broom")

val referencesPromise: Promise[List[DatabaseReference]] = Promise[List[DatabaseReference]]()
Log.warn(s"Broom failed in ${duration.toMillis} ms with errors ${maybeReferencesToWipe.errors}!")
} else {
val referencesToWipe: List[DatabaseReference] = maybeReferencesToWipe.value

val valueEventListener: ValueEventListener = new ValueEventListener {
override def onCancelled(databaseError: DatabaseError): Unit = {
val exception: DatabaseException = databaseError.toException
val errors: Errors = Errors(CommonError.requestFailed.reason(exception.getMessage))
wipeReferences(referencesToWipe).foreach {
wipeErrors: Errors =>
val duration: Duration = Timer.stop("broom")

Log.error("Broom failed!", errors, exception)
if (wipeErrors.hasErrors) {
Log.warn(s"Broom failed in ${duration.toMillis} ms with errors $wipeErrors!")
} else {
Log.warn(s"Broom finished in ${duration.toMillis} ms! ${referencesToWipe.size} prayer times that are older than or equal to $startDate are wiped!")
}
}
}
}

referencesPromise.failure(exception)
}
case m @ _ =>
Log.error("Broom failed!", Errors(CommonError.invalidData.reason("Received unknown message!").data(m.toString)))
}

override def onDataChange(dataSnapshot: DataSnapshot): Unit = {
import scala.collection.JavaConverters._
private[broom] def collectReferencesToWipe(startDate: LocalDate): Future[Maybe[List[DatabaseReference]]] = {
val countryReference: DatabaseReference = FirebaseRealtimeDatabase.root / "prayerTimes" / "country"

val countrySnapshots: List[DataSnapshot] = dataSnapshot.getChildren.iterator().asScala.toList
val referencesPromise: Promise[List[DatabaseReference]] = Promise[List[DatabaseReference]]()

val prayerTimeReferencesToWipe: List[DatabaseReference] = countrySnapshots.flatMap {
countrySnapshot: DataSnapshot =>
val citySnapshots: List[DataSnapshot] = (countrySnapshot / "city").getChildren.iterator().asScala.toList
val valueEventListener: ValueEventListener = new ValueEventListener {
override def onCancelled(databaseError: DatabaseError): Unit = {
referencesPromise.failure(databaseError.toException)
}

citySnapshots.flatMap {
citySnapshot: DataSnapshot =>
val districtSnapshots: List[DataSnapshot] = (citySnapshot / "district").getChildren.iterator().asScala.toList
override def onDataChange(dataSnapshot: DataSnapshot): Unit = {
import scala.collection.JavaConverters._

val prayerTimeSnapshotsToWipeForCity: List[DataSnapshot] = if (districtSnapshots.isEmpty) {
citySnapshot.getChildren.iterator().asScala.toList.takeWhile {
dateSnapshot: DataSnapshot =>
val date: LocalDate = LocalDate.parse(dateSnapshot.getKey, PrayerTimesOfDay.dateFormatter)
val countrySnapshots: List[DataSnapshot] = dataSnapshot.getChildren.iterator().asScala.toList

date.isBefore(startDate) || date.isEqual(startDate)
}
} else {
districtSnapshots.flatMap {
districtSnapshot: DataSnapshot =>
districtSnapshot.getChildren.iterator().asScala.toList.takeWhile {
dateSnapshot: DataSnapshot =>
val date: LocalDate = LocalDate.parse(dateSnapshot.getKey, PrayerTimesOfDay.dateFormatter)
val prayerTimeReferencesToWipe: List[DatabaseReference] = countrySnapshots.flatMap {
countrySnapshot: DataSnapshot =>
val citySnapshots: List[DataSnapshot] = (countrySnapshot / "city").getChildren.iterator().asScala.toList

date.isBefore(startDate) || date.isEqual(startDate)
}
}
}
citySnapshots.flatMap {
citySnapshot: DataSnapshot =>
val districtSnapshots: List[DataSnapshot] = (citySnapshot / "district").getChildren.iterator().asScala.toList

prayerTimeSnapshotsToWipeForCity.map(_.getRef)
}
}
val prayerTimeSnapshotsToWipeForCity: List[DataSnapshot] = if (districtSnapshots.isEmpty) {
citySnapshot.getChildren.iterator().asScala.toList.takeWhile {
dateSnapshot: DataSnapshot =>
val date: LocalDate = LocalDate.parse(dateSnapshot.getKey, PrayerTimesOfDay.dateFormatter)

referencesPromise.success(prayerTimeReferencesToWipe)
date.compareTo(startDate) < 1
}
} else {
districtSnapshots.flatMap {
districtSnapshot: DataSnapshot =>
districtSnapshot.getChildren.iterator().asScala.toList.takeWhile {
dateSnapshot: DataSnapshot =>
val date: LocalDate = LocalDate.parse(dateSnapshot.getKey, PrayerTimesOfDay.dateFormatter)

date.compareTo(startDate) < 1
}
}
}

prayerTimeSnapshotsToWipeForCity.map(_.getRef)
}
}
}

countryReference.addValueEventListener(valueEventListener)
referencesPromise.success(prayerTimeReferencesToWipe)
}
}

val futureResult: Future[(Errors, Int)] = referencesPromise.future.flatMap {
references: List[DatabaseReference] =>
countryReference.removeEventListener(valueEventListener)
countryReference.addValueEventListener(valueEventListener)

val wipeResultFutures: List[Future[Errors]] = references.map {
prayerTimeReference: DatabaseReference =>
val promise: Promise[Errors] = Promise[Errors]()
referencesPromise.future.map {
references: List[DatabaseReference] =>
countryReference.removeEventListener(valueEventListener)

prayerTimeReference.removeValue(new CompletionListener {
override def onComplete(databaseError: DatabaseError, databaseReference: DatabaseReference): Unit = {
val errors: Errors = if (databaseError != null) {
Errors(CommonError.database.reason(databaseError.toException.getMessage).data(databaseReference.getKey))
} else {
Errors.empty
}
Maybe(references)
}.recover {
case NonFatal(t: Throwable) =>
val errors: Errors = Errors(CommonError.database.reason(t.getMessage))
Log.error("Broom failed to collect references to wipe from Firebase Realtime Database!", errors, t)

promise.success(errors)
}
})
Maybe(errors)
}
}

promise.future
}
private[broom] def wipeReferences(referencesToWipe: List[DatabaseReference]): Future[Errors] = {
val wipeFutureResults: List[Future[Errors]] = referencesToWipe.map {
prayerTimeReference: DatabaseReference =>
val promise: Promise[Errors] = Promise[Errors]()

Future.sequence(wipeResultFutures).map {
wipeResults: List[Errors] =>
val wipeResult: Errors = wipeResults.foldLeft(Errors.empty)(_ ++ _)
prayerTimeReference.removeValue(new CompletionListener {
override def onComplete(databaseError: DatabaseError, databaseReference: DatabaseReference): Unit = {
val errors: Errors = if (databaseError != null) {
Errors(CommonError.database.reason(databaseError.toException.getMessage).data(databaseReference.getKey))
} else {
Errors.empty
}

wipeResult -> references.size
promise.success(errors)
}
}

futureResult.onComplete {
case Success((errors: Errors, numberOfDeletedPrayerTimes: Int)) =>
val duration: Duration = Timer.stop("broom")
})

Log.warn(s"Broom finished in ${duration.toMillis} ms! $numberOfDeletedPrayerTimes prayer times that are older than or equal to $startDate are wiped${if (errors.hasErrors) " with errors " + errors else ""}!")
promise.future
}

case Failure(t: Throwable) =>
Log.error("Broom failed!", t)
}
Future.sequence(wipeFutureResults).map(_.foldLeft(Errors.empty)(_ ++ _)).recover {
case NonFatal(t: Throwable) =>
val errors: Errors = Errors(CommonError.database.reason(t.getMessage))
Log.error("Broom failed to wipe collected references from Firebase Realtime Database!", errors, t)

case m @ _ =>
Log.error("Broom failed!", Errors(CommonError.invalidData.reason("Received unknown message!").data(m.toString)))
errors
}
}
}

Expand Down
Loading

0 comments on commit e0607c4

Please sign in to comment.