diff --git a/dsp/dsp.iml b/dsp/dsp.iml index 6453e56..36aa587 100644 --- a/dsp/dsp.iml +++ b/dsp/dsp.iml @@ -9,6 +9,16 @@ + + + + + + + + + + @@ -20,7 +30,7 @@ - + @@ -30,7 +40,6 @@ - @@ -93,7 +102,6 @@ - @@ -106,8 +114,6 @@ - - @@ -138,8 +144,8 @@ - - + + @@ -147,7 +153,7 @@ - + @@ -195,5 +201,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dsp/pom.xml b/dsp/pom.xml index 658d53b..da6fe5e 100644 --- a/dsp/pom.xml +++ b/dsp/pom.xml @@ -94,15 +94,34 @@ true true - - org.projectlombok - lombok - mysql mysql-connector-java runtime + + commons-net + commons-net + 3.10.0 + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + + org.apache.poi + poi + 5.2.4 + + + org.apache.poi + poi-ooxml + 5.2.4 + + diff --git a/dsp/src/main/java/com/jsc/dsp/DspApplication.java b/dsp/src/main/java/com/jsc/dsp/DspApplication.java index 3802233..88682b6 100644 --- a/dsp/src/main/java/com/jsc/dsp/DspApplication.java +++ b/dsp/src/main/java/com/jsc/dsp/DspApplication.java @@ -3,9 +3,10 @@ package com.jsc.dsp; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.data.jpa.repository.config.EnableJpaRepositories; import org.springframework.scheduling.annotation.EnableScheduling; -@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) +@SpringBootApplication @EnableScheduling public class DspApplication { diff --git a/dsp/src/main/java/com/jsc/dsp/config/Configuration.java b/dsp/src/main/java/com/jsc/dsp/config/Configuration.java index a194e5f..c6c1383 100644 --- a/dsp/src/main/java/com/jsc/dsp/config/Configuration.java +++ b/dsp/src/main/java/com/jsc/dsp/config/Configuration.java @@ -16,18 +16,6 @@ import java.util.concurrent.Executors; @Component public class Configuration { - @Value("${es.ip}") - String esIp; - - 
@Value("${es.port}") - Integer esPort; - - @Value("${es.username}") - String esUsername; - - @Value("${es.password}") - String esPassword; - @Bean public JacksonJsonParser getJacksonParser() { return new JacksonJsonParser(); @@ -48,8 +36,4 @@ public class Configuration { return Executors.newFixedThreadPool(4); } - @Bean - public RestHighLevelClient esClient() { - return EsUtils.getElasticsearchClient(esIp, esPort, esUsername, esPassword); - } } diff --git a/dsp/src/main/java/com/jsc/dsp/controller/ExportController.java b/dsp/src/main/java/com/jsc/dsp/controller/ExportController.java new file mode 100644 index 0000000..d1c8ba5 --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/controller/ExportController.java @@ -0,0 +1,47 @@ +package com.jsc.dsp.controller; + +import com.alibaba.fastjson.JSONObject; +import com.jsc.dsp.model.ReturnT; +import com.jsc.dsp.utils.AutoExportAndUpload; +import com.jsc.dsp.utils.DatabaseConnector; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +@RestController +@RequestMapping("/export") +@ConditionalOnProperty(name = "switch.auto-export-and-upload", havingValue = "true", matchIfMissing = true) +public class ExportController { + + @Resource + DatabaseConnector databaseConnector; + + @Resource + AutoExportAndUpload autoExportAndUpload; + + @PostMapping("/exportExcel") + public ReturnT exportExcel(@RequestBody JSONObject object) { + try { + String startTime = object.getString("startTime"); + databaseConnector.exportToXlsx(startTime); + return new ReturnT<>(200, "", ""); + } catch (Exception e) { + return new ReturnT<>(500, e.getMessage(), ""); + } + } + + @PostMapping("/triggerTask") + public ReturnT triggerTask() { + try { + 
new Thread(() -> autoExportAndUpload.exportDataAndUpload()).start(); + return new ReturnT<>(200, "", ""); + } catch (Exception e) { + return new ReturnT<>(500, e.getMessage(), ""); + } + } + +} diff --git a/dsp/src/main/java/com/jsc/dsp/dao/ConfigRepository.java b/dsp/src/main/java/com/jsc/dsp/dao/ConfigRepository.java new file mode 100644 index 0000000..3aa6e53 --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/dao/ConfigRepository.java @@ -0,0 +1,10 @@ +package com.jsc.dsp.dao; + +import com.jsc.dsp.model.Config; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface ConfigRepository extends JpaRepository { + Config findFirstByConfigName(String configName); +} diff --git a/dsp/src/main/java/com/jsc/dsp/dao/EsDataNewsRepository.java b/dsp/src/main/java/com/jsc/dsp/dao/EsDataNewsRepository.java new file mode 100644 index 0000000..aa769ab --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/dao/EsDataNewsRepository.java @@ -0,0 +1,12 @@ +package com.jsc.dsp.dao; + +import com.jsc.dsp.model.EsDataNewsView; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +import java.util.List; + +@Repository +public interface EsDataNewsRepository extends JpaRepository { + List findAllByEsLoadtimeAfter(String loadtime); +} diff --git a/dsp/src/main/java/com/jsc/dsp/dao/IndeximosRepository.java b/dsp/src/main/java/com/jsc/dsp/dao/IndeximosRepository.java new file mode 100644 index 0000000..59b852b --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/dao/IndeximosRepository.java @@ -0,0 +1,9 @@ +package com.jsc.dsp.dao; + +import com.jsc.dsp.model.Indeximos; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface IndeximosRepository extends JpaRepository { +} diff --git a/dsp/src/main/java/com/jsc/dsp/model/Config.java 
b/dsp/src/main/java/com/jsc/dsp/model/Config.java new file mode 100644 index 0000000..4b83f24 --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/model/Config.java @@ -0,0 +1,15 @@ +package com.jsc.dsp.model; + +import lombok.Data; + +import javax.persistence.Entity; +import javax.persistence.Id; + +@Entity +@Data +public class Config { + @Id + Integer id; + String configName; + String configValue; +} diff --git a/dsp/src/main/java/com/jsc/dsp/model/EsDataNewsView.java b/dsp/src/main/java/com/jsc/dsp/model/EsDataNewsView.java new file mode 100644 index 0000000..d6cfd5b --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/model/EsDataNewsView.java @@ -0,0 +1,36 @@ +package com.jsc.dsp.model; + +import lombok.Data; + +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; + +@Entity +@Data +@Table(name = "es_data_news") +public class EsDataNewsView { + @Id + String esSid; + String esAuthors; + String esCarriertype; + String esCatalog; + String esCollection; + Float esDoclength; + String esLang; + String esLasttime; + String esLinks; + String esLoadtime; + String esSitename; + String esSrcname; + String esUrlcontent; + String esUrlcontentRaw; + String esUrlimage; + String esUrlname; + String esUrltime; + String esUrltitle; + String esUrltitleRaw; + String esAbstract; + String esKeywords; + String file; +} diff --git a/dsp/src/main/java/com/jsc/dsp/model/Indeximos.java b/dsp/src/main/java/com/jsc/dsp/model/Indeximos.java index 483e354..583ab0c 100644 --- a/dsp/src/main/java/com/jsc/dsp/model/Indeximos.java +++ b/dsp/src/main/java/com/jsc/dsp/model/Indeximos.java @@ -2,10 +2,17 @@ package com.jsc.dsp.model; import lombok.Data; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; import java.io.Serializable; +@Entity @Data +@Table(name = "indeximos") public class Indeximos implements Serializable { + @Id + String es_sid; String es_abstract; String es_annex; String es_attachment; @@ -56,7 
+63,6 @@ public class Indeximos implements Serializable { String es_repostuid; String es_repostuname; String es_rultopic; - String es_sid; String es_simhash; String es_similarity; String es_similaritycount; diff --git a/dsp/src/main/java/com/jsc/dsp/service/ConfigService.java b/dsp/src/main/java/com/jsc/dsp/service/ConfigService.java new file mode 100644 index 0000000..749c4a2 --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/service/ConfigService.java @@ -0,0 +1,29 @@ +package com.jsc.dsp.service; + +import com.jsc.dsp.dao.ConfigRepository; +import com.jsc.dsp.model.Config; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +public class ConfigService { + + @Resource + ConfigRepository configRepository; + + public String getConfigValueByName(String configName) { + return getConfigByName(configName).getConfigValue(); + } + + public Config getConfigByName(String configName) { + return configRepository.findFirstByConfigName(configName); + } + + public void setConfigValueByName(String configName, String configValue) { + Config config = getConfigByName(configName); + config.setConfigValue(configValue); + configRepository.save(config); + } + +} diff --git a/dsp/src/main/java/com/jsc/dsp/service/FileDlService.java b/dsp/src/main/java/com/jsc/dsp/service/FileDlService.java index 3e2a2b4..afcf485 100644 --- a/dsp/src/main/java/com/jsc/dsp/service/FileDlService.java +++ b/dsp/src/main/java/com/jsc/dsp/service/FileDlService.java @@ -6,6 +6,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.json.JacksonJsonParser; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; @@ -21,6 +22,7 @@ 
import java.util.concurrent.Executors; @Component @EnableBinding(FileDlBinding.class) +@ConditionalOnProperty(name = "switch.enable-file-dl-service", havingValue = "true", matchIfMissing = true) public class FileDlService extends StreamService { @Autowired @@ -78,31 +80,14 @@ public class FileDlService extends StreamService { } int dlResult = fileUtils.downloadFromUrl(fileURL, protoSavePath); if (dlResult == 1) { - File transferPath = new File(transferBackupPath); + File transferPath = new File(protoSavePath); File[] files = transferPath.listFiles(); if (files != null && files.length > 0) { for (File transferFile : files) { if (transferFile.getName().endsWith(".tar.gz")) { - if (transferFile.getName().startsWith("attach")) { - try { - fileUtils.UnzipTarGzip(transferFile.getAbsolutePath(), nginxPath); - logger.info("Unzip attachments " + transferFile.getName()); - } catch (Exception e) { - logger.error("Unzip error!"); - } - } else { - try { - fileUtils.UnzipTarGzip(transferFile.getAbsolutePath(), fileUnzipPath); - logger.info("Unzip " + transferFile.getName()); - } catch (Exception e) { - logger.error("Unzip error!"); - } - } + fileUtils.moveFileToBackupFolder(transferFile, keepBackupFile); } - fileUtils.moveFileToBackupFolder(transferFile, keepBackupFile); } -// Runnable upload2Ceph = () -> fileUtils.uploadToCeph(fileUnzipPath); -// pool.execute(upload2Ceph); } } else if (dlResult == 0) { logger.error("File " + fileName + " download failure"); diff --git a/dsp/src/main/java/com/jsc/dsp/service/ProtobufService.java b/dsp/src/main/java/com/jsc/dsp/service/ProtobufService.java index ceb1888..c0051e5 100644 --- a/dsp/src/main/java/com/jsc/dsp/service/ProtobufService.java +++ b/dsp/src/main/java/com/jsc/dsp/service/ProtobufService.java @@ -12,12 +12,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import 
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.json.JacksonJsonParser; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; @@ -30,6 +32,7 @@ import java.util.Map; @Component @EnableBinding(ProtobufBinding.class) +@ConditionalOnProperty(name = "switch.enable-protobuf-service", havingValue = "true", matchIfMissing = true) public class ProtobufService extends StreamService { @Autowired @@ -41,9 +44,6 @@ public class ProtobufService extends StreamService { @Value("${custom.proto_save_path}") String protoSavePath; - @Value("${custom.transfer_backup_path}") - String transferBackupPath; - @Value("${custom.keep_backup_file}") String keepBackupFile; @@ -55,7 +55,7 @@ public class ProtobufService extends StreamService { private final Logger logger = LogManager.getLogger(ProtobufService.class.getName()); - @Autowired + @Resource private ProtobufBinding source; @Override @@ -169,7 +169,7 @@ public class ProtobufService extends StreamService { } logger.debug("protobuf done"); // 转移备份目录的todist文件 - File transferPath = new File(transferBackupPath); + File transferPath = new File(protoSavePath); File[] files = transferPath.listFiles(); if (files != null && files.length > 0) { for (File transferFile : files) { diff --git a/dsp/src/main/java/com/jsc/dsp/service/StorageService.java b/dsp/src/main/java/com/jsc/dsp/service/StorageService.java index 8c45d1a..0a17473 100644 --- a/dsp/src/main/java/com/jsc/dsp/service/StorageService.java +++ b/dsp/src/main/java/com/jsc/dsp/service/StorageService.java @@ -1,31 +1,25 @@ package com.jsc.dsp.service; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import 
com.alibaba.fastjson.JSONObject; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Descriptors.FieldDescriptor; import com.jsc.dsp.binding.StorageBinding; import com.jsc.dsp.model.Indeximos; import com.jsc.dsp.proto.EsOuterClass.Es; import com.jsc.dsp.proto.EsOuterClass.EsSets; -import com.jsc.dsp.utils.DBUtils; -import com.jsc.dsp.utils.EsUtils; -import com.jsc.dsp.utils.FileUtils; +import com.jsc.dsp.utils.DatabaseConnector; import com.jsc.dsp.utils.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.common.xcontent.XContentType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.json.JacksonJsonParser; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.*; @@ -33,6 +27,7 @@ import java.util.Base64.Decoder; @Component @EnableBinding(StorageBinding.class) +@ConditionalOnProperty(name = "switch.enable-storage-service", havingValue = "true", matchIfMissing = true) public class StorageService extends StreamService { @Autowired @@ -44,42 +39,21 @@ public class StorageService extends StreamService { @Autowired JacksonJsonParser jsonParser; - @Value("${es.ip}") - String esIp; - - @Value("${es.port}") - Integer esPort; - - @Value("${es.username}") - String esUsername; - - @Value("${es.password}") - String esPassword; - - @Value("${es.index}") - String esIndex; - 
@Value("${custom.dev-mode}") boolean devMode; @Value("${custom.local-file-storage-path}") String localFileStoragePath; - @Value("${db.driver}") - String dbDriver; - - @Value("${db.url}") - String dbUrl; - - @Value("${db.user}") - String dbUser; - - @Value("${db.password}") - String dbPassword; + @Value("${custom.websiteWhiteList}") + String websiteWhiteListString; + @Resource + DatabaseConnector databaseConnector; private final Logger logger = LogManager.getLogger(StorageService.class.getName()); + @Override public void sendMessage(byte[] msg) { source.StorageOutput().send(MessageBuilder.withPayload(msg).build()); @@ -91,8 +65,8 @@ public class StorageService extends StreamService { @Override @StreamListener(StorageBinding.STORAGE_PIPELINE_IN) public void receiveMessage(Object payload) { + List websiteWhiteList = Arrays.asList(websiteWhiteListString.split(";")); String tempString; - ObjectMapper objectMapper = new ObjectMapper(); try { tempString = new String(base64.decode(payload.toString()), StandardCharsets.UTF_8); Map data = jsonParser.parseMap(tempString); @@ -101,7 +75,6 @@ public class StorageService extends StreamService { if ("public_info_data_".equals(protoName)) { EsSets.Builder esSetsBuilder = EsSets.newBuilder(); EsSets esSets = EsSets.parseFrom(data.get("content").toString().getBytes(StandardCharsets.ISO_8859_1)); - List localStorageItems = new ArrayList<>(); List dbStorageItems = new ArrayList<>(); BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout("5s"); @@ -111,7 +84,7 @@ public class StorageService extends StreamService { Map fieldsMap = es.getAllFields(); Indeximos indeximos = new Indeximos(); for (FieldDescriptor key : fieldsMap.keySet()) { - boolean hasField = DBUtils.hasField(Indeximos.class, key.getName()); + boolean hasField = databaseConnector.hasField(Indeximos.class, key.getName()); if (!hasField) { continue; } @@ -130,7 +103,7 @@ public class StorageService extends StreamService { } else { Field field = 
indeximos.getClass().getDeclaredField(key.getName()); field.setAccessible(true); - String fieldType = DBUtils.getFieldType(Indeximos.class, key.getName()); + String fieldType = databaseConnector.getFieldType(Indeximos.class, key.getName()); if (fieldType.contains("Float")) { field.set(indeximos, Float.valueOf(value)); } else { @@ -138,68 +111,40 @@ public class StorageService extends StreamService { } } } - String uuid = UUID.randomUUID().toString().replaceAll("-", ""); - String es_urlname = indeximos.getEs_urlname(); - if (!es_urlname.isEmpty()) { - // 根据urlname生成固定的UUID,避免重复入库相同的文章 - UUID _uuid = UUID.nameUUIDFromBytes(es_urlname.getBytes()); - uuid = _uuid.toString().replaceAll("-", ""); - } - indeximos.setEs_sid(uuid); - indeximos.setEs_links(indeximos.getEs_links()); - indeximos.setEs_loadtime(StringUtils.TimestampToStringDate(System.currentTimeMillis())); - builder.setEsSid(uuid); - for (Field f : indeximos.getClass().getDeclaredFields()) { - f.setAccessible(true); - //判断字段是否为空,并且对象属性中的基本都会转为对象类型来判断 - if (f.get(indeximos) == null) { - String fieldType = DBUtils.getFieldType(Indeximos.class, f.getName()); - if (fieldType.contains("Float")) { - f.set(indeximos, 0.0f); - } else { - if (!dateFields.contains(f.getName())) { - f.set(indeximos, ""); + // 只导出目标站点的数据 + if (websiteWhiteList.contains(indeximos.getEs_sitename())) { + logger.info("开始处理站点【" + indeximos.getEs_sitename() + "】的数据入库流程"); + String uuid = UUID.randomUUID().toString().replaceAll("-", ""); + String es_urlname = indeximos.getEs_urlname(); + if (!es_urlname.isEmpty()) { + // 根据urlname生成固定的UUID,避免重复入库相同的文章 + UUID _uuid = UUID.nameUUIDFromBytes(es_urlname.getBytes()); + uuid = _uuid.toString().replaceAll("-", ""); + } + indeximos.setEs_urltitle(indeximos.getEs_urltitle().trim()); + indeximos.setEs_sid(uuid); + indeximos.setEs_links(indeximos.getEs_links()); + indeximos.setEs_loadtime(StringUtils.TimestampToStringDate(System.currentTimeMillis())); + builder.setEsSid(uuid); + for (Field f : 
indeximos.getClass().getDeclaredFields()) { + f.setAccessible(true); + //判断字段是否为空,并且对象属性中的基本都会转为对象类型来判断 + if (f.get(indeximos) == null) { + String fieldType = databaseConnector.getFieldType(Indeximos.class, f.getName()); + if (fieldType.contains("Float")) { + f.set(indeximos, 0.0f); + } else { + if (!dateFields.contains(f.getName())) { + f.set(indeximos, ""); + } } } } - } - IndexRequest indexRequest = new IndexRequest(esIndex); - indexRequest.id(indeximos.getEs_sid()); - indexRequest.source(objectMapper.writeValueAsString(indeximos), XContentType.JSON); - bulkRequest.add(indexRequest); - Es es_temp = builder.build(); - esSetsBuilder.addEs(es_temp); - List localizedOption = JSON.parseArray(indeximos.getEs_urltopic(), String.class); - if (indeximos.getEs_carriertype().equals("wechat")) { dbStorageItems.add(indeximos); } - if (localizedOption != null && localizedOption.size() > 0) { - //本地存储用 - if (localizedOption.contains("json")) { - localStorageItems.add(indeximos); - } - //入库MySQL - if (localizedOption.contains("mysql")) { - dbStorageItems.add(indeximos); - } - } - } - EsUtils.EsSaveBulkRequest(esIp, esPort, esUsername, esPassword, bulkRequest); - if (localStorageItems.size() > 0) { - String entityItemsString = JSON.toJSONString(localStorageItems); - String entityFileFullPath = localFileStoragePath + esIndex + "_" + System.currentTimeMillis() + ".json"; - if (FileUtils.saveStringToFile(entityItemsString, entityFileFullPath)) { - logger.info("Local file store to " + entityFileFullPath); - } else { - logger.error("Local file store error!"); - } } if (dbStorageItems.size() > 0) { - if (DBUtils.insertIntoDB(dbDriver, dbUrl, dbUser, dbPassword, dbStorageItems)) { - logger.info("Store to MySQL Database"); - } else { - logger.error("MySQL Database Storage error!"); - } + databaseConnector.insertIntoDB(dbStorageItems); } data.put("content", new String(esSetsBuilder.build().toByteArray(), StandardCharsets.ISO_8859_1)); } diff --git 
a/dsp/src/main/java/com/jsc/dsp/utils/AutoExportAndUpload.java b/dsp/src/main/java/com/jsc/dsp/utils/AutoExportAndUpload.java new file mode 100644 index 0000000..01a3708 --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/utils/AutoExportAndUpload.java @@ -0,0 +1,243 @@ +package com.jsc.dsp.utils; + +import com.jsc.dsp.service.ConfigService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.FileTime; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Comparator; +import java.util.Date; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +@Component +@ConditionalOnProperty(name = "switch.auto-export-and-upload", havingValue = "true", matchIfMissing = true) +public class AutoExportAndUpload { + + @Resource + DatabaseConnector databaseConnector; + + @Resource + FTPConnector ftpConnector; + + @Resource + ConfigService configService; + + private final Logger logger = LoggerFactory.getLogger(this.getClass().getName()); + + private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + private static final SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); + + + @Value("${custom.excelOutputPath}") + String excelOutputPath; + + @Value("${custom.backupFilePath}") + String backupFilePath; + + 
@Value("${custom.pagesOutputPath}") + String pagesOutputPath; + + @Value("${custom.ftpUploadPath}") + String ftpUploadPath; + + /** + * 每周一、三、五的早上8点,执行导出数据的任务 + */ + @Scheduled(cron = "${custom.exportTaskSchedule}") + public void exportDataAndUpload() { + logger.info("开始导出excel和pdf数据..."); + String lastLoadTime = configService.getConfigValueByName("last_loadtime"); + String currentLoadTime = StringUtils.DateToString(new Date()); + String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + databaseConnector.exportToXlsx(lastLoadTime); + copyPagesFiles(lastLoadTime, currentLoadTime); + configService.setConfigValueByName("last_loadtime", currentLoadTime); + String zipFileName = "data_news-" + timestamp + "-001.zip"; + String zipFileFullName = backupFilePath + File.separator + zipFileName; + String remoteZipPath = ftpUploadPath + "/" + zipFileName; + zipAndUploadDirectory(excelOutputPath, zipFileFullName, remoteZipPath); + } + + /** + * 将指定目录打包成 ZIP 文件(保存到指定本地路径),并上传到 FTP 服务器 + * + * @param sourceDirPath 本地要打包的源目录路径(如:/data/reports) + * @param localZipPath 本地 ZIP 文件保存路径(如:/backup/archives/reports_20251224.zip) + * @param remoteZipPath FTP 上的目标路径(如:/ftp/backups/reports_20251224.zip) + */ + public void zipAndUploadDirectory(String sourceDirPath, String localZipPath, String remoteZipPath) { + Path sourceDir = Paths.get(sourceDirPath); + if (!Files.exists(sourceDir) || !Files.isDirectory(sourceDir)) { + logger.error("源目录不存在或不是一个目录: {}", sourceDirPath); + return; + } + + Path localZipFile = Paths.get(localZipPath); + Path zipParent = localZipFile.getParent(); + if (zipParent != null && !Files.exists(zipParent)) { + try { + Files.createDirectories(zipParent); + logger.debug("创建 ZIP 父目录: {}", zipParent); + } catch (IOException e) { + logger.error("无法创建 ZIP 父目录: {}", zipParent, e); + return; + } + } + + // 打包目录到指定本地 ZIP 路径 + try { + zipDirectory(sourceDir, localZipFile.toFile()); + } catch (IOException e) { + logger.error("打包目录失败: {}", 
sourceDirPath, e); + return; + } + + // 上传 ZIP 文件 + try (InputStream zipInputStream = Files.newInputStream(localZipFile)) { + boolean uploaded = ftpConnector.uploadFile(zipInputStream, remoteZipPath); + if (uploaded) { + logger.info("ZIP 文件上传成功 - 本地: {}, FTP: {}", localZipPath, remoteZipPath); + } else { + logger.error("ZIP 文件上传失败 - FTP: {}", remoteZipPath); + } + } catch (IOException e) { + logger.error("读取本地 ZIP 文件失败: {}", localZipPath, e); + } + + // 注意:此处不再删除 localZipFile,由调用方决定是否保留或清理 + } + + + /** + * 将目录递归打包成 ZIP 文件 + * + * @param sourceDir 要打包的源目录 + * @param zipFile 输出的 ZIP 文件 + * @throws IOException + */ + private void zipDirectory(Path sourceDir, File zipFile) throws IOException { + try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(zipFile))) { + Files.walk(sourceDir) + .filter(path -> !Files.isDirectory(path)) // 只处理文件 + .forEach(path -> { + ZipEntry zipEntry = new ZipEntry(sourceDir.relativize(path).toString()); + try { + zipOut.putNextEntry(zipEntry); + Files.copy(path, zipOut); + zipOut.closeEntry(); + } catch (IOException e) { + throw new RuntimeException("打包文件失败: " + path, e); + } + }); + } + logger.info("目录打包完成: {} -> {}", sourceDir, zipFile.getAbsolutePath()); + try { + Files.walk(sourceDir) + .sorted(Comparator.reverseOrder()) // 先处理子文件/子目录,再处理父目录(但这里只删文件) + .filter(path -> !Files.isDirectory(path)) // 只删除文件 + .forEach(path -> { + try { + Files.delete(path); + logger.debug("已删除文件: {}", path); + } catch (IOException e) { + logger.warn("无法删除文件: {}", path, e); + } + }); + logger.info("源目录已清空(仅删除文件,保留目录结构): {}", sourceDir); + } catch (IOException e) { + logger.error("清空源目录时发生错误", e); + // 注意:即使清理失败,ZIP 已生成并会继续上传,根据业务决定是否抛异常 + // 如果要求“必须清理成功才算成功”,可在此 throw 异常 + } + } + + public void copyPagesFiles(String startTime, String endTime) { + try { + logger.info("开始复制PDF..."); + // 解析时间范围 + Date start = sdf.parse(startTime); + Date end = sdf.parse(endTime); + + // 源目录 + Path sourceDir = Paths.get(pagesOutputPath); + if 
(!Files.exists(sourceDir) || !Files.isDirectory(sourceDir)) { + logger.error("源目录不存在或不是目录: " + pagesOutputPath); + return; + } + + // 目标目录:在 excelOutputPath 下创建 pdf 子目录 + Path targetBaseDir = Paths.get(excelOutputPath); + Path targetPdfDir = targetBaseDir.resolve("pdf"); + + // 确保目标目录存在 + if (!Files.exists(targetPdfDir)) { + Files.createDirectories(targetPdfDir); + } + + // 遍历源目录中的所有 PDF 文件 + Files.walk(sourceDir) + .filter(path -> !Files.isDirectory(path)) + .filter(path -> path.toString().toLowerCase().endsWith(".pdf")) + .forEach(path -> { + try { + // 获取文件创建时间(注意:Linux/macOS 可能不支持 creationTime) + BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class); + FileTime creationTime = attrs.creationTime(); + Date fileCreationDate = new Date(creationTime.toMillis()); + + // 如果 creationTime 在某些系统上不可靠,可替换为 lastModifiedTime: + // Date fileCreationDate = new Date(Files.getLastModifiedTime(path).toMillis()); + + // 判断文件时间是否在指定范围内 + if (!fileCreationDate.before(start) && !fileCreationDate.after(end)) { + // 构建目标路径(保留相对结构,或直接放平?这里按原相对路径保留) + Path relativePath = sourceDir.relativize(path); + Path targetPath = targetPdfDir.resolve(relativePath); + + // 确保目标子目录存在 + Path targetParent = targetPath.getParent(); + if (targetParent != null && !Files.exists(targetParent)) { + Files.createDirectories(targetParent); + } + + // 复制文件 + Files.copy(path, targetPath, StandardCopyOption.REPLACE_EXISTING); + logger.info("已复制文件: " + path + " -> " + targetPath); + } + } catch (IOException e) { + logger.error("处理文件时出错: " + path + " - " + e.getMessage()); + } + }); + + logger.info("PDF 文件复制完成,目标目录: " + targetPdfDir.toAbsolutePath()); + + } catch (ParseException e) { + logger.error("时间格式解析错误,请确保使用格式: " + DATE_FORMAT); + e.printStackTrace(); + } catch (IOException e) { + logger.error("IO 错误: " + e.getMessage()); + e.printStackTrace(); + } + } + + +} diff --git a/dsp/src/main/java/com/jsc/dsp/utils/AutoPatroller.java b/dsp/src/main/java/com/jsc/dsp/utils/AutoPatroller.java 
deleted file mode 100644 index 5cfe553..0000000 --- a/dsp/src/main/java/com/jsc/dsp/utils/AutoPatroller.java +++ /dev/null @@ -1,135 +0,0 @@ -package com.jsc.dsp.utils; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.jsc.dsp.model.SearchAggregation; -import com.jsc.dsp.model.TargetSocial; -import com.jsc.dsp.model.TargetWebsite; - -import java.util.Date; -import java.util.Map; -import java.util.logging.Logger; - -import static com.jsc.dsp.utils.EsUtils.performAggregationSearch; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.support.LogIfLevelEnabled; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -@Component -public class AutoPatroller { - - private final Logger logger = Logger.getLogger(this.getClass().getName()); - - long updateInterval = 1500L; - - @Value("${custom.websiteQueryAPI}") - String websiteQueryAPI; - - @Value("${custom.websiteUpdateAPI}") - String websiteUpdateAPI; - - @Value("${custom.socialQueryAPI}") - String socialQueryAPI; - - @Value("${custom.socialUpdateAPI}") - String socialUpdateAPI; - - @Value("${es.ip}") - String esIp; - - @Value("${es.port}") - Integer esPort; - - @Value("${es.username}") - String esUsername; - - @Value("${es.password}") - String esPassword; - - @Scheduled(cron = "0 45 0/3 * * *") - public void checkNewsSite() { - checkWebsite("es_sitename", "es_carriertype", "news"); - } - - @Scheduled(cron = "0 15 1/3 * * *") - public void checkWechat() { - checkSocial("es_authors", "es_carriertype", "wechat", "5"); - } - - @Scheduled(cron = "0 0 2/4 * * *") - public void checkArticleSite() { - checkWebsite("es_sitename", "es_carriertype", "article"); - } - - public void checkWebsite(String aggFieldName, String queryFieldName, String queryFieldValue) { - try { - Map searchAggregationMap = performAggregationSearch( - esIp, esPort, esUsername, 
esPassword, aggFieldName, queryFieldName, queryFieldValue); - JSONObject dataObject = new JSONObject(); - dataObject.put("carrierType", queryFieldValue); - String rsp = HttpUtils.post(websiteQueryAPI, dataObject); - JSONObject rspObj = JSON.parseObject(rsp); - if (rspObj.getIntValue("code") == 200) { - JSONArray rspArr = rspObj.getJSONArray("content"); - for (Object obj : rspArr) { - TargetWebsite targetWebsite = JSONObject.parseObject(obj.toString(), TargetWebsite.class); - String siteName = targetWebsite.getSiteName(); - if (searchAggregationMap.containsKey(siteName)) { - SearchAggregation checkInfo = searchAggregationMap.get(siteName); - targetWebsite.setCheckTotalNum(checkInfo.getCount()); - targetWebsite.setCheckLastTime(checkInfo.getLastTime()); - targetWebsite.setCheckUpdateTime(new Date()); - String updateRsp = HttpUtils.post(websiteUpdateAPI, targetWebsite); - JSONObject updateRspObj = JSONObject.parseObject(updateRsp); - if (updateRspObj.getIntValue("code") != 200) { - logger.warning("更新站点【" + siteName + "】巡检信息失败"); - } - Thread.sleep(updateInterval); - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("站点巡检完毕"); - } - - public void checkSocial(String aggFieldName, String queryFieldName, String queryFieldValue, String socialTypeCode) { - try { - Map searchAggregationMap = performAggregationSearch( - esIp, esPort, esUsername, esPassword, aggFieldName, queryFieldName, queryFieldValue); - TargetSocial postData = new TargetSocial(); - postData.setUserFlag("0"); - postData.setUserType(socialTypeCode); - String rsp = HttpUtils.post(socialQueryAPI, postData); - JSONObject rspObj = JSON.parseObject(rsp); - if (rspObj.getIntValue("code") == 200) { - JSONArray rspArr = rspObj.getJSONArray("content"); - for (Object obj : rspArr) { - TargetSocial targetSocial = JSONObject.parseObject(obj.toString(), TargetSocial.class); - String userName = targetSocial.getUserName(); - if (searchAggregationMap.containsKey(userName)) { - SearchAggregation 
checkInfo = searchAggregationMap.get(userName); - targetSocial.setCheckTotalNum(checkInfo.getCount()); - targetSocial.setCheckLastTime(checkInfo.getLastTime()); - targetSocial.setCheckUpdateTime(new Date()); - String updateRsp = HttpUtils.post(socialUpdateAPI, targetSocial); - JSONObject updateRspObj = JSONObject.parseObject(updateRsp); - if (updateRspObj.getIntValue("code") != 200) { - logger.warning("更新账号【" + userName + "】巡检信息失败"); - } - Thread.sleep(updateInterval); - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("社交帐号巡检完毕"); - - } - -} diff --git a/dsp/src/main/java/com/jsc/dsp/utils/DBUtils.java b/dsp/src/main/java/com/jsc/dsp/utils/DBUtils.java deleted file mode 100644 index e5995d2..0000000 --- a/dsp/src/main/java/com/jsc/dsp/utils/DBUtils.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.jsc.dsp.utils; - -import com.alibaba.fastjson.JSONArray; -import com.jsc.dsp.model.Indeximos; - -import java.io.File; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.*; -import java.util.logging.Logger; - - -public class DBUtils { - - public static Connection conn = null; - - private static final List floatFields = Arrays.asList("es_doclength", "es_negativeProbability", "es_simrank"); - - private static final Logger logger = Logger.getLogger("com.jsc.dsp.utils.DBUtils"); - - public static Connection getConnection(String driver, String url, String user, String password) { - try { - Class.forName(driver); - return DriverManager.getConnection(url, user, password); - } catch (ClassNotFoundException | SQLException e) { - logger.warning("Cannot get DB connection!"); - logger.warning(e.getMessage()); - return null; - } - } - - private static Map getObjectMap(Indeximos object) { - Map resultMap = new HashMap<>(); - Field[] fields = object.getClass().getDeclaredFields(); - for (Field field 
: fields) { - String fieldName = field.getName(); - String firstLetter = fieldName.substring(0, 1).toUpperCase(); - String getter = "get" + firstLetter + fieldName.substring(1); - try { - Method method = object.getClass().getMethod(getter); - Object fieldValue = method.invoke(object); - resultMap.put(fieldName, fieldValue); - } catch (Exception e) { - e.printStackTrace(); - } - } - return resultMap; - } - - public static boolean insertIntoDB(String driver, String url, String user, String password, List objectList) { - if (conn == null) { - conn = getConnection(driver, url, user, password); - } - if (conn != null) { - try { - PreparedStatement pres = null; - for (Indeximos object : objectList) { - Map objectMap = getObjectMap(object); - Object[] keyObjects = objectMap.keySet().toArray(); - List keys = new ArrayList<>(); - List values = new ArrayList<>(); - for (Object ko : keyObjects) { - String key = ko.toString(); - keys.add(key); - Object value = objectMap.get(key); - if (floatFields.contains(key)) { - values.add(value.toString()); - } else { - if (value != null && value.toString().length() > 0) { - values.add("'" + value.toString().replace("'", "\\'") + "'"); - } else { - values.add("null"); - } - } - } - String sqlInsert = "REPLACE INTO indeximos(" + String.join(", ", keys) + ") VALUES(" - + String.join(", ", values) + ")"; - pres = conn.prepareStatement(sqlInsert); - pres.addBatch(); - } - if (pres != null) { - pres.executeBatch(); - pres.close(); - } - return true; - } catch (SQLException e) { - logger.warning("Fail to insert data to Database"); - logger.warning(e.getMessage()); - conn = getConnection(driver, url, user, password); - return false; - } - } else { - return false; - } - } - - - public static boolean hasField(Class clazz, String fieldName) { - try { - clazz.getDeclaredField(fieldName); - return true; - } catch (NoSuchFieldException e) { - return false; - } - } - - public static String getFieldType(Class clazz, String fieldName) { - try { - Field 
field = clazz.getDeclaredField(fieldName); - return field.getType().getName(); - } catch (NoSuchFieldException e) { - return ""; - } - } - - public static void main(String[] args) { - List objectList = JSONArray.parseArray(FileUtils.readContentFromFile( - "D:/data/local-storage/indeximos_1700030748332.json"), Indeximos.class); - insertIntoDB( - "com.mysql.cj.jdbc.Driver", - "jdbc:mysql://8.130.95.27:28089/dsp", - "root", - "passok123A", - objectList); - } - -} diff --git a/dsp/src/main/java/com/jsc/dsp/utils/DatabaseConnector.java b/dsp/src/main/java/com/jsc/dsp/utils/DatabaseConnector.java new file mode 100644 index 0000000..563299e --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/utils/DatabaseConnector.java @@ -0,0 +1,165 @@ +package com.jsc.dsp.utils; + +import com.jsc.dsp.dao.EsDataNewsRepository; +import com.jsc.dsp.dao.IndeximosRepository; +import com.jsc.dsp.model.EsDataNewsView; +import com.jsc.dsp.model.Indeximos; +import org.apache.poi.ss.usermodel.*; +import org.apache.poi.xssf.usermodel.XSSFWorkbook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; + +@Service +public class DatabaseConnector { + + @Resource + IndeximosRepository indeximosRepository; + + @Resource + EsDataNewsRepository esDataNewsRepository; + + @Value("${custom.excelOutputPath}") + String excelOutputPath; + + private final Logger logger = LoggerFactory.getLogger(this.getClass().getName()); + + public void insertIntoDB(List objectList) { + try { + indeximosRepository.saveAll(objectList); + } 
catch (Exception e) { + logger.warn("Fail to insert data to Database"); + logger.warn(e.getMessage()); + } + } + + + public boolean hasField(Class clazz, String fieldName) { + try { + clazz.getDeclaredField(fieldName); + return true; + } catch (NoSuchFieldException e) { + return false; + } + } + + public String getFieldType(Class clazz, String fieldName) { + try { + Field field = clazz.getDeclaredField(fieldName); + return field.getType().getName(); + } catch (NoSuchFieldException e) { + return ""; + } + } + + public void exportToXlsx(String startTime) { + try { + Path dirPath = Paths.get(excelOutputPath); + if (!Files.exists(dirPath)) { + Files.createDirectories(dirPath); + } + String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + String fileName = "data_news-" + timestamp + "-001.xlsx"; + Path filePath = dirPath.resolve(fileName); + + List esDataNewsViewList = esDataNewsRepository.findAllByEsLoadtimeAfter(startTime); + if (!esDataNewsViewList.isEmpty()) { + Field[] fields = esDataNewsViewList.get(0).getClass().getDeclaredFields(); + try (Workbook workbook = new XSSFWorkbook(); + ByteArrayOutputStream out = new ByteArrayOutputStream()) { + + Sheet sheet = workbook.createSheet("data"); + + // 创建表头 + Row headerRow = sheet.createRow(0); + CellStyle headerStyle = workbook.createCellStyle(); + headerStyle.setFillForegroundColor(IndexedColors.LIGHT_BLUE.getIndex()); + headerStyle.setFillPattern(FillPatternType.SOLID_FOREGROUND); + + for (int i = 0; i < fields.length; i++) { + Cell cell = headerRow.createCell(i); + String formField = formField(fields[i]); + cell.setCellValue(formField); + cell.setCellStyle(headerStyle); + } + // 填充数据 + int rowNum = 1; + for (EsDataNewsView item : esDataNewsViewList) { + if (item.getFile() == null || item.getFile().length() < 5) { + continue; + } else { + String fileFullPath = item.getFile(); + int i = fileFullPath.indexOf(File.separator); + item.setFile(fileFullPath.substring(i + 1)); + } + Row row = 
sheet.createRow(rowNum++); + logger.debug("导出excel第" + rowNum + "行"); + row.createCell(0).setCellValue(item.getEsSid()); + row.createCell(1).setCellValue(item.getEsAuthors()); + row.createCell(2).setCellValue(item.getEsCarriertype()); + row.createCell(3).setCellValue(item.getEsCatalog()); + row.createCell(4).setCellValue(item.getEsCollection()); + row.createCell(5).setCellValue(item.getEsDoclength()); + row.createCell(6).setCellValue(item.getEsLang()); + row.createCell(7).setCellValue(item.getEsLasttime()); + if (item.getEsLinks().length() > 10000) { + row.createCell(8).setCellValue(item.getEsLinks().substring(0, 10000)); + } else { + row.createCell(8).setCellValue(item.getEsLinks()); + } + row.createCell(9).setCellValue(item.getEsLoadtime()); + row.createCell(10).setCellValue(item.getEsSitename()); + row.createCell(11).setCellValue(item.getEsSrcname()); + row.createCell(12).setCellValue(item.getEsUrlcontent()); + row.createCell(13).setCellValue(item.getEsUrlcontentRaw()); + row.createCell(14).setCellValue(item.getEsUrlimage()); + row.createCell(15).setCellValue(item.getEsUrlname()); + row.createCell(16).setCellValue(item.getEsUrltime()); + row.createCell(17).setCellValue(item.getEsUrltitle()); + row.createCell(18).setCellValue(item.getEsUrltitleRaw()); + row.createCell(19).setCellValue(item.getEsAbstract()); + row.createCell(20).setCellValue(item.getEsKeywords()); + row.createCell(21).setCellValue(item.getFile()); + } + logger.info("完成excel数据写入,共" + rowNum + "行"); + + // 自动调整列宽 + for (int i = 0; i < fields.length; i++) { + sheet.autoSizeColumn(i); + } + + workbook.write(out); + + try (FileOutputStream fos = new FileOutputStream(filePath.toFile())) { + workbook.write(fos); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + logger.info("excel导出完成!"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private String formField(Field field) { + String fieldString = field.getName(); + return StringUtils.camelToSnake(fieldString); + } + +} diff 
--git a/dsp/src/main/java/com/jsc/dsp/utils/FTPConnector.java b/dsp/src/main/java/com/jsc/dsp/utils/FTPConnector.java new file mode 100644 index 0000000..c61011f --- /dev/null +++ b/dsp/src/main/java/com/jsc/dsp/utils/FTPConnector.java @@ -0,0 +1,108 @@ +package com.jsc.dsp.utils; + +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.io.InputStream; + +@Component +public class FTPConnector { + + Logger log = LoggerFactory.getLogger(this.getClass().getName()); + + @Value("${ftp.host}") + String host; + + @Value("${ftp.port}") + Integer port; + + @Value("${ftp.username}") + String username; + + @Value("${ftp.password}") + String password; + + @Value("${ftp.timeout}") + Integer timeout; + + public boolean uploadFile(InputStream inputStream, String remotePath) { + FTPClient ftpClient = new FTPClient(); + try { + // 连接 FTP 服务器 + ftpClient.connect(host, port); + ftpClient.login(username, password); + ftpClient.setConnectTimeout(timeout); + ftpClient.setSoTimeout(timeout); + + // 设置文件类型为二进制(避免文本模式损坏文件) + ftpClient.setFileType(FTP.BINARY_FILE_TYPE); + + // 启用被动模式(适用于 NAT/防火墙环境) + ftpClient.enterLocalPassiveMode(); + + // 检查登录是否成功 + if (!FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) { + ftpClient.disconnect(); + log.error("FTP 登录失败"); + return false; + } + + // 创建目录(如果路径包含子目录) + createDirectories(ftpClient, remotePath); + + // 上传文件 + boolean success = ftpClient.storeFile(remotePath, inputStream); + if (success) { + log.info("文件上传成功: {}", remotePath); + } else { + log.error("FTP 上传失败,错误码: {}", ftpClient.getReplyCode()); + } + return success; + + } catch (IOException e) { + log.error("FTP 上传异常: {}", e.getMessage(), e); + return false; + } finally { + try { + if (inputStream != 
null) { + inputStream.close(); + } + if (ftpClient.isConnected()) { + ftpClient.logout(); + ftpClient.disconnect(); + } + } catch (IOException e) { + log.warn("关闭 FTP 连接时出错", e); + } + } + } + + /** + * 递归创建远程目录(如果路径中包含目录) + */ + private void createDirectories(FTPClient ftpClient, String remoteFilePath) throws IOException { + String[] pathParts = remoteFilePath.split("/"); + StringBuilder currentPath = new StringBuilder(); + + for (int i = 0; i < pathParts.length - 1; i++) { + if (!pathParts[i].isEmpty()) { + currentPath.append("/").append(pathParts[i]); + // 尝试切换目录,如果失败则创建 + if (!ftpClient.changeWorkingDirectory(currentPath.toString())) { + boolean made = ftpClient.makeDirectory(currentPath.toString()); + if (made) { + log.debug("创建 FTP 目录: {}", currentPath); + } + ftpClient.changeWorkingDirectory(currentPath.toString()); + } + } + } + } + +} diff --git a/dsp/src/main/java/com/jsc/dsp/utils/StringUtils.java b/dsp/src/main/java/com/jsc/dsp/utils/StringUtils.java index ff3873a..c830568 100644 --- a/dsp/src/main/java/com/jsc/dsp/utils/StringUtils.java +++ b/dsp/src/main/java/com/jsc/dsp/utils/StringUtils.java @@ -116,6 +116,29 @@ public class StringUtils { return wordList; } + public static String camelToSnake(String camel) { + if (camel == null || camel.isEmpty()) { + return camel; + } + StringBuilder result = new StringBuilder(); + result.append(Character.toLowerCase(camel.charAt(0))); + for (int i = 1; i < camel.length(); i++) { + char ch = camel.charAt(i); + if (Character.isUpperCase(ch)) { + // 如果前一个字符不是大写,或者后一个不是小写,则加下划线 + char prev = camel.charAt(i - 1); + if (!Character.isUpperCase(prev) || + (i + 1 < camel.length() && Character.isLowerCase(camel.charAt(i + 1)))) { + result.append('_'); + } + result.append(Character.toLowerCase(ch)); + } else { + result.append(ch); + } + } + return result.toString(); + } + public static void main(String[] args) { initFilterMap("http://39.98.151.140:28081/api/open/wordBank/queryAll"); } diff --git 
a/dsp/src/main/resources/application.yml b/dsp/src/main/resources/application.yml index 8700186..fe43a5a 100644 --- a/dsp/src/main/resources/application.yml +++ b/dsp/src/main/resources/application.yml @@ -1,18 +1,19 @@ server: port: 8084 + servlet: + context-path: /dsp spring: cloud: stream: kafka: binder: brokers: 47.113.231.200:9092 - zkNodes: 47.113.231.200:2181 auto-create-topics: true healthTimeout: 600 bindings: file_dl_pipeline_in: destination: stream-file-dl - group: file-dl + group: file-dl-test consumer: pollTimeout: 60 file_dl_pipeline_out: @@ -20,7 +21,7 @@ spring: content-type: text/plain protobuf_pipeline_in: destination: stream-protobuf - group: protobuf + group: protobuf-test consumer: pollTimeout: 60 protobuf_pipeline_out: @@ -28,7 +29,7 @@ spring: content-type: text/plain storage_pipeline_in: destination: stream-db - group: db + group: db-test consumer: pollTimeout: 60 storage_pipeline_out: @@ -43,38 +44,52 @@ spring: records: 10 interval: ms: 3600000 + datasource: + url: jdbc:mysql://47.113.231.200:28089/dsp?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true + username: root + password: passok123A + driver-class-name: com.mysql.cj.jdbc.Driver + + jpa: + database-platform: org.hibernate.dialect.MySQL8Dialect + show-sql: true + topics: stream-protobuf: com.jsc.dsp.service.ProtobufService stream-db: com.jsc.dsp.service.StorageService stream-file-dl: com.jsc.dsp.service.FileDlService -es: - ip: 8.130.95.27 - port: 28087 - username: elastic - password: passok123A - index: indeximos - type: default -ceph: - aws-access-key: JH8OF0D9ZJYYXBFYB5OD - aws-secret-key: FuptELjiPQOQNR6tPOVL777n3dGe3bZCDJphyiz0 - endpoint: 192.168.1.16:28090 -db: - driver: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://8.130.95.27:28089/dsp - user: root - password: passok123A +switch: + enable-storage-service: false + enable-file-dl-service: false + enable-protobuf-service: false + auto-export-and-upload: true + +ftp: + host: 144.34.185.108 + port: 21 + 
username: jsc-2b + password: 1234qwer% + timeout: 5000 + passive-mode: true + custom: dev-mode: false filter-words-query-api: http://47.115.228.133:28081/api/open/wordBank/queryAll filter-words-update-interval-ms: 3600000 - local-file-storage-path: E:/data/local-storage/ + local-file-storage-path: D:/data/local-storage/ proto_save_path: D:/data/spider_data/proto/ - transfer_backup_path: E:/data/transfer_backup/ - file_unzip_path: E:/html-full/ - keep_backup_file: E:/data/dbzq_backup/ - nginx_path: E:/OSC-3.0/app/osdp_board/html/ + transfer_backup_path: D:/data/transfer_backup/ + file_unzip_path: D:/html-full/ + keep_backup_file: D:/data/dbzq_backup/ + nginx_path: D:/OSC-3.0/app/osdp_board/html/ websiteQueryAPI: http://47.115.228.133:28081/api/open/target/website/queryAllInfo websiteUpdateAPI: http://47.115.228.133:28081/api/open/target/website/update socialQueryAPI: http://47.115.228.133:28081/api/open/target/social/queryAll?sortBy=id&shuffleResult=false socialUpdateAPI: http://47.115.228.133:28081/api/open/target/social/update + websiteWhiteList: 能源界(国内信息);能源界(国际信息);中国能源新闻网;新华能源网;中国能源网(能源战略);中国农网(三农要闻);中国经济网(三农经济);中华粮网(粮食安全);美国之音(中国版面);美国之音(中美关系);美国之音(台海两岸版面);美国之音(港澳版面);看中国(看大陆版面);看中国(重点新闻);德国之声(中国报道);纽约时报中文网(中国版面);大纪元(一周大陆新闻);EnergyNow;联合国粮农组织;路透社(中国版面) + excelOutputPath: D:/data/output/upload + backupFilePath: D:/data/output/backup + pagesOutputPath: D:/data/output/pdf + ftpUploadPath: /home/jsc-2b + exportTaskSchedule: "0 0 12 * * 1,3,5" \ No newline at end of file diff --git a/dsp/src/main/resources/logback-spring.xml b/dsp/src/main/resources/logback-spring.xml index f06cdf8..3de274f 100644 --- a/dsp/src/main/resources/logback-spring.xml +++ b/dsp/src/main/resources/logback-spring.xml @@ -8,7 +8,7 @@ logback - + @@ -27,7 +27,7 @@ ${CONSOLE_LOG_PATTERN} - GBK + UTF-8 diff --git a/research/pdf_downloader/decode-url-for-rodong-news.py b/research/pdf_downloader/decode-url-for-rodong-news.py new file mode 100644 index 0000000..cd2fee4 --- /dev/null +++ 
b/research/pdf_downloader/decode-url-for-rodong-news.py @@ -0,0 +1,171 @@ +import mysql.connector +import base64 +import urllib.parse +import re + +# === 数据库配置 === +DB_CONFIG = { + 'host': '47.113.231.200', + 'port': 28089, + 'user': 'root', + 'password': 'passok123A', + 'database': 'dsp', + 'charset': 'utf8mb4', +} + + +def decode_rodong_url(url): + """ + 从朝鲜劳动新闻URL中提取并Base64解码参数部分 + 示例输入: http://www.rodong.rep.kp/cn/index.php?MTJAMjAyNi0wMS0wNS0wMDJAMUAxQEAwQDNA== + 输出: '12@2026-01-05-002@1@1@@0@37@' 或 None(若无法解析) + """ + if not url or 'index.php?' not in url: + return None + + try: + # 方法1:使用 urllib.parse 解析 + parsed = urllib.parse.urlparse(url) + query = parsed.query + + # 如果 query 为空,尝试用正则兜底(应对非常规URL) + if not query: + match = re.search(r'index\.php\?([A-Za-z0-9+/=]+)', url) + if match: + query = match.group(1) + else: + return None + + # Base64 解码 + decoded_bytes = base64.b64decode(query) + decoded_str = decoded_bytes.decode('utf-8') + return decoded_str + + except Exception as e: + # 记录错误但不中断整体流程 + print(f" 解码失败 (URL: {url[:60]}...): {e}") + return None + + +def main(): + try: + # 连接数据库 + conn = mysql.connector.connect(**DB_CONFIG) + cursor = conn.cursor(buffered=True) + + # 查询所有需要处理的记录(只处理包含 index.php? 的 URL) + print("正在查询待处理的新闻记录...") + cursor.execute(""" + SELECT es_sid, es_urlname + FROM indeximos + WHERE es_sitename = '劳动新闻' + AND (es_tags IS NULL OR es_tags = '') + """) + records = cursor.fetchall() + + if not records: + print("没有找到需要处理的记录。") + return + + print(f"共找到 {len(records)} 条待处理记录。") + + updated_count = 0 + for i, (es_sid, es_urlname) in enumerate(records, 1): + print(f"[{i}/{len(records)}] 处理 ID={es_sid} ...", end=" ") + + decoded = decode_rodong_url(es_urlname) + if decoded is not None: + # 更新 es_tags 字段 + update_query = "UPDATE indeximos SET es_tags = %s WHERE es_sid = %s" + cursor.execute(update_query, (decoded, es_sid)) + conn.commit() + updated_count += 1 + print(f"成功 → {decoded[:50]}{'...' 
if len(decoded) > 50 else ''}") + else: + print("跳过(无法解码)") + + print(f"\n✅ 完成!共更新 {updated_count} 条记录。") + + except mysql.connector.Error as db_err: + print(f"❌ 数据库错误: {db_err}") + except Exception as e: + print(f"❌ 脚本执行出错: {e}") + finally: + if 'cursor' in locals(): + cursor.close() + if 'conn' in locals() and conn.is_connected(): + conn.close() + print("数据库连接已关闭。") + + +if __name__ == "__main__": + + # 动态替换 SQL 中的表名(注意:表名不能用参数化,需手动拼接,但确保安全) + # 为安全起见,可加校验 + if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', 'indeximos'): + raise ValueError("表名包含非法字符!") + + # 临时替换函数中的表名(更优雅的方式是传参,此处为简洁) + import sys + + module = sys.modules[__name__] + + + # 修改 main 函数中的 SQL(通过字符串替换) + # 实际建议:将表名作为全局变量或参数传递 + + # 更简单做法:在 main() 上方定义 TABLE_NAME,然后在 SQL 中直接引用 + # 我们重写 main 函数内部逻辑以支持变量表名 + + # 重新定义带表名参数的主逻辑 + def main_with_table(table_name): + try: + conn = mysql.connector.connect(**DB_CONFIG) + cursor = conn.cursor(buffered=True) + + # 查询 + query_sql = f""" + SELECT es_sid, es_urlname + FROM `{table_name}` + WHERE es_urlname LIKE '%index.php?%' + AND (es_tags IS NULL OR es_tags = '') + """ + cursor.execute(query_sql) + records = cursor.fetchall() + + if not records: + print("没有找到需要处理的记录。") + return + + print(f"共找到 {len(records)} 条待处理记录。") + + updated_count = 0 + for i, (es_sid, es_urlname) in enumerate(records, 1): + print(f"[{i}/{len(records)}] 处理 ID={es_sid} ...", end=" ") + + decoded = decode_rodong_url(es_urlname) + if decoded is not None: + update_sql = f"UPDATE `{table_name}` SET es_tags = %s WHERE es_sid = %s" + cursor.execute(update_sql, (decoded, es_sid)) + conn.commit() + updated_count += 1 + print(f"成功 → {decoded[:50]}{'...' 
if len(decoded) > 50 else ''}") + else: + print("跳过(无法解码)") + + print(f"\n✅ 完成!共更新 {updated_count} 条记录。") + + except mysql.connector.Error as db_err: + print(f"❌ 数据库错误: {db_err}") + except Exception as e: + print(f"❌ 脚本执行出错: {e}") + finally: + if 'cursor' in locals(): + cursor.close() + if 'conn' in locals() and conn.is_connected(): + conn.close() + print("数据库连接已关闭。") + + + # 执行 + main_with_table('indeximos') diff --git a/research/pdf_downloader/save-page-with-selenium.py b/research/pdf_downloader/save-page-with-selenium.py new file mode 100644 index 0000000..eea8e43 --- /dev/null +++ b/research/pdf_downloader/save-page-with-selenium.py @@ -0,0 +1,348 @@ +import logging +import os +import queue +import threading +import time +from datetime import datetime +import random + +import pymysql +from tqdm import tqdm + +from save_page_as_pdf import PDFSaver +from save_remote_as_mhtml import RemoteMHTMLSaver +from save_page_as_mhtml import MHTMLSaver +import tldextract + +# 配置日志 +from save_remote_as_pdf import RemotePDFSaver + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + logging.FileHandler('pdf_downloader.log') + ] +) +logger = logging.getLogger(__name__) + +# =============== MySQL 配置 =============== +MYSQL_CONFIG = { + 'host': '47.113.231.200', + 'port': 28089, + 'user': 'root', + 'password': 'passok123A', + 'database': 'dsp', + 'charset': 'utf8mb4', + 'autocommit': False # 手动控制事务 +} +# ========================================= + +# 配置参数 +BATCH_SIZE = 500 +MAX_WORKERS = 1 +TIMEOUT = 10 +PDF_OUTPUT_DIR = 'D:/data/output/pdf' +MIN_PDF_SIZE = 5 * 1024 # 80KB + +MHTML_OUTPUT_DIR = 'D:/data/output/mhtml' +os.makedirs(PDF_OUTPUT_DIR, exist_ok=True) + +running = True +running_interval_seconds = 10 + +skip_host_name = [ + 'epochtimes.com', + 'secretchina.com', + # 'rodong.rep.kp', + # 'kcna.kp' +] + + +class PDFDownloader: + def __init__(self): + self.db_lock = threading.Lock() + 
self.db_connection = None + self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3) + self.processed_count = 0 + self.success_count = 0 + self.fail_count = 0 + self.small_file_count = 0 # 新增:统计小文件数量 + self.last_loadtime = self.get_last_loadtime() + self.total_rows = self.get_total_rows() + self.start_time = time.time() + self.skip_hosts = [] + self.local_handler = None + self.remote_handler = None + + # 替换 MYSQL_CONFIG 中的连接方式 + def get_db_connection(self): + self.db_connection = pymysql.connect( + host=MYSQL_CONFIG['host'], + port=MYSQL_CONFIG['port'], + user=MYSQL_CONFIG['user'], + password=MYSQL_CONFIG['password'], + database=MYSQL_CONFIG['database'], + charset='utf8mb4', + autocommit=False + ) + + def get_total_rows(self): + """获取总记录数""" + if self.db_connection is None: + self.get_db_connection() + cursor = self.db_connection.cursor() + cursor.execute( + "SELECT COUNT(*) FROM indeximos " + "WHERE (es_video IS NULL OR es_video IN ('-1')) " + "AND es_loadtime > %s", self.last_loadtime + ) + return cursor.fetchone()[0] + + def get_last_loadtime(self): + """获取上次导出数据的时间""" + if self.db_connection is None: + self.get_db_connection() + cursor = self.db_connection.cursor() + cursor.execute( + "SELECT config_value FROM config " + "WHERE config_name = 'last_loadtime' " + ) + return cursor.fetchone()[0] + + def use_remote_selenium(self, url): + for host in skip_host_name: + if host in url: + return True + return False + + def format_pdf_filename(self, row): + """格式化PDF文件名""" + es_urltitle = row[2] or 'untitled' + es_urltime = str(row[3]) or '19700101_000000' + es_sitename = row[4] or 'anonymous' + + def clean_filename(text): + if not text: + return '' + invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*'] + for char in invalid_chars: + text = text.replace(char, '_') + return text.strip()[:100] + + try: + dt = datetime.strptime(es_urltime, '%Y-%m-%d %H:%M:%S') + es_urltime_fix = dt.strftime('%Y%m%d_%H%M%S') + except: + es_urltime_fix = '19700101_000000' + + 
filename = f"{clean_filename(es_urltitle)}_{es_urltime_fix}_{es_sitename}.pdf" + return os.path.join(PDF_OUTPUT_DIR, filename) + + def format_mhtml_filename(self, row): + """格式化PDF文件名""" + es_urltitle = row[2] or 'untitled' + es_urltime = str(row[3]) or '19700101_000000' + es_sitename = row[4] or 'anonymous' + + def clean_filename(text): + if not text: + return '' + invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*'] + for char in invalid_chars: + text = text.replace(char, '_') + return text.strip()[:100] + + try: + dt = datetime.strptime(es_urltime, '%Y-%m-%d %H:%M:%S') + es_urltime_fix = dt.strftime('%Y%m%d_%H%M%S') + except: + es_urltime_fix = '19700101_000000' + + filename = f"{clean_filename(es_urltitle)}_{es_urltime_fix}_{es_sitename}.mhtml" + return os.path.join(PDF_OUTPUT_DIR, filename) + + def fetch_data_batch(self, offset): + """分页获取数据""" + if self.db_connection is None: + self.get_db_connection() + cursor = self.db_connection.cursor() + cursor.execute( + "SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos " + "WHERE (es_video IS NULL OR es_video IN ('-1')) " + "AND es_loadtime > %s " + "ORDER BY es_urltime LIMIT %s OFFSET %s", + (self.last_loadtime, BATCH_SIZE, offset) + ) + return cursor.fetchall() + + def update_file_status(self, es_sid, status, retry=3): + """更新数据库状态""" + for attempt in range(retry): + try: + with self.db_lock: + if self.db_connection is None: + self.get_db_connection() + cursor = self.db_connection.cursor() + cursor.execute( + "UPDATE indeximos SET es_video = %s WHERE es_sid = %s", + (status, es_sid)) + self.db_connection.commit() + return True + except Exception as e: + if attempt == retry - 1: + logger.error(f"更新数据库失败(es_sid={es_sid}): {e}") + return False + time.sleep(1) + + def extract_main_domain(self, url): + extracted = tldextract.extract(url) + # 组合注册域名(主域名) + main_domain = f"{extracted.domain}.{extracted.suffix}" + return main_domain + + def download_worker(self): + 
"""工作线程函数""" + while True: + try: + task = self.task_queue.get(timeout=1) + if task is None: + break + + row = task + url = row[1] + if self.extract_main_domain(url) in self.skip_hosts: + self.small_file_count += 1 + self.processed_count += 1 + self.task_queue.task_done() + print(f"小文件规避,暂时跳过URL:{url}") + continue + output_file = self.format_pdf_filename(row) # 获取格式化后的文件名 + + try: + os.makedirs(os.path.dirname(output_file), exist_ok=True) + + # 调用下载函数 + if self.use_remote_selenium(url): + self.processed_count += 1 + self.task_queue.task_done() + continue + # if self.remote_handler is None: + # self.remote_handler = RemotePDFSaver() + # success = self.remote_handler.save_as_pdf( + # url=url, + # output_path=output_file, + # timeout=TIMEOUT + # ) + else: + if self.local_handler is None: + self.local_handler = PDFSaver(headless=False) + success = self.local_handler.save_as_pdf( + url=url, + output_path=output_file, + timeout=TIMEOUT, + wait_time=5 + ) + + # 验证下载结果 + if success and os.path.exists(output_file): + file_size = os.path.getsize(output_file) + + if file_size >= MIN_PDF_SIZE: # 文件大小合格 + self.update_file_status(row[0], output_file) + self.success_count += 1 + else: # 文件太小 + self.update_file_status(row[0], '-2') + self.small_file_count += 1 + logger.warning(f"文件过小({file_size}字节): {output_file}") + try: + os.remove(output_file) + self.skip_hosts.append(self.extract_main_domain(url)) + except: + pass + else: # 下载失败 + self.update_file_status(row[0], '0') + self.fail_count += 1 + if os.path.exists(output_file): + try: + os.remove(output_file) + except: + pass + + except Exception as e: + logger.error(f"下载出现异常(es_sid={row[0]}, url={url}): {str(e)}") + self.update_file_status(row[0], '-1') + self.fail_count += 1 + + self.processed_count += 1 + self.task_queue.task_done() + + except queue.Empty: + continue + + def run(self): + """启动下载任务""" + threads = [] + + # 创建工作线程 + for _ in range(MAX_WORKERS): + t = threading.Thread(target=self.download_worker) + t.start() + 
threads.append(t) + + # 使用进度条显示进度 + with tqdm(total=self.total_rows, desc="处理进度", unit="条") as pbar: + offset = 0 + while True: + batch = self.fetch_data_batch(offset) + if not batch: + break + batch_list = list(batch) + random.shuffle(batch_list) + batch = tuple(batch_list) + for row in batch: + self.task_queue.put(row) + + pbar.update(len(batch)) + pbar.set_postfix({ + '成功': self.success_count, + '失败': self.fail_count, + '小文件': self.small_file_count, + '速度': f"{self.processed_count / (time.time() - self.start_time):.1f}条/秒" + }) + + offset += BATCH_SIZE + + self.task_queue.join() + + for _ in range(MAX_WORKERS): + self.task_queue.put(None) + + for t in threads: + t.join() + + total_time = time.time() - self.start_time + print(f"\n处理完成! 总计: {self.total_rows}条") + print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}条") + print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒") + + def terminate(self): + if self.local_handler: + self.local_handler.quit() + if self.remote_handler: + self.remote_handler.quit() + self.db_connection.close() + + +if __name__ == "__main__": + while running: + try: + print(f"开始处理,总记录数: {PDFDownloader().get_total_rows()}") + downloader = PDFDownloader() + downloader.run() + print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...") + downloader.terminate() + time.sleep(running_interval_seconds) + except Exception as e: + print(repr(e)) diff --git a/research/pdf_downloader/save_page_as_mhtml.py b/research/pdf_downloader/save_page_as_mhtml.py new file mode 100644 index 0000000..21607c3 --- /dev/null +++ b/research/pdf_downloader/save_page_as_mhtml.py @@ -0,0 +1,141 @@ +import logging +import os +import time +from urllib.parse import urlparse + +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service as ChromeService + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s 
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('mhtml_saver.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)


class MHTMLSaver:
    """Saves fully rendered web pages as MHTML snapshots via the Chrome DevTools Protocol."""

    def __init__(self, headless=True):
        """
        Create a local Chrome WebDriver configured for MHTML capture.

        :param headless: run Chrome without a visible window when True
        """
        logger.info("正在初始化 Chrome WebDriver(自动匹配版本)...")
        service = ChromeService(executable_path="C:/Program Files/Python38/chromedriver.exe")
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.3650.75 Safari/537.36 Edg/143.0.3650.75"

        chrome_options = Options()
        # BUGFIX: '--headless=new' was previously added unconditionally, which made
        # the `headless` parameter a silent no-op.
        if headless:
            chrome_options.add_argument('--headless=new')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument(f'--user-agent={user_agent}')
        chrome_options.add_argument('--save-page-as-mhtml')  # enable MHTML support
        chrome_options.add_argument('--lang=zh-CN')
        chrome_options.add_argument('--window-size=1920,1080')
        # Hide automation fingerprints (e.g. "navigator.webdriver").
        # NOTE: these two experimental options were previously registered twice;
        # registering them once is sufficient.
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_experimental_option('prefs', {
            'intl.accept_languages': 'zh-CN,zh,en'
        })

        self.driver = webdriver.Chrome(service=service, options=chrome_options)

    def save_as_mhtml(self, url, output_path=None, timeout=30, wait_time=5):
        """
        Save a web page as an MHTML file.

        :param url: target page URL
        :param output_path: output path (.mhtml); derived from the domain when None
        :param timeout: page-load timeout in seconds
        :param wait_time: extra wait after load (seconds) so dynamic content can render
        :return: absolute path of the saved file
        :raises RuntimeError: if an empty file was produced
        """
        if output_path is None:
            parsed = urlparse(url)
            domain = parsed.netloc.replace('www.', '').split('.')[0] or 'page'
            output_path = f"{domain}.mhtml"

        if not output_path.lower().endswith('.mhtml'):
            output_path += '.mhtml'

        try:
            self.driver.set_page_load_timeout(timeout)

            # BUGFIX: the previous script called `originalQuery` without ever
            # defining it, so any non-notification permission query raised a
            # ReferenceError. Capture the native implementation before overriding.
            self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
                'source': '''
                    delete navigator.__proto__.webdriver;
                    const originalQuery = window.navigator.permissions.query.bind(window.navigator.permissions);
                    window.navigator.permissions.query = (parameters) => {
                        return parameters.name === 'notifications' ?
                            Promise.resolve({ state: Notification.permission }) :
                            originalQuery(parameters);
                    };
                '''
            })

            logger.info(f"正在加载页面: {url}")
            self.driver.get(url)
            # Window size is (re)applied after navigation.
            self.driver.set_window_size(1920, 1080)

            logger.info(f"等待 {wait_time} 秒以确保页面完全渲染...")
            time.sleep(wait_time)

            # Capture the snapshot through CDP; result['data'] is the MHTML text.
            logger.info("正在生成 MHTML 快照...")
            result = self.driver.execute_cdp_cmd('Page.captureSnapshot', {'format': 'mhtml'})
            mhtml_content = result['data']

            # Write as UTF-8 text; newline='' preserves the MHTML line endings.
            with open(output_path, 'w', encoding='utf-8', newline='') as f:
                f.write(mhtml_content)

            file_size = os.path.getsize(output_path)
            if file_size == 0:
                raise RuntimeError("生成了空文件")

            logger.info(f"✅ MHTML 保存成功: {os.path.abspath(output_path)} (大小: {file_size} 字节)")
            return os.path.abspath(output_path)

        except Exception as e:
            logger.error(f"❌ 保存失败: {e}")
            raise

    def quit(self):
        """Close the browser session if it is still open."""
        if self.driver:
            self.driver.quit()
            logger.info("浏览器已关闭")


# ===== Test entry point =====
if __name__ == "__main__":
    test_url = "https://cn.ultraiso.net/jiaocheng/ke-lu-guang-pan.html"

    saver = MHTMLSaver(headless=True)
    try:
        output_file = saver.save_as_mhtml(
            url=test_url,
            output_path="example.mhtml",
            timeout=30,
            wait_time=5
        )
        print(f"\n🎉 成功保存 MHTML 文件: {output_file}")
    except Exception as e:
        print(f"\n💥 保存失败: {e}")
    finally:
        saver.quit()
import base64
import logging
import os
import time
from urllib.parse import urlparse

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService

# Logging setup: console + UTF-8 log file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('pdf_saver.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)


class PDFSaver:
    """Renders web pages to PDF with Chrome's DevTools `Page.printToPDF` command."""

    def __init__(self, headless=True):
        """
        Create a local Chrome WebDriver configured for PDF printing.

        :param headless: run Chrome without a visible window when True
        """
        logger.info("正在初始化 Chrome WebDriver(自动匹配版本)...")
        service = ChromeService(executable_path="D:/chromedriver.exe")
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.3650.75 Safari/537.36 Edg/143.0.3650.75"

        chrome_options = Options()
        if headless:
            chrome_options.add_argument('--headless=new')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument(f'--user-agent={user_agent}')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_argument('--lang=zh-CN')
        chrome_options.add_experimental_option('prefs', {
            'intl.accept_languages': 'zh-CN,zh,en'
        })
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.page_load_strategy = 'eager'

        # Note: PDF printing does not need '--save-page-as-mhtml'.
        self.driver = webdriver.Chrome(service=service, options=chrome_options)

    def save_as_pdf(self, url, output_path=None, timeout=30, wait_time=5,
                    print_options=None):
        """
        Save a web page as a PDF file.

        :param url: target page URL
        :param output_path: output path (.pdf); derived from the domain when None
        :param timeout: page-load timeout in seconds
        :param wait_time: extra wait after load (seconds) so dynamic content can render
        :param print_options: optional overrides for Page.printToPDF, see
            https://chromedevtools.github.io/devtools-protocol/tot/Page/#method-printToPDF
        :return: absolute path of the saved file
        :raises RuntimeError: if an empty file was produced
        """
        if output_path is None:
            parsed = urlparse(url)
            domain = parsed.netloc.replace('www.', '').split('.')[0] or 'page'
            output_path = f"{domain}.pdf"

        if not output_path.lower().endswith('.pdf'):
            output_path += '.pdf'

        # Default print options (A4 portrait, backgrounds on); caller overrides win.
        default_print_options = {
            'landscape': False,
            'displayHeaderFooter': False,
            'printBackground': True,
            'preferCSSPageSize': True,
            'paperWidth': 8.27,   # A4 width (inches)
            'paperHeight': 11.69, # A4 height (inches)
        }
        if print_options:
            default_print_options.update(print_options)

        try:
            self.driver.set_page_load_timeout(timeout)

            # BUGFIX: `const originalQuery` was previously declared AFTER the
            # override assignment, so it captured the override itself (infinite
            # recursion on non-notification queries). Capture the native
            # implementation first.
            self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
                'source': '''
                    delete navigator.__proto__.webdriver;
                    const originalQuery = window.navigator.permissions.query.bind(window.navigator.permissions);
                    window.navigator.permissions.query = (parameters) => {
                        return parameters.name === 'notifications' ?
                            Promise.resolve({ state: Notification.permission }) :
                            originalQuery(parameters);
                    };
                '''
            })

            logger.info(f"正在加载页面: {url}")
            self.driver.get(url)
            self.driver.set_window_size(1920, 1080)

            logger.info(f"等待 {wait_time} 秒以确保页面完全渲染...")
            time.sleep(wait_time)

            logger.info("正在生成 PDF...")
            result = self.driver.execute_cdp_cmd('Page.printToPDF', default_print_options)

            # result['data'] is the Base64-encoded PDF payload.
            pdf_data = base64.b64decode(result['data'])

            with open(output_path, 'wb') as f:
                f.write(pdf_data)

            file_size = os.path.getsize(output_path)
            if file_size == 0:
                raise RuntimeError("生成了空文件")

            logger.info(f"✅ PDF 保存成功: {os.path.abspath(output_path)} (大小: {file_size} 字节)")
            return os.path.abspath(output_path)

        except Exception as e:
            logger.error(f"❌ 保存失败: {e}")
            raise

    def quit(self):
        """Close the browser session if it is still open."""
        if self.driver:
            self.driver.quit()
            logger.info("浏览器已关闭")


# ===== Test entry point =====
if __name__ == "__main__":
    test_url = "https://cn.ultraiso.net/jiaocheng/ke-lu-guang-pan.html"

    saver = PDFSaver(headless=True)
    try:
        output_file = saver.save_as_pdf(
            url=test_url,
            output_path="example.pdf",
            timeout=30,
            wait_time=5
        )
        print(f"\n🎉 成功保存 PDF 文件: {output_file}")
    except Exception as e:
        print(f"\n💥 保存失败: {e}")
    finally:
        saver.quit()
class RemoteMHTMLSaver:
    """Captures MHTML snapshots through a remote Selenium grid, with automatic
    session rebuild and retry on WebDriver failures."""

    def __init__(
            self,
            remote_url="http://144.34.185.108:28098/wd/hub",
            headless=True,
            max_retries=3,
            retry_delay=2
    ):
        """
        Initialize the remote MHTML saver (supports automatic session rebuild).

        :param remote_url: remote Selenium hub address
        :param headless: run Chrome without a visible window when True
        :param max_retries: maximum retries for a single save operation
        :param retry_delay: wait time (seconds) before each retry
        """
        self.remote_url = remote_url
        self.headless = headless
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.driver = None
        self._init_driver()

    def _build_chrome_options(self):
        """Build the (reusable) Chrome options for the remote session."""
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument('--headless=new')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument(
            "--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.3650.75 Safari/537.36"
        )
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        return chrome_options

    def _init_driver(self):
        """Create (or re-create) the remote WebDriver session.

        :raises RuntimeError: when the remote Selenium service cannot be reached
            after 3 connection attempts.
        """
        if self.driver:
            try:
                self.driver.quit()
            except Exception:
                pass  # ignore failures while closing a dead session

        logger.info(f"正在创建新的远程 WebDriver 会话: {self.remote_url}")
        # Connection attempts are fixed at 3 (independent of max_retries, which
        # governs save operations).
        for attempt in range(3):
            try:
                self.driver = webdriver.Remote(
                    command_executor=self.remote_url,
                    options=self._build_chrome_options()
                )
                # Inject anti-detection script before every document loads.
                self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
                    'source': '''
                        delete navigator.__proto__.webdriver;
                        window.chrome = { runtime: {} };
                        Object.defineProperty(navigator, 'languages', {
                            get: () => ['zh-CN', 'zh', 'en']
                        });
                    '''
                })
                logger.info("✅ 远程 WebDriver 会话创建成功")
                return
            except Exception as e:
                logger.warning(f"创建 WebDriver 失败 (尝试 {attempt + 1}/3): {e}")
                if attempt < 2:
                    time.sleep(2)
                else:
                    raise RuntimeError(f"无法连接到远程 Selenium 服务: {e}")

    def save_as_mhtml(self, url, output_path=None, timeout=30, wait_time=5):
        """
        Save a page as MHTML, retrying and rebuilding the session on failure.

        :param url: target page URL
        :param output_path: output path (.mhtml); derived from the domain when None
        :param timeout: page-load timeout in seconds
        :param wait_time: extra wait after load (seconds)
        :return: absolute path of the saved file
        :raises RuntimeError: when all retries have been exhausted
        """
        if output_path is None:
            domain = urlparse(url).netloc.replace('www.', '').split('.')[0] or 'page'
            output_path = f"{domain}.mhtml"
        if not output_path.lower().endswith('.mhtml'):
            output_path += '.mhtml'

        last_exception = None
        # BUGFIX: only delete the output file on failure if THIS call wrote it;
        # the previous code removed any pre-existing file at output_path even
        # when the failure happened before anything was written.
        wrote_file = False

        for retry in range(self.max_retries + 1):
            try:
                # Rebuild the driver if a previous attempt tore it down.
                if not self.driver:
                    self._init_driver()

                self.driver.set_page_load_timeout(timeout)
                logger.info(f"[{retry + 1}/{self.max_retries + 1}] 加载页面: {url}")
                self.driver.get(url)
                time.sleep(wait_time)

                logger.info("生成 MHTML 快照...")
                result = self.driver.execute_cdp_cmd('Page.captureSnapshot', {'format': 'mhtml'})
                mhtml_content = result['data']

                # Write the snapshot locally.
                wrote_file = True
                with open(output_path, 'w', encoding='utf-8', newline='') as f:
                    f.write(mhtml_content)

                file_size = os.path.getsize(output_path)
                if file_size == 0:
                    raise RuntimeError("生成了空文件")

                logger.info(f"✅ 保存成功: {os.path.abspath(output_path)} ({file_size} 字节)")
                return os.path.abspath(output_path)

            except (WebDriverException, InvalidSessionIdException, SessionNotCreatedException) as e:
                last_exception = e
                logger.warning(f"WebDriver 异常 (retry {retry + 1}): {e}")
                if retry < self.max_retries:
                    logger.info("正在重建 WebDriver 会话...")
                    self._init_driver()
                    time.sleep(self.retry_delay)
                else:
                    logger.error("达到最大重试次数,放弃")
                    break

            except TimeoutException as e:
                last_exception = e
                logger.warning(f"页面加载超时 (retry {retry + 1}): {e}")
                if retry < self.max_retries:
                    time.sleep(self.retry_delay)
                else:
                    break

            except Exception as e:
                last_exception = e
                logger.error(f"未知错误 (retry {retry + 1}): {e}")
                break  # non-WebDriver error: do not retry

        # All retries failed: clean up only a file this call produced.
        if wrote_file and os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError:
                pass

        raise RuntimeError(f"保存失败({type(last_exception).__name__}): {last_exception}")

    def quit(self):
        """Explicitly close the remote browser session."""
        if self.driver:
            try:
                self.driver.quit()
                logger.info("WebDriver 会话已关闭")
            except Exception:
                pass
            self.driver = None

    def __del__(self):
        self.quit()


# ===== Test entry point =====
if __name__ == "__main__":
    saver = RemoteMHTMLSaver(
        remote_url="http://144.34.185.108:28098/wd/hub",  # <- replace with your server's public IP
        headless=True
    )
    try:
        saver.save_as_mhtml(
            url="https://www.epochtimes.com/gb/25/12/22/n14660274.htm",
            output_path="remote_example2.mhtml"
        )
    except Exception as e:
        print(f"❌ 失败: {e}")

    saver.quit()
# --- RemotePDFSaver methods (class header and __init__ precede this span) ---

def _build_chrome_options(self):
    """Build the (reusable) Chrome options for the remote session."""
    chrome_options = Options()
    if self.headless:
        chrome_options.add_argument('--headless=new')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--window-size=1920,1080')
    chrome_options.add_argument(
        "--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.3650.75 Safari/537.36"
    )
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
    chrome_options.add_experimental_option('useAutomationExtension', False)
    return chrome_options

def _init_driver(self):
    """Create (or re-create) the remote WebDriver session.

    :raises RuntimeError: when the remote Selenium service cannot be reached
        after 3 connection attempts.
    """
    if self.driver:
        try:
            self.driver.quit()
        except Exception:
            pass  # ignore failures while closing a dead session

    logger.info(f"正在创建新的远程 WebDriver 会话: {self.remote_url}")
    # Connection attempts are fixed at 3 (independent of max_retries).
    for attempt in range(3):
        try:
            self.driver = webdriver.Remote(
                command_executor=self.remote_url,
                options=self._build_chrome_options()
            )
            # Inject anti-detection script before every document loads.
            self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
                'source': '''
                    delete navigator.__proto__.webdriver;
                    window.chrome = { runtime: {} };
                    Object.defineProperty(navigator, 'languages', {
                        get: () => ['zh-CN', 'zh', 'en']
                    });
                '''
            })
            logger.info("✅ 远程 WebDriver 会话创建成功")
            return
        except Exception as e:
            logger.warning(f"创建 WebDriver 失败 (尝试 {attempt + 1}/3): {e}")
            if attempt < 2:
                time.sleep(2)
            else:
                raise RuntimeError(f"无法连接到远程 Selenium 服务: {e}")

def save_as_pdf(self, url, output_path=None, timeout=30, wait_time=5):
    """
    Save a page as PDF, retrying and rebuilding the session on failure.

    :param url: target page URL
    :param output_path: output path (.pdf); derived from the domain when None
    :param timeout: page-load timeout in seconds
    :param wait_time: extra wait after load (seconds)
    :return: absolute path of the saved file
    :raises RuntimeError: when all retries have been exhausted
    """
    if output_path is None:
        domain = urlparse(url).netloc.replace('www.', '').split('.')[0] or 'page'
        output_path = f"{domain}.pdf"
    if not output_path.lower().endswith('.pdf'):
        output_path += '.pdf'

    last_exception = None
    # BUGFIX: only delete the output file on failure if THIS call wrote it;
    # the previous code removed any pre-existing file at output_path even when
    # the failure happened before anything was written.
    wrote_file = False

    for retry in range(self.max_retries + 1):
        try:
            # Rebuild the driver if a previous attempt tore it down.
            if not self.driver:
                self._init_driver()

            self.driver.set_page_load_timeout(timeout)
            logger.info(f"[{retry + 1}/{self.max_retries + 1}] 加载页面: {url}")
            self.driver.get(url)
            time.sleep(wait_time)

            logger.info("生成 PDF...")
            result = self.driver.execute_cdp_cmd('Page.printToPDF', self.print_options)
            pdf_data = base64.b64decode(result['data'])

            # Write the PDF locally (binary).
            wrote_file = True
            with open(output_path, 'wb') as f:
                f.write(pdf_data)

            file_size = os.path.getsize(output_path)
            if file_size == 0:
                raise RuntimeError("生成了空文件")

            logger.info(f"✅ 保存成功: {os.path.abspath(output_path)} ({file_size} 字节)")
            return os.path.abspath(output_path)

        except (WebDriverException, InvalidSessionIdException, SessionNotCreatedException) as e:
            last_exception = e
            logger.warning(f"WebDriver 异常 (retry {retry + 1}): {e}")
            if retry < self.max_retries:
                logger.info("正在重建 WebDriver 会话...")
                self._init_driver()
                time.sleep(self.retry_delay)
            else:
                logger.error("达到最大重试次数,放弃")
                break

        except TimeoutException as e:
            last_exception = e
            logger.warning(f"页面加载超时 (retry {retry + 1}): {e}")
            if retry < self.max_retries:
                time.sleep(self.retry_delay)
            else:
                break

        except Exception as e:
            last_exception = e
            logger.error(f"未知错误 (retry {retry + 1}): {e}")
            break  # non-WebDriver error: do not retry

    # All retries failed: clean up only a file this call produced.
    if wrote_file and os.path.exists(output_path):
        try:
            os.remove(output_path)
        except OSError:
            pass

    raise RuntimeError(f"保存失败({type(last_exception).__name__}): {last_exception}")

def quit(self):
    """Explicitly close the remote browser session."""
    if self.driver:
        try:
            self.driver.quit()
            logger.info("WebDriver 会话已关闭")
        except Exception:
            pass
        self.driver = None

def __del__(self):
    self.quit()
import pymysql
from typing import Dict, List, Tuple, Optional

# ================== Configuration ==================

# NOTE(review): credentials are hard-coded and committed to the repo — move
# them to environment variables or an ignored config file.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
}

# Only records whose es_srcname is listed here are processed.
TARGET_SRCNAMES: List[str] = [
    "http://www.kcna.kp/cn/category/articles/q/5394b80bdae203fadef02522cfb578c0.kcmsf",
    # add the site names you need to process
]


# ================== Helpers ==================

def get_suffix_32(url: str) -> Optional[str]:
    """Return the last 32 characters of the URL, or None if it is too short/empty."""
    if not url or len(url) < 32:
        return None
    return url[-32:]


def find_foreign_by_suffix(cursor, suffix: str, exclude_id: int) -> Optional[Tuple[str, str]]:
    """
    Find one foreign-language record whose es_urlname ends with `suffix`,
    excluding the record itself.

    :return: (es_urltitle, es_urlcontent) or None when no match exists
    """
    query = """
        SELECT es_urltitle, es_urlcontent
        FROM indeximos
        WHERE
            es_sid != %s
            AND es_urlname IS NOT NULL
            AND CHAR_LENGTH(es_urlname) >= 32
            AND RIGHT(es_urlname, 32) = %s
        LIMIT 1
    """
    cursor.execute(query, (exclude_id, suffix))
    # fetchone() already yields None when there is no row; the previous
    # `result if result else None` was redundant.
    return cursor.fetchone()


def update_chinese_record(cursor, record_id: int, title: str, content: str):
    """Update es_title and es_content of the Chinese record identified by es_sid."""
    update_query = """
        UPDATE indeximos
        SET es_title = %s, es_content = %s
        WHERE es_sid = %s
    """
    cursor.execute(update_query, (title, content, record_id))


# ================== Main logic ==================

def main():
    """Match Chinese records to their foreign originals via a shared 32-char
    URL suffix and copy the foreign title/content over."""
    if not TARGET_SRCNAMES:
        print("⚠️ 未指定任何目标 es_srcname,程序退出。")
        return

    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()

    try:
        # Load every Chinese record for the configured sites.
        placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES))
        query = f"""
            SELECT es_sid, es_srcname, es_urlname
            FROM indeximos
            WHERE es_srcname IN ({placeholders})
            AND es_urlname IS NOT NULL
            AND es_urlname != ''
        """
        cursor.execute(query, TARGET_SRCNAMES)
        records = cursor.fetchall()
        total = len(records)
        print(f"共加载 {total} 条来自 {TARGET_SRCNAMES} 的记录用于匹配...")

        updated_count = 0
        skipped_short = 0

        for idx, (record_id, es_srcname, es_urlname) in enumerate(records, 1):
            suffix = get_suffix_32(es_urlname)
            if suffix is None:
                skipped_short += 1
                continue

            foreign_data = find_foreign_by_suffix(cursor, suffix, record_id)
            if foreign_data:
                title, content = foreign_data
                update_chinese_record(cursor, record_id, title, content)
                updated_count += 1
                print(f"[{idx}/{total}] ✅ 已更新 ID={record_id} | src={es_srcname}")

        conn.commit()
        print("\n" + "=" * 50)
        print(f"✅ 匹配完成!")
        print(f"  - 成功更新: {updated_count} 条")
        print(f"  - 因 URL 长度 <32 跳过: {skipped_short} 条")
        print(f"  - 总处理: {total} 条")

    except Exception as e:
        conn.rollback()
        print(f"❌ 发生错误,已回滚: {e}")
        raise
    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    main()
"""提取中文关键词:分词 + 过滤单字、数字、标点""" + if not text: + return set() + words = jieba.lcut(text) + return {w for w in words if len(w) >= 2 and w.isalpha()} + + +def keyword_overlap_similarity(title1: str, title2: str) -> float: + """计算两个中文标题的关键词重合率""" + kw1 = extract_keywords(title1) + kw2 = extract_keywords(title2) + + if not kw1 and not kw2: + return 1.0 if title1 == title2 else 0.0 + if not kw1 or not kw2: + return 0.0 + + overlap = kw1 & kw2 + return len(overlap) / max(len(kw1), len(kw2)) + + +# ================== 数据库操作 ================== + +def get_chinese_records(cursor) -> List[Tuple]: + """获取待处理的中文记录""" + if not TARGET_SRCNAMES: + return [] + placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES)) + query = f""" + SELECT es_sid, es_srcname, es_urlname, es_urltitle, es_urltime + FROM indeximos + WHERE es_srcname IN ({placeholders}) + AND es_urltitle IS NOT NULL AND TRIM(es_urltitle) != '' + AND es_urltime IS NOT NULL + """ + cursor.execute(query, TARGET_SRCNAMES) + return cursor.fetchall() + + +def get_foreign_candidates_by_time(cursor, pub_time) -> List[Tuple]: + """ + 获取同一发布时间的所有外文候选记录(要求 es_abstract 不为空) + """ + query = """ + SELECT es_sid, es_abstract, es_urltitle, es_urlcontent + FROM indeximos + WHERE es_urltime = %s + AND es_abstract IS NOT NULL AND TRIM(es_abstract) != '' + AND es_urlcontent IS NOT NULL + """ + cursor.execute(query, (pub_time,)) + return cursor.fetchall() + + +def update_chinese_record(cursor, record_id: int, new_title: str, content: str): + """更新中文记录的标题和内容""" + update_query = """ + UPDATE indeximos + SET es_title = %s, es_content = %s + WHERE es_sid = %s + """ + cursor.execute(update_query, (new_title, content, record_id)) + + +# ================== 主逻辑 ================== + +def main(): + if not TARGET_SRCNAMES: + print("⚠️ 未指定目标站点,退出。") + return + + conn = pymysql.connect(**DB_CONFIG) + cursor = conn.cursor() + + try: + chinese_records = get_chinese_records(cursor) + total = len(chinese_records) + print(f"共加载 {total} 条中文记录用于匹配...") + + 
matched_count = 0 + + for idx, (cid, srcname, urlname, zh_title, pub_time) in enumerate(chinese_records, 1): + print(f"\n[{idx}/{total}] ID={cid}, 时间={pub_time}, 标题='{zh_title[:30]}...'") + + candidates = get_foreign_candidates_by_time(cursor, pub_time) + if not candidates: + print(" → 无同时间且有翻译标题的外文记录") + continue + + best_score = 0.0 + best_candidate = None + + for fid, trans_title, ori_title, content in candidates: + # 跳过自己(理论上不会发生,但安全起见) + if fid == cid: + continue + + score = keyword_overlap_similarity(zh_title, trans_title) + print(f" 候选ID={fid} | 翻译标题='{trans_title[:30]}...' | 重合度={score:.3f}") + + if score > best_score: + best_score = score + best_candidate = (ori_title, content) + + if best_candidate and best_score >= SIMILARITY_THRESHOLD: + final_title, final_content = best_candidate + update_chinese_record(cursor, cid, final_title, final_content) + matched_count += 1 + print(f" ✅ 匹配成功! 重合度={best_score:.3f}") + else: + print(f" ❌ 未达阈值(最高相似度={best_score:.3f})") + + conn.commit() + print("\n" + "=" * 50) + print(f"✅ 匹配完成!成功关联 {matched_count} / {total} 条记录。") + + except Exception as e: + conn.rollback() + print(f"❌ 发生错误,已回滚: {e}") + raise + finally: + cursor.close() + conn.close() + + +if __name__ == "__main__": + main() diff --git a/spiders/MediaSpiders/MediaSpiders/scrapy_selenium/middlewares.py b/spiders/MediaSpiders/MediaSpiders/scrapy_selenium/middlewares.py index 2df273f..5788a53 100644 --- a/spiders/MediaSpiders/MediaSpiders/scrapy_selenium/middlewares.py +++ b/spiders/MediaSpiders/MediaSpiders/scrapy_selenium/middlewares.py @@ -59,7 +59,14 @@ class SeleniumMiddleware: # Edge in headless mode edge_options = EdgeOptions() edge_options.use_chromium = True - self.driver = Edge(executable_path='MicrosoftWebDriver.exe', options=edge_options) + self.driver = Edge(executable_path='msedgedriver.exe', options=edge_options) + self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { + "source": """ + Object.defineProperty(navigator, 'webdriver', { + 
get: () => undefined + }) + """ + }) @classmethod def from_crawler(cls, crawler): diff --git a/spiders/MediaSpiders/MediaSpiders/settings.py b/spiders/MediaSpiders/MediaSpiders/settings.py index bf0820a..d248fb3 100644 --- a/spiders/MediaSpiders/MediaSpiders/settings.py +++ b/spiders/MediaSpiders/MediaSpiders/settings.py @@ -74,26 +74,20 @@ TWITTER_PID_KEY = '' KAFKA_PROCESS_QUEUE = ['stream-protobuf', 'stream-db'] CUSTOM_USER_AGENT = [ - 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36', - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:77.0) Gecko/20100101 Firefox/77.0', - 'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko', - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.18363', - 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50', - 'Opera/9.80 (Windows NT 6.1; U; zh-cn) Presto/2.9.168 Version/11.50', - 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; .NET4.0E; SE 2.X MetaSr 1.0', - 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; .NET4.0E', - 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.41 Safari/535.1 QQBrowser/6.9.11079.201' + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0', ] # 部署在外网采集fb时使用selenium_chrome SELENIUM_DRIVER_NAME = 'chrome' -SELENIUM_DRIVER_EXECUTABLE_PATH = 'http://144.34.185.108:28098' +SELENIUM_DRIVER_EXECUTABLE_PATH = 'local' +# SELENIUM_DRIVER_EXECUTABLE_PATH = 'http://144.34.185.108:28098' SELENIUM_DRIVER_ARGUMENTS = [ '--headless', '--no-sandbox', 
'--disable-dev-shm-usage', '--disable-gpu', - '--window-size=1920,1080' + '--window-size=1920,1080', + '--disable-blink-features=AutomationControlled' ] # 本地调试用 diff --git a/spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserSpider.py b/spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserSpider.py index 4884797..d83cba8 100644 --- a/spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserSpider.py +++ b/spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserSpider.py @@ -65,7 +65,7 @@ class TwitterSpider(scrapy.Spider): logger.info("login twitter") driver = response.request.meta['driver'] driver.maximize_window() - driver.get('https://twitter.com/i/flow/login') + driver.get('https://x.com/i/flow/login') time.sleep(5) # 获取采集登录账号并登录 login_users = self.redis_client.smembers('MediaSpiders:Twitter_login_accounts') diff --git a/spiders/MediaSpiders/MediaSpiders/utils/date_utils.py b/spiders/MediaSpiders/MediaSpiders/utils/date_utils.py index 5ecb4be..e641be1 100644 --- a/spiders/MediaSpiders/MediaSpiders/utils/date_utils.py +++ b/spiders/MediaSpiders/MediaSpiders/utils/date_utils.py @@ -87,7 +87,9 @@ def get_format_time(pattern, time_str): date = result.group(1) time_t = result.group(2) date = date.replace('/', '-').replace(".", "-").replace( - ",", "-").replace("年", "-").replace("月", "-").replace("日", "").replace(' ', '-').replace('--', '-') + ",", "-").replace("年", "-").replace("月", "-").replace("日", "").replace( + "년", "-").replace("월", "-").replace("일", "").replace( + ' ', '-').replace('--', '-') date_array = date.split('-') for i in range(len(date_array)): if (date_array[i].endswith('st') or @@ -128,7 +130,7 @@ def get_format_time(pattern, time_str): if __name__ == '__main__': # a = [' 令和4年6月9日', 'www.kcna.kp (主体111.6.6.)', '民國111年06月09日 ', 'Jun. 
9, 2022', '111年 06月 21日'] - a = ['06.10.2023 03:24'] + a = ['2026년 1월 6일 화요일 1면 [사진있음]'] for _ in a: - print(get_time_stamp(_)) - # print(get_time_stamp(_, {r"(\d{4}年\d{1,2}月\d{2}日)\D*(\d{2}:\d{2}:\d{2})*\D*": ['%Y-%m-%d %H:%M:%S']})) + # print(get_time_stamp(_)) + print(get_time_stamp(_, {r"(\d{4}년 \d{1,2}월 \d{1,2}일)\D*(\d{2}:\d{2}:\d{2})*\D*": ['%Y-%m-%d %H:%M:%S']})) diff --git a/spiders/MediaSpiders/run.py b/spiders/MediaSpiders/run.py index 0d4e91c..34f1f5c 100644 --- a/spiders/MediaSpiders/run.py +++ b/spiders/MediaSpiders/run.py @@ -7,4 +7,4 @@ from scrapy.cmdline import execute dirpath = os.path.dirname(os.path.abspath(__file__)) sys.path.append(dirpath) -execute(['scrapy', 'crawl', 'FacebookUserSpider', '-a', 'params={}']) +execute(['scrapy', 'crawl', 'TwitterUserSpider', '-a', 'params={}']) diff --git a/spiders/WebsiteSpider/WebsiteSpider/utils/date_utils.py b/spiders/WebsiteSpider/WebsiteSpider/utils/date_utils.py index 62e9bb1..c213da3 100644 --- a/spiders/WebsiteSpider/WebsiteSpider/utils/date_utils.py +++ b/spiders/WebsiteSpider/WebsiteSpider/utils/date_utils.py @@ -89,7 +89,9 @@ def get_format_time(pattern, time_str): date = result.group(1) time_t = result.group(2) date = date.replace('/', '-').replace(".", "-").replace( - ",", "-").replace("年", "-").replace("月", "-").replace("日", "").replace(' ', '-').replace('--', '-') + ",", "-").replace("年", "-").replace("月", "-").replace("日", "").replace( + "년", "-").replace("월", "-").replace("일", "").replace( + ' ', '-').replace('--', '-') date_array = date.split('-') for i in range(len(date_array)): if (date_array[i].endswith('st') or @@ -135,7 +137,7 @@ def get_format_time(pattern, time_str): if __name__ == '__main__': # a = [' 令和4年6月9日', 'www.kcna.kp (主体111.6.6.)', '民國111年06月09日 ', 'Jun. 
9, 2022', '111年 06月 21日'] - a = ['July 26, 2024 12:53 PM'] + a = ['2026년 1월 6일 화요일 1면 [사진있음]'] for _ in a: - print(get_time_stamp(_)) - # print(get_time_stamp(_, {r"(\w+ \d+, \d{4})\D*(\d+:\d+)\D*": ['%B-%d-%Y %H:%M:%S']})) + # print(get_time_stamp(_)) + print(get_time_stamp(_, {r"(\d{4}년 \d{1,2}월 \d{1,2}일)\D*(\d{2}:\d{2}:\d{2})*\D*": ['%Y-%m-%d %H:%M:%S']}))