Skip to content

Commit

Permalink
Updated Moquette version to official v0.17
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Jan 10, 2024
1 parent 2b02698 commit 17381a4
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public int getInt(String name, int defaultValue) {
}
int value = super.getInt(name, defaultValue);
valuesInt.put(name, value);
valuesString.put(name, Integer.toString(value));
return value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -69,6 +70,14 @@ public SettingsMigrator() {
keyChanges.put("enableActuation", "plugins.actuation.enable");
keyChanges.put("enableMultiDatastream", "plugins.multiDatastream.enable");
keyChanges.put("persistence.alwaysOrderbyId", "alwaysOrderbyId");
keyChanges.put("mqtt.session.timeout.seconds", "mqtt.persistent.client.expiration");
valueCheckers.put("mqtt.persistent.client.expiration", oldValue -> {
final int lastIdx = oldValue.length() - 1;
if (StringUtils.indexOfAny(oldValue, 's', 'm', 'h', 'd', 'w', 'M', 'y') < lastIdx) {
return oldValue + 's';
}
return oldValue;
});
}

public void migrateOldSettings(Properties properties) {
Expand Down
4 changes: 2 additions & 2 deletions FROST-Server.MQTT.Moquette/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<url>https://github.com/FraunhoferIOSB/FROST-Server</url>

<properties>
<moquette.version>0.15.2</moquette.version>
<moquette.version>0.17</moquette.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<root.basedir>${project.parent.basedir}</root.basedir>
</properties>
Expand All @@ -32,7 +32,7 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>de.fraunhofer.iosb.io.moquette</groupId>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>${moquette.version}</version>
<exclusions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import de.fraunhofer.iosb.ilt.frostserver.settings.annotation.DefaultValueInt;
import de.fraunhofer.iosb.ilt.frostserver.util.StringHelper;
import de.fraunhofer.iosb.ilt.frostserver.util.user.PrincipalExtended;
import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.interception.AbstractInterceptHandler;
Expand All @@ -48,6 +47,7 @@
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -176,32 +176,36 @@ public void start() {
IConfig config = new ConfigWrapper(customSettings);

// Ensure the immediate_flush property has a default of true.
customSettings.getBoolean(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, true);
config.setProperty(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(mqttSettings.getPort()));
config.setProperty(BrokerConstants.HOST_PROPERTY_NAME, mqttSettings.getHost());
config.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
config.intProp(IConfig.BUFFER_FLUSH_MS_PROPERTY_NAME, 0);
config.setProperty(IConfig.PORT_PROPERTY_NAME, Integer.toString(mqttSettings.getPort()));
config.setProperty(IConfig.HOST_PROPERTY_NAME, mqttSettings.getHost());
config.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());

String persistentStoreType = customSettings.get(TAG_PERSISTENT_STORE_TYPE, getClass());
if (VALUE_STORE_TYPE_H2.equalsIgnoreCase(persistentStoreType)) {
String defaultPersistentStore = Paths.get(settings.getTempPath(), BrokerConstants.DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME).toString();
String persistentStore = customSettings.get(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, defaultPersistentStore);
config.setProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, persistentStore);
String tempPath = Paths.get(settings.getTempPath()).toString();
String persistentStore = customSettings.get(IConfig.DATA_PATH_PROPERTY_NAME, tempPath);
config.setProperty(IConfig.DATA_PATH_PROPERTY_NAME, persistentStore);
}
config.setProperty(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, customSettings.get(TAG_WEBSOCKET_PORT, getClass()));
config.setProperty(IConfig.WEB_SOCKET_PORT_PROPERTY_NAME, customSettings.get(TAG_WEBSOCKET_PORT, getClass()));

String keystorePath = customSettings.get(TAG_KEYSTORE_PATH, getClass());
if (!keystorePath.isEmpty()) {
LOGGER.info("Configuring keystore for ssl");
config.setProperty(BrokerConstants.JKS_PATH_PROPERTY_NAME, keystorePath);
config.setProperty(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, customSettings.get(TAG_KEYSTORE_PASS, getClass(), false));
config.setProperty(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, customSettings.get(TAG_KEYMANAGER_PASS, getClass(), false));
config.setProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME, customSettings.get(TAG_SSL_PORT, getClass()));
config.setProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME, customSettings.get(TAG_SSL_WEBSOCKET_PORT, getClass()));
config.setProperty(IConfig.JKS_PATH_PROPERTY_NAME, keystorePath);
config.setProperty(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, customSettings.get(TAG_KEYSTORE_PASS, getClass(), false));
config.setProperty(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, customSettings.get(TAG_KEYMANAGER_PASS, getClass(), false));
config.setProperty(IConfig.SSL_PORT_PROPERTY_NAME, customSettings.get(TAG_SSL_PORT, getClass()));
config.setProperty(IConfig.WSS_PORT_PROPERTY_NAME, customSettings.get(TAG_SSL_WEBSOCKET_PORT, getClass()));
}

authWrapper = createAuthWrapper();

mqttBroker.startServer(config, userHandlers, null, authWrapper, authWrapper);
try {
mqttBroker.startServer(config, userHandlers, null, authWrapper, authWrapper);
} catch (IOException ex) {
LOGGER.error("Failed to start MQTT Broker!", ex);
}
}

private AuthWrapper createAuthWrapper() {
Expand Down Expand Up @@ -305,6 +309,11 @@ public void onUnsubscribe(InterceptUnsubscribeMessage msg) {
public String getID() {
return frostClientId;
}

@Override
public void onSessionLoopError(Throwable thrwbl) {
LOGGER.error("MQTT Session Loop caused an exception!", thrwbl);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package de.fraunhofer.iosb.ilt.statests;

import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_MQTT;
import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_PLUGINS;
import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.TAG_FILTER_DELETE_ENABLE;

Expand Down Expand Up @@ -72,6 +73,7 @@ public abstract class AbstractTestClass {
defaultProperties.put(PREFIX_PLUGINS + ActuationModelSettings.TAG_ENABLE_ACTUATION, "true");
defaultProperties.put(PREFIX_PLUGINS + MdsModelSettings.TAG_ENABLE_MDS_MODEL, "true");
defaultProperties.put(TAG_FILTER_DELETE_ENABLE, "true");
defaultProperties.put(PREFIX_MQTT + "session.timeout.seconds", "100");
}

public AbstractTestClass(ServerVersion serverVersion) {
Expand Down
3 changes: 2 additions & 1 deletion FROST-Server.Tests/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

<logger name="de.fraunhofer.iosb.ilt.frostserver.parser" level="OFF"/>
<logger name="de.fraunhofer.iosb.ilt.statests" level="INFO"/>
<logger name="io.moquette" level="WARN"/>
<logger name="io.moquette" level="INFO"/>
<logger name="io.moquette.broker.PostOffice" level="WARN"/>
<logger name="io.moquette.broker.metrics.MQTTMessageLogger" level="WARN"/>
<logger name="io.moquette.broker.SessionEventLoop" level="WARN"/>
<logger name="liquibase" level="WARN"/>
Expand Down
8 changes: 4 additions & 4 deletions docs/settings/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ These are settings for the MQTT package.
* **mqtt.persistentStoreType:**
The way the MQTT server keeps track of subscriptions, either in-memory (`memory`) or using an H2 database (`h2`).
Default: `memory`.
* **mqtt.session_queue_size:**
* **mqtt.session.queue.size:**
The size of the internal queue the mqtt broker uses per CPU core.
Default: `1024`.
* **mqtt.session_timeout_seconds:**
The maximum lifetime of disconnected sessions, in seconds.
Default: `3600`.
* **mqtt.persistent.client.expiration:**
The maximum lifetime of disconnected sessions. For example `100s` for 100 seconds.
Default: `3600s`.
* **mqtt.maxInFlight:**
The maximum number of "in-flight" messages to allow when sending notifications.
* **mqtt.netty.mqtt.message_size:**
Expand Down
2 changes: 1 addition & 1 deletion helm/frost-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ The following table lists the configurable parameters of the FROST-Server chart
| `frost.mqtt.persistentStoreType` | The way the MQTT server keeps track of subscriptions, either in-memory (`memory`) or using an H2 database (`h2`). | `memory` |
| `frost.mqtt.secureWebsocketPort` | The port the MQTT server is reachable via secure WebSocket. | `nil` |
| `frost.mqtt.session_queue_size` | The size of the internal queue the mqtt broker uses per CPU core. | `1024` |
| `frost.mqtt.session_timeout_seconds` | The maximum lifetime of disconnected sessions, in seconds. | `3600` |
| `frost.mqtt.persistent_client_expiration` | The maximum lifetime of disconnected sessions, postfix with `s` for seconds. | `3600s` |
| `frost.mqtt.sslPort` | The port the MQTT server runs on, using ssl. | `nil` |
| `frost.mqtt.netty.mqtt.message_size` | The maximum size of MQTT messages. | `8092` (Bytes) |
| `frost.mqtt.host` | The external IP address or host name the MQTT server should listen on. Set by default to 0.0.0.0 to listen on all interfaces. | `0.0.0.0` |
Expand Down

0 comments on commit 17381a4

Please sign in to comment.