Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lambda custom runtime #496

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ jobs:

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: mkdir -p lambda-cloudformation-custom-resource/.js/target lambda-http4s/.jvm/target unidocs/target lambda-http4s/.js/target lambda/js/target scalafix/rules/target lambda/jvm/target sbt-lambda/target lambda-cloudformation-custom-resource/.jvm/target project/target
run: mkdir -p lambda-kernel/jvm/target lambda-kernel/js/target lambda-runtime-binding/jvm/target lambda-cloudformation-custom-resource/.js/target lambda-http4s/.jvm/target unidocs/target lambda-runtime/.js/target lambda-http4s/.js/target lambda-runtime/.jvm/target scalafix/rules/target lambda-runtime-binding/js/target sbt-lambda/target lambda-cloudformation-custom-resource/.jvm/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: tar cf targets.tar lambda-cloudformation-custom-resource/.js/target lambda-http4s/.jvm/target unidocs/target lambda-http4s/.js/target lambda/js/target scalafix/rules/target lambda/jvm/target sbt-lambda/target lambda-cloudformation-custom-resource/.jvm/target project/target
run: tar cf targets.tar lambda-kernel/jvm/target lambda-kernel/js/target lambda-runtime-binding/jvm/target lambda-cloudformation-custom-resource/.js/target lambda-http4s/.jvm/target unidocs/target lambda-runtime/.js/target lambda-http4s/.js/target lambda-runtime/.jvm/target scalafix/rules/target lambda-runtime-binding/js/target sbt-lambda/target lambda-cloudformation-custom-resource/.jvm/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
Expand Down
94 changes: 82 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ lazy val commonSettings = Seq(
lazy val root =
tlCrossRootProject
.aggregate(
lambda,
`lambda-kernel`,
`lambda-runtime-binding`,
`lambda-runtime`,
lambdaHttp4s,
lambdaCloudFormationCustomResource,
examples,
Expand All @@ -82,10 +84,10 @@ lazy val rootSbtScalafix = project
.aggregate(scalafix.componentProjectReferences: _*)
.enablePlugins(NoPublishPlugin)

lazy val lambda = crossProject(JSPlatform, JVMPlatform)
.in(file("lambda"))
lazy val `lambda-kernel` = crossProject(JSPlatform, JVMPlatform)
.in(file("lambda-kernel"))
.settings(
name := "feral-lambda",
name := "feral-lambda-kernel",
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-effect" % catsEffectVersion,
"org.tpolecat" %%% "natchez-core" % natchezVersion,
Expand All @@ -94,18 +96,79 @@ lazy val lambda = crossProject(JSPlatform, JVMPlatform)
"com.comcast" %%% "ip4s-core" % "3.6.0",
"org.scodec" %%% "scodec-bits" % "1.2.0",
"org.scalameta" %%% "munit-scalacheck" % munitVersion % Test,
"io.circe" %%% "circe-literal" % circeVersion % Test
),
mimaPreviousArtifacts := Set(
"org.typelevel" %%% "feral-lambda" % "0.3.0"
),
mimaBinaryIssueFilters ++= Seq(
// These classes are moved to lambda-runtime-binding module
ProblemFilters.exclude[MissingTypesProblem]("feral.lambda.Context$"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.ContextCompanionPlatform"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.IOLambda"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.IOLambda$"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.IOLambda$Simple"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.IOLambdaPlatform")
)
)
.settings(commonSettings)
.jsSettings(
libraryDependencies += "io.github.cquiroz" %%% "scala-java-time" % "2.5.0",
mimaBinaryIssueFilters ++= Seq(
// These classes are moved to lambda-runtime-binding module
ProblemFilters.exclude[DirectMissingMethodProblem]("feral.lambda.Context.fromJS"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.facade.ClientContext"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.facade.ClientContextClient"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.facade.ClientContextEnv"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.facade.CognitoIdentity"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.facade.Context")
)
)
.jvmSettings(
mimaBinaryIssueFilters ++= Seq(
// These classes are moved to lambda-runtime-binding module
ProblemFilters.exclude[DirectMissingMethodProblem]("feral.lambda.Context.fromJava")
)
)

lazy val `lambda-runtime` = crossProject(JVMPlatform, JSPlatform)
.crossType(CrossType.Pure)
.in(file("lambda-runtime"))
.settings(
name := "feral-lambda-runtime",
libraryDependencies ++= Seq(
"org.http4s" %%% "http4s-core" % http4sVersion,
"org.http4s" %%% "http4s-ember-client" % http4sVersion,
"org.http4s" %%% "http4s-circe" % http4sVersion,
"org.http4s" %%% "http4s-dsl" % http4sVersion % Test,
"org.typelevel" %%% "munit-cats-effect-3" % munitCEVersion % Test,
"io.circe" %%% "circe-literal" % circeVersion % Test
),
mimaPreviousArtifacts := Set.empty
)
.settings(commonSettings)
.dependsOn(`lambda-kernel`)

lazy val `lambda-runtime-binding` = crossProject(JSPlatform, JVMPlatform)
.in(file("lambda-runtime-binding"))
.settings(
name := "feral-lambda-runtime-binding",
libraryDependencies ++= Seq(
"org.typelevel" %%% "munit-cats-effect-3" % munitCEVersion % Test,
"io.circe" %%% "circe-literal" % circeVersion % Test
),
mimaPreviousArtifacts := Set(
"org.typelevel" %%% "feral-lambda" % "0.3.0"
),
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[IncompatibleResultTypeProblem]("feral.lambda.IOLambda.setupMemo")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("feral.lambda.IOLambda.setupMemo"),
ProblemFilters.exclude[MissingClassProblem]("feral.lambda.*")
)
)
.settings(commonSettings)
.jsSettings(
libraryDependencies ++= Seq(
"io.circe" %%% "circe-scalajs" % circeVersion,
"io.github.cquiroz" %%% "scala-java-time" % "2.5.0"
"io.circe" %%% "circe-scalajs" % circeVersion
)
)
.jvmSettings(
Expand All @@ -115,6 +178,7 @@ lazy val lambda = crossProject(JSPlatform, JVMPlatform)
"co.fs2" %%% "fs2-io" % fs2Version
)
)
.dependsOn(`lambda-kernel`)

lazy val sbtLambda = project
.in(file("sbt-lambda"))
Expand All @@ -129,7 +193,9 @@ lazy val sbtLambda = project
scriptedLaunchOpts := {
scriptedLaunchOpts.value ++ Seq("-Xmx1024M", "-Dplugin.version=" + version.value)
},
scripted := scripted.dependsOn(lambda.js / publishLocal).evaluated,
scripted := scripted
.dependsOn(`lambda-kernel`.js / publishLocal, `lambda-runtime-binding`.js / publishLocal)
.evaluated,
Test / test := scripted.toTask("").value
)

Expand All @@ -143,7 +209,9 @@ lazy val lambdaHttp4s = crossProject(JSPlatform, JVMPlatform)
)
)
.settings(commonSettings)
.dependsOn(lambda % "compile->compile;test->test")
.dependsOn(
`lambda-kernel` % "compile->compile;test->test",
`lambda-runtime-binding` % "compile->compile;test->test")

lazy val lambdaCloudFormationCustomResource = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
Expand All @@ -168,7 +236,7 @@ lazy val lambdaCloudFormationCustomResource = crossProject(JSPlatform, JVMPlatfo
)
)
.settings(commonSettings)
.dependsOn(lambda)
.dependsOn(`lambda-runtime-binding`)

lazy val examples = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
Expand All @@ -183,7 +251,7 @@ lazy val examples = crossProject(JSPlatform, JVMPlatform)
)
)
.settings(commonSettings)
.dependsOn(lambda, lambdaHttp4s)
.dependsOn(`lambda-runtime-binding`, lambdaHttp4s)
.enablePlugins(NoPublishPlugin)

lazy val unidocs = project
Expand All @@ -196,7 +264,9 @@ lazy val unidocs = project
inProjects(sbtLambda)
else
inProjects(
lambda.jvm,
`lambda-kernel`.jvm,
`lambda-runtime-binding`.jvm,
`lambda-runtime`.jvm,
lambdaHttp4s.jvm,
lambdaCloudFormationCustomResource.jvm
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ sealed abstract class Context[F[_]] {

}

object Context extends ContextCompanionPlatform {
object Context {
i10416 marked this conversation as resolved.
Show resolved Hide resolved
def apply[F[_]](
functionName: String,
functionVersion: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.circe.scalajs._

import scala.concurrent.duration._

private[lambda] trait ContextCompanionPlatform {
private[lambda] object ContextPlatform {

private[lambda] def fromJS[F[_]: Sync](context: facade.Context): Context[F] =
Context(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result] {
val io =
for {
event <- IO.fromEither(decodeJs[Event](event))
result <- handle(Invocation.pure(event, Context.fromJS(context)))
result <- handle(Invocation.pure(event, ContextPlatform.fromJS(context)))
} yield result.map(_.asJsAny).orUndefined

dispatcher.unsafeToPromise(io)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import io.circe.jawn.parse
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

private[lambda] trait ContextCompanionPlatform {
private[lambda] object ContextPlatform {

private[lambda] def fromJava[F[_]: Sync](context: runtime.Context): Context[F] =
Context(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result]
output: OutputStream,
runtimeContext: lambdaRuntime.Context): Unit = {
val event = jawn.decodeChannel[Event](Channels.newChannel(input)).fold(throw _, identity(_))
val context = Context.fromJava[IO](runtimeContext)
val context = ContextPlatform.fromJava[IO](runtimeContext)
dispatcher
.unsafeRunTimed(
handle.flatMap(_(Invocation.pure(event, context))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
* limitations under the License.
*/

package feral
package lambda
package feral.lambda

import cats.effect.IO
import cats.effect.kernel.Resource
Expand Down
40 changes: 40 additions & 0 deletions lambda-runtime/src/main/scala/feral/lambda/runtime/Context.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral.lambda.runtime

import _root_.feral.lambda.{Context => IContext}
import cats.effect.Temporal
import cats.syntax.functor._

import api.LambdaRequest

private[runtime] object Context {
def from[F[_]](request: LambdaRequest, settings: LambdaSettings)(
implicit F: Temporal[F]): IContext[F] =
IContext[F](
settings.functionName,
settings.functionVersion,
request.invokedFunctionArn,
settings.functionMemorySize,
request.id,
settings.logGroupName,
settings.logStreamName,
request.identity,
request.clientContext,
F.realTime.map(curTime => request.deadlineTime - curTime)
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral.lambda
package runtime

import cats.effect.Resource
import cats.effect.Temporal
import cats.effect.std.Queue
import cats.effect.syntax.resource._
import cats.syntax.all._
import io.circe.Decoder
import io.circe.Encoder
import org.http4s.client.Client

import scala.util.control.NonFatal

import api._

object LambdaRuntime {

def apply[F[_]: Temporal: LambdaRuntimeEnv, Event: Decoder, Result: Encoder](
client: Client[F])(
handler: Resource[F, Invocation[F, Event] => F[Option[Result]]]): F[Nothing] =
LambdaRuntimeAPIClient(client).flatMap(client =>
handler.both(LambdaSettings.fromLambdaEnv.toResource).attempt.use[INothing] {
case Right((handler, settings)) =>
runloop[F, Event, Result](client, settings, handler)
case Left(ex) => client.reportInitError(ex) *> ex.raiseError[F, INothing]
})

private def runloop[F[_]: Temporal, Event: Decoder, Result: Encoder](
client: LambdaRuntimeAPIClient[F],
settings: LambdaSettings,
run: Invocation[F, Event] => F[Option[Result]]) = {
Queue.unbounded[F, LambdaRequest].flatMap { q =>
val producer = client
.nextInvocation()
i10416 marked this conversation as resolved.
Show resolved Hide resolved
.flatMap(q.offer)
.handleErrorWith {
case ex @ ContainerError => ex.raiseError[F, Unit]
case NonFatal(_) => ().pure
case ex => ex.raiseError
}
.foreverM[INothing]
val handleRequest = handleSingleRequest(client, settings, run)
val workers = fs2
.Stream
.fromQueueUnterminated(q)
.parEvalMapUnordered(1024)(handleRequest)
i10416 marked this conversation as resolved.
Show resolved Hide resolved
.compile
.drain
Temporal[F].both(producer, workers).map(_._1)
i10416 marked this conversation as resolved.
Show resolved Hide resolved
}
}
private def handleSingleRequest[F[_]: Temporal, Event: Decoder, Result: Encoder](
client: LambdaRuntimeAPIClient[F],
settings: LambdaSettings,
run: Invocation[F, Event] => F[Option[Result]])(request: LambdaRequest): F[Unit] = {
val program = for {
event <- request.body.as[Event].liftTo[F]
maybeResult <- run(Invocation.pure(event, Context.from[F](request, settings)))
_ <- maybeResult.traverse(client.submit(request.id, _))
} yield ()
program.handleErrorWith {
case ex @ ContainerError => ex.raiseError
case NonFatal(ex) => client.reportInvocationError(request.id, ex)
case ex => ex.raiseError
}
}
}
Loading