package cc.zeelan.resources.helper.queue; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import cc.zeelan.resources.om.transaction.TranctionOrder; import cn.hutool.core.lang.Validator; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import cn.hutool.json.JSONUtil; /** * 队列推送订单通知 */ @Component public class QueueNotifyHelper { /** * 推送请求最大请求错误超过5次,则抛弃该推送 */ private static final int max_counter = 5; /** * 请求超时 */ private static final int time_out = 3000; /** * 该队列是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序 */ private static ConcurrentLinkedQueue<TranctionOrder> queue = new ConcurrentLinkedQueue<TranctionOrder>(); /** * 添加到队列 */ public static void offer(TranctionOrder order) { queue.offer(order); } @Scheduled(cron = "0/5 * * * * ?") public void push() { Iterator<TranctionOrder> iterator = queue.iterator(); while (iterator.hasNext()) { TranctionOrder queueOrder = iterator.next(); if (ObjectUtil.isNotNull(queueOrder) && Validator.isUrl(queueOrder.getNotifyUrl())) {// 当队列元素不为空 ThreadUtil.execAsync(new Runnable() {// 防止远程推送请求,队列阻塞 @Override public void run() { notifyService(queueOrder.getNotifyUrl(), queueOrder); } }); } } } /** * 推送服务 * * @see 因为线程,可能会超出max_counter的数量,但不影响后续业务,所以不需要加锁 * @param notifyUrl 通知URL * @param order 预支付订单 */ private void notifyService(String notifyUrl, TranctionOrder order) { HttpResponse r = HttpRequest.post(notifyUrl).body(JSONUtil.toJsonStr(order)).timeout(time_out).execute().sync(); if (r.isOk()) {// 当请求成功之后,从队列当中移出该元素 queue.remove(order); } else {// 请求失败,放回队列进行补偿操作 order.setCounter(Math.addExact(order.getCounter(), 1)); if (order.getCounter() > max_counter) {// 当错误超过五次,则放弃推送, queue.remove(order); } else {// 添加到队列,继续推送 offer(order); } } } }
本文暂时没有评论,来添加一个吧(●'◡'●)