RabbitMQ定时消息怎么实现?手把手教你延时自动取消功能
在电商还有在线服务当中,要是订单出现超时却还没有支付的情况,那么就会被自动取消,这可是一项相当关键的功能哟,并且其背后的技术达成,会直接对用户体验以及系统可靠性有着影响 。
延时消息的应用场景
不是只有订单取消这一情况会用到延时消息,它还在极为广泛的范围里被用于优惠券过期提醒,用于预约超时释放,以及用于自动确认收货等诸多场景当中。比如说,在2024年占据主流地位那一批电商平台,用户下单操作之后,通常会有30分钟支付时间限制,一旦超出这时间限制,系统就会自动关闭订单,还会释放库存 。
该种机制能够有效地防止商品遭到长时间占据,起到了保障库存流动性的作用,也切实保障了其他用户的购买权利。除电子商务之外,可以提供在线挂号咨询以及处理酒店预订的这类生活服务同样依赖此种技术以管理有限的一些资源。
技术实现的核心思路
有个被称作“延时消息”的东西 ,它处于达成自动取消这件事的关键位置 。系统在创建订单的时候 ,会向消息中间件发送一条设置了延迟时长的消息 。这条消息不会马上就被消费 ,而是要等到指定的时间过去以后 ,才会被推送给消费者去进行处理 。
当消费者收到这条呈现超时状况的消息后,其会着手去核查订单的状态。要是订单在当下依旧处于“待支付”的状态维度中,那么就会施行取消方面的逻辑,比如针对订单状态加以更新操作,以及开展归还库存的行为。要是订单已然完成支付动作,那就会径直把该消息予以丢弃,借此来保证业务达成正确性。
org.springframework.boot spring-boot-starter-amqp
主流技术方案选型
存有三种用于达成延时消息的方案,其一借助消息队列自带功能,比如类似RocketMQ的延时等级或RabbitMQ的死信队列,其二基于Redis的键过期通知,以此通过监听Key过期事件来触发回调 。
第一种情况是,存在一种运用时间轮算法,依靠自身来实现调度的方式。在实际特定项目范畴内,RocketMQ 的方案因成熟稳定这一特性而被广泛使用采纳。它提供了 18 个预定义的延时等级,这些等级时长各不相同,范围从 1 秒开始一直到 2 小时,能够满足大多数业务的各类需求。
#消息队列
spring:
rabbitmq:
host: 192.168.88.130
port: 5672
virtual-host: my_vhost #使用的虚拟主机
username: root
password: root
listener:
simple:
acknowledge-mode: manual #开启手动应答
基于RocketMQ的实践配置
要是运用RocketMQ,那首先就得在项目里头去引入依赖。对于Java项目来讲,通常是在Maven的pom.xml文件当中增添org.apache.rocketmq:rocketmq-spring-boot-starter依赖,而且还要指定适宜的版本号,比如说2.3.0 。
随后,在应用的配置文件当中进行连接设置操作,这就需要明确NameServer的地址,也要明确生产者组名等关键信息。依靠这些配置,让应用能够准确连接上RocketMQ集群,进而为后续的消息收发搭建基础。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
/**
* 订单交换机
*/
public static final String ORDER_EXCHANGE = "order_exchange";
/**
* 订单队列
*/
public static final String ORDER_QUEUE = "order_queue";
/**
* 订单路由键
*/
public static final String ORDER_ROUTING = "order_routing";
/**
* 死信交换机
*/
public static final String ORDER_DEAD_EXCHANGE = "order_dead_exchange";
/**
* 死信队列
*/
public static final String ORDER_DEAD_QUEUE = "order_dead_queue";
/**
* 死信路由键
*/
public static final String ORDER_DEAD_ROUTING = "order_dead_routing";
/**
* 订单交换机
*/
@Bean("orderExchange")
public Exchange getOrderExchange() {
return new DirectExchange(ORDER_EXCHANGE);
}
/**
* 订单队列
*/
@Bean("orderQueue")
public Queue getOrderQueue() {
Map map = new HashMap<>(3);
map.put("x-dead-letter-exchange", ORDER_DEAD_EXCHANGE);//死信交换机
map.put("x-dead-letter-routing-key", ORDER_DEAD_ROUTING);//死信路由键
map.put("x-message-ttl", 1000 * 60 * 15);//队列过期时间
return QueueBuilder
.durable(ORDER_QUEUE)
.withArguments(map)
.build();
}
/**
* 将订单交换机与订单队列绑定
*/
@Bean
Binding orderExchangeBindingOrder(@Qualifier("orderExchange") Exchange exchange,
@Qualifier("orderQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_ROUTING).noargs();
}
/**
* 死信交换机
*/
@Bean("orderDeadExchange")
public Exchange getOrderDeadExchange() {
return new DirectExchange(ORDER_DEAD_EXCHANGE);
}
/**
* 死信队列
*/
@Bean("orderDeadQueue")
public Queue getOrderDeadQueue() {
return new Queue(
ORDER_DEAD_QUEUE,//队列名
true,//是否持久化
false,//是否具有排他性,只在首次声明时可见,不允许其他用户访问,连接断开时自动删除
false,//是否自动删除,经历过至少一次连接后,所有消费者都断开了连接,此队列会自动删除
null
);
}
/**
* 将死信交换机与死信队列绑定
*/
@Bean
Binding deadExchangeBindingDeadQueue(@Qualifier("orderDeadExchange") Exchange exchange,
@Qualifier("orderDeadQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_ROUTING).noargs();
}
}
消息的发送与消费逻辑
从发送端的角度而言,需要去创建一个消息生产者。在发送延时消息的时候,要调用setDelayTimeLevel方法来设定延迟的级别。举例来说,如果将延时等级设定为3,那么对应的就是在10秒之后进行投递。消息体当中应该包含业务标识,诸如订单编号这类的。
在消费的那一端呀,这儿要去创建出一个消息监听器呢,此监听器得去实现那个RocketMQListener接口哟,在onMessage方法之内编写业务逻辑的呢,消费那方面得做到幂等处理呀,原因是网络出现波动的状况呀会致使消息重复进行投递的哟。
import com.sky.configuration.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 消息队列发送消息
*/
@Component
public class SendRabbitMQ {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @param orderId 15分钟后要检查的订单编号
*/
public void sendDelayOrder(Long orderId) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,//订单交换机
RabbitMQConfig.ORDER_ROUTING,//订单路由键
orderId//要取消的订单编号
);
}
}
项目中的集成与注意事项
在 Spring Boot 项目当中,通常会将消息发送的那个类型给包裹成服务,并且依靠 @Autowired 把它注入到订单服务当中 。当订单建立成功这个时候,很快就调用此服务的发送办法去发送延时消息 。
务必留意的是,消息中间件自身并非绝对可靠,于生产环境中,需考量消息发送失败时的重试机制,且要构建补偿任务,还要定期扫描数据库中状态有异常情形的订单,这作为一种兜底举措,以保障能做到万无一失。
在实际项目当中,当处于应对类似需求的状况之时,遇到过什么样的出乎人意料的困难,或者饶富于趣味的解决办法呢?欢迎前往评论区域去分享自身的经历。
import com.rabbitmq.client.Channel;
import com.sky.configuration.RabbitMQConfig;
import com.sky.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
/**
* 消息队列接收消息
*/
@Component
public class ReceiveRabbitMQ {
@Autowired
private OrderMapper orderMapper;
/**
* @param orderId 要取消的订单的编号
* @param msg 包含了要回复的队列
* @param channel 有回复功能的参数
*/
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_QUEUE)
public void ReceiveDeadOrder(Long orderId, Channel channel, Message msg) throws IOException {
orderMapper.delCancelOrder(orderId);//查询数据库,订单是否付款,未付款:改为已取消
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),//应答的消息
false//是否批量应答
);
}
} 热门文章排行
- 共享,正从风口到风险
- 走进涂料市场的秘密
- 在人工智能炒热机器人时,也被人把风带进了
- 生物涂料有什么好处?
- 智能音箱,正走在智能手表的老路上
- “去乐视化”之后,新易到的机会在哪儿?
- 解锁央媒投稿新路径!专业平台帮你提升内容
- 广州首批配售型保障房筹建顺利,近期将公布
- 广州今年首批经适房申购超火,1792套房
- 家用美容仪受青睐,市场蓬勃发展却乱象丛生
最新资讯文章
- 兴国老区双减探索:乡村中小学课后延时服务
- 广州首批配售型保障房筹建顺利,近期将公布
- 2026年QQ群发消息方法大揭秘,超实用
- 网传广州天河区广氮花园经适房消息,真相究
- 嵊州彩站中双色球引热议,聊聊双色球背后那
- 盘锦市大洼区入选首批美丽乡村先行区,探索
- 广州2026年经适房新消息:时隔两年推5
- 盘锦大洼区:从农业大县到商贸新城的华丽转
- 广州今年首批经适房申购超火,1792套房
- 11月20日盘锦市大洼区第二届委员会召开
- 广州加大住房保障力度,2024年计划筹建
- 广州2026年经适房啥情况?在建建成未分
- 北京地铁调价方案公布,新票价体系最高 1
- 广州今年迎保障房进展,城隽雅苑3月交付,
- 去年末多家媒体报道:2024年北京地铁票
- 海宁新闻频道在哪看?官方APP实时直播大
- 台湾中天新闻台被重罚,黄智贤节目停播,台
- 对用AI生成假新闻这事,你怎么看?网友已
- 洛天依吸毒假唱传闻真相大揭秘,别再被谣言
- 社交媒体咋成假新闻扩散帮凶?咱该咋应对?





