diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextPipeToSelfTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextPipeToSelfTest.java index c45b78a2c5..d429202249 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextPipeToSelfTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextPipeToSelfTest.java @@ -5,16 +5,20 @@ package akka.actor.typed.javadsl; import akka.actor.testkit.typed.javadsl.LogCapturing; +import akka.actor.testkit.typed.javadsl.LoggingTestKit; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.Props; +import akka.actor.typed.Signal; import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -54,6 +58,53 @@ public final class ActorContextPipeToSelfTest extends JUnitSuite { assertEquals("ko: boom", responseFrom(failedFuture(new RuntimeException("boom")))); } + @Test + public void handleAdaptedNull() { + final TestProbe probe = testKit.createTestProbe(); + ActorRef actor = + testKit.spawn( + Behaviors.setup( + context -> { + CompletableFuture future = new CompletableFuture<>(); + context.pipeToSelf( + future, + (ok, ko) -> { + // should happen even if ok is null + probe.ref().tell("adapting"); + if (ko == null) // but we pass it on if there is no exception rather than + // non-null ok val + return ok; + // is not allowed + else throw new RuntimeException(ko); + }); + + return Behaviors.receive(String.class) + .onMessageEquals( + "complete-with-null", + () -> { + future.complete(null); + return Behaviors.same(); + }) + .onAnyMessage( + msg -> { + probe.ref().tell(msg); + return Behaviors.same(); + }) + .build(); + })); + + LoggingTestKit.warn( + "Adapter function returned null which is not valid as an actor message, ignoring") + .expect( + testKit.system(), + () -> { + actor.tell("complete-with-null"); + probe.expectMessage("adapting"); + probe.expectNoMessage(Duration.ofMillis(200)); + return null; + }); + } + private CompletableFuture failedFuture(final Throwable ex) { final CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(ex); diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextPipeToSelfSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextPipeToSelfSpec.scala index ae3b4656bb..25bbdfa13f 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextPipeToSelfSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextPipeToSelfSpec.scala @@ -7,13 +7,15 @@ package akka.actor.typed.scaladsl import scala.concurrent.Future import scala.util.control.NoStackTrace import scala.util.{ Failure, Success } - +import akka.actor.testkit.typed.scaladsl.LoggingTestKit import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } import akka.actor.typed.Props import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.Promise + object ActorContextPipeToSelfSpec { val config = ConfigFactory.parseString(""" |pipe-to-self-spec-dispatcher { @@ -31,6 +33,31 @@ final class ActorContextPipeToSelfSpec "The Scala DSL ActorContext pipeToSelf" must { "handle success" in { responseFrom(Future.successful("hi")) should ===("ok: hi") } "handle failure" in { responseFrom(Future.failed(Fail)) should ===(s"ko: $Fail") } + "handle adapted null" in { + val probe = testKit.createTestProbe[String]() + val promise = Promise[String]() + testKit.spawn(Behaviors.setup[String] { ctx => + ctx.pipeToSelf(promise.future) { + case Success(value) => + probe.ref ! "adapting" + value // we're passing on null here + case Failure(ex) => throw ex + } + + Behaviors.receiveMessage { + case msg => + probe.ref ! msg + Behaviors.same + + } + }) + + LoggingTestKit.warn("Adapter function returned null which is not valid as an actor message, ignoring").expect { + // (probably more likely to happen in Java) + promise.success(null.asInstanceOf[String]) + } + + } } object Fail extends NoStackTrace diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index eaf53e7d1e..14286f8267 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -220,9 +220,9 @@ import org.slf4j.LoggerFactory future: CompletionStage[Value], applyToResult: akka.japi.function.Function2[Value, Throwable, T]): Unit = { future.whenComplete { (value, ex) => - if (value != null) self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null)) if (ex != null) self.unsafeUpcast ! AdaptMessage(ex, applyToResult.apply(null.asInstanceOf[Value], _: Throwable)) + else self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null)) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 98b0904223..bc12717abb 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -14,13 +14,13 @@ import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException import akka.annotation.InternalApi + import scala.annotation.tailrec import scala.util.Failure import scala.util.Success import scala.util.Try import scala.util.control.Exception.Catcher import scala.annotation.switch - import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg import akka.util.OptionVal @@ -182,6 +182,9 @@ import akka.util.OptionVal private def withSafelyAdapted[U, V](adapt: () => U)(body: U => V): Unit = { Try(adapt()) match { + case Success(null) => + ctx.log.warn( + "Adapter function returned null which is not valid as an actor message, ignoring. This can happen for example when using pipeToSelf and returning null from the adapt function. Null value is ignored and not passed on to actor.") case Success(a) => body(a) case Failure(ex) =>