编辑
2023-04-14
Java
00
请注意,本文编写于 621 天前,最后修改于 621 天前,其中某些信息可能已经过时。

目录

Java操作emqx,控制emqx服务器
springboot操作emqx的项目

Java操作emqx,控制emqx服务器

Java可以通过MQTT协议来连接emqx服务器,并通过订阅、发布消息等操作来控制emqx服务器。下面是Java操作emqx的示例代码:

  1. 使用Eclipse Paho MQTT Client连接emqx服务器:
java
String broker = "tcp://localhost:1883"; String clientId = "Java_Client"; MemoryPersistence persistence = new MemoryPersistence(); MqttClient client = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); //connect to the broker client.connect(connOpts);
  1. 订阅MQTT主题:
java
String topic = "test/topic"; int qos = 2; client.subscribe(topic, qos);
  1. 发布MQTT消息:
java
String topic = "test/topic"; String content = "Hello, emqx!"; int qos = 2; MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); client.publish(topic, message);

通过以上操作,我们可以很方便地使用Java来控制emqx服务器。

springboot操作emqx的项目

实现了一个简单的MQTT消息发送和订阅功能,可以用来操作emqx服务器。

  1. 创建Spring Boot项目

使用IDEA或其他编辑器创建一个新的Spring Boot项目,选择Web和Thymeleaf等依赖。具体的依赖配置如下:

xml
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>

其中,使用spring-integration-mqtt依赖来实现MQTT客户端的功能。

  1. 配置MQTT客户端

在application.properties文件中添加MQTT相关配置:

spring.mqtt.username=user spring.mqtt.password=passwd spring.mqtt.url=tcp://localhost:1883 # 客户端ID,建议使用唯一的字符串 spring.mqtt.client-id=client-1 # 订阅的主题 spring.mqtt.default-topic=test/topic
  1. 编写MQTT配置类

创建MQTT的配置类,并设置连接参数和订阅主题:

java
@Configuration @EnableMqtt public class MqttConfig { @Autowired private MqttPahoMessageDrivenChannelAdapter mqttAdapter; @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName("${spring.mqtt.username}"); mqttConnectOptions.setPassword("${spring.mqtt.password}".toCharArray()); mqttConnectOptions.setServerURIs(new String[]{ "${spring.mqtt.url}" }); return mqttConnectOptions; } @ServiceActivator(inputChannel = "mqttInputChannel") public void messageReceiver(String message) { System.out.println("Received message: " + message); } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public IntegrationFlow mqttInFlow() { return IntegrationFlows .from(mqttAdapter) .channel(mqttInputChannel()) .get(); } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(mqttConnectOptions()); return factory; } @Bean public MqttPahoMessageDrivenChannelAdapter mqttAdapter() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("${spring.mqtt.client-id}", mqttClientFactory(), "${spring.mqtt.default-topic}"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); return adapter; } }

其中,mqttConnectOptions方法用来设置MQTT连接的参数,messageReceiver方法用来接收MQTT消息,mqttInputChannel和mqttInFlow用来配置MQTT消息的输入流。mqttClientFactory方法用来创建MQTT客户端,mqttAdapter用来设置MQTT的相关参数。

  1. 编写MQTT服务类

创建MQTT服务类,用来实现MQTT客户端的连接、消息的发送和订阅等操作:

java
@Service public class MqttService { @Autowired private MqttPahoMessageHandler mqttHandler; @Autowired private MqttAdapter mqttAdapter; public void sendMessage(String topic, String message) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(message.getBytes()); mqttMessage.setQos(1); mqttMessage.setRetained(false); mqttHandler.handleMessage(new Message<byte[]>(mqttMessage.getPayload(), new MessageHeaders(Collections.singletonMap(MqttHeaders.TOPIC, topic)))); } public void subscribe(String topic) { mqttAdapter.addTopic(topic); } }

其中,sendMessage方法用来发送MQTT消息,subscribe方法用来订阅MQTT主题。

  1. 创建控制器

创建一个MqttController控制器,用来处理MQTT相关的请求:

java
@RestController @RequestMapping("/mqtt") public class MqttController { @Autowired private MqttService mqttService; @GetMapping("/subscribe") public String subscribe(String topic) { mqttService.subscribe(topic); return "订阅成功"; } @PostMapping("/send") public String sendMessage(String topic, String message) { mqttService.sendMessage(topic, message); return "发送成功"; } }

其中,subscribe方法处理订阅MQTT主题的请求,sendMessage方法处理发送MQTT消息的请求。

  1. 编写页面

最后,创建页面,用来测试订阅和发送MQTT消息。这里省略具体页面代码。

至此,一个完整的Spring Boot项目就完成了,它可以实现订阅MQTT主题和发送MQTT消息的功能,可以用来操作emqx服务器。