import asyncio import uuid import ansible_runner import os import sys from typing import Any, Dict, List from fastapi import FastAPI, HTTPException, Request, Security, Depends from fastapi.responses import StreamingResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel, Field from ansible_runner import Runner from dataclasses import dataclass from yaml import safe_load from starlette.status import HTTP_403_FORBIDDEN bearer_auth = HTTPBearer() class User(BaseModel): token: str allowed_services: list[str] def load_users(filename) -> dict[str, User]: try: with open(filename, "r") as file: data = safe_load(file) if not isinstance(data, list): print(f"{filename} must be list of users", file=sys.stderr) sys.exit(1) return {user["token"]: User(**user) for user in data} except FileNotFoundError as e: print( f"File {filename} was not found, please make sure that you provide users file.", file=sys.stderr, ) sys.exit(1) except ValueError as e: print(e, file=sys.stderr) sys.exit(1) users = load_users("/etc/api-server/users.yaml") def get_user(bearer_auth: HTTPAuthorizationCredentials = Security(bearer_auth)) -> User: if bearer_auth and bearer_auth.credentials in users: return users[bearer_auth.credentials] raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials", ) class Link(BaseModel): href: str class DeployArgs(BaseModel): service: str inventory: str | None = None vars: Dict[str, Any] = {} class DeploymentLinks(BaseModel): logs: Link self: Link @dataclass class Deployment: id: str args: DeployArgs runner: Runner | None = None class DeploymentDTO(BaseModel): id: str status: str links: DeploymentLinks = Field(..., alias="_links") def from_deployment(deployment: Deployment, request: Request): return DeploymentDTO( id=deployment.id, status=deployment.runner.status, _links={ "self": {"href": request.url_for("deployment", id=deployment.id)}, "logs": {"href": request.url_for("deployment_logs", id=deployment.id)}, }, ) deployments: Dict[str, Deployment] = {} def get_deployment(id: str) -> Deployment: if id not in deployments: raise HTTPException(status_code=404, detail="Deployment was not found") return deployments[id] app = FastAPI( docs_url="/", openapi_url="/swagger.json", description="API for services management", title="Server Management API", ) @app.get("/deployment", response_model=List[DeploymentDTO]) async def deployment_list(request: Request, user: User = Depends(get_user)): return [ DeploymentDTO.from_deployment(deployment, request) for deployment in deployments.values() if deployment.args.service in user.allowed_services ] @app.get("/deployment/{id}/logs") async def deployment_logs( deployment: Deployment = Depends(get_deployment), user: User = Depends(get_user) ): if deployment.args.service not in user.allowed_services: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail=f"This token does not allow to access {deployment.args.service} deployments.", ) runner = deployment.runner async def stream(): stdout = runner.stdout while True: while line := stdout.readline(): if line == "": break yield line if runner.status != "running": break await asyncio.sleep(0.1) return StreamingResponse(stream(), media_type="text/plain") @app.get("/deployment/{id}", response_model=DeploymentDTO) async def deployment( request: Request, deployment: Deployment = Depends(get_deployment), user: User = Depends(get_user), ): if deployment.args.service not in user.allowed_services: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail=f"This token does not allow to access {deployment.args.service} deployments.", ) return DeploymentDTO.from_deployment(deployment, request) @app.post("/deployment", response_model=DeploymentDTO) async def deploy(args: DeployArgs, request: Request, user: User = Depends(get_user)): if args.service not in user.allowed_services: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail=f"This token does not allow to deploy {args.service} service.", ) ident = str(uuid.uuid4()) _, runner = ansible_runner.run_async( playbook="deploy.yaml", ident=ident, extravars={"services": [args.service], **args.vars}, private_data_dir="/var/run/ansible", project_dir=os.environ.get("API_PROJECT_DIR", "/var/project"), inventory=args.inventory or os.environ.get("API_INVENTORY"), settings={"suppress_ansible_output": True}, ) deployment = Deployment(id=ident, runner=runner, args=args) deployments[ident] = deployment return DeploymentDTO.from_deployment(deployment, request)