Converting akka scala futures to java futures
Back in akka land! I’m using the ask pattern to get results back from actors since I have a requirement to block and get a result (I can’t wait for an actor to push at a later date). Thats fine, but converting from scala futures to java completable futures is a pain. I also, (like mentioned in another post) want to make sure that my async responses capture and set the MDC for proper logging.
My final usage should look something like:
private \<Response, Request\> Future\<Response\> askActorForResponseAsync(Request source) {
final FiniteDuration askTimeout = new FiniteDuration(config.getAskForResultTimeout().toMillis(), TimeUnit.MILLISECONDS);
final Timeout timeout = new Timeout(askTimeout);
final scala.concurrent.Future\<Object\> ask = Patterns.ask(master.getActor(), new PersistableMessageContext(source), timeout);
return FutureConverter.fromScalaFuture(ask)
.executeOn(actorSystem.dispatcher())
.thenApply(i -\> (Response) i);
}
The idea is that I’m going to translate a scala future with a callback into a completable future java promise.
Next up, the future converter:
public class FutureConverter {
public static \<T\> FromScalaFuture\<T\> fromScalaFuture(scala.concurrent.Future\<T\> future) {
return new FromScalaFuture\<\>(future);
}
}
This is just an entrypoint into a new class that can give you a nice fluent interface to provide the execution context.
Next, a class whose job is to create an akka callback and convert it into a completable future.
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import java.util.concurrent.CompletableFuture;
public class FromScalaFuture\<T\> {
private final Future\<T\> future;
public FromScalaFuture(Future\<T\> future) {
this.future = future;
}
public CompletableFuture\<T\> executeOn(ExecutionContext context) {
final CompletableFuture\<T\> completableFuture = new CompletableFuture\<\>();
final AkkaOnCompleteCallback\<T\> completer = AkkaCompletionConverter.\<T\>createCompleter((failure, success) -\> {
if (failure != null) {
completableFuture.completeExceptionally(failure);
}
else {
completableFuture.complete(success);
}
});
future.onComplete(completer.toScalaCallback(), context);
return completableFuture;
}
}
And finally another guy whose job it is to translate java functions into akka callbacks:
import akka.dispatch.OnComplete;
@FunctionalInterface
public interface AkkaOnCompleteCallback\<T\> {
OnComplete\<T\> toScalaCallback();
}
import akka.dispatch.OnComplete;
import org.slf4j.MDC;
import java.util.Map;
import java.util.function.BiConsumer;
public class AkkaCompletionConverter {
/\*\*
\* Handles closing over the mdc context map and setting the responding future thread with the
\* previous context
\*
\* @param callback
\* @return
\*/
public static \<T\> AkkaOnCompleteCallback\<T\> createCompleter(BiConsumer\<Throwable, T\> callback) {
return () -\> {
final Map\<String, String\> oldContextMap = MDC.getCopyOfContextMap();
return new OnComplete\<T\>() {
@Override public void onComplete(final Throwable failure, final T success) throws Throwable {
// capture the current threads context map
final Map\<String, String\> currentThreadsContext = MDC.getCopyOfContextMap();
// set the closed over context map
if(oldContextMap != null) {
MDC.setContextMap(oldContextMap);
}
callback.accept(failure, success);
// return the current threads previous context map
if(currentThreadsContext != null) {
MDC.setContextMap(currentThreadsContext);
}
}
};
};
}
}