# coding:utf-8 import json import os import time from fastapi import APIRouter, File, BackgroundTasks, UploadFile import schemas from core.config import settings from utils import * router = APIRouter() @router.get("/test") async def test(): 1 / 0 @router.post("/upload_file") async def upload_file( file_type: str, file: UploadFile = File(...) ) -> schemas.Msg: if file_type == 'mother_bag': file_path = os.path.join(settings.ROOT_DIR, f'ApkTool/{file.filename}') elif file_type == 'keystore': file_path = os.path.join(settings.ROOT_DIR, f'ApkTool/keystore/{file.filename}') else: resp = schemas.Msg( code=-1, msg='文件类型错误', data=file_type ) return resp with open(file_path, 'wb') as f: for i in iter(lambda: file.file.read(1024 * 1024 * 10), b''): f.write(i) resp = schemas.Msg( code=0, msg='ok', data=file_path ) return resp @router.post("/delete_file") async def delete_file( data_in: schemas.DeleteFile, ) -> schemas.Msg: file_path = data_in.file_path if 'ApkTool' in file_path and os.path.exists(path=file_path): os.remove(file_path) resp = schemas.Msg( code=0, msg='ok', data=file_path ) return resp @router.post("/run") async def run( data_in: schemas.BaleCreate, background_tasks: BackgroundTasks ): r = os.popen('ps -ef|grep MakeTool.py') res = r.readlines() if 'python' in ''.join(res) or bale.status == 'busy': resp = schemas.Msg( code=-1, msg='已存在打包任务,请等待' ) return resp if bale.status == 'upload': resp = schemas.Msg( code=-1, msg='正在上传,请等待' ) return resp file = os.path.join(settings.ROOT_DIR, 'ApkTool/channel.json') with open(file, 'w') as f: json.dump(data_in.channel, f) background_tasks.add_task(bale.run_bale_apk, f'{data_in.dir}_{int(time.time())}', data_in.id) resp = schemas.Msg( code=0, msg='开始打包任务' ) return resp