threadingとQueueを使ったDjangoでのマルチスレッド処理
Djangoで、リクエストが来たら発火する処理が時間のかかるものだった場合、処理自体は裏で走らせておいて、とりあえずレスポンスを返しておきたいというケースがあると思います。
例えば、ユーザー登録が完了した際に確認メールを送信するといった作業です。メール送信に1秒かかるアプリケーションに、秒間1000アクセスきた場合
- メール送信のネットワークI/Oの間にアプリケーションがロックされない。
- メール送信のジョブを裏で走らせて、アクセスがあった順序で実行していく。
ということが求められるでしょう。
Djangoでメール送信機能を作る機会があったので、メモ。
基本動作の確認
まずはqueueのリファレンスにあるサンプルコードを少し改変してqueueとthreadingを使った処理を確認。
import threading import queue import time q = queue.Queue() def worker(): while True: item = q.get() print(f'Working on {item}') time.sleep(1) print(f'Finished {item}') q.task_done() # turn-on the worker thread threading.Thread(target=worker, daemon=True).start() # send thirty task requests to the worker for item in range(5): print("Put item", item) q.put(item) print('All task requests sent\n', end='') # block until all tasks are done q.join() print('All work completed')
最初にQueue
インスタンスとしてq
をグローバル変数で宣言します。このキューに実行すべきジョブが溜まります。次にジョブの実行主体であるworker
メソッドを定義します。worker
ではジョブを取得してprintし、1秒待つという処理を行います。while True
で常に待機状態にありますが、q.get()
が返り値を戻した時だけその下の処理が実行されます。
次にthreading.Thread()
で新規スレッドを立ち上げます。target
に実行したいメソッドを渡します。
あとはキューにジョブとしてitem
を追加するとworkerが走ります。q.join()
はq
からgetされた全てのジョブに対して完了宣言q.task_done()
が行われるまでそれ以降の処理を止める役割をします。
出力
Put item 0 Put item 1 Put item 2 Put item 3 Put item 4 All task requests sent Working on 0 Finished 0 Working on 1 Finished 1 Working on 2 Finished 2 Working on 3 Finished 3 Working on 4 Finished 4 All work completed
出力を見ると最初にジョブの追加が全て完了し、追加した順にジョブが実行されており、非同期的にジョブ管理・実行が行われていることがわかります。この要領でメール送信も実装できそうです。試してみます。
Djangoでのタスクの非同期処理
$ django-admin startproject mysite $ cd mysite/ $ python manage.py startapp api
そして以下のようにコードを書き加えてください。
mysite/settings.py
(中略) INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'api', 'rest_framework' ] (中略)
mysite/urls.py
from django.contrib import admin from django.urls import path from api import views urlpatterns = [ path('admin/', admin.site.urls), path('index/', views.index) ]
api/views.py
from django.http import HttpResponse # Create your views here. def index(request): return HttpResponse("Hello, world.")
そしたら
$ python manage.py runserver
でサーバーを立ち上げ、http://127.0.0.1:8000/index/
にアクセスしHello, world.
が表示されれば準備OKです。
それでは非同期メール機能の作成に移ります。
api/mail.py
を新規作成してください。中身は次のように書いてください。
import threading import queue import time q = queue.Queue() def worker(): while True: item = q.get() print(f'Working on {item}') time.sleep(3) # ここで任意の処理を行う print(f'Finished {item}') q.task_done() # 処理を並列化するためにスレッドを5つ立てている for _ in range(5): threading.Thread(target=worker, daemon=True).start() def add(item): q.put(item)
次にviews.py
を以下のように修正します。
from django.http import HttpResponse from api.mail import add as mail_add # Create your views here. def index(request): item = request.GET.get("param") mail_add(item) return HttpResponse("Hello, world.")
この状態でサーバーを再起動し、次のようにリクエストを連続で送信してみます。
curl http://127.0.0.1:8000/index/?param=1 curl http://127.0.0.1:8000/index/?param=2 curl http://127.0.0.1:8000/index/?param=3 curl http://127.0.0.1:8000/index/?param=4 curl http://127.0.0.1:8000/index/?param=5
出力
Working on 1 [15/May/2021 12:12:28] "GET /index/?param=1 HTTP/1.1" 200 13 Working on 2 [15/May/2021 12:12:28] "GET /index/?param=2 HTTP/1.1" 200 13 Working on 3 [15/May/2021 12:12:28] "GET /index/?param=3 HTTP/1.1" 200 13 Working on 4 [15/May/2021 12:12:28] "GET /index/?param=4 HTTP/1.1" 200 13 Working on 5 [15/May/2021 12:12:28] "GET /index/?param=5 HTTP/1.1" 200 13 Finished 1 Finished 2 Finished 3 Finished 4 Finished 5
Finishedがほぼ同時に出力されると思います。time.sleep(3)
の部分をメール処理に書き換えれば非同期なメール送信機能の完成です。