Skip to content

Commit

Permalink
Support trace context propagation on ListenableFuture returned by `…
Browse files Browse the repository at this point in the history
…submitListenable`.
  • Loading branch information
JasonMing authored and wujunming committed May 1, 2020
1 parent 5a11a13 commit 84efc78
Show file tree
Hide file tree
Showing 5 changed files with 420 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2017-2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.contrib.spring.cloud.async.instrument;

import io.opentracing.Span;
import io.opentracing.Tracer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;

/**
* @author MiNG
*/
public class TracedListenableFuture<T> implements ListenableFuture<T> {

private final ListenableFuture<T> delegate;
private final Tracer tracer;
private final Span span;

public TracedListenableFuture(ListenableFuture<T> delegate, Tracer tracer) {
this(delegate, tracer, tracer.activeSpan());
}

public TracedListenableFuture(ListenableFuture<T> delegate, Tracer tracer, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.span = span;
}

@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
delegate.addCallback(new TracedListenableFutureCallback<>(callback, tracer, span));
}

@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
delegate.addCallback(new TracedListenableFutureCallback<>(successCallback, failureCallback, tracer, span));
}

@Override
public CompletableFuture<T> completable() {
return delegate.completable();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get();
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Copyright 2017-2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.contrib.spring.cloud.async.instrument;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;

/**
* @author MiNG
*/
public class TracedListenableFutureCallback<T> implements ListenableFutureCallback<T> {

private final SuccessCallback<T> successDelegate;
private final FailureCallback failureDelegate;
private final Tracer tracer;
private final Span span;

public TracedListenableFutureCallback(ListenableFutureCallback<T> delegate, Tracer tracer) {
this(delegate, delegate, tracer, tracer.activeSpan());
}

public TracedListenableFutureCallback(ListenableFutureCallback<T> delegate, Tracer tracer, Span span) {
this(delegate, delegate, tracer, span);
}

public TracedListenableFutureCallback(SuccessCallback<T> successDelegate, FailureCallback failureDelegate, Tracer tracer) {
this(successDelegate, failureDelegate, tracer, tracer.activeSpan());
}

public TracedListenableFutureCallback(@Nullable SuccessCallback<T> successDelegate, @Nullable FailureCallback failureDelegate, Tracer tracer, Span span) {
Assert.notNull(successDelegate, "'successDelegate' must not be null");
Assert.notNull(failureDelegate, "'failureDelegate' must not be null");
this.successDelegate = successDelegate;
this.failureDelegate = failureDelegate;
this.tracer = tracer;
this.span = span;
}

@Override
public void onSuccess(T result) {
try (Scope ignored = span == null ? null : tracer.scopeManager().activate(span)) {
successDelegate.onSuccess(result);
}
}

@Override
public void onFailure(Throwable ex) {
try (Scope ignored = span == null ? null : tracer.scopeManager().activate(span)) {
failureDelegate.onFailure(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2019 The OpenTracing Authors
* Copyright 2017-2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -59,12 +59,12 @@ public <T> Future<T> submit(Callable<T> task) {

@Override
public ListenableFuture<?> submitListenable(Runnable task) {
return this.delegate.submitListenable(new TracedRunnable(task, tracer));
return new TracedListenableFuture<>(this.delegate.submitListenable(new TracedRunnable(task, tracer)), tracer);
}

@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
return this.delegate.submitListenable(new TracedCallable<>(task, tracer));
return new TracedListenableFuture<>(this.delegate.submitListenable(new TracedCallable<>(task, tracer)), tracer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2019 The OpenTracing Authors
* Copyright 2017-2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

import org.springframework.lang.Nullable;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
Expand Down Expand Up @@ -113,12 +114,12 @@ public <T> Future<T> submit(Callable<T> task) {

@Override
public ListenableFuture<?> submitListenable(Runnable task) {
return delegate.submitListenable(new TracedRunnable(task, tracer));
return new TracedListenableFuture<>(delegate.submitListenable(new TracedRunnable(task, tracer)), tracer);
}

@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
return delegate.submitListenable(new TracedCallable<>(task, tracer));
return new TracedListenableFuture<>(delegate.submitListenable(new TracedCallable<>(task, tracer)), tracer);
}

@Override
Expand Down
Loading

0 comments on commit 84efc78

Please sign in to comment.