diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 4418256c61..71d739b84a 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -817,6 +817,9 @@ private[remote] class EndpointWriter( case e: NotSerializableException ⇒ log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass) true + case e: IllegalArgumentException ⇒ + log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass) + true case e: MessageSerializer.SerializationException ⇒ log.error(e, "{} Transient association error (association remains live)", e.getMessage) true @@ -994,14 +997,8 @@ private[remote] class EndpointReader( } else try msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption) catch { - case e: NotSerializableException ⇒ - val sm = msg.serializedMessage - log.warning( - "Serializer not defined for message with serializer id [{}] and manifest [{}]. " + - "Transient association error (association remains live). {}", - sm.getSerializerId, - if (sm.hasMessageManifest) sm.getMessageManifest.toStringUtf8 else "", - e.getMessage) + case e: NotSerializableException ⇒ logTransientSerializationError(msg, e) + case e: IllegalArgumentException ⇒ logTransientSerializationError(msg, e) } case None ⇒ @@ -1020,6 +1017,16 @@ private[remote] class EndpointReader( } + private def logTransientSerializationError(msg: AkkaPduCodec.Message, error: Exception): Unit = { + val sm = msg.serializedMessage + log.warning( + "Serializer not defined for message with serializer id [{}] and manifest [{}]. " + + "Transient association error (association remains live). {}", + sm.getSerializerId, + if (sm.hasMessageManifest) sm.getMessageManifest.toStringUtf8 else "", + error.getMessage) + } + def notReading: Receive = { case Disassociated(info) ⇒ handleDisassociated(info) diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 1777d88d7d..e066dce8f7 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -7,6 +7,7 @@ package akka.remote import akka.remote.WireFormats._ import akka.protobuf.ByteString import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope } import akka.serialization._ @@ -17,6 +18,7 @@ import scala.util.control.NonFatal * * MessageSerializer is a helper for serializing and deserialize messages */ +@InternalApi private[akka] object MessageSerializer { class SerializationException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 225136f0d2..d1f378a2c7 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -7,17 +7,20 @@ package akka.remote import akka.actor._ import akka.event.AddressTerminatedTopic import akka.pattern.ask -import akka.remote.transport.AssociationHandle.{ HandleEventListener, HandleEvent } +import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener } import akka.remote.transport._ import akka.remote.transport.Transport.InvalidAssociationException import akka.testkit._ import akka.util.ByteString import com.typesafe.config._ import java.io.NotSerializableException + import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import java.util.concurrent.ThreadLocalRandom + +import akka.serialization.SerializerWithStringManifest import akka.testkit.SocketUtil.temporaryServerAddress object RemotingSpec { diff --git a/akka-remote/src/test/scala/akka/remote/TransientSerializationErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/TransientSerializationErrorSpec.scala new file mode 100644 index 0000000000..4ce28561d2 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/TransientSerializationErrorSpec.scala @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.remote + +import java.io.NotSerializableException + +import akka.actor.{ Actor, ActorSystem, ExtendedActorSystem, Props, RootActorPath } +import akka.serialization.SerializerWithStringManifest +import akka.testkit.{ AkkaSpec, TestActors, TestKit } +import com.typesafe.config.{ Config, ConfigFactory } + +object TransientSerializationErrorSpec { + object ManifestNotSerializable + object ManifestIllegal + object ToBinaryNotSerializable + object ToBinaryIllegal + object NotDeserializable + object IllegalOnDeserialize + + class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + def identifier: Int = 666 + def manifest(o: AnyRef): String = o match { + case ManifestNotSerializable ⇒ throw new NotSerializableException() + case ManifestIllegal ⇒ throw new IllegalArgumentException() + case ToBinaryNotSerializable ⇒ "TBNS" + case ToBinaryIllegal ⇒ "TI" + case NotDeserializable ⇒ "ND" + case IllegalOnDeserialize ⇒ "IOD" + } + def toBinary(o: AnyRef): Array[Byte] = o match { + case ToBinaryNotSerializable ⇒ throw new NotSerializableException() + case ToBinaryIllegal ⇒ throw new IllegalArgumentException() + case _ ⇒ Array.emptyByteArray + } + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + manifest match { + case "ND" ⇒ throw new NotSerializableException() // Not sure this applies here + case "IOD" ⇒ throw new IllegalArgumentException() + } + } + } +} + +abstract class AbstractTransientSerializationErrorSpec(config: Config) extends AkkaSpec( + config.withFallback(ConfigFactory.parseString( + """ + akka { + loglevel = info + actor { + provider = remote + warn-about-java-serializer-usage = off + serialize-creators = off + serializers { + test = "akka.remote.TransientSerializationErrorSpec$TestSerializer" + } + serialization-bindings { + "akka.remote.TransientSerializationErrorSpec$ManifestNotSerializable$" = test + "akka.remote.TransientSerializationErrorSpec$ManifestIllegal$" = test + "akka.remote.TransientSerializationErrorSpec$ToBinaryNotSerializable$" = test + "akka.remote.TransientSerializationErrorSpec$ToBinaryIllegal$" = test + "akka.remote.TransientSerializationErrorSpec$NotDeserializable$" = test + "akka.remote.TransientSerializationErrorSpec$IllegalOnDeserialize$" = test + } + } + } + """))) { + + import TransientSerializationErrorSpec._ + + val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get + val sysName = system.name + val protocol = + if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" + else "akka.tcp" + + val system2 = ActorSystem(system.name, system.settings.config) + val system2Address = RARP(system2).provider.getDefaultAddress + + "The transport" must { + + "stay alive after a transient exception from the serializer" in { + system2.actorOf(TestActors.echoActorProps, "echo") + + val selection = system.actorSelection(RootActorPath(system2Address) / "user" / "echo") + + selection.tell("ping", this.testActor) + expectMsg("ping") + + // none of these should tear down the connection + List( + ManifestIllegal, + ManifestNotSerializable, + ToBinaryIllegal, + ToBinaryNotSerializable, + NotDeserializable, + IllegalOnDeserialize + ).foreach(msg ⇒ + selection.tell(msg, this.testActor) + ) + + // make sure we still have a connection + selection.tell("ping", this.testActor) + expectMsg("ping") + + } + } + + override def afterTermination(): Unit = { + TestKit.shutdownActorSystem(system2) + } +} + +class TransientSerializationErrorSpec extends AbstractTransientSerializationErrorSpec(ConfigFactory.parseString(""" + akka.remote.netty.tcp { + hostname = localhost + port = 0 + } +""")) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TransientSerializationErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/TransientSerializationErrorSpec.scala new file mode 100644 index 0000000000..72e86aa9e1 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/TransientSerializationErrorSpec.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.remote.artery + +import akka.remote.AbstractTransientSerializationErrorSpec + +class TransientSerializationErrorSpec extends AbstractTransientSerializationErrorSpec( + ArterySpecSupport.defaultConfig)