Log warning when typed message adapters return null (#28590)

Can happen if

 * pipeToSelf adapt function returns null (can easily happen in Java with two param adapt
   when completion stage is completed with null and check is for exception non-null)
 * messageAdapter returns null (less likely)
This commit is contained in:
Johan Andrén 2020-02-28 10:52:35 +01:00 committed by GitHub
parent 6fe2f66adc
commit e4fba1cfab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 3 deletions

View file

@ -5,16 +5,20 @@
package akka.actor.typed.javadsl; package akka.actor.typed.javadsl;
import akka.actor.testkit.typed.javadsl.LogCapturing; import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.LoggingTestKit;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe; import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior; import akka.actor.typed.Behavior;
import akka.actor.typed.Props; import akka.actor.typed.Props;
import akka.actor.typed.Signal;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite; import org.scalatestplus.junit.JUnitSuite;
import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
@ -54,6 +58,53 @@ public final class ActorContextPipeToSelfTest extends JUnitSuite {
assertEquals("ko: boom", responseFrom(failedFuture(new RuntimeException("boom")))); assertEquals("ko: boom", responseFrom(failedFuture(new RuntimeException("boom"))));
} }
@Test
public void handleAdaptedNull() {
final TestProbe<String> probe = testKit.createTestProbe();
ActorRef<String> actor =
testKit.spawn(
Behaviors.setup(
context -> {
CompletableFuture<String> future = new CompletableFuture<>();
context.pipeToSelf(
future,
(ok, ko) -> {
// should happen even if ok is null
probe.ref().tell("adapting");
if (ko == null) // but we pass it on if there is no exception rather than
// non-null ok val
return ok;
// is not allowed
else throw new RuntimeException(ko);
});
return Behaviors.receive(String.class)
.onMessageEquals(
"complete-with-null",
() -> {
future.complete(null);
return Behaviors.same();
})
.onAnyMessage(
msg -> {
probe.ref().tell(msg);
return Behaviors.same();
})
.build();
}));
LoggingTestKit.warn(
"Adapter function returned null which is not valid as an actor message, ignoring")
.expect(
testKit.system(),
() -> {
actor.tell("complete-with-null");
probe.expectMessage("adapting");
probe.expectNoMessage(Duration.ofMillis(200));
return null;
});
}
private CompletableFuture<String> failedFuture(final Throwable ex) { private CompletableFuture<String> failedFuture(final Throwable ex) {
final CompletableFuture<String> future = new CompletableFuture<>(); final CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(ex); future.completeExceptionally(ex);

View file

@ -7,13 +7,15 @@ package akka.actor.typed.scaladsl
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.util.{ Failure, Success } import scala.util.{ Failure, Success }
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.Props import akka.actor.typed.Props
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.Promise
object ActorContextPipeToSelfSpec { object ActorContextPipeToSelfSpec {
val config = ConfigFactory.parseString(""" val config = ConfigFactory.parseString("""
|pipe-to-self-spec-dispatcher { |pipe-to-self-spec-dispatcher {
@ -31,6 +33,31 @@ final class ActorContextPipeToSelfSpec
"The Scala DSL ActorContext pipeToSelf" must { "The Scala DSL ActorContext pipeToSelf" must {
"handle success" in { responseFrom(Future.successful("hi")) should ===("ok: hi") } "handle success" in { responseFrom(Future.successful("hi")) should ===("ok: hi") }
"handle failure" in { responseFrom(Future.failed(Fail)) should ===(s"ko: $Fail") } "handle failure" in { responseFrom(Future.failed(Fail)) should ===(s"ko: $Fail") }
"handle adapted null" in {
val probe = testKit.createTestProbe[String]()
val promise = Promise[String]()
testKit.spawn(Behaviors.setup[String] { ctx =>
ctx.pipeToSelf(promise.future) {
case Success(value) =>
probe.ref ! "adapting"
value // we're passing on null here
case Failure(ex) => throw ex
}
Behaviors.receiveMessage {
case msg =>
probe.ref ! msg
Behaviors.same
}
})
LoggingTestKit.warn("Adapter function returned null which is not valid as an actor message, ignoring").expect {
// (probably more likely to happen in Java)
promise.success(null.asInstanceOf[String])
}
}
} }
object Fail extends NoStackTrace object Fail extends NoStackTrace

View file

@ -220,9 +220,9 @@ import org.slf4j.LoggerFactory
future: CompletionStage[Value], future: CompletionStage[Value],
applyToResult: akka.japi.function.Function2[Value, Throwable, T]): Unit = { applyToResult: akka.japi.function.Function2[Value, Throwable, T]): Unit = {
future.whenComplete { (value, ex) => future.whenComplete { (value, ex) =>
if (value != null) self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null))
if (ex != null) if (ex != null)
self.unsafeUpcast ! AdaptMessage(ex, applyToResult.apply(null.asInstanceOf[Value], _: Throwable)) self.unsafeUpcast ! AdaptMessage(ex, applyToResult.apply(null.asInstanceOf[Value], _: Throwable))
else self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null))
} }
} }

View file

@ -14,13 +14,13 @@ import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior
import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException
import akka.annotation.InternalApi import akka.annotation.InternalApi
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
import scala.util.Try import scala.util.Try
import scala.util.control.Exception.Catcher import scala.util.control.Exception.Catcher
import scala.annotation.switch import scala.annotation.switch
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
import akka.util.OptionVal import akka.util.OptionVal
@ -182,6 +182,9 @@ import akka.util.OptionVal
private def withSafelyAdapted[U, V](adapt: () => U)(body: U => V): Unit = { private def withSafelyAdapted[U, V](adapt: () => U)(body: U => V): Unit = {
Try(adapt()) match { Try(adapt()) match {
case Success(null) =>
ctx.log.warn(
"Adapter function returned null which is not valid as an actor message, ignoring. This can happen for example when using pipeToSelf and returning null from the adapt function. Null value is ignored and not passed on to actor.")
case Success(a) => case Success(a) =>
body(a) body(a)
case Failure(ex) => case Failure(ex) =>