乐竟·(中国)体育智能科技股份有限公司官网

全国加盟咨询热线:

400-123-4567

当前位置: 首页 > 新闻动态

RabbitMQ定时消息怎么实现?手把手教你延时自动取消功能

文章作者:小编 浏览次数:发表时间:2025-12-16 04:40:16

在电商还有在线服务当中,要是订单出现超时却还没有支付的情况,那么就会被自动取消,这可是一项相当关键的功能哟,并且其背后的技术达成,会直接对用户体验以及系统可靠性有着影响 。

延时消息的应用场景

不是只有订单取消这一情况会用到延时消息,它还在极为广泛的范围里被用于优惠券过期提醒,用于预约超时释放,以及用于自动确认收货等诸多场景当中。比如说,在2024年占据主流地位那一批电商平台,用户下单操作之后,通常会有30分钟支付时间限制,一旦超出这时间限制,系统就会自动关闭订单,还会释放库存 。

该种机制能够有效地防止商品遭到长时间占据,起到了保障库存流动性的作用,也切实保障了其他用户的购买权利。除电子商务之外,可以提供在线挂号咨询以及处理酒店预订的这类生活服务同样依赖此种技术以管理有限的一些资源。

技术实现的核心思路

有个被称作“延时消息”的东西 ,它处于达成自动取消这件事的关键位置 。系统在创建订单的时候 ,会向消息中间件发送一条设置了延迟时长的消息 。这条消息不会马上就被消费 ,而是要等到指定的时间过去以后 ,才会被推送给消费者去进行处理 。

当消费者收到这条呈现超时状况的消息后,其会着手去核查订单的状态。要是订单在当下依旧处于“待支付”的状态维度中,那么就会施行取消方面的逻辑,比如针对订单状态加以更新操作,以及开展归还库存的行为。要是订单已然完成支付动作,那就会径直把该消息予以丢弃,借此来保证业务达成正确性。



    org.springframework.boot
    spring-boot-starter-amqp

主流技术方案选型

存有三种用于达成延时消息的方案,其一借助消息队列自带功能,比如类似RocketMQ的延时等级或RabbitMQ的死信队列,其二基于Redis的键过期通知,以此通过监听Key过期事件来触发回调 。

第一种情况是,存在一种运用时间轮算法,依靠自身来实现调度的方式。在实际特定项目范畴内,RocketMQ 的方案因成熟稳定这一特性而被广泛使用采纳。它提供了 18 个预定义的延时等级,这些等级时长各不相同,范围从 1 秒开始一直到 2 小时,能够满足大多数业务的各类需求。

#消息队列
spring:
  rabbitmq:
    host: 192.168.88.130
    port: 5672
    virtual-host: my_vhost #使用的虚拟主机
    username: root
    password: root
    listener:
      simple:
        acknowledge-mode: manual #开启手动应答

基于RocketMQ的实践配置

要是运用RocketMQ,那首先就得在项目里头去引入依赖。对于Java项目来讲,通常是在Maven的pom.xml文件当中增添org.apache.rocketmq:rocketmq-spring-boot-starter依赖,而且还要指定适宜的版本号,比如说2.3.0 。

随后,在应用的配置文件当中进行连接设置操作,这就需要明确NameServer的地址,也要明确生产者组名等关键信息。依靠这些配置,让应用能够准确连接上RocketMQ集群,进而为后续的消息收发搭建基础。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
    /**
     * 订单交换机
     */
    public static final String ORDER_EXCHANGE = "order_exchange";
    /**
     * 订单队列
     */
    public static final String ORDER_QUEUE = "order_queue";
    /**
     * 订单路由键
     */
    public static final String ORDER_ROUTING = "order_routing";
    /**
     * 死信交换机
     */
    public static final String ORDER_DEAD_EXCHANGE = "order_dead_exchange";
    /**
     * 死信队列
     */
    public static final String ORDER_DEAD_QUEUE = "order_dead_queue";
    /**
     * 死信路由键
     */
    public static final String ORDER_DEAD_ROUTING = "order_dead_routing";
    /**
     * 订单交换机
     */
    @Bean("orderExchange")
    public Exchange getOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE);
    }
    /**
     * 订单队列
     */
    @Bean("orderQueue")
    public Queue getOrderQueue() {
        Map map = new HashMap<>(3);
        map.put("x-dead-letter-exchange", ORDER_DEAD_EXCHANGE);//死信交换机
        map.put("x-dead-letter-routing-key", ORDER_DEAD_ROUTING);//死信路由键
        map.put("x-message-ttl", 1000 * 60 * 15);//队列过期时间
        return QueueBuilder
                .durable(ORDER_QUEUE)
                .withArguments(map)
                .build();
    }
    /**
     * 将订单交换机与订单队列绑定
     */
    @Bean
    Binding orderExchangeBindingOrder(@Qualifier("orderExchange") Exchange exchange,
                                      @Qualifier("orderQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_ROUTING).noargs();
    }
    /**
     * 死信交换机
     */
    @Bean("orderDeadExchange")
    public Exchange getOrderDeadExchange() {
        return new DirectExchange(ORDER_DEAD_EXCHANGE);
    }
    /**
     * 死信队列
     */
    @Bean("orderDeadQueue")
    public Queue getOrderDeadQueue() {
        return new Queue(
                ORDER_DEAD_QUEUE,//队列名
                true,//是否持久化
                false,//是否具有排他性,只在首次声明时可见,不允许其他用户访问,连接断开时自动删除
                false,//是否自动删除,经历过至少一次连接后,所有消费者都断开了连接,此队列会自动删除
                null
        );
    }
    /**
     * 将死信交换机与死信队列绑定
     */
    @Bean
    Binding deadExchangeBindingDeadQueue(@Qualifier("orderDeadExchange") Exchange exchange,
                                         @Qualifier("orderDeadQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_ROUTING).noargs();
    }
}

消息的发送与消费逻辑

从发送端的角度而言,需要去创建一个消息生产者。在发送延时消息的时候,要调用setDelayTimeLevel方法来设定延迟的级别。举例来说,如果将延时等级设定为3,那么对应的就是在10秒之后进行投递。消息体当中应该包含业务标识,诸如订单编号这类的。

在消费的那一端呀,这儿要去创建出一个消息监听器呢,此监听器得去实现那个RocketMQListener接口哟,在onMessage方法之内编写业务逻辑的呢,消费那方面得做到幂等处理呀,原因是网络出现波动的状况呀会致使消息重复进行投递的哟。

import com.sky.configuration.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * 消息队列发送消息
 */
@Component
public class SendRabbitMQ {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * @param orderId 15分钟后要检查的订单编号
     */
    public void sendDelayOrder(Long orderId) {
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.ORDER_EXCHANGE,//订单交换机
                RabbitMQConfig.ORDER_ROUTING,//订单路由键
                orderId//要取消的订单编号
        );
    }
}

项目中的集成与注意事项

在 Spring Boot 项目当中,通常会将消息发送的那个类型给包裹成服务,并且依靠 @Autowired 把它注入到订单服务当中 。当订单建立成功这个时候,很快就调用此服务的发送办法去发送延时消息 。

务必留意的是,消息中间件自身并非绝对可靠,于生产环境中,需考量消息发送失败时的重试机制,且要构建补偿任务,还要定期扫描数据库中状态有异常情形的订单,这作为一种兜底举措,以保障能做到万无一失。

在实际项目当中,当处于应对类似需求的状况之时,遇到过什么样的出乎人意料的困难,或者饶富于趣味的解决办法呢?欢迎前往评论区域去分享自身的经历。

import com.rabbitmq.client.Channel;
import com.sky.configuration.RabbitMQConfig;
import com.sky.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
/**
 * 消息队列接收消息
 */
@Component
public class ReceiveRabbitMQ {
    @Autowired
    private OrderMapper orderMapper;
    /**
     * @param orderId 要取消的订单的编号
     * @param msg     包含了要回复的队列
     * @param channel 有回复功能的参数
     */
    @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_QUEUE)
    public void ReceiveDeadOrder(Long orderId, Channel channel, Message msg) throws IOException {
        orderMapper.delCancelOrder(orderId);//查询数据库,订单是否付款,未付款:改为已取消
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(),//应答的消息
                false//是否批量应答
        );
    }
}

热门文章排行

最新资讯文章

回顶部