handle serialization errors, #20324

This commit is contained in:
Patrik Nordwall 2016-05-27 11:24:08 +02:00
parent e74e1da6cc
commit aa2c4fe7bf
3 changed files with 142 additions and 25 deletions

View file

@ -72,7 +72,8 @@ private[akka] object MessageSerializer {
}
}
def deserializeForArtery(system: ExtendedActorSystem, serialization: Serialization, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): AnyRef = {
def deserializeForArtery(system: ExtendedActorSystem, serialization: Serialization, headerBuilder: HeaderBuilder,
envelope: EnvelopeBuffer): AnyRef = {
serialization.deserializeByteBuffer(
envelope.byteBuffer,
Integer.parseInt(headerBuilder.serializer), // FIXME: Use FQCN

View file

@ -1,13 +1,16 @@
package akka.remote.artery
import scala.util.control.NonFatal
import akka.actor.{ ActorRef, InternalActorRef }
import akka.remote.EndpointManager.Send
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.remote.{ MessageSerializer, UniqueAddress }
import akka.remote.EndpointManager.Send
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
// TODO: Long UID
class Encoder(
@ -22,7 +25,7 @@ class Encoder(
val shape: FlowShape[Send, EnvelopeBuffer] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
private val headerBuilder = HeaderBuilder(compressionTable)
headerBuilder.version = ArteryTransport.Version
@ -35,6 +38,8 @@ class Encoder(
private val senderCache = new java.util.HashMap[ActorRef, String]
private var recipientCache = new java.util.HashMap[ActorRef, String]
override protected def logSource = classOf[Encoder]
override def onPush(): Unit = {
val send = grab(in)
val envelope = pool.acquire()
@ -69,6 +74,7 @@ class Encoder(
headerBuilder.senderActorRef = noSender
}
try {
// avoiding currentTransportInformation.withValue due to thunk allocation
val oldValue = Serialization.currentTransportInformation.value
try {
@ -77,10 +83,22 @@ class Encoder(
} finally
Serialization.currentTransportInformation.value = oldValue
//println(s"${headerBuilder.senderActorRef} --> ${headerBuilder.recipientActorRef} ${headerBuilder.classManifest}")
envelope.byteBuffer.flip()
push(out, envelope)
} catch {
case NonFatal(e)
pool.release(envelope)
send.message match {
case _: SystemMessageEnvelope
log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName)
throw e
case _
log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName)
pull(in)
}
}
}
override def onPull(): Unit = pull(in)
@ -100,7 +118,7 @@ class Decoder(
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
private val localAddress = uniqueLocalAddress.address
private val headerBuilder = HeaderBuilder(compressionTable)
private val serialization = SerializationExtension(system)
@ -108,12 +126,12 @@ class Decoder(
private val recipientCache = new java.util.HashMap[String, InternalActorRef]
private val senderCache = new java.util.HashMap[String, Option[ActorRef]]
override protected def logSource = classOf[Decoder]
override def onPush(): Unit = {
val envelope = grab(in)
envelope.parseHeader(headerBuilder)
//println(s"${headerBuilder.recipientActorRef} <-- ${headerBuilder.senderActorRef} ${headerBuilder.classManifest}")
// FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances
// in case of compression is enabled
// FIXME: Is localAddress really needed?
@ -140,15 +158,26 @@ class Decoder(
case refOpt refOpt
}
try {
val deserializedMessage = MessageSerializer.deserializeForArtery(
system, serialization, headerBuilder, envelope)
val decoded = InboundEnvelope(
recipient,
localAddress, // FIXME: Is this needed anymore? What should we do here?
MessageSerializer.deserializeForArtery(system, serialization, headerBuilder, envelope),
deserializedMessage,
senderOption, // FIXME: No need for an option, decode simply to deadLetters instead
UniqueAddress(senderOption.get.path.address, headerBuilder.uid)) // FIXME see issue #20568
pool.release(envelope)
push(out, decoded)
} catch {
case NonFatal(e)
log.warning("Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
headerBuilder.serializer, headerBuilder.classManifest, e.getMessage)
pull(in)
} finally {
pool.release(envelope)
}
}
override def onPull(): Unit = pull(in)

View file

@ -0,0 +1,87 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.{ ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, RootActorPath }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.testkit.TestActors
import com.typesafe.config.ConfigFactory
import akka.testkit.EventFilter
object SerializationErrorSpec {
val config = ConfigFactory.parseString(s"""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.artery.enabled = on
remote.artery.hostname = localhost
remote.artery.port = 0
actor {
serialize-creators = false
serialize-messages = false
}
}
""")
object NotSerializableMsg
}
class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) with ImplicitSender {
import SerializationErrorSpec._
val configB = ConfigFactory.parseString("""
akka.actor.serialization-identifiers {
# this will cause deserialization error
"akka.serialization.ByteArraySerializer" = -4
}
""").withFallback(system.settings.config)
val systemB = ActorSystem("systemB", configB)
systemB.actorOf(TestActors.echoActorProps, "echo")
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB)
override def afterTermination(): Unit = shutdown(systemB)
"Serialization error" must {
"be logged when serialize fails" in {
val remoteRef = {
system.actorSelection(rootB / "user" / "echo") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
remoteRef ! "ping"
expectMsg("ping")
EventFilter[java.io.NotSerializableException](start = "Failed to serialize message", occurrences = 1).intercept {
remoteRef ! NotSerializableMsg
}
remoteRef ! "ping2"
expectMsg("ping2")
}
"be logged when deserialize fails" in {
val remoteRef = {
system.actorSelection(rootB / "user" / "echo") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
remoteRef ! "ping"
expectMsg("ping")
EventFilter.warning(
start = "Failed to deserialize message with serializer id [4]", occurrences = 1).intercept {
remoteRef ! "boom".getBytes("utf-8")
}(systemB)
remoteRef ! "ping2"
expectMsg("ping2")
}
}
}