1. 介绍

queue_classic是一个ruby的gem,用来实现PostgreSQL的消息队列。它基于PostgreSQL的listen/notify,有很多接口,使用起来比较简单。

如果你有使用过sidekiqresquedelayed_job,就会发现基本每个这种消息队列的gem的使用方法都是类似的,里面的概念也是差不多,也就是说,学会了queue_classic就等于会其中三种,只要掌握思想就好了。

2. 安装

假如我们已经有一个rails项目了。

在Gemfile添加下面这行。

gem "queue_classic", "~> 3.0.0"

执行bundle install

正如我们上面所说的,任务是需要存储的。所以要创建相应的表来存。

# 创建queue_classic_jobs表
rails generate queue_classic:install
# username替换为你自己的数据库的用户名,password是数据库的密码,rails365_dev是数据库名
export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev" 
bundle exec rake db:migrate

3. 测试

我们先在rails console里测试。

前面说过,消息队列是跑在一个进程里的。所以要启动那个进程。

bundle exec rake qc:work

你会发现delayed_job,sidekiq也是差不多的启动方法。

还可以指定队列来启动。

QUEUES="priority_queue,secondary_queue" bundle exec rake qc:work

启动好后,我们进入rails console中,执行下面这行语句。

➜  rails365 git:(master) ✗ rails c
Loading development environment (Rails 4.2.3)
2.2.2 :001 > QC.enqueue("Kernel.puts", "hello world")
nil

你会在进程里看到类似这样的输出,就说明成功了。

➜  rails365 git:(master) ✗ bundle exec rake qc:work 
hello world

QC.enqueue就是后面的命令加上参数作为任务push到队列中。

上面只是个最简单的例子,还有可以在指定时间,指定队列来执行,具体更为详细的命令要看官方的readme文档。

如果需要调试或查看日志,可以开启调试功能,有两种不同的日志,分别是:

export QC_MEASURE="true"

# or

export DEBUG="true"

在运行rake qc:work之前运行,具体的效果,尝试下就知道的。

4. 在rails中使用

本站为例,是一个放博客的网站,文章是存放在articles这张表,当时为了查看哪篇文章最受欢迎,就在articles存了一个字段叫visit_count,但每次用户查看文章时,就会往这个字段加1。这个动作是用rails的ActiveSupport::Notifications配合sidekiq的消息队列来做的,现在要改成用queue_classic来做。

我们来看下相关的代码。

# app/workers/update_article_visit_count_worker.rb
class UpdateArticleVisitCountWorker
  include Sidekiq::Worker
  def perform(article_id)
    logger.info 'update article visit count begin'
    @article = Article.find(article_id)
    @article.visit_count += 1
    @article.save!(validate: false)
    logger.info 'update article visit count end'
  end
end

在哪里调用呢,我们是结合ActiveSupport::Notifications来做的,这个先不管,你也可以在articles_controller的show action直接调用。

# config/initializers/notification.rb
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload|
  Rails.logger.info payload
  if payload[:controller] == "ArticlesController" && payload[:action] == "show"
    UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?
  end
end

上文提过,queue_classic主要是利用QC.enqueue这条命令把任务push到队列中的。只需要把这行UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?改成我们需要的就可以了。

把增加visit_count的值的逻辑移动model中去,然后在ActiveSupport::Notifications.subscribeQC.enqueue中调用就好了。

改造之后是这样的。

# app/models/article.rb
class Article < ActiveRecord::Base
  def self.update_article_visit_count(article_id)
    article = Article.find(article_id)
    article.visit_count += 1
    article.save!(validate: false)
  end
end
# config/initializers/notification.rb
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload|
  Rails.logger.info payload
  if payload[:controller] == "ArticlesController" && payload[:action] == "show"
    QC.enqueue "Article.update_article_visit_count",payload[:params]["id"] if payload[:params]["id"].present?
  end
end

现在需要重启下rails serverbundle exec rake qc:work

重启rails server运行好export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev"这个命令。

为了让rake qc:work更能明显地看到日志信息,在运行rake qc:work前先执行export QC_MEASURE="true"

现在可以去页面上测试的。

5. 注意事项

第一点是关于环境变量,也就是QC_DATABASE_URLQC_MEASUREDEBUG这三个,当部署到线上环境时,就要把这三个变量写进shell的配置文件,如比ubuntu系统,就写进~/.bashrc_profile就好了。

第二点是关于错误的任务,任务也是有可能会报错的,但是我们不知道哪个任务报错了,所以很不方便,其实官方提供了接口的,你自己可以捕获那个错误信息,捕获后就可以进行自己想要的处理了。其实错误的任务都会一直存在表queue_classic_jobs中,这样查看就好,至于那个接口,就是worker.rb中的handle_failure方法,官方readme文档也有示例,这里不再深究。

完结。


书籍推荐