diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index e85ee21932..db2e7a7885 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -16,7 +16,7 @@ import scala.concurrent.duration.FiniteDuration /** * INTERNAL API: Use `BalancingPool` instead of this dispatcher directly. - * + * * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index cf6e56bba6..a09190f5fc 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -120,6 +120,15 @@ abstract class SerializerWithStringManifest extends Serializer { /** * Produces an object from an array of bytes, with an optional type-hint; * the class should be loaded using ActorSystem.dynamicAccess. + * + * It's recommended to throw `java.io.NotSerializableException` in `fromBinary` + * if the manifest is unknown. This makes it possible to introduce new message + * types and send them to nodes that don't know about them. This is typically + * needed when performing rolling upgrades, i.e. running a cluster with mixed + * versions for while. `NotSerializableException` is treated as a transient + * problem in the TCP based remoting layer. The problem will be logged + * and message is dropped. Other exceptions will tear down the TCP connection + * because it can be an indication of corrupt bytes from the underlying transport. */ def fromBinary(bytes: Array[Byte], manifest: String): AnyRef diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala index 2a48613dc0..2057b006c2 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala @@ -16,6 +16,7 @@ import akka.protobuf.{ ByteString, MessageLite } import scala.annotation.tailrec import scala.collection.JavaConverters.{ asJavaIterableConverter, asScalaBufferConverter, setAsJavaSetConverter } import akka.serialization.SerializerWithStringManifest +import java.io.NotSerializableException /** * Protobuf serializer for [[akka.cluster.metrics.ClusterMetricsMessage]] types. @@ -66,7 +67,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { case MetricsGossipEnvelopeManifest ⇒ metricsGossipEnvelopeFromBinary(bytes) - case _ ⇒ throw new IllegalArgumentException( + case _ ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}") } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index 3e629ee2d9..da8d5df02d 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -21,6 +21,7 @@ import akka.serialization.Serialization import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest import akka.protobuf.MessageLite +import java.io.NotSerializableException /** * INTERNAL API: Protobuf serializer of ClusterSharding messages. @@ -159,7 +160,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = fromBinaryMap.get(manifest) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException( + case None ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala index 1a2f2bb5ab..cebb37ab36 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala @@ -10,6 +10,7 @@ import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest import akka.cluster.client.ClusterReceptionist import akka.cluster.client.protobuf.msg.{ ClusterClientMessages ⇒ cm } +import java.io.NotSerializableException /** * INTERNAL API: Serializer of ClusterClient messages. @@ -54,7 +55,7 @@ private[akka] class ClusterClientMessageSerializer(val system: ExtendedActorSyst override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = fromBinaryMap.get(manifest) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException( + case None ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala index ef03acf261..29eb8bbac9 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala @@ -21,6 +21,7 @@ import akka.actor.ActorRef import akka.serialization.SerializationExtension import scala.collection.immutable.TreeMap import akka.serialization.SerializerWithStringManifest +import java.io.NotSerializableException /** * INTERNAL API: Protobuf serializer of DistributedPubSubMediator messages. @@ -72,7 +73,7 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = fromBinaryMap.get(manifest) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException( + case None ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializer.scala index 00e1233b75..df74a7d580 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializer.scala @@ -11,6 +11,7 @@ import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe import akka.serialization.BaseSerializer import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest +import java.io.NotSerializableException /** * INTERNAL API: Serializer of ClusterSingleton messages. @@ -56,7 +57,7 @@ private[akka] class ClusterSingletonMessageSerializer(val system: ExtendedActorS override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = fromBinaryMap.get(manifest) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException( + case None ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 970c8e1a6e..4c9ded529c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -18,6 +18,7 @@ import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.immutable import scala.concurrent.duration.Deadline +import java.io.NotSerializableException /** * Protobuf serializer of cluster messages. @@ -107,7 +108,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match { case Some(c) ⇒ fromBinaryMap.get(c.asInstanceOf[Class[ClusterMessage]]) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in ClusterSerializer") + case None ⇒ throw new NotSerializableException(s"Unimplemented deserialization of message class $c in ClusterSerializer") } case _ ⇒ throw new IllegalArgumentException("Need a cluster message class to be able to deserialize bytes in ClusterSerializer") } @@ -175,8 +176,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri } else { // old remote node uniqueAddress.getUid.toLong - } - ) + }) } private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int]( diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index eaf2b61692..bc8d52ccdc 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -21,6 +21,7 @@ import akka.protobuf.ByteString import akka.util.ByteString.UTF_8 import scala.collection.immutable.TreeMap import akka.cluster.UniqueAddress +import java.io.NotSerializableException /** * Protobuf serializer of ReplicatedData. @@ -126,7 +127,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = fromBinaryMap.get(manifest) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException( + case None ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala index 356725d706..a231d83a78 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala @@ -27,6 +27,7 @@ import scala.annotation.tailrec import scala.concurrent.duration.FiniteDuration import akka.cluster.ddata.DurableStore.DurableDataEnvelope import akka.cluster.ddata.DurableStore.DurableDataEnvelope +import java.io.NotSerializableException /** * INTERNAL API @@ -235,7 +236,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = fromBinaryMap.get(manifest) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException( + case None ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } diff --git a/akka-docs/rst/java/serialization.rst b/akka-docs/rst/java/serialization.rst index d76de2b513..800000c517 100644 --- a/akka-docs/rst/java/serialization.rst +++ b/akka-docs/rst/java/serialization.rst @@ -145,6 +145,16 @@ This is how a ``SerializerWithStringManifest`` looks like: You must also bind it to a name in your :ref:`configuration` and then list which classes that should be serialized using it. +It's recommended to throw ``java.io.NotSerializableException`` in ``fromBinary`` +if the manifest is unknown. This makes it possible to introduce new message types and +send them to nodes that don't know about them. This is typically needed when performing +rolling upgrades, i.e. running a cluster with mixed versions for while. +``NotSerializableException`` is treated as a transient problem in the TCP based remoting +layer. The problem will be logged and message is dropped. Other exceptions will tear down +the TCP connection because it can be an indication of corrupt bytes from the underlying +transport. + + Serializing ActorRefs --------------------- diff --git a/akka-docs/rst/scala/serialization.rst b/akka-docs/rst/scala/serialization.rst index b9b518008d..de882d57f1 100644 --- a/akka-docs/rst/scala/serialization.rst +++ b/akka-docs/rst/scala/serialization.rst @@ -135,6 +135,15 @@ This is how a ``SerializerWithStringManifest`` looks like: You must also bind it to a name in your :ref:`configuration` and then list which classes that should be serialized using it. +It's recommended to throw ``java.io.NotSerializableException`` in ``fromBinary`` +if the manifest is unknown. This makes it possible to introduce new message types and +send them to nodes that don't know about them. This is typically needed when performing +rolling upgrades, i.e. running a cluster with mixed versions for while. +``NotSerializableException`` is treated as a transient problem in the TCP based remoting +layer. The problem will be logged and message is dropped. Other exceptions will tear down +the TCP connection because it can be an indication of corrupt bytes from the underlying +transport. + Serializing ActorRefs --------------------- diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index ba438f7d74..20112282f4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -16,6 +16,7 @@ import scala.concurrent.duration import akka.actor.Actor import scala.concurrent.duration.Duration import scala.language.existentials +import java.io.NotSerializableException /** * Marker trait for all protobuf-serializable messages in `akka.persistence`. @@ -71,7 +72,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer case AtLeastOnceDeliverySnapshotClass ⇒ atLeastOnceDeliverySnapshot(mf.AtLeastOnceDeliverySnapshot.parseFrom(bytes)) case PersistentStateChangeEventClass ⇒ stateChange(mf.PersistentStateChangeEvent.parseFrom(bytes)) case PersistentFSMSnapshotClass ⇒ persistentFSMSnapshot(mf.PersistentFSMSnapshot.parseFrom(bytes)) - case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}") + case _ ⇒ throw new NotSerializableException(s"Can't deserialize object of type ${c}") } } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 5ed21c8b94..020dc422fd 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -984,7 +984,18 @@ private[remote] class EndpointReader( if (msg.reliableDeliveryEnabled) { ackedReceiveBuffer = ackedReceiveBuffer.receive(msg) deliverAndAck() - } else msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption) + } else try + msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption) + catch { + case e: NotSerializableException ⇒ + val sm = msg.serializedMessage + log.warning( + "Serializer not defined for message with serializer id [{}] and manifest [{}]. " + + "Transient association error (association remains live). {}", + sm.getSerializerId, + if (sm.hasMessageManifest) sm.getMessageManifest.toStringUtf8 else "", + e.getMessage) + } case None ⇒ } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala index 9b2fcdf131..6db28fe7ad 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala @@ -8,6 +8,7 @@ import akka.protobuf.ByteString import akka.remote.{ ContainerFormats, RemoteWatcher } import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest } import java.util.Optional +import java.io.NotSerializableException class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { @@ -152,7 +153,7 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = fromBinaryMap.get(manifest) match { case Some(deserializer) ⇒ deserializer(bytes) - case None ⇒ throw new IllegalArgumentException( + case None ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } diff --git a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala index 24b4f7a6c7..5d8a56b840 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala @@ -12,6 +12,7 @@ import com.typesafe.config.ConfigFactory import scala.util.control.NoStackTrace import java.util.Optional +import java.io.NotSerializableException object MiscMessageSerializerSpec { val serializationTestOverrides = @@ -103,7 +104,7 @@ class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testC } "reject deserialization with invalid manifest" in { - intercept[IllegalArgumentException] { + intercept[NotSerializableException] { val serializer = new MiscMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) serializer.fromBinary(Array.empty[Byte], "INVALID") }