Skip to content

Commit

Permalink
Fix the deserialization exception caused by the inconsistent column order between arrow vector data and open scanner results.
Browse files Browse the repository at this point in the history
  • Loading branch information
gnehil committed Jan 7, 2025
1 parent 657f7c6 commit ca7401b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ public QueryPlan getQueryPlan(String database, String table, String sql) throws
throw new DorisException();
}
String entity = EntityUtils.toString(response.getEntity());
return MAPPER.readValue(extractEntity(entity, "data").traverse(), QueryPlan.class);
JsonNode dataJsonNode = extractEntity(entity, "data");
if (dataJsonNode.get("exception") != null) {
throw new DorisException("query plan failed, exception: " + dataJsonNode.get("exception").asText());
}
return MAPPER.readValue(dataJsonNode.traverse(), QueryPlan.class);
} catch (Exception e) {
throw new RuntimeException("query plan request failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ protected AbstractThriftReader(DorisReaderPartition partition) throws Exception
if (logger.isDebugEnabled()) {
logger.debug("origin thrift read Schema: " + schema + ", processed schema: " + dorisSchema);
}

if (isAsync) {
int blockingQueueSize = config.getValue(DorisOptions.DORIS_DESERIALIZE_QUEUE_SIZE);
this.rowBatchQueue = new ArrayBlockingQueue<>(blockingQueueSize);
Expand Down Expand Up @@ -243,22 +242,21 @@ protected Schema processDorisSchema(DorisReaderPartition partition, final Schema
Schema tableSchema = frontend.getTableSchema(partition.getDatabase(), partition.getTable());
Map<String, Field> fieldTypeMap = tableSchema.getProperties().stream()
.collect(Collectors.toMap(Field::getName, Function.identity()));
Map<String, Field> scanTypeMap = originSchema.getProperties().stream()
.collect(Collectors.toMap(Field::getName, Function.identity()));
String[] readColumns = partition.getReadColumns();
List<Field> newFieldList = new ArrayList<>();
int offset = 0;
for (int i = 0; i < readColumns.length; i++) {
String readColumn = readColumns[i];
if (!fieldTypeMap.containsKey(readColumn) && readColumn.contains(" AS ")) {
for (String readColumn : readColumns) {
if (readColumn.contains(" AS ")) {
int asIdx = readColumn.indexOf(" AS ");
String realColumn = readColumn.substring(asIdx + 4).trim().replaceAll("`", "");
if (fieldTypeMap.containsKey(realColumn)
if (fieldTypeMap.containsKey(realColumn) && scanTypeMap.containsKey(realColumn)
&& ("BITMAP".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType())
|| "HLL".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()))) {
newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
offset++;
}
} else {
newFieldList.add(originSchema.getProperties().get(i + offset));
newFieldList.add(scanTypeMap.get(readColumn.trim().replaceAll("`", "")));
}
}
processedSchema.setProperties(newFieldList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ private[spark] class ScalaDorisRowRDDIterator(context: TaskContext,
extends AbstractDorisRDDIterator[Row](context, partition) {

override def initReader(config: DorisConfig): Unit = {
config.setProperty(DorisOptions.DORIS_READ_FIELDS, schema.map(f => s"`${f.name}`").mkString(","))
config.getValue(DorisOptions.READ_MODE).toLowerCase match {
case "thrift" => config.setProperty(DorisOptions.DORIS_VALUE_READER_CLASS, classOf[DorisRowThriftReader].getName)
case "arrow" => config.setProperty(DorisOptions.DORIS_VALUE_READER_CLASS, classOf[DorisRowFlightSqlReader].getName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ object SchemaConvertors {
def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = {
val schema = new Schema(tscanColumnDescs.length)
tscanColumnDescs.foreach(desc => {
// println(desc.getName + " " + desc.getType.name())
schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0, ""))
})
schema
Expand Down

0 comments on commit ca7401b

Please sign in to comment.