Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5129]Unify protocol port configuration refactoring #5130

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ eventMesh.server.provide.protocols=HTTP,TCP,GRPC
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
eventMesh.sysid=0000
eventMesh.server.protocol.unified.port=10000
eventMesh.server.protocol.http.enabled=true
eventMesh.server.protocol.grpc.enabled=true
eventMesh.server.protocol.tcp.enabled=true

# Legacy port configurations - to be deprecated
eventMesh.server.tcp.port=10000
eventMesh.server.http.port=10105
eventMesh.server.grpc.port=10205
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -175,9 +177,10 @@ public void start() throws Exception {
.childHandler(new HttpsServerInitializer(useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null))
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);

log.info("HTTPServer[port={}] started.", this.getPort());
int port = eventMeshHttpConfiguration.getProtocolConfiguration().getUnifiedPort();
log.info("HTTPServer[port={}] started.", port);

bootstrap.bind(this.getPort())
bootstrap.bind(port)
.channel()
.closeFuture()
.sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,26 @@

package org.apache.eventmesh.runtime.boot;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.SimpleChannelInboundHandler;
import io.opentelemetry.api.trace.Span;

import org.apache.eventmesh.common.Pair;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.protocol.tcp.Command;
Expand Down Expand Up @@ -52,26 +72,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.opentelemetry.api.trace.Span;

import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -143,7 +143,7 @@ public void start() throws Exception {
.childHandler(new TcpServerInitializer());

try {
int port = eventMeshTCPConfiguration.getEventMeshTcpServerPort();
int port = eventMeshTCPConfiguration.getProtocolConfiguration().getUnifiedPort();
ChannelFuture f = bootstrap.bind(port).sync();
log.info("EventMeshTCPServer[port={}] started.....", port);
f.channel().closeFuture().sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void init() throws Exception {

grpcRetryer = new GrpcRetryer(this);

int serverPort = eventMeshGrpcConfiguration.getGrpcServerPort();
int serverPort = eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort();

server = ServerBuilder.forPort(serverPort)
.addService(new ConsumerService(this, sendMsgExecutor, replyMsgExecutor))
Expand Down Expand Up @@ -175,7 +175,7 @@ public boolean register() {
boolean registerResult = false;
try {
String endPoints = IPUtils.getLocalAddress()
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getGrpcServerPort();
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort();
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.getEventMeshCluster());
eventMeshRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.getEventMeshName() + "-"
Expand All @@ -192,7 +192,7 @@ public boolean register() {

private void unRegister() throws Exception {
String endPoints = IPUtils.getLocalAddress()
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getGrpcServerPort();
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort();
EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo();
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.getEventMeshCluster());
eventMeshUnRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.getEventMeshName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,25 @@
public class EventMeshGrpcConfiguration extends CommonConfiguration {

@ConfigField(field = "grpc.port", notNull = true, beNumber = true)
@Deprecated
private int grpcServerPort = 10205;

@ConfigField(field = "protocol")
private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration();

public int getGrpcServerPort() {
return protocolConfiguration.getGrpcPort();
}

public void setGrpcServerPort(int port) {
this.grpcServerPort = port;
this.protocolConfiguration.setGrpcPort(port);
}

public boolean isGrpcEnabled() {
return protocolConfiguration.isGrpcEnabled();
}

@ConfigField(field = "session.expiredInMills")
private int eventMeshSessionExpiredInMills = 60000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.Collections;
import java.util.List;

import inet.ipaddr.IPAddress;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import inet.ipaddr.IPAddress;
import org.apache.eventmesh.runtime.configuration.ProtocolConfiguration;

@Data
@EqualsAndHashCode(callSuper = true)
Expand All @@ -37,8 +39,28 @@
public class EventMeshHTTPConfiguration extends CommonConfiguration {

@ConfigField(field = "http.port", notNull = true, beNumber = true)
@Deprecated
private int httpServerPort = 10105;

@ConfigField(field = "protocol")
private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration();

public int getHttpServerPort() {
return protocolConfiguration.getHttpPort();
}

public void setHttpServerPort(int port) {
this.httpServerPort = port;
this.protocolConfiguration.setHttpPort(port);
}

public boolean isHttpEnabled() {
return protocolConfiguration.isHttpEnabled();
}

@ConfigField(field = "http.path")
private String eventMeshServerHttpPath = "/eventmesh";

@ConfigField(field = "batchmsg.batch.enabled")
private boolean eventMeshServerBatchMsgBatchEnabled = Boolean.TRUE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,25 @@
public class EventMeshTCPConfiguration extends CommonConfiguration {

@ConfigField(field = "tcp.port")
@Deprecated
private int eventMeshTcpServerPort = 10000;

@ConfigField(field = "protocol")
private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration();

public int getEventMeshTcpServerPort() {
return protocolConfiguration.getTcpPort();
}

public void setEventMeshTcpServerPort(int port) {
this.eventMeshTcpServerPort = port;
this.protocolConfiguration.setTcpPort(port);
}

public boolean isTcpEnabled() {
return protocolConfiguration.isTcpEnabled();
}

@ConfigField(field = "tcp.allIdleSeconds")
private int eventMeshTcpIdleAllSeconds = 60;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.configuration;

import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.common.config.ConfigField;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@Config(prefix = "eventMesh.server.protocol")
public class ProtocolConfiguration {

@ConfigField(field = "unified.port", notNull = true, beNumber = true)
private int unifiedPort = 10000;

@ConfigField(field = "http.enabled")
private boolean httpEnabled = true;

@ConfigField(field = "grpc.enabled")
private boolean grpcEnabled = true;

@ConfigField(field = "tcp.enabled")
private boolean tcpEnabled = true;

public int getHttpPort() {
return unifiedPort;
}

public int getGrpcPort() {
return unifiedPort;
}

public int getTcpPort() {
return unifiedPort;
}

public void setHttpPort(int port) {
this.unifiedPort = port;
}

public void setGrpcPort(int port) {
this.unifiedPort = port;
}

public void setTcpPort(int port) {
this.unifiedPort = port;
}
}
Loading