Skip to content

Commit

Permalink
fix(core): Don't index part keys with invalid schema (#1870)
Browse files Browse the repository at this point in the history
When bootstrapping the raw index we skip over tracking items with invalid schemas,
signified by partId = -1.  However, today we still index them which can create query
errors later on like the following:

```
java.lang.IllegalStateException: This shouldn't happen since every document should have a partIdDv
	at filodb.core.memstore.PartIdCollector.collect(PartKeyLuceneIndex.scala:963)
	at org.apache.lucene.search.Weight$DefaultBulkScorer.scoreAll(Weight.java:305)
	at org.apache.lucene.search.Weight$DefaultBulkScorer.score(Weight.java:247)
	at org.apache.lucene.search.BulkScorer.score(BulkScorer.java:38)
	at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:776)
	at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:551)
	at filodb.core.memstore.PartKeyLuceneIndex.$anonfun$searchFromFilters$1(PartKeyLuceneIndex.scala:635)
	at filodb.core.memstore.PartKeyLuceneIndex.$anonfun$searchFromFilters$1$adapted(PartKeyLuceneIndex.scala:635)
	at filodb.core.memstore.PartKeyLuceneIndex.withNewSearcher(PartKeyLuceneIndex.scala:279)
	at filodb.core.memstore.PartKeyLuceneIndex.searchFromFilters(PartKeyLuceneIndex.scala:635)
	at filodb.core.memstore.PartKeyLuceneIndex.partIdsFromFilters(PartKeyLuceneIndex.scala:591)
	at filodb.core.memstore.TimeSeriesShard.labelValuesWithFilters(TimeSeriesShard.scala:1782)
```

This fix ensures that we don't index part keys we skip during bootstrap so that the in memory
shard and index are consistent with each other.
  • Loading branch information
rfairfax authored Oct 16, 2024
1 parent 69df0c2 commit 84f7ade
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,14 @@ class RawIndexBootstrapper(colStore: ColumnStore) {
colStore.scanPartKeys(ref, shardNum)
.map { pk =>
val partId = assignPartId(pk)
index.addPartKey(pk.partKey, partId, pk.startTime, pk.endTime)()
// -1 is returned if we skiped the part key for any reason, such as
// unknown schema or memory issues
//
// assignPartId will log these cases, so no extra logging is needed
// here
if (partId != -1) {
index.addPartKey(pk.partKey, partId, pk.startTime, pk.endTime)()
}
}
.countL
.map { count =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,40 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte

}

it("should recover index data from col store ignoring bad schemas") {
val partBuilder = new RecordBuilder(TestData.nativeMem)

val pkPtrs = GdeltTestData.partKeyFromRecords(dataset1,
records(dataset1, linearMultiSeries().take(2)), Some(partBuilder))
val pks = pkPtrs.map(dataset1.partKeySchema.asByteArray(_)).toArray

// Corrupt the schema of the second item
pks(1)(4) = 0xFF.toByte
pks(1)(5) = 0xFF.toByte

val colStore = new NullColumnStore() {
override def scanPartKeys(ref: DatasetRef, shard: Int): Observable[PartKeyRecord] = {
val keys = Seq(
PartKeyRecord(pks(0), 50, Long.MaxValue, 0),
PartKeyRecord(pks(1), 250, Long.MaxValue, 0)
)
Observable.fromIterable(keys)
}
}

val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore())
memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf.copy(groupsPerShard = 2,
diskTTLSeconds = 1.hour.toSeconds.toInt,
flushInterval = 10.minutes), 1)
Thread sleep 1000

val tsShard = memStore.asInstanceOf[TimeSeriesMemStore].getShard(dataset1.ref, 0).get
tsShard.recoverIndex().futureValue

tsShard.partitions.size shouldEqual 1 // only 1, skipping the bad schema
tsShard.partKeyIndex.indexNumEntries shouldEqual 1 // only 1, skipping the bad schema
}

it("should lookupPartitions and return correct PartLookupResult") {
memStore.setup(dataset2.ref, schemas2h, 0, TestData.storeConf, 1)
val data = records(dataset2, withMap(linearMultiSeries().take(20))) // 2 records per series x 10 series
Expand Down

0 comments on commit 84f7ade

Please sign in to comment.