Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OSS Sync | Sun Jul 16 08:32:17 UTC 2023 #563

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f6a505e
Ds scalafmt (#32485)
leonbur Feb 6, 2023
024f76d
Remove unused test deps (#33323)
Mar 10, 2023
e0f2ba7
Blocking retries attempts tracking fix (#32757)
atamurius Mar 14, 2023
1292320
Revert "Blocking retries attempts tracking fix" (#33818)
natansil Apr 3, 2023
779acdd
[greyhound] remove internal topic creation - wix adapter (#33820)
berman7 Apr 3, 2023
84f38bd
[greyhound] parallel consumer OffsetsAndGaps (#33605)
ben-wattelman Apr 4, 2023
a6ac30b
Blocking retries attempts tracking fix + fix transitive header bug (#…
natansil Apr 4, 2023
facbf92
[greyhound] expose internal kafka producer metrics (#34160)
berman7 Apr 22, 2023
7c7e562
[greyhound] Cooperative Rebalance fix (#34153)
leonbur Apr 23, 2023
570205a
code cleanup (#34171)
leonbur Apr 23, 2023
596bc6c
[greyhound] parallel consumer implementation (#34061)
ben-wattelman May 24, 2023
be4bfc1
[gh-consumers-proxy] s3 bridge (#34839)
berman7 May 29, 2023
141c7a6
[greyhound] parallel consumer - add visibility (#34908)
ben-wattelman May 30, 2023
57343a5
[greyhound] parallel consumer visibility (#34926)
ben-wattelman Jun 1, 2023
e0260ad
[greyhound] parallel consumer - add offsets and gaps init (#35027)
ben-wattelman Jun 6, 2023
a5a1bec
[greyhound] parallel consumer - add grouping for no-key records (#35071)
ben-wattelman Jun 11, 2023
8de7d3e
make GreyhoundNG RetryConfig serializable (so can be sent over gRpc a…
natansil Jun 13, 2023
5111d43
multi-tenant consumer proxy redesign - initial commit (#35244)
berman7 Jun 17, 2023
c500ec9
[greyhound] parallel consumer - add gaps limit (#35313)
ben-wattelman Jun 20, 2023
d7dfb03
Fixes in preparation for ZIO 2.0.15 (#35320)
leonbur Jun 21, 2023
c50eb4c
[greyhound-consumer-proxy] start from latest offset when group doesn'…
berman7 Jun 22, 2023
1e98ca6
[greyhound] parallel consumer - compression and encoding for gaps lim…
ben-wattelman Jun 25, 2023
bbe27cf
greyhound proxy non blocking retries (#35456)
natansil Jun 29, 2023
fe05e49
new kafka monitor server for proxy (currently) (#35598)
berman7 Jul 2, 2023
4262e58
[greyhound] fix OffsetsInitializer metadata bug (#35684)
ben-wattelman Jul 6, 2023
a0f7288
ConsumerIT: add a test: allow to override offsetReset with autoResetO…
natansil Jul 13, 2023
39d1576
gh RecordConsumer - add visibility to extra properties setup with aut…
natansil Jul 13, 2023
02064fc
gh RecordConsumer - allow to override offsetReset with autoResetOffse…
natansil Jul 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.wixpress.dst.greyhound.core.producer.ProducerRecord
import com.wixpress.dst.greyhound.core.testkit.{BaseTestWithSharedEnv, TestMetrics}
import com.wixpress.dst.greyhound.core.zioutils.CountDownLatch
import com.wixpress.dst.greyhound.testenv.ITEnv
import com.wixpress.dst.greyhound.testenv.ITEnv.{Env, TestResources, testResources}
import com.wixpress.dst.greyhound.testenv.ITEnv.{testResources, Env, TestResources}
import org.apache.kafka.common.config.TopicConfig.{DELETE_RETENTION_MS_CONFIG, MAX_MESSAGE_BYTES_CONFIG, RETENTION_MS_CONFIG}
import org.apache.kafka.common.errors.InvalidTopicException
import org.specs2.specification.core.Fragments
Expand Down Expand Up @@ -83,7 +83,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] {
}
}

//todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// "reflect errors" in {
// val topic1 = aTopicConfig()
// val topic2 = aTopicConfig("x" * 250)
Expand All @@ -104,7 +104,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] {
// created === Map(badTopic.name -> None)
// }
// }
//todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// =================================================================================================================================
"ignore TopicExistsException by default" in {
val topic = aTopicConfig()
Expand Down
5 changes: 0 additions & 5 deletions core/src/it/scala/com/wixpress/dst/greyhound/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,8 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/resources",
#"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
#"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
],
Expand Down
949 changes: 558 additions & 391 deletions core/src/it/scala/com/wixpress/dst/greyhound/core/ConsumerIT.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ specs2_ite2e_test(
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/resources",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
Expand All @@ -23,9 +22,7 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package(default_visibility = ["//visibility:public"])

sources()

specs2_ite2e_test(
name = "parallel",
srcs = [
":sources",
],
deps = [
"//core/src/it/resources",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/domain",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@dev_zio_zio_managed_2_12",
"@org_apache_kafka_kafka_clients",
],
)

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ specs2_ite2e_test(
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/resources",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
Expand All @@ -23,9 +22,7 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ specs2_ite2e_test(
deps = [
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_managed_2_12",
"//core/src/it/scala/com/wixpress/dst/greyhound/core",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
Expand All @@ -21,11 +20,8 @@ specs2_ite2e_test(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/resources",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@ch_qos_logback_logback_classic",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@org_apache_kafka_kafka_clients",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.wixpress.dst.greyhound.core.consumer._
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.{TopicPattern, Topics}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler}
import com.wixpress.dst.greyhound.core.consumer.retry.NonBlockingRetryHelper.fixedRetryTopic
import com.wixpress.dst.greyhound.core.consumer.retry.RetryConfigForTopic.{finiteBlockingRetryConfigForTopic, nonBlockingRetryConfigForTopic}
import com.wixpress.dst.greyhound.core.consumer.retry._
import com.wixpress.dst.greyhound.core.producer.{Encryptor, ProducerRecord}
import com.wixpress.dst.greyhound.core.testkit.{eventuallyZ, AwaitableRef, BaseTestWithSharedEnv}
Expand Down Expand Up @@ -40,8 +41,8 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
done <- Promise.make[Nothing, ConsumerRecord[String, String]]
retryConfig = ZRetryConfig
.perTopicRetries {
case `topic` => RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(1.second :: Nil))
case `anotherTopic` => RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(1.second :: Nil))
case `topic` => nonBlockingRetryConfigForTopic(1.second :: Nil)
case `anotherTopic` => nonBlockingRetryConfigForTopic(1.second :: Nil)
}
.copy(produceEncryptor = _ => ZIO.succeed(dummyEncryptor))

Expand Down Expand Up @@ -76,7 +77,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
retryConfig =
ZRetryConfig
.finiteBlockingRetry(100.millis, 100.millis)
.withCustomRetriesFor { case `topic2` => RetryConfigForTopic(() => 300.millis :: Nil, NonBlockingBackoffPolicy.empty) }
.withCustomRetriesFor { case `topic2` => finiteBlockingRetryConfigForTopic(300.millis :: Nil) }
retryHandler = failingBlockingRecordHandlerWith(consumedValuesRef, Set(topic, topic2)).withDeserializers(StringSerde, StringSerde)
_ <- RecordConsumer.make(configFor(kafka, group, retryConfig, topic, topic2), retryHandler).flatMap { _ =>
producer.produce(ProducerRecord(topic, "bar", Some("foo")), StringSerde, StringSerde) *> Clock.sleep(2.seconds) *>
Expand Down Expand Up @@ -225,7 +226,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
invocations <- Ref.make(0)
done <- Promise.make[Nothing, ConsumerRecord[String, String]]
retryConfig = ZRetryConfig.retryForPattern(
RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(Seq(1.second, 1.second, 1.seconds)))
nonBlockingRetryConfigForTopic(List(1.second, 1.second, 1.seconds))
)
retryHandler = failingRecordHandler(invocations, done).withDeserializers(StringSerde, StringSerde)
success <- RecordConsumer
Expand Down Expand Up @@ -258,7 +259,9 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
group <- randomGroup
originalTopicCallCount <- Ref.make[Int](0)
retryTopicCallCount <- Ref.make[Int](0)
retryConfig = ZRetryConfig.blockingFollowedByNonBlockingRetry(List(1.second), NonBlockingBackoffPolicy(List(1.seconds)))
retryConfig =
ZRetryConfig
.blockingFollowedByNonBlockingRetry(FiniteBlockingBackoffPolicy(List(1.second)), NonBlockingBackoffPolicy(List(1.seconds)))
retryHandler = failingBlockingNonBlockingRecordHandler(originalTopicCallCount, retryTopicCallCount, topic).withDeserializers(
StringSerde,
StringSerde
Expand All @@ -281,9 +284,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
for {
r <- getShared
TestResources(kafka, _) = r
retryConfig = ZRetryConfig.retryForPattern(
RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(Seq(1.second, 1.second, 1.seconds)))
)
retryConfig = ZRetryConfig.retryForPattern(nonBlockingRetryConfigForTopic(List(1.second, 1.second, 1.seconds)))
handler = RecordHandler { _: ConsumerRecord[String, String] => ZIO.unit }.withDeserializers(StringSerde, StringSerde)
_ <- RecordConsumer
.make(configFor(kafka, "group", retryConfig, "topic"), handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@ scala_library(
"@dev_zio_zio_managed_2_12",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/admin",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
# "@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_curator_curator_test",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ scala_library(
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
# "//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"@dev_zio_zio_2_12",
"@dev_zio_zio_test_2_12",
"@org_apache_curator_curator_test",
"@org_apache_kafka_kafka_2_12",
"@org_apache_kafka_kafka_clients",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ object OffsetAndMetadata {
def apply(offsetAndMetadata: KafkaOffsetAndMetadata): OffsetAndMetadata =
OffsetAndMetadata(offsetAndMetadata.offset(), offsetAndMetadata.metadata())

def apply(offset: Offset): OffsetAndMetadata =
OffsetAndMetadata(offset, NO_METADATA)

val NO_METADATA = ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ case class TopicPartition(topic: Topic, partition: Partition) {
}

object TopicPartition {
def fromKafka(topicPartition: KafkaTopicPartition): TopicPartition =
TopicPartition(topicPartition.topic, topicPartition.partition)
def apply(topicPartition: KafkaTopicPartition): TopicPartition =
TopicPartition(topicPartition.topic, topicPartition.partition)
}
Expand Down
Loading