阿里云RocketMQ消费MQTT消息

业务背景:

        项目中涉及的消息队列既有RocketMQ,又有MQTT,均为阿里云提供(阿里云有专门的“微消息队列 MQTT 版”模块,但博主公司消息队列的实例都在“消息队列 RocketMQ 版”模块下,只是实例不同,猜测是做了适配,有清楚的大佬欢迎指点)。其中MQTT的消息是由硬件设备上报而来,由java服务进行消费,使用一套内部框架连接。现因框架存在适配问题,经讨论决定放弃使用,需改造原消费代码。

技术方案:

       查询资料与询问同事得知两种消息队列在底层逻辑上高度相似,可以用RocketMQ方式连接MQTT的topic并消费。在实际改造过程中发现二者均可通过com.aliyun.openservices.ons.api.ONSFactory#createConsumer这一阿里云官方接口进行连接。而项目中已有一套配置用于连接RocketMQ,两类消息队列的地址、密钥等实例信息并不相同。因此改造的重点其实就是实现RocketMQ多实例配置。

代码实现:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 自定义注解
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface OnsMqttMessageListener {

    String topic();

    String consumerGroup();

}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * 参数类
 */
@Data
@Component
@EnableConfigurationProperties
@ConfigurationProperties(prefix = "aliyun.rocketmq")
public class OnsMqttMessageProperties {

    private String onsAddr;

    private String accessKey;

    private String secretKey;

    private String groupId;

}
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.geelytech.bms.annotation.OnsMqttMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Properties;

/**
 * 配置类
 */
@Slf4j
@Component
public class OnsMqttMessageConsumerConfig implements ApplicationListener<ApplicationReadyEvent> {

    @Autowired
    private OnsMqttMessageProperties onsMqttMessageProperties;

    /**
     * 注入所有MessageListener实例
     */
    @Autowired
    private Map<String, MessageListener> messageListeners;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        String onsAddr = onsMqttMessageProperties.getOnsAddr();
        String accessKey = onsMqttMessageProperties.getAccessKey();
        String secretKey = onsMqttMessageProperties.getSecretKey();
        log.info("ConsumerSubscriptionConfig onApplicationEvent messageListeners={}", messageListeners);

        // 订阅每个MessageListener
        for (Map.Entry<String, MessageListener> entry : messageListeners.entrySet()) {
            MessageListener listener = entry.getValue();

            OnsMqttMessageListener annotation = AnnotationUtils.findAnnotation(listener.getClass(), OnsMqttMessageListener.class);
            if (annotation != null) {
                String topic = annotation.topic();
                String consumerGroup = annotation.consumerGroup();

                Properties properties = new Properties();
                properties.setProperty(PropertyKeyConst.GROUP_ID, consumerGroup);
                properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, onsAddr);
                properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
                properties.setProperty(PropertyKeyConst.SecretKey, secretKey);

                Consumer consumer = ONSFactory.createConsumer(properties);
                consumer.subscribe(topic, "*", listener);
                consumer.start();
            }
        }
    }

}
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.geelytech.bms.annotation.OnsMqttMessageListener;
import com.geelytech.satellite.constant.TopicConstant;
import com.geelytech.satellite.dto.eventOriginalData.BizSwapDone;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import static com.aliyun.openservices.ons.api.Action.CommitMessage;

@Slf4j
@Component
@OnsMqttMessageListener(topic = "", consumerGroup = "")
public class BmsBizSwapDoneConsumer implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        // 业务逻辑
        return CommitMessage;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/581773.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录-二叉树(节点)

目录 104. 二叉树的最大深度 题目描述&#xff1a; 输入输出描述&#xff1a; 思路和想法&#xff1a; 111. 二叉树的最小深度 题目描述&#xff1a; 输入输出描述&#xff1a; 思路和想法&#xff1a; 222. 完全二叉树的节点个数 题目描述&#xff1a; ​输入输出描…

商汤研究院招大模型实习生

商汤研究院招大模型实习生&#xff0c;base上海、北京&#xff0c;400/day。福利&#xff1a;每天50租房补贴&#xff0c;20的餐补。晚上8点之后回去有额外的25元晚餐餐补&#xff0c;10点之后回去可以免费用滴滴。 组内的大模型工作大概分两个方向&#xff1a; 1.3B、3B等小…

特别的时钟特别的倒计时

念念不忘的歌曲&#xff1a;Thats Why You Go Away <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title&…

IDEA新版本创建Spring项目只能勾选17和21却无法使用Java8的完美解决方案

想创建一个springboot的项目&#xff0c;使用Spring Initializr创建项目时&#xff0c;发现版本只有17&#xff5e;21&#xff0c;无法选择Java8。 我们知道IDEA页面创建Spring项目&#xff0c;其实是访问spring initializr去创建项目。我们可以通过阿里云国服间接创建Spring项…

工业异常检测

工业异常检测在业界和学界都一直是热门&#xff0c;近期其更是迎来了全新突破&#xff1a;与大模型相结合&#xff01;让异常检测变得更快更准更简单&#xff01; 比如模型AnomalyGPT&#xff0c;它克服了以往的局限&#xff0c;能够让大模型充分理解工业场景图像&#xff0c;判…

Redis哈希槽和一致性哈希

前言 单点的Redis有一定的局限&#xff1a; 单点发生故障&#xff0c;数据丢失&#xff0c;影响整体服务应用自身资源有限&#xff0c;无法承载更多资源分配并发访问&#xff0c;给服务器主机带来压力&#xff0c;性能瓶颈 我们想提升系统的容量、性能和可靠性&#xff0c;就…

paddleocr C++生成dll

目录 编译完成后修改内容: 新建ppocr.h头文件 注释掉main.cpp内全部内容&#xff0c;将下面内容替换进去。ppocr.h需要再环境配置中包含进去头文件 然后更改配置信息&#xff0c;将exe换成dll 随后右击重新编译会在根目录生成dll,lib文件。 注意这些dll一个也不能少。生成…

伪装目标检测论文阅读 SAM大模型之参数微调:Conv LoRA

paper&#xff1a;link code&#xff1a;还没公开 摘要 任意分割模型(SAM)是图像分割的基本框架。虽然它在典型场景中表现出显著的零镜头泛化&#xff0c;但当应用于医学图像和遥感等专门领域时&#xff0c;其优势就会减弱。针对这一局限性&#xff0c;本文提出了一种简单有效…

Java进阶-JavaStreamAPI的使用

本文全面介绍了 Java Stream API 的概念、功能以及如何在 Java 中有效地使用它进行集合和数据流的处理。通过详细解释和示例&#xff0c;文章展示了 Java Stream API 在简化代码、提高效率以及支持函数式编程方面的优势。文中还比较了 Java Stream API 与其他集合处理库的异同&…

Django之搭配内网穿透

一&#xff0c;安装coplar 二&#xff0c;开启8087的内网穿透 三&#xff0c;setting.py中加入如下配置&#xff1a; ALLOWED_HOSTS [*]CSRF_TRUSTED_ORIGINS ["https://localhost:8087", "http://localhost:8087"]四&#xff0c;启动项目 五&#xff…

比较美观即将跳转html源码

源码介绍 比较美观即将跳转html源码&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面 源码截图 比较美观的一个跳转界面&#xff0c;修改方法如上&…

MATLAB实现果蝇算法优化BP神经网络预测分类(FOA-BP)

果蝇算法&#xff08;Fruit Fly Optimization Algorithm, FFOA&#xff09;是一种启发式优化算法&#xff0c;受果蝇觅食行为的启发。将其应用于优化BP神经网络&#xff0c;主要是为了寻找BP神经网络中的最佳权重和偏置值。以下是一个基本的流程&#xff1a; 初始化&#xff1a…

Ubuntu20.04 [Ros Noetic]版本——在catkin_make编译时出现报错的解决方案

今天在新的笔记本电脑上进行catkin_make的编译过程中遇到了报错&#xff0c;这个报错在之前也遇到过&#xff0c;但是&#xff0c;我却忘了怎么解决。很是头痛&#xff01; 经过多篇博客的查询&#xff0c;特此解决了这个编译报错的问题&#xff0c;于此特地记录&#xff01;&…

【bug已解决】发生错误,导致虚拟 CPU 进入关闭状态。如果虚拟机外部发生此错误,则可能已导致物理计算机重新启动......

本bug报错已找到原因,并成功解决。 项目场景: vmware安装ubuntu报错。 如下: 发生错误,导致虚拟 CPU 进入关闭状态。如果虚拟机外部发生此错误,则可能已导致物理计算机重新启动。错误配置虚拟机、客户机操作系统中的错误或 VMware Workstation 中的问题都可以导致关闭状…

kaggle(4) Regression with an Abalone Dataset 鲍鱼数据集的回归

kaggle&#xff08;4&#xff09; Regression with an Abalone Dataset 鲍鱼数据集的回归 import pandas as pd import numpy as npimport xgboost import lightgbm import optuna import catboostfrom sklearn.model_selection import train_test_split from sklearn.metrics …

C++之list模拟实现

1、定义 定义一个结点&#xff1a; 在list类中的定义&#xff1a; 2、push_back() 3、迭代器 3.1迭代器的构造和定义 3.2、迭代器中的取值 3.3、迭代器的迭代(前置或前置--) 3.4、迭代器的迭代(后置或后置--) 3.5、迭代器的判断 3.6、在类list的定义 4.begin()和end() 5.con…

Nodejs 第六十九章(杀毒)

杀毒 杀毒&#xff08;Antivirus&#xff09;是指一类计算机安全软件&#xff0c;旨在检测、阻止和清除计算机系统中的恶意软件&#xff0c;如病毒、蠕虫、木马、间谍软件和广告软件等。这些恶意软件可能会对计算机系统和用户数据造成损害&#xff0c;包括数据丢失、系统崩溃、…

⑥ - 后端工程师通识指南

&#x1f4d6; 该文隶属 程序员&#xff1a;职场关键角色通识宝典 ✍️ 作者&#xff1a;哈哥撩编程&#xff08;视频号同名&#xff09; 博客专家全国博客之星第四名超级个体COC上海社区主理人特约讲师谷歌亚马逊演讲嘉宾科技博主极星会首批签约作者 &#x1f3c6; 推荐专栏…

windows下git提交修改文件名大小写提交无效问题

windows系统不区分大小写&#xff0c;以及git提交忽略大小写&#xff0c;git仓库已存在文件A.js&#xff0c;本地修改a.js一般是没有提交记录的&#xff0c;需要手动copy一份出来A.js&#xff0c;再删除A.js文件提交仓库删除后&#xff0c;再提交修改后的a.js文件。 windows决…

岚图汽车与东软睿驰签署战略合作协议

4月26日,东软睿驰与岚图汽车正式签署战略合作协议,双方将结合在各自领域拥有的产业资源、技术研发和资本运作等优势,聚焦智能化产品和应用,建立长期共赢的战略合作伙伴关系,通过不断探索未来新技术、新产业、新业态和新模式,围绕用户需求共同打造极致的智能出行体验。 图为岚图…
最新文章