# coding:utf-8
"""Scan the sub-directories next to this script for skeleton JSON files.

Every ``.json`` file whose top-level object has a ``"skeleton"`` entry that
contains ``"spine"`` is recorded (path, project name, timestamps) in the
``json_file`` table of a MySQL database.  One worker process is started per
top-level sub-directory.
"""
import json
import os
import time
from multiprocessing import Pool

from sqlalchemy import (
    Column,
    String,
    create_engine,
    table,
    Text,
    MetaData,
    Table,
    Integer,
    TIMESTAMP,
)
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.mysql import Insert

# NOTE(review): credentials are hard-coded in source control; they should be
# moved to configuration / environment before this is deployed anywhere else.
DB_URL = 'mysql+pymysql://root:87251326@10.0.0.5:3306/file_monitor?charset=utf8'


class UpdateInsert:
    """Context manager that upserts rows into the ``json_file`` table.

    Entering creates the engine, a session and the table description;
    leaving commits (or rolls back on error) and releases both.
    """

    def __init__(self):
        self.engine = None
        self.session = None
        self.json_file_model = None

    def __enter__(self):
        self.engine = create_engine(DB_URL)
        self.session = sessionmaker(bind=self.engine)()
        # Unbound MetaData: statements are executed through the session, which
        # is already bound to the engine.  ``MetaData(engine)`` no longer
        # exists in SQLAlchemy 2.0.
        metadata = MetaData()
        self.json_file_model = Table(
            'json_file', metadata,
            Column('id', Integer, primary_key=True),
            Column('project', String(32)),
            Column('path', String(255)),
            Column('ctime', Integer),
            Column('atime', Integer),
            Column('mtime', Integer),
            Column('anitype', String(32)),
        )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Never commit a half-finished unit of work: roll back when the
        # ``with`` body raised, and always release the connection resources.
        try:
            if exc_type is None:
                self.session.commit()
            else:
                self.session.rollback()
        finally:
            self.session.close()
            self.engine.dispose()

    def on_duplicate_key_update(self, *data):
        """Insert each row dict, updating the existing row on a key clash.

        :param data: one dict per row, keyed by ``json_file`` column name.

        NOTE(review): MySQL only takes the UPDATE branch when the row collides
        with a primary/unique key; the rows built in this file carry no ``id``,
        so this presumably relies on a unique index defined in the database
        itself -- confirm against the real schema.
        """
        for item in data:
            insert_stmt = Insert(self.json_file_model).values(item)
            upsert_stmt = insert_stmt.on_duplicate_key_update(item)
            self.session.execute(upsert_stmt)
        # One commit for the whole batch.
        self.session.commit()


class FileMonitor:
    """Collect skeleton JSON files below one directory and store them."""

    def __init__(self, root_path):
        # Every ``.json`` file found while walking.
        self.json_files = set()
        # Prefix removed from absolute paths before they are stored.
        self.root_path = root_path
        # Subset of ``json_files`` that passed ``check_json``.
        self.skeleton_json_files = set()
        # Number of directory entries (files and directories) visited.
        self.file_count = 0

    def get_all_file(self, path):
        """Recursively collect every ``.json`` file below *path*.

        Any path containing ``.svn`` is skipped.  Results are added to
        ``self.json_files``; ``self.file_count`` is incremented per entry.

        :param path: directory to walk.
        :return: None
        """
        if '.svn' in path:
            return
        try:
            entries = os.listdir(path)
        except OSError as err:
            # One unreadable directory must not abort the whole scan.
            print(f'无法读取目录 {path}: {err}')
            return
        for name in entries:
            filepath = os.path.join(path, name)
            print(filepath)
            self.file_count += 1
            if os.path.isdir(filepath):
                self.get_all_file(filepath)
            elif filepath.endswith('.json'):
                # ``elif``: a directory that happens to be named ``*.json``
                # is not a candidate file.
                self.json_files.add(filepath)

    @staticmethod
    def _is_spine_skeleton(data):
        """Return True when *data* is a dict whose "skeleton" contains "spine"."""
        if not isinstance(data, dict) or "skeleton" not in data:
            return False
        try:
            return "spine" in data['skeleton']
        except TypeError:
            # "skeleton" holds something that does not support ``in``
            # (e.g. a number or null).
            return False

    def check_json(self):
        """Keep the collected JSON files that qualify as skeleton files.

        Files that cannot be opened, decoded or parsed are skipped.
        Qualifying paths are added to ``self.skeleton_json_files``.

        :return: None
        """
        for path in self.json_files:
            try:
                print(f'检查 {path}')
                with open(path, encoding='utf8') as f:
                    data = json.load(f)
            except (OSError, ValueError, RecursionError):
                # OSError: unreadable file; ValueError: invalid JSON or bad
                # encoding (JSONDecodeError / UnicodeError are subclasses);
                # RecursionError: pathologically nested document.
                continue
            if self._is_spine_skeleton(data):
                self.skeleton_json_files.add(path)

    def get_project_name(self, path):
        """Split *path* into its root-relative part and its project name.

        The project name is the first non-empty component of the relative
        path.  Both ``\\`` and ``/`` are accepted as separators.

        :param path: path of a file, normally starting with ``self.root_path``.
        :return: ``(file_path, project_name)``; *file_path* keeps its leading
            separator.  Both are ``''`` when *path* equals the root.
        """
        root = self.root_path
        # Strip the root only as a prefix, so a root string that happens to
        # occur again deeper in the path cannot truncate the result.
        if root and path.startswith(root):
            file_path = path[len(root):]
        else:
            file_path = path
        components = file_path.replace('/', '\\').split('\\')
        project_name = next((part for part in components if part), '')
        return file_path, project_name

    def get_data(self):
        """Build one row dict per skeleton file.

        :return: list of dicts with the keys ``ctime``, ``atime``, ``mtime``,
            ``project``, ``path`` and ``anitype``.
        """
        data = []
        for path in self.skeleton_json_files:
            # A single stat call supplies all three timestamps.
            info = os.stat(path)
            ctime = int(info.st_ctime)
            atime = int(info.st_atime)
            mtime = int(info.st_mtime)
            file_path, project_name = self.get_project_name(path)
            data.append({
                'ctime': ctime,
                'atime': atime,
                'mtime': mtime,
                'project': project_name,
                'path': file_path,
                'anitype': 'spine',
            })
            print(ctime, atime, mtime, project_name, file_path)
        return data

    def update_save(self, data):
        """Upsert the given rows into the database.

        :param data: list of row dicts as produced by ``get_data``.
        :return: None
        """
        if not data:
            # Nothing to store: do not set up a database session at all.
            return
        with UpdateInsert() as obj:
            obj.on_duplicate_key_update(*data)

    def run(self, path):
        """Scan *path*, filter the JSON files and store the result."""
        self.get_all_file(path)
        self.check_json()
        data = self.get_data()
        self.update_save(data)
        print(f'目录 {path} 遍历文件数 {self.file_count}')


def handler(args):
    """Pool worker entry point; *args* is ``(root_path, directory_to_scan)``."""
    obj = FileMonitor(args[0])
    obj.run(args[1])


def main():
    """Scan every sub-directory of this script's directory in parallel."""
    st = int(time.time())
    print(st)
    root_dir = os.path.abspath(os.path.dirname(__file__))
    print(root_dir)
    dir_list = [
        (root_dir, os.path.join(root_dir, item))
        for item in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, item))
    ]
    print(f'开启进程数 {len(dir_list)}')
    # NOTE(review): presumably a pause so the operator can read the banner
    # before the per-file output starts -- confirm it is still wanted.
    time.sleep(3)
    # Pool(0) raises ValueError, so only start workers when there is work.
    if dir_list:
        with Pool(len(dir_list)) as p:
            p.map(handler, dir_list)
    print(f'用时 {int(time.time()) - st}')


if __name__ == '__main__':
    main()