Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OSS Sync | Sat Apr 22 21:50:59 UTC 2023 #554

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.wixpress.dst.greyhound.core.producer.ProducerRecord
import com.wixpress.dst.greyhound.core.testkit.{BaseTestWithSharedEnv, TestMetrics}
import com.wixpress.dst.greyhound.core.zioutils.CountDownLatch
import com.wixpress.dst.greyhound.testenv.ITEnv
import com.wixpress.dst.greyhound.testenv.ITEnv.{Env, TestResources, testResources}
import com.wixpress.dst.greyhound.testenv.ITEnv.{testResources, Env, TestResources}
import org.apache.kafka.common.config.TopicConfig.{DELETE_RETENTION_MS_CONFIG, MAX_MESSAGE_BYTES_CONFIG, RETENTION_MS_CONFIG}
import org.apache.kafka.common.errors.InvalidTopicException
import org.specs2.specification.core.Fragments
Expand Down Expand Up @@ -83,7 +83,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] {
}
}

//todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// "reflect errors" in {
// val topic1 = aTopicConfig()
// val topic2 = aTopicConfig("x" * 250)
Expand All @@ -104,7 +104,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] {
// created === Map(badTopic.name -> None)
// }
// }
//todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// =================================================================================================================================
"ignore TopicExistsException by default" in {
val topic = aTopicConfig()
Expand Down
5 changes: 0 additions & 5 deletions core/src/it/scala/com/wixpress/dst/greyhound/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,8 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/resources",
#"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
#"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ specs2_ite2e_test(
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/resources",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
Expand All @@ -23,9 +22,7 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ specs2_ite2e_test(
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/resources",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
Expand All @@ -23,9 +22,7 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ specs2_ite2e_test(
deps = [
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
Expand All @@ -21,11 +20,8 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/resources",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@ scala_library(
"@dev_zio_zio_managed_2_12",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/admin",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_curator_curator_test",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ scala_library(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
# "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_curator_curator_test",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,10 @@ object UnsafeOffsetOperations {
}

override def offsetsForTimes(partitions: Set[TopicPartition], timeEpoch: Long, timeout: Duration): Map[TopicPartition, Option[Long]] =
consumer.offsetsForTimes(partitions.map(_.asKafka).map(tp => (tp, new lang.Long(timeEpoch))).toMap.asJava, timeout)
.asScala.toMap.map { case (tp, of) => TopicPartition(tp) -> (Option(of).map(_.offset())) }
consumer
.offsetsForTimes(partitions.map(_.asKafka).map(tp => (tp, new lang.Long(timeEpoch))).toMap.asJava, timeout)
.asScala
.toMap
.map { case (tp, of) => TopicPartition(tp) -> (Option(of).map(_.offset())) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ object EventLoop {
partitionsAssigned <- Promise.make[Nothing, Unit]
// TODO how to handle errors in subscribe?
rebalanceListener = listener(pausedPartitionsRef, config, dispatcher, partitionsAssigned, group, consumer, clientId, offsets)
_ <- report(SubscribingToInitialSubAndRebalanceListener(clientId, group, consumerAttributes))
_ <- report(SubscribingToInitialSubAndRebalanceListener(clientId, group, consumerAttributes))
_ <- subscribe(initialSubscription, rebalanceListener)(consumer)
running <- Ref.make[EventLoopState](Running)
_ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes))
_ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes))
fiber <- pollOnce(running, consumer, dispatcher, pausedPartitionsRef, positionsRef, offsets, config, clientId, group)
.repeatWhile(_ == true)
.forkDaemon
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
_ <- partitionsAssigned.await
env <- ZIO.environment[Env]
} yield (dispatcher, fiber, offsets, positionsRef, running, rebalanceListener.provideEnvironment(env))
Expand Down Expand Up @@ -303,9 +303,11 @@ object EventLoopMetric {

case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class CreatingDispatcher(clientId: ClientId, group: Group, attributes: Map[String, String], startPaused: Boolean) extends EventLoopMetric
case class CreatingDispatcher(clientId: ClientId, group: Group, attributes: Map[String, String], startPaused: Boolean)
extends EventLoopMetric

case class SubscribingToInitialSubAndRebalanceListener(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric
case class SubscribingToInitialSubAndRebalanceListener(clientId: ClientId, group: Group, attributes: Map[String, String])
extends EventLoopMetric

case class CreatingPollOnceFiber(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.wixpress.dst.greyhound.core.consumer

import com.wixpress.dst.greyhound.core.{Offset, TopicPartition}
import zio._

/**
 * Tracks, per partition, the highest offset handled so far together with the "gaps" —
 * offsets below that high-water mark that have not been handled yet (e.g. because a
 * batch was processed out of order).
 */
trait OffsetsAndGaps {
  /**
   * Returns the entries that are currently committable and marks every entry as
   * no longer committable (so the same state is not handed out twice).
   */
  def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]]

  /** Returns the currently-known gaps for `partition`, sorted by their start offset. */
  def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]]

  /** Records that the offsets in `batch` were handled for `partition`, updating the gaps. */
  def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit]

  /** True when `offset` currently lies inside a recorded gap for `partition`. */
  def contains(partition: TopicPartition, offset: Offset): UIO[Boolean]
}

object OffsetsAndGaps {

  /**
   * Creates an in-memory, `Ref`-backed [[OffsetsAndGaps]] tracker, starting with no
   * partitions recorded.
   */
  def make: UIO[OffsetsAndGaps] =
    Ref.make(Map.empty[TopicPartition, OffsetAndGaps]).map { ref =>
      new OffsetsAndGaps {
        override def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] =
          ref.modify { offsetsAndGaps =>
            val committable = offsetsAndGaps.filter(_._2.committable)
            // Use a strict `map` rather than `mapValues`: in Scala 2.12 `mapValues`
            // returns a lazy view that retains the previous map and re-applies
            // `markCommitted` on every access — not something to store inside a Ref.
            val updated     = offsetsAndGaps.map { case (tp, og) => tp -> og.markCommitted }
            (committable, updated)
          }

        override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] =
          ref.get.map(_.get(partition).fold(Seq.empty[Gap])(_.gaps.sortBy(_.start)))

        override def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] =
          ref.update { offsetsAndGaps =>
            // Guard: `.last` below throws NoSuchElementException on an empty batch;
            // an empty batch means there is nothing to record.
            if (batch.isEmpty) offsetsAndGaps
            else {
              val sortedBatch    = batch.sorted
              val maxBatchOffset = sortedBatch.last
              val maybeExisting  = offsetsAndGaps.get(partition)
              // -1 means "nothing handled yet", so a batch starting at 0 creates no gap.
              val prevOffset     = maybeExisting.fold(-1L)(_.offset)
              val previousGaps   = maybeExisting.fold(Seq.empty[Gap])(_.gaps)

              // Gaps introduced by holes between the previously-seen offset and/or
              // inside this batch itself.
              val newGaps = gapsInBatch(sortedBatch, prevOffset)

              // Previously-recorded gaps may have been (partially) filled by this batch.
              val updatedGaps = updateGapsByOffsets(previousGaps ++ newGaps, sortedBatch)

              offsetsAndGaps + (partition -> OffsetAndGaps(maxBatchOffset max prevOffset, updatedGaps))
            }
          }.unit

        override def contains(partition: TopicPartition, offset: Offset): UIO[Boolean] =
          ref.get.map(_.get(partition).fold(false)(_.contains(offset)))

        // Walks the (sorted) batch and emits a Gap for every hole between consecutive
        // offsets, starting the scan from `prevLastOffset` (exclusive).
        private def gapsInBatch(batch: Seq[Offset], prevLastOffset: Offset): Seq[Gap] =
          batch.sorted
            .foldLeft(Seq.empty[Gap], prevLastOffset) {
              case ((gaps, lastOffset), offset) =>
                if (offset <= lastOffset) (gaps, lastOffset)          // duplicate / already covered
                else if (offset == lastOffset + 1) (gaps, offset)     // contiguous, no gap
                else {
                  val newGap = Gap(lastOffset + 1, offset - 1)        // hole between the two
                  (newGap +: gaps, offset)
                }
            }
            ._1
            .reverse

        // For each existing gap, removes the offsets from `offsets` that fall inside it:
        // fully-filled gaps disappear, partially-filled gaps are re-split. The sentinel
        // offsets gap.start - 1 / gap.end + 1 bound the re-scan so splitting stays
        // within the original gap.
        private def updateGapsByOffsets(gaps: Seq[Gap], offsets: Seq[Offset]): Seq[Gap] = {
          val gapsToOffsets = gaps.map(gap => gap -> offsets.filter(o => o >= gap.start && o <= gap.end)).toMap
          gapsToOffsets.flatMap {
            case (gap, offsets) =>
              if (offsets.isEmpty) Seq(gap)                           // untouched by this batch
              else if (offsets.size == gap.size) Seq.empty[Gap]       // completely filled
              else gapsInBatch(offsets ++ Seq(gap.start - 1, gap.end + 1), gap.start - 2)
          }.toSeq
        }
      }
    }
}

/** A contiguous, inclusive range `[start, end]` of offsets that have not been handled yet. */
case class Gap(start: Offset, end: Offset) {
  /** True when `offset` falls inside this gap (both bounds inclusive). */
  def contains(offset: Offset): Boolean = !(offset < start) && !(end < offset)

  /** Number of offsets covered by this gap, counting both ends. */
  def size: Long = end - start + 1
}

/**
 * Per-partition tracking state: the highest offset seen (`offset`), the unhandled gaps
 * behind it, and whether this entry is still eligible to be handed out for commit.
 */
case class OffsetAndGaps(offset: Offset, gaps: Seq[Gap], committable: Boolean = true) {
  /** True when `offset` lies inside any of the recorded gaps. */
  def contains(offset: Offset): Boolean = gaps.exists(gap => gap.contains(offset))

  /** A copy marked as already handed out for commit. */
  def markCommitted: OffsetAndGaps = copy(committable = false)
}

object OffsetAndGaps {
  /** An entry for `offset` with no gaps recorded yet. */
  def apply(offset: Offset): OffsetAndGaps = OffsetAndGaps(offset, Nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ class OffsetsInitializer(
offsetOperations.pause(toPause)
val rewindUncommittedOffsets =
if (offsetResetIsEarliest || notCommitted.isEmpty || rewindUncommittedOffsetsBy.isZero) Map.empty
else offsetOperations.offsetsForTimes(notCommitted, clock.millis() - rewindUncommittedOffsetsBy.toMillis, effectiveTimeout)
.map{case (tp, maybeRewindedOffset) => (tp, maybeRewindedOffset.orElse(endOffsets.get(tp)).getOrElse(0L))}
else
offsetOperations
.offsetsForTimes(notCommitted, clock.millis() - rewindUncommittedOffsetsBy.toMillis, effectiveTimeout)
.map { case (tp, maybeRewindedOffset) => (tp, maybeRewindedOffset.orElse(endOffsets.get(tp)).getOrElse(0L)) }

val positions =
notCommitted.map(tp => tp -> offsetOperations.position(tp, effectiveTimeout)).toMap ++ toOffsets ++ rewindUncommittedOffsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ object RecordConsumer {
(initialSubscription, topicsToCreate) = config.retryConfig.fold((config.initialSubscription, Set.empty[Topic]))(policy =>
maybeAddRetryTopics(policy, config, nonBlockingRetryHelper)
)
_ <- AdminClient
_ <- ZIO.when(config.createRetryTopics)(AdminClient
.make(AdminClientConfig(config.bootstrapServers, config.kafkaAuthProperties), config.consumerAttributes)
.tap(client =>
client.createTopics(
topicsToCreate.map(topic =>
TopicConfig(topic, partitions = 1, replicationFactor = 1, cleanupPolicy = CleanupPolicy.Delete(86400000L))
)
)
)
))
blockingState <- Ref.make[Map[BlockingTarget, BlockingState]](Map.empty)
blockingStateResolver = BlockingStateResolver(blockingState)
workersShutdownRef <- Ref.make[Map[TopicPartition, ShutdownPromise]](Map.empty)
Expand Down Expand Up @@ -321,7 +321,8 @@ case class RecordConsumerConfig(
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
retryProducerAttributes: Map[String, String] = Map.empty,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsBy: Duration = 0.millis
rewindUncommittedOffsetsBy: Duration = 0.millis,
createRetryTopics: Boolean = true
) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = extraProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ case class ReportingConsumer(clientId: ClientId, group: Group, internal: Consume
implicit trace: Trace
): UIO[DelayedRebalanceEffect] =
(report(PartitionsRevoked(clientId, group, partitions, config.consumerAttributes)) *>
rebalanceListener.onPartitionsRevoked(consumer, partitions)
.timed.tap { case (duration, _) => report(PartitionsRevokedComplete(clientId, group, partitions, config.consumerAttributes, duration.toMillis)) }
.map(_._2)
).provideEnvironment(r)
rebalanceListener
.onPartitionsRevoked(consumer, partitions)
.timed
.tap {
case (duration, _) =>
report(PartitionsRevokedComplete(clientId, group, partitions, config.consumerAttributes, duration.toMillis))
}
.map(_._2)).provideEnvironment(r)

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] =
(report(PartitionsAssigned(clientId, group, partitions, config.consumerAttributes)) *>
Expand Down Expand Up @@ -241,11 +245,13 @@ object ConsumerMetric {
attributes: Map[String, String] = Map.empty
) extends ConsumerMetric

case class PartitionsRevokedComplete(clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String] = Map.empty,
durationMs: Long) extends ConsumerMetric
case class PartitionsRevokedComplete(
clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String] = Map.empty,
durationMs: Long
) extends ConsumerMetric

case class SubscribeFailed(clientId: ClientId, group: Group, error: Throwable, attributes: Map[String, String] = Map.empty)
extends ConsumerMetric
Expand Down
Loading