diff --git a/primihub-service/application/src/main/java/com/primihub/application/controller/data/PirController.java b/primihub-service/application/src/main/java/com/primihub/application/controller/data/PirController.java index 53db5c49c..e7a6fe42b 100644 --- a/primihub-service/application/src/main/java/com/primihub/application/controller/data/PirController.java +++ b/primihub-service/application/src/main/java/com/primihub/application/controller/data/PirController.java @@ -6,7 +6,6 @@ import com.primihub.biz.entity.base.PageDataEntity; import com.primihub.biz.entity.data.base.DataPirKeyQuery; import com.primihub.biz.entity.data.dataenum.TaskStateEnum; -import com.primihub.biz.entity.data.req.DataModelAndComponentReq; import com.primihub.biz.entity.data.req.DataPirReq; import com.primihub.biz.entity.data.req.DataPirTaskReq; import com.primihub.biz.entity.data.vo.DataPirTaskDetailVo; @@ -38,16 +37,21 @@ public class PirController { @ApiOperation(value = "提交匿踪查询任务",httpMethod = "POST",consumes = MediaType.APPLICATION_JSON_VALUE) @RequestMapping("pirSubmitTask") public BaseResultEntity pirSubmitTask(String resourceId,String pirParam,String taskName){ +// public BaseResultEntity pirSubmitTask(@RequestBody BaseJsonParam req){ + // 查询条件 + DataPirReq param = new DataPirReq(); if (StringUtils.isBlank(resourceId)){ return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"resourceId"); } - if (StringUtils.isBlank(pirParam)){ - return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"pirParam"); - } if (StringUtils.isBlank(taskName)){ return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"taskName"); } - return pirService.pirSubmitTask(resourceId,pirParam,taskName); + if (StringUtils.isBlank(pirParam)) { + return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"pirParam"); + } + param.setResourceId(resourceId); + param.setTaskName(taskName); + return pirService.pirSubmitTask(param, pirParam); } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataAsyncService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataAsyncService.java index ed1dd4f1f..0c82f9285 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataAsyncService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataAsyncService.java @@ -334,6 +334,7 @@ public TaskParam call() throws Exception { TaskParam taskParam = new TaskParam(new TaskPIRParam()); taskParam.setTaskId(dataTask.getTaskIdName()); taskParam.setJobId(String.valueOf(job)); + // 查询目标值,同组查询目标值使用逗号的字符串隔开 String[] querys = new String[dataPirKeyQuery.getQuery().size()]; for (int i = 0; i < dataPirKeyQuery.getQuery().size(); i++) { querys[i] = String.join(",", dataPirKeyQuery.getQuery().get(i)); @@ -356,31 +357,54 @@ public TaskParam call() throws Exception { } @Async - public void pirGrpcTask(DataTask dataTask, String resourceId, String param) { + public void pirGrpcTask(DataTask dataTask, DataPirTask dataPirTask,String resourceColumnNames, List dataPirKeyQueries) { Date date = new Date(); try { + dataTask.setTaskState(TaskStateEnum.IN_OPERATION.getStateType()); + updateTaskState(dataTask); String formatDate = DateUtil.formatDate(date, DateUtil.DateStyle.HOUR_FORMAT_SHORT.getFormat()); StringBuilder sb = new StringBuilder().append(baseConfiguration.getResultUrlDirPrefix()).append(formatDate).append("/").append(dataTask.getTaskIdName()).append(".csv"); dataTask.setTaskResultPath(sb.toString()); - TaskPIRParam pirParam = new TaskPIRParam(); - pirParam.setQueryParam(param.split(",")); - pirParam.setServerData(resourceId); - pirParam.setOutputFullFilename(dataTask.getTaskResultPath()); - TaskParam taskParam = new TaskParam(); - taskParam.setTaskContentParam(pirParam); - taskParam.setTaskId(dataTask.getTaskIdName()); - taskHelper.submit(taskParam); - if (taskParam.getSuccess()){ - dataTask.setTaskState(TaskStateEnum.SUCCESS.getStateType()); - }else { - dataTask.setTaskState(TaskStateEnum.FAIL.getStateType()); - dataTask.setTaskErrorMsg("运行失败:"+taskParam.getError()); +// List dataPirKeyQueries = JSONArray.parseArray(dataPirTask.getRetrievalId(), DataPirKeyQuery.class); + Map jobMap = new HashMap<>(); + List>> futureTasks = new ArrayList<>(); + for (int i = 0; i < dataPirKeyQueries.size(); i++) { + FutureTask> pirTaskFutureTask = getPirTaskFutureTask(dataPirKeyQueries.get(i), dataTask, dataPirTask, resourceColumnNames, formatDate, i); + primaryThreadPool.submit(pirTaskFutureTask); + futureTasks.add(pirTaskFutureTask); + jobMap.put(i+"",String.join("+",dataPirKeyQueries.get(i).getKey())); + } + List> listTaskParams = new ArrayList<>(); + for (FutureTask> futureTask : futureTasks) { + listTaskParams.add(futureTask.get()); + } + for (TaskParam listTaskParam : listTaskParams) { + if (dataTask.getTaskState().equals(TaskStateEnum.FAIL.getStateType())){ + if (!listTaskParam.getSuccess()){ + dataTask.setTaskErrorMsg("\n【"+jobMap.get(listTaskParam.getJobId())+"】匹配规则出错:"+listTaskParam.getError()); + } + } + if (!listTaskParam.getSuccess()){ + dataTask.setTaskState(TaskStateEnum.FAIL.getStateType()); + dataTask.setTaskErrorMsg("\n【"+jobMap.get(listTaskParam.getJobId())+"】匹配规则出错:"+listTaskParam.getError()); + }else { + dataTask.setTaskState(TaskStateEnum.SUCCESS.getStateType()); + } + } + if (dataTask.getTaskState().equals(TaskStateEnum.SUCCESS.getStateType())){ + List pirTaskResultData = dataRedisRepository.getPirTaskResultData(dataTask.getTaskIdName()); +// log.info("数据写入文件sb:{} - pirTaskResultDataSize:{}",sb.toString(),pirTaskResultData.size()); + boolean b = CsvUtil.csvWrite(sb.toString(), pirTaskResultData); +// log.info("数据写入文件结果:{}",b); + } } catch (Exception e) { dataTask.setTaskState(TaskStateEnum.FAIL.getStateType()); dataTask.setTaskErrorMsg(e.getMessage()); log.info("grpc pirSubmitTask Exception:{}", e.getMessage()); e.printStackTrace(); + }finally { + dataRedisRepository.deletePirTaskResultKey(dataTask.getTaskIdName()); } dataTask.setTaskEndTime(System.currentTimeMillis()); updateTaskState(dataTask); diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/PirService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/PirService.java index 8770a8c8c..4a85c673c 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/PirService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/PirService.java @@ -7,6 +7,7 @@ import com.primihub.biz.entity.base.BaseResultEntity; import com.primihub.biz.entity.base.BaseResultEnum; import com.primihub.biz.entity.base.PageDataEntity; +import com.primihub.biz.entity.data.base.DataPirKeyQuery; import com.primihub.biz.entity.data.dataenum.TaskStateEnum; import com.primihub.biz.entity.data.dataenum.TaskTypeEnum; import com.primihub.biz.entity.data.po.DataPirTask; @@ -45,8 +46,8 @@ public class PirService { public String getResultFilePath(String taskId,String taskDate){ return new StringBuilder().append(baseConfiguration.getResultUrlDirPrefix()).append(taskDate).append("/").append(taskId).append(".csv").toString(); } - public BaseResultEntity pirSubmitTask(String resourceId, String pirParam,String taskName) { - BaseResultEntity dataResource = otherBusinessesService.getDataResource(resourceId); + public BaseResultEntity pirSubmitTask(DataPirReq req, String pirParam) { + BaseResultEntity dataResource = otherBusinessesService.getDataResource(req.getResourceId()); if (dataResource.getCode()!=0) { return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"资源查询失败"); } @@ -55,27 +56,58 @@ public BaseResultEntity pirSubmitTask(String resourceId, String pirParam,String if (available == 1) { return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"资源不可用"); } + // dataResource columnName list + String resourceColumnNames = pirDataResource.getOrDefault("resourceColumnNameList", "").toString(); + if (StringUtils.isBlank(resourceColumnNames)){ + return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"获取资源字段列表失败"); + } + String[] resourceColumnNameArray = resourceColumnNames.split(","); + if (resourceColumnNameArray.length == 0) { + return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"获取资源字段列表为空"); + } + + String[] queryColumnNames = { + resourceColumnNameArray[0] + }; + // convert pirparam to query array + List dataPirKeyQueries = convertPirParamToQueryArray(pirParam,queryColumnNames); + DataTask dataTask = new DataTask(); // dataTask.setTaskIdName(UUID.randomUUID().toString()); dataTask.setTaskIdName(Long.toString(SnowflakeId.getInstance().nextId())); - dataTask.setTaskName(taskName); + dataTask.setTaskName(req.getTaskName()); dataTask.setTaskState(TaskStateEnum.IN_OPERATION.getStateType()); dataTask.setTaskType(TaskTypeEnum.PIR.getTaskType()); dataTask.setTaskStartTime(System.currentTimeMillis()); dataTaskPrRepository.saveDataTask(dataTask); DataPirTask dataPirTask = new DataPirTask(); dataPirTask.setTaskId(dataTask.getTaskId()); + // retrievalId will rent in web ,need to be readable dataPirTask.setRetrievalId(pirParam); dataPirTask.setProviderOrganName(pirDataResource.get("organName").toString()); dataPirTask.setResourceName(pirDataResource.get("resourceName").toString()); - dataPirTask.setResourceId(resourceId); + dataPirTask.setResourceId(req.getResourceId()); dataTaskPrRepository.saveDataPirTask(dataPirTask); - dataAsyncService.pirGrpcTask(dataTask,resourceId,pirParam); + dataAsyncService.pirGrpcTask(dataTask,dataPirTask,resourceColumnNames,dataPirKeyQueries); Map map = new HashMap<>(); map.put("taskId",dataTask.getTaskId()); return BaseResultEntity.success(map); } + private static List convertPirParamToQueryArray(String pirParam, String[] resourceColumnNameArray) { + DataPirKeyQuery dataPirKeyQuery = new DataPirKeyQuery(); + dataPirKeyQuery.setKey(resourceColumnNameArray); + String[] array = { + pirParam + }; + List queries = new ArrayList<>(resourceColumnNameArray.length); + for (int i = 0; i < resourceColumnNameArray.length; i++) { + queries.add(i, array); + } + dataPirKeyQuery.setQuery(queries); + return Collections.singletonList(dataPirKeyQuery); + } + public BaseResultEntity getPirTaskList(DataPirTaskReq req) { List dataPirTaskVos = dataTaskRepository.selectDataPirTaskPage(req); if (dataPirTaskVos.isEmpty()) {