どすえのブログ

京都在住プログラマーの開発ブログ。バイクとキャンプが趣味。

threadingとQueueを使ったDjangoでのマルチスレッド処理

Djangoで、リクエストが来たら発火する処理が時間のかかるものだった場合、処理自体は裏で走らせておいて、とりあえずレスポンスを返しておきたいというケースがあると思います。

例えば、ユーザー登録が完了した際に確認メールを送信するといった作業です。メール送信に1秒かかるアプリケーションに、秒間1000アクセスきた場合

  • メール送信のネットワークI/Oの間にアプリケーションがロックされない。
  • メール送信のジョブを裏で走らせて、アクセスがあった順序で実行していく。

ということが求められるでしょう。

Djangoでメール送信機能を作る機会があったので、メモ。

基本動作の確認

まずはqueueのリファレンスにあるサンプルコードを少し改変してqueueとthreadingを使った処理を確認。

import threading
import queue
import time

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        time.sleep(1)
        print(f'Finished {item}')
        q.task_done()

# turn-on the worker thread
threading.Thread(target=worker, daemon=True).start()

# send thirty task requests to the worker
for item in range(5):
    print("Put item", item)
    q.put(item)
print('All task requests sent\n', end='')

# block until all tasks are done
q.join()
print('All work completed')

最初にQueueインスタンスとしてqグローバル変数で宣言します。このキューに実行すべきジョブが溜まります。次にジョブの実行主体であるworkerメソッドを定義します。workerではジョブを取得してprintし、1秒待つという処理を行います。while Trueで常に待機状態にありますが、q.get()が返り値を戻した時だけその下の処理が実行されます。 次にthreading.Thread()で新規スレッドを立ち上げます。targetに実行したいメソッドを渡します。 あとはキューにジョブとしてitemを追加するとworkerが走ります。q.join()qからgetされた全てのジョブに対して完了宣言q.task_done()が行われるまでそれ以降の処理を止める役割をします。

出力

Put item 0
Put item 1
Put item 2
Put item 3
Put item 4
All task requests sent
Working on 0
Finished 0
Working on 1
Finished 1
Working on 2
Finished 2
Working on 3
Finished 3
Working on 4
Finished 4
All work completed

出力を見ると最初にジョブの追加が全て完了し、追加した順にジョブが実行されており、非同期的にジョブ管理・実行が行われていることがわかります。この容量でメール送信も実装できそうです。試してみます。

Djangoでのタスクの非同期処理

まず、テスト用のDjango APIを作成します。

$ django-admin startproject mysite
$ cd mysite/
$ python manage.py startapp api

そして以下のようにコードを書き加えてください。

mysite/settings.py

(中略)
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'api',
    'rest_framework'
]
(中略)

mysite/urls.py

from django.contrib import admin
from django.urls import path
from api import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('index/', views.index)
]

api/views.py

from django.http import HttpResponse

# Create your views here.
def index(request):
    return HttpResponse("Hello, world.")

そしたら

$ python manage.py runserver

でサーバーを立ち上げ、http://127.0.0.1:8000/index/にアクセスしHello, world.が表示されれば準備OKです。

それでは非同期メール機能の作成に移ります。

api/mail.pyを新規作成してください。中身は次のように書いてください。

import threading
import queue
import time

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        time.sleep(3)  # ここで任意の処理を行う
        print(f'Finished {item}')
        q.task_done()

# 処理を並列化するためにスレッドを5つ立てている
for _ in range(5): 
    threading.Thread(target=worker, daemon=True).start()

def add(item):
    q.put(item)

次にviews.pyを以下のように修正します。

from django.http import HttpResponse
from api.mail import add as mail_add

# Create your views here.
def index(request):
    item = request.GET.get("param")
    mail_add(item)
    return HttpResponse("Hello, world.")

この状態でサーバーを再起動し、次のようにリクエストを連続で送信してみます。

curl http://127.0.0.1:8000/index/?param=1
curl http://127.0.0.1:8000/index/?param=2
curl http://127.0.0.1:8000/index/?param=3
curl http://127.0.0.1:8000/index/?param=4
curl http://127.0.0.1:8000/index/?param=5

出力

Working on 1
[15/May/2021 12:12:28] "GET /index/?param=1 HTTP/1.1" 200 13
Working on 2
[15/May/2021 12:12:28] "GET /index/?param=2 HTTP/1.1" 200 13
Working on 3
[15/May/2021 12:12:28] "GET /index/?param=3 HTTP/1.1" 200 13
Working on 4
[15/May/2021 12:12:28] "GET /index/?param=4 HTTP/1.1" 200 13
Working on 5
[15/May/2021 12:12:28] "GET /index/?param=5 HTTP/1.1" 200 13
Finished 1
Finished 2
Finished 3
Finished 4
Finished 5

Finishedがほぼ同時に出力されると思います。time.sleep(3)の部分をメール処理に書き換えれば非同期なメール送信機能の完成です。