# osc/research/nuofang-db/nfm/mbase/page_parser.py
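"""Parse locally saved HTML pages from the military-base intelligence database
(全球军事态势情报数据库) into structured JSON.

For each .html file under ``target_file_path`` the script extracts the title,
source URL, basic info, leadership table, per-tab website info, FAA info,
the main-facilities table, and the nested unit tree, then writes one
<title>.json file per page.
"""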
import json
import os
import re

from lxml import etree
# Directory holding the saved .html pages to parse.
target_file_path = r"E:\yuxin\nuofang-data\base\webpage"

# XPath expressions for the main sections of each page.
xpath_parse_dict = {
    'title': "//div[@class='good-info-box']/div[@class='top cls']/span/text()",
    'base_info': "//div[@class='tab '][1]/div[@id='tab1_info']/div[@class='cItem active']/div[@class='layui-row txt']/div[1]/p",
    'leader_info': "//div[@class='good-info-box']/div[@class='tab '][1]/div[@id='tab1_info']/div[@class='cItem active']/div[@id='leader_list']//tbody/tr",
    'website_info': "//div[@class='good-info-box']/div[@class='tab '][2]/div[@class='tabInfo sourcetabInfo']/div",
}
def trim_space(input_str):
    """Collapse runs of two or more spaces into a single space."""
    return re.sub(' {2,}', ' ', input_str)
def parse_base_info(current_selector, current_title):
    """Parse the basic-info <p> lines into a {label: value} dict."""
    base_info_line = current_selector.xpath(xpath_parse_dict['base_info'])
    base_info = {}
    for line in base_info_line:
        line_text = ''.join(line.xpath(".//text()"))
        # Each line reads "标签：值"; the separator character was lost in
        # transit, and a full-width colon is assumed here.
        label = line_text.split('：')[0].strip()
        value = ''.join(line_text.split('：')[1:]).strip()
        base_info[label] = trim_space(value)
    return base_info
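# Note: parse_base_info expects each <p> to read "标签：值", e.g. (illustrative
# markup reconstructed from the XPath, not copied from a real page):
#   <p>名称：Example Base</p>
#   <p>位置：…</p>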
def parse_leader_info(current_selector, current_title):
    """Parse the leadership table: row 0 holds the <th> headers, and each
    later row is mapped to a {header: cell_text} dict."""
    leader_info_line = current_selector.xpath(xpath_parse_dict['leader_info'])
    leader_info_list = []
    ths = []
    for i in range(len(leader_info_line)):
        leader_info = {}
        line = leader_info_line[i]
        if i == 0:
            ths = line.xpath("./th/text()")
        else:
            for th in ths:
                leader_info[th] = ''
            tds = line.xpath("./td")
            # Guard against ragged rows with more cells than headers.
            for tdi in range(min(len(tds), len(ths))):
                td_txt = ''.join(tds[tdi].xpath(".//text()"))
                leader_info[ths[tdi]] = trim_space(td_txt.strip())
            leader_info_list.append(leader_info)
    return leader_info_list
def parse_website_info(current_selector, current_title):
    """Parse the per-tab website info; tab names come from the tab nav and
    the content of each tab from the matching tabInfo <div>."""
    tab_name_xpath = (".//div[@class='good-info-box']/div[@class='tab '][2]"
                      "/div[@class='tabNav nav2 cls sourcetabNav']/p/text()")
    # Bug fix: use the selector passed in, not the global one from __main__.
    tab_names = current_selector.xpath(tab_name_xpath)
    if len(tab_names) <= 0:
        return {}  # keep the return type consistent so len() works in __main__
    website_info_tab = current_selector.xpath(xpath_parse_dict['website_info'])
    website_info = {}
    for i in range(len(website_info_tab)):
        tab_dict = {}
        tab_name = tab_names[i]
        tab_content = website_info_tab[i]
        overview_box = tab_content.xpath("./dl[@class='overview box']")
        if len(overview_box) > 0:  # xpath returns a list, never None
            overview_info = overview_box[0].xpath(".//p/text()")
            tab_dict["概述"] = trim_space(''.join(overview_info).strip())
        other_box = tab_content.xpath("./div")
        for box in other_box:
            subtitle_info = box.xpath(".//h2/text()")
            if len(subtitle_info) > 0:
                # The character stripped here was lost to encoding; a
                # full-width colon is assumed.
                subtitle = ''.join(subtitle_info).replace("：", "").strip()
                if subtitle not in ['入驻单位', '主要设施']:
                    content_info = box.xpath(".//text()")
                    _content_info = []
                    for _ in content_info:
                        _ = trim_space(_)
                        _ = _.replace(" ", "").replace("\xa0", "").replace('"', '').strip()
                        if len(_) > 0:
                            # A fragment that is only a closing quote (the
                            # literal was also lost to encoding; "”" is
                            # assumed) is merged back into the previous line.
                            if _ == "”":
                                _content_info[-1] = "\n" + _content_info[-1] + "”"
                            else:
                                _content_info.append('        ' + _)
                    content = '\n'.join(_content_info).strip()
                    tab_dict[subtitle] = content
        website_info[tab_name] = tab_dict
    return website_info
def parse_faa_info(current_selector, current_title):
    """Parse the "FAA info" tab: each titled table becomes a dict whose <th>
    rows open a sub-section and whose <td> pairs fill it."""
    tab_xpath = "//div[@class='good-info-box']/div[@class='tab ']"
    list_tabs = current_selector.xpath(tab_xpath)
    faa_info = {}
    for current_tab in list_tabs:
        tab_name = ''.join(current_tab.xpath(".//p[@class='item active']/text()")).strip()
        if 'FAA info' not in tab_name:
            continue
        faa_data_content = current_tab.xpath(
            "./div[@class='tabInfo sourcetabInfo']/div[@class='cItem active']/div[@id='data_info_list1']")
        if not faa_data_content:  # guard: some pages have no FAA section
            continue
        data_titles = faa_data_content[0].xpath("./div[@class='bd']/span[@class='tabletitle']")
        data_tables = faa_data_content[0].xpath("./div[@class='bd']/table[@class='good-table table1']")
        for i in range(len(data_tables)):
            data_title = data_titles[i]
            data_table = data_tables[i]
            trs = data_table.xpath("./tbody/tr")
            item = {}
            current_item_title = ''
            current_item_content = {}
            for tr in trs:
                th_info = tr.xpath("./th/text()")
                td1_info = tr.xpath("./td[1]//text()")
                td2_info = tr.xpath("./td[2]//text()")
                if len(th_info) > 0:
                    # A <th> row opens a new sub-section; flush the previous one.
                    if len(current_item_title) > 0:
                        item[current_item_title] = current_item_content
                    current_item_title = ''.join(th_info).strip()
                    current_item_content = {}
                else:
                    td1 = ''.join(td1_info).strip()
                    td2 = ''.join(td2_info).strip()
                    current_item_content[td1] = trim_space(td2)
            item[current_item_title] = current_item_content
            data_title_name = ''.join(data_title.xpath(".//text()")).strip()
            faa_info[data_title_name] = item
    return faa_info
def parse_facility_info(current_selector, current_title):
    """Parse the main-facilities table into a list of two-column dicts."""
    facility_area_xpath = "//div[@class='good-info-box']//div[@id='militarybase_main_facilities1']"
    facility_areas = current_selector.xpath(facility_area_xpath)
    facility_info_list = []
    for facility_area in facility_areas:
        facility_table = facility_area.xpath("./table")
        if len(facility_table) > 0:
            trs = facility_table[0].xpath("./tbody/tr")
            ths = []
            for i in range(len(trs)):
                tr = trs[i]
                if i == 0:
                    ths = tr.xpath("./th/text()")  # header row supplies the keys
                else:
                    td1 = ''.join([_.strip() for _ in tr.xpath("./td[1]//text()")]).strip()
                    td2 = ''.join([_.strip() for _ in tr.xpath("./td[2]//text()")]).strip()
                    facility_info_list.append({
                        ths[0]: trim_space(td1),
                        ths[1]: trim_space(td2),
                    })
    return facility_info_list
def parse_unit_info(current_selector, current_title):
    """Parse the unit list into a tree: each row's margin-left (24px per
    level) gives its depth, and deeper rows are nested under "child"."""
    unit_info_xpath = "//div[@id='militarybase_department_list1']"
    unit_info_areas = current_selector.xpath(unit_info_xpath)
    unit_info_list = []
    for unit_info_area in unit_info_areas:
        unit_info_table = unit_info_area.xpath("./table")
        if len(unit_info_table) > 0:
            trs = unit_info_table[0].xpath("./tbody/tr")
            ths = []
            for i in range(len(trs)):
                tr = trs[i]
                if i == 0:
                    ths = tr.xpath("./th/text()")
                else:
                    try:
                        td1 = tr.xpath("./td[1]//text()")[0]
                        td2 = ''.join([_.strip() for _ in tr.xpath("./td[2]//text()")]).strip()
                        # The style looks like "margin-left:24px;": strip the
                        # 12-char property name and the trailing "px;".
                        style_info = tr.xpath("./td[2]/span/@style")
                        style = ''.join(style_info).strip()[12:-3]
                        tab_number = int(style) // 24
                    except (IndexError, ValueError):
                        # Rows without a style (or a first cell) are treated
                        # as top-level units.
                        td1 = "单位"
                        td2 = ''.join([_.strip() for _ in tr.xpath("./td[2]//text()")]).strip()
                        if len(td2) == 0:
                            td2 = ''.join([_.strip() for _ in tr.xpath("./td[1]//text()")]).strip()
                        tab_number = 1
                    if tab_number == 1:
                        unit_info_list.append({
                            ths[0]: trim_space(td1),
                            ths[1]: trim_space(td2),
                            "child": []
                        })
                    elif tab_number == 2:
                        unit_info_list[-1]["child"].append({
                            ths[0]: trim_space(td1),
                            ths[1]: trim_space(td2),
                            "child": []
                        })
                    elif tab_number == 3:
                        unit_info_list[-1]["child"][-1]["child"].append({
                            ths[0]: trim_space(td1),
                            ths[1]: trim_space(td2),
                            "child": []
                        })
    return unit_info_list
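# Output shape of parse_unit_info (illustrative only; the real keys come from
# the page's <th> header row, not from this sketch):
#   [{"单位": "…", "child": [{"单位": "…", "child": [{"单位": "…", "child": []}]}]}]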
if __name__ == '__main__':
    count = 0
    for _path in os.listdir(target_file_path):
        if _path.endswith('.html'):
            count += 1
            webpage_info = {}
            with open(os.path.join(target_file_path, _path), mode='r', encoding='utf-8') as target_file:
                html_content = target_file.read().replace('\n', '')
            selector = etree.HTML(html_content)
            title_info = selector.xpath(xpath_parse_dict['title'])
            title = ''.join(title_info).strip()
            url_info = selector.xpath('//ul[@class="osc-dropMenu"]/li[last()]/a/@href')[0]
            webpage_info["url_info"] = url_info.replace("#", "")
            base_info = parse_base_info(selector, title)
            leader_info = parse_leader_info(selector, title)
            website_info = parse_website_info(selector, title)
            faa_info = parse_faa_info(selector, title)
            facility_info = parse_facility_info(selector, title)
            unit_info = parse_unit_info(selector, title)
            # Only keep sections that actually parsed to something.
            if len(base_info) > 0:
                webpage_info["base_info"] = base_info
            if len(leader_info) > 0:
                webpage_info["leader_info"] = leader_info
            if len(website_info) > 0:
                webpage_info["website_info"] = website_info
            if len(faa_info) > 0:
                webpage_info["faa_info"] = faa_info
            if len(facility_info) > 0:
                webpage_info["facility_info"] = facility_info
            if len(unit_info) > 0:
                webpage_info["unit_info"] = unit_info
            result_file_path = 'E:/yuxin/nuofang-data/base/result0513/' + title + '.json'
            with open(result_file_path, 'w', encoding='utf-8') as result_file:
                result_file.write(json.dumps(webpage_info, ensure_ascii=False))
            print("[No. {}] {} written; source url: {}".format(count, title, webpage_info["url_info"]))
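# Usage note: run directly (python page_parser.py). One <title>.json is
# written per saved page; the output directory result0513/ must already
# exist, or the open() above raises FileNotFoundError.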