From aa2c4fe7bf8cba8371da627bdda0623752f023f2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 27 May 2016 11:24:08 +0200 Subject: [PATCH] handle serialization errors, #20324 --- .../scala/akka/remote/MessageSerializer.scala | 3 +- .../scala/akka/remote/artery/Codecs.scala | 77 +++++++++++----- .../artery/SerializationErrorSpec.scala | 87 +++++++++++++++++++ 3 files changed, 142 insertions(+), 25 deletions(-) create mode 100644 akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 38699a04c9..09625d7fa5 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -72,7 +72,8 @@ private[akka] object MessageSerializer { } } - def deserializeForArtery(system: ExtendedActorSystem, serialization: Serialization, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): AnyRef = { + def deserializeForArtery(system: ExtendedActorSystem, serialization: Serialization, headerBuilder: HeaderBuilder, + envelope: EnvelopeBuffer): AnyRef = { serialization.deserializeByteBuffer( envelope.byteBuffer, Integer.parseInt(headerBuilder.serializer), // FIXME: Use FQCN diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index da662e9fcd..454c01b663 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -1,13 +1,16 @@ package akka.remote.artery +import scala.util.control.NonFatal + import akka.actor.{ ActorRef, InternalActorRef } -import akka.remote.EndpointManager.Send +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem import akka.remote.{ MessageSerializer, UniqueAddress } +import akka.remote.EndpointManager.Send +import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem // TODO: Long UID class Encoder( @@ -22,7 +25,7 @@ class Encoder( val shape: FlowShape[Send, EnvelopeBuffer] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler { + new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { private val headerBuilder = HeaderBuilder(compressionTable) headerBuilder.version = ArteryTransport.Version @@ -35,6 +38,8 @@ class Encoder( private val senderCache = new java.util.HashMap[ActorRef, String] private var recipientCache = new java.util.HashMap[ActorRef, String] + override protected def logSource = classOf[Encoder] + override def onPush(): Unit = { val send = grab(in) val envelope = pool.acquire() @@ -69,18 +74,31 @@ class Encoder( headerBuilder.senderActorRef = noSender } - // avoiding currentTransportInformation.withValue due to thunk allocation - val oldValue = Serialization.currentTransportInformation.value try { - Serialization.currentTransportInformation.value = serializationInfo - MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope) - } finally - Serialization.currentTransportInformation.value = oldValue + // avoiding currentTransportInformation.withValue due to thunk allocation + val oldValue = Serialization.currentTransportInformation.value + try { + Serialization.currentTransportInformation.value = serializationInfo + MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope) + } finally + Serialization.currentTransportInformation.value = oldValue - //println(s"${headerBuilder.senderActorRef} --> ${headerBuilder.recipientActorRef} ${headerBuilder.classManifest}") + envelope.byteBuffer.flip() + push(out, envelope) + + } catch { + case NonFatal(e) ⇒ + pool.release(envelope) + send.message match { + case _: SystemMessageEnvelope ⇒ + log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName) + throw e + case _ ⇒ + log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName) + pull(in) + } + } - envelope.byteBuffer.flip() - push(out, envelope) } override def onPull(): Unit = pull(in) @@ -100,7 +118,7 @@ class Decoder( val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler { + new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { private val localAddress = uniqueLocalAddress.address private val headerBuilder = HeaderBuilder(compressionTable) private val serialization = SerializationExtension(system) @@ -108,12 +126,12 @@ class Decoder( private val recipientCache = new java.util.HashMap[String, InternalActorRef] private val senderCache = new java.util.HashMap[String, Option[ActorRef]] + override protected def logSource = classOf[Decoder] + override def onPush(): Unit = { val envelope = grab(in) envelope.parseHeader(headerBuilder) - //println(s"${headerBuilder.recipientActorRef} <-- ${headerBuilder.senderActorRef} ${headerBuilder.classManifest}") - // FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances // in case of compression is enabled // FIXME: Is localAddress really needed? @@ -140,15 +158,26 @@ class Decoder( case refOpt ⇒ refOpt } - val decoded = InboundEnvelope( - recipient, - localAddress, // FIXME: Is this needed anymore? What should we do here? - MessageSerializer.deserializeForArtery(system, serialization, headerBuilder, envelope), - senderOption, // FIXME: No need for an option, decode simply to deadLetters instead - UniqueAddress(senderOption.get.path.address, headerBuilder.uid)) // FIXME see issue #20568 + try { + val deserializedMessage = MessageSerializer.deserializeForArtery( + system, serialization, headerBuilder, envelope) - pool.release(envelope) - push(out, decoded) + val decoded = InboundEnvelope( + recipient, + localAddress, // FIXME: Is this needed anymore? What should we do here? + deserializedMessage, + senderOption, // FIXME: No need for an option, decode simply to deadLetters instead + UniqueAddress(senderOption.get.path.address, headerBuilder.uid)) // FIXME see issue #20568 + + push(out, decoded) + } catch { + case NonFatal(e) ⇒ + log.warning("Failed to deserialize message with serializer id [{}] and manifest [{}]. {}", + headerBuilder.serializer, headerBuilder.classManifest, e.getMessage) + pull(in) + } finally { + pool.release(envelope) + } } override def onPull(): Unit = pull(in) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala new file mode 100644 index 0000000000..6b104257fe --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ +import akka.actor.{ ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, RootActorPath } +import akka.testkit.{ AkkaSpec, ImplicitSender } +import akka.testkit.TestActors +import com.typesafe.config.ConfigFactory +import akka.testkit.EventFilter + +object SerializationErrorSpec { + + val config = ConfigFactory.parseString(s""" + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery.enabled = on + remote.artery.hostname = localhost + remote.artery.port = 0 + actor { + serialize-creators = false + serialize-messages = false + } + } + """) + + object NotSerializableMsg + +} + +class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) with ImplicitSender { + import SerializationErrorSpec._ + + val configB = ConfigFactory.parseString(""" + akka.actor.serialization-identifiers { + # this will cause deserialization error + "akka.serialization.ByteArraySerializer" = -4 + } + """).withFallback(system.settings.config) + val systemB = ActorSystem("systemB", configB) + systemB.actorOf(TestActors.echoActorProps, "echo") + val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val rootB = RootActorPath(addressB) + + override def afterTermination(): Unit = shutdown(systemB) + + "Serialization error" must { + + "be logged when serialize fails" in { + val remoteRef = { + system.actorSelection(rootB / "user" / "echo") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + remoteRef ! "ping" + expectMsg("ping") + + EventFilter[java.io.NotSerializableException](start = "Failed to serialize message", occurrences = 1).intercept { + remoteRef ! NotSerializableMsg + } + + remoteRef ! "ping2" + expectMsg("ping2") + } + + "be logged when deserialize fails" in { + val remoteRef = { + system.actorSelection(rootB / "user" / "echo") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + remoteRef ! "ping" + expectMsg("ping") + + EventFilter.warning( + start = "Failed to deserialize message with serializer id [4]", occurrences = 1).intercept { + remoteRef ! "boom".getBytes("utf-8") + }(systemB) + + remoteRef ! "ping2" + expectMsg("ping2") + } + + } + +}