file_monitor/main.py
2021-09-01 18:15:22 +08:00

165 lines
5.0 KiB
Python

# coding:utf-8
import json
import os
import time
from multiprocessing import Pool
from sqlalchemy import Column, String, create_engine, table, Text, MetaData, Table, Integer, TIMESTAMP
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.mysql import Insert
class UpdateInsert:
    """Context manager that upserts rows into the ``json_file`` MySQL table.

    Entering the context creates the engine, a session bound to it and the
    ``json_file`` table description. Leaving it commits pending work (or
    rolls back when the ``with`` body raised), then closes the session and
    disposes of the engine.
    """

    # NOTE(review): this URL embeds credentials in source control; consider
    # supplying it through configuration and passing it as ``url`` instead.
    DEFAULT_URL = 'mysql+pymysql://root:87251326@10.0.0.5:3306/file_monitor?charset=utf8'

    def __init__(self, url=None):
        """
        :param url: SQLAlchemy connection URL; defaults to ``DEFAULT_URL``.
        """
        self.url = url or self.DEFAULT_URL
        self.engine = None
        self.session = None
        self.json_file_model = None

    def __enter__(self):
        """Create the engine, the session and the table description.

        :return: this object, ready for ``on_duplicate_key_update`` calls.
        """
        self.engine = create_engine(self.url)
        self.session = sessionmaker(bind=self.engine)()
        # Every statement runs through the engine-bound session, so the
        # metadata needs no bind of its own. Passing the engine positionally
        # is also unsafe: newer SQLAlchemy reads that argument as ``schema``.
        metadata = MetaData()
        self.json_file_model = Table(
            'json_file', metadata,
            Column('id', Integer, primary_key=True),
            Column('project', String(32)),
            Column('path', String(255)),
            Column('ctime', Integer),
            Column('atime', Integer),
            Column('mtime', Integer),
            Column('anitype', String(32)),
        )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the unit of work and release the database resources.

        Commits when the ``with`` body succeeded and rolls back when it
        raised, so a failed batch never leaves a half-applied statement
        behind. The exception, if any, is not suppressed.
        """
        try:
            if exc_type is None:
                self.session.commit()
            else:
                self.session.rollback()
        finally:
            # Release the connection even if commit/rollback itself failed.
            self.session.close()
            if self.engine is not None:
                self.engine.dispose()

    def on_duplicate_key_update(self, *data):
        """Insert each row, updating the existing row on a duplicate key.

        :param data: row dictionaries keyed by ``json_file`` column name.

        Each row is committed on its own, so rows written before a failure
        are kept and each transaction stays short.
        """
        for item in data:
            insert_stmt = Insert(self.json_file_model).values(item)
            upsert_stmt = insert_stmt.on_duplicate_key_update(item)
            self.session.execute(upsert_stmt)
            self.session.commit()
class FileMonitor:
    """Scan a directory tree for Spine skeleton JSON files and store their metadata.

    ``run`` walks a directory, keeps the ``.json`` files whose content has a
    ``skeleton`` entry containing ``spine``, collects their timestamps and
    upserts one row per file through ``UpdateInsert``.
    """

    def __init__(self, root_path):
        """
        :param root_path: prefix removed from every file path before it is
            stored; the first component after it is taken as the project name.
        """
        self.json_files = set()
        self.root_path = root_path
        self.skeleton_json_files = set()
        self.file_count = 0

    @staticmethod
    def _path_parts(path):
        """Return the non-empty components of *path*.

        Both the backslash and the forward slash count as separators, so
        paths are split the same way on Windows and on POSIX.
        """
        return [part for part in path.replace('\\', '/').split('/') if part]

    @staticmethod
    def _is_spine_skeleton(data):
        """Tell whether parsed JSON *data* has a ``skeleton`` entry containing ``spine``."""
        if not isinstance(data, dict) or 'skeleton' not in data:
            return False
        try:
            return 'spine' in data['skeleton']
        except TypeError:
            # ``skeleton`` is a number, null, ... and supports no membership test.
            return False

    def get_all_file(self, path):
        """
        Collect every ``.json`` file below *path* into ``self.json_files``.

        Directories named ``.svn`` are skipped. Every directory entry that
        is visited is printed and counted in ``self.file_count``.

        :param path: directory to scan recursively.
        :return: None
        """
        # Compare whole components so that names such as "foo.svn" are kept.
        if '.svn' in self._path_parts(path):
            return
        try:
            entries = os.listdir(path)
        except OSError as exc:
            # An unreadable directory must not abort the whole scan.
            print(f'无法读取目录 {path}: {exc}')
            return
        for name in entries:
            filepath = os.path.join(path, name)
            print(filepath)
            self.file_count += 1
            if os.path.isdir(filepath):
                self.get_all_file(filepath)
            elif filepath.endswith('.json'):
                # ``elif``: a directory whose name ends in ".json" is not a file.
                self.json_files.add(filepath)

    def check_json(self):
        """
        Keep the JSON files that look like Spine skeletons.

        Files that cannot be read or parsed are skipped; matching paths are
        added to ``self.skeleton_json_files``.

        :return: None
        """
        for path in self.json_files:
            print(f'检查 {path}')
            try:
                # utf-8-sig also accepts files that start with a BOM.
                with open(path, encoding='utf-8-sig') as f:
                    data = json.load(f)
            except (OSError, ValueError, RecursionError):
                # Unreadable, badly encoded, malformed or too deeply nested.
                continue
            if self._is_spine_skeleton(data):
                self.skeleton_json_files.add(path)

    def get_project_name(self, path):
        """
        Derive the stored path and the project name of a file.

        :param path: full path of a file.
        :return: ``(file_path, project_name)``: *path* with the leading
            ``root_path`` removed, and the first non-empty component of it
            (an empty string when there is none).
        """
        root = self.root_path
        # Strip the root only as a prefix, so a later repeat of the same
        # text inside the path does not truncate the result.
        if root and path.startswith(root):
            file_path = path[len(root):]
        else:
            file_path = path
        parts = self._path_parts(file_path)
        project_name = parts[0] if parts else ''
        return file_path, project_name

    def get_data(self):
        """
        Build one row dictionary per detected skeleton file.

        :return: list of dicts with the keys ``ctime``, ``atime``, ``mtime``,
            ``project``, ``path`` and ``anitype``.
        """
        data = []
        for path in self.skeleton_json_files:
            try:
                # One stat call gives all three timestamps consistently.
                info = os.stat(path)
            except OSError as exc:
                # The file disappeared or became unreadable since the scan.
                print(f'跳过 {path}: {exc}')
                continue
            ctime = int(info.st_ctime)
            atime = int(info.st_atime)
            mtime = int(info.st_mtime)
            file_path, project_name = self.get_project_name(path)
            data.append({
                'ctime': ctime,
                'atime': atime,
                'mtime': mtime,
                'project': project_name,
                'path': file_path,
                'anitype': 'spine'
            })
            print(ctime, atime, mtime, project_name, file_path)
        return data

    def update_save(self, data):
        """
        Upsert the given rows into the database.

        :param data: list of row dictionaries as produced by ``get_data``.
        :return: None
        """
        if not data:
            # Nothing to write, so no database context is needed.
            return
        with UpdateInsert() as obj:
            obj.on_duplicate_key_update(*data)

    def run(self, path):
        """
        Scan *path*, detect skeleton files, store them and print the entry count.

        :param path: directory to process.
        :return: None
        """
        self.get_all_file(path)
        self.check_json()
        data = self.get_data()
        self.update_save(data)
        print(f'目录 {path} 遍历文件数 {self.file_count}')
def handler(args):
    """Worker entry point: scan one directory with a fresh ``FileMonitor``.

    :param args: indexable pair; item 0 is the root path handed to
        ``FileMonitor`` and item 1 is the directory passed to ``run``.
    """
    root_path, target_dir = args[0], args[1]
    FileMonitor(root_path).run(target_dir)
if __name__ == '__main__':
    # Scan every sub-directory that sits next to this script, one worker
    # process per directory, and report the elapsed wall-clock seconds.
    st = int(time.time())
    print(st)
    root_dir = os.path.abspath(os.path.dirname(__file__))
    print(root_dir)
    dir_list = []
    for item in os.listdir(root_dir):
        candidate = os.path.join(root_dir, item)
        if os.path.isdir(candidate):
            # Each task carries the root (for relative paths) and the directory.
            dir_list.append((root_dir, candidate))
    print(f'开启进程数 {len(dir_list)}')
    time.sleep(3)
    # Pool(0) raises ValueError, so only start a pool when there is work.
    if dir_list:
        with Pool(len(dir_list)) as p:
            p.map(handler, dir_list)
    print(f'用时 {int(time.time()) - st}')