Skip to content

Commit

Permalink
Add contextual interop between cats-effect and ZIO (#1246)
Browse files Browse the repository at this point in the history
  • Loading branch information
iRevive authored Jan 31, 2022
1 parent 9fcd78b commit a1673fe
Show file tree
Hide file tree
Showing 10 changed files with 900 additions and 66 deletions.
33 changes: 17 additions & 16 deletions adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package caliban

import caliban.execution.QueryExecution
import caliban.interop.cats.CatsInterop
import caliban.interop.cats.{ CatsInterop, ToEffect }
import caliban.interop.tapir.TapirAdapter.{ zioMonadError, CalibanPipe, ZioWebSockets }
import caliban.interop.tapir.{ RequestInterceptor, TapirAdapter, WebSocketHooks }
import cats.data.Kleisli
import cats.effect.Async
import cats.effect.std.Dispatcher
import cats.~>
import org.http4s._
import org.http4s.server.websocket.WebSocketBuilder2
Expand Down Expand Up @@ -49,7 +48,7 @@ object Http4sAdapter {
enableIntrospection: Boolean = true,
queryExecution: QueryExecution = QueryExecution.Parallel,
requestInterceptor: RequestInterceptor[R] = RequestInterceptor.empty
)(implicit runtime: Runtime[R]): HttpRoutes[F] = {
)(implicit interop: ToEffect[F, R]): HttpRoutes[F] = {
val endpoints = TapirAdapter.makeHttpService[R, E](
interpreter,
skipValidation,
Expand Down Expand Up @@ -84,7 +83,7 @@ object Http4sAdapter {
enableIntrospection: Boolean = true,
queryExecution: QueryExecution = QueryExecution.Parallel,
requestInterceptor: RequestInterceptor[R] = RequestInterceptor.empty
)(implicit runtime: Runtime[R]): HttpRoutes[F] = {
)(implicit interop: ToEffect[F, R]): HttpRoutes[F] = {
val endpoint = TapirAdapter.makeHttpUploadService[R, E](
interpreter,
skipValidation,
Expand Down Expand Up @@ -120,7 +119,7 @@ object Http4sAdapter {
.toRoutes(builder.asInstanceOf[WebSocketBuilder2[RIO[R1 with Clock with Blocking, *]]])
}

def makeWebSocketServiceF[F[_]: Async: Dispatcher, R, E](
def makeWebSocketServiceF[F[_]: Async, R, E](
builder: WebSocketBuilder2[F],
interpreter: GraphQLInterpreter[R, E],
skipValidation: Boolean = false,
Expand All @@ -129,7 +128,7 @@ object Http4sAdapter {
queryExecution: QueryExecution = QueryExecution.Parallel,
requestInterceptor: RequestInterceptor[R] = RequestInterceptor.empty,
webSocketHooks: WebSocketHooks[R, E] = WebSocketHooks.empty
)(implicit runtime: Runtime[R]): HttpRoutes[F] = {
)(implicit interop: CatsInterop[F, R], runtime: Runtime[R]): HttpRoutes[F] = {
val endpoint = TapirAdapter.makeWebSocketService[R, E](
interpreter,
skipValidation,
Expand Down Expand Up @@ -195,22 +194,22 @@ object Http4sAdapter {
* If you wish to use `Http4sServerInterpreter` with cats-effect IO instead of `ZHttp4sServerInterpreter`,
* you can use this function to convert the tapir endpoints to their cats-effect counterpart.
*/
def convertHttpEndpointToF[F[_]: Async, R, E](
def convertHttpEndpointToF[F[_], R, E](
endpoint: ServerEndpoint[Any, RIO[R, *]]
)(implicit runtime: Runtime[R]): ServerEndpoint[Any, F] =
)(implicit interop: ToEffect[F, R]): ServerEndpoint[Any, F] =
ServerEndpoint[endpoint.A, endpoint.U, endpoint.I, endpoint.E, endpoint.O, Any, F](
endpoint.endpoint,
_ => a => CatsInterop.toEffect(endpoint.securityLogic(zioMonadError)(a)),
_ => u => req => CatsInterop.toEffect(endpoint.logic(zioMonadError)(u)(req))
_ => a => interop.toEffect(endpoint.securityLogic(zioMonadError)(a)),
_ => u => req => interop.toEffect(endpoint.logic(zioMonadError)(u)(req))
)

/**
* If you wish to use `Http4sServerInterpreter` with cats-effect IO instead of `ZHttp4sServerInterpreter`,
* you can use this function to convert the tapir endpoints to their cats-effect counterpart.
*/
def convertWebSocketEndpointToF[F[_]: Async: Dispatcher, R, E](
def convertWebSocketEndpointToF[F[_], R, E](
endpoint: ServerEndpoint[ZioWebSockets, RIO[R, *]]
)(implicit runtime: Runtime[R]): ServerEndpoint[Fs2Streams[F] with WebSockets, F] = {
)(implicit interop: CatsInterop[F, R], runtime: Runtime[R]): ServerEndpoint[Fs2Streams[F] with WebSockets, F] = {
type Fs2Pipe = fs2.Pipe[F, GraphQLWSInput, GraphQLWSOutput]

val e = endpoint
Expand All @@ -220,17 +219,19 @@ object Http4sAdapter {

ServerEndpoint[endpoint.A, endpoint.U, endpoint.I, endpoint.E, Fs2Pipe, Fs2Streams[F] with WebSockets, F](
e.endpoint.asInstanceOf[Endpoint[endpoint.A, endpoint.I, endpoint.E, Fs2Pipe, Any]],
_ => a => CatsInterop.toEffect(e.securityLogic(zioMonadError)(a)),
_ => a => interop.toEffect(e.securityLogic(zioMonadError)(a)),
_ =>
u =>
req =>
CatsInterop.toEffect(
interop.toEffect(
e.logic(zioMonadError)(u)(req)
.map(_.map { zioPipe =>
import zio.stream.interop.fs2z._
fs2InputStream =>
zioPipe(fs2InputStream.translate(CatsInterop.fromEffectK[F, Any]).toZStream()).toFs2Stream
.translate(CatsInterop.toEffectK)
zioPipe(
fs2InputStream.translate(interop.fromEffectK).toZStream().provide(runtime.environment)
).toFs2Stream
.translate(interop.toEffectK)
})
)
)
Expand Down
11 changes: 9 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ val allScala = Seq(scala212, scala213, scala3)
val akkaVersion = "2.6.17"
val catsEffect2Version = "2.5.4"
val catsEffect3Version = "3.3.5"
val catsMtlVersion = "1.2.1"
val circeVersion = "0.14.1"
val http4sVersion = "0.23.9"
val laminextVersion = "0.14.3"
Expand Down Expand Up @@ -198,12 +199,16 @@ lazy val catsInterop = project
.settings(name := "caliban-cats")
.settings(commonSettings)
.settings(
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")),
autoAPIMappings := true,
libraryDependencies ++= {
if (scalaVersion.value == scala3) Seq()
else Seq(compilerPlugin(("org.typelevel" %% "kind-projector" % "0.13.2").cross(CrossVersion.full)))
} ++ Seq(
"org.typelevel" %% "cats-effect" % catsEffect3Version,
"dev.zio" %% "zio-interop-cats" % zioInteropCats3Version,
"org.typelevel" %% "cats-effect" % catsEffect3Version
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test
)
)
.dependsOn(core)
Expand Down Expand Up @@ -387,6 +392,7 @@ lazy val examples = project
crossScalaVersions -= scala3,
libraryDependencySchemes += "org.scala-lang.modules" %% "scala-java8-compat" % "always",
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-mtl" % catsMtlVersion,
"org.http4s" %% "http4s-blaze-server" % http4sVersion,
"org.http4s" %% "http4s-dsl" % http4sVersion,
"com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion,
Expand Down Expand Up @@ -456,7 +462,8 @@ lazy val docs = project
scalacOptions += "-Wunused:imports",
libraryDependencies ++= Seq(
"com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion,
"io.circe" %% "circe-generic" % circeVersion
"io.circe" %% "circe-generic" % circeVersion,
"org.typelevel" %% "cats-mtl" % catsMtlVersion
)
)
.dependsOn(core, catsInterop, tapirInterop, http4s, tools)
Expand Down
139 changes: 139 additions & 0 deletions examples/src/main/scala/example/http4s/AuthExampleAppF.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package example.http4s

import caliban.GraphQL._
import caliban.interop.cats.{ CatsInterop, InjectEnv }
import caliban.schema.GenericSchema
import caliban.{ CalibanError, GraphQL, Http4sAdapter, RootResolver }
import cats.data.{ Kleisli, OptionT }
import cats.MonadThrow
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.effect.{ Async, IO, IOApp, Resource }
import cats.effect.std.Dispatcher
import cats.mtl.Local
import cats.mtl.syntax.local._
import org.http4s.HttpApp
import org.http4s.server.Server
import org.http4s.{ HttpRoutes, Request }
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.server.{ Router, ServiceErrorHandler }
import org.typelevel.ci._
import zio.Runtime

/**
* The examples shows how to utilize contextual interop between cats-effect and ZIO.
*
* Run server:
* examples/runMain example.http4s.AuthExampleAppF
*
* Send a request:
* curl -X POST -H "X-Token: my-token" -d '{"query": "query { token }"}' localhost:8088/api/graphql
*
* Response:
* {"data":{"token":"my-token"}}
*/
object AuthExampleAppF extends IOApp.Simple {

type AuthLocal[F[_]] = Local[F, AuthInfo]
object AuthLocal {
def apply[F[_]](implicit ev: AuthLocal[F]): AuthLocal[F] = ev

def token[F[_]: MonadThrow](implicit local: AuthLocal[F]): F[AuthInfo.Token] =
local.ask.flatMap {
case t: AuthInfo.Token => MonadThrow[F].pure(t)
case AuthInfo.Empty => MonadThrow[F].raiseError(MissingToken())
}
}

sealed trait AuthInfo
object AuthInfo {
final case object Empty extends AuthInfo
final case class Token(token: String) extends AuthInfo
}

final case class MissingToken() extends Throwable

// http4s middleware that extracts a token from the request and executes the request with AuthInfo available in the scope
object AuthMiddleware {
private val TokenHeader = ci"X-Token"

def httpRoutes[F[_]: MonadThrow: AuthLocal](routes: HttpRoutes[F]): HttpRoutes[F] =
Kleisli { (req: Request[F]) =>
req.headers.get(TokenHeader) match {
case Some(token) => routes.run(req).scope(AuthInfo.Token(token.head.value): AuthInfo)
case None => OptionT.liftF(MonadThrow[F].raiseError(MissingToken()))
}
}
}

// Simple service that returns the token coming from the request
final case class Query[F[_]](token: F[String])

class GQL[F[_]: MonadThrow: AuthLocal](implicit interop: CatsInterop[F, AuthInfo]) {

def createGraphQL: GraphQL[AuthInfo] = {
val schema: GenericSchema[AuthInfo] = new GenericSchema[AuthInfo] {}
import schema._
import caliban.interop.cats.implicits._ // summons `Schema[Auth, F[String]]` instance

graphQL(RootResolver(query))
}

private def query: Query[F] =
Query(
token = AuthLocal.token[F].map(authInfo => authInfo.token)
)
}

class Api[F[_]: Async: AuthLocal](implicit interop: CatsInterop[F, AuthInfo]) extends Http4sDsl[F] {

def httpApp(graphQL: GraphQL[AuthInfo]): F[HttpApp[F]] =
for {
routes <- createRoutes(graphQL)
} yield Router("/api/graphql" -> AuthMiddleware.httpRoutes(routes)).orNotFound

def createRoutes(graphQL: GraphQL[AuthInfo]): F[HttpRoutes[F]] =
for {
interpreter <- interop.toEffect(graphQL.interpreter)
} yield Http4sAdapter.makeHttpServiceF[F, AuthInfo, CalibanError](interpreter)

// http4s error handler to customize the response for our throwable
def errorHandler: ServiceErrorHandler[F] = _ => { case MissingToken() => Forbidden() }
}

def program[F[_]: Async: AuthLocal](implicit
runtime: Runtime[AuthInfo],
injector: InjectEnv[F, AuthInfo]
): Resource[F, Server] = {

def makeHttpServer(httpApp: HttpApp[F], errorHandler: ServiceErrorHandler[F]): Resource[F, Server] =
BlazeServerBuilder[F]
.withServiceErrorHandler(errorHandler)
.bindHttp(8088, "localhost")
.withHttpApp(httpApp)
.resource

Dispatcher[F].flatMap { dispatcher =>
implicit val interop: CatsInterop.Contextual[F, AuthInfo] = CatsInterop.contextual(dispatcher)

val gql = new GQL[F]
val api = new Api[F]

for {
httpApp <- Resource.eval(api.httpApp(gql.createGraphQL))
httpServer <- makeHttpServer(httpApp, api.errorHandler)
} yield httpServer
}
}

def run: IO[Unit] = {
type Effect[A] = Kleisli[IO, AuthInfo, A]

implicit val runtime: Runtime[AuthInfo] = Runtime.default.as(AuthInfo.Empty)

program[Effect].useForever.run(AuthInfo.Empty).void
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package example.interop.cats

import caliban.GraphQL.graphQL
import caliban.interop.cats.CatsInterop
import caliban.schema.GenericSchema
import caliban.{ GraphQL, RootResolver }
import cats.data.Kleisli
import cats.effect.{ Async, ExitCode, IO, IOApp }
import cats.effect.std.{ Console, Dispatcher }
import cats.mtl.syntax.local._
import cats.mtl.Local
import cats.syntax.functor._
import cats.syntax.flatMap._
import zio.{ Runtime, ZEnv }

object ContextualCatsInterop extends IOApp {

import caliban.interop.cats.implicits._

case class Number(value: Int)

case class Queries[F[_]](numbers: List[Number], randomNumber: F[Number])

val query = """
{
numbers {
value
}
randomNumber {
value
}
}"""

case class LogContext(operation: String) {
def child(next: String): LogContext = LogContext(s"$operation -> $next")
}

/**
* The example shows the propagation of the `LogContext` from cats-effect to ZIO and vice-versa.
*
* Console output:
* Executing request - root
* Get random number - root -> execute-request
* Generating a random number - root -> execute-request -> random-number
* Generated number: 485599760 - root -> execute-request -> random-number
* Request result: {"numbers":[{"value":1},{"value":2},{"value":3},{"value":4}],"randomNumber":{"value":485599760}} - root
*/
override def run(args: List[String]): IO[ExitCode] = {
type Effect[A] = Kleisli[IO, LogContext, A]

val root = LogContext("root")

Dispatcher[Effect].use { dispatcher =>
implicit val logger: Logger[Effect] =
new Logger[Effect] {
def info(message: String): Effect[Unit] =
for {
ctx <- Local[Effect, LogContext].ask[LogContext]
_ <- Console[Effect].println(s"$message - ${ctx.operation}")
} yield ()
}

implicit val zioRuntime: Runtime[LogContext] = Runtime.default.as(root)
implicit val interop: CatsInterop[Effect, LogContext] = CatsInterop.contextual(dispatcher)

program[Effect]
}.run(root)
}

def program[F[_]: Async: Logger](implicit
local: Local[F, LogContext],
interop: CatsInterop[F, LogContext], // required for a derivation of the schema
runtime: Runtime[LogContext]
): F[ExitCode] = {
val numbers = List(1, 2, 3, 4).map(Number)

val randomNumber =
Logger[F].info("Get random number") >> {
for {
_ <- Logger[F].info("Generating a random number")
number <- Async[F].delay(scala.util.Random.nextInt())
_ <- Logger[F].info(s"Generated number: $number")
} yield Number(number)
}.local[LogContext](_.child("random-number"))

val queries = Queries[F](numbers, randomNumber)

val api: GraphQL[LogContext] = {
object ContextSchema extends GenericSchema[LogContext]
import ContextSchema._ // required for a derivation of the schema

graphQL(RootResolver(queries))
}

for {
interpreter <- api.interpreterAsync[F]
_ <- interpreter.checkAsync[F](query)
_ <- Logger[F].info("Executing request")
result <- interpreter.executeAsync[F](query)(interop).local[LogContext](_.child("execute-request"))
_ <- Logger[F].info(s"Request result: ${result.data}")
} yield ExitCode.Success
}

trait Logger[F[_]] {
def info(message: String): F[Unit]
}
object Logger {
def apply[F[_]](implicit ev: Logger[F]): Logger[F] = ev
}

}
Loading

0 comments on commit a1673fe

Please sign in to comment.