import asyncio import uuid from wsgiref.util import request_uri import ansible_runner import os from typing import Any, Dict from fastapi import FastAPI, HTTPException, Request from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from ansible_runner import Runner from dataclasses import dataclass class Link(BaseModel): href: str class DeployArgs(BaseModel): service: str inventory: str | None = None vars: Dict[str, Any] = {} class DeploymentLinks(BaseModel): logs: Link self: Link @dataclass class Deployment: id: str runner: Runner | None = None class DeploymentDTO(BaseModel): id: str status: str links: DeploymentLinks = Field(..., alias="_links") def from_deployment(deployment: Deployment, request: Request): return DeploymentDTO( id=deployment.id, status=deployment.runner.status, _links={ "self": {"href": request.url_for("deployment", id=deployment.id)}, "logs": {"href": request.url_for("deployment_logs", id=deployment.id)}, }, ) deployments: Dict[str, Deployment] = {} app = FastAPI() @app.get("/deployment/{id}/logs") async def deployment_logs(id: str): if id not in deployments: raise HTTPException(status_code=404, detail="Deployment was not found") runner = deployments.get(id).runner async def stream(): stdout = runner.stdout while True: while line := stdout.readline(): if line == "": break yield line if runner.status != "running": break await asyncio.sleep(0.1) return StreamingResponse(stream(), media_type="text/plain") @app.get("/deployment") async def deployment_list(request: Request): return [ DeploymentDTO.from_deployment(deployment, request) for deployment in deployments.values() ] @app.get("/deployment/{id}") async def deployment(id: str, request: Request): return DeploymentDTO.from_deployment(deployments[id], request) @app.post("/deployment") async def deploy(args: DeployArgs, request: Request): ident = str(uuid.uuid4()) _, runner = ansible_runner.run_async( playbook="deploy.yaml", ident=ident, extravars={"services": [args.service], **args.vars}, private_data_dir="/home/api-server", project_dir=os.environ.get("API_PROJECT_DIR", "/var/project"), inventory=args.inventory or os.environ.get("API_INVENTORY"), settings={"suppress_ansible_output": True}, ) deployment = Deployment(ident, runner) deployments[ident] = deployment return DeploymentDTO.from_deployment(deployment, request)