Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a instrumentation of load calls #178

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,16 @@ CompletableFuture<V> load(K key, Object loadContext) {
boolean cachingEnabled = loaderOptions.cachingEnabled();

stats.incrementLoadCount(new IncrementLoadCountStatisticsContext<>(key, loadContext));

DataLoaderInstrumentationContext<Object> ctx = ctxOrNoopCtx(instrumentation().beginLoad(dataLoader, key,loadContext));
CompletableFuture<V> cf;
if (cachingEnabled) {
return loadFromCache(key, loadContext, batchingEnabled);
cf = loadFromCache(key, loadContext, batchingEnabled);
} else {
return queueOrInvokeLoader(key, loadContext, batchingEnabled, false);
cf = queueOrInvokeLoader(key, loadContext, batchingEnabled, false);
}
ctx.onDispatched();
cf.whenComplete(ctx::onCompleted);
return cf;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public ChainedDataLoaderInstrumentation addAll(Collection<DataLoaderInstrumentat
return new ChainedDataLoaderInstrumentation(list);
}


@Override
public DataLoaderInstrumentationContext<Object> beginLoad(DataLoader<?, ?> dataLoader, Object key, Object loadContext) {
return chainedCtx(it -> it.beginLoad(dataLoader, key, loadContext));
}

@Override
public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
return chainedCtx(it -> it.beginDispatch(dataLoader));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@
*/
@PublicSpi
public interface DataLoaderInstrumentation {
/**
* This call back is done just before the {@link DataLoader#load(Object)} methods are invoked,
* and it completes when the load promise is completed. If the value is a cached {@link java.util.concurrent.CompletableFuture}
* then it might return almost immediately, otherwise it will return
* when the batch load function is invoked and values get returned
*
* @param dataLoader the {@link DataLoader} in question
* @param key the key used during the {@link DataLoader#load(Object)} call
* @param loadContext the load context used during the {@link DataLoader#load(Object, Object)} call
* @return a DataLoaderInstrumentationContext or null to be more performant
*/
default DataLoaderInstrumentationContext<Object> beginLoad(DataLoader<?, ?> dataLoader, Object key, Object loadContext) {
return null;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would allow some to "see" all DL load calls


/**
* This call back is done just before the {@link DataLoader#dispatch()} is invoked,
* and it completes when the dispatch call promise is done.
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/dataloader/DataLoaderCacheMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void should_access_to_future_dependants() {
Collection<CompletableFuture<Integer>> futures = dataLoader.getCacheMap().getAll();

List<CompletableFuture<Integer>> futuresList = new ArrayList<>(futures);
assertThat(futuresList.get(0).getNumberOfDependents(), equalTo(2));
assertThat(futuresList.get(1).getNumberOfDependents(), equalTo(1));
assertThat(futuresList.get(0).getNumberOfDependents(), equalTo(4)); // instrumentation is depending on the CF completing
assertThat(futuresList.get(1).getNumberOfDependents(), equalTo(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,49 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class CapturingInstrumentation implements DataLoaderInstrumentation {
String name;
List<String> methods = new ArrayList<>();
protected String name;
protected List<String> methods = new ArrayList<>();

public CapturingInstrumentation(String name) {
this.name = name;
}

public String getName() {
return name;
}

public List<String> methods() {
return methods;
}

public List<String> notLoads() {
return methods.stream().filter(method -> !method.contains("beginLoad")).collect(Collectors.toList());
}

public List<String> onlyLoads() {
return methods.stream().filter(method -> method.contains("beginLoad")).collect(Collectors.toList());
}


@Override
public DataLoaderInstrumentationContext<Object> beginLoad(DataLoader<?, ?> dataLoader, Object key, Object loadContext) {
methods.add(name + "_beginLoad" +"_k:" + key);
return new DataLoaderInstrumentationContext<>() {
@Override
public void onDispatched() {
methods.add(name + "_beginLoad_onDispatched"+"_k:" + key);
}

@Override
public void onCompleted(Object result, Throwable t) {
methods.add(name + "_beginLoad_onCompleted"+"_k:" + key);
}
};
}

@Override
public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
methods.add(name + "_beginDispatch");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ public CapturingInstrumentationReturnsNull(String name) {
super(name);
}

@Override
public DataLoaderInstrumentationContext<Object> beginLoad(DataLoader<?, ?> dataLoader, Object key, Object loadContext) {
methods.add(name + "_beginLoad" +"_k:" + key);
return null;
}

@Override
public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
methods.add(name + "_beginDispatch");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,21 @@ void canChainTogetherOneInstrumentation() {

DataLoader<String, String> dl = DataLoaderFactory.newDataLoader(TestKit.keysAsValues(), options);

dl.load("A");
dl.load("B");
dl.load("X");
dl.load("Y");

CompletableFuture<List<String>> dispatch = dl.dispatch();

await().until(dispatch::isDone);

assertThat(capturingA.methods, equalTo(List.of("A_beginDispatch",
assertThat(capturingA.notLoads(), equalTo(List.of("A_beginDispatch",
"A_beginBatchLoader", "A_beginBatchLoader_onDispatched", "A_beginBatchLoader_onCompleted",
"A_beginDispatch_onDispatched", "A_beginDispatch_onCompleted")));

assertThat(capturingA.onlyLoads(), equalTo(List.of(
"A_beginLoad_k:X", "A_beginLoad_onDispatched_k:X", "A_beginLoad_k:Y", "A_beginLoad_onDispatched_k:Y",
"A_beginLoad_onCompleted_k:X", "A_beginLoad_onCompleted_k:Y"
)));
}


Expand All @@ -87,8 +92,8 @@ public void canChainTogetherManyInstrumentationsWithDifferentBatchLoaders(TestDa

DataLoader<String, String> dl = factory.idLoader(options);

dl.load("A");
dl.load("B");
dl.load("X");
dl.load("Y");

CompletableFuture<List<String>> dispatch = dl.dispatch();

Expand All @@ -98,16 +103,21 @@ public void canChainTogetherManyInstrumentationsWithDifferentBatchLoaders(TestDa
// A_beginBatchLoader happens before A_beginDispatch_onDispatched because these are sync
// and no async - a batch scheduler or async batch loader would change that
//
assertThat(capturingA.methods, equalTo(List.of("A_beginDispatch",
assertThat(capturingA.notLoads(), equalTo(List.of("A_beginDispatch",
"A_beginBatchLoader", "A_beginBatchLoader_onDispatched", "A_beginBatchLoader_onCompleted",
"A_beginDispatch_onDispatched", "A_beginDispatch_onCompleted")));

assertThat(capturingB.methods, equalTo(List.of("B_beginDispatch",
assertThat(capturingA.onlyLoads(), equalTo(List.of(
"A_beginLoad_k:X", "A_beginLoad_onDispatched_k:X", "A_beginLoad_k:Y", "A_beginLoad_onDispatched_k:Y",
"A_beginLoad_onCompleted_k:X", "A_beginLoad_onCompleted_k:Y"
)));

assertThat(capturingB.notLoads(), equalTo(List.of("B_beginDispatch",
"B_beginBatchLoader", "B_beginBatchLoader_onDispatched", "B_beginBatchLoader_onCompleted",
"B_beginDispatch_onDispatched", "B_beginDispatch_onCompleted")));

// it returned null on all its contexts - nothing to call back on
assertThat(capturingButReturnsNull.methods, equalTo(List.of("NULL_beginDispatch", "NULL_beginBatchLoader")));
assertThat(capturingButReturnsNull.notLoads(), equalTo(List.of("NULL_beginDispatch", "NULL_beginBatchLoader")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,61 @@ public class DataLoaderInstrumentationTest {
return keys;
});

@Test
void canMonitorLoading() {
AtomicReference<DataLoader<?, ?>> dlRef = new AtomicReference<>();

CapturingInstrumentation instrumentation = new CapturingInstrumentation("x") {

@Override
public DataLoaderInstrumentationContext<Object> beginLoad(DataLoader<?, ?> dataLoader, Object key, Object loadContext) {
DataLoaderInstrumentationContext<Object> superCtx = super.beginLoad(dataLoader, key, loadContext);
dlRef.set(dataLoader);
return superCtx;
}

@Override
public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
return DataLoaderInstrumentationHelper.noOpCtx();
}
};

DataLoaderOptions options = new DataLoaderOptions()
.setInstrumentation(instrumentation)
.setMaxBatchSize(5);

DataLoader<String, String> dl = DataLoaderFactory.newDataLoader(snoozingBatchLoader, options);

List<String> keys = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String key = "X" + i;
keys.add(key);
dl.load(key);
}

// load a key that is cached
dl.load("X0");

CompletableFuture<List<String>> dispatch = dl.dispatch();

await().until(dispatch::isDone);
assertThat(dlRef.get(), is(dl));
assertThat(dispatch.join(), equalTo(keys));

// the batch loading means they start and are instrumentation dispatched before they all end up completing
assertThat(instrumentation.onlyLoads(),
equalTo(List.of(
"x_beginLoad_k:X0", "x_beginLoad_onDispatched_k:X0",
"x_beginLoad_k:X1", "x_beginLoad_onDispatched_k:X1",
"x_beginLoad_k:X2", "x_beginLoad_onDispatched_k:X2",
"x_beginLoad_k:X0", "x_beginLoad_onDispatched_k:X0", // second cached call counts
"x_beginLoad_onCompleted_k:X0",
"x_beginLoad_onCompleted_k:X0", // each load call counts
"x_beginLoad_onCompleted_k:X1", "x_beginLoad_onCompleted_k:X2")));

}


@Test
void canMonitorDispatching() {
Stopwatch stopwatch = Stopwatch.stopwatchUnStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void endToEndIntegrationTest(TestDataLoaderFactory factory) {
await().until(loadA::isDone);
assertThat(loadA.join(), equalTo("A"));

assertThat(instrA.methods, equalTo(List.of("A_beginDispatch",
assertThat(instrA.notLoads(), equalTo(List.of("A_beginDispatch",
"A_beginBatchLoader", "A_beginBatchLoader_onDispatched", "A_beginBatchLoader_onCompleted",
"A_beginDispatch_onDispatched", "A_beginDispatch_onCompleted")));
}
Expand Down