servers/api/api.py
2022-12-11 18:41:07 +01:00

192 lines
5.2 KiB
Python

import asyncio
import uuid
import ansible_runner
import os
import sys
from typing import Any, Dict, List
from fastapi import FastAPI, HTTPException, Request, Security, Depends
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from ansible_runner import Runner
from dataclasses import dataclass
from yaml import safe_load
from starlette.status import HTTP_403_FORBIDDEN
# HTTP Bearer security scheme; get_user() consumes it to read the token from
# the request's Authorization header.
bearer_auth = HTTPBearer()
class User(BaseModel):
    # One API client, as described by an entry of the users file.

    # Bearer token the client authenticates with (also the lookup key).
    token: str

    # Names of the services this client is allowed to deploy and inspect.
    allowed_services: list[str]
def load_users(filename) -> "dict[str, User]":
    """Load the API users from a YAML file and index them by bearer token.

    The file must contain a YAML list whose items are mappings carrying the
    fields of ``User``.

    Args:
        filename: Path of the YAML users file.

    Returns:
        A dict mapping each user's token to its validated ``User``. If two
        entries share a token, the later one wins.

    Exits:
        Terminates the process with status 1 (after printing the reason to
        stderr) when the file is missing, is not a list, or contains an
        entry that is not a valid user.
    """
    try:
        with open(filename, "r", encoding="utf-8") as file:
            data = safe_load(file)
        if not isinstance(data, list):
            print(f"{filename} must be list of users", file=sys.stderr)
            sys.exit(1)
        # Validate every entry BEFORE reading its token: a missing "token"
        # then surfaces as a validation error (a ValueError) and a
        # non-mapping entry as a TypeError, both reported cleanly below
        # instead of escaping as a raw KeyError/TypeError traceback.
        parsed = [User(**entry) for entry in data]
        return {user.token: user for user in parsed}
    except FileNotFoundError:
        print(
            f"File {filename} was not found, please make sure that you provide users file.",
            file=sys.stderr,
        )
        sys.exit(1)
    except (TypeError, ValueError) as e:
        print(e, file=sys.stderr)
        sys.exit(1)
# Token -> User table, loaded once at import time. The file location can be
# overridden with the API_USERS_FILE environment variable and defaults to
# /etc/api-server/users.yaml.
users = load_users(os.environ.get("API_USERS_FILE", "/etc/api-server/users.yaml"))
def get_user(bearer_auth: HTTPAuthorizationCredentials = Security(bearer_auth)) -> User:
    """Resolve the bearer token of the current request to a known ``User``.

    Raises:
        HTTPException: 403 when no credentials were supplied or the token is
            not present in the ``users`` table.
    """
    if not bearer_auth or bearer_auth.credentials not in users:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return users[bearer_auth.credentials]
class Link(BaseModel):
    # A single hyperlink, used inside the "_links" section of API responses.

    # Target URL of the link.
    href: str
class DeployArgs(BaseModel):
    # Request body accepted by POST /deployment.

    # Service to deploy; checked against the caller's allowed_services.
    service: str

    # Optional Ansible inventory; when omitted, deploy() falls back to the
    # API_INVENTORY environment variable.
    inventory: str | None = None

    # Additional variables forwarded to the playbook as extravars.
    vars: Dict[str, Any] = {}
class DeploymentLinks(BaseModel):
    # Links attached to a deployment in API responses.

    # URL streaming the deployment's log output.
    logs: Link

    # URL of the deployment resource itself.
    self: Link
@dataclass
class Deployment:
    """Server-side record of one deployment request."""

    # Identifier generated by deploy(); also the key in ``deployments``.
    id: str

    # The arguments the deployment was requested with.
    args: DeployArgs

    # Handle of the Ansible run backing this deployment, when one exists.
    runner: Runner | None = None
class DeploymentDTO(BaseModel):
    # Representation of a deployment returned by the API: its id, the current
    # runner status and links to itself and to its logs. The links are
    # exposed under the "_links" key through the field alias.
    id: str
    status: str
    links: DeploymentLinks = Field(..., alias="_links")

    @classmethod
    def from_deployment(cls, deployment: Deployment, request: Request) -> "DeploymentDTO":
        # Build the DTO for `deployment`; `request` is needed to generate
        # absolute URLs for the "deployment" and "deployment_logs" routes.
        #
        # Declared as a classmethod so the call works on the class and on
        # instances alike (the undecorated form only worked on the class).
        runner = deployment.runner
        # Deployment.runner is optional; without a runner there is no status
        # to read. NOTE(review): "unstarted" is believed to match the status
        # ansible-runner reports before a run begins - confirm.
        status = runner.status if runner is not None else "unstarted"
        return cls(
            id=deployment.id,
            status=status,
            _links={
                # str(): url_for() returns a URL object rather than a plain
                # string on newer Starlette versions, while href is a str.
                "self": {"href": str(request.url_for("deployment", id=deployment.id))},
                "logs": {"href": str(request.url_for("deployment_logs", id=deployment.id))},
            },
        )
# In-memory registry of the deployments started by this process, keyed by
# deployment id; deploy() adds entries and get_deployment() looks them up.
deployments: Dict[str, Deployment] = {}
def get_deployment(id: str) -> Deployment:
    """Look up a deployment by id in the ``deployments`` registry.

    Raises:
        HTTPException: 404 when no deployment with that id is registered.
    """
    try:
        return deployments[id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Deployment was not found") from None
# The FastAPI application. The interactive documentation is served from the
# root path and the OpenAPI schema from /swagger.json.
app = FastAPI(
    title="Server Management API",
    description="API for services management",
    docs_url="/",
    openapi_url="/swagger.json",
)
@app.get("/deployment", response_model=List[DeploymentDTO])
async def deployment_list(request: Request, user: User = Depends(get_user)):
    # List the registered deployments whose service the caller's token is
    # allowed to access; deployments of other services are left out.
    permitted = user.allowed_services
    visible = (
        candidate
        for candidate in deployments.values()
        if candidate.args.service in permitted
    )
    return [DeploymentDTO.from_deployment(candidate, request) for candidate in visible]
@app.get("/deployment/{id}/logs")
async def deployment_logs(
    deployment: Deployment = Depends(get_deployment), user: User = Depends(get_user)
):
    # Stream the stdout of a deployment's Ansible run as plain text.
    #
    # Responds with 403 when the caller's token does not cover the
    # deployment's service. Otherwise the response follows the output until
    # the runner leaves the "running" state.
    if deployment.args.service not in user.allowed_services:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"This token does not allow to access {deployment.args.service} deployments.",
        )
    runner = deployment.runner

    async def stream():
        # runner.stdout hands out an open file handle; the `with` block makes
        # sure it is closed when the stream ends or the client disconnects.
        with runner.stdout as stdout:
            while True:
                # Sample the status BEFORE draining: if the run finishes
                # while we read, the next pass still collects the remaining
                # output instead of dropping lines written in between.
                # NOTE(review): as before, any status other than "running"
                # (including one reported before the run starts) ends the
                # stream - confirm whether pre-start states should wait.
                finished = runner.status != "running"
                while line := stdout.readline():
                    yield line
                if finished:
                    break
                # Nothing new yet; yield control before polling again.
                await asyncio.sleep(0.1)

    return StreamingResponse(stream(), media_type="text/plain")
@app.get("/deployment/{id}", response_model=DeploymentDTO)
async def deployment(
    request: Request,
    deployment: Deployment = Depends(get_deployment),
    user: User = Depends(get_user),
):
    # Return a single deployment, provided the caller's token is allowed to
    # access the deployment's service; otherwise respond with 403.
    permitted = deployment.args.service in user.allowed_services
    if permitted:
        return DeploymentDTO.from_deployment(deployment, request)
    raise HTTPException(
        status_code=HTTP_403_FORBIDDEN,
        detail=f"This token does not allow to access {deployment.args.service} deployments.",
    )
@app.post("/deployment", response_model=DeploymentDTO)
async def deploy(args: DeployArgs, request: Request, user: User = Depends(get_user)):
    # Start an asynchronous Ansible run of deploy.yaml for the requested
    # service, register it in `deployments` and return its representation.
    #
    # Responds with 403 when the caller's token does not cover args.service.
    if args.service not in user.allowed_services:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"This token does not allow to deploy {args.service} service.",
        )
    ident = str(uuid.uuid4())
    # SECURITY: client-supplied vars are merged FIRST so that the authorized
    # service always wins. With the opposite order a caller could send
    # vars={"services": [...]} and deploy services the token does not cover.
    extravars = {**args.vars, "services": [args.service]}
    _, runner = ansible_runner.run_async(
        playbook="deploy.yaml",
        ident=ident,
        extravars=extravars,
        private_data_dir="/var/run/ansible",
        project_dir=os.environ.get("API_PROJECT_DIR", "/var/project"),
        # The inventory from the request takes precedence over API_INVENTORY.
        inventory=args.inventory or os.environ.get("API_INVENTORY"),
        settings={"suppress_ansible_output": True},
    )
    deployment = Deployment(id=ident, runner=runner, args=args)
    deployments[ident] = deployment
    return DeploymentDTO.from_deployment(deployment, request)