Do not tear down connections on IllegalArgumentException from serializer #24910

This commit is contained in:
Johan Andrén 2018-05-16 15:35:05 +02:00 committed by GitHub
parent 0181a38ebc
commit b27bdba13b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 151 additions and 9 deletions

View file

@ -817,6 +817,9 @@ private[remote] class EndpointWriter(
case e: NotSerializableException case e: NotSerializableException
log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass) log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
true true
case e: IllegalArgumentException
log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
true
case e: MessageSerializer.SerializationException case e: MessageSerializer.SerializationException
log.error(e, "{} Transient association error (association remains live)", e.getMessage) log.error(e, "{} Transient association error (association remains live)", e.getMessage)
true true
@ -994,14 +997,8 @@ private[remote] class EndpointReader(
} else try } else try
msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption) msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption)
catch { catch {
case e: NotSerializableException case e: NotSerializableException logTransientSerializationError(msg, e)
val sm = msg.serializedMessage case e: IllegalArgumentException logTransientSerializationError(msg, e)
log.warning(
"Serializer not defined for message with serializer id [{}] and manifest [{}]. " +
"Transient association error (association remains live). {}",
sm.getSerializerId,
if (sm.hasMessageManifest) sm.getMessageManifest.toStringUtf8 else "",
e.getMessage)
} }
case None case None
@ -1020,6 +1017,16 @@ private[remote] class EndpointReader(
} }
private def logTransientSerializationError(msg: AkkaPduCodec.Message, error: Exception): Unit = {
val sm = msg.serializedMessage
log.warning(
"Serializer not defined for message with serializer id [{}] and manifest [{}]. " +
"Transient association error (association remains live). {}",
sm.getSerializerId,
if (sm.hasMessageManifest) sm.getMessageManifest.toStringUtf8 else "",
error.getMessage)
}
def notReading: Receive = { def notReading: Receive = {
case Disassociated(info) handleDisassociated(info) case Disassociated(info) handleDisassociated(info)

View file

@ -7,6 +7,7 @@ package akka.remote
import akka.remote.WireFormats._ import akka.remote.WireFormats._
import akka.protobuf.ByteString import akka.protobuf.ByteString
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope } import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope }
import akka.serialization._ import akka.serialization._
@ -17,6 +18,7 @@ import scala.util.control.NonFatal
* *
* MessageSerializer is a helper for serializing and deserialize messages * MessageSerializer is a helper for serializing and deserialize messages
*/ */
@InternalApi
private[akka] object MessageSerializer { private[akka] object MessageSerializer {
class SerializationException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) class SerializationException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

View file

@ -7,17 +7,20 @@ package akka.remote
import akka.actor._ import akka.actor._
import akka.event.AddressTerminatedTopic import akka.event.AddressTerminatedTopic
import akka.pattern.ask import akka.pattern.ask
import akka.remote.transport.AssociationHandle.{ HandleEventListener, HandleEvent } import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener }
import akka.remote.transport._ import akka.remote.transport._
import akka.remote.transport.Transport.InvalidAssociationException import akka.remote.transport.Transport.InvalidAssociationException
import akka.testkit._ import akka.testkit._
import akka.util.ByteString import akka.util.ByteString
import com.typesafe.config._ import com.typesafe.config._
import java.io.NotSerializableException import java.io.NotSerializableException
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import akka.serialization.SerializerWithStringManifest
import akka.testkit.SocketUtil.temporaryServerAddress import akka.testkit.SocketUtil.temporaryServerAddress
object RemotingSpec { object RemotingSpec {

View file

@ -0,0 +1,120 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote
import java.io.NotSerializableException
import akka.actor.{ Actor, ActorSystem, ExtendedActorSystem, Props, RootActorPath }
import akka.serialization.SerializerWithStringManifest
import akka.testkit.{ AkkaSpec, TestActors, TestKit }
import com.typesafe.config.{ Config, ConfigFactory }
object TransientSerializationErrorSpec {
object ManifestNotSerializable
object ManifestIllegal
object ToBinaryNotSerializable
object ToBinaryIllegal
object NotDeserializable
object IllegalOnDeserialize
class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 666
def manifest(o: AnyRef): String = o match {
case ManifestNotSerializable throw new NotSerializableException()
case ManifestIllegal throw new IllegalArgumentException()
case ToBinaryNotSerializable "TBNS"
case ToBinaryIllegal "TI"
case NotDeserializable "ND"
case IllegalOnDeserialize "IOD"
}
def toBinary(o: AnyRef): Array[Byte] = o match {
case ToBinaryNotSerializable throw new NotSerializableException()
case ToBinaryIllegal throw new IllegalArgumentException()
case _ Array.emptyByteArray
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
manifest match {
case "ND" throw new NotSerializableException() // Not sure this applies here
case "IOD" throw new IllegalArgumentException()
}
}
}
}
abstract class AbstractTransientSerializationErrorSpec(config: Config) extends AkkaSpec(
config.withFallback(ConfigFactory.parseString(
"""
akka {
loglevel = info
actor {
provider = remote
warn-about-java-serializer-usage = off
serialize-creators = off
serializers {
test = "akka.remote.TransientSerializationErrorSpec$TestSerializer"
}
serialization-bindings {
"akka.remote.TransientSerializationErrorSpec$ManifestNotSerializable$" = test
"akka.remote.TransientSerializationErrorSpec$ManifestIllegal$" = test
"akka.remote.TransientSerializationErrorSpec$ToBinaryNotSerializable$" = test
"akka.remote.TransientSerializationErrorSpec$ToBinaryIllegal$" = test
"akka.remote.TransientSerializationErrorSpec$NotDeserializable$" = test
"akka.remote.TransientSerializationErrorSpec$IllegalOnDeserialize$" = test
}
}
}
"""))) {
import TransientSerializationErrorSpec._
val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
val sysName = system.name
val protocol =
if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka"
else "akka.tcp"
val system2 = ActorSystem(system.name, system.settings.config)
val system2Address = RARP(system2).provider.getDefaultAddress
"The transport" must {
"stay alive after a transient exception from the serializer" in {
system2.actorOf(TestActors.echoActorProps, "echo")
val selection = system.actorSelection(RootActorPath(system2Address) / "user" / "echo")
selection.tell("ping", this.testActor)
expectMsg("ping")
// none of these should tear down the connection
List(
ManifestIllegal,
ManifestNotSerializable,
ToBinaryIllegal,
ToBinaryNotSerializable,
NotDeserializable,
IllegalOnDeserialize
).foreach(msg
selection.tell(msg, this.testActor)
)
// make sure we still have a connection
selection.tell("ping", this.testActor)
expectMsg("ping")
}
}
override def afterTermination(): Unit = {
TestKit.shutdownActorSystem(system2)
}
}
class TransientSerializationErrorSpec extends AbstractTransientSerializationErrorSpec(ConfigFactory.parseString("""
akka.remote.netty.tcp {
hostname = localhost
port = 0
}
"""))

View file

@ -0,0 +1,10 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import akka.remote.AbstractTransientSerializationErrorSpec
class TransientSerializationErrorSpec extends AbstractTransientSerializationErrorSpec(
ArterySpecSupport.defaultConfig)