"""Parse resumes (.pdf / .docx / .doc) into a flat dictionary.

Basic fields (name, phone, e-mail, ...) are pulled out of the full text with
PaddleNLP's ``information_extraction`` Taskflow.  Free-text sections
(self-review, projects, work history, ...) are located by heading keywords in
the document's paragraphs and table cells.
"""
import copy
import functools
import os
import re
from pprint import pprint

import docx
import pdfplumber
from paddlenlp import Taskflow
from pdf2docx import Converter
from win32com import client as wc

# Labels handed to the information-extraction model for the basic fields.
schema = ['姓名', '所在地', '户口所在地', '籍贯', '婚姻状况', '民族', '电话', 'tel',
          '应聘职位', '到岗时间', '学历', '毕业学校', '专业', '期望薪资', '在校时间',
          '电子邮箱', '工作经验', 'Email', '性别', '年龄', '身份证号', '技能特长',
          '生日', '现住址']

# Extraction label -> key used in the returned record.
schema_dict = {
    '姓名': 'name',
    '所在地': 'account',
    '户口所在地': 'accounts',
    '婚姻状况': 'gam',
    '民族': 'nation',
    '生日': 'birthday',
    '电话': 'phone',
    '应聘职位': 'job_name',
    '到岗时间': 'come_time',
    '学历': 'education',
    '毕业学校': 'school',
    '专业': 'specialty',
    '期望薪资': 'hope_money',
    '在校时间': 'at_school',
    '电子邮箱': 'mail',
    '工作经验': 'work_exp',
    'Email': 'mails',
    '性别': 'gender',
    '年龄': 'age',
    '籍贯': 'account',
    'tel': 'tels',
    '身份证号': 'id_card',
    '技能特长': 'specialty_do',
    '现住址': 'now_address',
}

# Template of the record returned for every resume.  It is copied per resume
# and must never be mutated in place.
data_mode = {
    "interview_name": "",
    "interview_type": 1,
    "interview_sign": 0,
    "hope_money": "",
    "feedback": 0,
    "interview_round": 0,
    "event_time": "",
    "name": "",
    "phone": "",
    "job_name": "",
    "hr_name": "",
    "work_exp": 0,
    "interview_stage": 1,
    "owner_name": 2,
    "education": 1,
    "work_undergo": [],
    "project_undergo": [],
    "work_list": [],
    "school": "",
    "at_school": "",
    "specialty": "",
    "specialty_do": [],
    "mmended_state": 0,
    "mail": "",
    "account": "",
    "id_card": "",
    "gender": "",
    "age": 0,
    "gam": "",
    "interview_state": 1,
    "counts": 1,
    "nation": "汉",
    "review": [],
    "upgrade": [],
    "come_time": "",
    "now_money": "",
    "men_state": 1,
    "teacher_state": 1,
    "teacher_back": 1,
    "offer_state": 1,
    "offer_exam_state": 1,
    "notice_state": 1,
    "pass_why": 0,
    "pass_text": "",
    "file_url": '',
    "now_address": '',
    "birthday": '',
}

# File extensions get_resume() knows how to read.
_SUPPORTED_EXTENSIONS = ('.pdf', '.docx', '.doc')

# (result bucket, heading keywords, prefix_only) for every free-text section,
# in matching priority order.  With prefix_only the paragraph must START with
# a keyword; otherwise containing a keyword anywhere is enough.
_SECTION_RULES = (
    ('review', ('自我评价', '自我描述', '个人优势', '个人评价'), True),
    ('project', ('项目经历', '项目经验', '项目描述'), False),
    ('upgrade', ('教育经历', '学习经历'), False),
    ('work', ('工作经历', '工作经验', '实习经历'), True),
    ('specialty', ('技能特长', '技能', '特长', '专长', '技能专长', '专业技能', '职业技能'), True),
    ('language', ('语言',), True),
    ('remembrance', ('获奖', '证书', '获奖记录', '获奖经历'), False),
)

# Fallback entries returned when nothing could be extracted.
# NOTE(review): these are literal test values ("测试公司", ...) that end up in
# the returned record; kept because callers may rely on a non-empty list.
_WORK_PLACEHOLDER = "{'company_name': '测试公司','position_name': '测试职位','duty': '测试职责'}"
_PRIZE_PLACEHOLDER = "{'prize_name': '测试奖项', 'prize_time': '2022-08-26'}"
_LANGUAGE_PLACEHOLDER = "{'language_name': '测试语言', 'has_sleep': '好', 'reading': '听说', 'writing': '读写'}"


@functools.lru_cache(maxsize=None)
def _get_extractor(labels):
    """Return a cached information-extraction Taskflow for a tuple of labels.

    Building a Taskflow is expensive, so one instance per distinct schema is
    reused across resumes instead of being rebuilt on every call.
    """
    return Taskflow('information_extraction', schema=list(labels))


def _dedupe_spans(listdata):
    """Collapse extraction results into ``{label: [span, ...]}``.

    For every label, spans with the same text are merged keeping the one with
    the highest probability (the first one wins a tie), and the survivors are
    ordered by their ``start`` offset.  Each span is a dict with the keys
    ``end``, ``probability``, ``start`` and ``text``.  When several result
    dicts carry the same label, the last one wins.
    """
    res = {}
    for result in listdata:
        for label, spans in result.items():
            best = {}
            for span in spans:
                known = best.get(span['text'])
                if known is not None and span['probability'] <= known['probability']:
                    continue
                best[span['text']] = {
                    'end': span['end'],
                    'probability': span['probability'],
                    'start': span['start'],
                    'text': span['text'],
                }
            res[label] = sorted(best.values(), key=lambda item: item['start'])
    return res


def _text_at(spans, index):
    """Return ``spans[index]['text']``, or '' when that span does not exist."""
    if spans and index < len(spans):
        return spans[index]['text']
    return ''


def chkworlkandtime(listdata):
    """Build one work-history record per extracted company name.

    :param listdata: extraction results, a list of ``{label: [span, ...]}``
    :return: list of ``str(dict)`` records with the keys ``company_name``,
        ``position_name`` and ``duty``; empty when no company was extracted
    """
    res = _dedupe_spans(listdata)
    ress = []
    for i, company in enumerate(res.get('公司名', [])):
        # NOTE(review): '职责' feeds position_name while '岗位' feeds duty, as
        # in the original code; this looks swapped - confirm with the consumer.
        # '岗位' takes precedence over '工作内容'; fall back to the latter
        # instead of blanking the field when '岗位' has no i-th entry.
        duty = _text_at(res.get('岗位'), i) or _text_at(res.get('工作内容'), i)
        record = {
            'company_name': company['text'],
            'position_name': _text_at(res.get('职责'), i),
            'duty': duty,
        }
        ress.append(str(record))
    return ress


def chkworlkandtime1(listdata):
    """Build one language-skill record per extracted language.

    :param listdata: extraction results, a list of ``{label: [span, ...]}``
    :return: list of ``str(dict)`` records with the keys ``language_name``,
        ``has_sleep`` (proficiency), ``reading`` and ``writing``; empty when
        no language was extracted
    """
    res = _dedupe_spans(listdata)
    ress = []
    for i, language in enumerate(res.get('语言', [])):
        # Keys carry no trailing spaces so they match the fallback record.
        record = {
            'language_name': language['text'],
            'has_sleep': _text_at(res.get('掌握程度'), i),
            'reading': _text_at(res.get('听说'), i),
            'writing': _text_at(res.get('读写'), i),
        }
        ress.append(str(record))
    return ress


def chkworlkandtime2(listdata):
    """Build one award record per extracted award name.

    :param listdata: extraction results, a list of ``{label: [span, ...]}``
    :return: list of ``str(dict)`` records with the keys ``prize_name`` and
        ``prize_time``; empty when no award was extracted
    """
    res = _dedupe_spans(listdata)
    ress = []
    for i, prize in enumerate(res.get('奖项名', [])):
        record = {
            'prize_name': prize['text'],
            'prize_time': _text_at(res.get('时间'), i),
        }
        ress.append(str(record))
    return ress


def getText_docx(filename):
    """Return the text of a docx file: all paragraphs, then all table cells.

    :param filename: path of the docx file
    :return: the collected texts joined with newlines
    """
    doc = docx.Document(filename)
    parts = [paragraph.text for paragraph in doc.paragraphs]
    for table in doc.tables:
        row_count = len(table.rows)
        col_count = len(table.columns)
        for row in range(row_count):
            for col in range(col_count):
                parts.append(table.cell(row, col).text)
    return '\n'.join(parts)


def pdf_docx(url, filename):
    """Convert ``url/filename`` (a pdf) into a docx file in the same directory.

    :param url: directory containing the pdf
    :param filename: name of the pdf file
    :return: path of the generated docx file
    """
    # splitext keeps names such as "john.doe.pdf" intact ("john.doe").
    stem = os.path.splitext(filename)[0]
    pdf_name = url + f"/{filename}"
    docx_name = url + f"/{stem}.docx"
    cv = Converter(pdf_name)
    try:
        cv.convert(docx_name, start=0, end=None)
    finally:
        cv.close()
    return docx_name


def getText_pdf(filename):
    """Return the text of every page of a pdf, one trailing newline per page.

    Pages without extractable text are skipped.
    """
    chunks = []
    with pdfplumber.open(filename) as pdf_file:
        for page in pdf_file.pages:
            page_content = page.extract_text()
            if page_content:
                chunks.append(page_content + "\n")
    return ''.join(chunks)


def doc_docx(url, filename):
    """Convert ``url/filename`` (a .doc) into a docx file via Word automation.

    :param url: directory containing the .doc file
    :param filename: name of the .doc file
    :return: path of the generated docx file
    """
    stem = os.path.splitext(filename)[0]
    docx_name = url + f'/{stem}.docx'
    word = wc.Dispatch("Word.Application")
    try:
        doc = word.Documents.Open(url + f"/{filename}")
        try:
            doc.SaveAs(docx_name, 12)  # 12 selects the docx format
        finally:
            doc.Close()
    finally:
        # Always shut Word down, even when opening or saving failed.
        word.Quit()
    return docx_name


def clash(date, retain, pop):
    """Merge two alternative fields into one and drop the other.

    ``retain`` keeps its own value when it is non-empty, otherwise it takes
    the value of ``pop``.  ``pop`` is removed from ``date`` in both cases.

    :param date: record to update in place
    :param retain: key that stays in the record
    :param pop: key that is removed from the record
    """
    if date[retain] == '':
        date[retain] = date[pop]
    date.pop(pop)


def _most_probable_text(spans):
    """Return the text of the most probable span, or '' when there is none."""
    if not spans:
        return ''
    best = spans[0]
    for span in spans[1:]:
        # ">=" keeps the original tie-break: the last of equal spans wins.
        if span['probability'] >= best['probability']:
            best = span
    return best['text']


def _work_exp_level(raw):
    """Map extracted work experience to the level codes 0-3.

    The first number in the text is read as years; numbers longer than three
    characters (e.g. a calendar year) and texts without a number count as 0.
    Levels: below 1 year -> 0, 1 to <3 -> 1, 3 to <5 -> 2, 5 or more -> 3.
    """
    years = 0
    if isinstance(raw, str):
        match = re.search(r"\d+\.?\d*", raw)
        if match and len(match.group()) <= 3:
            years = float(match.group())
    elif raw:
        years = raw
    if years >= 5:
        return 3
    if years >= 3:
        return 2
    if years >= 1:
        return 1
    return 0


def get_date(schema, dates, schema_dict):
    """Pick the most probable value of every basic field.

    :param schema: extraction labels to read
    :param dates: extraction results; only the first element is used
    :param schema_dict: maps each label to the key of the returned record
    :return: a fresh copy of ``data_mode`` filled with the extracted values
    """
    # Work on a copy so the module-level template is never modified and
    # records of different resumes do not alias each other.
    date = copy.deepcopy(data_mode)
    entities = dates[0] if dates else {}
    filled = set()
    for label in schema:
        field = schema_dict[label]
        value = _most_probable_text(entities.get(label))
        # Several labels can share one field (e.g. '所在地' and '籍贯'); a
        # later label without a hit must not erase an earlier label's value.
        if not value and field in filled:
            continue
        date[field] = value
        if value:
            filled.add(field)
    # Resolve the duplicated e-mail, residence and phone fields.
    clash(date, 'mail', 'mails')
    clash(date, 'account', 'accounts')
    clash(date, 'phone', 'tels')
    date['work_exp'] = _work_exp_level(date['work_exp'])
    return date


def _matching_sections(text):
    """Return the buckets whose heading rule matches ``text``, by priority."""
    hits = []
    for name, keywords, prefix_only in _SECTION_RULES:
        if prefix_only:
            matched = text.startswith(keywords)
        else:
            matched = any(keyword in text for keyword in keywords)
        if matched:
            hits.append(name)
    return hits


def fmtTxt(txt, istable=0):
    """Cut the first free-text section out of a paragraph or cell list.

    Items are skipped until one matches a section heading rule.  That item
    and the following ones are concatenated until an item matches the heading
    rule of a different section.

    The same rule opens and closes a section.  The original closed a section
    on any keyword occurring anywhere in a paragraph, so body text that only
    mentioned e.g. '语言' ended the section and the text after it was dropped.

    :param txt: list of paragraph objects (with ``.text``) or, when
        ``istable`` is truthy, list of plain strings
    :param istable: truthy when ``txt`` holds plain strings
    :return: ``(sections, rest, stop_int)``: ``sections`` holds zero or one
        concatenated section string; ``stop_int`` is 1 when the section was
        closed by a following heading, and ``rest`` then starts at that
        heading; ``stop_int`` is 0 when the input is exhausted, and ``rest``
        is then a copy of the whole input
    """
    current = None
    buffer = ''
    for index, item in enumerate(txt):
        text = item if istable else item.text
        hits = _matching_sections(text)
        if current is None:
            # Still looking for the first heading.
            if hits:
                current = hits[0]
                buffer = text
            continue
        if any(name != current for name in hits):
            # Another section begins here: hand back what was collected.
            return [buffer], txt[index:], 1
        buffer += text
    if current is not None:
        return [buffer], txt[0:], 0
    return [], txt[0:], 0


def _extract_records(texts, labels, parser, placeholder):
    """Run the extractor over the joined texts and parse the result.

    Returns ``[placeholder]`` when there is no text or nothing was parsed.
    """
    records = []
    if texts:
        extractor = _get_extractor(labels)
        records = parser(extractor(''.join(texts)))
    return records or [placeholder]


def fmtList(txtlist, dates):
    """Sort section strings into buckets and add them to ``dates``.

    Each string goes into the bucket of the first heading rule it matches;
    strings matching no rule are ignored.  Work history, awards and language
    skills are additionally run through the extraction model to build
    structured records.

    :param txtlist: section strings as produced by ``fmtTxt``
    :param dates: record to update in place
    :return: ``dates``
    """
    buckets = {name: [] for name, _, _ in _SECTION_RULES}
    for text in txtlist:
        hits = _matching_sections(text)
        if hits:
            buckets[hits[0]].append(text)

    work_list = _extract_records(
        buckets['work'], ('公司名', '职责', '工作内容', '岗位'),
        chkworlkandtime, _WORK_PLACEHOLDER)
    remembrance_list = _extract_records(
        buckets['remembrance'], ('奖项名', '时间'),
        chkworlkandtime2, _PRIZE_PLACEHOLDER)
    # The language records are built from the language sections; the
    # original joined the work sections here by mistake.
    language_list = _extract_records(
        buckets['language'], ('语言', '掌握程度', '听说', '读写'),
        chkworlkandtime1, _LANGUAGE_PLACEHOLDER)

    dates.update({
        'review': buckets['review'],
        'project_undergo': buckets['project'],
        'work_undergo': buckets['work'],
        'work_list': work_list,
        'upgrade': buckets['upgrade'],
        'specialty_do': buckets['specialty'],
        'language': language_list,
        'remembrance': remembrance_list,
    })
    return dates


def _collect_sections(items, istable=0):
    """Call ``fmtTxt`` repeatedly until the input is exhausted."""
    sections = []
    remaining = items
    keep_going = 1
    while keep_going:
        found, remaining, keep_going = fmtTxt(remaining, istable=istable)
        sections += found
    return sections


def get_resume(file, path_data):
    """Parse one resume file into a record.

    :param file: file name (.pdf, .docx or .doc, case-insensitive)
    :param path_data: directory containing the file
    :return: the filled record (a copy of ``data_mode`` plus section data)
    :raises ValueError: when the file extension is not supported
    """
    url = path_data + f"/{file}"
    extension = os.path.splitext(file)[1].lower()
    # NOTE(review): for .pdf/.doc input a docx with the same stem is written
    # into path_data and removed afterwards; an existing file of that name
    # would be overwritten and deleted.
    if extension == '.pdf':
        docx_path = pdf_docx(path_data, file)
        try:
            open_txt = docx.Document(docx_path)
        finally:
            os.remove(docx_path)  # drop the temporary conversion result
        txt = getText_pdf(url)
    elif extension == '.docx':
        open_txt = docx.Document(url)
        txt = getText_docx(url)
    elif extension == '.doc':
        docx_path = doc_docx(path_data, file)
        try:
            open_txt = docx.Document(docx_path)
            txt = getText_docx(docx_path)
        finally:
            os.remove(docx_path)  # drop the temporary conversion result
    else:
        raise ValueError(f"unsupported resume file type: {file!r}")

    # Basic fields come from the whole text.
    text_lists = _get_extractor(tuple(schema))(txt)
    dates = get_date(schema, text_lists, schema_dict)

    # Free-text sections come from the paragraphs ...
    sections = _collect_sections(open_txt.paragraphs)

    # ... and from the distinct, non-empty table cells.
    table_list = []
    seen = set()
    for table in open_txt.tables:
        row_count = len(table.rows)
        col_count = len(table.columns)
        for row in range(row_count):
            for col in range(col_count):
                texts = table.cell(row, col).text
                if not texts or texts in seen:
                    continue
                seen.add(texts)
                table_list.append(texts)
    if table_list:
        sections += _collect_sections(table_list, istable=1)

    fmtList(sections, dates)
    return dates


if __name__ == '__main__':
    path_data = r"D:\wokerplay\面试简历1"
    for root, _, files in os.walk(path_data):
        for file in files:
            if os.path.splitext(file)[1].lower() not in _SUPPORTED_EXTENSIONS:
                continue
            # Files in sub-directories live in ``root``, not in path_data.
            print(get_resume(file, root))