Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#605 add support for spring amqp #606

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion spring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

<properties>
<module-name>io.cloudevents.spring</module-name>
<spring-boot.version>2.4.3</spring-boot.version>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -63,6 +63,11 @@
<artifactId>spring-messaging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2020-Present The CloudEvents Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.amqp;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventContext;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;

/**
* A {@link MessageConverter} that can translate to and from a {@link Message} and a {@link CloudEvent}.
* The {@link CloudEventContext} is canonicalized, with key names given a {@code cloudEvents_} prefix in the
* {@link MessageProperties}.
*
* @author Lars Michele
* @see io.cloudevents.spring.messaging.CloudEventMessageConverter used as stencil for the implementation
*/
public class CloudEventMessageConverter implements MessageConverter {

@Override
public CloudEvent fromMessage(Message message) {
return createMessageReader(message).toEvent();
}

@Override
public Message toMessage(Object object, MessageProperties messageProperties) {
if (object instanceof CloudEvent) {
CloudEvent event = (CloudEvent) object;
return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(messageProperties));
}
return null;
}

private MessageReader createMessageReader(Message message) {
return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType(message.getMessageProperties()),
format -> structuredMessageReader(message, format),
() -> version(message.getMessageProperties()),
version -> binaryMessageReader(message, version)
);
}

private String version(MessageProperties properties) {
Object header = properties.getHeader(CloudEventsHeaders.SPEC_VERSION);
if (header == null) {
header = properties.getHeader(CloudEventsHeaders.ALT_SPEC_VERSION);
}
return header == null ? null : header.toString();
}

private MessageReader binaryMessageReader(Message message, SpecVersion version) {
return new MessageBinaryMessageReader(version, message.getMessageProperties(), message.getBody());
}

private MessageReader structuredMessageReader(Message message, EventFormat format) {
return new GenericStructuredMessageReader(format, message.getBody());
}

private String contentType(MessageProperties properties) {
String contentType = properties.getContentType();
if (contentType == null) {
Object header = properties.getHeader(CloudEventsHeaders.CONTENT_TYPE);
if (header == null) {
header = properties.getHeader(CloudEventsHeaders.ALT_CONTENT_TYPE);
}
return header == null ? null : header.toString();
}
return contentType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2020-Present The CloudEvents Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cloudevents.spring.amqp;

/**
* Constants used throughout the Spring AMQP binding for cloud events.
*/
public class CloudEventsHeaders {

/**
* CloudEvent attributes MUST be prefixed with either "cloudEvents_" or "cloudEvents:" for use in the application-properties section.
*
* @see <a href="https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/amqp-protocol-binding.md#3131-amqp-application-property-names">
* AMQP Protocol Binding for CloudEvents</a>
*/
public static final String CE_PREFIX = "cloudEvents_";
/**
* CloudEvents AMQP consumers SHOULD understand the "cloudEvents" prefix with both the '_' and the ':' separators as permitted within the constraints of the client model.
*
* @see <a href="https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/amqp-protocol-binding.md#3131-amqp-application-property-names">
* AMQP Protocol Binding for CloudEvents</a>
*/
public static final String ALT_CE_PREFIX = "cloudEvents:";
/**
* The spec version header name.
*/
public static final String SPEC_VERSION = CE_PREFIX + "specversion";
/**
* The alternative spec version header name.
*/
public static final String ALT_SPEC_VERSION = ALT_CE_PREFIX + "specversion";
/**
* The data content-type header name.
*/
public static final String CONTENT_TYPE = CE_PREFIX + "datacontenttype";

/**
* The alternative data content-type header name.
*/
public static final String ALT_CONTENT_TYPE = ALT_CE_PREFIX + "datacontenttype";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2020-Present The CloudEvents Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.amqp;

import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;
import static org.springframework.amqp.support.AmqpHeaders.CONTENT_TYPE;

import java.util.Map;
import java.util.function.BiConsumer;

import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.impl.StringUtils;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import org.springframework.amqp.core.MessageProperties;

/**
* Utility for converting {@link MessageProperties} (message headers) to `CloudEvent` contexts.
*
* @author Lars Michele
* @see io.cloudevents.spring.messaging.MessageBinaryMessageReader used as stencil for the implementation
*/
class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {

private final Map<String, Object> headers;

public MessageBinaryMessageReader(SpecVersion version, MessageProperties properties, byte[] payload) {
super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
this.headers = properties.getHeaders();
}

@Override
protected boolean isContentTypeHeader(String key) {
return CONTENT_TYPE.equalsIgnoreCase(key);
}

@Override
protected boolean isCloudEventsHeader(String key) {
return key != null && key.length() > CE_PREFIX.length() && (StringUtils.startsWithIgnoreCase(key, CE_PREFIX)
|| StringUtils.startsWithIgnoreCase(key, ALT_CE_PREFIX));
}

@Override
protected String toCloudEventsKey(String key) {
return key.substring(CE_PREFIX.length()).toLowerCase();
}

@Override
protected void forEachHeader(BiConsumer<String, Object> fn) {
headers.forEach(fn);
}

@Override
protected String toCloudEventsValue(Object value) {
return value.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2020-Present The CloudEvents Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.amqp;

import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;

import java.util.HashMap;
import java.util.Map;

import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;

/**
* Internal utility class for copying <code>CloudEvent</code> context to {@link MessageProperties} (message
* headers).
*
* @author Lars Michele
* @see io.cloudevents.spring.messaging.MessageBuilderMessageWriter used as stencil for the implementation
*/
class MessageBuilderMessageWriter implements CloudEventWriter<Message>, MessageWriter<MessageBuilderMessageWriter, Message> {

private final Map<String, Object> headers = new HashMap<>();

public MessageBuilderMessageWriter(MessageProperties properties) {
this.headers.putAll(properties.getHeaders());
}

@Override
public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
headers.put(CONTENT_TYPE, format.serializedContentType());
return MessageBuilder.withBody(value).copyHeaders(headers).build();
}

@Override
public Message end(CloudEventData value) throws CloudEventRWException {
return MessageBuilder.withBody(value == null ? new byte[0] : value.toBytes()).copyHeaders(headers).build();
}

@Override
public Message end() {
return MessageBuilder.withBody(new byte[0]).copyHeaders(headers).build();
}

@Override
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
headers.put(CE_PREFIX + name, value);
return this;
}

@Override
public MessageBuilderMessageWriter create(SpecVersion version) {
headers.put(SPEC_VERSION, version.toString());
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Provides classes related to working with Cloud Events within the context of Spring Amqp.
*/
package io.cloudevents.spring.amqp;
Loading
Loading