赶知识网

让Laravel/Lumen队列消费Non-Laravel queue job

2023-03-29 / 501次点击 php/mysql/apache Laravel Lumen

如何让Laravel/Lumen作为消费者处理非Laravel/Lumen生产的消息?

一句话概括需求就是:Allow Laravel to process non-laravel queue job.

小伙伴们应该都清楚在Laravel中的队列体系,是把实现了你的Job类进行序列化之后在队列中传输,消费者一方通过反序列化恢复对象,所以在Job类中我们可以完整传递信息,如Eloquent\Model 等,但是如果生产者不是Laravel/Lumen体系的服务,投递到队列的消息也不是Queueable的对象,那Laravel Queue就无法正常解析,并且抛出异常。

例子可看下方代码

<?php

namespace App\Jobs;use Illuminate\Bus\Queueable;use Illuminate\Queue\SerializesModels;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Bus\Dispatchable;use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;class GatewayJob implements ShouldQueue{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle(RabbitMQJob $job, array $data)
    {
        dump($data);
        dump($job->getRawBody());
        $job->delete();
    }}

业务背景

AWP物理架构.png

从上图可以看到,我们的业务通过微信网关(swoole)接受微信开放平台消息,在根据业务路由规则分发到下游服务,其中转发消息分为实时异步实时就不说了,异步就是微信网关将消息投递到消息队列(RabbitMQ),最后由消费者(Laravel)进行消息处理。

解释几个大家可能会问的问题:

为什么架构图中有两个网关?

  • 基于OpenRestryKong网关用于处理用户h5侧的请求分发,基于下游服务大部分是swoole实现的内存常驻性,借助Kong API网关的优势:动态路由,健康检查,限流,可开发插件(e.g.Cl5,秒级监控),更有助于我们把控服务流量入口。

  • 基于swoole实现的微信网关,只专注于接受微信开放平台的密文,解密后分发至各个需要的下游服务。

为什么消费者是Laravel?

Laravel作为我们整套微服务体系的管理后台,既然是管理后台,当然还是单体式开发更舒适。再者管理后台已经聚集了所有数据对象的操作模型,那写消费者逻辑就更高效。至于消费者进程的运行方式是Supervisor+Laravel Queue,本身就是内存常驻型+KeepAlived,不担心传统LNMP架构的效率问题。

微信消息异步通信.png

能不能直接写Laravel Command替代?

可以,但不优雅,不喜欢!


其实对于Allow Laravel to process non-laravel queue job这个问题还是比较有普遍性,毕竟生产者和消费者不是用一个框架,甚至不同语种都是很正常的。网上就有人问:“我的生产者是NodeJS,消费者是Laravel。。。不知道该怎么办。”

解决办法

Illuminate\Queue\Jobs\Job类中的fire方法一直往下跟,你就会得到答案:

/**
     * Fire the job.
     *
     * @return void
     */
    public function fire()
    {
        $payload = $this->payload();

        list($class, $method) = JobName::parse($payload['job']); // 在这里解析队里中的Job格式

        with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }

最后追到Illuminate\Support\Traits\Macroable\Str中的parseCallback方法你就得到答案了:

/**
     * Parse a Class@method style callback into class and method.
     *
     * @param  string  $callback
     * @param  string|null  $default
     * @return array
     */
    public static function parseCallback($callback, $default = null)
    {
        return static::contains($callback, '@') ? explode('@', $callback, 2) : [$callback, $default];
    }

解释

假设我想在队列中传输数据,指定消费者为App\Jobs\GatewayJob类的handle方法处理,那么能够让Laravel正确解析的数据结构(json)为:

{
  "job": "App\\Jobs\\GatewayJob@handle",
  "data": {
    "payload": {
      "ToUserName": "gh_47a4043b185f",
      "FromUserName": "o9Le4wyQ08reKM2bn6evr1--YpOE",
      "CreateTime": "1551346579",
      "MsgType": "text",
      "Content": "kok波洛克",
      "MsgId": "22209944230247159"
    },
    "extra": {
      "wx-appid": "wxcc5d27e1c808d79c",
      "timestamp": 1551346579,
      "area": "1",
      "platid": "0",
      "partition": "10071",
      "charac-name": "andychai",
      "charac-no": "2299733847",
      "unionid": "o9mcu0yK2GqgMJ9ZOKBV9XOzdC9A",
      "has-game-role": 1
    }
  }}

job

data

声明处理Job的类名和方法

真实传输的数据

在回到我们之前make好的GatewayJob中看:

/**
     * @param RabbitMQJob $job Job父类本身(这里是RabbitMQJob的子类实现)
     * @param array $data 真实数据
     */
    public function handle(RabbitMQJob $job, array $data)
    {
        dump($data);
        dump($job->getRawBody());
        $job->delete();
    }

结束语

唯一让人不爽的是,生产者这一侧非要知道消费者的Job类才能够正常传递,感觉上有点怪怪的。但由于我们的微信网关的路由配置,本身也是能够在管理端动态配置,并且实时生效,所以这个问题也就是多加一个字段就解决了。


Top10

沪ICP备09053415号 © 赶知识网