import asyncio
from collections import deque
from os import system
from time import sleep
from typing import Any, Dict

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

import ansible_runner


class DeployArgs(BaseModel):
    """Request body for the deploy endpoint."""

    # Extra variables merged into the extravars handed to the playbook run.
    extra_vars: Dict[str, Any]


app = FastAPI()


@app.post("/deploy/{service}")
async def deploy(service: str, args: DeployArgs):
    """Start the ``deploy.yaml`` playbook and stream its output as plain text.

    Args:
        service: Service name from the URL path; passed to the playbook as
            the single element of the ``services`` extra variable.
        args: Request body whose ``extra_vars`` are merged into the
            playbook's extravars.

    Returns:
        A ``StreamingResponse`` (``text/plain``) that yields each captured
        ``stdout`` chunk, right-stripped and newline-terminated, until the
        run reports completion and all buffered output has been sent.
    """
    finished = False
    # deque.append / deque.popleft are thread-safe, so the runner callbacks
    # can fill the buffer while the response generator drains it. The buffer
    # is never rebound, so no line can be lost between a drain and a reset.
    pending: deque = deque()

    def finish_callback(_):
        """Record that the runner has reported completion."""
        nonlocal finished
        finished = True

    def event_callback(data: Dict[str, Any]):
        """Buffer the ``stdout`` text of a runner event, if it has any."""
        if 'stdout' in data:
            pending.append(data['stdout'])

    async def logs():
        """Yield buffered output until the run is finished and drained."""
        while True:
            # Snapshot the flag BEFORE draining: anything buffered before the
            # flag was set is then guaranteed to be flushed by the drain
            # below, so the tail of the output is never dropped.
            done = finished
            while pending:
                yield pending.popleft().rstrip() + "\n"
            if done:
                break
            await asyncio.sleep(0.1)

    # NOTE(review): because of the merge order, a caller-supplied
    # extra_vars['services'] overrides the path parameter, and the request
    # body is forwarded to Ansible unfiltered - confirm this is intended.
    ansible_runner.run_async(
        playbook='deploy.yaml',
        extravars={
            'services': [service],
            **args.extra_vars,
        },
        private_data_dir='/home/api-server',
        project_dir='/var/project',
        inventory='inventory/m2.ini',
        event_handler=event_callback,
        finished_callback=finish_callback,
        settings={
            'suppress_ansible_output': True,
        },
    )

    return StreamingResponse(logs(), media_type='text/plain')