queue_classic是一个ruby的gem,用来实现PostgreSQL的消息队列。它基于PostgreSQL的listen/notify,有很多接口,使用起来比较简单。
如果你有使用过sidekiq,resque,delayed_job,就会发现基本每个这种消息队列的gem的使用方法都是类似的,里面的概念也是差不多,也就是说,学会了queue_classic就等于会其中三种,只要掌握思想就好了。
假如我们已经有一个rails项目了。
在Gemfile添加下面这行。
gem "queue_classic", "~> 3.0.0"
执行bundle install
正如我们上面所说的,任务是需要存储的。所以要创建相应的表来存。
# 创建queue_classic_jobs表
rails generate queue_classic:install
# username替换为你自己的数据库的用户名,password是数据库的密码,rails365_dev是数据库名
export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev"
bundle exec rake db:migrate
我们先在rails console
里测试。
前面说过,消息队列是跑在一个进程里的。所以要启动那个进程。
bundle exec rake qc:work
你会发现delayed_job,sidekiq也是差不多的启动方法。
还可以指定队列来启动。
QUEUES="priority_queue,secondary_queue" bundle exec rake qc:work
启动好后,我们进入rails console
中,执行下面这行语句。
➜ rails365 git:(master) ✗ rails c
Loading development environment (Rails 4.2.3)
2.2.2 :001 > QC.enqueue("Kernel.puts", "hello world")
nil
你会在进程里看到类似这样的输出,就说明成功了。
➜ rails365 git:(master) ✗ bundle exec rake qc:work
hello world
QC.enqueue就是后面的命令加上参数作为任务push到队列中。
上面只是个最简单的例子,还有可以在指定时间,指定队列来执行,具体更为详细的命令要看官方的readme文档。
如果需要调试或查看日志,可以开启调试功能,有两种不同的日志,分别是:
export QC_MEASURE="true"
# or
export DEBUG="true"
在运行rake qc:work
之前运行,具体的效果,尝试下就知道的。
以本站为例,是一个放博客的网站,文章是存放在articles这张表,当时为了查看哪篇文章最受欢迎,就在articles存了一个字段叫visit_count,但每次用户查看文章时,就会往这个字段加1。这个动作是用rails的ActiveSupport::Notifications配合sidekiq的消息队列来做的,现在要改成用queue_classic来做。
我们来看下相关的代码。
# app/workers/update_article_visit_count_worker.rb
class UpdateArticleVisitCountWorker
include Sidekiq::Worker
def perform(article_id)
logger.info 'update article visit count begin'
@article = Article.find(article_id)
@article.visit_count += 1
@article.save!(validate: false)
logger.info 'update article visit count end'
end
end
在哪里调用呢,我们是结合ActiveSupport::Notifications来做的,这个先不管,你也可以在articles_controller的show action直接调用。
# config/initializers/notification.rb
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload|
Rails.logger.info payload
if payload[:controller] == "ArticlesController" && payload[:action] == "show"
UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?
end
end
上文提过,queue_classic主要是利用QC.enqueue
这条命令把任务push到队列中的。只需要把这行UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?
改成我们需要的就可以了。
把增加visit_count的值的逻辑移动model中去,然后在ActiveSupport::Notifications.subscribe
用QC.enqueue
中调用就好了。
改造之后是这样的。
# app/models/article.rb
class Article < ActiveRecord::Base
def self.update_article_visit_count(article_id)
article = Article.find(article_id)
article.visit_count += 1
article.save!(validate: false)
end
end
# config/initializers/notification.rb
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload|
Rails.logger.info payload
if payload[:controller] == "ArticlesController" && payload[:action] == "show"
QC.enqueue "Article.update_article_visit_count",payload[:params]["id"] if payload[:params]["id"].present?
end
end
现在需要重启下rails server
和bundle exec rake qc:work
。
重启rails server
运行好export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev"
这个命令。
为了让rake qc:work
更能明显地看到日志信息,在运行rake qc:work
前先执行export QC_MEASURE="true"
。
现在可以去页面上测试的。
第一点是关于环境变量,也就是QC_DATABASE_URL
、QC_MEASURE
、DEBUG
这三个,当部署到线上环境时,就要把这三个变量写进shell的配置文件,如比ubuntu系统,就写进~/.bashrc_profile就好了。
第二点是关于错误的任务,任务也是有可能会报错的,但是我们不知道哪个任务报错了,所以很不方便,其实官方提供了接口的,你自己可以捕获那个错误信息,捕获后就可以进行自己想要的处理了。其实错误的任务都会一直存在表queue_classic_jobs中,这样查看就好,至于那个接口,就是worker.rb中的handle_failure方法,官方readme文档也有示例,这里不再深究。
完结。