prs_server/utils/jianli.py
2022-09-06 14:19:26 +08:00

719 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import docx
import os
import copy
from pprint import pprint
from paddlenlp import Taskflow
import pdfplumber
from win32com import client as wc
from pdf2docx import Converter
# Entity labels requested from the information-extraction model; this list is
# passed as the Taskflow schema and drives the lookup loop in get_date().
schema = ['姓名', '所在地', '户口所在地', '籍贯', '婚姻状况', '民族', '电话', 'tel', '应聘职位', '到岗时间', '学历', '毕业学校', '专业', '期望薪资',
'在校时间', '电子邮箱', '工作经验', 'Email', '性别', '年龄', '身份证号', '技能特长', '生日', '现住址'
]
# Maps every extraction label above to the field name used in the resume
# record. '所在地' and '籍贯' both map to 'account'. The keys 'accounts',
# 'mails' and 'tels' are temporary: get_date() merges them into 'account',
# 'mail' and 'phone' via clash() and removes them.
schema_dict = {'姓名': 'name', '所在地': 'account', '户口所在地': 'accounts', '婚姻状况': 'gam', '民族': 'nation', '生日': 'birthday',
'电话': 'phone', '应聘职位': 'job_name', '到岗时间': 'come_time', '学历': 'education', '毕业学校': 'school',
'专业': 'specialty', '期望薪资': 'hope_money', '在校时间': 'at_school', '电子邮箱': 'mail', '工作经验': 'work_exp',
'Email': 'mails', '性别': 'gender', '年龄': 'age', '籍贯': 'account', 'tel': 'tels', '身份证号': 'id_card',
'技能特长': 'specialty_do', '现住址': 'now_address'}
# Template resume record holding the default value of every field.
# get_date() uses it as the starting point for a parsed resume; treat it as
# read-only shared state and copy it before filling it in.
# NOTE(review): the meaning of the numeric defaults (1, 2, ...) is not visible
# in this file - presumably status/enum codes defined by the consumer; confirm.
data_mode = {
"interview_name": "",
"interview_type": 1,
"interview_sign": 0,
"hope_money": "",
"feedback": 0,
"interview_round": 0,
"event_time": "",
"name": "",
"phone": "",
"job_name": "",
"hr_name": "",
"work_exp": 0,
"interview_stage": 1,
"owner_name": 2,
"education": 1,
"work_undergo": [],
"project_undergo": [],
"work_list": [],
"school": "",
"at_school": "",
"specialty": "",
"specialty_do": [],
"mmended_state": 0,
"mail": "",
"account": "",
"id_card": "",
"gender": "",
"age": 0,
"gam": "",
"interview_state": 1,
"counts": 1,
"nation": "",
"review": [],
"upgrade": [],
"come_time": "",
"now_money": "",
"men_state": 1,
"teacher_state": 1,
"teacher_back": 1,
"offer_state": 1,
"offer_exam_state": 1,
"notice_state": 1,
"pass_why": 0,
"pass_text": "",
"file_url": '',
"now_address": '',
"birthday": '',
}
def chkworlkandtime(listdata):
    """
    Build work-history entries from information-extraction results.

    For every label, spans that share the same text are de-duplicated (the
    one with the highest probability is kept) and ordered by their start
    offset. The i-th company name is then paired with the i-th span of the
    other labels; labels that are missing or have fewer spans contribute ''.

    :param listdata: list of dicts mapping a label to a list of span dicts,
        each with the keys 'text', 'start', 'end' and 'probability'
    :return: list of ``str(dict)`` entries with the keys 'company_name',
        'position_name' and 'duty'; empty when no company name was found
    """
    res = {}
    for result in listdata:
        for label, spans in result.items():
            best = {}
            for span in spans:
                known = best.get(span['text'])
                # Keep the first occurrence unless a later one is more probable.
                if known is not None and span['probability'] <= known['probability']:
                    continue
                best[span['text']] = {
                    'end': span['end'],
                    'probability': span['probability'],
                    'start': span['start'],
                    'text': span['text'],
                }
            res[label] = sorted(best.values(), key=lambda item: item['start'])

    def text_at(label, index, default=''):
        """Return the text of the index-th span of *label*, or *default*."""
        spans = res.get(label, [])
        return spans[index]['text'] if index < len(spans) else default

    ress = []
    # A missing '公司名' label simply yields no entries instead of a KeyError.
    for i in range(len(res.get('公司名', []))):
        # '岗位' takes precedence over '工作内容' when it has an i-th span, but a
        # shorter '岗位' list must not wipe the value taken from '工作内容'.
        duty = text_at('岗位', i, text_at('工作内容', i))
        # NOTE(review): position_name is filled from '职责' and duty from
        # '岗位'/'工作内容'; the label-to-field mapping looks swapped - confirm
        # with the consumer before changing it.
        date = {
            'company_name': text_at('公司名', i),
            'position_name': text_at('职责', i),
            'duty': duty
        }
        ress.append(str(date))
    return ress
def chkworlkandtime1(listdata):
    """
    Build language-skill entries from information-extraction results.

    For every label, spans that share the same text are de-duplicated (the
    one with the highest probability is kept) and ordered by their start
    offset. The i-th language is then paired with the i-th span of the other
    labels; labels that are missing or have fewer spans contribute ''.

    :param listdata: list of dicts mapping a label to a list of span dicts,
        each with the keys 'text', 'start', 'end' and 'probability'
    :return: list of ``str(dict)`` entries with the keys 'language_name',
        'has_sleep', 'reading' and 'writing'; empty when no language was found
    """
    res = {}
    for result in listdata:
        for label, spans in result.items():
            best = {}
            for span in spans:
                known = best.get(span['text'])
                # Keep the first occurrence unless a later one is more probable.
                if known is not None and span['probability'] <= known['probability']:
                    continue
                best[span['text']] = {
                    'end': span['end'],
                    'probability': span['probability'],
                    'start': span['start'],
                    'text': span['text'],
                }
            res[label] = sorted(best.values(), key=lambda item: item['start'])

    def text_at(label, index):
        """Return the text of the index-th span of *label*, or ''."""
        spans = res.get(label, [])
        return spans[index]['text'] if index < len(spans) else ''

    ress = []
    # A missing '语言' label simply yields no entries instead of a KeyError.
    for i in range(len(res.get('语言', []))):
        # Keys carry no trailing blanks so they match the placeholder entries
        # produced elsewhere in this module ('language_name', 'has_sleep').
        date = {
            'language_name': text_at('语言', i),
            'has_sleep': text_at('掌握程度', i),
            # '听说' (listening/speaking) is stored under 'reading' and
            # '读写' (reading/writing) under 'writing', as before.
            'reading': text_at('听说', i),
            'writing': text_at('读写', i)
        }
        ress.append(str(date))
    return ress
def chkworlkandtime2(listdata):
    """
    Build award entries from information-extraction results.

    For every label, spans that share the same text are de-duplicated (the
    one with the highest probability is kept) and ordered by their start
    offset. The i-th award name is then paired with the i-th time span; a
    missing or shorter '时间' list contributes ''.

    :param listdata: list of dicts mapping a label to a list of span dicts,
        each with the keys 'text', 'start', 'end' and 'probability'
    :return: list of ``str(dict)`` entries with the keys 'prize_name' and
        'prize_time'; empty when no award name was found
    """
    res = {}
    for result in listdata:
        for label, spans in result.items():
            best = {}
            for span in spans:
                known = best.get(span['text'])
                # Keep the first occurrence unless a later one is more probable.
                if known is not None and span['probability'] <= known['probability']:
                    continue
                best[span['text']] = {
                    'end': span['end'],
                    'probability': span['probability'],
                    'start': span['start'],
                    'text': span['text'],
                }
            res[label] = sorted(best.values(), key=lambda item: item['start'])

    prizes = res.get('奖项名', [])
    times = res.get('时间', [])
    ress = []
    for i, prize in enumerate(prizes):
        date = {
            # The award name comes from '奖项名' (not from the company label).
            'prize_name': prize['text'],
            'prize_time': times[i]['text'] if i < len(times) else '',
        }
        ress.append(str(date))
    return ress
def getText_docx(filename):  # docx -> text
    """Read a .docx file and return its text, one paragraph/cell per line.

    Paragraph texts come first, followed by the text of every table cell in
    row-major order, table by table.
    """
    document = docx.Document(filename)
    # Body paragraphs, in document order.
    pieces = [paragraph.text for paragraph in document.paragraphs]
    # Table content is stored separately from the paragraphs.
    for table in document.tables:
        n_rows = len(table.rows)
        n_cols = len(table.columns)
        pieces.extend(
            table.cell(row, col).text
            for row in range(n_rows)
            for col in range(n_cols)
        )
    return '\n'.join(pieces)
def pdf_docx(url, filename):
    """
    Convert a PDF file to a DOCX file written into the same directory.

    :param url: directory that contains the PDF; the DOCX is written there too
    :param filename: PDF file name; the text before its first '.' is used as
        the base name of the generated DOCX
    :return: None
    """
    # Base name of the file (everything before the first dot).
    file_name = filename.split('.')[0]
    pdf_name = url + f"/{filename}"
    docx_name = url + f"/{file_name}.docx"
    cv = Converter(pdf_name)
    try:
        cv.convert(docx_name, start=0, end=None)
    finally:
        # Always release the PDF handle, even when the conversion fails.
        cv.close()
def getText_pdf(filename):
    """Read a PDF and return the text of all pages, each followed by a newline.

    Pages for which no text could be extracted are skipped.
    """
    chunks = []
    with pdfplumber.open(filename) as pdf_file:
        for page in pdf_file.pages:
            extracted = page.extract_text()
            if extracted:
                chunks.append(extracted + "\n")
    return ''.join(chunks)
def doc_docx(url, filename):
    """
    Convert a .doc file to a .docx file through the Word COM application.

    :param url: directory that contains the .doc; the .docx is written there
    :param filename: .doc file name; the text before its first '.' is used as
        the base name of the generated .docx
    :return: None
    """
    word = wc.Dispatch("Word.Application")
    try:
        doc = word.Documents.Open(url + f"/{filename}")
        try:
            name = filename.split('.')[0]
            doc.SaveAs(url + f'/{name}.docx', 12)  # 12 is the docx file format
        finally:
            # Close the document even when saving fails.
            doc.Close()
    finally:
        # Never leave the Word process running behind a failed conversion.
        word.Quit()
def clash(date, retain, pop):
    """
    Merge two alternative fields of a record into one, in place.

    Two labels may describe the same piece of information (for example the
    two e-mail labels). Only the *retain* key survives: when its value is the
    empty string it takes over the value stored under *pop*; the *pop* key is
    removed in every case.

    :param date: record to modify
    :param retain: key that is kept and returned to the front end
    :param pop: key that is dropped after its value was used as fallback
    :return: None
    """
    if date[retain] == '':
        # Nothing found for the primary key: fall back to the alternative.
        date[retain] = date[pop]
    del date[pop]
def get_date(schema, dates, schema_dict):
    """
    Pick the basic resume fields out of the raw extraction result.

    :param schema: extraction labels to look up
    :param dates: raw extraction result; only the first element is read and it
        maps a label to a list of candidates with 'text' and 'probability'
    :param schema_dict: maps each label to the field name of the record
    :return: a new record based on the module-level template in which every
        looked-up field holds the most probable candidate text ('' when
        nothing was found) and 'work_exp' holds a bucket code:
        0 (< 1 year or unknown), 1 (1-3), 2 (3-5), 3 (>= 5)
    """
    # Work on a private copy so the shared template is never mutated and
    # nothing leaks from one parsed resume into the next.
    date = copy.deepcopy(data_mode)
    extracted = dates[0] if dates else {}
    filled = set()
    for label in schema:
        field = schema_dict[label]
        candidates = extracted.get(label)
        if not candidates:
            # Several labels share one field (e.g. 'account'); a label without
            # a result must not wipe a value another label already supplied.
            if field not in filled:
                date[field] = ''
            continue
        # Keep the candidate with the highest probability.
        date[field] = max(candidates, key=lambda item: item['probability'])['text']
        filled.add(field)
    # Merge the alternative labels for e-mail, registered residence and phone.
    clash(date, 'mail', 'mails')
    clash(date, 'account', 'accounts')
    clash(date, 'phone', 'tels')

    work_exp = date['work_exp'] or 0
    if isinstance(work_exp, str):
        found = re.search(r"\d+\.?\d*", work_exp)
        # No number at all, or a number longer than three characters (such as
        # a calendar year), counts as unknown.
        if found is None or len(found.group()) > 3:
            work_exp = 0
        else:
            work_exp = float(found.group())
    # Translate the number of years into the bucket code.
    if work_exp < 1:
        date['work_exp'] = 0
    elif work_exp < 3:
        date['work_exp'] = 1
    elif work_exp < 5:
        date['work_exp'] = 2
    else:
        date['work_exp'] = 3
    return date
def fmtTxt(txt, istable=0):
    """
    Cut the first recognisable resume section out of a sequence of lines.

    The lines are scanned for a section heading. Once one is found, the
    following lines are appended to it until a line contains a keyword that
    belongs to a different section group.

    :param txt: sequence of strings (istable truthy) or of objects exposing a
        ``text`` attribute (istable falsy)
    :param istable: truthy when the items of *txt* are plain strings
    :return: tuple ``(sections, rest, more)``; *sections* holds the collected
        section text (at most one entry). When another section begins, *rest*
        is *txt* from the line that ended the section and *more* is 1. When
        the input is exhausted, *rest* is a full slice of *txt* and *more* is 0.
    """
    # Section groups in matching order. The flag tells whether the keyword has
    # to open the line (True) or may appear anywhere in it (False).
    section_rules = (
        (['自我评价', '自我描述', '个人优势', '个人评价'], True),   # self description
        (['项目经历', '项目经验', '项目描述'], False),              # project experience
        (['教育经历', '学习经历'], False),                          # education
        (['工作经历', '工作经验', '实习经历'], True),               # work experience
        (['技能特长', '技能', '特长', '专长', '技能专长', '专业技能', '职业技能'], True),  # skills
        (['语言'], True),                                           # languages
        (['获奖', '证书', '获奖记录', '获奖经历'], False),          # awards
    )
    # Every keyword of every group.
    all_keywords = ['自我评价', '自我描述', '个人优势', '项目经历', '项目经验', '项目描述', '教育经历', '学习经历', '工作经历', '工作经验', '实习经历',
                    '技能特长', '技能', '特长', '专长', '技能专长', '专业技能', '职业技能', '个人评价', '语言', '获奖', '证书', '获奖记录', '获奖经历']

    collected = ''    # text of the section currently being assembled
    terminators = []  # keywords of the other groups; any of them ends the section
    for position, entry in enumerate(txt):
        line = entry if istable else entry.text
        if not collected:
            # Still looking for a heading: try the groups in order.
            for keywords, anchored in section_rules:
                if anchored:
                    hit = line.startswith(tuple(keywords))
                else:
                    hit = any(word in line for word in keywords)
                if hit:
                    collected = line
                    terminators = [word for word in all_keywords if word not in keywords]
                    break
            continue
        if any(word in line for word in terminators):
            # The next section starts here; hand back the unprocessed remainder.
            return [collected], txt[position:], 1
        collected += line
    # Input exhausted: report what was collected and tell the caller to stop.
    sections = [collected] if collected else []
    return sections, txt[0:], 0
def fmtList(txtlist, dates):
    """
    Sort extracted section texts into categories and store them in *dates*.

    Every text is assigned to the first category (in the order listed below)
    that has a keyword contained in it; texts matching no category are
    dropped. Work, award and language sections are additionally run through
    the information-extraction model to obtain structured entries.

    :param txtlist: section texts, each starting with its heading
    :param dates: resume record; updated in place with the keys 'review',
        'project_undergo', 'work_undergo', 'work_list', 'upgrade',
        'specialty_do', 'language' and 'remembrance'
    :return: the same *dates* object
    """
    # (category, keywords) in matching priority order.
    categories = (
        # '个人评价' is included so that every self-description heading that is
        # cut out as a section is also classified here.
        ('review', ['自我评价', '自我描述', '个人优势', '个人评价']),
        ('project', ['项目经历', '项目经验', '项目描述']),
        ('work', ['工作经历', '工作经验', '实习经历']),
        ('upgrade', ['教育经历', '学习经历']),
        ('specialty', ['技能特长', '技能', '特长', '专长', '技能专长', '专业技能', '职业技能']),
        ('language', ['语言']),
        ('remembrance', ['获奖', '证书', '获奖记录', '获奖经历']),
    )
    buckets = {category: [] for category, _ in categories}
    for text in txtlist:
        for category, keywords in categories:
            if any(word in text for word in keywords):
                buckets[category].append(text)
                break

    def extract(sections, labels, parse, placeholder):
        """Run the extraction model over *sections*; fall back to *placeholder*."""
        if sections:
            ie = Taskflow('information_extraction', schema=labels)
            parsed = parse(ie(''.join(sections)))
            if parsed:
                return parsed
        # NOTE(review): placeholder ("test ...") entries are returned when
        # nothing could be extracted - confirm the consumer really wants
        # dummy data here rather than an empty list.
        return [placeholder]

    # Company / position / duty taken from the work-experience sections.
    work_list = extract(
        buckets['work'], ['公司名', '职责', '工作内容', '岗位'], chkworlkandtime,
        "{'company_name': '测试公司','position_name': '测试职位','duty': '测试职责'}")
    # Award name / time taken from the award sections.
    remembrance_list = extract(
        buckets['remembrance'], ['奖项名', '时间'], chkworlkandtime2,
        "{'prize_name': '测试奖项', 'prize_time': '2022-08-26'}")
    # Language / proficiency taken from the language sections themselves
    # (not from the work-experience text).
    language_list = extract(
        buckets['language'], ['语言', '掌握程度', '听说', '读写'], chkworlkandtime1,
        "{'language_name': '测试语言', 'has_sleep': '', 'reading': '听说', 'writing': '读写'}")

    dates.update({
        'review': buckets['review'],
        'project_undergo': buckets['project'],
        'work_undergo': buckets['work'],
        'work_list': work_list,
        'upgrade': buckets['upgrade'],
        'specialty_do': buckets['specialty'],
        'language': language_list,
        'remembrance': remembrance_list,
    })
    return dates
def get_resume(file, path_data):
    """
    Parse one resume file into a resume record.

    Supported formats are .pdf, .docx and .doc (matched case-insensitively).
    PDF and DOC files are first converted to a temporary DOCX that is created
    next to the source file and removed again afterwards.

    :param file: file name of the resume inside *path_data*
    :param path_data: directory that contains the file
    :return: resume record with the basic fields and the classified sections
    :raises ValueError: when the file extension is not supported
    """
    url = path_data + f"/{file}"
    suffix = os.path.splitext(file)[1].lower()
    # The converters name their output after the text before the first dot.
    # NOTE(review): an existing "<name>.docx" next to a PDF/DOC of the same
    # base name is overwritten by the conversion and then deleted.
    converted = path_data + "/" + file.split('.')[0] + ".docx"
    if suffix == '.pdf':
        pdf_docx(path_data, file)  # convert to docx
        try:
            open_txt = docx.Document(converted)  # paragraphs/tables come from the docx
        finally:
            os.remove(converted)  # drop the generated file even on failure
        txt = getText_pdf(url)  # plain text comes straight from the pdf
    elif suffix == '.docx':
        open_txt = docx.Document(url)
        txt = getText_docx(url)
    elif suffix == '.doc':
        doc_docx(path_data, file)  # convert to docx
        try:
            open_txt = docx.Document(converted)
            txt = getText_docx(converted)
        finally:
            os.remove(converted)  # drop the generated file even on failure
    else:
        raise ValueError(f"unsupported resume file type: {file!r}")

    # Basic fields (name, phone, e-mail, ...) from the whole text.
    # Creating the Taskflow may download model files on first use.
    ie = Taskflow('information_extraction', schema=schema)
    text_lists = ie(txt)
    dates = get_date(schema, text_lists, schema_dict)

    # Cut the paragraphs into sections, one section per fmtTxt call.
    sections = []
    remaining = open_txt.paragraphs
    more = 1
    while more:
        found, remaining, more = fmtTxt(remaining)
        sections += found

    # Collect the distinct, non-empty cell texts of all tables in order.
    table_list = []
    seen = set()
    for table in open_txt.tables:
        row_count = len(table.rows)
        col_count = len(table.columns)
        for i in range(row_count):
            for j in range(col_count):
                texts = table.cell(i, j).text
                if not texts or texts in seen:
                    continue
                seen.add(texts)
                table_list.append(texts)
    if table_list:
        remaining = table_list
        more = 1
        while more:
            found, remaining, more = fmtTxt(remaining, istable=1)
            sections += found

    # Classify the sections and merge them into the record.
    fmtList(sections, dates)
    return dates
if __name__ == '__main__':
    # Raw string: the backslashes of the Windows path are not escape sequences.
    path_data = r"D:\wokerplay\面试简历1"
    # Parse every file below the directory; each file is resolved against the
    # directory it actually lives in, so sub-directories work as well.
    for root, _, files in os.walk(path_data):
        for file in files:
            print(get_resume(file, root))