From 28ae85577f8a09464206b134f285a7f17eb9a676 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sun, 23 Jun 2024 14:04:17 +1000 Subject: [PATCH 1/4] Use eager ZIO/ZQuery constructors as much as possible --- .../scala/caliban/QuickRequestHandler.scala | 14 +- .../execution/NestedZQueryBenchmark.scala | 371 +++++------------- .../NestedZQueryBenchmarkSchema.scala | 20 + build.sbt | 2 +- .../src/main/scala/caliban/Configurator.scala | 2 +- core/src/main/scala/caliban/GraphQL.scala | 12 +- .../scala/caliban/execution/Executor.scala | 29 +- .../caliban/introspection/Introspector.scala | 2 +- core/src/main/scala/caliban/schema/Step.scala | 8 +- .../scala/caliban/validation/Validator.scala | 4 +- .../wrappers/ApolloPersistedQueries.scala | 2 +- .../main/scala/caliban/wrappers/Caching.scala | 4 +- .../main/scala/caliban/wrappers/Wrapper.scala | 4 +- .../scala/caliban/wrappers/Wrappers.scala | 97 ++--- .../scala/caliban/interop/tapir/package.scala | 4 +- 15 files changed, 226 insertions(+), 349 deletions(-) diff --git a/adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala b/adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala index 554148102..3da1e5719 100644 --- a/adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala +++ b/adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala @@ -42,12 +42,14 @@ final private class QuickRequestHandler[R]( new QuickRequestHandler[R & R1](interpreter, config) def handleHttpRequest(request: Request)(implicit trace: Trace): URIO[R, Response] = - transformHttpRequest(request) - .flatMap(executeRequest(request.method, _)) - .foldZIO( - Exit.succeed, - resp => Exit.succeed(transformResponse(request, resp)) - ) + ZIO.suspendSucceed { // Suspending to ensure that all the CPU work we're about to do happens in the ZIO thread pool + transformHttpRequest(request) + .flatMap(executeRequest(request.method, _)) + .foldZIO( + Exit.succeed, + resp => Exit.succeed(transformResponse(request, resp)) + ) + } def handleUploadRequest(request: Request)(implicit trace: Trace): URIO[R, Response] = transformUploadRequest(request).flatMap { case (req, fileHandle) => diff --git a/benchmarks/src/main/scala/caliban/execution/NestedZQueryBenchmark.scala b/benchmarks/src/main/scala/caliban/execution/NestedZQueryBenchmark.scala index c9afcf4c4..3ac3ba130 100644 --- a/benchmarks/src/main/scala/caliban/execution/NestedZQueryBenchmark.scala +++ b/benchmarks/src/main/scala/caliban/execution/NestedZQueryBenchmark.scala @@ -11,8 +11,8 @@ import java.util.concurrent.TimeUnit @State(Scope.Thread) @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) -@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) @Fork(1) class NestedZQueryBenchmark { @@ -22,306 +22,145 @@ class NestedZQueryBenchmark { import NestedZQueryBenchmarkSchema._ - val simple100: GraphQLInterpreter[Any, CalibanError] = - run(graphQL[Any, SimpleRoot, Unit, Unit](RootResolver(NestedZQueryBenchmarkSchema.simple100Elements)).interpreter) - val simple1000: GraphQLInterpreter[Any, CalibanError] = - run(graphQL[Any, SimpleRoot, Unit, Unit](RootResolver(NestedZQueryBenchmarkSchema.simple1000Elements)).interpreter) + @Param(Array("100", "10000")) + var size: Int = _ + + @Param(Array("sequential", "parallel", "batched")) + var execution: String = _ + + val simple100: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, SimpleRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.simple100Elements) + ).interpreterUnsafe + + val simple1000: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, SimpleRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.simple1000Elements) + ).interpreterUnsafe + val simple10000: GraphQLInterpreter[Any, CalibanError] = - run(graphQL[Any, SimpleRoot, Unit, Unit](RootResolver(NestedZQueryBenchmarkSchema.simple10000Elements)).interpreter) + graphQL[Any, SimpleRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.simple10000Elements) + ).interpreterUnsafe + + val multifield100: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, MultifieldRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.multifield100Elements) + ).interpreterUnsafe + + val multifield1000: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, MultifieldRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.multifield1000Elements) + ).interpreterUnsafe - val multifield100: GraphQLInterpreter[Any, CalibanError] = - run( - graphQL[Any, MultifieldRoot, Unit, Unit]( - RootResolver(NestedZQueryBenchmarkSchema.multifield100Elements) - ).interpreter - ) - val multifield1000: GraphQLInterpreter[Any, CalibanError] = - run( - graphQL[Any, MultifieldRoot, Unit, Unit]( - RootResolver(NestedZQueryBenchmarkSchema.multifield1000Elements) - ).interpreter - ) val multifield10000: GraphQLInterpreter[Any, CalibanError] = - run( - graphQL[Any, MultifieldRoot, Unit, Unit]( - RootResolver(NestedZQueryBenchmarkSchema.multifield10000Elements) - ).interpreter - ) + graphQL[Any, MultifieldRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.multifield10000Elements) + ).interpreterUnsafe + + val multifieldEager100: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, MultifieldRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.multifieldEager100Elements) + ).interpreterUnsafe + + val multifieldEager1000: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, MultifieldRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.multifieldEager1000Elements) + ).interpreterUnsafe + + val multifieldEager10000: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, MultifieldRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.multifieldEager10000Elements) + ).interpreterUnsafe + + val deep100: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, DeepRoot, Unit, Unit]( + RootResolver[DeepRoot](NestedZQueryBenchmarkSchema.deep100Elements) + ).interpreterUnsafe + + val deep1000: GraphQLInterpreter[Any, CalibanError] = + graphQL[Any, DeepRoot, Unit, Unit]( + RootResolver[DeepRoot](NestedZQueryBenchmarkSchema.deep1000Elements) + ).interpreterUnsafe - val deep100: GraphQLInterpreter[Any, CalibanError] = - run( - graphQL[Any, DeepRoot, Unit, Unit]( - RootResolver[DeepRoot](NestedZQueryBenchmarkSchema.deep100Elements) - ).interpreter - ) - val deep1000: GraphQLInterpreter[Any, CalibanError] = - run( - graphQL[Any, DeepRoot, Unit, Unit]( - RootResolver[DeepRoot](NestedZQueryBenchmarkSchema.deep1000Elements) - ).interpreter - ) val deep10000: GraphQLInterpreter[Any, CalibanError] = - run( - graphQL[Any, DeepRoot, Unit, Unit]( - RootResolver[DeepRoot](NestedZQueryBenchmarkSchema.deep10000Elements) - ).interpreter - ) + graphQL[Any, DeepRoot, Unit, Unit]( + RootResolver[DeepRoot](NestedZQueryBenchmarkSchema.deep10000Elements) + ).interpreterUnsafe val metricsInterpreter: GraphQLInterpreter[Any, CalibanError] = - run( - graphQL[Any, MultifieldRoot, Unit, Unit]( - RootResolver(NestedZQueryBenchmarkSchema.multifield1000Elements) - ).withWrapper(Wrappers.metrics()).interpreter - ) + graphQL[Any, MultifieldRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.multifield1000Elements) + ).withWrapper(Wrappers.metrics()).interpreterUnsafe val apolloInterpreter: GraphQLInterpreter[Any, CalibanError] = - run( - graphQL[Any, MultifieldRoot, Unit, Unit]( - RootResolver(NestedZQueryBenchmarkSchema.multifield1000Elements) - ).withWrapper(ApolloTracing.apolloTracing()).interpreter - ) + graphQL[Any, MultifieldRoot, Unit, Unit]( + RootResolver(NestedZQueryBenchmarkSchema.multifield1000Elements) + ).withWrapper(ApolloTracing.apolloTracing()).interpreterUnsafe private val batched = ExecutionConfiguration(queryExecution = QueryExecution.Batched) private val parallel = ExecutionConfiguration(queryExecution = QueryExecution.Parallel) private val sequential = ExecutionConfiguration(queryExecution = QueryExecution.Sequential) - @Benchmark - def simpleParallelQuery100(): Any = { - val io = - simple100 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) - .execute(simpleQuery) - run(io) - } - - @Benchmark - def simpleParallelQuery1000(): Any = { - val io = - simple1000 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) - .execute(simpleQuery) - run(io) - } - - @Benchmark - def simpleParallelQuery10000(): Any = { - val io = - simple10000 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) - .execute(simpleQuery) - run(io) - } - - @Benchmark - def simpleSequentialQuery100(): Any = { - val io = - simple100 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) - .execute(simpleQuery) - run(io) - } - - @Benchmark - def simpleSequentialQuery1000(): Any = { - val io = - simple1000 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) - .execute(simpleQuery) - run(io) - } - - @Benchmark - def simpleSequentialQuery10000(): Any = { - val io = - simple10000 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) - .execute(simpleQuery) - run(io) - } - - @Benchmark - def simpleBatchedQuery100(): Any = { - val io = - simple100 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(simpleQuery) - run(io) - } - - @Benchmark - def simpleBatchedQuery1000(): Any = { - val io = - simple1000 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(simpleQuery) - run(io) + private def cfg() = execution match { + case "sequential" => sequential + case "parallel" => parallel + case "batched" => batched } @Benchmark - def simpleBatchedQuery10000(): Any = { - val io = - simple10000 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) + def simpleQueryBenchmark(): Any = { + val interpreter = size match { + case 100 => simple100 + case 1000 => simple1000 + case 10000 => simple10000 + } + val io = + interpreter + .wrapExecutionWith(Configurator.ref.locally(cfg())(_)) .execute(simpleQuery) run(io) } @Benchmark - def multifieldParallelQuery100(): Any = { - val io = multifield100 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) - .execute(multifieldQuery) - run(io) - } - - @Benchmark - def multifieldParallelQuery1000(): Any = { - val io = multifield1000 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) - .execute(multifieldQuery) - run(io) - } - - @Benchmark - def multifieldParallelQuery10000(): Any = { - val io = multifield10000 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) - .execute(multifieldQuery) - run(io) - } - - @Benchmark - def multifieldSequentialQuery100(): Any = { - val io = multifield100 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) - .execute(multifieldQuery) - run(io) - } - - @Benchmark - def multifieldSequentialQuery1000(): Any = { - val io = multifield1000 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) + def multifieldQueryBenchmark(): Any = { + val interpreter = size match { + case 100 => multifield100 + case 1000 => multifield1000 + case 10000 => multifield10000 + } + val io = interpreter + .wrapExecutionWith(Configurator.ref.locally(cfg())(_)) .execute(multifieldQuery) run(io) } @Benchmark - def multifieldSequentialQuery10000(): Any = { - val io = multifield10000 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) + def multifieldQueryEagerBenchmark(): Any = { + val interpreter = size match { + case 100 => multifieldEager100 + case 1000 => multifieldEager1000 + case 10000 => multifieldEager10000 + } + val io = interpreter + .wrapExecutionWith(Configurator.ref.locally(cfg())(_)) .execute(multifieldQuery) run(io) } @Benchmark - def multifieldBatchedQuery100(): Any = { - val io = multifield100 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(multifieldQuery) - run(io) - } - - @Benchmark - def multifieldBatchedQuery1000(): Any = { - val io = multifield1000 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(multifieldQuery) - run(io) - } - - @Benchmark - def multifieldBatchedQuery10000(): Any = { - val io = multifield10000 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(multifieldQuery) - run(io) - } - - @Benchmark - def deepParallelQuery100(): Any = { - val io = deep100 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) - .execute(deepQuery) - run(io) - } - - @Benchmark - def deepParallelQuery1000(): Any = { - val io = deep1000 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) + def deepQueryBenchmark(): Any = { + val interpreter = size match { + case 100 => deep100 + case 1000 => deep1000 + case 10000 => deep10000 + } + val io = interpreter + .wrapExecutionWith(Configurator.ref.locally(cfg())(_)) .execute(deepQuery) run(io) } - @Benchmark - def deepParallelQuery10000(): Any = { - val io = - deep10000 - .wrapExecutionWith(Configurator.ref.locally(parallel)(_)) - .execute(deepQuery) - run(io) - } - - @Benchmark - def deepSequentialQuery100(): Any = { - val io = - deep100 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) - .execute(deepQuery) - run(io) - } - - @Benchmark - def deepSequentialQuery1000(): Any = { - val io = - deep1000 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) - .execute(deepQuery) - run(io) - } - - @Benchmark - def deepSequentialQuery10000(): Any = { - val io = - deep10000 - .wrapExecutionWith(Configurator.ref.locally(sequential)(_)) - .execute(deepQuery) - run(io) - } - - @Benchmark - def deepBatchedQuery100(): Any = { - val io = deep100 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(deepQuery) - run(io) - } - - @Benchmark - def deepBatchedQuery1000(): Any = { - val io = deep1000 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(deepQuery) - run(io) - } - - @Benchmark - def deepBatchedQuery10000(): Any = { - val io = deep10000 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(deepQuery) - run(io) - } - - @Benchmark - def noWrappersBenchmark(): Any = { - val io = multifield1000 - .wrapExecutionWith(Configurator.ref.locally(batched)(_)) - .execute(multifieldQuery) - run(io) - } - @Benchmark def apolloTracingBenchmark(): Any = { val io = apolloInterpreter diff --git a/benchmarks/src/main/scala/caliban/execution/NestedZQueryBenchmarkSchema.scala b/benchmarks/src/main/scala/caliban/execution/NestedZQueryBenchmarkSchema.scala index c3df6f586..3df24e400 100644 --- a/benchmarks/src/main/scala/caliban/execution/NestedZQueryBenchmarkSchema.scala +++ b/benchmarks/src/main/scala/caliban/execution/NestedZQueryBenchmarkSchema.scala @@ -98,6 +98,10 @@ object NestedZQueryBenchmarkSchema { val multifield1000Elements: MultifieldRoot = generateMulti(1000) val multifield10000Elements: MultifieldRoot = generateMulti(10000) + val multifieldEager100Elements: MultifieldRoot = generateMultiEager(100) + val multifieldEager1000Elements: MultifieldRoot = generateMultiEager(1000) + val multifieldEager10000Elements: MultifieldRoot = generateMultiEager(10000) + val multifieldQuery: String = """{ entities { id @@ -178,6 +182,22 @@ object NestedZQueryBenchmarkSchema { MultifieldRoot(ZQuery.succeed(entities)) } + private def generateMultiEager(n: Int) = { + val entities = (1 to n).map { i => + val qi = ZQuery.succeedNow(i) + MultifieldEntity( + i, + i + 1, + i + 2, + qi, + qi, + qi, + ZQuery.succeed(NestedObject(i, qi, qi, qi, qi, qi, qi, qi, qi, qi, qi, qi, qi, qi, qi, qi)) + ) + }.toList + MultifieldRoot(ZQuery.succeed(entities)) + } + private def generateDeep(n: Int) = { def loop(n: Int): DeepEntity = if (n == 0) diff --git a/build.sbt b/build.sbt index 7d1de027d..0c7552ebb 100644 --- a/build.sbt +++ b/build.sbt @@ -29,7 +29,7 @@ val zioInteropCats2Version = "22.0.0.0" val zioInteropCats3Version = "23.1.0.2" val zioInteropReactiveVersion = "2.0.2" val zioConfigVersion = "3.0.7" -val zqueryVersion = "0.7.1" +val zqueryVersion = "0.7.1+5-96f9a1f3-SNAPSHOT" val zioJsonVersion = "0.7.0" val zioHttpVersion = "3.0.0-RC8" val zioOpenTelemetryVersion = "3.0.0-RC21" diff --git a/core/src/main/scala/caliban/Configurator.scala b/core/src/main/scala/caliban/Configurator.scala index 941b87359..8e969b656 100644 --- a/core/src/main/scala/caliban/Configurator.scala +++ b/core/src/main/scala/caliban/Configurator.scala @@ -39,7 +39,7 @@ object Configurator { Unsafe.unsafe(implicit u => FiberRef.unsafe.make(ExecutionConfiguration())) private[caliban] val skipValidation: UIO[Boolean] = - ref.getWith(cfg => ZIO.succeed(cfg.skipValidation)) + ref.getWith(cfg => Exit.succeed(cfg.skipValidation)) /** * Skip validation of the query. diff --git a/core/src/main/scala/caliban/GraphQL.scala b/core/src/main/scala/caliban/GraphQL.scala index 97ffbe5fa..cf393a70a 100644 --- a/core/src/main/scala/caliban/GraphQL.scala +++ b/core/src/main/scala/caliban/GraphQL.scala @@ -98,8 +98,8 @@ trait GraphQL[-R] { self => private val introWrappers = wrappers.collect { case w: IntrospectionWrapper[R] => w } private lazy val introspectionRootSchema: RootSchema[R] = Introspector.introspect(rootType, introWrappers) - private def parseZIO(query: String)(implicit trace: Trace): IO[CalibanError.ParsingError, Document] = - ZIO.fromEither(Parser.parseQuery(query)) + private def parseZIO(query: String): IO[CalibanError.ParsingError, Document] = + Exit.fromEither(Parser.parseQuery(query)) override def check(query: String)(implicit trace: Trace): IO[CalibanError, Unit] = for { @@ -120,7 +120,7 @@ trait GraphQL[-R] { self => coercedVars <- coerceVariables(doc, request.variables.getOrElse(Map.empty)) executionReq <- wrap(validation(request.operationName, coercedVars))(validationWrappers, doc) result <- wrap(execution(schemaToExecute(doc), fieldWrappers))(executionWrappers, executionReq) - } yield result).catchAll(Executor.fail(_)) + } yield result).catchAll(Executor.fail) )(overallWrappers, request) } @@ -132,7 +132,7 @@ trait GraphQL[-R] { self => ZIO.fail(CalibanError.ValidationError("Introspection is disabled", "")) else VariablesCoercer.coerceVariables(variables, doc, typeToValidate(doc), config.skipValidation) match { - case Right(value) => ZIO.succeed(value) + case Right(value) => Exit.succeed(value) case Left(error) => ZIO.fail(error) } } @@ -189,9 +189,9 @@ trait GraphQL[-R] { self => if ((req.operationType eq OperationType.Mutation) && !cfg.allowMutationsOverGetRequests) HttpRequestMethod.getWith { case HttpRequestMethod.GET => ZIO.fail(HttpRequestMethod.MutationOverGetError) - case _ => ZIO.succeed(req) + case _ => Exit.succeed(req) } - else ZIO.succeed(req) + else Exit.succeed(req) } } diff --git a/core/src/main/scala/caliban/execution/Executor.scala b/core/src/main/scala/caliban/execution/Executor.scala index 33233bf59..61f02029d 100644 --- a/core/src/main/scala/caliban/execution/Executor.scala +++ b/core/src/main/scala/caliban/execution/Executor.scala @@ -83,8 +83,7 @@ object Executor { ) } } else - ZIO.succeed(GraphQLResponse(result, resultErrors, hasNext = None)) - + Exit.succeed(GraphQLResponse(result, resultErrors, hasNext = None)) } } @@ -133,8 +132,8 @@ object Executor { } yield response } - private[caliban] def fail(error: CalibanError)(implicit trace: Trace): UIO[GraphQLResponse[CalibanError]] = - ZIO.succeed(GraphQLResponse(NullValue, List(error))) + private[caliban] def fail(error: CalibanError): UIO[GraphQLResponse[CalibanError]] = + Exit.succeed(GraphQLResponse(NullValue, List(error))) private final class StepReducer[R]( transformer: Transformer[R], @@ -243,13 +242,15 @@ object Executor { } } - def reduceQuery(query: ZQuery[R, Throwable, Step[R]]) = - ReducedStep.QueryStep( - query.foldCauseQuery( - e => ZQuery.failCause(effectfulExecutionError(path, Some(currentField.locationInfo), e)), - a => ZQuery.succeed(reduceStep(a, currentField, arguments, path)) - ) - ) + def reduceQuery(q: ZQuery[R, Throwable, Step[R]]): ReducedStep[R] = { + def success(v: Step[R]) = reduceStep(v, currentField, arguments, path) + def fail(e: Cause[Throwable]) = effectfulExecutionError(path, Some(currentField.locationInfo), e) + + q.asExitOrElse(null) match { + case null => ReducedStep.QueryStep(q.mapBothCause(fail, success)) + case res => res.foldExit(e => ReducedStep.FailureStep(fail(e)), success) + } + } def reduceStream(stream: ZStream[R, Throwable, Step[R]]) = if (isSubscription) { @@ -499,7 +500,7 @@ object Executor { def loop(step: ReducedStep[R], isTopLevelField: Boolean = false): ExecutionQuery[ResponseValue] = step match { - case PureStep(value) => ZQuery.succeed(value) + case PureStep(value) => ZQuery.succeedNow(value) case ReducedStep.QueryStep(step) => step.flatMap(loop(_)) case ReducedStep.ObjectStep(steps, hasPureFields, _) => makeObjectQuery(steps, hasPureFields, isTopLevelField) case ReducedStep.ListStep(steps, areItemsNullable, _) => makeListQuery(steps, areItemsNullable) @@ -508,7 +509,7 @@ object Executor { .environmentWith[R](env => ResponseValue.StreamValue( stream.mapChunksZIO { chunk => - collectAll(chunk, isTopLevelField)(loop(_).catchAll(_ => ZQuery.succeed(NullValue))).run + collectAll(chunk, isTopLevelField)(loop(_).catchAll(_ => nullValueQuery)).run }.provideEnvironment(env) ) ) @@ -536,7 +537,7 @@ object Executor { case other => Cause.fail(ExecutionError("Effect failure", path.reverse, locationInfo, other)) } - private val nullValueQuery = ZQuery.succeed(NullValue)(Trace.empty) + private val nullValueQuery = ZQuery.succeedNow(NullValue) // The implicit classes below are for methods that don't exist in Scala 2.12 so we add them as syntax methods instead private implicit class EnrichedListOps[+A](private val list: List[A]) extends AnyVal { diff --git a/core/src/main/scala/caliban/introspection/Introspector.scala b/core/src/main/scala/caliban/introspection/Introspector.scala index 21337bb45..30437f6f7 100644 --- a/core/src/main/scala/caliban/introspection/Introspector.scala +++ b/core/src/main/scala/caliban/introspection/Introspector.scala @@ -101,7 +101,7 @@ object Introspector extends IntrospectionDerivation { val step = introWrappers match { case Nil => introspectionSchema.resolve(resolver) - case ws => QueryStep(ZQuery.fromZIO(wrap(ZIO.succeed(resolver))(ws).map(introspectionSchema.resolve))) + case ws => QueryStep(ZQuery.fromZIONow(wrap(Exit.succeed(resolver))(ws).map(introspectionSchema.resolve))) } RootSchema(Operation(introspectionType, step), None, None) diff --git a/core/src/main/scala/caliban/schema/Step.scala b/core/src/main/scala/caliban/schema/Step.scala index 1e4412ddc..c74180b97 100644 --- a/core/src/main/scala/caliban/schema/Step.scala +++ b/core/src/main/scala/caliban/schema/Step.scala @@ -4,7 +4,9 @@ import caliban.CalibanError.ExecutionError import caliban.Value.NullValue import caliban.execution.{ Field, FieldInfo } import caliban.{ InputValue, PathValue, ResponseValue } +import zio.Cause import zio.query.ZQuery +import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stream.ZStream sealed trait Step[-R] @@ -23,7 +25,7 @@ object Step { } object FailureStep { - def apply(error: Throwable): Step[Any] = QueryStep(ZQuery.fail(error)) + def apply(error: Throwable): Step[Any] = QueryStep(ZQuery.failNow(error)) } // PureStep is both a Step and a ReducedStep so it is defined outside this object @@ -110,6 +112,10 @@ object ReducedStep { final val isPure = false } + object FailureStep { + def apply(error: Cause[ExecutionError]): ReducedStep[Any] = QueryStep(ZQuery.failCauseNow(error)) + } + // PureStep is both a Step and a ReducedStep so it is defined outside this object // This is to avoid boxing/unboxing pure values during step reduction type PureStep = caliban.schema.PureStep diff --git a/core/src/main/scala/caliban/validation/Validator.scala b/core/src/main/scala/caliban/validation/Validator.scala index 74d835eab..eaaff252f 100644 --- a/core/src/main/scala/caliban/validation/Validator.scala +++ b/core/src/main/scala/caliban/validation/Validator.scala @@ -22,7 +22,7 @@ import caliban.{ Configurator, InputValue } import zio.prelude._ import zio.prelude.fx.ZPure import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{ IO, Trace, ZIO } +import zio.{ Exit, IO, Trace } import scala.annotation.tailrec import scala.collection.compat._ @@ -53,7 +53,7 @@ object Validator { * Fails with a [[caliban.CalibanError.ValidationError]] otherwise. */ def validate(document: Document, rootType: RootType)(implicit trace: Trace): IO[ValidationError, Unit] = - Configurator.ref.getWith(v => ZIO.fromEither(check(document, rootType, Map.empty, v.validations).map(_ => ()))) + Configurator.ref.getWith(v => Exit.fromEither(check(document, rootType, Map.empty, v.validations).unit)) /** * Verifies that the given document is valid for this type for all available validations. diff --git a/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala b/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala index fef3de094..31bd691fd 100644 --- a/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala +++ b/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala @@ -95,7 +95,7 @@ object ApolloPersistedQueries { } } .flatMap(process) - .catchAll(ex => ZIO.succeed(GraphQLResponse(NullValue, List(ex)))) + .catchAll(ex => Exit.succeed(GraphQLResponse(NullValue, List(ex)))) case None => docVar.succeed(None) *> process(request) } } diff --git a/core/src/main/scala/caliban/wrappers/Caching.scala b/core/src/main/scala/caliban/wrappers/Caching.scala index 42fae48c5..49bd2dde0 100644 --- a/core/src/main/scala/caliban/wrappers/Caching.scala +++ b/core/src/main/scala/caliban/wrappers/Caching.scala @@ -11,7 +11,7 @@ import caliban.schema.Types import caliban.wrappers.Wrapper.{ EffectfulWrapper, FieldWrapper, OverallWrapper, ValidationWrapper } import zio.prelude._ import zio.query.ZQuery -import zio.{ durationInt, Duration, FiberRef, Ref, UIO, Unsafe, ZIO } +import zio.{ durationInt, Duration, Exit, FiberRef, Ref, UIO, Unsafe, ZIO } import java.util.concurrent.{ ConcurrentHashMap, TimeUnit } @@ -280,7 +280,7 @@ object Caching { query.mapZIO { result => cacheOverride.get.flatMap { case Some(overrideValue) => state.update(_.restrict(Some(overrideValue))) as result - case None => ZIO.succeed(result) + case None => Exit.succeed(result) } } } diff --git a/core/src/main/scala/caliban/wrappers/Wrapper.scala b/core/src/main/scala/caliban/wrappers/Wrapper.scala index 551796c3f..91ab95435 100644 --- a/core/src/main/scala/caliban/wrappers/Wrapper.scala +++ b/core/src/main/scala/caliban/wrappers/Wrapper.scala @@ -7,7 +7,7 @@ import caliban.introspection.adt.__Introspection import caliban.parsing.adt.Document import caliban.wrappers.Wrapper.CombinedWrapper import zio.query.ZQuery -import zio.{ Trace, UIO, ZIO } +import zio.{ Exit, Trace, UIO, ZIO } import zio.stacktracer.TracingImplicits.disableAutoTrace import scala.annotation.tailrec @@ -152,7 +152,7 @@ object Wrapper { } private val emptyWrappers = - ZIO.succeed((Nil, Nil, Nil, Nil, Nil, Nil))(Trace.empty) + Exit.succeed((Nil, Nil, Nil, Nil, Nil, Nil)) private[caliban] def decompose[R](wrappers: List[Wrapper[R]])(implicit trace: Trace): UIO[ ( diff --git a/core/src/main/scala/caliban/wrappers/Wrappers.scala b/core/src/main/scala/caliban/wrappers/Wrappers.scala index 784c99405..0c418039c 100644 --- a/core/src/main/scala/caliban/wrappers/Wrappers.scala +++ b/core/src/main/scala/caliban/wrappers/Wrappers.scala @@ -25,11 +25,11 @@ object Wrappers { process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = request => - process(request).tap(response => - ZIO.when(response.errors.nonEmpty)( - printLineError(response.errors.flatMap(prettyStackStrace).mkString("", "\n", "\n")).orDie - ) - ) + process(request).tap { response => + val errors = response.errors + if (errors.nonEmpty) printLineError(errors.flatMap(prettyStackStrace).mkString("", "\n", "\n")).orDie + else Exit.unit + } } private def prettyStackStrace(t: Throwable): Chunk[String] = { @@ -68,7 +68,8 @@ object Wrappers { ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = (request: GraphQLRequest) => process(request).timed.flatMap { case (time, res) => - ZIO.when(time > duration)(f(time, request.query.getOrElse(""))).as(res) + if (time > duration) f(time, request.query.getOrElse("")).as(res) + else Exit.succeed(res) } } @@ -82,20 +83,17 @@ object Wrappers { process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = (request: GraphQLRequest) => - process(request) - .timeout(duration) - .map( - _.getOrElse( - GraphQLResponse( - NullValue, - List( - ExecutionError( - s"Query was interrupted after timeout of ${duration.render}:\n${request.query.getOrElse("")}" - ) - ) + process(request).timeoutTo( + GraphQLResponse( + NullValue, + List( + ExecutionError( + s"Query was interrupted after timeout of ${duration.render}:\n${request.query.getOrElse("")}" ) ) ) + )(identity)(duration) + } /** @@ -109,25 +107,36 @@ object Wrappers { ): Document => ZIO[R1, ValidationError, ExecutionRequest] = (doc: Document) => process(doc).tap { req => - ZIO.unlessZIO(Configurator.skipValidation) { - calculateDepth(req.field).flatMap { depth => - ZIO.when(depth > maxDepth)( - ZIO.fail(ValidationError(s"Query is too deep: $depth. Max depth: $maxDepth.", "")) - ) - } + ZIO.unlessZIODiscard(Configurator.skipValidation) { + val depth = calculateDepth(req.field) + if (depth > maxDepth) ZIO.fail(ValidationError(s"Query is too deep: $depth. Max depth: $maxDepth.", "")) + else Exit.unit } } } - private def calculateDepth(field: Field): UIO[Int] = { - val self = if (field.name.nonEmpty) 1 else 0 - val children = field.fields - ZIO - .foreach(children)(calculateDepth) - .map { - case Nil => self - case list => list.max + self + private def calculateDepth(field: Field): Int = { + // Faster because it doesn't allocate a new list on each iteration but not stack-safe + def loopUnsafe(field: Field, currentDepth: Int): Int = { + var children = field.fields + var max = currentDepth + while (children ne Nil) { + val d = loopUnsafe(children.head, currentDepth + 1) + if (d > max) max = d + children = children.tail } + max + } + + @tailrec + def loopSafe(fields: List[Field], currentDepth: Int): Int = + if (fields.isEmpty) currentDepth + else loopSafe(fields.flatMap(_.fields), currentDepth + 1) + + try loopUnsafe(field, 0) + catch { + case _: StackOverflowError => loopSafe(field.fields, 0) + } } /** @@ -141,12 +150,11 @@ object Wrappers { ): Document => ZIO[R1, ValidationError, ExecutionRequest] = (doc: Document) => process(doc).tap { req => - ZIO.unlessZIO(Configurator.skipValidation) { - countFields(req.field).flatMap { fields => - ZIO.when(fields > maxFields)( - ZIO.fail(ValidationError(s"Query has too many fields: $fields. Max fields: $maxFields.", "")) - ) - } + ZIO.unlessZIODiscard(Configurator.skipValidation) { + val fields = countFields(req.field) + if (fields > maxFields) + ZIO.fail(ValidationError(s"Query has too many fields: $fields. Max fields: $maxFields.", "")) + else Exit.unit } } } @@ -167,7 +175,8 @@ object Wrappers { ): Wrapper.EffectfulWrapper[Any] = FieldMetrics.wrapper(totalLabel, durationLabel, buckets, extraLabels) - private def countFields(rootField: Field): UIO[Int] = { + private def countFields(rootField: Field): Int = { + // Faster because it doesn't allocate a new list on each iteration but not stack-safe def loopUnsafe(field: Field): Int = { val iter = field.fields.iterator var count = 0 @@ -178,14 +187,14 @@ object Wrappers { count } - def loopSafe(field: Field): UIO[Int] = { - val fields = field.fields - ZIO.foreach(fields)(loopSafe).map(_.sum + fields.length) - } + @tailrec + def loopSafe(fields: List[Field], acc: Int): Int = + if (fields.isEmpty) acc + else loopSafe(fields.flatMap(_.fields), acc + fields.length) - try Exit.succeed(loopUnsafe(rootField)) + try loopUnsafe(rootField) catch { - case _: StackOverflowError => loopSafe(rootField) + case _: StackOverflowError => loopSafe(rootField.fields, 0) } } diff --git a/interop/tapir/src/main/scala/caliban/interop/tapir/package.scala b/interop/tapir/src/main/scala/caliban/interop/tapir/package.scala index 39cfb621b..2e06767bd 100644 --- a/interop/tapir/src/main/scala/caliban/interop/tapir/package.scala +++ b/interop/tapir/src/main/scala/caliban/interop/tapir/package.scala @@ -11,7 +11,7 @@ import sttp.tapir.internal._ import sttp.tapir.server.ServerEndpoint import sttp.tapir.{ EndpointIO, EndpointInput, EndpointOutput, PublicEndpoint } import _root_.zio.query.{ URQuery, ZQuery } -import _root_.zio.{ URIO, ZIO } +import _root_.zio.{ Exit, URIO, ZIO } import caliban.transformers.Transformer package object tapir { @@ -125,7 +125,7 @@ package object tapir { val replacedArgs = args.map { case (k, v) => reverseArgNames.getOrElse(k, k) -> v } QueryStep( ZQuery - .fromZIO(ZIO.fromEither(argBuilder.build(InputValue.ObjectValue(replacedArgs)))) + .fromZIO(Exit.fromEither(argBuilder.build(InputValue.ObjectValue(replacedArgs)))) .flatMap(input => serverEndpoint.logic(queryMonadError)(())(input)) .map { case Left(error: Throwable) => Step.fail(error) From 35aca45bafb1ae7705ec017efdc8061de99863c0 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sun, 23 Jun 2024 14:21:52 +1000 Subject: [PATCH 2/4] Cleanup QuickRequestHandler --- .../scala/caliban/QuickRequestHandler.scala | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala b/adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala index 3da1e5719..ebfb1544c 100644 --- a/adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala +++ b/adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala @@ -16,6 +16,7 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stream.{ UStream, ZPipeline, ZStream } import java.nio.charset.StandardCharsets.UTF_8 +import scala.util.Try import scala.util.control.NonFatal final private class QuickRequestHandler[R]( @@ -41,22 +42,23 @@ final private class QuickRequestHandler[R]( def configureWebSocket[R1](config: quick.WebSocketConfig[R1]): QuickRequestHandler[R & R1] = new QuickRequestHandler[R & R1](interpreter, config) - def handleHttpRequest(request: Request)(implicit trace: Trace): URIO[R, Response] = - ZIO.suspendSucceed { // Suspending to ensure that all the CPU work we're about to do happens in the ZIO thread pool - transformHttpRequest(request) - .flatMap(executeRequest(request.method, _)) - .foldZIO( - Exit.succeed, - resp => Exit.succeed(transformResponse(request, resp)) - ) - } + def handleHttpRequest(request: Request)(implicit trace: Trace): URIO[R, Response] = ZIO.suspendSucceed { + transformHttpRequest(request) + .flatMap(executeRequest(request.method, _)) + .foldZIO( + Exit.succeed, + resp => Exit.succeed(transformResponse(request, resp)) + ) + } - def handleUploadRequest(request: Request)(implicit trace: Trace): URIO[R, Response] = + def handleUploadRequest(request: Request)(implicit trace: Trace): URIO[R, Response] = ZIO.suspendSucceed { transformUploadRequest(request).flatMap { case (req, fileHandle) => - executeRequest(request.method, req) - .map(transformResponse(request, _)) - .provideSomeLayer[R](fileHandle) - }.merge + executeRequest(request.method, req).provideSomeLayer[R](fileHandle) + }.foldZIO( + Exit.succeed, + v => Exit.succeed(transformResponse(request, v)) + ) + } def handleWebSocketRequest(request: Request)(implicit trace: Trace): URIO[R, Response] = Response.fromSocketApp { @@ -131,10 +133,10 @@ final private class QuickRequestHandler[R]( partsMap: Map[String, FormField], key: String )(implicit jsonValueCodec: JsonValueCodec[A]): IO[Response, A] = - ZIO + Exit .fromOption(partsMap.get(key)) .flatMap(_.asChunk) - .flatMap(v => ZIO.attempt(readFromArray[A](v.toArray))) + .flatMap(v => Exit.fromTry(Try(readFromArray[A](v.toArray)))) .orElseFail(Response.badRequest) def parsePath(path: String): List[PathValue] = path.split('.').toList.map(PathValue.parse) From 545e8029c26e92b757afe83547423c635c18c455 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sun, 23 Jun 2024 17:30:38 +1000 Subject: [PATCH 3/4] Use `ZQuery.fromEither` --- .../tapir/src/main/scala/caliban/interop/tapir/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interop/tapir/src/main/scala/caliban/interop/tapir/package.scala b/interop/tapir/src/main/scala/caliban/interop/tapir/package.scala index 2e06767bd..0990dc931 100644 --- a/interop/tapir/src/main/scala/caliban/interop/tapir/package.scala +++ b/interop/tapir/src/main/scala/caliban/interop/tapir/package.scala @@ -125,7 +125,7 @@ package object tapir { val replacedArgs = args.map { case (k, v) => reverseArgNames.getOrElse(k, k) -> v } QueryStep( ZQuery - .fromZIO(Exit.fromEither(argBuilder.build(InputValue.ObjectValue(replacedArgs)))) + .fromEither(argBuilder.build(InputValue.ObjectValue(replacedArgs))) .flatMap(input => serverEndpoint.logic(queryMonadError)(())(input)) .map { case Left(error: Throwable) => Step.fail(error) From 809a8ada2915f2b6fbb7acbc1a6749c74d799e78 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Tue, 25 Jun 2024 11:56:55 +1000 Subject: [PATCH 4/4] Update zio-query version --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 0c7552ebb..862efb3d3 100644 --- a/build.sbt +++ b/build.sbt @@ -29,7 +29,7 @@ val zioInteropCats2Version = "22.0.0.0" val zioInteropCats3Version = "23.1.0.2" val zioInteropReactiveVersion = "2.0.2" val zioConfigVersion = "3.0.7" -val zqueryVersion = "0.7.1+5-96f9a1f3-SNAPSHOT" +val zqueryVersion = "0.7.2" val zioJsonVersion = "0.7.0" val zioHttpVersion = "3.0.0-RC8" val zioOpenTelemetryVersion = "3.0.0-RC21"