[IOTDB-565] MQTT Protocol Support (#929)

* [IOTDB-503] Add checkTimeseriesExists for session

* [IOTDB-565] MQTT Protocol Support

* [IOTDB-565] upgrade fastjson to latest version
This commit is contained in:
Xin Wang 2020-04-09 08:17:13 +08:00 committed by GitHub
parent 4bc238f43e
commit d6207c0745
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1617 additions and 19 deletions

View File

@ -215,3 +215,11 @@ The following class is modified from Apache commons-collections
./tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Murmur128Hash.java
Relevant pr is: https://github.com/apache/commons-collections/pull/83/
------------
The following class is modified from moquette (https://github.com/moquette-io/moquette),
which is under Apache License 2.0:
./server/src/main/java/io/moquette/broker/MQTTConnection.java
Relevant pr is: https://github.com/moquette-io/moquette/pull/454

View File

@ -221,7 +221,7 @@ org.apache.commons:commons-collections4:4.0
org.apache.commons:commons-lang3:3.1
org.apache.thrift:libthrift:0.9.3
org.xerial.snappy:snappy-java:1.0.5-M1
com.alibaba:fastjson:1.2.31
com.alibaba:fastjson:1.2.67
com.sun.xml.fastinfoset:FastInfoset:1.2.14
io.airlift.airline:0.8
com.google.guava.guava:21.0

View File

@ -0,0 +1,100 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# MQTT Protocol
[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
It was designed as an extremely lightweight publish/subscribe messaging transport.
It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
IoTDB supports the MQTT v3.1(an OASIS Standard) protocol.
IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly.
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/6711230/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
## Built-in MQTT Service
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
and then write the data into storage immediately.
The MQTT topic is corresponding to IoTDB timeseries.
The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the default implementation is `JSONPayloadFormatter`.
The default `json` formatter support two json format, and the following is an MQTT message payload example:
```json
{
"device":"root.sg.d1",
"timestamp":1586076045524,
"measurements":["s1","s2"],
"values":[0.530635,0.530635]
}
```
or
```json
{
"device":"root.sg.d1",
"timestamps":[1586076045524,1586076065526],
"measurements":["s1","s2"],
"values":[[0.530635,0.530635], [0.530655,0.530695]]
}
```
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/6711230/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png">
## MQTT Configurations
The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties` by default.
Configurations are as following:
| NAME | DESCRIPTION | DEFAULT |
| ------------- |:-------------:|:------:|
| enable_mqtt_service | whether to enable the mqtt service | true |
| mqtt_host | the mqtt service binding host | 0.0.0.0 |
| mqtt_port | the mqtt service binding port | 1883 |
| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages | 1 |
| mqtt_payload_formatter | the mqtt message payload formatter | json |
## Examples
The following is an example which a mqtt client send messages to IoTDB server.
```java
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Random random = new Random();
for (int i = 0; i < 10; i++) {
String payload = String.format("{\n" +
"\"device\":\"root.sg.d1\",\n" +
"\"timestamp\":%d,\n" +
"\"measurements\":[\"s1\"],\n" +
"\"values\":[%f]\n" +
"}", System.currentTimeMillis(), random.nextDouble());
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}
connection.disconnect();
}
```

View File

@ -27,7 +27,8 @@ The example is to show how to send data to a IoTDB server from a Flink job.
## Usage
* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB server and run the flink job on local mini cluster.
* Launch the IoTDB server.
* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to run the flink job on local mini cluster.
# TsFile-Flink-Connector Example

View File

@ -36,11 +36,6 @@
<artifactId>flink-iotdb-connector</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>tsfile</artifactId>

View File

@ -20,7 +20,6 @@ package org.apache.iotdb.flink;
import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.iotdb.db.service.IoTDB;
import java.util.HashMap;
import java.util.Map;
@ -28,11 +27,6 @@ import java.util.Random;
public class FlinkIoTDBSink {
public static void main(String[] args) throws Exception {
// launch the local iotDB server at default port: 6667
IoTDB.main(args);
Thread.sleep(3000);
// run the flink job on local mini cluster
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

32
example/mqtt/README.md Normal file
View File

@ -0,0 +1,32 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# IoTDB-MQTT-Broker Example
## Function
```
The example is to show how to send data to IoTDB from a mqtt client.
```
## Usage
* Launch the IoTDB server.
* setup storage group `SET STORAGE GROUP TO root.sg` and create time timeseries `CREATE TIMESERIES root.sg.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN`.
* Run `org.apache.iotdb.mqtt.MQTTClient` to run the mqtt client and send events to server.

39
example/mqtt/pom.xml Normal file
View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-examples</artifactId>
<version>0.10.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>mqtt-example</artifactId>
<name>IoTDB-MQTT Examples</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.mqtt;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import java.util.Random;
public class MQTTClient {
public static void main(String[] args) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Random random = new Random();
for (int i = 0; i < 10; i++) {
String payload = String.format("{\n" +
"\"device\":\"root.sg.d1\",\n" +
"\"timestamp\":%d,\n" +
"\"measurements\":[\"s1\"],\n" +
"\"values\":[%f]\n" +
"}", System.currentTimeMillis(), random.nextDouble());
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}
connection.disconnect();
}
}

View File

@ -41,6 +41,7 @@
<module>jdbc</module>
<module>hadoop</module>
<module>flink</module>
<module>mqtt</module>
</modules>
<build>
<pluginManagement>

26
pom.xml
View File

@ -123,6 +123,7 @@
<common.lang3.version>3.8.1</common.lang3.version>
<common.logging.version>1.1.3</common.logging.version>
<guava.version>21.0</guava.version>
<fastjson.version>1.2.67</fastjson.version>
<jline.version>2.14.5</jline.version>
<jetty.version>9.4.24.v20191120</jetty.version>
<metrics.version>3.2.6</metrics.version>
@ -135,6 +136,19 @@
<!-- By default, the argLine is empty-->
<argLine/>
</properties>
<repositories>
<!-- repository for moquette -->
<repository>
<id>bintray</id>
<url>https://jcenter.bintray.com</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
their version in sub-project's pom, but we have to claim themselves again
@ -154,7 +168,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.31</version>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@ -459,6 +473,16 @@
<artifactId>airline</artifactId>
<version>${airline.version}</version>
</dependency>
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.12.1</version>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>

View File

@ -61,12 +61,14 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.antlr/antlr-runtime4 -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@ -95,6 +97,10 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
</dependency>
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
</dependency>
<!-- for mocked test-->
<dependency>
<groupId>org.powermock</groupId>

View File

@ -451,4 +451,24 @@ partition_interval=604800
# For example, your partitionInterval is 86400 and you want to insert data in 3 different days,
# you should set this param >= 6 (for sequence and unsequence)
# default number is 10
memtable_num_in_each_storage_group=10
memtable_num_in_each_storage_group=10
####################
### MQTT Broker Configuration
####################
# whether to enable the mqtt service.
enable_mqtt_service=true
# the mqtt service binding host.
mqtt_host=0.0.0.0
# the mqtt service binding port.
mqtt_port=1883
# the handler pool size for handing the mqtt messages.
mqtt_handler_pool_size=1
# the mqtt message payload formatter.
mqtt_payload_formatter=json

View File

@ -0,0 +1,503 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.moquette.broker;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
// NOTE:
// override the MQTTConnection class in the moquette 0.12.1 jar to fix the PUBACK flush issue
// https://github.com/moquette-io/moquette/pull/454
// when moquette fixed version released, we can remove this.
final class MQTTConnection {
private static final Logger LOG = LoggerFactory.getLogger(MQTTConnection.class);
final Channel channel;
private BrokerConfiguration brokerConfig;
private IAuthenticator authenticator;
private SessionRegistry sessionRegistry;
private final PostOffice postOffice;
private boolean connected;
private final AtomicInteger lastPacketId = new AtomicInteger(0);
MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator,
SessionRegistry sessionRegistry, PostOffice postOffice) {
this.channel = channel;
this.brokerConfig = brokerConfig;
this.authenticator = authenticator;
this.sessionRegistry = sessionRegistry;
this.postOffice = postOffice;
this.connected = false;
}
void handleMessage(MqttMessage msg) {
MqttMessageType messageType = msg.fixedHeader().messageType();
LOG.debug("Received MQTT message, type: {}, channel: {}", messageType, channel);
switch (messageType) {
case CONNECT:
processConnect((MqttConnectMessage) msg);
break;
case SUBSCRIBE:
processSubscribe((MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe((MqttUnsubscribeMessage) msg);
break;
case PUBLISH:
processPublish((MqttPublishMessage) msg);
break;
case PUBREC:
processPubRec(msg);
break;
case PUBCOMP:
processPubComp(msg);
break;
case PUBREL:
processPubRel(msg);
break;
case DISCONNECT:
processDisconnect(msg);
break;
case PUBACK:
processPubAck(msg);
break;
case PINGREQ:
MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, AT_MOST_ONCE,
false, 0);
MqttMessage pingResp = new MqttMessage(pingHeader);
channel.writeAndFlush(pingResp).addListener(CLOSE_ON_FAILURE);
break;
default:
LOG.error("Unknown MessageType: {}, channel: {}", messageType, channel);
break;
}
}
private void processPubComp(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
final Session session = sessionRegistry.retrieve(getClientId());
session.processPubComp(messageID);
}
private void processPubRec(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
final Session session = sessionRegistry.retrieve(getClientId());
session.processPubRec(messageID);
}
static MqttMessage pubrel(int messageID) {
MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
return new MqttMessage(pubRelHeader, from(messageID));
}
private void processPubAck(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
Session session = sessionRegistry.retrieve(getClientId());
session.pubAckReceived(messageID);
}
void processConnect(MqttConnectMessage msg) {
MqttConnectPayload payload = msg.payload();
String clientId = payload.clientIdentifier();
final String username = payload.userName();
LOG.trace("Processing CONNECT message. CId={} username: {} channel: {}", clientId, username, channel);
if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
LOG.warn("MQTT protocol version is not valid. CId={} channel: {}", clientId, channel);
abortConnection(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
return;
}
final boolean cleanSession = msg.variableHeader().isCleanSession();
if (clientId == null || clientId.length() == 0) {
if (!brokerConfig.isAllowZeroByteClientId()) {
LOG.warn("Broker doesn't permit MQTT empty client ID. Username: {}, channel: {}", username, channel);
abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
return;
}
if (!cleanSession) {
LOG.warn("MQTT client ID cannot be empty for persistent session. Username: {}, channel: {}",
username, channel);
abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
return;
}
// Generating client id.
clientId = UUID.randomUUID().toString().replace("-", "");
LOG.debug("Client has connected with integration generated id: {}, username: {}, channel: {}", clientId,
username, channel);
}
if (!login(msg, clientId)) {
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
try {
LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
sessionRegistry.bindToSession(this, msg, clientId);
initializeKeepAliveTimeout(channel, msg, clientId);
setupInflightResender(channel);
NettyUtils.clientID(channel, clientId);
LOG.trace("CONNACK sent, channel: {}", channel);
postOffice.dispatchConnection(msg);
LOG.trace("dispatch connection: {}", msg.toString());
} catch (SessionCorruptedException scex) {
LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, channel);
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
}
private void setupInflightResender(Channel channel) {
channel.pipeline()
.addFirst("inflightResender", new InflightResender(5_000, TimeUnit.MILLISECONDS));
}
private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
NettyUtils.keepAlive(channel, keepAlive);
NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
NettyUtils.clientID(channel, clientId);
int idleTime = Math.round(keepAlive * 1.5f);
setIdleTime(channel.pipeline(), idleTime);
LOG.debug("Connection has been configured CId={}, keepAlive={}, removeTemporaryQoS2={}, idleTime={}",
clientId, keepAlive, msg.variableHeader().isCleanSession(), idleTime);
}
private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
if (pipeline.names().contains("idleStateHandler")) {
pipeline.remove("idleStateHandler");
}
pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0));
}
private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
return msg.variableHeader().version() != version.protocolLevel();
}
private void abortConnection(MqttConnectReturnCode returnCode) {
MqttConnAckMessage badProto = connAck(returnCode, false);
channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
}
private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE,
false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}
private boolean login(MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().password().getBytes(StandardCharsets.UTF_8);
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
return false;
}
final String login = msg.payload().userName();
if (!authenticator.checkValid(clientId, login, pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
return false;
}
NettyUtils.userName(channel, login);
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
return false;
}
return true;
}
void handleConnectionLost() {
String clientID = NettyUtils.clientID(channel);
if (clientID == null || clientID.isEmpty()) {
return;
}
LOG.info("Notifying connection lost event. CId: {}, channel: {}", clientID, channel);
Session session = sessionRegistry.retrieve(clientID);
if (session.hasWill()) {
postOffice.fireWill(session.getWill());
}
if (session.isClean()) {
sessionRegistry.remove(clientID);
} else {
sessionRegistry.disconnect(clientID);
}
connected = false;
//dispatch connection lost to intercept.
String userName = NettyUtils.userName(channel);
postOffice.dispatchConnectionLost(clientID,userName);
LOG.trace("dispatch disconnection: clientId={}, userName={}", clientID, userName);
}
void sendConnAck(boolean isSessionAlreadyPresent) {
connected = true;
final MqttConnAckMessage ackMessage = connAck(CONNECTION_ACCEPTED, isSessionAlreadyPresent);
channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
}
boolean isConnected() {
return connected;
}
void dropConnection() {
channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
}
void processDisconnect(MqttMessage msg) {
final String clientID = NettyUtils.clientID(channel);
LOG.trace("Start DISCONNECT CId={}, channel: {}", clientID, channel);
if (!connected) {
LOG.info("DISCONNECT received on already closed connection, CId={}, channel: {}", clientID, channel);
return;
}
sessionRegistry.disconnect(clientID);
connected = false;
channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
LOG.trace("Processed DISCONNECT CId={}, channel: {}", clientID, channel);
String userName = NettyUtils.userName(channel);
postOffice.dispatchDisconnection(clientID,userName);
LOG.trace("dispatch disconnection: clientId={}, userName={}", clientID, userName);
}
void processSubscribe(MqttSubscribeMessage msg) {
final String clientID = NettyUtils.clientID(channel);
if (!connected) {
LOG.warn("SUBSCRIBE received on already closed connection, CId={}, channel: {}", clientID, channel);
dropConnection();
return;
}
postOffice.subscribeClientToTopics(msg, clientID, NettyUtils.userName(channel), this);
}
void sendSubAckMessage(int messageID, MqttSubAckMessage ackMessage) {
final String clientId = NettyUtils.clientID(channel);
LOG.trace("Sending SUBACK response CId={}, messageId: {}", clientId, messageID);
channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
}
private void processUnsubscribe(MqttUnsubscribeMessage msg) {
List<String> topics = msg.payload().topics();
String clientID = NettyUtils.clientID(channel);
LOG.trace("Processing UNSUBSCRIBE message. CId={}, topics: {}", clientID, topics);
postOffice.unsubscribe(topics, this, msg.variableHeader().messageId());
}
void sendUnsubAckMessage(List<String> topics, String clientID, int messageID) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_MOST_ONCE,
false, 0);
MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));
LOG.trace("Sending UNSUBACK message. CId={}, messageId: {}, topics: {}", clientID, messageID, topics);
channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
LOG.trace("Client <{}> unsubscribed from topics <{}>", clientID, topics);
}
void processPublish(MqttPublishMessage msg) {
final MqttQoS qos = msg.fixedHeader().qosLevel();
final String username = NettyUtils.userName(channel);
final String topicName = msg.variableHeader().topicName();
final String clientId = getClientId();
LOG.trace("Processing PUBLISH message. CId={}, topic: {}, messageId: {}, qos: {}", clientId, topicName,
msg.variableHeader().packetId(), qos);
ByteBuf payload = msg.payload();
final boolean retain = msg.fixedHeader().isRetain();
final Topic topic = new Topic(topicName);
if (!topic.isValid()) {
LOG.debug("Drop connection because of invalid topic format");
dropConnection();
}
switch (qos) {
case AT_MOST_ONCE:
postOffice.receivedPublishQos0(topic, username, clientId, payload, retain, msg);
break;
case AT_LEAST_ONCE: {
final int messageID = msg.variableHeader().packetId();
postOffice.receivedPublishQos1(this, topic, username, payload, messageID, retain, msg);
break;
}
case EXACTLY_ONCE: {
final int messageID = msg.variableHeader().packetId();
final Session session = sessionRegistry.retrieve(clientId);
session.receivedPublishQos2(messageID, msg);
postOffice.receivedPublishQos2(this, msg, username);
// msg.release();
break;
}
default:
LOG.error("Unknown QoS-Type:{}", qos);
break;
}
}
void sendPublishReceived(int messageID) {
LOG.trace("sendPubRec invoked on channel: {}", channel);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE,
false, 0);
MqttPubAckMessage pubRecMessage = new MqttPubAckMessage(fixedHeader, from(messageID));
sendIfWritableElseDrop(pubRecMessage);
}
private void processPubRel(MqttMessage msg) {
final Session session = sessionRegistry.retrieve(getClientId());
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
session.receivedPubRelQos2(messageID);
sendPubCompMessage(messageID);
}
void sendPublish(MqttPublishMessage publishMsg) {
final int packetId = publishMsg.variableHeader().packetId();
final String topicName = publishMsg.variableHeader().topicName();
final String clientId = getClientId();
MqttQoS qos = publishMsg.fixedHeader().qosLevel();
if (LOG.isTraceEnabled()) {
LOG.trace("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}, payload={}", qos, packetId,
clientId, topicName, DebugUtils.payload2Str(publishMsg.payload()));
} else {
LOG.debug("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}", qos, packetId, clientId,
topicName);
}
sendIfWritableElseDrop(publishMsg);
}
void sendIfWritableElseDrop(MqttMessage msg) {
if (LOG.isDebugEnabled()) {
LOG.debug("OUT {} on channel {}", msg.fixedHeader().messageType(), channel);
}
if (channel.isWritable()) {
// flushing PUBACK immediately
channel.writeAndFlush(msg).addListener(FIRE_EXCEPTION_ON_FAILURE);
}
}
public void writabilityChanged() {
if (channel.isWritable()) {
LOG.debug("Channel {} is again writable", channel);
final Session session = sessionRegistry.retrieve(getClientId());
session.writabilityChanged();
}
}
void sendPubAck(int messageID) {
LOG.trace("sendPubAck invoked");
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE,
false, 0);
MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, from(messageID));
sendIfWritableElseDrop(pubAckMessage);
}
private void sendPubCompMessage(int messageID) {
LOG.trace("Sending PUBCOMP message on channel: {}, messageId: {}", channel, messageID);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);
MqttMessage pubCompMessage = new MqttMessage(fixedHeader, from(messageID));
sendIfWritableElseDrop(pubCompMessage);
}
String getClientId() {
return NettyUtils.clientID(channel);
}
String getUsername() {
return NettyUtils.userName(channel);
}
public void sendPublishRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
MqttPublishMessage publishMsg = retainedPublish(topic.toString(), qos, payload);
sendPublish(publishMsg);
}
public void sendPublishRetainedWithPacketId(Topic topic, MqttQoS qos, ByteBuf payload) {
final int packetId = nextPacketId();
MqttPublishMessage publishMsg = retainedPublishWithMessageId(topic.toString(), qos, payload, packetId);
sendPublish(publishMsg);
}
private static MqttPublishMessage retainedPublish(String topic, MqttQoS qos, ByteBuf message) {
return retainedPublishWithMessageId(topic, qos, message, 0);
}
private static MqttPublishMessage retainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message,
int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, true, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
return new MqttPublishMessage(fixedHeader, varHeader, message);
}
// TODO move this method in Session
void sendPublishNotRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
MqttPublishMessage publishMsg = notRetainedPublish(topic.toString(), qos, payload);
sendPublish(publishMsg);
}
static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
return notRetainedPublishWithMessageId(topic, qos, message, 0);
}
static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message,
int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
return new MqttPublishMessage(fixedHeader, varHeader, message);
}
public void resendNotAckedPublishes() {
final Session session = sessionRegistry.retrieve(getClientId());
session.resendInflightNotAcked();
}
int nextPacketId() {
return lastPacketId.incrementAndGet();
}
@Override
public String toString() {
return "MQTTConnection{channel=" + channel + ", connected=" + connected + '}';
}
InetSocketAddress remoteAddress() {
return (InetSocketAddress) channel.remoteAddress();
}
}

View File

@ -50,6 +50,34 @@ public class IoTDBConfig {
private boolean enableMetricService = false;
/**
* whether to enable the mqtt service.
*/
private boolean enableMQTTService = true;
/**
* the mqtt service binding host.
*/
private String mqttHost = "0.0.0.0";
/**
* the mqtt service binding port.
*/
private int mqttPort = 1883;
/**
* the handler pool size for handing the mqtt messages.
*/
private int mqttHandlerPoolSize = 1;
/**
* the mqtt message payload formatter.
*/
private String mqttPayloadFormatter = "json";
/**
* Rpc binding address.
*/
private String rpcAddress = "0.0.0.0";
/**
@ -1413,4 +1441,44 @@ public class IoTDBConfig {
public void setQueryCacheSizeInMetric(int queryCacheSizeInMetric) {
this.queryCacheSizeInMetric = queryCacheSizeInMetric;
}
public boolean isEnableMQTTService() {
return enableMQTTService;
}
public void setEnableMQTTService(boolean enableMQTTService) {
this.enableMQTTService = enableMQTTService;
}
public String getMqttHost() {
return mqttHost;
}
public void setMqttHost(String mqttHost) {
this.mqttHost = mqttHost;
}
public int getMqttPort() {
return mqttPort;
}
public void setMqttPort(int mqttPort) {
this.mqttPort = mqttPort;
}
public int getMqttHandlerPoolSize() {
return mqttHandlerPoolSize;
}
public void setMqttHandlerPoolSize(int mqttHandlerPoolSize) {
this.mqttHandlerPoolSize = mqttHandlerPoolSize;
}
public String getMqttPayloadFormatter() {
return mqttPayloadFormatter;
}
public void setMqttPayloadFormatter(String mqttPayloadFormatter) {
this.mqttPayloadFormatter = mqttPayloadFormatter;
}
}

View File

@ -88,4 +88,11 @@ public class IoTDBConstant {
public static final String SCHEMA_FOLDER_NAME = "schema";
public static final String SYNC_FOLDER_NAME = "sync";
public static final String QUERY_FOLDER_NAME = "query";
// mqtt
public static final String ENABLE_MQTT = "enable_mqtt_service";
public static final String MQTT_HOST_NAME = "mqtt_host";
public static final String MQTT_PORT_NAME = "mqtt_port";
public static final String MQTT_HANDLER_POOL_SIZE_NAME = "mqtt_handler_pool_size";
public static final String MQTT_PAYLOAD_FORMATTER_NAME = "mqtt_payload_formatter";
}

View File

@ -339,6 +339,23 @@ public class IoTDBDescriptor {
Integer.parseInt(properties.getProperty("default_fill_interval",
String.valueOf(conf.getDefaultFillInterval()))));
// mqtt
if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) {
conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME));
}
if (properties.getProperty(IoTDBConstant.MQTT_PORT_NAME) != null) {
conf.setMqttPort(Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_PORT_NAME)));
}
if (properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME) != null) {
conf.setMqttHandlerPoolSize(Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME)));
}
if (properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME) != null) {
conf.setMqttPayloadFormatter(properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME));
}
if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) {
conf.setEnableMQTTService(Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT)));
}
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance().getConfig()
.setTSFileStorageFs(FSType.valueOf(

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import io.moquette.broker.security.IAuthenticator;
import org.apache.commons.lang.StringUtils;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The MQTT broker authenticator.
*/
public class BrokerAuthenticator implements IAuthenticator {
private static final Logger LOG = LoggerFactory.getLogger(BrokerAuthenticator.class);
@Override
public boolean checkValid(String clientId, String username, byte[] password) {
if (StringUtils.isBlank(username) || password == null) {
return false;
}
try {
IAuthorizer authorizer = LocalFileAuthorizer.getInstance();
return authorizer.login(username, new String(password));
} catch (AuthException e) {
LOG.info("meet error while logging in.", e);
return false;
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
* The JSON payload formatter.
* two json format supported:
* {
* "device":"root.sg.d1",
* "timestamp":1586076045524,
* "measurements":["s1","s2"],
* "values":[0.530635,0.530635]
* }
*
* {
* "device":"root.sg.d1",
* "timestamps":[1586076045524,1586076065526],
* "measurements":["s1","s2"],
* "values":[[0.530635,0.530635], [0.530655,0.530695]]
* }
*/
public class JSONPayloadFormatter implements PayloadFormatter {
private static final String JSON_KEY_DEVICE = "device";
private static final String JSON_KEY_TIMESTAMP = "timestamp";
private static final String JSON_KEY_TIMESTAMPS = "timestamps";
private static final String JSON_KEY_MEASUREMENTS = "measurements";
private static final String JSON_KEY_VALUES = "values";
@Override
public List<Message> format(ByteBuf payload) {
if (payload == null) {
return null;
}
String txt = payload.toString(StandardCharsets.UTF_8);
JSONObject jsonObject = JSON.parseObject(txt);
Object timestamp = jsonObject.get(JSON_KEY_TIMESTAMP);
if (timestamp != null) {
return Lists.newArrayList(JSON.parseObject(txt, Message.class));
}
String device = jsonObject.getString(JSON_KEY_DEVICE);
JSONArray timestamps = jsonObject.getJSONArray(JSON_KEY_TIMESTAMPS);
JSONArray measurements = jsonObject.getJSONArray(JSON_KEY_MEASUREMENTS);
JSONArray values = jsonObject.getJSONArray(JSON_KEY_VALUES);
List<Message> ret = new ArrayList<>();
for (int i = 0; i < timestamps.size(); i++) {
Long ts = timestamps.getLong(i);
Message message = new Message();
message.setDevice(device);
message.setTimestamp(ts);
message.setMeasurements(measurements.toJavaList(String.class));
message.setValues(((JSONArray)values.get(i)).toJavaList(String.class));
ret.add(message);
}
return ret;
}
@Override
public String getName() {
return "json";
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import java.util.List;
/**
* Message describes the information sometime sent from the devices.
*/
public class Message {
private String device;
private Long timestamp;
private List<String> measurements;
private List<String> values;
public String getDevice() {
return device;
}
public void setDevice(String device) {
this.device = device;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public List<String> getMeasurements() {
return measurements;
}
public void setMeasurements(List<String> measurements) {
this.measurements = measurements;
}
public List<String> getValues() {
return values;
}
public void setValues(List<String> values) {
this.values = values;
}
@Override
public String toString() {
return "Message{" +
"device='" + device + '\'' +
", timestamp=" + timestamp +
", measurements=" + measurements +
", values=" + values +
'}';
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
/**
* PayloadFormatManager loads payload formatter from SPI services.
*/
public class PayloadFormatManager {
private static Map<String, PayloadFormatter> map = new HashMap<>();
static {
init();
}
private static void init() {
ServiceLoader<PayloadFormatter> formats = ServiceLoader.load(PayloadFormatter.class);
for (PayloadFormatter format : formats) {
map.put(format.getName(), format);
}
}
public static PayloadFormatter getPayloadFormat(String name) {
Preconditions.checkArgument(map.containsKey(name), "Unknown payload format named: " + name);
return map.get(name);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import io.netty.buffer.ByteBuf;
import java.util.List;
/**
* PayloadFormatter format the payload to the messages.
*/
public interface PayloadFormatter {
/**
* format a payload to a list of messages
* @param payload
* @return
*/
List<Message> format(ByteBuf payload);
/**
* get the formatter name
* @return
*/
String getName();
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.executor.IPlanExecutor;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* PublishHandler handle the messages from MQTT clients.
*/
public class PublishHandler extends AbstractInterceptHandler {
private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
private IPlanExecutor executor;
private PayloadFormatter payloadFormat;
public PublishHandler(IoTDBConfig config) {
this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
try {
this.executor = new PlanExecutor();
} catch (QueryProcessException e) {
throw new RuntimeException(e);
}
}
protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
this.executor = executor;
this.payloadFormat = payloadFormat;
}
@Override
public String getID() {
return "iotdb-mqtt-broker-listener";
}
@Override
public void onPublish(InterceptPublishMessage msg) {
String clientId = msg.getClientID();
ByteBuf payload = msg.getPayload();
String topic = msg.getTopicName();
String username = msg.getUsername();
MqttQoS qos = msg.getQos();
LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
clientId, username, qos, topic, payload);
List<Message> events = payloadFormat.format(payload);
if (events == null) {
return;
}
// since device ids from messages maybe different, so we use the InsertPlan not BatchInsertPlan.
for (Message event : events) {
if (event == null) {
continue;
}
InsertPlan plan = new InsertPlan();
plan.setDeviceId(event.getDevice());
plan.setTime(event.getTimestamp());
plan.setMeasurements(event.getMeasurements().toArray(new String[event.getMeasurements().size()]));
plan.setValues(event.getValues().toArray(new String[event.getValues().size()]));
boolean status;
try {
status = executeNonQuery(plan);
} catch (QueryProcessException e) {
throw new RuntimeException(e);
}
LOG.debug("event process result: {}", status);
}
}
private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new QueryProcessException(
"Current system mode is read-only, does not support non-query operation");
}
return executor.processNonQuery(plan);
}
}

View File

@ -99,6 +99,9 @@ public class IoTDB implements IoTDBMBean {
if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
registerManager.register(MetricsService.getInstance());
}
if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
registerManager.register(MQTTService.getInstance());
}
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(StorageEngine.getInstance());

View File

@ -22,7 +22,7 @@ import org.apache.iotdb.db.utils.MemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IoTDBShutdownHook extends Thread{
public class IoTDBShutdownHook extends Thread {
private static final Logger logger = LoggerFactory.getLogger(IoTDBShutdownHook.class);

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service;
import com.google.common.collect.Lists;
import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.interception.InterceptHandler;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.mqtt.BrokerAuthenticator;
import org.apache.iotdb.db.mqtt.PublishHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
/**
* The IoTDB MQTT Service.
*/
public class MQTTService implements IService {
private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class);
private Server server = new Server();
@Override
public void start() throws StartupException {
startup();
}
@Override
public void stop() {
shutdown();
}
public void startup() {
IoTDBConfig iotDBConfig = IoTDBDescriptor.getInstance().getConfig();
IConfig config = createBrokerConfig(iotDBConfig);
List<InterceptHandler> handlers = Lists.newArrayList(new PublishHandler(iotDBConfig));
IAuthenticator authenticator = new BrokerAuthenticator();
server.startServer(config, handlers, null, authenticator, null);
LOG.info("Start MQTT service successfully, listening on ip {} port {}",
iotDBConfig.getMqttHost(), iotDBConfig.getMqttPort());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Stopping IoTDB MQTT service...");
shutdown();
LOG.info("IoTDB MQTT service stopped.");
}));
}
private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) {
Properties properties = new Properties();
properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, iotDBConfig.getMqttHost());
properties.setProperty(BrokerConstants.PORT_PROPERTY_NAME, String.valueOf(iotDBConfig.getMqttPort()));
properties.setProperty(BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, String.valueOf(iotDBConfig.getMqttHandlerPoolSize()));
return new MemoryConfig(properties);
}
public void shutdown() {
server.stopServer();
}
@Override
public ServiceType getID() {
return ServiceType.MQTT_SERVICE;
}
public static final MQTTService getInstance() {
return MQTTServiceHolder.INSTANCE;
}
private static class MQTTServiceHolder {
private static final MQTTService INSTANCE = new MQTTService();
private MQTTServiceHolder() {
}
}
}

View File

@ -26,6 +26,7 @@ public enum ServiceType {
JMX_SERVICE("JMX ServerService", "JMX ServerService"),
METRICS_SERVICE("Metrics ServerService","MetricsService"),
JDBC_SERVICE("JDBC ServerService", "JDBCService"),
MQTT_SERVICE("MQTTService", ""),
MONITOR_SERVICE("Monitor ServerService", "Monitor"),
STAT_MONITOR_SERVICE("Statistics ServerService", ""),
WAL_SERVICE("WAL ServerService", ""),

View File

@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.apache.iotdb.db.mqtt.JSONPayloadFormatter

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class BrokerAuthenticatorTest {
@Test
public void checkValid() {
BrokerAuthenticator authenticator = new BrokerAuthenticator();
assertTrue(authenticator.checkValid(null, "root", "root".getBytes()));
assertFalse(authenticator.checkValid(null, "", "foo".getBytes()));
assertFalse(authenticator.checkValid(null, "root", null));
assertFalse(authenticator.checkValid(null, "foo", "foo".getBytes()));
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import static org.junit.Assert.assertEquals;
public class JSONPayloadFormatTest {
@Test
public void formatJson() {
String payload = " {\n" +
" \"device\":\"root.sg.d1\",\n" +
" \"timestamp\":1586076045524,\n" +
" \"measurements\":[\"s1\",\"s2\"],\n" +
" \"values\":[0.530635,0.530635]\n" +
" }";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
Message message = formatter.format(buf).get(0);
assertEquals("root.sg.d1", message.getDevice());
assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
assertEquals("s1", message.getMeasurements().get(0));
assertEquals(0.530635D, Double.parseDouble(message.getValues().get(0)), 0);
}
@Test
public void formatBatchJson() {
String payload = " {\n" +
" \"device\":\"root.sg.d1\",\n" +
" \"timestamps\":[1586076045524,1586076065526],\n" +
" \"measurements\":[\"s1\",\"s2\"],\n" +
" \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" +
" }";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
Message message = formatter.format(buf).get(1);
assertEquals("root.sg.d1", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
assertEquals("s2", message.getMeasurements().get(1));
assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0);
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import org.junit.Test;
import static org.junit.Assert.*;
public class PayloadFormatManagerTest {
@Test(expected = IllegalArgumentException.class)
public void getPayloadFormat() {
PayloadFormatManager.getPayloadFormat("txt");
}
@Test
public void getDefaultPayloadFormat() {
assertNotNull(PayloadFormatManager.getPayloadFormat("json"));
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.mqtt;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import org.apache.iotdb.db.qp.executor.IPlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class PublishHandlerTest {
@Test
public void onPublish() throws Exception {
IPlanExecutor executor = mock(IPlanExecutor.class);
PayloadFormatter payloadFormat = PayloadFormatManager.getPayloadFormat("json");
PublishHandler handler = new PublishHandler(executor, payloadFormat);
String payload = "{\n" +
"\"device\":\"root.sg.d1\",\n" +
"\"timestamp\":1586076045524,\n" +
"\"measurements\":[\"s1\"],\n" +
"\"values\":[0.530635]\n" +
"}";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("root.sg.d1", 1);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1);
MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, buf);
InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, null, null);
handler.onPublish(message);
verify(executor).processNonQuery(any(InsertPlan.class));
}
}

View File

@ -347,7 +347,8 @@ var config = {
'4-Client/3-Programming - JDBC',
'4-Client/4-Programming - Other Languages',
'4-Client/5-Programming - TsFile API',
'4-Client/6-Status Codes',
'4-Client/6-Programming - MQTT',
'4-Client/7-Status Codes',
]
},
{