diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 783203ec0b..8a1fb5223b 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -403,26 +403,6 @@ object Actor extends ListenerManagement {
   val serializer: Serializer =
     akka.serialization.Serialization.getSerializer(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s)
 
-  /**
-   * val serializer: Serializer = serializerClassName match {
-   *   case null | "" | Format.`defaultSerializerName` ⇒ Format.Default
-   *   case specialSerializer ⇒
-   *     ReflectiveAccess.getClassFor(specialSerializer) match {
-   *       case Right(clazz) ⇒
-   *         clazz.newInstance match {
-   *           case s: Serializer ⇒ s
-   *           case other ⇒ serializerErrorDueTo("class must be of type [akka.serialization.Serializer]")
-   *         }
-   *       case Left(exception) ⇒
-   *         val cause = exception match {
-   *           case i: InvocationTargetException ⇒ i.getTargetException
-   *           case _ ⇒ exception
-   *         }
-   *         serializerErrorDueTo(cause.toString)
-   *     }
-   * }
-   */
-
   def storeActorAndGetClusterRef(replicationScheme: ReplicationScheme, serializer: Serializer): ActorRef = {
     // add actor to cluster registry (if not already added)
     if (!cluster.isClustered(address))
diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala
index 5e123ca345..d1f8412d1a 100644
--- a/akka-actor/src/main/scala/akka/config/Config.scala
+++ b/akka-actor/src/main/scala/akka/config/Config.scala
@@ -121,8 +121,10 @@ object Config {
   def uptime = (System.currentTimeMillis - startTime) / 1000
 
   val serializers = config.getSection("akka.actor.serializers").map(_.map).getOrElse(Map("default" -> "akka.serialization.JavaSerializer"))
+
   val bindings = config.getSection("akka.actor.bindings")
     .map(_.map)
     .map(m ⇒ Map() ++ m.map { case (k, v: List[String]) ⇒ Map() ++ v.map((_, k)) }.flatten)
+
   val serializerMap = bindings.map(m ⇒ m.map { case (k, v: String) ⇒ (k, serializers(v)) }).getOrElse(Map())
 }
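A rough sketch of the configuration the new 'serializers' and 'bindings' vals above would pick up. The 'java' alias and 'com.example.MyMessage' are illustrative placeholders; only the 'default = "akka.serialization.JavaSerializer"' fallback comes from the code itself:

    akka {
      actor {
        serializers {
          default = "akka.serialization.JavaSerializer"
          java = "akka.serialization.JavaSerializer"       # alias -> serializer class (illustrative)
        }
        bindings {
          java = ["com.example.MyMessage"]                 # classes serialized via the 'java' alias (illustrative)
        }
      }
    }

'bindings' inverts this into a class-name -> alias map, and 'serializerMap' then resolves each class name to the serializer class registered under that alias.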
diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
index 206a4296df..c1859f120b 100644
--- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala
+++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
@@ -13,12 +13,12 @@ object Serialization {
   case class NoSerializerFoundException(m: String) extends Exception(m)
 
   def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
-    getSerializer(o.getClass)
-      .fold((ex) ⇒ Left(ex),
-        (ser) ⇒ Right(ser.toBinary(o)))
+    getSerializer(o.getClass).fold((ex) ⇒ Left(ex), (ser) ⇒ Right(ser.toBinary(o)))
 
-  def deserialize(bytes: Array[Byte], clazz: Class[_],
-    classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
+  def deserialize(
+    bytes: Array[Byte],
+    clazz: Class[_],
+    classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
     getSerializer(clazz)
       .fold((ex) ⇒ Left(ex),
         (ser) ⇒ Right(ser.fromBinary(bytes, Some(clazz), classLoader)))
@@ -27,10 +27,8 @@ object Serialization {
     Config.serializerMap.get(clazz.getName) match {
       case Some(serializerName: String) ⇒
         getClassFor(serializerName) match {
-          case Right(serializer) ⇒ {
-            Right(serializer.newInstance.asInstanceOf[Serializer])
-          }
-          case Left(exception) ⇒ Left(exception)
+          case Right(serializer) ⇒ Right(serializer.newInstance.asInstanceOf[Serializer])
+          case Left(exception)   ⇒ Left(exception)
         }
       case _ ⇒
         getDefaultSerializer match {
@@ -44,32 +42,29 @@ object Serialization {
     Config.serializers.get("default") match {
       case Some(ser: String) ⇒
         getClassFor(ser) match {
-          case Right(srializer) ⇒ {
-            Some(srializer.newInstance.asInstanceOf[Serializer])
-          }
-          case Left(exception) ⇒ None
+          case Right(serializer) ⇒ Some(serializer.newInstance.asInstanceOf[Serializer])
+          case Left(exception)   ⇒ None
         }
       case None ⇒ None
     }
   }
 
-  private def getSerializerInstanceForBestMatchClass(configMap: collection.mutable.Map[String, String], cl: Class[_]) = {
+  private def getSerializerInstanceForBestMatchClass(
+    configMap: collection.mutable.Map[String, String],
+    cl: Class[_]) = {
     configMap
       .find {
         case (clazzName, ser) ⇒
           getClassFor(clazzName) match {
-            case Right(clazz) ⇒
-              clazz.isAssignableFrom(cl)
-            case _ ⇒ false
+            case Right(clazz) ⇒ clazz.isAssignableFrom(cl)
+            case _            ⇒ false
           }
       }
       .map {
         case (_, ser) ⇒
           getClassFor(ser) match {
-            case Right(s) ⇒
-              val instance = s.newInstance.asInstanceOf[Serializer]
-              Right(instance)
-            case _ ⇒ Left(new Exception("Error instantiating " + ser))
+            case Right(s) ⇒ Right(s.newInstance.asInstanceOf[Serializer])
+            case _        ⇒ Left(new Exception("Error instantiating " + ser))
          }
      }.getOrElse(Left(NoSerializerFoundException("No mapping serializer found for " + cl)))
   }
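A minimal sketch of how the call sites below use this API. 'MyMessage' is a hypothetical class standing in for anything covered by the config bindings (or by the default serializer); the Either-based signatures are the ones defined above:

    import akka.serialization.Serialization

    // Hypothetical message type, purely for illustration.
    class MyMessage(val payload: String) extends Serializable

    val bytes: Array[Byte] = Serialization.serialize(new MyMessage("hello")) match {
      case Right(b) ⇒ b
      case Left(ex) ⇒ throw ex
    }

    val restored: MyMessage = Serialization.deserialize(bytes, classOf[MyMessage], None) match {
      case Right(r) ⇒ r.asInstanceOf[MyMessage]
      case Left(ex) ⇒ throw ex
    }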
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 14d18e063c..c8fd8698f3 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -12,6 +12,7 @@ import org.I0Itec.zkclient._
 import org.I0Itec.zkclient.serialize._
 import org.I0Itec.zkclient.exception._
 
+import java.util.{ List ⇒ JList }
 import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference, AtomicInteger }
 import java.util.concurrent.{ ConcurrentSkipListSet, CopyOnWriteArrayList, Callable, ConcurrentHashMap }
 import java.net.InetSocketAddress
@@ -26,18 +27,22 @@ import RemoteDaemonMessageType._
 
 import akka.util._
 import Helpers._
+
 import akka.actor._
 import Actor._
 import Status._
 import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind }
+
 import akka.event.EventHandler
 import akka.dispatch.{ Dispatchers, Future }
 import akka.remoteinterface._
 import akka.routing.RouterType
+
 import akka.config.{ Config, Supervision }
 import Supervision._
 import Config._
-import akka.serialization.{ Format, Serializer, Compression }
+
+import akka.serialization.{ Serialization, Serializer, Compression }
 import Compression.LZF
 
 import akka.AkkaException
@@ -47,7 +52,6 @@ import akka.cluster.ChangeListener._
 
 import com.eaio.uuid.UUID
 import com.google.protobuf.ByteString
-import java.util.{ List ⇒ JList }
 
 // FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
 // FIXME Provisioning data in ZK (file names etc) and files in S3 and on disk
@@ -612,8 +616,8 @@ class DefaultClusterNode private[akka] (
       "Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid))
 
     val actorBytes =
-      if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox, replicationScheme)(format))
-      else toBinary(actorRef, serializeMailbox, replicationScheme)(format)
+      if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox, replicationScheme))
+      else toBinary(actorRef, serializeMailbox, replicationScheme)
 
     val actorRegistryPath = actorRegistryPathFor(uuid)
 
@@ -771,7 +775,7 @@ class DefaultClusterNode private[akka] (
       }) match {
         case Left(bytes) ⇒
           locallyCheckedOutActors += (uuid -> bytes)
-          val actor = fromBinary[T](bytes, remoteServerAddress)(format)
+          val actor = fromBinary[T](bytes, remoteServerAddress)
           EventHandler.debug(this,
             "Checking out actor [%s] to be used on node [%s] as local actor"
               .format(actor, nodeAddress.nodeName))
@@ -1040,11 +1044,15 @@ class DefaultClusterNode private[akka] (
    * Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument).
    */
   def send(f: Function0[Unit], replicationFactor: Int) {
-    val message = RemoteDaemonMessageProtocol.newBuilder
-      .setMessageType(FUNCTION_FUN0_UNIT)
-      .setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
-      .build
-    replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
+    Serialization.serialize(f) match {
+      case Left(error) ⇒ throw error
+      case Right(bytes) ⇒
+        val message = RemoteDaemonMessageProtocol.newBuilder
+          .setMessageType(FUNCTION_FUN0_UNIT)
+          .setPayload(ByteString.copyFrom(bytes))
+          .build
+        replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
+    }
   }
 
   /**
@@ -1052,12 +1060,16 @@ class DefaultClusterNode private[akka] (
    * Returns an 'Array' with all the 'Future's from the computation.
    */
   def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] = {
-    val message = RemoteDaemonMessageProtocol.newBuilder
-      .setMessageType(FUNCTION_FUN0_ANY)
-      .setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
-      .build
-    val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
-    results.toList.asInstanceOf[List[Future[Any]]]
+    Serialization.serialize(f) match {
+      case Left(error) ⇒ throw error
+      case Right(bytes) ⇒
+        val message = RemoteDaemonMessageProtocol.newBuilder
+          .setMessageType(FUNCTION_FUN0_ANY)
+          .setPayload(ByteString.copyFrom(bytes))
+          .build
+        val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
+        results.toList.asInstanceOf[List[Future[Any]]]
+    }
   }
 
   /**
@@ -1065,11 +1077,15 @@ class DefaultClusterNode private[akka] (
    * with the argument speficied.
    */
   def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int) {
-    val message = RemoteDaemonMessageProtocol.newBuilder
-      .setMessageType(FUNCTION_FUN1_ARG_UNIT)
-      .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
-      .build
-    replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
+    Serialization.serialize((f, arg)) match {
+      case Left(error) ⇒ throw error
+      case Right(bytes) ⇒
+        val message = RemoteDaemonMessageProtocol.newBuilder
+          .setMessageType(FUNCTION_FUN1_ARG_UNIT)
+          .setPayload(ByteString.copyFrom(bytes))
+          .build
+        replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
+    }
   }
 
   /**
@@ -1078,12 +1094,16 @@ class DefaultClusterNode private[akka] (
    * Returns an 'Array' with all the 'Future's from the computation.
    */
   def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] = {
-    val message = RemoteDaemonMessageProtocol.newBuilder
-      .setMessageType(FUNCTION_FUN1_ARG_ANY)
-      .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
-      .build
-    val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
-    results.toList.asInstanceOf[List[Future[Any]]]
+    Serialization.serialize((f, arg)) match {
+      case Left(error) ⇒ throw error
+      case Right(bytes) ⇒
+        val message = RemoteDaemonMessageProtocol.newBuilder
+          .setMessageType(FUNCTION_FUN1_ARG_ANY)
+          .setPayload(ByteString.copyFrom(bytes))
+          .build
+        val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
+        results.toList.asInstanceOf[List[Future[Any]]]
+    }
   }
 
   // =======================================
@@ -1312,15 +1332,19 @@ class DefaultClusterNode private[akka] (
         // notify all available nodes that they should fail-over all connections from 'from' to 'to'
         val from = nodeNameToAddress.get(failedNodeName)
         val to = remoteServerAddress
-        val command = RemoteDaemonMessageProtocol.newBuilder
-          .setMessageType(FAIL_OVER_CONNECTIONS)
-          .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((from, to))))
-          .build
-        membershipNodes foreach { node ⇒
-          replicaConnections.get(node) foreach {
-            case (_, connection) ⇒
-              connection ! command
-          }
+        Serialization.serialize((from, to)) match {
+          case Left(error) ⇒ throw error
+          case Right(bytes) ⇒
+            val command = RemoteDaemonMessageProtocol.newBuilder
+              .setMessageType(FAIL_OVER_CONNECTIONS)
+              .setPayload(ByteString.copyFrom(bytes))
+              .build
+            membershipNodes foreach { node ⇒
+              replicaConnections.get(node) foreach {
+                case (_, connection) ⇒
+                  connection ! command
+              }
+            }
         }
       }
     }
@@ -1673,6 +1697,6 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
   }
 
   private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
-    Serializers.Java.fromBinary(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T]
+    Serialization.deserialize(message.getPayload.toByteArray, clazz, None).fold(ex ⇒ throw ex, _.asInstanceOf[T])
   }
 }
diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
index 281d2f91e5..89a9c811d9 100644
--- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
@@ -71,11 +71,11 @@ class TransactionLog private (
 
     if (nrOfEntries.incrementAndGet % snapshotFrequency == 0) {
       val snapshot = // FIXME ReplicationStrategy Transient is always used
-        if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationScheme)(format))
-        else toBinary(actorRef, false, replicationScheme)(format)
+        if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationScheme))
+        else toBinary(actorRef, false, replicationScheme)
       recordSnapshot(snapshot)
     }
-    recordEntry(MessageSerializer.serialize(messageHandle.message).toByteArray)
+    recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray)
   }
 
   /**
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala
index 0e450308d4..aa6016aa15 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala
@@ -1,3 +1,4 @@
+/*
 package akka.cluster
 
 import org.scalatest.WordSpec
@@ -33,7 +34,6 @@ object BinaryFormatMyJavaSerializableActor {
     val serializer = Serializers.Java
   }
 }
 
-/*
 class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
   import Cluster._
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusteredPingPongSample.scala b/akka-cluster/src/test/scala/akka/cluster/ClusteredPingPongSample.scala
index 43ef424aa8..60d2a69a88 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusteredPingPongSample.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusteredPingPongSample.scala
@@ -8,7 +8,6 @@ import akka.cluster._
 
 import akka.actor._
 import akka.actor.Actor._
-import akka.serialization.{ Serializers, SerializerBasedActorFormat }
 
 import java.util.concurrent.CountDownLatch
 
@@ -60,20 +59,6 @@ object PingPong {
       self.stop
     }
   }
-
-  // ------------------------
-  // Serialization
-  // ------------------------
-
-  object BinaryFormats {
-    implicit object PingActorFormat extends SerializerBasedActorFormat[PingActor] with Serializable {
-      val serializer = Serializers.Java
-    }
-
-    implicit object PongActorFormat extends SerializerBasedActorFormat[PongActor] with Serializable {
-      val serializer = Serializers.Java
-    }
-  }
 }
 
 /*
diff --git a/akka-cluster/src/test/scala/akka/cluster/PingPongMultiJvmExample.scala b/akka-cluster/src/test/scala/akka/cluster/PingPongMultiJvmExample.scala
index 4856efd188..81fb364d76 100644
--- a/akka-cluster/src/test/scala/akka/cluster/PingPongMultiJvmExample.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/PingPongMultiJvmExample.scala
@@ -7,7 +7,6 @@ package example.cluster
 
 import akka.cluster._
 import akka.actor._
-import akka.serialization.{ Serializers, SerializerBasedActorFormat }
 import akka.util.duration._
 
 object PingPong {
@@ -62,20 +61,6 @@ object PingPong {
       self reply Pong
     }
   }
-
-  // -----------------------------------------------
-  // Serialization
-  // -----------------------------------------------
-
-  object BinaryFormats {
-    implicit object PingActorFormat extends SerializerBasedActorFormat[PingActor] with Serializable {
-      val serializer = Serializers.Java
-    }
-
-    implicit object PongActorFormat extends SerializerBasedActorFormat[PongActor] with Serializable {
-      val serializer = Serializers.Java
-    }
-  }
 }
 
 /*
diff --git a/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala
index 79b706c9ea..e5f5261d0a 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala
@@ -32,31 +32,31 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
 
   "A Transaction Log" should {
     "be able to record entries - synchronous" in {
       val uuid = (new UUID).toString
-      val txlog = TransactionLog.newLogFor(uuid, false, null, Format.Default)
+      val txlog = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
       val entry = "hello".getBytes("UTF-8")
       txlog.recordEntry(entry)
     }
 
     "be able to record and delete entries - synchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
       val entry = "hello".getBytes("UTF-8")
       txlog1.recordEntry(entry)
       txlog1.recordEntry(entry)
       txlog1.delete
       txlog1.close
-      intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null, Format.Default))
+      intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null, JavaSerializer))
     }
 
     "be able to record entries and read entries with 'entriesInRange' - synchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
       val entry = "hello".getBytes("UTF-8")
       txlog1.recordEntry(entry)
       txlog1.recordEntry(entry)
       txlog1.close
-      val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
+      val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer)
       val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8"))
       entries.size must equal(2)
       entries(0) must equal("hello")
@@ -66,7 +66,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
 
     "be able to record entries and read entries with 'entries' - synchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
       val entry = "hello".getBytes("UTF-8")
       txlog1.recordEntry(entry)
       txlog1.recordEntry(entry)
@@ -74,7 +74,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
 
       txlog1.recordEntry(entry)
       txlog1.close
-      val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
+      val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer)
       val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8"))
       entries.size must equal(4)
       entries(0) must equal("hello")
@@ -86,7 +86,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
 
     "be able to record a snapshot - synchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
       val snapshot = "snapshot".getBytes("UTF-8")
       txlog1.recordSnapshot(snapshot)
       txlog1.close
@@ -94,7 +94,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
 
     "be able to record and read a snapshot and following entries - synchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
 
       val snapshot = "snapshot".getBytes("UTF-8")
       txlog1.recordSnapshot(snapshot)
@@ -105,7 +105,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
 
       txlog1.recordEntry(entry)
       txlog1.close
-      val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
+      val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer)
 
       val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
       new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@@ -120,7 +120,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
 
     "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
 
       val entry = "hello".getBytes("UTF-8")
       txlog1.recordEntry(entry)
@@ -134,7 +134,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
 
       txlog1.recordEntry(entry)
       txlog1.close
-      val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
+      val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer)
 
       val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
       new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@@ -149,7 +149,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
   "A Transaction Log" should {
     "be able to record entries - asynchronous" in {
       val uuid = (new UUID).toString
-      val txlog = TransactionLog.newLogFor(uuid, true, null, Format.Default)
+      val txlog = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
       val entry = "hello".getBytes("UTF-8")
       txlog.recordEntry(entry)
       Thread.sleep(100)
@@ -158,24 +158,24 @@
 
     "be able to record and delete entries - asynchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
       val entry = "hello".getBytes("UTF-8")
       txlog1.recordEntry(entry)
       txlog1.recordEntry(entry)
       txlog1.delete
       Thread.sleep(100)
-      intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null, Format.Default))
+      intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null, JavaSerializer))
     }
 
     "be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
       val entry = "hello".getBytes("UTF-8")
       txlog1.recordEntry(entry)
       txlog1.recordEntry(entry)
       Thread.sleep(100)
       txlog1.close
-      val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
+      val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer)
       val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8"))
       entries.size must equal(2)
       entries(0) must equal("hello")
@@ -186,7 +186,7 @@
 
     "be able to record entries and read entries with 'entries' - asynchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
       val entry = "hello".getBytes("UTF-8")
       txlog1.recordEntry(entry)
       txlog1.recordEntry(entry)
@@ -195,7 +195,7 @@
 
       Thread.sleep(100)
       txlog1.close
-      val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
+      val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer)
       val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8"))
       entries.size must equal(4)
       entries(0) must equal("hello")
@@ -208,7 +208,7 @@
 
     "be able to record a snapshot - asynchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
       val snapshot = "snapshot".getBytes("UTF-8")
       txlog1.recordSnapshot(snapshot)
       Thread.sleep(100)
@@ -217,7 +217,7 @@
 
     "be able to record and read a snapshot and following entries - asynchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
 
       val snapshot = "snapshot".getBytes("UTF-8")
       txlog1.recordSnapshot(snapshot)
@@ -229,7 +229,7 @@
 
       Thread.sleep(100)
       txlog1.close
-      val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
+      val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer)
 
       val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
       new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@@ -245,7 +245,7 @@
 
     "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - asynchronous" in {
       val uuid = (new UUID).toString
-      val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
+      val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
 
       val entry = "hello".getBytes("UTF-8")
       txlog1.recordEntry(entry)
@@ -258,7 +258,7 @@
 
       Thread.sleep(100)
       txlog1.close
-      val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
+      val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer)
       val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
       new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
       val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf
index 3c9999f42c..7b2ecc1583 100644
--- a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf
+++ b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf
@@ -1,5 +1,4 @@
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
 akka.actor.deployment.service-hello.clustered.home = "node:node1"
-akka.actor.deployment.service-hello.clustered.replicas = 1
-akka.actor.deployment.service-hello.clustered.stateless = on
+akka.actor.deployment.service-hello.clustered.replicas = 1
\ No newline at end of file
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.conf
index 59aa6fddac..7b2ecc1583 100644
--- a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.conf
+++ b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.conf
@@ -1,5 +1,4 @@
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
 akka.actor.deployment.service-hello.clustered.home = "node:node1"
-akka.actor.deployment.service-hello.clustered.replicas = 1
-akka.actor.deployment.service-hello.clustered.stateless = on
\ No newline at end of file
+akka.actor.deployment.service-hello.clustered.replicas = 1
\ No newline at end of file
diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
index ec1a3a6e62..5a928ce148 100644
--- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
+++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
@@ -44,7 +44,7 @@ abstract class DurableExecutableMailbox(owner: ActorRef) extends MessageQueue wi
 
   //TODO: switch to RemoteProtocol
   def serialize(durableMessage: MessageInvocation) = {
-    val message = MessageSerializer.serialize(durableMessage.message)
+    val message = MessageSerializer.serialize(durableMessage.message.asInstanceOf[AnyRef])
     val builder = DurableMailboxMessageProtocol.newBuilder
       .setOwnerAddress(ownerAddress)
       .setMessage(message.toByteString)
diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
index 8e832cd391..c481ac899e 100644
--- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
+++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
@@ -35,8 +35,8 @@ object ActorSerialization {
   def toBinary[T <: Actor](
     a: ActorRef,
     serializeMailBox: Boolean = true,
-    replicationScheme: ReplicationScheme = Transient)(implicit format: Serializer): Array[Byte] =
-    toSerializedActorRefProtocol(a, format, serializeMailBox, replicationScheme).toByteArray
+    replicationScheme: ReplicationScheme = Transient): Array[Byte] =
+    toSerializedActorRefProtocol(a, serializeMailBox, replicationScheme).toByteArray
 
   // wrapper for implicits to be used by Java
   def fromBinaryJ[T <: Actor](bytes: Array[Byte]): ActorRef =
@@ -45,14 +45,12 @@ object ActorSerialization {
   // wrapper for implicits to be used by Java
   def toBinaryJ[T <: Actor](
     a: ActorRef,
-    format: Serializer,
     srlMailBox: Boolean,
     replicationScheme: ReplicationScheme): Array[Byte] =
-    toBinary(a, srlMailBox, replicationScheme)(format)
+    toBinary(a, srlMailBox, replicationScheme)
 
   private[akka] def toSerializedActorRefProtocol[T <: Actor](
     actorRef: ActorRef,
-    format: Serializer,
     serializeMailBox: Boolean,
     replicationScheme: ReplicationScheme): SerializedActorRefProtocol = {
 
@@ -114,7 +112,6 @@ object ActorSerialization {
     }
 
     actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
-    // builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
     Serialization.serialize(actorRef.actor.asInstanceOf[T]) match {
       case Right(bytes)    ⇒ builder.setActorInstance(ByteString.copyFrom(bytes))
       case Left(exception) ⇒ throw new Exception("Error serializing : " + actorRef.actor.getClass.getName)
@@ -175,13 +172,13 @@ object ActorSerialization {
 
     val hotswap =
       try {
-        Serialization.deserialize(protocol.getHotswapStack.toByteArray, classOf[Stack[PartialFunction[Any, Unit]]], loader) match {
-          case Right(r) ⇒ r.asInstanceOf[Stack[PartialFunction[Any, Unit]]]
-          case Left(ex) ⇒ throw new Exception("Cannot de-serialize hotswapstack")
-        }
-        // format
-        //   .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[Stack[PartialFunction[Any, Unit]]]))
-        //   .asInstanceOf[Stack[PartialFunction[Any, Unit]]]
+        Serialization.deserialize(
+          protocol.getHotswapStack.toByteArray,
+          classOf[Stack[PartialFunction[Any, Unit]]],
+          loader) match {
+            case Right(r) ⇒ r.asInstanceOf[Stack[PartialFunction[Any, Unit]]]
+            case Left(ex) ⇒ throw new Exception("Cannot de-serialize hotswapstack")
+          }
       } catch {
         case e: Exception ⇒ Stack[PartialFunction[Any, Unit]]()
       }
@@ -195,7 +192,6 @@ object ActorSerialization {
         case Right(r) ⇒ r.asInstanceOf[Actor]
         case Left(ex) ⇒ throw new Exception("Cannot de-serialize : " + actorClass)
       }
-      // format.fromBinary(protocol.getActorInstance.toByteArray, Some(actorClass)).asInstanceOf[Actor]
     } catch {
       case e: Exception ⇒ actorClass.newInstance.asInstanceOf[Actor]
     }
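A rough sketch of a call site after this signature change, assuming ActorSerialization lives in the akka.serialization package as the file path suggests; EchoActor is a hypothetical actor used only for illustration:

    import akka.actor.Actor
    import akka.serialization.ActorSerialization._

    // Hypothetical actor, purely for illustration.
    class EchoActor extends Actor {
      def receive = { case msg ⇒ self reply msg }
    }

    val ref = Actor.actorOf[EchoActor].start()

    // serializeMailBox defaults to true and replicationScheme to Transient;
    // no implicit Serializer/Format needs to be in scope any more, since the
    // serializer is resolved through the config bindings.
    val bytes: Array[Byte] = toBinary(ref)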
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 9b43aa97b1..edd36f3c92 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -157,8 +157,8 @@ akka {
     # FIXME rename to transport
     layer = "akka.remote.netty.NettyRemoteSupport"
 
-    secure-cookie = ""                              # generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
-                                                    # or using 'Crypt.generateSecureCookie'
+    secure-cookie = ""                              # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
+                                                    # or using 'akka.util.Crypt.generateSecureCookie'
 
     replication {
       digest-type = "MAC"                           # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)