Django と RQ で非同期処理を実装する

シェアする

DjangoRQ で非同期処理を実装します。

以前、Django と Celery で非同期処理を実装するという記事を書きました。その後、たまたま RQ という Python パッケージを発見したのですが、Celery よりも手軽そうなので、ちょっと試してみたいと思います。

実装イメージ

環境とバージョン

  • CentOS 7.5.1804
  • Python 3.6.4
  • Django 2.0.3
  • Django-RQ 1.2.0
  • Redis 3.2.10

インストール

Django と RQ の環境を構築していきたいと思います。

Python

$ sudo yum install https://centos7.iuscommunity.org/ius-release.rpm
$ sudo yum install python36u python36u-libs python36u-devel python36u-pip

まず Python のインストールです。Python の 3.6 を使います。いつも書いていることなのですが、私はいつも IUS リポジトリの Python パッケージを使用しています。

Django

$ sudo pip install django

次に Django のインストールです。pip でインストールします。

Django-RQ

$ sudo pip intall django-rq

次は、RQ の Django 版 Django-RQ をインストールします。こちらも pip でインストールします。

Redis

$ sudo yum install epel-release
$ sudo yum install redis

最後に Redis をインストールします。こちらは、EPEL リポジトリからインストールします。

実装

実装に入りたいと思います。

startproject, startapp

$ django-admin startproject rq_test
$ cd rq_test/
$ python3.6 manage.py startapp app

まず最初は、Django のコマンドで、プロジェクトの雛形を作成します。

settings.py

...
ALLOWED_HOSTS = ['*']
INSTALLED_APPS = [
    ...
    'django_rq',
    'app.apps.AppConfig',
]
RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'PASSWORD': '',
        'DEFAULT_TIMEOUT': 360,
    },
}
RQ_API_TOKEN = 'hoge'
...

startproject で settings.py が生成されるので、それを編集していきます。

Django-RQ 関連で、INSTALLED_APPS に ‘django_rq’ を追加し、RQ_QUEUES と RQ_API_TOKEN を設定します。

views.py

import requests
from django.shortcuts import render
from django_rq import job

from .models import WorkerResults

@job
def count_words_at_url(url):
    resp = requests.get(url)
    result = len(resp.text.split())
    WorkerResults.objects.create(url=url, result=result)

def index(request):
    url = 'http://nvie.com'
    count_words_at_url.delay(url)
    object_list = WorkerResults.objects.all().order_by('-pk')
    return render(request, 'app/index.html', {'object_list': object_list})

views.py です。RQ オフィシャルのサンプルを少し改造しました。

指定した url の文字数をカウントするクローラーを Queue に入れて、非同期で結果を DB に格納し、それを表示します。

シンタックスハイライトの都合で、表示できていませんが、7行目に job というデコレータが入っています。

models.py

from django.db import models

class WorkerResults(models.Model):
    result = models.CharField(max_length=255, default=None)
    url = models.CharField(max_length=255, default=None)

非同期の結果を格納する用 models.py です。

urls.py

from django.urls import path, include
from app.views import index

urlpatterns = [
    path('', index),
    path('django-rq/', include('django_rq.urls')),
]

urls.py で django-rq を include していますが、後述する django-rq モニタリングに使用します。

index.html

<html>
  <head>
    <title>Django と RQ で非同期処理を実装する</title>
  </head>
  <body>
    <h1>Django と RQ で非同期処理を実装する</h1>
    <table>
      <thead>
        <tr>
          <th>#</th>
          <th>url</th>
          <th>result</th>
        </tr>
      </thead>
      <tbody>
        {% for obj in object_list %}
        <tr>
          <td>{{ forloop.counter }}</td>
          <td>{{ obj.url }}</td>
          <td>{{ obj.result }}</td>
        </tr>
        {% endfor %}
      </tbody>
    </table>
  </body>
</html>

動作確認

準備ができましたので、動作確認していきます。

makemigrations, migrate

$ python3.6 manage.py makemigrations app
Migrations for 'app':
  app/migrations/0001_initial.py
    - Create model WorkerResults
$ python3.6 manage.py migrate
Operations to perform:
  Apply all migrations: admin, app, auth, contenttypes, sessions
Running migrations:
  Applying app.0001_initial... OK

まず、非同期処理の結果を DB に保存して、ページに表示しますので、DB マイグレーションします。今回は検証が目的なので、sqlite を使用しています。

runserver

$ python3.6 manage.py runserver 0.0.0.0:8000
Performing system checks...

System check identified no issues (0 silenced).
September 05, 2018 - 12:03:25
Django version 2.0.3, using settings 'rq_test.settings'
Starting development server at http://0.0.0.0:8000/
Quit the server with CONTROL-C.

DB マイグレーションが終わったら、デバッグサーバの runserver を起動します。

rqworker

$ python3.6 manage.py rqworker
12:02:29 Registering birth of worker localhost.9647
12:02:29 RQ worker 'rq:worker:localhost.9647' started, version 0.12.0
12:02:29 *** Listening on default...
12:02:29 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
12:02:29 Cleaning registries for queue: default
12:02:29 *** Listening on default...
12:02:29 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.

次に、runserver とは別のコンソールで RQ の Worker を起動します。

これで準備が整いました。

ブラウザから http://IPアドレス:8000 にアクセスすると、非同期処理の動作確認ができると思います。

Queue status

Django-RQ は、Queue モニタリング機能がついており、http://IPアドレス:8000/django-rq/stats.json/hoge/ にアクセスすると上記画像のように Queue のステータスを確認することができます。見方はまだよく分かっていないのですが、便利ですね。

URL 最後の ‘hoge’ の部分は、settings.py に設定した RQ_API_TOKEN です。

まとめ

Django と RQ で非同期処理を実装しました。

後発 なのでやはり Celery よりは実装しやすい感じがしました。

今の所は好感触ですが、実際に運用となると、Worker を常駐させなければいけません。その辺りがまだ分かっていないので、今後の課題としたいと思います。

シェアする

フォローする