FastAPIでバックグラウンド処理

概要

fastapi で重い処理を走らせるときの処理パターン

実装

成功するときの系だけで失敗したときのときの処理を書いていないので注意

流れとしては タスク作成 → タスクの進捗確認 → タスクの結果取得

タスク

時間のかかるタスクとして10秒間sleepするタスクを実装

タスクの結果は単純に result data という文字列を返す

class Task:
    def __init__(self):
        self.id = str(uuid.uuid4())
        self.progress = 0
        self.res = None

    def __call__(self):
        for _ in range(10):
            time.sleep(1)
            self.progress += 10
        self.res = 'result data'


TASKS = {}

タスク作成

重いタスクを作成するAPI

作成したタスクのIDを返す

@app.post('/task', status_code=202)
async def create_task(background_tasks: BackgroundTasks):
    task = Task()
    TASKS[task.id] = task
    background_tasks.add_task(task)
    return {'task_id': task.id}

タスク進捗取得

タスクの進捗度合いを取得するAPI

状態と進捗度合いとタスク結果のURIを返す

@app.get('/task/{task_id}')
async def read_task(task_id: str):
    if task_id not in TASKS:
        return {'message': 'nope'}

    task = TASKS[task_id]

    if task.res is None:
        return {
            'status': 'IN_PROGRESS',
            'progress': task.progress,
            'uri': None
        }
    else:
        return {
            'status': 'SUCCEEDED',
            'progress': 100,
            'uri': f'/task/result/{task.id}'
        }

タスク結果取得

タスクの結果を取得するAPI

@app.get('/task/result/{task_id}')
async def read_result(task_id: str):
    if task_id in TASKS:
        task = TASKS[task_id]
        return {'result': task.res}
    else:
        return {'result': 'nothing'}

リポジトリ

github.com

参考資料