RabbitMq 消息确认和退回机制

一、Rabbit中消息确认和退回机制

1、发布确认

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,(单个)如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。

1.1 开启发布确认

//开启发布确认
channel.confirmSelect();

1.2 发布确认的两种方式

      同步确认

      就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

public static void publishMessageIndividually() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();

    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //服务端返回 false 或超时时间内未返回,生产者可以消息重发
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("消息发送成功");
        }
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");

}
  异步确认 

    他是利用回调函数来达到消息可靠性传递的

//    异步确认发布
    private static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();


        /**线程安全有序的一个哈希表,适用于高并发的情况下
          1.轻松的将序号与消息进行关联
          2.轻松批量删除条目,只要给到序号
          3.支持高并发(多线程)
         **/
        //消息确认成功,回调函数
        ConcurrentSkipListMap<Long,String> outstandingConfirms =
                new ConcurrentSkipListMap<>();
        ConfirmCallback ackCallback = (deliveryTag , multiple)->{ //确认了多少条,multiple:批量或单个
            if (multiple){  //批量的
                //2.轻松批量删除条目,只要给到序号
                ConcurrentNavigableMap<Long,String> confirmed =
                        outstandingConfirms.headMap(deliveryTag); //消息的序号
                confirmed.clear();
            }else {  //单个的
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息:"+deliveryTag);
        };

        //消息确认失败,回调函数
        /**
         * 1.消息的标记
         * 2.是否为批量确认
         */
        ConfirmCallback nackCallback = (deliveryTag , multiple)->{
            //3、打印一下未确认的消息都有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是:"+message+":::未确认的消息:"+deliveryTag);
        };


        //准备消息的监听器 ,监听哪些消息成功了, 哪些失败了
        channel.addConfirmListener( ackCallback, nackCallback);

        //开始时间
        long begin = System.currentTimeMillis();

        //发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            //1.轻松的将序号与消息进行关联,将每一条消息都存放hash表里面
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
    }

2、 退回机制

退回模式(return)说的是当消息到达交换机后,但是没有找到匹配的队列时,将消息回退给生产者。

默认情况下,如果消息没有匹配到队列会直接丢弃,采用退回模式可以在生产者端监听改消息是否被成功投递到队列中

channel.addReturnListener(new ReturnCallback() {
                @Override
                public void handle(Return returnMessage) {
                    System.out.println("消息被回退,原因:"+returnMessage.getReplyText());
                    System.out.println(returnMessage.getExchange()); // 交换机
                    System.out.println(returnMessage.getReplyCode()); // 返回原因的代码
                    System.out.println(returnMessage.getReplyText()); // 返回信息,例如NO_ROUTE
                    System.out.println(returnMessage.getRoutingKey()); // 路由KEY
                }
            });

二、Spring boot 中Rabbit 实现消息确认和退回机制

1、 开启配置

spring:
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    #虚拟host 可以不设置,使用server默认host
    virtual-host: JCcccHost
      #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
      #确认消息已发送到队列(Queue)
    publisher-returns: true

2、配置消息确认和退回机制

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class RabbitConfig {
 
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
 
        // 配置发布到交换机的确认
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
            *correlationData :客户端在发送原始消息时提供的对象。
            *ack:exchange交换机是否成功收到了消息。true成功,false代表失败。
            *cause:失败原因。
            */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                System.out.println("ConfirmCallback:     "+"原因:"+cause);
            }
        });
 
        // 配置交换机没有找到对应的消息队列时,消息退回时的处理
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("ReturnCallback:     "+"消息:"+message);
                System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
                System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
                System.out.println("ReturnCallback:     "+"交换机:"+exchange);
                System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
            }
        });
 
        return rabbitTemplate;
    }
 
}

2.1 其他方式

package com.student.rabbitmq.config;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 第二种方式
 */
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 将我们实现的MyCallBack接口注入到RabbitTemplate中
    @PostConstruct
    public void init() {
        // 设置确认消息交给谁处理
        rabbitTemplate.setConfirmCallback(this);
        // 设置回退消息交给谁处理
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机确认回调方法
     *
     * @param correlationData 保存回调消息的ID以及相关信息
     * @param ack             表示交换机是否收到消息(true表示收到)
     * @param cause           表示消息接收失败的原因(收到消息为null)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            System.out.println("交换机已经收到ID为:{}的消息");
        } else {
            System.out.println("交换机还未收到ID为:{}的消息,原因为:{}" + cause);
        }
    }

    /**
     * 路由出现问题的消息回退方法
     *
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println(new String(message.getBody())+
                        exchange+
                        replyText+
                        routingKey);
    }

}

三、参考

发布确认—发布确认逻辑和发布确认的策略

消息可靠性之发布确认、退回机制_rabbittemplate 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/764389.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PO模式简介

V1顺序型&#xff1a;不能批量运行 import unittest from selenium import webdriver from time import sleep driver webdriver.Edge()# driver.maximize_window() driver.implicitly_wait(30) # driver.get(r"https://demo5.tp-shop.cn/") # driver.find_element…

ROS2参数通信原理

执行ros2 service list 由于没有启动任何节点&#xff0c;因此查看服务器列表为空 执行ros2 run turtlesim turtlesim_node 启动服务节点 执行ros2 service list 将返回系统中当前活动的所有服务的列表: 执行 ros2 service call /turtlesim/list_parameters rcl_interfaces/…

证件照肤色不均匀怎么处理 证件照肤色调整最简单方式 证件照肤色很白符合要求吗 证件照制作软件免费下载

在我们的日常生活中&#xff0c;证件照扮演着至关重要的角色。它不仅是身份识别的关键&#xff0c;更是我们在各种正式场合展示自己形象的重要一环。那么今天我们就来聊聊关于证件照肤色不均匀怎么处理的问题及证件照肤色调整最简单方式。 一、证件照肤色不均匀怎么处理 对于…

面试官:你了解git cherry-pick吗

事情要从一次不规范的代码开发开始说起 背景故事 时间 2024年某个风平浪静的周五晚上 地点 中国&#xff0c;北京&#xff0c;西二旗&#xff0c;某互联网大厂会议室 人物 小杰&#xff0c;小A&#xff0c;小B&#xff0c;老K 对话 老K&#xff1a;昨天提交的代码被测试打回来…

数据结构——带环链表、循环队列问题

1.带环链表问题 1.1给定一个链表判断其是否带环 解决思路&#xff1a;利用快慢指针法&#xff0c;快指针一次走两步慢指针一次走一步&#xff0c;从链表起始位置遍历链表&#xff0c;如果链表带环&#xff0c;则快慢指针一定会在环中相遇&#xff0c;否则快指针先到达链表末尾…

OpenSSH Server 远程代码执行漏洞(CVE-2024-6387)(附代码)

OpenSSH Server 远程代码执行漏洞&#xff08;CVE-2024-6387&#xff09;&#xff08;附代码&#xff09; 前言影响范围验证脚本1.python2.C? 参考链接 前言 2024年7月1日&#xff0c;OpenSSH 官方发布安全通告&#xff0c;披露CVE-2024-6387 OpenSSH Server 远程代码执行漏洞…

【084】基于SpringBoot实现的家乡特色推荐系统

系统介绍 视频演示 点击查看演示视频 基于SpringBoot实现的家乡特色推荐系统主要采用SpringBootVue进行开发&#xff0c;系统整体分为管理员、用户两种角色&#xff0c;主要功能包括首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;文章分类管理&#xff0c;文章分…

鸿蒙开发Ability Kit(程序访问控制):【对所有应用开放】

对所有应用开放 在申请目标权限前&#xff0c;建议开发者先阅读[申请应用权限]&#xff0c;对权限的工作流程有基本了解后&#xff0c;再结合以下权限字段的具体说明&#xff0c;判断应用能否申请目标权限&#xff0c;提高开发效率。 说明&#xff1a; 权限级别为normal的权限…

Sharding-JDBC分库分表的基本使用

前言 传统的小型应用通常一个项目一个数据库&#xff0c;单表的数据量在百万以内&#xff0c;对于数据库的操作不会成为系统性能的瓶颈。但是对于互联网应用&#xff0c;单表的数据量动辄上千万、上亿&#xff0c;此时通过数据库优化、索引优化等手段&#xff0c;对数据库操作…

新手教学系列——【Python开发】不同系统更换pip源的方法

在使用Python进行开发时,你可能会发现使用pip安装包的速度较慢,尤其是在国内进行操作时。为了提高安装速度,我们可以将pip的默认源更换为国内的一些镜像源。本文将详细介绍如何在不同操作系统上进行这一操作,并给出常用的国内镜像源。 为什么要换源 pip默认使用的是官方的…

嵌入式Linux系统编程 — 6.2 signal和 sigaction信号处理函数

目录 1 信号如何处理 2 signal()函数 2.1 signal()函数介绍 2.2 示例程序 3 sigaction()函数 3.1 sigaction()函数介绍 3.2 示例程序 1 信号如何处理 信号通常是发送给对应的进程&#xff0c;当信号到达后&#xff0c; 该进程需要做出相应的处理措施&#xff0c;可以通…

IP地址与电商企业

网购作为我们现代生活不可或缺的部分&#xff0c;现如今电商企业蓬勃发展。 IP地址是网络世界中每一台设备的独特标识符&#xff0c;就像现实世界中每家每户的门牌号。对于电商企业而言&#xff0c;它在很多方面方面发挥着作用。 IP地址能够帮助电商企业精准地确定用户所在的地…

从理论到实践的指南:企业如何建立有效的EHS管理体系?

企业如何建立有效的EHS管理体系&#xff1f;对于任何企业&#xff0c;没有安全就谈不上稳定生产和经济效益&#xff0c;因此建立EHS管理体系是解决企业长期追求的建立安全管理长效机制的最有效手段。良好的体系运转&#xff0c;可以最大限度地减少事故发生。 这篇借着开头这个…

要不要从单片机转Linux?进来看看大神怎么说

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「单片机的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;究竟要不要从单片机转Linu…

对象的引用和常引用

前面曾介绍&#xff1a;一个变量的引用就是变量的别名。实际上&#xff0c;引用是一个指针常量&#xff0c;用来存放该变量的地址。如果形参为变量的引用&#xff0c;实参为变量名&#xff0c;则在调用函数进行虚实结合时&#xff0c;把实参变量的地址传给形参&#xff08;引用…

2024 年江西省研究生数学建模竞赛题目 B题投标中的竞争策略问题---完整文章分享(仅供学习)

问题&#xff1a; 招投标问题是企业运营过程中必须面对的基本问题之一。现有的招投标平台有国家级的&#xff0c;也有地方性的。在招投标过程中&#xff0c;企业需要全面了解招标公告中的相关信息&#xff0c;在遵守招投标各种规范和制度的基础上&#xff0c;选择有效的竞争策…

工业交换机端口统计功能

工业交换机端口统计功能不仅是一项技术手段&#xff0c;更是一双透视企业网络健康状态的慧眼。通过这一功能&#xff0c;企业能够实时捕捉到网络中每一个端口的流量情况&#xff0c;这不仅仅是数据的积累&#xff0c;更是对网络脉搏的精准把握。当网络的每一个脉动都被记录在案…

【每日一练】Python遍历循环

1. 情节描述&#xff1a;上公交车(10个座位)&#xff0c;并且有座位就可以坐下 要求&#xff1a;输入公交卡当前的余额&#xff0c;只要超过2元&#xff0c;就可以上公交车&#xff1b;如果车上有空座位&#xff0c;才可以上。 seat 10 while seat > 0:money int(input(…

springboot个人证书管理系统-计算机毕业设计源码16679

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了个人证书管理系统的开发全过程。通过分析个人证书管理系统管理的不足&#xff0c;创建了一个计算机管理个人证书管理系统的方案。文章介绍了个人证书管理系统的系…

计算机组成原理 | CPU子系统(4)MIPS32架构-单周期处理器设计

R型运算指令通路 I型运算指令通路 I型访存指令数据通路 I型分支 J型j指令 重新布局 继续整合通路&#xff1a;MUX多路选择器 控制方式 硬连接控制方式&#xff1a;依靠电路 微命令控制&#xff1a;将指令转换为微命令 控制信号的整理和编码 控制系统的两级控制方案 ALU控制器&…