From 9f3eb9cf941b5c47bb771fea396ec24431434e03 Mon Sep 17 00:00:00 2001 From: yuxin-pc Date: Tue, 23 Dec 2025 19:23:15 +0800 Subject: [PATCH] Update StorageService.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 入库MySQL --- .../com/jsc/dsp/service/StorageService.java | 100 ++++++------------ 1 file changed, 33 insertions(+), 67 deletions(-) diff --git a/dsp/src/main/java/com/jsc/dsp/service/StorageService.java b/dsp/src/main/java/com/jsc/dsp/service/StorageService.java index 8c45d1a..63db210 100644 --- a/dsp/src/main/java/com/jsc/dsp/service/StorageService.java +++ b/dsp/src/main/java/com/jsc/dsp/service/StorageService.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.xcontent.XContentType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.json.JacksonJsonParser; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; @@ -33,6 +34,7 @@ import java.util.Base64.Decoder; @Component @EnableBinding(StorageBinding.class) +@ConditionalOnProperty(name = "switch.enable-storage-service", havingValue = "true", matchIfMissing = true) public class StorageService extends StreamService { @Autowired @@ -44,27 +46,15 @@ public class StorageService extends StreamService { @Autowired JacksonJsonParser jsonParser; - @Value("${es.ip}") - String esIp; - - @Value("${es.port}") - Integer esPort; - - @Value("${es.username}") - String esUsername; - - @Value("${es.password}") - String esPassword; - - @Value("${es.index}") - String esIndex; - @Value("${custom.dev-mode}") boolean devMode; @Value("${custom.local-file-storage-path}") String localFileStoragePath; + @Value("${custom.websiteWhiteList}") + String websiteWhiteListString; + @Value("${db.driver}") String dbDriver; @@ -80,6 +70,7 @@ public class StorageService extends StreamService { private final Logger logger = LogManager.getLogger(StorageService.class.getName()); + @Override public void sendMessage(byte[] msg) { source.StorageOutput().send(MessageBuilder.withPayload(msg).build()); @@ -91,8 +82,8 @@ public class StorageService extends StreamService { @Override @StreamListener(StorageBinding.STORAGE_PIPELINE_IN) public void receiveMessage(Object payload) { + List websiteWhiteList = Arrays.asList(websiteWhiteListString.split(";")); String tempString; - ObjectMapper objectMapper = new ObjectMapper(); try { tempString = new String(base64.decode(payload.toString()), StandardCharsets.UTF_8); Map data = jsonParser.parseMap(tempString); @@ -101,7 +92,6 @@ public class StorageService extends StreamService { if ("public_info_data_".equals(protoName)) { EsSets.Builder esSetsBuilder = EsSets.newBuilder(); EsSets esSets = EsSets.parseFrom(data.get("content").toString().getBytes(StandardCharsets.ISO_8859_1)); - List localStorageItems = new ArrayList<>(); List dbStorageItems = new ArrayList<>(); BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout("5s"); @@ -138,61 +128,37 @@ public class StorageService extends StreamService { } } } - String uuid = UUID.randomUUID().toString().replaceAll("-", ""); - String es_urlname = indeximos.getEs_urlname(); - if (!es_urlname.isEmpty()) { - // 根据urlname生成固定的UUID,避免重复入库相同的文章 - UUID _uuid = UUID.nameUUIDFromBytes(es_urlname.getBytes()); - uuid = _uuid.toString().replaceAll("-", ""); - } - indeximos.setEs_sid(uuid); - indeximos.setEs_links(indeximos.getEs_links()); - indeximos.setEs_loadtime(StringUtils.TimestampToStringDate(System.currentTimeMillis())); - builder.setEsSid(uuid); - for (Field f : indeximos.getClass().getDeclaredFields()) { - f.setAccessible(true); - //判断字段是否为空,并且对象属性中的基本都会转为对象类型来判断 - if (f.get(indeximos) == null) { - String fieldType = DBUtils.getFieldType(Indeximos.class, f.getName()); - if (fieldType.contains("Float")) { - f.set(indeximos, 0.0f); - } else { - if (!dateFields.contains(f.getName())) { - f.set(indeximos, ""); + // 只导出目标站点的数据 + if (websiteWhiteList.contains(indeximos.getEs_sitename())) { + logger.info("开始处理站点【" + indeximos.getEs_sitename() + "】的数据入库流程"); + String uuid = UUID.randomUUID().toString().replaceAll("-", ""); + String es_urlname = indeximos.getEs_urlname(); + if (!es_urlname.isEmpty()) { + // 根据urlname生成固定的UUID,避免重复入库相同的文章 + UUID _uuid = UUID.nameUUIDFromBytes(es_urlname.getBytes()); + uuid = _uuid.toString().replaceAll("-", ""); + } + indeximos.setEs_urltitle(indeximos.getEs_urltitle().trim()); + indeximos.setEs_sid(uuid); + indeximos.setEs_links(indeximos.getEs_links()); + indeximos.setEs_loadtime(StringUtils.TimestampToStringDate(System.currentTimeMillis())); + builder.setEsSid(uuid); + for (Field f : indeximos.getClass().getDeclaredFields()) { + f.setAccessible(true); + //判断字段是否为空,并且对象属性中的基本都会转为对象类型来判断 + if (f.get(indeximos) == null) { + String fieldType = DBUtils.getFieldType(Indeximos.class, f.getName()); + if (fieldType.contains("Float")) { + f.set(indeximos, 0.0f); + } else { + if (!dateFields.contains(f.getName())) { + f.set(indeximos, ""); + } } } } - } - IndexRequest indexRequest = new IndexRequest(esIndex); - indexRequest.id(indeximos.getEs_sid()); - indexRequest.source(objectMapper.writeValueAsString(indeximos), XContentType.JSON); - bulkRequest.add(indexRequest); - Es es_temp = builder.build(); - esSetsBuilder.addEs(es_temp); - List localizedOption = JSON.parseArray(indeximos.getEs_urltopic(), String.class); - if (indeximos.getEs_carriertype().equals("wechat")) { dbStorageItems.add(indeximos); } - if (localizedOption != null && localizedOption.size() > 0) { - //本地存储用 - if (localizedOption.contains("json")) { - localStorageItems.add(indeximos); - } - //入库MySQL - if (localizedOption.contains("mysql")) { - dbStorageItems.add(indeximos); - } - } - } - EsUtils.EsSaveBulkRequest(esIp, esPort, esUsername, esPassword, bulkRequest); - if (localStorageItems.size() > 0) { - String entityItemsString = JSON.toJSONString(localStorageItems); - String entityFileFullPath = localFileStoragePath + esIndex + "_" + System.currentTimeMillis() + ".json"; - if (FileUtils.saveStringToFile(entityItemsString, entityFileFullPath)) { - logger.info("Local file store to " + entityFileFullPath); - } else { - logger.error("Local file store error!"); - } } if (dbStorageItems.size() > 0) { if (DBUtils.insertIntoDB(dbDriver, dbUrl, dbUser, dbPassword, dbStorageItems)) {