From 17381a486adc899fe6dcab623aa8615c5564a7ac Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Wed, 10 Jan 2024 14:01:00 +0100 Subject: [PATCH] Updated Moquette version to official v0.17 --- .../frostserver/settings/CachedSettings.java | 1 + .../settings/SettingsMigrator.java | 9 +++++ FROST-Server.MQTT.Moquette/pom.xml | 4 +- .../mqtt/moquette/MoquetteMqttServer.java | 39 ++++++++++++------- .../iosb/ilt/statests/AbstractTestClass.java | 2 + .../src/test/resources/logback-test.xml | 3 +- docs/settings/settings.md | 8 ++-- helm/frost-server/README.md | 2 +- 8 files changed, 45 insertions(+), 23 deletions(-) diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/settings/CachedSettings.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/settings/CachedSettings.java index cc37a0430..1e45c1e2f 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/settings/CachedSettings.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/settings/CachedSettings.java @@ -178,6 +178,7 @@ public int getInt(String name, int defaultValue) { } int value = super.getInt(name, defaultValue); valuesInt.put(name, value); + valuesString.put(name, Integer.toString(value)); return value; } diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/settings/SettingsMigrator.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/settings/SettingsMigrator.java index fb405dc52..c949c701b 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/settings/SettingsMigrator.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/settings/SettingsMigrator.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Properties; import java.util.TreeMap; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,14 @@ public SettingsMigrator() { keyChanges.put("enableActuation", "plugins.actuation.enable"); keyChanges.put("enableMultiDatastream", "plugins.multiDatastream.enable"); keyChanges.put("persistence.alwaysOrderbyId", "alwaysOrderbyId"); + keyChanges.put("mqtt.session.timeout.seconds", "mqtt.persistent.client.expiration"); + valueCheckers.put("mqtt.persistent.client.expiration", oldValue -> { + final int lastIdx = oldValue.length() - 1; + if (StringUtils.indexOfAny(oldValue, 's', 'm', 'h', 'd', 'w', 'M', 'y') < lastIdx) { + return oldValue + 's'; + } + return oldValue; + }); } public void migrateOldSettings(Properties properties) { diff --git a/FROST-Server.MQTT.Moquette/pom.xml b/FROST-Server.MQTT.Moquette/pom.xml index 2d57e0edd..4fc6bcca1 100644 --- a/FROST-Server.MQTT.Moquette/pom.xml +++ b/FROST-Server.MQTT.Moquette/pom.xml @@ -15,7 +15,7 @@ https://github.com/FraunhoferIOSB/FROST-Server - 0.15.2 + 0.17 UTF-8 ${project.parent.basedir} @@ -32,7 +32,7 @@ ${project.version} - de.fraunhofer.iosb.io.moquette + io.moquette moquette-broker ${moquette.version} diff --git a/FROST-Server.MQTT.Moquette/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/moquette/MoquetteMqttServer.java b/FROST-Server.MQTT.Moquette/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/moquette/MoquetteMqttServer.java index ce5ff8210..3bb9fe5ff 100644 --- a/FROST-Server.MQTT.Moquette/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/moquette/MoquetteMqttServer.java +++ b/FROST-Server.MQTT.Moquette/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/moquette/MoquetteMqttServer.java @@ -30,7 +30,6 @@ import de.fraunhofer.iosb.ilt.frostserver.settings.annotation.DefaultValueInt; import de.fraunhofer.iosb.ilt.frostserver.util.StringHelper; import de.fraunhofer.iosb.ilt.frostserver.util.user.PrincipalExtended; -import io.moquette.BrokerConstants; import io.moquette.broker.Server; import io.moquette.broker.config.IConfig; import io.moquette.interception.AbstractInterceptHandler; @@ -48,6 +47,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; +import java.io.IOException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -176,32 +176,36 @@ public void start() { IConfig config = new ConfigWrapper(customSettings); // Ensure the immediate_flush property has a default of true. - customSettings.getBoolean(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, true); - config.setProperty(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(mqttSettings.getPort())); - config.setProperty(BrokerConstants.HOST_PROPERTY_NAME, mqttSettings.getHost()); - config.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString()); + config.intProp(IConfig.BUFFER_FLUSH_MS_PROPERTY_NAME, 0); + config.setProperty(IConfig.PORT_PROPERTY_NAME, Integer.toString(mqttSettings.getPort())); + config.setProperty(IConfig.HOST_PROPERTY_NAME, mqttSettings.getHost()); + config.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString()); String persistentStoreType = customSettings.get(TAG_PERSISTENT_STORE_TYPE, getClass()); if (VALUE_STORE_TYPE_H2.equalsIgnoreCase(persistentStoreType)) { - String defaultPersistentStore = Paths.get(settings.getTempPath(), BrokerConstants.DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME).toString(); - String persistentStore = customSettings.get(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, defaultPersistentStore); - config.setProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, persistentStore); + String tempPath = Paths.get(settings.getTempPath()).toString(); + String persistentStore = customSettings.get(IConfig.DATA_PATH_PROPERTY_NAME, tempPath); + config.setProperty(IConfig.DATA_PATH_PROPERTY_NAME, persistentStore); } - config.setProperty(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, customSettings.get(TAG_WEBSOCKET_PORT, getClass())); + config.setProperty(IConfig.WEB_SOCKET_PORT_PROPERTY_NAME, customSettings.get(TAG_WEBSOCKET_PORT, getClass())); String keystorePath = customSettings.get(TAG_KEYSTORE_PATH, getClass()); if (!keystorePath.isEmpty()) { LOGGER.info("Configuring keystore for ssl"); - config.setProperty(BrokerConstants.JKS_PATH_PROPERTY_NAME, keystorePath); - config.setProperty(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, customSettings.get(TAG_KEYSTORE_PASS, getClass(), false)); - config.setProperty(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, customSettings.get(TAG_KEYMANAGER_PASS, getClass(), false)); - config.setProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME, customSettings.get(TAG_SSL_PORT, getClass())); - config.setProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME, customSettings.get(TAG_SSL_WEBSOCKET_PORT, getClass())); + config.setProperty(IConfig.JKS_PATH_PROPERTY_NAME, keystorePath); + config.setProperty(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, customSettings.get(TAG_KEYSTORE_PASS, getClass(), false)); + config.setProperty(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, customSettings.get(TAG_KEYMANAGER_PASS, getClass(), false)); + config.setProperty(IConfig.SSL_PORT_PROPERTY_NAME, customSettings.get(TAG_SSL_PORT, getClass())); + config.setProperty(IConfig.WSS_PORT_PROPERTY_NAME, customSettings.get(TAG_SSL_WEBSOCKET_PORT, getClass())); } authWrapper = createAuthWrapper(); - mqttBroker.startServer(config, userHandlers, null, authWrapper, authWrapper); + try { + mqttBroker.startServer(config, userHandlers, null, authWrapper, authWrapper); + } catch (IOException ex) { + LOGGER.error("Failed to start MQTT Broker!", ex); + } } private AuthWrapper createAuthWrapper() { @@ -305,6 +309,11 @@ public void onUnsubscribe(InterceptUnsubscribeMessage msg) { public String getID() { return frostClientId; } + + @Override + public void onSessionLoopError(Throwable thrwbl) { + LOGGER.error("MQTT Session Loop caused an exception!", thrwbl); + } } } diff --git a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/AbstractTestClass.java b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/AbstractTestClass.java index 05f249ac7..230397e59 100644 --- a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/AbstractTestClass.java +++ b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/AbstractTestClass.java @@ -17,6 +17,7 @@ */ package de.fraunhofer.iosb.ilt.statests; +import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_MQTT; import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_PLUGINS; import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.TAG_FILTER_DELETE_ENABLE; @@ -72,6 +73,7 @@ public abstract class AbstractTestClass { defaultProperties.put(PREFIX_PLUGINS + ActuationModelSettings.TAG_ENABLE_ACTUATION, "true"); defaultProperties.put(PREFIX_PLUGINS + MdsModelSettings.TAG_ENABLE_MDS_MODEL, "true"); defaultProperties.put(TAG_FILTER_DELETE_ENABLE, "true"); + defaultProperties.put(PREFIX_MQTT + "session.timeout.seconds", "100"); } public AbstractTestClass(ServerVersion serverVersion) { diff --git a/FROST-Server.Tests/src/test/resources/logback-test.xml b/FROST-Server.Tests/src/test/resources/logback-test.xml index 42f6c00fa..92017606b 100644 --- a/FROST-Server.Tests/src/test/resources/logback-test.xml +++ b/FROST-Server.Tests/src/test/resources/logback-test.xml @@ -9,7 +9,8 @@ - + + diff --git a/docs/settings/settings.md b/docs/settings/settings.md index b81fc5339..2c39d2654 100644 --- a/docs/settings/settings.md +++ b/docs/settings/settings.md @@ -133,12 +133,12 @@ These are settings for the MQTT package. * **mqtt.persistentStoreType:** The way the MQTT server keeps track of subscriptions, either in-memory (`memory`) or using an H2 database (`h2`). Default: `memory`. -* **mqtt.session_queue_size:** +* **mqtt.session.queue.size:** The size of the internal queue the mqtt broker uses per CPU core. Default: `1024`. -* **mqtt.session_timeout_seconds:** - The maximum lifetime of disconnected sessions, in seconds. - Default: `3600`. +* **mqtt.persistent.client.expiration:** + The maximum lifetime of disconnected sessions. For example `100s` for 100 seconds. + Default: `3600s`. * **mqtt.maxInFlight:** The maximum number of "in-flight" messages to allow when sending notifications. * **mqtt.netty.mqtt.message_size:** diff --git a/helm/frost-server/README.md b/helm/frost-server/README.md index 14683ff95..3a895e5cd 100644 --- a/helm/frost-server/README.md +++ b/helm/frost-server/README.md @@ -242,7 +242,7 @@ The following table lists the configurable parameters of the FROST-Server chart | `frost.mqtt.persistentStoreType` | The way the MQTT server keeps track of subscriptions, either in-memory (`memory`) or using an H2 database (`h2`). | `memory` | | `frost.mqtt.secureWebsocketPort` | The port the MQTT server is reachable via secure WebSocket. | `nil` | | `frost.mqtt.session_queue_size` | The size of the internal queue the mqtt broker uses per CPU core. | `1024` | -| `frost.mqtt.session_timeout_seconds` | The maximum lifetime of disconnected sessions, in seconds. | `3600` | +| `frost.mqtt.persistent_client_expiration` | The maximum lifetime of disconnected sessions, postfix with `s` for seconds. | `3600s` | | `frost.mqtt.sslPort` | The port the MQTT server runs on, using ssl. | `nil` | | `frost.mqtt.netty.mqtt.message_size` | The maximum size of MQTT messages. | `8092` (Bytes) | | `frost.mqtt.host` | The external IP address or host name the MQTT server should listen on. Set by default to 0.0.0.0 to listen on all interfaces. | `0.0.0.0` |