Add a sample app with Spring Kafka #364
base: main
Conversation
Closes #359
I don't know what's going on with GitHub Actions. It doesn't look like anything I did?
@dsyer I'm looking at it; it seems related to this PR, given it doesn't fail here: https://github.com/cloudevents/sdk-java/pull/363/checks?check_run_id=2226508723#step:4:6464 The javadoc tool is failing because it can't find a class, I guess? https://github.com/cloudevents/sdk-java/pull/364/checks?check_run_id=2216090212#step:4:2680 Can you repro locally running …
Thanks for looking. It fails for me locally too, so I guess that's a problem, but I don't know how to solve it. I mean, the Kafka classes are definitely on the classpath, so what's the problem exactly?
I think it's a bug in the Maven javadoc plugin. I'll see if I can find a workaround, or maybe bump the plugin version or something. |
If you use Spring Cloud Function 3.1.3, then Kafka should already work with Spring Cloud Stream out of the box. This sample adds support for vanilla Spring Kafka with `@KafkaListener`, where the listener can listen for and emit `CloudEvent`. Some issues with the existing messaging support came to light and have been ironed out in the process. Signed-off-by: Dave Syer <[email protected]>
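For context on the wire format the sample deals with: the CloudEvents Kafka protocol binding carries binary-mode context attributes as record headers prefixed with `ce_` (e.g. `ce_id`, `ce_source`). A minimal, self-contained sketch of that mapping, with no Spring or Kafka dependencies; the helper class and method names are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: map CloudEvent context attributes onto
// Kafka-style header names using the binding's "ce_" prefix.
final class KafkaHeaderSketch {
    static Map<String, String> toKafkaHeaders(Map<String, String> attributes) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : attributes.entrySet()) {
            headers.put("ce_" + e.getKey(), e.getValue());
        }
        return headers;
    }
}
```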
Update: the workaround was to add …
@dsyer I have a super noob Spring question here: why do we need this? We already implement serializers and deserializers for Kafka: https://github.com/cloudevents/sdk-java/tree/master/kafka. Isn't Spring using the Kafka …
I guess there's nothing stopping you from using the existing serdes, but Spring is more flexible with listener method signatures, so you wouldn't get the full benefit. The sample wouldn't work if you used only cloudevents-kafka; bits of it might, but, for example, the … I also made a design decision in the cloudevents-spring components that …
@dsyer I see your point, but IMHO this is not a good reason, at least for me, to have this additional code to maintain that, at the end of the day, does the same thing as another part of this project. From my understanding (though I might be wrong here) you could just swap the serializers here https://github.com/cloudevents/sdk-java/pull/364/files#diff-e0e93e8e5ff2f3d2d6955abaa1a4bd104e93d4fbe49fae37798cbe96cd6e9fcaR2 with the ones provided by …
In that case, I think the test code has to be modified to use the byte deserializer in the listener instance.
I don't understand this particular point, which might be the reason I'm confused by this PR. Don't you have the …
That's not the purpose of this test. The purpose is to get the raw Kafka message and look at its raw content, to verify it independently of any custom serdes.
Yes, probably, but then you'd have to build knowledge of the Kafka header name conventions into the converters we already have (and so far they didn't need to know about that). The same thing will happen if we want to use RabbitMQ with the …
To me, this prefix problem sounds like either: …
But for sure, having a single encoder/decoder for each transport doesn't sound like a good approach to me, given that it also doesn't scale.
So where are we going with this? Isn't cloudevents-kafka already basically a "single encoder/decoder for each transport"? Anyway, here's a branch using the serdes from cloudevents-kafka: https://github.com/dsyer/sdk-java/tree/kafka-only. If you prefer that approach I can live with it (but it drops the idea of a "canonical" header).
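For readers unfamiliar with the cloudevents-kafka module mentioned here: wiring its serdes in typically amounts to plain Kafka client configuration, roughly as below. This is a sketch, not code from the branch; the serde class names come from the cloudevents-kafka module, and the property keys are the standard Kafka client keys:

```java
import java.util.Properties;

// Sketch: plug the cloudevents-kafka serdes into plain Kafka client config.
final class SerdesConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.cloudevents.kafka.CloudEventSerializer");
        return props;
    }

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.cloudevents.kafka.CloudEventDeserializer");
        return props;
    }
}
```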
FWIW, attribute prefixes are only relevant at the edges; in the application space they carry no value, hence having a canonical prefix, or no prefix (which is a form of canonical), would address consistency in dealing with CloudEvents. This is especially relevant when sources and targets are different (e.g., from HTTP to Kafka). In any case, the user must NOT care where the CloudEvent came from or where it is going when it comes to application code. That should be the framework's responsibility.
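A tiny illustration of that "edges only" point: stripping the transport prefix recovers the same canonical attribute name regardless of which binding delivered the event. The helper is hypothetical; the `ce-` (HTTP) and `ce_` (Kafka) prefixes are the ones discussed in this thread:

```java
// Sketch: transport prefixes only matter at the edges. Stripping
// "ce-" (HTTP binding) or "ce_" (Kafka binding) yields the canonical
// attribute name that application code should see.
final class CanonicalAttributeSketch {
    static String canonicalize(String headerName) {
        if (headerName.startsWith("ce-") || headerName.startsWith("ce_")) {
            return headerName.substring(3);
        }
        return headerName; // no recognized prefix; leave as-is
    }
}
```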
It is, but my understanding of the reasoning behind … I wonder if we can somehow (and maybe this requires some changes on the Spring side) always use the … https://github.com/cloudevents/sdk-java/blob/master/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageConverter.java, where the … I understand how the solution of using …
I don't think we're on the same page here yet. I dislike the idea of adding encoders/decoders for every new transport, but that's the current design of CloudEvents and the SDK (… Did you look at the "kafka-only" branch? Is that closer to what you like (using the …)?
Yes, but you don't need to know the transport details at all. You just need to map "protocol xyz" to its prefix, if any (some protocol bindings don't even have a prefix). You could just pass the MessageConverter a string identifying the protocol, like:

```java
public Object fromMessage(Protocol proto, Message<?> message, Class<?> targetClass) {
    if (proto.getName().equals("http")) { /* then use "ce-" */ }
    if (proto.getName().equals("kafka")) { /* then use "ce_" */ }
}
```

Alternatively, you could carry the protocol information inside the … Because the CloudEvents spec supports only a handful of protocol bindings, the maintenance burden of such a converter is very low, and at the same time it is more Spring-idiomatic and easier for the end user. I think this is the best approach we can follow IMO; is it somehow feasible? If not, then your other kafka branch is fine for me, although I would really love to try this approach, always using CloudEventMessageConverter.
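The lookup idea above can be sketched as a small, runnable table keyed by protocol id. This is illustrative only (no Spring types; the class name is made up), using the prefixes discussed in this thread and an empty string for bindings without one:

```java
import java.util.Map;

// Sketch: one converter keyed by protocol id, mapping each known
// protocol binding to its attribute prefix ("" when the binding has none).
final class ProtocolPrefixSketch {
    private static final Map<String, String> PREFIXES = Map.of(
            "http", "ce-",   // HTTP protocol binding
            "kafka", "ce_"); // Kafka protocol binding

    static String prefixFor(String protocol) {
        return PREFIXES.getOrDefault(protocol, "");
    }
}
```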
Fix typo Co-authored-by: Eddú Meléndez Gonzales <[email protected]>