Skip to content

Commit

Permalink
fix pirparam string to k-v list param (#235)
Browse files Browse the repository at this point in the history
Co-authored-by: terrence <terrence>
  • Loading branch information
TerrenceGee authored Dec 27, 2023
1 parent c013b5f commit 8470ecd
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.primihub.biz.entity.base.PageDataEntity;
import com.primihub.biz.entity.data.base.DataPirKeyQuery;
import com.primihub.biz.entity.data.dataenum.TaskStateEnum;
import com.primihub.biz.entity.data.req.DataModelAndComponentReq;
import com.primihub.biz.entity.data.req.DataPirReq;
import com.primihub.biz.entity.data.req.DataPirTaskReq;
import com.primihub.biz.entity.data.vo.DataPirTaskDetailVo;
Expand Down Expand Up @@ -38,16 +37,21 @@ public class PirController {
@ApiOperation(value = "提交匿踪查询任务",httpMethod = "POST",consumes = MediaType.APPLICATION_JSON_VALUE)
@RequestMapping("pirSubmitTask")
public BaseResultEntity pirSubmitTask(String resourceId,String pirParam,String taskName){
// public BaseResultEntity pirSubmitTask(@RequestBody BaseJsonParam<DataPirReq> req){
// 查询条件
DataPirReq param = new DataPirReq();
if (StringUtils.isBlank(resourceId)){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"resourceId");
}
if (StringUtils.isBlank(pirParam)){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"pirParam");
}
if (StringUtils.isBlank(taskName)){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"taskName");
}
return pirService.pirSubmitTask(resourceId,pirParam,taskName);
if (StringUtils.isBlank(pirParam)) {
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"pirParam");
}
param.setResourceId(resourceId);
param.setTaskName(taskName);
return pirService.pirSubmitTask(param, pirParam);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ public TaskParam<TaskPIRParam> call() throws Exception {
TaskParam<TaskPIRParam> taskParam = new TaskParam(new TaskPIRParam());
taskParam.setTaskId(dataTask.getTaskIdName());
taskParam.setJobId(String.valueOf(job));
// 查询目标值,同组查询目标值使用逗号的字符串隔开
String[] querys = new String[dataPirKeyQuery.getQuery().size()];
for (int i = 0; i < dataPirKeyQuery.getQuery().size(); i++) {
querys[i] = String.join(",", dataPirKeyQuery.getQuery().get(i));
Expand All @@ -356,31 +357,54 @@ public TaskParam<TaskPIRParam> call() throws Exception {
}

@Async
public void pirGrpcTask(DataTask dataTask, String resourceId, String param) {
public void pirGrpcTask(DataTask dataTask, DataPirTask dataPirTask,String resourceColumnNames, List<DataPirKeyQuery> dataPirKeyQueries) {
Date date = new Date();
try {
dataTask.setTaskState(TaskStateEnum.IN_OPERATION.getStateType());
updateTaskState(dataTask);
String formatDate = DateUtil.formatDate(date, DateUtil.DateStyle.HOUR_FORMAT_SHORT.getFormat());
StringBuilder sb = new StringBuilder().append(baseConfiguration.getResultUrlDirPrefix()).append(formatDate).append("/").append(dataTask.getTaskIdName()).append(".csv");
dataTask.setTaskResultPath(sb.toString());
TaskPIRParam pirParam = new TaskPIRParam();
pirParam.setQueryParam(param.split(","));
pirParam.setServerData(resourceId);
pirParam.setOutputFullFilename(dataTask.getTaskResultPath());
TaskParam taskParam = new TaskParam();
taskParam.setTaskContentParam(pirParam);
taskParam.setTaskId(dataTask.getTaskIdName());
taskHelper.submit(taskParam);
if (taskParam.getSuccess()){
dataTask.setTaskState(TaskStateEnum.SUCCESS.getStateType());
}else {
dataTask.setTaskState(TaskStateEnum.FAIL.getStateType());
dataTask.setTaskErrorMsg("运行失败:"+taskParam.getError());
// List<DataPirKeyQuery> dataPirKeyQueries = JSONArray.parseArray(dataPirTask.getRetrievalId(), DataPirKeyQuery.class);
Map<String,String> jobMap = new HashMap<>();
List<FutureTask<TaskParam<TaskPIRParam>>> futureTasks = new ArrayList<>();
for (int i = 0; i < dataPirKeyQueries.size(); i++) {
FutureTask<TaskParam<TaskPIRParam>> pirTaskFutureTask = getPirTaskFutureTask(dataPirKeyQueries.get(i), dataTask, dataPirTask, resourceColumnNames, formatDate, i);
primaryThreadPool.submit(pirTaskFutureTask);
futureTasks.add(pirTaskFutureTask);
jobMap.put(i+"",String.join("+",dataPirKeyQueries.get(i).getKey()));
}
List<TaskParam<TaskPIRParam>> listTaskParams = new ArrayList<>();
for (FutureTask<TaskParam<TaskPIRParam>> futureTask : futureTasks) {
listTaskParams.add(futureTask.get());
}
for (TaskParam<TaskPIRParam> listTaskParam : listTaskParams) {
if (dataTask.getTaskState().equals(TaskStateEnum.FAIL.getStateType())){
if (!listTaskParam.getSuccess()){
dataTask.setTaskErrorMsg("\n【"+jobMap.get(listTaskParam.getJobId())+"】匹配规则出错:"+listTaskParam.getError());
}
}
if (!listTaskParam.getSuccess()){
dataTask.setTaskState(TaskStateEnum.FAIL.getStateType());
dataTask.setTaskErrorMsg("\n【"+jobMap.get(listTaskParam.getJobId())+"】匹配规则出错:"+listTaskParam.getError());
}else {
dataTask.setTaskState(TaskStateEnum.SUCCESS.getStateType());
}
}
if (dataTask.getTaskState().equals(TaskStateEnum.SUCCESS.getStateType())){
List<String> pirTaskResultData = dataRedisRepository.getPirTaskResultData(dataTask.getTaskIdName());
// log.info("数据写入文件sb:{} - pirTaskResultDataSize:{}",sb.toString(),pirTaskResultData.size());
boolean b = CsvUtil.csvWrite(sb.toString(), pirTaskResultData);
// log.info("数据写入文件结果:{}",b);

}
} catch (Exception e) {
dataTask.setTaskState(TaskStateEnum.FAIL.getStateType());
dataTask.setTaskErrorMsg(e.getMessage());
log.info("grpc pirSubmitTask Exception:{}", e.getMessage());
e.printStackTrace();
}finally {
dataRedisRepository.deletePirTaskResultKey(dataTask.getTaskIdName());
}
dataTask.setTaskEndTime(System.currentTimeMillis());
updateTaskState(dataTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.primihub.biz.entity.base.BaseResultEntity;
import com.primihub.biz.entity.base.BaseResultEnum;
import com.primihub.biz.entity.base.PageDataEntity;
import com.primihub.biz.entity.data.base.DataPirKeyQuery;
import com.primihub.biz.entity.data.dataenum.TaskStateEnum;
import com.primihub.biz.entity.data.dataenum.TaskTypeEnum;
import com.primihub.biz.entity.data.po.DataPirTask;
Expand Down Expand Up @@ -45,8 +46,8 @@ public class PirService {
public String getResultFilePath(String taskId,String taskDate){
return new StringBuilder().append(baseConfiguration.getResultUrlDirPrefix()).append(taskDate).append("/").append(taskId).append(".csv").toString();
}
public BaseResultEntity pirSubmitTask(String resourceId, String pirParam,String taskName) {
BaseResultEntity dataResource = otherBusinessesService.getDataResource(resourceId);
public BaseResultEntity pirSubmitTask(DataPirReq req, String pirParam) {
BaseResultEntity dataResource = otherBusinessesService.getDataResource(req.getResourceId());
if (dataResource.getCode()!=0) {
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"资源查询失败");
}
Expand All @@ -55,27 +56,58 @@ public BaseResultEntity pirSubmitTask(String resourceId, String pirParam,String
if (available == 1) {
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"资源不可用");
}
// dataResource columnName list
String resourceColumnNames = pirDataResource.getOrDefault("resourceColumnNameList", "").toString();
if (StringUtils.isBlank(resourceColumnNames)){
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"获取资源字段列表失败");
}
String[] resourceColumnNameArray = resourceColumnNames.split(",");
if (resourceColumnNameArray.length == 0) {
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"获取资源字段列表为空");
}

String[] queryColumnNames = {
resourceColumnNameArray[0]
};
// convert pirparam to query array
List<DataPirKeyQuery> dataPirKeyQueries = convertPirParamToQueryArray(pirParam,queryColumnNames);

DataTask dataTask = new DataTask();
// dataTask.setTaskIdName(UUID.randomUUID().toString());
dataTask.setTaskIdName(Long.toString(SnowflakeId.getInstance().nextId()));
dataTask.setTaskName(taskName);
dataTask.setTaskName(req.getTaskName());
dataTask.setTaskState(TaskStateEnum.IN_OPERATION.getStateType());
dataTask.setTaskType(TaskTypeEnum.PIR.getTaskType());
dataTask.setTaskStartTime(System.currentTimeMillis());
dataTaskPrRepository.saveDataTask(dataTask);
DataPirTask dataPirTask = new DataPirTask();
dataPirTask.setTaskId(dataTask.getTaskId());
// retrievalId will rent in web ,need to be readable
dataPirTask.setRetrievalId(pirParam);
dataPirTask.setProviderOrganName(pirDataResource.get("organName").toString());
dataPirTask.setResourceName(pirDataResource.get("resourceName").toString());
dataPirTask.setResourceId(resourceId);
dataPirTask.setResourceId(req.getResourceId());
dataTaskPrRepository.saveDataPirTask(dataPirTask);
dataAsyncService.pirGrpcTask(dataTask,resourceId,pirParam);
dataAsyncService.pirGrpcTask(dataTask,dataPirTask,resourceColumnNames,dataPirKeyQueries);
Map<String, Object> map = new HashMap<>();
map.put("taskId",dataTask.getTaskId());
return BaseResultEntity.success(map);
}

private static List<DataPirKeyQuery> convertPirParamToQueryArray(String pirParam, String[] resourceColumnNameArray) {
DataPirKeyQuery dataPirKeyQuery = new DataPirKeyQuery();
dataPirKeyQuery.setKey(resourceColumnNameArray);
String[] array = {
pirParam
};
List<String[]> queries = new ArrayList<>(resourceColumnNameArray.length);
for (int i = 0; i < resourceColumnNameArray.length; i++) {
queries.add(i, array);
}
dataPirKeyQuery.setQuery(queries);
return Collections.singletonList(dataPirKeyQuery);
}

public BaseResultEntity getPirTaskList(DataPirTaskReq req) {
List<DataPirTaskVo> dataPirTaskVos = dataTaskRepository.selectDataPirTaskPage(req);
if (dataPirTaskVos.isEmpty()) {
Expand Down

0 comments on commit 8470ecd

Please sign in to comment.