From fa084fc5cec9d55b5d379007000ef933f886f074 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 2 Sep 2016 11:05:00 +0200 Subject: [PATCH] treat serialization (toBinary) exception of remote message as transient error, #21343 --- .../src/main/scala/akka/remote/Endpoint.scala | 5 ++- .../scala/akka/remote/MessageSerializer.scala | 34 +++++++++++++------ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index c92f1b1032..18624b2417 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -803,7 +803,10 @@ private[remote] class EndpointWriter( } } catch { case e: NotSerializableException ⇒ - log.error(e, "Transient association error (association remains live)") + log.error(e, "Serializer not defined for message type []. Transient association error (association remains live)", s.message.getClass) + true + case e: MessageSerializer.SerializationException ⇒ + log.error(e, "{} Transient association error (association remains live)", e.getMessage) true case e: EndpointException ⇒ publishAndThrow(e, Logging.ErrorLevel) diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index e222adacd5..8306ff533a 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -9,6 +9,7 @@ import akka.protobuf.ByteString import akka.actor.ExtendedActorSystem import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest +import scala.util.control.NonFatal /** * INTERNAL API @@ -17,6 +18,8 @@ import akka.serialization.SerializerWithStringManifest */ private[akka] object MessageSerializer { + class SerializationException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) + /** * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message */ @@ -29,22 +32,31 @@ private[akka] object MessageSerializer { /** * Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol + * Throws `NotSerializableException` if serializer was not configured for the message type. + * Throws `MessageSerializer.SerializationException` if exception was thrown from `toBinary` of the + * serializer. */ def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = { val s = SerializationExtension(system) val serializer = s.findSerializerFor(message) val builder = SerializedMessage.newBuilder - builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) - builder.setSerializerId(serializer.identifier) - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(message) - if (manifest != "") - builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (serializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + try { + builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) + builder.setSerializerId(serializer.identifier) + serializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(message) + if (manifest != "") + builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) + case _ ⇒ + if (serializer.includeManifest) + builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + } + builder.build + } catch { + case NonFatal(e) ⇒ + throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " + + s"using serializer [${serializer.getClass}].", e) } - builder.build } }