osc/research/nuofang-db/nfm/mbase/page_parser.py
2025-05-28 19:16:17 +08:00

264 lines
12 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from lxml import etree
import json
import re
# Directory of saved HTML detail pages to parse (Windows path).
target_file_path = r"E:\yuxin\nuofang-data\base\webpage"
# XPath selectors for the page sections of interest; keys are consumed by
# the parse_* functions below.
xpath_parse_dict = {
    'title': "//div[@class='good-info-box']/div[@class='top cls']/span/text()",
    'base_info': "//div[@class='tab '][1]/div[@id='tab1_info']/div[@class='cItem active']/div[@class='layui-row txt']/div[1]/p",
    'leader_info': "//div[@class='good-info-box']/div[@class='tab '][1]/div[@id='tab1_info']/div[@class='cItem active']/div[@id='leader_list']//tbody/tr",
    'website_info': "//div[@class='good-info-box']/div[@class='tab '][2]/div[@class='tabInfo sourcetabInfo']/div",
}
def trim_space(input_str):
    """Collapse every run of two or more spaces in *input_str* to one space."""
    return re.sub(' {2,}', ' ', input_str)
def parse_base_info(current_selector, current_title):
    """Extract the label/value "base info" rows of a detail page.

    current_selector: parsed lxml HTML tree for the page.
    current_title: page title (only referenced by the disabled debug output).
    Returns a dict of {label: whitespace-collapsed value}.
    """
    base_info_line = current_selector.xpath(xpath_parse_dict['base_info'])
    base_info = {}
    for line in base_info_line:
        # Join every text node under the <p> element into one flat string.
        line_text = ''.join(line.xpath(".//text()"))
        # NOTE(review): the split separator below renders as an empty string,
        # but this file is reported to contain invisible Unicode characters --
        # the real separator (presumably a full-width colon or similar
        # delimiter) was lost in copying. str.split('') raises ValueError, so
        # confirm the intended separator against the original file.
        label = line_text.split('')[0].strip()
        value = ''.join(line_text.split('')[1:]).strip()
        base_info[label] = trim_space(value)
    # Disabled debug output:
    # if current_title != '' and len(base_info) > 0:
    #     print("[No. {}] {}: {}".format(count, current_title, json.dumps(base_info, ensure_ascii=False)))
    # else:
    #     print("[No. {}] {} parse failed".format(count, _path))
    return base_info
def parse_leader_info(current_selector, current_title):
    """Parse the leader table into a list of {header: cell text} dicts.

    The first table row supplies the column headers; every subsequent row
    becomes one record keyed by those headers, with missing cells left as
    empty strings.
    """
    rows = current_selector.xpath(xpath_parse_dict['leader_info'])
    leader_info_list = []
    headers = []
    for row_index, row in enumerate(rows):
        if row_index == 0:
            # Header row: capture column names for use as record keys.
            headers = row.xpath("./th/text()")
            continue
        record = {header: '' for header in headers}
        for cell_index, cell in enumerate(row.xpath("./td")):
            cell_text = ''.join(cell.xpath(".//text()"))
            record[headers[cell_index]] = trim_space(cell_text.strip())
        leader_info_list.append(record)
    return leader_info_list
def parse_website_info(current_selector, current_title):
    """Parse the tabbed "website info" section into {tab_name: {subtitle: text}}.

    current_selector: parsed lxml HTML tree for the page.
    current_title: page title (only referenced by debug output, now removed).
    Returns an empty dict when the section is absent so callers can safely
    call len() on the result.
    """
    tab_name_xpath = ".//div[@class='good-info-box']/div[@class='tab '][2]/div[@class='tabNav nav2 cls sourcetabNav']/p/text()"
    # BUG FIX: the original read the module-global `selector` here instead of
    # the `current_selector` argument; that only worked by accident when the
    # module was run as a script.
    tab_names = current_selector.xpath(tab_name_xpath)
    if len(tab_names) <= 0:
        # BUG FIX: originally a bare `return` (None), which made the caller's
        # len(website_info) raise TypeError.
        return {}
    website_info_tab = current_selector.xpath(xpath_parse_dict['website_info'])
    website_info = {}
    for i in range(len(website_info_tab)):
        tab_dict = {}
        tab_name = tab_names[i]
        tab_content = website_info_tab[i]
        overview_box = tab_content.xpath("./dl[@class='overview box']")
        if overview_box is not None and len(overview_box) > 0:
            overview_info = overview_box[0].xpath(".//p/text()")
            tab_dict["概述"] = trim_space(''.join(overview_info).strip())
        other_box = tab_content.xpath("./div")
        if other_box is not None and len(other_box) > 0:
            for box in other_box:
                subtitle_info = box.xpath(".//h2/text()")
                if len(subtitle_info) > 0:
                    # NOTE(review): several literals below look empty but the
                    # original file contains invisible Unicode characters that
                    # were lost in copying -- confirm the intended characters
                    # (the replace target, the marker compared against `_`,
                    # and the suffix appended to the previous entry) against
                    # the original file.
                    subtitle = ''.join(subtitle_info).replace("", "").strip()
                    if subtitle not in ['入驻单位', '主要设施']:
                        content_info = box.xpath(".//text()")
                        _content_info = []
                        for _ in content_info:
                            _ = trim_space(_)
                            _ = _.replace(" ", "").replace("\xa0", "").replace('"', '').strip()
                            if len(_) > 0:
                                if _ == "":
                                    # Marker line: annotate the previous entry
                                    # instead of adding a new one.
                                    _content_info[-1] = "\n" + _content_info[-1] + ""
                                else:
                                    # Normal line: indent and append.
                                    _ = '        ' + _
                                    _content_info.append(_)
                        content = '\n'.join(_content_info).strip()
                        tab_dict[subtitle] = content
        website_info[tab_name] = tab_dict
    return website_info
def parse_faa_info(current_selector, current_title):
    """Parse the "FAA info" tab's titled tables into a nested dict.

    Result shape: {table_title: {section_header: {col1_text: col2_text}}}.
    Rows with a <th> start a new section; <td> rows fill the current one.
    Returns an empty dict when no FAA tab (or its data container) exists.
    """
    tab_xpath = "//div[@class='good-info-box']/div[@class='tab ']"
    list_tabs = current_selector.xpath(tab_xpath)
    faa_info = {}
    for current_tab in list_tabs:
        tab_name = ''.join(current_tab.xpath(".//p[@class='item active']/text()")).strip()
        if 'FAA info' in tab_name:
            faa_data_content = current_tab.xpath(
                "./div[@class='tabInfo sourcetabInfo']/div[@class='cItem active']/div[@id='data_info_list1']")
            # BUG FIX: the original indexed faa_data_content[0] without
            # checking, raising IndexError when the container is missing.
            if len(faa_data_content) == 0:
                continue
            data_titles = faa_data_content[0].xpath("./div[@class='bd']/span[@class='tabletitle']")
            data_tables = faa_data_content[0].xpath("./div[@class='bd']/table[@class='good-table table1']")
            for i in range(len(data_tables)):
                data_title = data_titles[i]
                data_table = data_tables[i]
                trs = data_table.xpath("./tbody/tr")
                item = {}
                current_item_title = ''
                current_item_content = {}
                for tr in trs:
                    th_info = tr.xpath("./th/text()")
                    td1_info = tr.xpath("./td[1]//text()")
                    td2_info = tr.xpath("./td[2]//text()")
                    if th_info is not None and len(th_info) > 0:
                        # A header row closes the previous section (if any)
                        # and opens a new one.
                        if len(current_item_title) > 0:
                            item[current_item_title] = current_item_content
                        current_item_title = ''.join(th_info).strip()
                        current_item_content = {}
                    else:
                        td1 = ''.join(td1_info).strip()
                        td2 = ''.join(td2_info).strip()
                        current_item_content[td1] = trim_space(td2)
                # Flush the final (possibly untitled) section, matching the
                # original behavior.
                item[current_item_title] = current_item_content
                data_title_name = ''.join(data_title.xpath(".//text()")).strip()
                faa_info[data_title_name] = item
    return faa_info
def parse_facility_info(current_selector, current_title):
    """Parse the "main facilities" table(s) into a list of two-column records.

    The first table row provides the two header names; each later row yields
    one {header1: col1, header2: col2} dict with whitespace collapsed.
    """
    facility_area_xpath = "//div[@class='good-info-box']//div[@id='militarybase_main_facilities1']"
    facility_info_list = []
    for area in current_selector.xpath(facility_area_xpath):
        tables = area.xpath("./table")
        if not tables:
            continue
        headers = []
        for row_number, row in enumerate(tables[0].xpath("./tbody/tr")):
            if row_number == 0:
                # Header row supplies the record keys.
                headers = row.xpath("./th/text()")
                continue
            first_col = ''.join(part.strip() for part in row.xpath("./td[1]//text()")).strip()
            second_col = ''.join(part.strip() for part in row.xpath("./td[2]//text()")).strip()
            facility_info_list.append({
                headers[0]: trim_space(first_col),
                headers[1]: trim_space(second_col),
            })
    return facility_info_list
def parse_unit_info(current_selector, current_title):
    """Parse the nested unit/department table into a 3-level tree.

    Each data row's second cell carries an inline style whose pixel value,
    divided by 24, selects the nesting depth (1 = top level, 2 = child of the
    most recent depth-1 row, 3 = child of the most recent depth-2 row). Rows
    whose cells or style cannot be parsed fall back to depth 1 with a generic
    label. Returns a list of {header: value, "child": [...]} dicts.
    """
    unit_info_xpath = "//div[@id='militarybase_department_list1']"
    unit_info_areas = current_selector.xpath(unit_info_xpath)
    unit_info_list = []
    for unit_info_area in unit_info_areas:
        unit_info_table = unit_info_area.xpath("./table")
        if len(unit_info_table) > 0:
            trs = unit_info_table[0].xpath("./tbody/tr")
            ths = []
            for i in range(len(trs)):
                tr = trs[i]
                if i == 0:
                    # Header row supplies the two column names used as keys below.
                    ths = tr.xpath("./th/text()")
                else:
                    try:
                        td1 = tr.xpath("./td[1]//text()")[0]
                        td2 = ''.join([_.strip() for _ in tr.xpath("./td[2]//text()")]).strip()
                        style_info = tr.xpath("./td[2]/span/@style")
                        # The slice assumes a style of the form
                        # "margin-left:NNpx;" -- [12:-3] strips the property
                        # name prefix and the "px;" suffix.
                        # TODO(review): confirm against the real pages.
                        style = ''.join(style_info).strip()[12:-3]
                        tab_number = int(style) / 24
                    except (IndexError, ValueError):
                        # Row without the expected cells/style: treat it as a
                        # top-level unit with a generic "unit" label.
                        td1 = "单位"
                        td2 = ''.join([_.strip() for _ in tr.xpath("./td[2]//text()")]).strip()
                        if len(td2) == 0:
                            td2 = ''.join([_.strip() for _ in tr.xpath("./td[1]//text()")]).strip()
                        tab_number = 1
                    if tab_number == 1:
                        unit_info_list.append({
                            ths[0]: trim_space(td1),
                            ths[1]: trim_space(td2),
                            "child": []
                        })
                    elif tab_number == 2:
                        # Attach to the most recent top-level unit.
                        # NOTE(review): raises IndexError if a depth-2 row
                        # appears before any depth-1 row.
                        _child = unit_info_list[-1]["child"]
                        _child.append({
                            ths[0]: trim_space(td1),
                            ths[1]: trim_space(td2),
                            "child": []
                        })
                        unit_info_list[-1]["child"] = _child
                    elif tab_number == 3:
                        # Attach to the most recent depth-2 unit.
                        _child = unit_info_list[-1]["child"][-1]["child"]
                        _child.append({
                            ths[0]: trim_space(td1),
                            ths[1]: trim_space(td2),
                            "child": []
                        })
                        unit_info_list[-1]["child"][-1]["child"] = _child
    return unit_info_list
if __name__ == '__main__':
    # Walk every saved .html page, run all section parsers, and write one
    # JSON result file per page.
    count = 0
    for _path in os.listdir(target_file_path):
        if _path.endswith('.html'):
            count += 1
            webpage_info = {}
            # BUG FIX: use context managers so file handles are closed even
            # when parsing raises mid-loop (original used bare open/close).
            # Newlines are stripped so joined text nodes stay on one line.
            with open(os.path.join(target_file_path, _path), mode='r', encoding='utf-8') as target_file:
                html_content = target_file.read().replace('\n', '')
            selector = etree.HTML(html_content)
            title_info = selector.xpath(xpath_parse_dict['title'])
            title = ''.join(title_info).strip()
            # The last drop-menu entry holds the original source URL.
            url_info = selector.xpath('//ul[@class="osc-dropMenu"]/li[last()]/a/@href')[0]
            webpage_info["url_info"] = url_info.replace("#", "")
            base_info = parse_base_info(selector, title)
            leader_info = parse_leader_info(selector, title)
            website_info = parse_website_info(selector, title)
            faa_info = parse_faa_info(selector, title)
            facility_info = parse_facility_info(selector, title)
            unit_info = parse_unit_info(selector, title)
            # BUG FIX: truthiness checks instead of len(...) > 0 -- the
            # original raised TypeError when parse_website_info returned None.
            if base_info:
                webpage_info["base_info"] = base_info
            if leader_info:
                webpage_info["leader_info"] = leader_info
            if website_info:
                webpage_info["website_info"] = website_info
            if faa_info:
                webpage_info["faa_info"] = faa_info
            if facility_info:
                webpage_info["facility_info"] = facility_info
            if unit_info:
                webpage_info["unit_info"] = unit_info
            result_file_name = title + '.json'
            result_file_path = 'E:/yuxin/nuofang-data/base/result0513/' + result_file_name
            with open(result_file_path, 'w', encoding='utf-8') as result_file:
                result_file.write(json.dumps(webpage_info, ensure_ascii=False))
            print("[No. {}] {} 写入完成原始url是{}".format(count, title, webpage_info["url_info"]))