现在的位置: 首页 > redis > 正文

利用redis和php-resque实现后台任务

2017年10月27日 redis ⁄ 共 7426字 ⁄ 字号 暂无评论

在PHP的页面编程过程中,我们总遇到这样一个问题,即是PHP是一个顺序执行的过程,只能在一个任务完成后接着去实现下一个任务,而这其中存在一个问题,就是假如其中一个任务耗费大量时间的时候,我们可能就必须要等待。借助redis可以将耗时任务放到后台去执行,从而减少等待时间。

Redis 是一个高性能的key-value数据库。可以帮助我们有效的实现后台任务,将耗费大量时间的任务迁移到后台去执行,可以节约很多的时间。

php-resque是来自Ruby的项目Resque的一个PHP扩展,正是由于Resque清晰简单的解决了后台任务带来的一系列问题。

在Resque中后台任务的角色划分:
在Resque中,一个后台任务被抽象为由三种角色共同完成:

Job | 任务 : 一个Job就是一个需要在后台完成的任务,比如发送邮件,就可以抽象为一个Job。在Resque中一个Job就是一个Class。
Queue | 队列 : 也就是上文的消息队列,在Resque中,队列则是由Redis实现的。Resque还提供了一个简单的队列管理器,可以实现将Job插入/取出队列等功能。
Worker | 执行者 : 负责从队列中取出Job并执行,可以以守护进程的方式运行在后台。
  • 1
  • 2
  • 3

那么基于这个划分,一个后台任务在Resque下的基本流程是这样的:

1、将一个后台任务编写为一个独立的Class,这个Class就是一个Job。
2、在需要使用后台程序的地方,系统将Job Class的名称以及所需参数放入队列。
3、以命令行方式开启一个Worker,并通过参数指定Worker所需要处理的队列。
4、Worker作为守护进程运行,并且定时检查队列。
5、当队列中有Job时,Worker取出Job并运行,即实例化Job Class并执行Class中的方法。
  • 1
  • 2
  • 3
  • 4
  • 5

至此就可以完整的运行完一个后台任务。

在Resque中,还有一个很重要的设计:一个Worker,可以处理一个队列,也可以处理很多个队列,并且可以通过增加Worker的进程/线程数来加快队列的执行速度。
  • 1

注:本文中的安装等操作,均在Linux下完成。

步骤一、php-resque的安装
此处可参阅:PHP的轻量消息队列php-resque使用说明

需要提前说明的是,由于涉及到进程的开辟与管理,php-resque使用了php的PCNTL函数,所以只能在Linux下运行,并且需要php编译PCNTL函数。如果希望用Windows做同样的工作,那么可以去找找Resque的其他语言版本,php在Windows下非常不适合做后台任务。
  • 1
  • 2

安装Redis

apt-get install redis-server
  • 1

安装Composer

apt-get install curl
cd /usr/local/bin
curl -s http://getcomposer.org/installer | php
chmod a+x composer.phar
alias composer='/usr/local/bin/composer.phar'
  • 1
  • 2
  • 3
  • 4
  • 5

使用Composer安装php-resque
假设web目录在/opt/htdocs

apt-get install git git-core
cd /opt/htdocs
git clone git://github.com/chrisboulton/php-resque.git
cd php-resque
composer install
  • 1
  • 2
  • 3
  • 4
  • 5

至此php-resque即可完成,可以进行其使用。

步骤二:php-resque的使用

首先需要运行Worker。
此处可参阅:后台任务和PHP-Resque的使用介绍

1、理解Worker的本质
技术上讲一个Worker就是一个不断运行的PHP进程,并且不断监视新的任务并运行。
一个简单的Worker的代码如下:

while (true) {
    $jobs = pullData(); // 从队列中拉取任务

    foreach ($jobs as $class => $args) { // 循环每个找到的任务
        $job = new $class();
        $job->perform($args); // 执行任务
    }
    sleep(300); // 等待5分钟后再次尝试拉取任务
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

以上这些代码的具体实现都可以交给php-resque。创建一个Worker,php-resque需要以下参数:

QUEUE: 需要执行的队列的名字
INTERVAL:在队列中循环的间隔时间,即完成一个任务后的等待时间,默认是5秒
APP_INCLUDE:需要自动载入PHP文件路径,Worker需要知道你的Job的位置并载入Job
COUNT:需要创建的Worker的数量。所有的Worker都具有相同的属性。默认是创建1个Worker
REDIS_BACKEND:Redis服务器的地址,使用 hostname:port 的格式,如127.0.0.1:6379,或localhost:6379。默认是localhost:6379
REDIS_BACKEND_DB:使用的Redis数据库的名称,默认是0
VERBOSE:啰嗦模式,设置“1”为启用,会输出基本的调试信息
VVERBOSE:设置“1”启用更啰嗦模式,会输出详细的调试信息
PREFIX:前缀。在Redis数据库中为队列的KEY添加前缀,以方便多个Worker运行在同一个Redis数据库中方便区分。默认为空
PIDFILE:手动指定PID文件的位置,适用于单Worker运行方式
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

以上参数中只有QUEUE是必须的。如果让Worker监视执行多个队列,可以用逗号隔开多个队列的名称,如:”queue1,queue2,queue3”,队列执行是有顺序的,如上queue2和queue3总是会在queue1后面被执行。

也可以设置QUEUE为*让Worker以字母顺序执行所有的队列。

Worker 必须以CLI方式启动。你不可以从浏览器启动Worker,因为:

你无法从浏览器执行后台任务
PCNTL扩展只能运行在CLI模式
  • 1
  • 2
  • 3

2、启动Worker

可以从resque.php启动Worker,这个位置位于php-resque/bin目录下(也可能不带.php后缀)。
在终端中执行:

cd /path/to/php-resque/bin/
php resque.php
  • 1
  • 2

很显然Worker不会被启动,因为缺少必须的参数QUEUE,程序将会返回如下错误:

Set QUEUE env var containing the list of queues to work.
  • 1

php-resque通过getenv获取参数,所以在启动Worker的时候应该传递环境变量过去。所以应该以下面的方式启动Worker:

QUEUE=notification php resque.php
  • 1

如果启用VVERBOSE模式:

QUEUE=notification VVERBOSE=1 php resque.php
  • 1

终端将会输出:

*** Starting worker KAMISAMA-MAC.local:84499:notification
** [23:48:18 2012-10-11] Registered signals
** [23:48:18 2012-10-11] Checking achievement
** [23:48:18 2012-10-11] Checking notification
** [23:48:18 2012-10-11] Sleeping for 5
** [23:48:23 2012-10-11] Checking achievement
** [23:48:23 2012-10-11] Checking notification
** [23:48:23 2012-10-11] Sleeping for 5
... etc ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Worker会自动被命名为KAMISAMA-MAC.local:84499:notification,命名的规则是hostname:process-id:queue-names。

如果觉得这种启动方式太麻烦且难记,可以自己手动写一个bash脚本来帮助你启动Resque,如:

EXPORT QUEUE=notifacation
EXPORT VERBOSE=1

php resque.php
  • 1
  • 2
  • 3
  • 4

3、后台运行Worker

通过上面的方法成功启动了Worker,但只有在终端开启的状态下,关闭终端或按下Ctrl+C时Worker就会停止运行。我们可以在命令后面添加一个&来使其后台运行。

QUEUE=notification php resque.php &
  • 1

这样就可以让resque在后台运行。但如果你开启了VERBOSE模式,所有的输出信息将会丢失。所以我们需要在resque后台运行时把输出的信息保存起来。

我们可以使用nohup来保持resque后台运行,即使是在用户登出后。

nohup QUEUE=notification php resque.php &
  • 1

4、确认你的Worker成功运行了

通过管道操作无法知道Worker是否成功启动,当前通过查看log文件中有没有输出* Starting worker …..的内容也可以知道是否启动。

也可以通过查看系统进程的方法确认Worker是否正在运行。

ps -ef|grep resque.php
  • 1

将会输出名称中包含resque.php的进程,其中第二列是进程的PID。
使用这个方法可以很好的知道Worker是否正在运行,以及有没有意外终止。

5、暂停和停止Worker

要停止一个Worker,直接kill掉它的进程就行了。可以通过ps -ef|grep resque.php查看Worker进程的PID。当然通过这个命令你无法知道哪个PID代码的哪个Worker。

如果要结束一个PID是86681的进程:

kill 86681
  • 1

这个命令将会立即结束掉PID为86681的进程及子进程。如果Worker正在执行一个任务也不会等待任务执行完成(未完成的部分将会丢失)。

有一个可以平滑的停止Worker的方法,可以通过给kill命令发送一个SIGSPEC信号来告诉kill应该怎么做,这需要PCNTL扩展的支持。

当然下面所讲述的所有命令都需要PCNTL扩展支持。

通过PCNTL扩展,Worker可以支持以下信号:

QUIT - 等待子进程结束后再结束
TERM / INT - 立即结束子进程并退出
USR1 - 立即结束子进程,但不退出
USR2 - 暂停Worker,不会再执行新任务
CONT - 继续运行Worker
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

当没有信号发出时默认是TERM / INT信号。

如果想在所有当前正在运行的任务都完成后再停止,使用QUIT信号:

kill -QUIT YOUR-WORKER-PID
  • 1

结束所有子进程,但保留Worker:

kill -USR1 YOUR-WORKER-PID
  • 1

暂停和继续执行Worker:

kill -USR2 YOUR-WORKER-PID

kill -CONT YOUR-WORKER-PID
  • 1
  • 2
  • 3

简单的说,任务就是传递给Worker要执行的内容。我们需要把Job依次添加到Queue来执行。

要把任务添加到队列,程序必须要包含php-resque库以及Redis。

使用require_once '/path/to/php-resque/lib/Resque.php';包含php-resque的库文件,它会自动连接到Redis服务器,如果你的Redis服务器不是默认的localhost:6379,你需要使用Resque::setBackent('192.168.1.56:3680');这样的格式来设置你的Redis服务器的地址,同样setBackent支持可选的第二个参数为使用的Redis数据库名,默认为0。

现在php-resque已经准备好了,使用以下代码添加一个任务到队列:

Resque::enqueue('default', 'Mail', array('dest@mail.com', 'hi!', 'this is a test content'));
  • 1
第一个参数,’default’是指队列的名字(即上文中QUEUE后的参数),示例中将会把任务推送到名为default的队列中
第二个参数是Job的类名,表示要执行哪个Job
第三个参数是要发送给Job的参数也可以使用关联数组的形式
  • 1
  • 2
  • 3
  • 4

传递给Job的参数(上面第三个参数)可以是普通数组、关联数组的形式,也可以是一个字符串,但使用数组可以很方便的传递更多的信息给Job。所有的参数在推送到队列前都会经过json_encode处理。

步骤三、Job类创建和使用:

1、编写一个Worker,创建job类。

如上面的例子中,第一个参数是队列的名字(还记得上一节里面启动php resque.php时传递的QUEUE环境变量吗?)第二个参数是Job的类名,即要执行的Job。Mail类就是一个Job类。

所有的Job类都应该包含一个perform()方法,使用Resque::enqueue()传递的第三个参数可以在perform()方法中使用$this->args来得到。

class PHP_Job
{
    public function perform()
    {
        sleep(120);
        fwrite(STDOUT, 'Hello!');
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在Resque的设计中,一个Job必须存在一个perform方法,Worker则会自动运行这个方法。
Job类也可以包含setUp()和tearDown()方法,可选的这两个方法分别会在perform()方法之前和之后运行。

class Mail{
    public function setUp(){
        # 这个方法会在perform()之前运行,可以用来做一些初始化工作
        # 如连接数据库、处理参数等
    }

    public function perform(){
        # 执行Job
    }

    public function tearDown(){
        # 会在perform()之后运行,可以用来做一些清理工作
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、包含Job类,将job插入队列。
在实例化Job类之前,必须让Worker找到并包含这个类。有很多种方法可以做到。
(1)、使用include_path
当PHP运行于Apache model方式的时候可以使用.htaccess设置包含:

php_value include_path ".:/already/existing/path:/path/to/job-classes"
  • 1

(2)、通过php.ini

include_path = ".:/php/includes:/path/to/job-classes"
  • 1

(3)、使用APP_INCLUDE包含
上一节说了使用APP_INCLUDE指定Worker执行时要包含的PHP文件的路径,如:

QUEUE=default APP_INCLUDE=/path/to/loader.php php resque.php
  • 1

loader.php的内容可以是下面的那样:(其中包含了所有的job类)

include '/path/to/Mail.php';
include '/path/to/AnotherJobClass.php';
include '/path/to/somewhere/AnotherJobClass.php';
include '/JobClass.php';
  • 1
  • 2
  • 3
  • 4

当然也可以使用PHP的autoloader方法——sql_autoloader。

2、在你的项目中使用后台任务

以下面的代码为例,把耗时较多的工作交给后台任务来做。

class User{
    # functions(){}  // 其它函数

    public function updateLocation($location) {
        $db->updateUserTable($this->userId, 'location', $location);
        $this->recomputeNewFriends(); # 此操作耗时较长
    }

    public function recomputeNewFriends() {
        # 查找新的朋友
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

把以上代码改成:

class User {
    # functions(){}  // 其它函数

    public function updateLocation($location) {
        $db->updateUserTable($this->userId, 'location', $location);
        # 把任务添加到队列
        # 这里的队列名为 'queueName'
        # 任务名为 'FriendRecommendator'
        Resque::enqueue('queueName', 'FriendRecommendator', array('id' => $this->userId));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

以下是任务FriendRecommendator类的实现代码:

class FriendRecommendator {
    function perform() {
        # 这里没有User类,需要创建一个User类对象
        $user = new User($this->args['id']);
        # 查找新朋友的操作
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

简单的说,你只需要把你的执行任务的代码放到Job类中并改名为perform()即可,只要你愿意甚至可以将普通类改成Job类,但并不推荐这样做。

perform()方法有个缺点,即一个Job类只能包含一个perform()方法,也就是说一个Job类只能执行一种后台任务。

3、程序
此处可参阅:PHP的轻量消息队列php-resque使用说明

其中需要注意的几点:
Hack的方法Resque::enqueue()的第三个参数必须是一个数组,并且它的第一个元素是要执行的任务的方法名,并且这个元素会在执行时从$args数组中移除。

必须在每次修改Job类后重新启动你的Worker。

本文综合以下几篇文章:
PHP的轻量消息队列php-resque使用说明
后台任务和PHP-Resque的使用介绍 
Background jobs with php and resque

转自:http://blog.csdn.net/helencoder/article/details/51122424

给我留言

您必须 [ 登录 ] 才能发表留言!