laravel队列实现阶梯降频轮询 | 开源可商用方案 | laravel china 社区-金年会app官方网

例如有这样的场景,某电商系统的订单支付状态需要和上游的支付系统保持实时同步,需要每秒检查一次订单状态,如果订单状态为未支付,则继续检查,直到订单已完成。

然而针对每一笔订单,每秒检查一次会对服务器和上游api造成很大的负载。

如果我们能动态调整检查频率,比如前五次尝试中每1秒检查一次订单状态, 然后在接下来的五次尝试中每5秒检查一次,依此类推。这样既能减少服务器和外部api的负载,又能确保及时更新

我们给他起个名字叫做阶梯降频轮询

真实的例子比如对接支付宝时,支付成功的异步通知以逐渐减低请求频次的的方式通知业务方:


在进行异步通知交互时,如果支付宝收到的应答不是 success ,支付宝会认为通知失败,会通过一定的策略定期重新发起通知。通知的间隔频率为:4m、10m、10m、1h、2h、6h、15h。

准备工作

创建一个空的 laravel 11 项目,接下来我们来实现每间隔几秒钟请求上游api同步订单状态,同时确保控制好频率,不要发起太多请求,也不要在短时间内发起重复请求。

设置队列任务

创建基础任务类

首先,我们需要一个基础任务类来实现轮询逻辑。下面是一个 basepollingjob 类的示例,它实现了基本的轮询功能:

namespace app\jobs;
use illuminate\contracts\queue\shouldbeunique;
use illuminate\contracts\queue\shouldqueue;
use illuminate\foundation\queue\queueable;
use illuminate\queue\maxattemptsexceededexception;
use illuminate\support\facades\log;
abstract class basepollingjob implements shouldqueue, shouldbeunique
{
    use queueable;
    protected $jobdesc = '任务轮询队列';
    protected $jobpayload;
    const timeout_second = 60 * 3;
    public function retryuntil()
    {
        return now()->addseconds(self::timeout_second);
    }

在这个类中,我们定义了任务的描述、有效负载和重试的超时时间。retryuntil 方法用于指定任务的最大重试时间,如果超过该时间队列任务尚未完成,laravel-queue会抛出一个异常maxattemptsexceededexception表示已达最大重试次数,业务上可以在fail()方法里标记处理超时。

调度下次运行

接下来,我们实现一个 schedulenextrunning 方法,根据尝试次数调节下次执行的时间间隔:

    protected function schedulenextrunning()
    {
        $attempts = $this->job->attempts();
        if ($attempts <= 5) {
            $this->release(1); // 前5次,每隔1秒执行一次
        } elseif ($attempts <= 10) {
            $this->release(5); // 接下来5次,每隔5秒执行一次
        } elseif ($attempts <= 20) {
            $this->release(10); // 接下来10次,每隔10秒执行一次
        } else {
            $this->release(30); // 超过20次后,每隔30秒执行一次
        }
    }

这个方法根据当前的尝试次数来决定下次任务的执行间隔,确保在初始阶段短时间间隔检查,然后逐渐减少频率。

处理任务失败

在任务失败时或者处理超时,我们需要记录错误并执行相应的逻辑:

    public function failed(\throwable $exception)
    {
        if ($exception instanceof maxattemptsexceededexception) {
            log::info($this->jobdesc . '超时退出执行', [
                'payload' => $this->jobpayload,
                'error' => $exception->getmessage(),
            ]);
            $this->aftermaxattemptsexceeded();
        } else {
            log::error($this->jobdesc . '异常失败', [
                'payload' => $this->jobpayload,
                'error' => $exception->getmessage(),
                'file' => $exception->getfile(),
                'line' => $exception->getline(),
                'trace' => $exception->gettrace()
            ]);
        }
    }

在这里,我们可以记录错误信息,并根据需要进行后续处理。

实现轮询任务

接下来,我们创建一个具体的轮询任务类 pollingorderstatusjob,用于检查订单状态:

namespace app\jobs;
use app\enums\orderstatus;
use app\models\order;
use illuminate\support\facades\log;
class pollingorderstatusjob extends basepollingjob
{
    protected $jobdesc = '订单同步队列';
    private order $order;
    public function __construct(order $order)
    {
        $this->order = $order;
        $this->jobpayload = $order->toarray();
    }

在这个类中,我们定义了任务的描述和订单对象。在构造函数中,我们将订单信息存储为有效负载,以便后续使用。

为了确保同一个订单号不会重复入队列,我们需设置订单号为唯一锁获取的依据。

    // 使用订单号来获取唯一锁
    public function uniqueid()
    {
        return $this->order->trade_no;
    }

处理任务逻辑

handle 方法中,我们实现具体的任务逻辑:

    public function handle()
    {
        try {
            log::info($this->jobdesc . '开始执行', [$this->order->id, $this->order->trade_no]);
            // 前置检测:订单状态如果已是终态,无需操作,退出队列
            if (!$this->checkorderbeforepolling($this->order)) {
                $this->delete();
                return;
            }
            // 查询订单状态:比如发起一个http请求,获取最新状态
            if (!$this->orderquery($this->order)) {
                $this->delete();
                return;
            }
            // 调度下次运行的时机
            $this->schedulenextrunning();
        } catch (\throwable $e) {
            $this->fail($e);
        }
    }

在这个方法中,我们首先记录任务开始的日志。接着,调用 checkorderbeforepolling 方法检查订单是否需要继续轮询。如果不需要,则删除任务并返回。然后,我们调用 orderquery 方法查询订单状态,最后调度下次运行。

前置检测

checkorderbeforepolling 方法中,如果订单状态已是终态,比如已超时或已完成,就不需要同步了。防止因上层业务异常,这里可以做拦截。

    private function checkorderbeforepolling(order $order): bool
    {
        $orderstatus = orderstatus::from($order->status);
        if (!in_array($orderstatus, [orderstatus::default, orderstatus::paying])) {
            return false;
        }
        return true;
    }

查询订单状态

orderquery 方法模拟查询订单状态的过程:

    private function orderquery(order $order)
    {
        // 模拟订单查询请求
        sleep(1);
        // 支付成功返回
//        $result = [
//            'amount' => 100,
//            'payment_no' => 'p123456',
//            'status' => 'success'
//        ];
        // 正在支付中返回
        $result = [
            'amount' => null,
            'payment_no' => null,
            'status' => 'pending'
        ];
       return $result['status'] == 'pending';
    }

这里我们简单模拟了一个查询请求,实际应用中可能会调用外部 api。

处理超时

    public function aftermaxattemptsexceeded()
    {
        try {
            $this->order->status = orderstatus::timeout->value;
            $this->order->save();
            log::info($this->jobdesc . '-更新订单状态为超时', [$this->order->trade_no]);
        } catch (\throwable $e) {
            log::error($this->jobdesc . '-超时后置逻辑执行异常', [$this->order, $e->getmessage()]);
        }
    }

最后

完整代码包括日志记录、数据库初始化、手把手教程,开箱即用。已开源在这里:

来自亿级别流水项目实战经验总结,开源不易,还请各位看官点个爱心/star,您的支持是我前进的动力~

本作品采用《cc 协议》,转载必须注明作者和本文链接
本帖由系统于 1周前 自动加精
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
以构建论坛项目 larabbs 为线索,展开对 laravel 框架的全面学习。应用程序架构思路贴近 laravel 框架的设计哲学。
讨论数量: 1

思路不錯

1周前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
3
粉丝
4
喜欢
23
收藏
19
排名:1567
访问:2021
所有博文
博客标签
社区赞助商
网站地图