diff --git a/dsp/dsp.iml b/dsp/dsp.iml
index 6453e56..36aa587 100644
--- a/dsp/dsp.iml
+++ b/dsp/dsp.iml
@@ -9,6 +9,16 @@
+
+
+
+
+
+
+
+
+
+
@@ -20,7 +30,7 @@
-
+
@@ -30,7 +40,6 @@
-
@@ -93,7 +102,6 @@
-
@@ -106,8 +114,6 @@
-
-
@@ -138,8 +144,8 @@
-
-
+
+
@@ -147,7 +153,7 @@
-
+
@@ -195,5 +201,40 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dsp/pom.xml b/dsp/pom.xml
index 658d53b..da6fe5e 100644
--- a/dsp/pom.xml
+++ b/dsp/pom.xml
@@ -94,15 +94,34 @@
true
true
-
- org.projectlombok
- lombok
-
mysql
mysql-connector-java
runtime
+
+ commons-net
+ commons-net
+ 3.10.0
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-jpa
+
+
+
+
+ org.apache.poi
+ poi
+ 5.2.4
+
+
+ org.apache.poi
+ poi-ooxml
+ 5.2.4
+
+
diff --git a/dsp/src/main/java/com/jsc/dsp/DspApplication.java b/dsp/src/main/java/com/jsc/dsp/DspApplication.java
index 3802233..88682b6 100644
--- a/dsp/src/main/java/com/jsc/dsp/DspApplication.java
+++ b/dsp/src/main/java/com/jsc/dsp/DspApplication.java
@@ -3,9 +3,10 @@ package com.jsc.dsp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;
-@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
+@SpringBootApplication
@EnableScheduling
public class DspApplication {
diff --git a/dsp/src/main/java/com/jsc/dsp/TestController.java b/dsp/src/main/java/com/jsc/dsp/TestController.java
new file mode 100644
index 0000000..3114015
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/TestController.java
@@ -0,0 +1,45 @@
+package com.jsc.dsp;
+
+import com.alibaba.fastjson.JSONObject;
+import com.jsc.dsp.model.ReturnT;
+import com.jsc.dsp.utils.AutoPatroller;
+import com.jsc.dsp.utils.DatabaseConnector;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+@RestController
+@RequestMapping("/test")
+public class TestController {
+
+ @Resource
+ DatabaseConnector databaseConnector;
+
+ @Resource
+ AutoPatroller autoPatroller;
+
+ @PostMapping("/exportExcel")
+ public ReturnT exportExcel(@RequestBody JSONObject object) {
+ try {
+ String startTime = object.getString("startTime");
+ databaseConnector.exportToXlsx(startTime);
+ return new ReturnT<>(200, "", "");
+ } catch (Exception e) {
+ return new ReturnT<>(500, e.getMessage(), "");
+ }
+ }
+
+ @PostMapping("/triggerExportTask")
+ public ReturnT triggerTask() {
+ try {
+ autoPatroller.exportDataAndUpload();
+ return new ReturnT<>(200, "", "");
+ } catch (Exception e) {
+ return new ReturnT<>(500, e.getMessage(), "");
+ }
+ }
+
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/dao/ConfigRepository.java b/dsp/src/main/java/com/jsc/dsp/dao/ConfigRepository.java
new file mode 100644
index 0000000..3aa6e53
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/dao/ConfigRepository.java
@@ -0,0 +1,10 @@
+package com.jsc.dsp.dao;
+
+import com.jsc.dsp.model.Config;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface ConfigRepository extends JpaRepository {
+ Config findFirstByConfigName(String configName);
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/dao/EsDataNewsRepository.java b/dsp/src/main/java/com/jsc/dsp/dao/EsDataNewsRepository.java
new file mode 100644
index 0000000..aa769ab
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/dao/EsDataNewsRepository.java
@@ -0,0 +1,12 @@
+package com.jsc.dsp.dao;
+
+import com.jsc.dsp.model.EsDataNewsView;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public interface EsDataNewsRepository extends JpaRepository {
+ List findAllByEsLoadtimeAfter(String loadtime);
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/dao/IndeximosRepository.java b/dsp/src/main/java/com/jsc/dsp/dao/IndeximosRepository.java
new file mode 100644
index 0000000..59b852b
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/dao/IndeximosRepository.java
@@ -0,0 +1,9 @@
+package com.jsc.dsp.dao;
+
+import com.jsc.dsp.model.Indeximos;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface IndeximosRepository extends JpaRepository {
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/model/Config.java b/dsp/src/main/java/com/jsc/dsp/model/Config.java
new file mode 100644
index 0000000..4b83f24
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/model/Config.java
@@ -0,0 +1,15 @@
+package com.jsc.dsp.model;
+
+import lombok.Data;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+@Entity
+@Data
+public class Config {
+ @Id
+ Integer id;
+ String configName;
+ String configValue;
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/model/EsDataNewsView.java b/dsp/src/main/java/com/jsc/dsp/model/EsDataNewsView.java
new file mode 100644
index 0000000..ac9ec5b
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/model/EsDataNewsView.java
@@ -0,0 +1,34 @@
+package com.jsc.dsp.model;
+
+import lombok.Data;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+@Entity
+@Data
+@Table(name = "es_data_news")
+public class EsDataNewsView {
+ @Id
+ String esSid;
+ String esAuthors;
+ String esCarriertype;
+ String esCatalog;
+ String esCollection;
+ Float esDoclength;
+ String esLang;
+ String esLasttime;
+ String esLinks;
+ String esLoadtime;
+ String esSitename;
+ String esSrcname;
+ String esUrlcontent;
+ String esUrlimage;
+ String esUrlname;
+ String esUrltime;
+ String esUrltitle;
+ String esAbstract;
+ String esKeywords;
+ String file;
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/model/Indeximos.java b/dsp/src/main/java/com/jsc/dsp/model/Indeximos.java
index 483e354..583ab0c 100644
--- a/dsp/src/main/java/com/jsc/dsp/model/Indeximos.java
+++ b/dsp/src/main/java/com/jsc/dsp/model/Indeximos.java
@@ -2,10 +2,17 @@ package com.jsc.dsp.model;
import lombok.Data;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
import java.io.Serializable;
+@Entity
@Data
+@Table(name = "indeximos")
public class Indeximos implements Serializable {
+ @Id
+ String es_sid;
String es_abstract;
String es_annex;
String es_attachment;
@@ -56,7 +63,6 @@ public class Indeximos implements Serializable {
String es_repostuid;
String es_repostuname;
String es_rultopic;
- String es_sid;
String es_simhash;
String es_similarity;
String es_similaritycount;
diff --git a/dsp/src/main/java/com/jsc/dsp/service/ConfigService.java b/dsp/src/main/java/com/jsc/dsp/service/ConfigService.java
new file mode 100644
index 0000000..749c4a2
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/service/ConfigService.java
@@ -0,0 +1,29 @@
+package com.jsc.dsp.service;
+
+import com.jsc.dsp.dao.ConfigRepository;
+import com.jsc.dsp.model.Config;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+@Service
+public class ConfigService {
+
+ @Resource
+ ConfigRepository configRepository;
+
+ public String getConfigValueByName(String configName) {
+ return getConfigByName(configName).getConfigValue();
+ }
+
+ public Config getConfigByName(String configName) {
+ return configRepository.findFirstByConfigName(configName);
+ }
+
+ public void setConfigValueByName(String configName, String configValue) {
+ Config config = getConfigByName(configName);
+ config.setConfigValue(configValue);
+ configRepository.save(config);
+ }
+
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/service/StorageService.java b/dsp/src/main/java/com/jsc/dsp/service/StorageService.java
index 63db210..0a17473 100644
--- a/dsp/src/main/java/com/jsc/dsp/service/StorageService.java
+++ b/dsp/src/main/java/com/jsc/dsp/service/StorageService.java
@@ -1,23 +1,15 @@
package com.jsc.dsp.service;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.jsc.dsp.binding.StorageBinding;
import com.jsc.dsp.model.Indeximos;
import com.jsc.dsp.proto.EsOuterClass.Es;
import com.jsc.dsp.proto.EsOuterClass.EsSets;
-import com.jsc.dsp.utils.DBUtils;
-import com.jsc.dsp.utils.EsUtils;
-import com.jsc.dsp.utils.FileUtils;
+import com.jsc.dsp.utils.DatabaseConnector;
import com.jsc.dsp.utils.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -27,6 +19,7 @@ import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
+import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.*;
@@ -55,18 +48,8 @@ public class StorageService extends StreamService {
@Value("${custom.websiteWhiteList}")
String websiteWhiteListString;
- @Value("${db.driver}")
- String dbDriver;
-
- @Value("${db.url}")
- String dbUrl;
-
- @Value("${db.user}")
- String dbUser;
-
- @Value("${db.password}")
- String dbPassword;
-
+ @Resource
+ DatabaseConnector databaseConnector;
private final Logger logger = LogManager.getLogger(StorageService.class.getName());
@@ -101,7 +84,7 @@ public class StorageService extends StreamService {
Map fieldsMap = es.getAllFields();
Indeximos indeximos = new Indeximos();
for (FieldDescriptor key : fieldsMap.keySet()) {
- boolean hasField = DBUtils.hasField(Indeximos.class, key.getName());
+ boolean hasField = databaseConnector.hasField(Indeximos.class, key.getName());
if (!hasField) {
continue;
}
@@ -120,7 +103,7 @@ public class StorageService extends StreamService {
} else {
Field field = indeximos.getClass().getDeclaredField(key.getName());
field.setAccessible(true);
- String fieldType = DBUtils.getFieldType(Indeximos.class, key.getName());
+ String fieldType = databaseConnector.getFieldType(Indeximos.class, key.getName());
if (fieldType.contains("Float")) {
field.set(indeximos, Float.valueOf(value));
} else {
@@ -147,7 +130,7 @@ public class StorageService extends StreamService {
f.setAccessible(true);
//判断字段是否为空,并且对象属性中的基本都会转为对象类型来判断
if (f.get(indeximos) == null) {
- String fieldType = DBUtils.getFieldType(Indeximos.class, f.getName());
+ String fieldType = databaseConnector.getFieldType(Indeximos.class, f.getName());
if (fieldType.contains("Float")) {
f.set(indeximos, 0.0f);
} else {
@@ -161,11 +144,7 @@ public class StorageService extends StreamService {
}
}
if (dbStorageItems.size() > 0) {
- if (DBUtils.insertIntoDB(dbDriver, dbUrl, dbUser, dbPassword, dbStorageItems)) {
- logger.info("Store to MySQL Database");
- } else {
- logger.error("MySQL Database Storage error!");
- }
+ databaseConnector.insertIntoDB(dbStorageItems);
}
data.put("content", new String(esSetsBuilder.build().toByteArray(), StandardCharsets.ISO_8859_1));
}
diff --git a/dsp/src/main/java/com/jsc/dsp/utils/AutoPatroller.java b/dsp/src/main/java/com/jsc/dsp/utils/AutoPatroller.java
index e4c6dee..8809918 100644
--- a/dsp/src/main/java/com/jsc/dsp/utils/AutoPatroller.java
+++ b/dsp/src/main/java/com/jsc/dsp/utils/AutoPatroller.java
@@ -1,55 +1,237 @@
package com.jsc.dsp.utils;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.jsc.dsp.model.SearchAggregation;
-import com.jsc.dsp.model.TargetSocial;
-import com.jsc.dsp.model.TargetWebsite;
-
-import java.util.Date;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import static com.jsc.dsp.utils.EsUtils.performAggregationSearch;
-
+import com.jsc.dsp.service.ConfigService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import javax.annotation.Resource;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
@Component
public class AutoPatroller {
- private final Logger logger = Logger.getLogger(this.getClass().getName());
+ @Resource
+ DatabaseConnector databaseConnector;
- long updateInterval = 1500L;
+ @Resource
+ FTPConnector ftpConnector;
- @Value("${custom.websiteQueryAPI}")
- String websiteQueryAPI;
+ @Resource
+ ConfigService configService;
- @Value("${custom.websiteUpdateAPI}")
- String websiteUpdateAPI;
+ private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
- @Value("${custom.socialQueryAPI}")
- String socialQueryAPI;
+ private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ private static final SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
- @Value("${custom.socialUpdateAPI}")
- String socialUpdateAPI;
-// @Scheduled(cron = "0 45 0/3 * * *")
-// public void checkNewsSite() {
-// checkWebsite("es_sitename", "es_carriertype", "news");
-// }
-//
-// @Scheduled(cron = "0 15 1/3 * * *")
-// public void checkWechat() {
-// checkSocial("es_authors", "es_carriertype", "wechat", "5");
-// }
-//
-// @Scheduled(cron = "0 0 2/4 * * *")
-// public void checkArticleSite() {
-// checkWebsite("es_sitename", "es_carriertype", "article");
-// }
+ @Value("${custom.excelOutputPath}")
+ String excelOutputPath;
+
+ @Value("${custom.backupFilePath}")
+ String backupFilePath;
+
+ @Value("${custom.pagesOutputPath}")
+ String pagesOutputPath;
+
+ @Value("${custom.ftpUploadPath}")
+ String ftpUploadPath;
+
+ @Scheduled(cron = "0 0 8 * * *")
+ public void exportDataAndUpload() {
+ String lastLoadTime = configService.getConfigValueByName("last_loadtime");
+ String currentLoadTime = StringUtils.DateToString(new Date());
+ databaseConnector.exportToXlsx(lastLoadTime);
+ packagePagesFiles(lastLoadTime, currentLoadTime);
+ configService.setConfigValueByName("last_loadtime", currentLoadTime);
+ String zipFileName = String.format("data_news-%s.zip", currentLoadTime.replace("-", "").replace(":", "").replace(" ", ""));
+ String zipFileFullName = backupFilePath + File.separator + zipFileName;
+ String remoteZipPath = ftpUploadPath + "/" + zipFileName;
+ zipAndUploadDirectory(excelOutputPath, zipFileFullName, remoteZipPath);
+ }
+
+ /**
+ * 将指定目录打包成 ZIP 文件(保存到指定本地路径),并上传到 FTP 服务器
+ *
+ * @param sourceDirPath 本地要打包的源目录路径(如:/data/reports)
+ * @param localZipPath 本地 ZIP 文件保存路径(如:/backup/archives/reports_20251224.zip)
+ * @param remoteZipPath FTP 上的目标路径(如:/ftp/backups/reports_20251224.zip)
+ * @return 是否上传成功
+ */
+ public boolean zipAndUploadDirectory(String sourceDirPath, String localZipPath, String remoteZipPath) {
+ Path sourceDir = Paths.get(sourceDirPath);
+ if (!Files.exists(sourceDir) || !Files.isDirectory(sourceDir)) {
+ logger.error("源目录不存在或不是一个目录: {}", sourceDirPath);
+ return false;
+ }
+
+ Path localZipFile = Paths.get(localZipPath);
+ Path zipParent = localZipFile.getParent();
+ if (zipParent != null && !Files.exists(zipParent)) {
+ try {
+ Files.createDirectories(zipParent);
+ logger.debug("创建 ZIP 父目录: {}", zipParent);
+ } catch (IOException e) {
+ logger.error("无法创建 ZIP 父目录: {}", zipParent, e);
+ return false;
+ }
+ }
+
+ // 打包目录到指定本地 ZIP 路径
+ try {
+ zipDirectory(sourceDir, localZipFile.toFile());
+ } catch (IOException e) {
+ logger.error("打包目录失败: {}", sourceDirPath, e);
+ return false;
+ }
+
+ // 上传 ZIP 文件
+ try (InputStream zipInputStream = Files.newInputStream(localZipFile)) {
+ boolean uploaded = ftpConnector.uploadFile(zipInputStream, remoteZipPath);
+ if (uploaded) {
+ logger.info("ZIP 文件上传成功 - 本地: {}, FTP: {}", localZipPath, remoteZipPath);
+ } else {
+ logger.error("ZIP 文件上传失败 - FTP: {}", remoteZipPath);
+ }
+ return uploaded;
+ } catch (IOException e) {
+ logger.error("读取本地 ZIP 文件失败: {}", localZipPath, e);
+ return false;
+ }
+
+ // 注意:此处不再删除 localZipFile,由调用方决定是否保留或清理
+ }
+
+
+ /**
+ * 将目录递归打包成 ZIP 文件
+ *
+ * @param sourceDir 要打包的源目录
+ * @param zipFile 输出的 ZIP 文件
+ * @throws IOException
+ */
+ private void zipDirectory(Path sourceDir, File zipFile) throws IOException {
+ try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(zipFile))) {
+ Files.walk(sourceDir)
+ .filter(path -> !Files.isDirectory(path)) // 只处理文件
+ .forEach(path -> {
+ ZipEntry zipEntry = new ZipEntry(sourceDir.relativize(path).toString());
+ try {
+ zipOut.putNextEntry(zipEntry);
+ Files.copy(path, zipOut);
+ zipOut.closeEntry();
+ } catch (IOException e) {
+ throw new RuntimeException("打包文件失败: " + path, e);
+ }
+ });
+ }
+ logger.info("目录打包完成: {} -> {}", sourceDir, zipFile.getAbsolutePath());
+ try {
+ Files.walk(sourceDir)
+ .sorted(Comparator.reverseOrder()) // 先处理子文件/子目录,再处理父目录(但这里只删文件)
+ .filter(path -> !Files.isDirectory(path)) // 只删除文件
+ .forEach(path -> {
+ try {
+ Files.delete(path);
+ logger.debug("已删除文件: {}", path);
+ } catch (IOException e) {
+ logger.warn("无法删除文件: {}", path, e);
+ }
+ });
+ logger.info("源目录已清空(仅删除文件,保留目录结构): {}", sourceDir);
+ } catch (IOException e) {
+ logger.error("清空源目录时发生错误", e);
+ // 注意:即使清理失败,ZIP 已生成并会继续上传,根据业务决定是否抛异常
+ // 如果要求“必须清理成功才算成功”,可在此 throw 异常
+ }
+ }
+
+ public void packagePagesFiles(String startTime, String endTime) {
+ try {
+ // 解析时间范围
+ Date start = sdf.parse(startTime);
+ Date end = sdf.parse(endTime);
+
+ // 确保输出目录存在
+ Path excelOutputDir = Paths.get(excelOutputPath);
+ if (!Files.exists(excelOutputDir)) {
+ Files.createDirectories(excelOutputDir);
+ }
+
+ // 构造 ZIP 文件名
+ String zipFileName = String.format("pdf_files_%s_to_%s.zip",
+ startTime.replace(":", "").replace(" ", "_"),
+ endTime.replace(":", "").replace(" ", "_"));
+ Path zipFilePath = excelOutputDir.resolve(zipFileName);
+
+ // 遍历 mhtmlOutputPath 目录
+ Path sourceDir = Paths.get(pagesOutputPath);
+ if (!Files.exists(sourceDir) || !Files.isDirectory(sourceDir)) {
+ System.err.println("源目录不存在或不是目录: " + pagesOutputPath);
+ return;
+ }
+
+ try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(zipFilePath.toFile()))) {
+ Files.walk(sourceDir)
+ .filter(path -> !Files.isDirectory(path))
+ .filter(path -> path.toString().toLowerCase().endsWith(".pdf"))
+ .forEach(path -> {
+ try {
+ // 获取文件创建时间(Windows 支持,Linux/macOS 可能返回最早时间)
+ BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
+ FileTime creationTime = attrs.creationTime(); // 注意:非所有系统都支持创建时间
+ Date fileCreationDate = new Date(creationTime.toMillis());
+
+ // 如果系统不支持创建时间(例如某些 Linux 系统),可改用 lastModifiedTime
+ // Date fileCreationDate = new Date(Files.getLastModifiedTime(path).toMillis());
+
+ if (!fileCreationDate.before(start) && !fileCreationDate.after(end)) {
+ // 文件创建时间在范围内
+ String entryName = sourceDir.relativize(path).toString();
+ zipOut.putNextEntry(new ZipEntry(entryName));
+ try (InputStream in = Files.newInputStream(path)) {
+ byte[] buffer = new byte[8192];
+ int len;
+ while ((len = in.read(buffer)) > 0) {
+ zipOut.write(buffer, 0, len);
+ }
+ }
+ zipOut.closeEntry();
+ System.out.println("已添加文件: " + path);
+ }
+ } catch (IOException e) {
+ System.err.println("处理文件时出错: " + path + " - " + e.getMessage());
+ }
+ });
+ }
+
+ System.out.println("ZIP 打包完成: " + zipFilePath);
+
+ } catch (ParseException e) {
+ System.err.println("时间格式解析错误,请确保使用格式: " + DATE_FORMAT);
+ e.printStackTrace();
+ } catch (IOException e) {
+ System.err.println("IO 错误: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
}
diff --git a/dsp/src/main/java/com/jsc/dsp/utils/DBUtils.java b/dsp/src/main/java/com/jsc/dsp/utils/DBUtils.java
deleted file mode 100644
index e5995d2..0000000
--- a/dsp/src/main/java/com/jsc/dsp/utils/DBUtils.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.jsc.dsp.utils;
-
-import com.alibaba.fastjson.JSONArray;
-import com.jsc.dsp.model.Indeximos;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.*;
-import java.util.logging.Logger;
-
-
-public class DBUtils {
-
- public static Connection conn = null;
-
- private static final List floatFields = Arrays.asList("es_doclength", "es_negativeProbability", "es_simrank");
-
- private static final Logger logger = Logger.getLogger("com.jsc.dsp.utils.DBUtils");
-
- public static Connection getConnection(String driver, String url, String user, String password) {
- try {
- Class.forName(driver);
- return DriverManager.getConnection(url, user, password);
- } catch (ClassNotFoundException | SQLException e) {
- logger.warning("Cannot get DB connection!");
- logger.warning(e.getMessage());
- return null;
- }
- }
-
- private static Map getObjectMap(Indeximos object) {
- Map resultMap = new HashMap<>();
- Field[] fields = object.getClass().getDeclaredFields();
- for (Field field : fields) {
- String fieldName = field.getName();
- String firstLetter = fieldName.substring(0, 1).toUpperCase();
- String getter = "get" + firstLetter + fieldName.substring(1);
- try {
- Method method = object.getClass().getMethod(getter);
- Object fieldValue = method.invoke(object);
- resultMap.put(fieldName, fieldValue);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- return resultMap;
- }
-
- public static boolean insertIntoDB(String driver, String url, String user, String password, List objectList) {
- if (conn == null) {
- conn = getConnection(driver, url, user, password);
- }
- if (conn != null) {
- try {
- PreparedStatement pres = null;
- for (Indeximos object : objectList) {
- Map objectMap = getObjectMap(object);
- Object[] keyObjects = objectMap.keySet().toArray();
- List keys = new ArrayList<>();
- List values = new ArrayList<>();
- for (Object ko : keyObjects) {
- String key = ko.toString();
- keys.add(key);
- Object value = objectMap.get(key);
- if (floatFields.contains(key)) {
- values.add(value.toString());
- } else {
- if (value != null && value.toString().length() > 0) {
- values.add("'" + value.toString().replace("'", "\\'") + "'");
- } else {
- values.add("null");
- }
- }
- }
- String sqlInsert = "REPLACE INTO indeximos(" + String.join(", ", keys) + ") VALUES("
- + String.join(", ", values) + ")";
- pres = conn.prepareStatement(sqlInsert);
- pres.addBatch();
- }
- if (pres != null) {
- pres.executeBatch();
- pres.close();
- }
- return true;
- } catch (SQLException e) {
- logger.warning("Fail to insert data to Database");
- logger.warning(e.getMessage());
- conn = getConnection(driver, url, user, password);
- return false;
- }
- } else {
- return false;
- }
- }
-
-
- public static boolean hasField(Class> clazz, String fieldName) {
- try {
- clazz.getDeclaredField(fieldName);
- return true;
- } catch (NoSuchFieldException e) {
- return false;
- }
- }
-
- public static String getFieldType(Class> clazz, String fieldName) {
- try {
- Field field = clazz.getDeclaredField(fieldName);
- return field.getType().getName();
- } catch (NoSuchFieldException e) {
- return "";
- }
- }
-
- public static void main(String[] args) {
- List objectList = JSONArray.parseArray(FileUtils.readContentFromFile(
- "D:/data/local-storage/indeximos_1700030748332.json"), Indeximos.class);
- insertIntoDB(
- "com.mysql.cj.jdbc.Driver",
- "jdbc:mysql://8.130.95.27:28089/dsp",
- "root",
- "passok123A",
- objectList);
- }
-
-}
diff --git a/dsp/src/main/java/com/jsc/dsp/utils/DatabaseConnector.java b/dsp/src/main/java/com/jsc/dsp/utils/DatabaseConnector.java
new file mode 100644
index 0000000..b37cdda
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/utils/DatabaseConnector.java
@@ -0,0 +1,150 @@
+package com.jsc.dsp.utils;
+
+import com.jsc.dsp.dao.EsDataNewsRepository;
+import com.jsc.dsp.dao.IndeximosRepository;
+import com.jsc.dsp.model.EsDataNewsView;
+import com.jsc.dsp.model.Indeximos;
+import org.apache.poi.ss.usermodel.*;
+import org.apache.poi.xssf.usermodel.XSSFWorkbook;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.io.ByteArrayOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.logging.Logger;
+
+@Service
+public class DatabaseConnector {
+
+ @Resource
+ IndeximosRepository indeximosRepository;
+
+ @Resource
+ EsDataNewsRepository esDataNewsRepository;
+
+ @Value("${custom.excelOutputPath}")
+ String excelOutputPath;
+
+ private final Logger logger = Logger.getLogger(this.getClass().getName());
+
+ public void insertIntoDB(List objectList) {
+ try {
+ indeximosRepository.saveAll(objectList);
+ } catch (Exception e) {
+ logger.warning("Fail to insert data to Database");
+ logger.warning(e.getMessage());
+ }
+ }
+
+
+ public boolean hasField(Class> clazz, String fieldName) {
+ try {
+ clazz.getDeclaredField(fieldName);
+ return true;
+ } catch (NoSuchFieldException e) {
+ return false;
+ }
+ }
+
+ public String getFieldType(Class> clazz, String fieldName) {
+ try {
+ Field field = clazz.getDeclaredField(fieldName);
+ return field.getType().getName();
+ } catch (NoSuchFieldException e) {
+ return "";
+ }
+ }
+
+ public void exportToXlsx(String startTime) {
+ try {
+ Path dirPath = Paths.get(excelOutputPath);
+ if (!Files.exists(dirPath)) {
+ Files.createDirectories(dirPath);
+ }
+ String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
+ String fileName = "data_news-" + timestamp + ".xlsx";
+ Path filePath = dirPath.resolve(fileName);
+
+ List esDataNewsViewList = esDataNewsRepository.findAllByEsLoadtimeAfter(startTime);
+ if (!esDataNewsViewList.isEmpty()) {
+ Field[] fields = esDataNewsViewList.get(0).getClass().getDeclaredFields();
+ try (Workbook workbook = new XSSFWorkbook();
+ ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+ Sheet sheet = workbook.createSheet("data");
+
+ // 创建表头
+ Row headerRow = sheet.createRow(0);
+ CellStyle headerStyle = workbook.createCellStyle();
+ headerStyle.setFillForegroundColor(IndexedColors.LIGHT_BLUE.getIndex());
+ headerStyle.setFillPattern(FillPatternType.SOLID_FOREGROUND);
+
+ for (int i = 0; i < fields.length; i++) {
+ Cell cell = headerRow.createCell(i);
+ String formField = formField(fields[i]);
+ cell.setCellValue(formField);
+ cell.setCellStyle(headerStyle);
+ }
+ // 填充数据
+ int rowNum = 1;
+ for (EsDataNewsView item : esDataNewsViewList) {
+ if (item.getFile() == null || item.getFile().length() < 5) {
+ continue;
+ }
+ Row row = sheet.createRow(rowNum++);
+ row.createCell(0).setCellValue(item.getEsSid());
+ row.createCell(1).setCellValue(item.getEsAuthors());
+ row.createCell(2).setCellValue(item.getEsCarriertype());
+ row.createCell(3).setCellValue(item.getEsCatalog());
+ row.createCell(4).setCellValue(item.getEsCollection());
+ row.createCell(5).setCellValue(item.getEsDoclength());
+ row.createCell(6).setCellValue(item.getEsLang());
+ row.createCell(7).setCellValue(item.getEsLasttime());
+ row.createCell(8).setCellValue(item.getEsLinks());
+ row.createCell(9).setCellValue(item.getEsLoadtime());
+ row.createCell(10).setCellValue(item.getEsSitename());
+ row.createCell(11).setCellValue(item.getEsSrcname());
+ row.createCell(12).setCellValue(item.getEsUrlcontent());
+ row.createCell(13).setCellValue(item.getEsUrlimage());
+ row.createCell(14).setCellValue(item.getEsUrlname());
+ row.createCell(15).setCellValue(item.getEsUrltime());
+ row.createCell(16).setCellValue(item.getEsUrltitle());
+ row.createCell(17).setCellValue(item.getEsAbstract());
+ row.createCell(18).setCellValue(item.getEsKeywords());
+ row.createCell(19).setCellValue(item.getFile());
+ }
+
+ // 自动调整列宽
+ for (int i = 0; i < fields.length; i++) {
+ sheet.autoSizeColumn(i);
+ }
+
+ workbook.write(out);
+
+ try (FileOutputStream fos = new FileOutputStream(filePath.toFile())) {
+ workbook.write(fos);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private String formField(Field field) {
+ String fieldString = field.getName();
+ return StringUtils.camelToSnake(fieldString);
+ }
+
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/utils/FTPConnector.java b/dsp/src/main/java/com/jsc/dsp/utils/FTPConnector.java
new file mode 100644
index 0000000..c61011f
--- /dev/null
+++ b/dsp/src/main/java/com/jsc/dsp/utils/FTPConnector.java
@@ -0,0 +1,108 @@
+package com.jsc.dsp.utils;
+
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+@Component
+public class FTPConnector {
+
+ Logger log = LoggerFactory.getLogger(this.getClass().getName());
+
+ @Value("${ftp.host}")
+ String host;
+
+ @Value("${ftp.port}")
+ Integer port;
+
+ @Value("${ftp.username}")
+ String username;
+
+ @Value("${ftp.password}")
+ String password;
+
+ @Value("${ftp.timeout}")
+ Integer timeout;
+
+ public boolean uploadFile(InputStream inputStream, String remotePath) {
+ FTPClient ftpClient = new FTPClient();
+ try {
+ // 连接 FTP 服务器
+ ftpClient.connect(host, port);
+ ftpClient.login(username, password);
+ ftpClient.setConnectTimeout(timeout);
+ ftpClient.setSoTimeout(timeout);
+
+ // 设置文件类型为二进制(避免文本模式损坏文件)
+ ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
+
+ // 启用被动模式(适用于 NAT/防火墙环境)
+ ftpClient.enterLocalPassiveMode();
+
+ // 检查登录是否成功
+ if (!FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
+ ftpClient.disconnect();
+ log.error("FTP 登录失败");
+ return false;
+ }
+
+ // 创建目录(如果路径包含子目录)
+ createDirectories(ftpClient, remotePath);
+
+ // 上传文件
+ boolean success = ftpClient.storeFile(remotePath, inputStream);
+ if (success) {
+ log.info("文件上传成功: {}", remotePath);
+ } else {
+ log.error("FTP 上传失败,错误码: {}", ftpClient.getReplyCode());
+ }
+ return success;
+
+ } catch (IOException e) {
+ log.error("FTP 上传异常: {}", e.getMessage(), e);
+ return false;
+ } finally {
+ try {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ if (ftpClient.isConnected()) {
+ ftpClient.logout();
+ ftpClient.disconnect();
+ }
+ } catch (IOException e) {
+ log.warn("关闭 FTP 连接时出错", e);
+ }
+ }
+ }
+
+ /**
+ * 递归创建远程目录(如果路径中包含目录)
+ */
+ private void createDirectories(FTPClient ftpClient, String remoteFilePath) throws IOException {
+ String[] pathParts = remoteFilePath.split("/");
+ StringBuilder currentPath = new StringBuilder();
+
+ for (int i = 0; i < pathParts.length - 1; i++) {
+ if (!pathParts[i].isEmpty()) {
+ currentPath.append("/").append(pathParts[i]);
+ // 尝试切换目录,如果失败则创建
+ if (!ftpClient.changeWorkingDirectory(currentPath.toString())) {
+ boolean made = ftpClient.makeDirectory(currentPath.toString());
+ if (made) {
+ log.debug("创建 FTP 目录: {}", currentPath);
+ }
+ ftpClient.changeWorkingDirectory(currentPath.toString());
+ }
+ }
+ }
+ }
+
+}
diff --git a/dsp/src/main/java/com/jsc/dsp/utils/StringUtils.java b/dsp/src/main/java/com/jsc/dsp/utils/StringUtils.java
index ff3873a..c830568 100644
--- a/dsp/src/main/java/com/jsc/dsp/utils/StringUtils.java
+++ b/dsp/src/main/java/com/jsc/dsp/utils/StringUtils.java
@@ -116,6 +116,29 @@ public class StringUtils {
return wordList;
}
+ public static String camelToSnake(String camel) {
+ if (camel == null || camel.isEmpty()) {
+ return camel;
+ }
+ StringBuilder result = new StringBuilder();
+ result.append(Character.toLowerCase(camel.charAt(0)));
+ for (int i = 1; i < camel.length(); i++) {
+ char ch = camel.charAt(i);
+ if (Character.isUpperCase(ch)) {
+ // 如果前一个字符不是大写,或者后一个不是小写,则加下划线
+ char prev = camel.charAt(i - 1);
+ if (!Character.isUpperCase(prev) ||
+ (i + 1 < camel.length() && Character.isLowerCase(camel.charAt(i + 1)))) {
+ result.append('_');
+ }
+ result.append(Character.toLowerCase(ch));
+ } else {
+ result.append(ch);
+ }
+ }
+ return result.toString();
+ }
+
public static void main(String[] args) {
initFilterMap("http://39.98.151.140:28081/api/open/wordBank/queryAll");
}
diff --git a/dsp/src/main/resources/application.yml b/dsp/src/main/resources/application.yml
index 83f6806..ebcacb0 100644
--- a/dsp/src/main/resources/application.yml
+++ b/dsp/src/main/resources/application.yml
@@ -42,21 +42,34 @@ spring:
records: 10
interval:
ms: 3600000
+ datasource:
+ url: jdbc:mysql://47.113.231.200:28089/dsp?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
+ username: root
+ password: passok123A
+ driver-class-name: com.mysql.cj.jdbc.Driver
+
+ jpa:
+ database-platform: org.hibernate.dialect.MySQL8Dialect
+ show-sql: true
+
topics:
stream-protobuf: com.jsc.dsp.service.ProtobufService
stream-db: com.jsc.dsp.service.StorageService
stream-file-dl: com.jsc.dsp.service.FileDlService
switch:
- enable-storage-service: true
+ enable-storage-service: false
enable-file-dl-service: false
enable-protobuf-service: false
-db:
- driver: com.mysql.cj.jdbc.Driver
- url: jdbc:mysql://47.113.231.200:28089/dsp
- user: root
- password: passok123A
+ftp:
+ host: 144.34.185.108
+ port: 21
+ username: jsc-2b
+ password: 1234qwer%
+ timeout: 5000
+ passive-mode: true
+
custom:
dev-mode: false
filter-words-query-api: http://47.115.228.133:28081/api/open/wordBank/queryAll
@@ -72,3 +85,7 @@ custom:
socialQueryAPI: http://47.115.228.133:28081/api/open/target/social/queryAll?sortBy=id&shuffleResult=false
socialUpdateAPI: http://47.115.228.133:28081/api/open/target/social/update
websiteWhiteList: 能源界(国内信息);能源界(国际信息);中国能源新闻网;新华能源网;中国能源网(能源战略);中国农网(三农要闻);中国经济网(三农经济);中华粮网(粮食安全);美国之音(中国版面);美国之音(中美关系);美国之音(台海两岸版面);美国之音(港澳版面);看中国(看大陆版面);看中国(重点新闻);德国之声(中国报道);纽约时报中文网(中国版面);大纪元(一周大陆新闻);EnergyNow;联合国粮农组织;路透社(中国版面)
+ excelOutputPath: D:/data/output/upload
+ backupFilePath: D:/data/output/backup
+ pagesOutputPath: D:/data/output/pdf
+ ftpUploadPath: /home/jsc-2b
\ No newline at end of file