jsc-dsp #1
@ -20,6 +20,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.json.JacksonJsonParser;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
@ -33,6 +34,7 @@ import java.util.Base64.Decoder;
|
||||
|
||||
@Component
|
||||
@EnableBinding(StorageBinding.class)
|
||||
@ConditionalOnProperty(name = "switch.enable-storage-service", havingValue = "true", matchIfMissing = true)
|
||||
public class StorageService extends StreamService {
|
||||
|
||||
@Autowired
|
||||
@ -44,27 +46,15 @@ public class StorageService extends StreamService {
|
||||
@Autowired
|
||||
JacksonJsonParser jsonParser;
|
||||
|
||||
@Value("${es.ip}")
|
||||
String esIp;
|
||||
|
||||
@Value("${es.port}")
|
||||
Integer esPort;
|
||||
|
||||
@Value("${es.username}")
|
||||
String esUsername;
|
||||
|
||||
@Value("${es.password}")
|
||||
String esPassword;
|
||||
|
||||
@Value("${es.index}")
|
||||
String esIndex;
|
||||
|
||||
@Value("${custom.dev-mode}")
|
||||
boolean devMode;
|
||||
|
||||
@Value("${custom.local-file-storage-path}")
|
||||
String localFileStoragePath;
|
||||
|
||||
@Value("${custom.websiteWhiteList}")
|
||||
String websiteWhiteListString;
|
||||
|
||||
@Value("${db.driver}")
|
||||
String dbDriver;
|
||||
|
||||
@ -80,6 +70,7 @@ public class StorageService extends StreamService {
|
||||
|
||||
private final Logger logger = LogManager.getLogger(StorageService.class.getName());
|
||||
|
||||
|
||||
@Override
|
||||
public void sendMessage(byte[] msg) {
|
||||
source.StorageOutput().send(MessageBuilder.withPayload(msg).build());
|
||||
@ -91,8 +82,8 @@ public class StorageService extends StreamService {
|
||||
@Override
|
||||
@StreamListener(StorageBinding.STORAGE_PIPELINE_IN)
|
||||
public void receiveMessage(Object payload) {
|
||||
List<String> websiteWhiteList = Arrays.asList(websiteWhiteListString.split(";"));
|
||||
String tempString;
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
try {
|
||||
tempString = new String(base64.decode(payload.toString()), StandardCharsets.UTF_8);
|
||||
Map<String, Object> data = jsonParser.parseMap(tempString);
|
||||
@ -101,7 +92,6 @@ public class StorageService extends StreamService {
|
||||
if ("public_info_data_".equals(protoName)) {
|
||||
EsSets.Builder esSetsBuilder = EsSets.newBuilder();
|
||||
EsSets esSets = EsSets.parseFrom(data.get("content").toString().getBytes(StandardCharsets.ISO_8859_1));
|
||||
List<Object> localStorageItems = new ArrayList<>();
|
||||
List<Indeximos> dbStorageItems = new ArrayList<>();
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.timeout("5s");
|
||||
@ -138,6 +128,9 @@ public class StorageService extends StreamService {
|
||||
}
|
||||
}
|
||||
}
|
||||
// 只导出目标站点的数据
|
||||
if (websiteWhiteList.contains(indeximos.getEs_sitename())) {
|
||||
logger.info("开始处理站点【" + indeximos.getEs_sitename() + "】的数据入库流程");
|
||||
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
|
||||
String es_urlname = indeximos.getEs_urlname();
|
||||
if (!es_urlname.isEmpty()) {
|
||||
@ -145,6 +138,7 @@ public class StorageService extends StreamService {
|
||||
UUID _uuid = UUID.nameUUIDFromBytes(es_urlname.getBytes());
|
||||
uuid = _uuid.toString().replaceAll("-", "");
|
||||
}
|
||||
indeximos.setEs_urltitle(indeximos.getEs_urltitle().trim());
|
||||
indeximos.setEs_sid(uuid);
|
||||
indeximos.setEs_links(indeximos.getEs_links());
|
||||
indeximos.setEs_loadtime(StringUtils.TimestampToStringDate(System.currentTimeMillis()));
|
||||
@ -163,36 +157,8 @@ public class StorageService extends StreamService {
|
||||
}
|
||||
}
|
||||
}
|
||||
IndexRequest indexRequest = new IndexRequest(esIndex);
|
||||
indexRequest.id(indeximos.getEs_sid());
|
||||
indexRequest.source(objectMapper.writeValueAsString(indeximos), XContentType.JSON);
|
||||
bulkRequest.add(indexRequest);
|
||||
Es es_temp = builder.build();
|
||||
esSetsBuilder.addEs(es_temp);
|
||||
List<String> localizedOption = JSON.parseArray(indeximos.getEs_urltopic(), String.class);
|
||||
if (indeximos.getEs_carriertype().equals("wechat")) {
|
||||
dbStorageItems.add(indeximos);
|
||||
}
|
||||
if (localizedOption != null && localizedOption.size() > 0) {
|
||||
//本地存储用
|
||||
if (localizedOption.contains("json")) {
|
||||
localStorageItems.add(indeximos);
|
||||
}
|
||||
//入库MySQL
|
||||
if (localizedOption.contains("mysql")) {
|
||||
dbStorageItems.add(indeximos);
|
||||
}
|
||||
}
|
||||
}
|
||||
EsUtils.EsSaveBulkRequest(esIp, esPort, esUsername, esPassword, bulkRequest);
|
||||
if (localStorageItems.size() > 0) {
|
||||
String entityItemsString = JSON.toJSONString(localStorageItems);
|
||||
String entityFileFullPath = localFileStoragePath + esIndex + "_" + System.currentTimeMillis() + ".json";
|
||||
if (FileUtils.saveStringToFile(entityItemsString, entityFileFullPath)) {
|
||||
logger.info("Local file store to " + entityFileFullPath);
|
||||
} else {
|
||||
logger.error("Local file store error!");
|
||||
}
|
||||
}
|
||||
if (dbStorageItems.size() > 0) {
|
||||
if (DBUtils.insertIntoDB(dbDriver, dbUrl, dbUser, dbPassword, dbStorageItems)) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user