一句话概括需求就是:Allow Laravel to process non-laravel queue job.
小伙伴们应该都清楚在Laravel
中的队列体系,是把实现了你的Job类进行序列化之后在队列中传输,消费者一方通过反序列化恢复对象,所以在Job类中我们可以完整传递信息,如Eloquent\Model 等,但是如果生产者不是Laravel/Lumen体系的服务,投递到队列的消息也不是Queueable的对象,那Laravel Queue就无法正常解析,并且抛出异常。
例子可看下方代码
<?php namespace App\Jobs;use Illuminate\Bus\Queueable;use Illuminate\Queue\SerializesModels;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Bus\Dispatchable;use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;class GatewayJob implements ShouldQueue{ use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; /** * Create a new job instance. * * @return void */ public function __construct() { // } /** * Execute the job. * * @return void */ public function handle(RabbitMQJob $job, array $data) { dump($data); dump($job->getRawBody()); $job->delete(); }}
从上图可以看到,我们的业务通过微信网关(swoole)接受微信开放平台消息,在根据业务路由规则分发到下游服务,其中转发消息分为实时和异步,实时就不说了,异步就是微信网关将消息投递到消息队列(RabbitMQ),最后由消费者(Laravel)进行消息处理。
基于OpenRestry 的Kong网关用于处理用户h5侧的请求分发,基于下游服务大部分是swoole实现的内存常驻性,借助Kong API网关的优势:动态路由,健康检查,限流,可开发插件(e.g.Cl5,秒级监控),更有助于我们把控服务流量入口。
基于swoole实现的微信网关,只专注于接受微信开放平台的密文,解密后分发至各个需要的下游服务。
Laravel作为我们整套微服务体系的管理后台,既然是管理后台,当然还是单体式开发更舒适。再者管理后台已经聚集了所有数据对象的操作模型,那写消费者逻辑就更高效。至于消费者进程的运行方式是Supervisor+Laravel Queue,本身就是内存常驻型+KeepAlived,不担心传统LNMP架构的效率问题。
可以,但不优雅,不喜欢!
其实对于Allow Laravel to process non-laravel queue job这个问题还是比较有普遍性,毕竟生产者和消费者不是用一个框架,甚至不同语种都是很正常的。网上就有人问:“我的生产者是NodeJS,消费者是Laravel。。。不知道该怎么办。”
Illuminate\Queue\Jobs\Job类中的fire方法一直往下跟,你就会得到答案:
/** * Fire the job. * * @return void */ public function fire() { $payload = $this->payload(); list($class, $method) = JobName::parse($payload['job']); // 在这里解析队里中的Job格式 with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']); }
最后追到Illuminate\Support\Traits\Macroable\Str中的parseCallback方法你就得到答案了:
/** * Parse a Class@method style callback into class and method. * * @param string $callback * @param string|null $default * @return array */ public static function parseCallback($callback, $default = null) { return static::contains($callback, '@') ? explode('@', $callback, 2) : [$callback, $default]; }
假设我想在队列中传输数据,指定消费者为App\Jobs\GatewayJob类的handle方法处理,那么能够让Laravel正确解析的数据结构(json)为:
{ "job": "App\\Jobs\\GatewayJob@handle", "data": { "payload": { "ToUserName": "gh_47a4043b185f", "FromUserName": "o9Le4wyQ08reKM2bn6evr1--YpOE", "CreateTime": "1551346579", "MsgType": "text", "Content": "kok波洛克", "MsgId": "22209944230247159" }, "extra": { "wx-appid": "wxcc5d27e1c808d79c", "timestamp": 1551346579, "area": "1", "platid": "0", "partition": "10071", "charac-name": "andychai", "charac-no": "2299733847", "unionid": "o9mcu0yK2GqgMJ9ZOKBV9XOzdC9A", "has-game-role": 1 } }}
job | data |
---|---|
声明处理Job的类名和方法 | 真实传输的数据 |
在回到我们之前make好的GatewayJob中看:
/** * @param RabbitMQJob $job Job父类本身(这里是RabbitMQJob的子类实现) * @param array $data 真实数据 */ public function handle(RabbitMQJob $job, array $data) { dump($data); dump($job->getRawBody()); $job->delete(); }
唯一让人不爽的是,生产者这一侧非要知道消费者的Job类才能够正常传递,感觉上有点怪怪的。但由于我们的微信网关的路由配置,本身也是能够在管理端动态配置,并且实时生效,所以这个问题也就是多加一个字段就解决了。
有话要说