diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/ActorContextImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/ActorContextImpl.scala index b9077832f5..6d6864246a 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/ActorContextImpl.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/ActorContextImpl.scala @@ -284,7 +284,7 @@ import scala.util.Success def pipeToSelf[Value]( future: CompletionStage[Value], applyToResult: pekko.japi.function.Function2[Value, Throwable, T]): Unit = { - future.whenComplete { (value, ex) => + future.handle[Unit] { (value, ex) => if (ex != null) self.unsafeUpcast ! AdaptMessage(ex, applyToResult.apply(null.asInstanceOf[Value], _: Throwable)) else self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null)) diff --git a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala index ea9b118e10..2be1ad59b6 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala @@ -15,7 +15,6 @@ package org.apache.pekko.pattern import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage -import java.util.function.BiConsumer import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.concurrent.duration.FiniteDuration @@ -78,11 +77,9 @@ trait FutureTimeoutSupport { using.scheduleOnce(duration) { try { val future = value - future.whenComplete(new BiConsumer[T, Throwable] { - override def accept(t: T, ex: Throwable): Unit = { - if (t != null) p.complete(t) - if (ex != null) p.completeExceptionally(ex) - } + future.handle[Unit]((t: T, ex: Throwable) => { + if (t != null) p.complete(t) + if (ex != null) p.completeExceptionally(ex) }) } catch { case NonFatal(ex) => p.completeExceptionally(ex) diff --git a/actor/src/main/scala/org/apache/pekko/pattern/PipeToSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/PipeToSupport.scala index 2c536a3b46..8a8dc5f1cb 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/PipeToSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/PipeToSupport.scala @@ -14,7 +14,6 @@ package org.apache.pekko.pattern import java.util.concurrent.CompletionStage -import java.util.function.BiConsumer import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success } @@ -56,19 +55,15 @@ trait PipeToSupport { final class PipeableCompletionStage[T](val future: CompletionStage[T])( implicit @unused executionContext: ExecutionContext) { def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): CompletionStage[T] = { - future.whenComplete(new BiConsumer[T, Throwable] { - override def accept(t: T, ex: Throwable): Unit = { - if (t != null) recipient ! t - if (ex != null) recipient ! Status.Failure(ex) - } + future.whenComplete((t: T, ex: Throwable) => { + if (t != null) recipient ! t + if (ex != null) recipient ! Status.Failure(ex) }) } def pipeToSelection(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): CompletionStage[T] = { - future.whenComplete(new BiConsumer[T, Throwable] { - override def accept(t: T, ex: Throwable): Unit = { - if (t != null) recipient ! t - if (ex != null) recipient ! Status.Failure(ex) - } + future.whenComplete((t: T, ex: Throwable) => { + if (t != null) recipient ! t + if (ex != null) recipient ! Status.Failure(ex) }) } def to(recipient: ActorRef): PipeableCompletionStage[T] = to(recipient, Actor.noSender)