Skip to content

Commit

Permalink
Make ObserverHolder thread safe by having a thread local observer member
Browse files Browse the repository at this point in the history
  • Loading branch information
albertogpz committed Sep 20, 2021
1 parent d0113fc commit 2cb85bc
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.geode.cache.query.internal;

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -60,10 +61,12 @@ public class QueryTraceJUnitTest {
@Before
public void setUp() throws Exception {
CacheUtils.startCache();
DefaultQuery.testHook = new BeforeQueryExecutionHook();
}

@After
public void tearDown() throws Exception {
DefaultQuery.testHook = null;
CacheUtils.closeCache();
}

Expand Down Expand Up @@ -104,7 +107,11 @@ public void testTraceOnPartitionedRegionWithTracePrefix() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -141,7 +148,11 @@ public void testTraceOnLocalRegionWithTracePrefix() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -183,7 +194,11 @@ public void testNegTraceOnPartitionedRegionWithTracePrefix() throws Exception {
assertFalse(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should not have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -223,7 +238,11 @@ public void testNegTraceOnLocalRegionWithTracePrefix() throws Exception {
assertFalse(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should not have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -262,7 +281,11 @@ public void testTraceOnPartitionedRegionWithTracePrefixNoComments() throws Excep
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -296,8 +319,11 @@ public void testTraceOnLocalRegionWithTracePrefixNoComments() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);
// The query should return all elements in region.

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
}
Expand Down Expand Up @@ -331,7 +357,11 @@ public void testTraceOnPartitionedRegionWithSmallTracePrefixNoComments() throws
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -366,7 +396,11 @@ public void testTraceOnLocalRegionWithSmallTracePrefixNoComments() throws Except
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -438,4 +472,21 @@ public void testQueryFailLocalRegionWithSmallTracePrefixNoSpace() throws Excepti
}
}

private class BeforeQueryExecutionHook implements DefaultQuery.TestHook {
private QueryObserver observer = null;

@Override
public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
final ExecutionContext executionContext) {
switch (spot) {
case BEFORE_QUERY_EXECUTION:
observer = QueryObserverHolder.getInstance();
break;
}
}

public QueryObserver getObserver() {
return observer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,31 @@ public class QueryObserverHolder {
* The current observer which will be notified of all query events.
*/
@MakeNotStatic
private static QueryObserver _instance = NO_OBSERVER;
private static ThreadLocal<QueryObserver> _instance = ThreadLocal.withInitial(() -> NO_OBSERVER);

/**
* Set the given observer to be notified of query events. Returns the current observer.
*/
public static QueryObserver setInstance(QueryObserver observer) {
Support.assertArg(observer != null, "setInstance expects a non-null argument!");
QueryObserver oldObserver = _instance;
_instance = observer;
QueryObserver oldObserver = _instance.get();
_instance.set(observer);
return oldObserver;
}

public static boolean hasObserver() {
return _instance != NO_OBSERVER;
return _instance.get() != NO_OBSERVER;
}

/** Return the current QueryObserver instance */
public static QueryObserver getInstance() {
return _instance;
return _instance.get();
}

/**
* Only for test purposes.
*/
public static void reset() {
_instance = NO_OBSERVER;
_instance.set(NO_OBSERVER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,13 @@ private DataCommandResult select(InternalCache cache, Object principal, String q
Query query = qs.newQuery(queryString);
DefaultQuery tracedQuery = (DefaultQuery) query;
WrappedIndexTrackingQueryObserver queryObserver = null;
QueryObserver origQueryObserver = null;
String queryVerboseMsg = null;
long startTime = -1;
if (tracedQuery.isTraced()) {
startTime = NanoTimer.getTime();
queryObserver = new WrappedIndexTrackingQueryObserver();
QueryObserverHolder.setInstance(queryObserver);
origQueryObserver = QueryObserverHolder.setInstance(queryObserver);
}
List<SelectResultRow> list = new ArrayList<>();

Expand All @@ -237,6 +238,9 @@ private DataCommandResult select(InternalCache cache, Object principal, String q
if (queryObserver != null) {
QueryObserverHolder.reset();
}
if (tracedQuery.isTraced()) {
QueryObserverHolder.setInstance(origQueryObserver);
}
}
}

Expand Down

0 comments on commit 2cb85bc

Please sign in to comment.