diff --git a/benchmarks/src/main/scala/zio/query/FromRequestBenchmark.scala b/benchmarks/src/main/scala/zio/query/FromRequestBenchmark.scala index 28c2f163..334b4e3d 100644 --- a/benchmarks/src/main/scala/zio/query/FromRequestBenchmark.scala +++ b/benchmarks/src/main/scala/zio/query/FromRequestBenchmark.scala @@ -32,6 +32,13 @@ class FromRequestBenchmark { unsafeRunCache(query, Cache.unsafeMake(count)) } + @Benchmark + def fromRequestUncached(): Long = { + val reqs = Chunk.fromIterable((0 until count).map(i => ZQuery.fromRequest(Req(i))(ds))) + val query = ZQuery.collectAllBatched(reqs).map(_.sum.toLong) + unsafeRun(query.uncached) + } + @Benchmark def fromRequestZipRight(): Long = { val reqs = Chunk.fromIterable((0 until count).map(i => ZQuery.fromRequest(Req(i))(ds))) diff --git a/build.sbt b/build.sbt index 883ba938..20f69fa6 100644 --- a/build.sbt +++ b/build.sbt @@ -52,6 +52,9 @@ lazy val zioQuery = crossProject(JSPlatform, JVMPlatform) .settings(enableZIO()) .settings(scalacOptions += "-Wconf:msg=[zio.stacktracer.TracingImplicits.disableAutoTrace]:silent") .settings( + libraryDependencies ++= Seq( + "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" + ), scalacOptions ++= (if (scalaBinaryVersion.value == "3") Seq() diff --git a/zio-query/shared/src/main/scala/zio/query/Cache.scala b/zio-query/shared/src/main/scala/zio/query/Cache.scala index 176f45df..3ddbfca7 100644 --- a/zio-query/shared/src/main/scala/zio/query/Cache.scala +++ b/zio-query/shared/src/main/scala/zio/query/Cache.scala @@ -86,14 +86,10 @@ object Cache { ev: A <:< Request[E, B], trace: Trace ): UIO[Either[Promise[E, B], Promise[E, B]]] = - ZIO.fiberId.map(lookupUnsafe(request, _)(Unsafe.unsafe, implicitly)) - - def lookupUnsafe[E, A, B]( - request: A, - fiberId: FiberId - )(implicit - unsafe: Unsafe, - ev: A <:< Request[E, B] + ZIO.fiberId.map(lookupUnsafe(request, _)(Unsafe.unsafe)) + + def lookupUnsafe[E, A, B](request: Request[_, _], fiberId: FiberId)(implicit + unsafe: Unsafe ): Either[Promise[E, B], Promise[E, B]] = { val newPromise = Promise.unsafe.make[E, B](fiberId) val existing = map.putIfAbsent(request, newPromise).asInstanceOf[Promise[E, B]] @@ -101,18 +97,10 @@ object Cache { } def put[E, A](request: Request[E, A], result: Promise[E, A])(implicit trace: Trace): UIO[Unit] = - ZIO.succeed(putUnsafe(request, result)) - - def putUnsafe[E, A](request: Request[E, A], result: Promise[E, A]): Unit = { - map.put(request, result) - () - } + ZIO.succeed(map.put(request, result)) def remove[E, A](request: Request[E, A])(implicit trace: Trace): UIO[Unit] = - ZIO.succeed { - map.remove(request) - () - } + ZIO.succeed(map.remove(request)) } // TODO: Initialize the map with a sensible default value. Default is 16, which seems way too small for a cache diff --git a/zio-query/shared/src/main/scala/zio/query/CompletedRequestMap.scala b/zio-query/shared/src/main/scala/zio/query/CompletedRequestMap.scala index c63020ab..e623d9c1 100644 --- a/zio-query/shared/src/main/scala/zio/query/CompletedRequestMap.scala +++ b/zio-query/shared/src/main/scala/zio/query/CompletedRequestMap.scala @@ -19,6 +19,7 @@ package zio.query import zio.Exit import zio.stacktracer.TracingImplicits.disableAutoTrace +import scala.collection.compat._ import scala.collection.immutable.HashMap import scala.collection.mutable @@ -76,11 +77,8 @@ final class CompletedRequestMap private (private val map: HashMap[Any, Exit[Any, def isEmpty: Boolean = map.isEmpty - private[query] def toMutableMap: mutable.HashMap[Request[?, ?], Exit[Any, Any]] = { - val map0 = new mutable.HashMap[Request[?, ?], Exit[Any, Any]]() - map0.sizeHint(map.size) - map0 ++= map.asInstanceOf[HashMap[Request[?, ?], Exit[Any, Any]]] - } + private[query] def toMutableMap: mutable.HashMap[Request[?, ?], Exit[Any, Any]] = + mutable.HashMap.from(map.asInstanceOf[HashMap[Request[?, ?], Exit[Any, Any]]]) override def toString: String = s"CompletedRequestMap(${map.mkString(", ")})" @@ -97,11 +95,8 @@ object CompletedRequestMap { /** * Constructs a completed requests map from the specified results. */ - def fromIterable[E, A](iterable: Iterable[(Request[E, A], Exit[E, A])]): CompletedRequestMap = { - val builder = HashMap.newBuilder[Any, Exit[Any, Any]] - builder ++= iterable - new CompletedRequestMap(builder.result()) - } + def fromIterable[E, A](iterable: Iterable[(Request[E, A], Exit[E, A])]): CompletedRequestMap = + new CompletedRequestMap(HashMap.from(iterable)) /** * Constructs a completed requests map from the specified optional results. diff --git a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala index d07ad334..86982f33 100644 --- a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala +++ b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala @@ -20,7 +20,8 @@ import zio._ import zio.query.internal._ import zio.stacktracer.TracingImplicits.disableAutoTrace -import scala.collection.mutable.Builder +import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.compat.{BuildFrom => _, _} import scala.reflect.ClassTag /** @@ -1010,16 +1011,15 @@ object ZQuery { )( f: A => ZQuery[R, E, B] )(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: Trace): ZQuery[R, E, Collection[B]] = - if (as.isEmpty) ZQuery.succeed(bf.newBuilder(as).result()) - else { - val iterator = as.iterator - var builder: ZQuery[R, E, Builder[B, Collection[B]]] = null - while (iterator.hasNext) { - val a = iterator.next() - if (builder eq null) builder = f(a).map(bf.newBuilder(as) += _) - else builder = builder.zipWith(f(a))(_ += _) - } - builder.map(_.result()) + as.sizeCompare(1) match { + case -1 => ZQuery.succeed(bf.newBuilder(as).result()) + case 0 => f(as.head).map(bf.newBuilder(as) += _).map(_.result()) + case _ => + ZQuery { + ZIO + .foreach[R, Nothing, A, Result[R, E, B], Iterable](as)(f(_).step) + .map(collectResults(as, _, mode = 0)) + } } /** @@ -1089,112 +1089,16 @@ object ZQuery { )( f: A => ZQuery[R, E, B] )(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: Trace): ZQuery[R, E, Collection[B]] = - if (as.isEmpty) ZQuery.succeed(bf.newBuilder(as).result()) - else - ZQuery( - ZIO.suspendSucceed { - var blockedRequests: BlockedRequests[R] = BlockedRequests.empty - val doneBuilder: Builder[B, Collection[B]] = bf.newBuilder(as) - val doneIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int - val effectBuilder: ChunkBuilder[ZQuery[R, E, B]] = ChunkBuilder.make[ZQuery[R, E, B]]() - val effectIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int - val failBuilder: ChunkBuilder[Cause[E]] = ChunkBuilder.make[Cause[E]]() - val getBuilder: ChunkBuilder[IO[E, B]] = ChunkBuilder.make[IO[E, B]]() - val getIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int - var index: Int = 0 - val iterator: Iterator[A] = as.iterator - - ZIO.whileLoop { - iterator.hasNext - } { - f(iterator.next()).step - } { - case Result.Blocked(blockedRequest, Continue.Effect(query)) => - blockedRequests = blockedRequests && blockedRequest - effectBuilder += query - effectIndicesBuilder += index - index += 1 - case Result.Blocked(blockedRequest, Continue.Get(io)) => - blockedRequests = blockedRequests && blockedRequest - getBuilder += io - getIndicesBuilder += index - index += 1 - case Result.Done(b) => - doneBuilder += b - doneIndicesBuilder += index - index += 1 - case Result.Fail(e) => - failBuilder += e - index += 1 - }.as { - val dones = doneBuilder.result() - val effects = effectBuilder.result() - val fails = failBuilder.result() - val gets = getBuilder.result() - if (gets.isEmpty && effects.isEmpty && fails.isEmpty) { - Result.done(bf.fromSpecific(as)(dones)) - } else if (fails.isEmpty) { - val continue = if (effects.isEmpty) { - val getIndices = getIndicesBuilder.result() - val doneIndices = doneIndicesBuilder.result() - val io = ZIO.collectAll(gets).map { gets => - val array = Array.ofDim[AnyRef](index) - val getsIterator = gets.iterator - val getIndicesIterator = getIndices.iterator - while (getsIterator.hasNext) { - val get = getsIterator.next() - val index = getIndicesIterator.next() - array(index) = get.asInstanceOf[AnyRef] - } - val donesIterator = dones.iterator - val doneIndicesIterator = doneIndices.iterator - while (donesIterator.hasNext) { - val done = donesIterator.next() - val index = doneIndicesIterator.next() - array(index) = done.asInstanceOf[AnyRef] - } - bf.fromSpecific(as)(array.asInstanceOf[Array[B]]) - } - Continue.get(io) - } else { - val effectIndices = effectIndicesBuilder.result() - val getIndices = getIndicesBuilder.result() - val doneIndices = doneIndicesBuilder.result() - val query = ZQuery.collectAllBatched(effects).flatMap { effects => - ZQuery.fromZIONow(ZIO.collectAll(gets).map { gets => - val array = Array.ofDim[AnyRef](index) - val effectsIterator = effects.iterator - val effectIndicesIterator = effectIndices.iterator - while (effectsIterator.hasNext) { - val effect = effectsIterator.next() - val index = effectIndicesIterator.next() - array(index) = effect.asInstanceOf[AnyRef] - } - val getsIterator = gets.iterator - val getIndicesIterator = getIndices.iterator - while (getsIterator.hasNext) { - val get = getsIterator.next() - val index = getIndicesIterator.next() - array(index) = get.asInstanceOf[AnyRef] - } - val donesIterator = dones.iterator - val doneIndicesIterator = doneIndices.iterator - while (donesIterator.hasNext) { - val done = donesIterator.next() - val index = doneIndicesIterator.next() - array(index) = done.asInstanceOf[AnyRef] - } - bf.fromSpecific(as)(array.asInstanceOf[Array[B]]) - }) - } - Continue.effect(query) - } - Result.blocked(blockedRequests, continue) - } else - Result.fail(fails.foldLeft[Cause[E]](Cause.empty)(_ && _)) - } + as.sizeCompare(1) match { + case -1 => ZQuery.succeed(bf.newBuilder(as).result()) + case 0 => f(as.head).map(bf.newBuilder(as) += _).map(_.result()) + case _ => + ZQuery { + ZIO + .foreach[R, Nothing, A, Result[R, E, B], Iterable](as)(f(_).step) + .map(collectResults(as, _, mode = 2)) } - ) + } final def foreachBatched[R, E, A, B](as: Set[A])(fn: A => ZQuery[R, E, B])(implicit trace: Trace @@ -1247,26 +1151,113 @@ object ZQuery { )( f: A => ZQuery[R, E, B] )(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: Trace): ZQuery[R, E, Collection[B]] = - ZQuery.suspend { - if (as.isEmpty) - ZQuery.succeed(bf.newBuilder(as).result()) - else if (isSingleElementIterable(as)) - f(as.head).map(bf.newBuilder(as) += _).map(_.result()) - else - ZQuery( + as.sizeCompare(1) match { + case -1 => ZQuery.succeed(bf.newBuilder(as).result()) + case 0 => f(as.head).map(bf.newBuilder(as) += _).map(_.result()) + case _ => + ZQuery { ZIO .foreachPar[R, Nothing, A, Result[R, E, B], Iterable](as)(f(_).step) - .map(Result.collectAllPar(_).map(bf.fromSpecific(as))) - ) + .map(collectResults(as, _, mode = 1)) + } } - private def isSingleElementIterable(iterable: Iterable[?]): Boolean = - iterable match { - case _ :: Nil => true - case _: List[?] => false - case _ => iterable.size == 1 + private def collectResults[R, E, A, B, Collection[+Element] <: Iterable[Element]]( + as: Collection[A], + results: Iterable[Result[R, E, B]], + mode: Int // 0 = sequential, 1 = parallel, 2 = batched + )(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: Trace): Result[R, E, Collection[B]] = { + + def addToArray(array: Array[B])(idxs: Chunk[RuntimeFlags], values: Chunk[B]): Unit = + if (idxs.nonEmpty) { + var i = 0 + val iter = values.chunkIterator + val idxIter = idxs.chunkIterator + while (idxIter.hasNextAt(i)) { + val idx = idxIter.nextAt(i) + val value = iter.nextAt(i) + array(idx) = value + i += 1 + } + } + + var blockedRequests: BlockedRequests[R] = BlockedRequests.empty + val doneBuilder: ChunkBuilder[B] = ChunkBuilder.make[B]() + val doneIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int + val effectBuilder: ChunkBuilder[ZQuery[R, E, B]] = ChunkBuilder.make[ZQuery[R, E, B]]() + val effectIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int + val failBuilder: ChunkBuilder[Cause[E]] = ChunkBuilder.make[Cause[E]]() + val getBuilder: ChunkBuilder[IO[E, B]] = ChunkBuilder.make[IO[E, B]]() + val getIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int + var index: Int = 0 + val iter = results.iterator + + while (iter.hasNext) { + iter.next() match { + case Result.Blocked(blockedRequest, continue) => + blockedRequests = if (mode == 0) blockedRequests ++ blockedRequest else blockedRequests && blockedRequest + continue match { + case Continue.Effect(query) => + effectBuilder.addOne(query) + effectIndicesBuilder.addOne(index) + case Continue.Get(io) => + getBuilder.addOne(io) + getIndicesBuilder.addOne(index) + } + index += 1 + case Result.Done(b) => + doneBuilder.addOne(b) + doneIndicesBuilder.addOne(index) + index += 1 + case Result.Fail(e) => + failBuilder.addOne(e) + index += 1 + } } + val dones = doneBuilder.result() + val doneIndices = doneIndicesBuilder.result() + val effects = effectBuilder.result() + val effectIndices = effectIndicesBuilder.result() + val gets = getBuilder.result() + val getIndices = getIndicesBuilder.result() + val fails = failBuilder.result() + val size = index // Store in a val to avoid boxing of var when used in the functions below + + if (gets.isEmpty && effects.isEmpty && fails.isEmpty) + Result.done(bf.fromSpecific(as)(dones)) + else if (fails.isEmpty) { + val continue = + if (effects.isEmpty) { + val io = ZIO.collectAll(gets).map { gets => + val array = Array.ofDim[B](size)(ClassTag.AnyRef.asInstanceOf[ClassTag[B]]) + addToArray(array)(getIndices, gets) + addToArray(array)(doneIndices, dones) + bf.fromSpecific(as)(array) + } + Continue.get(io) + } else { + val collect = mode match { + case 0 => ZQuery.collectAll(effects) + case 1 => ZQuery.collectAllPar(effects) + case 2 => ZQuery.collectAllBatched(effects) + } + val query = collect.mapZIO { effects => + ZIO.collectAll(gets).map { gets => + val array = Array.ofDim[B](size)(ClassTag.AnyRef.asInstanceOf[ClassTag[B]]) + addToArray(array)(effectIndices, effects) + addToArray(array)(getIndices, gets) + addToArray(array)(doneIndices, dones) + bf.fromSpecific(as)(array) + } + } + Continue.effect(query) + } + Result.blocked(blockedRequests, continue) + } else + Result.fail(fails.foldLeft[Cause[E]](Cause.empty)(_ && _)) + } + /** * Performs a query for each element in a Set, collecting the results into a * query returning a collection of their results. Requests will be executed in @@ -1345,26 +1336,28 @@ object ZQuery { ZIO.succeed( Result.blocked( BlockedRequests.single(dataSource, BlockedRequest(request, promise)), - Continue[R, E, A, B](promise) + Continue(promise) ) ) case Right(promise) => promise.poll.flatMap { - case None => ZIO.succeed(Result.blocked(BlockedRequests.empty, Continue[R, E, A, B](promise))) + case None => ZIO.succeed(Result.blocked(BlockedRequests.empty, Continue(promise))) case Some(io) => io.exit.map(Result.fromExit) } } ZQuery.currentCache.getWith { - case cache: Cache.Default => foldPromise(cache.lookupUnsafe(request, fiberId)(Unsafe.unsafe, implicitly)) + case cache: Cache.Default => foldPromise(cache.lookupUnsafe(request, fiberId)(Unsafe.unsafe)) case cache => cache.lookup(request).flatMap(foldPromise) } - } else - Promise.makeAs[E, B](fiberId).map { promise => + } else { + ZIO.succeed { + val promise = Promise.unsafe.make[E, B](fiberId)(Unsafe.unsafe) Result.blocked( BlockedRequests.single(dataSource, BlockedRequest(request, promise)), - Continue[R, E, A, B](promise) + Continue(promise) ) } + } } } } @@ -1536,23 +1529,17 @@ object ZQuery { query.step.raceWith[R, Nothing, Nothing, B1, Result[R, E, B1]](fiber.join)( (leftExit, rightFiber) => leftExit.foldExitZIO( - cause => rightFiber.interrupt *> ZIO.succeed(Result.fail(cause)), - result => - result match { - case Result.Blocked(blockedRequests, continue) => - continue match { - case Continue.Effect(query) => - ZIO.succeed(Result.blocked(blockedRequests, Continue.effect(race(query, fiber)))) - case Continue.Get(io) => - ZIO.succeed( - Result.blocked(blockedRequests, Continue.effect(race(ZQuery.fromZIONow(io), fiber))) - ) - } - case Result.Done(value) => rightFiber.interrupt *> ZIO.succeed(Result.done(value)) - case Result.Fail(cause) => rightFiber.interrupt *> ZIO.succeed(Result.fail(cause)) - } + cause => rightFiber.interrupt.as(Result.fail(cause)), + { + case Result.Blocked(blockedRequests, Continue.Effect(query)) => + ZIO.succeed(Result.blocked(blockedRequests, Continue.effect(race(query, fiber)))) + case Result.Blocked(blockedRequests, Continue.Get(io)) => + ZIO.succeed(Result.blocked(blockedRequests, Continue.effect(race(ZQuery.fromZIONow(io), fiber)))) + case Result.Done(value) => rightFiber.interrupt.as(Result.done(value)) + case Result.Fail(cause) => rightFiber.interrupt.as(Result.fail(cause)) + } ), - (rightExit, leftFiber) => leftFiber.interrupt *> ZIO.succeed(Result.fromExit(rightExit)) + (rightExit, leftFiber) => leftFiber.interrupt.as(Result.fromExit(rightExit)) ) } @@ -1595,8 +1582,8 @@ object ZQuery { val cs = ChunkBuilder.make[C]() as.foreach { a => f(a) match { - case Left(b) => bs += b - case Right(c) => cs += c + case Left(b) => bs addOne b + case Right(c) => cs addOne c } } (bs.result(), cs.result()) @@ -1636,39 +1623,38 @@ object ZQuery { ZQuery.unwrap { ZQuery.currentScope.getWith { scope => ZIO.environmentWithZIO[R] { environment => - Ref.make(true).flatMap { ref => - ZIO.uninterruptible { - ZIO.suspendSucceed(acquire()).tap { a => - scope.addFinalizerExit { - case Exit.Failure(cause) => - release(a, Exit.failCause(cause.stripFailures)) - .provideEnvironment(environment) - .whenZIO(ref.getAndSet(false)) - case Exit.Success(_) => - ZIO.unit - } + val ref = new AtomicBoolean(true) + ZIO.uninterruptible { + ZIO.suspendSucceed(acquire()).tap { a => + scope.addFinalizerExit { + case Exit.Failure(cause) => + release(a, Exit.failCause(cause.stripFailures)) + .provideEnvironment(environment) + .when(ref.getAndSet(false)) + case Exit.Success(_) => + ZIO.unit } - }.map { a => - ZQuery - .suspend(use(a)) - .foldCauseQuery( - cause => - ZQuery.fromZIONow { - ZIO - .suspendSucceed(release(a, Exit.failCause(cause))) - .whenZIO(ref.getAndSet(false)) - .mapErrorCause(cause ++ _) *> - ZIO.refailCause(cause) - }, - b => - ZQuery.fromZIONow { - ZIO - .suspendSucceed(release(a, Exit.succeed(b))) - .whenZIO(ref.getAndSet(false)) - .as(b) - } - ) } + }.map { a => + ZQuery + .suspend(use(a)) + .foldCauseQuery( + cause => + ZQuery.fromZIONow { + ZIO + .suspendSucceed(release(a, Exit.failCause(cause))) + .when(ref.getAndSet(false)) + .mapErrorCause(cause ++ _) *> + ZIO.refailCause(cause) + }, + b => + ZQuery.fromZIONow { + ZIO + .suspendSucceed(release(a, Exit.succeed(b))) + .when(ref.getAndSet(false)) + .as(b) + } + ) } } } diff --git a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala index 09d12446..1bf58b06 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala @@ -120,9 +120,9 @@ private[query] sealed trait BlockedRequests[-R] { self => } } .flatMap { completedRequests => - ZQuery.cachingEnabled.getWith { cachingEnabled => + ZQuery.cachingEnabled.getWith { val completedRequestsM = completedRequests.toMutableMap - if (cachingEnabled) { + if (_) { completePromises(dataSource, sequential) { req => // Pop the entry, and fallback to the immutable one if we already removed it completedRequestsM.remove(req) orElse completedRequests.lookup(req) @@ -296,29 +296,22 @@ private[query] object BlockedRequests { cache: Cache, map: mutable.HashMap[Request[_, _], Exit[Any, Any]] )(implicit trace: Trace): UIO[Unit] = - ZIO.fiberIdWith { fiberId => - cache match { - case cache: Cache.Default => + cache match { + case cache: Cache.Default => + ZIO.fiberIdWith { fiberId => ZIO.succeedUnsafe { implicit unsafe => - map.foreach { case (request, exit) => - val promise = Promise.unsafe.make[Any, Any](fiberId) - promise.unsafe.done(exit) - cache.putUnsafe(request.asInstanceOf[Request[Any, Any]], promise) - } - } - case cache => - val iter = map.iterator - ZIO.whileLoop(iter.hasNext) { - Promise.makeAs[Any, Any](fiberId).flatMap { promise => - val (request, exit) = iter.next() + map.foreach { case (request: Request[Any, Any], exit) => cache - .get(request) - .orElse( - promise.done(exit) *> - cache.put(request.asInstanceOf[Request[Any, Any]], promise) - ) + .lookupUnsafe(request, fiberId) + .merge + .unsafe + .done(exit) } - }(_ => ()) - } + } + } + case cache => + ZIO.foreachDiscard(map) { case (request: Request[Any, Any], exit) => + cache.lookup(request).flatMap(_.merge.done(exit)) + } } } diff --git a/zio-query/shared/src/main/scala/zio/query/internal/Continue.scala b/zio-query/shared/src/main/scala/zio/query/internal/Continue.scala index 001aa8d9..69a6c51a 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/Continue.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/Continue.scala @@ -163,47 +163,8 @@ private[query] object Continue { * Constructs a continuation from a request, a data source, and a `Promise` * that will contain the result of the request when it is executed. */ - def apply[R, E, A, B](promise: Promise[E, B])(implicit - ev: A <:< Request[E, B], - trace: Trace - ): Continue[R, E, B] = - Continue.get(promise.await) - - /** - * Collects a collection of continuation into a continuation returning a - * collection of their results, in parallel. - */ - def collectAllPar[R, E, A, Collection[+Element] <: Iterable[Element]]( - continues: Collection[Continue[R, E, A]] - )(implicit - bf: BuildFrom[Collection[Continue[R, E, A]], A, Collection[A]], - trace: Trace - ): Continue[R, E, Collection[A]] = - continues.zipWithIndex - .foldLeft[(Chunk[(ZQuery[R, E, A], Int)], Chunk[(IO[E, A], Int)])]((Chunk.empty, Chunk.empty)) { - case ((queries, ios), (continue, index)) => - continue match { - case Effect(query) => (queries :+ ((query, index)), ios) - case Get(io) => (queries, ios :+ ((io, index))) - } - } match { - case (Chunk(), ios) => - get(ZIO.collectAll(ios.map(_._1)).map(bf.fromSpecific(continues))) - case (queries, ios) => - val query = ZQuery.collectAllPar(queries.map(_._1)).flatMap { as => - val array = Array.ofDim[AnyRef](continues.size) - as.zip(queries.map(_._2)).foreach { case (a, i) => - array(i) = a.asInstanceOf[AnyRef] - } - ZQuery.fromZIONow(ZIO.collectAll(ios.map(_._1))).map { as => - as.zip(ios.map(_._2)).foreach { case (a, i) => - array(i) = a.asInstanceOf[AnyRef] - } - bf.fromSpecific(continues)(array.asInstanceOf[Array[A]]) - } - } - effect(query) - } + def apply[E, A](promise: Promise[E, A])(implicit trace: Trace): Continue[Any, E, A] = + Get(promise.await) /** * Constructs a continuation that may perform arbitrary effects. diff --git a/zio-query/shared/src/main/scala/zio/query/internal/Result.scala b/zio-query/shared/src/main/scala/zio/query/internal/Result.scala index 39a77e79..d810db40 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/Result.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/Result.scala @@ -111,43 +111,6 @@ private[query] object Result { def blocked[R, E, A](blockedRequests: BlockedRequests[R], continue: Continue[R, E, A]): Result[R, E, A] = Blocked(blockedRequests, continue) - /** - * Collects a collection of results into a single result. Blocked requests and - * their continuations will be executed in parallel. - */ - def collectAllPar[R, E, A, Collection[+Element] <: Iterable[Element]](results: Collection[Result[R, E, A]])(implicit - bf: BuildFrom[Collection[Result[R, E, A]], A, Collection[A]], - trace: Trace - ): Result[R, E, Collection[A]] = - results.zipWithIndex - .foldLeft[(Chunk[((BlockedRequests[R], Continue[R, E, A]), Int)], Chunk[(A, Int)], Chunk[(Cause[E], Int)])]( - (Chunk.empty, Chunk.empty, Chunk.empty) - ) { case ((blocked, done, fails), (result, index)) => - result match { - case Blocked(br, c) => (blocked :+ (((br, c), index)), done, fails) - case Done(a) => (blocked, done :+ ((a, index)), fails) - case Fail(e) => (blocked, done, fails :+ ((e, index))) - } - } match { - case (Chunk(), done, Chunk()) => - Result.done(bf.fromSpecific(results)(done.map(_._1))) - case (blocked, done, Chunk()) => - val blockedRequests = blocked.map(_._1._1).foldLeft[BlockedRequests[R]](BlockedRequests.empty)(_ && _) - val continue = Continue.collectAllPar(blocked.map(_._1._2)).map { as => - val array = Array.ofDim[AnyRef](results.size) - as.zip(blocked.map(_._2)).foreach { case (a, i) => - array(i) = a.asInstanceOf[AnyRef] - } - done.foreach { case (a, i) => - array(i) = a.asInstanceOf[AnyRef] - } - bf.fromSpecific(results)(array.asInstanceOf[Array[A]]) - } - Result.blocked(blockedRequests, continue) - case (_, _, fail) => - Result.fail(fail.map(_._1).foldLeft[Cause[E]](Cause.empty)(_ && _)) - } - /** * Constructs a result that is done with the specified value. */