diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala index 974b2ea1c9..15316f727d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala @@ -22,7 +22,7 @@ class DeployerSpec extends WordSpec with MustMatchers { Clustered( Node("node1"), Replicate(3), - Stateful( + Replication( TransactionLog, WriteThrough))))) } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index a60a6437ca..d0d39d29c6 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -380,7 +380,13 @@ object Actor extends ListenerManagement { private def newClusterActorRef(factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef = { deploy match { - case Deploy(configAdress, router, serializerClassName, Clustered(home, replication: Replication, state: State)) ⇒ + case Deploy( + configAdress, router, serializerClassName, + Clustered( + home, + replicas, + replication)) ⇒ + ClusterModule.ensureEnabled() if (configAdress != address) throw new IllegalStateException( @@ -389,11 +395,12 @@ object Actor extends ListenerManagement { "Remote server is not running") val isHomeNode = DeploymentConfig.isHomeNode(home) - val replicas = DeploymentConfig.replicaValueFor(replication) + val nrOfReplicas = DeploymentConfig.replicaValueFor(replicas) - def storeActorAndGetClusterRef(replicationStrategy: ReplicationStrategy, serializer: Serializer): ActorRef = { + def storeActorAndGetClusterRef(replicationScheme: ReplicationScheme, serializer: Serializer): ActorRef = { // add actor to cluster registry (if not already added) - if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, replicationStrategy, false, serializer) + if (!cluster.isClustered(address)) + cluster.store(factory().start(), nrOfReplicas, replicationScheme, false, serializer) // remote node (not home node), check out as ClusterActorRef cluster.ref(address, DeploymentConfig.routerTypeFor(router)) @@ -401,11 +408,11 @@ object Actor extends ListenerManagement { val serializer = serializerFor(address, serializerClassName) - state match { - case _: Stateless | Stateless ⇒ + replication match { + case _: Transient | Transient ⇒ storeActorAndGetClusterRef(Transient, serializer) - case Stateful(storage, strategy) ⇒ + case replication: Replication ⇒ if (isHomeNode) { // stateful actor's home node cluster .use(address, serializer) @@ -413,7 +420,7 @@ object Actor extends ListenerManagement { "Could not check out actor [" + address + "] from cluster registry as a \"local\" actor")) } else { // FIXME later manage different 'storage' (data grid) as well - storeActorAndGetClusterRef(strategy, serializer) + storeActorAndGetClusterRef(replication, serializer) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 9ad22c4c2d..a11a9a34c7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -6,13 +6,13 @@ package akka.actor import akka.event.EventHandler import akka.dispatch._ -import akka.config.Config +import akka.config._ import akka.config.Supervision._ import akka.util._ import akka.serialization.{ Format, Serializer } import ReflectiveAccess._ import ClusterModule._ -import DeploymentConfig.{ ReplicationStrategy, Transient, WriteThrough, WriteBehind } +import DeploymentConfig.{ ReplicationScheme, Replication, Transient, WriteThrough, WriteBehind } import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference @@ -515,7 +515,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S class LocalActorRef private[akka] ( private[this] val actorFactory: () ⇒ Actor, val address: String, - replicationStrategy: ReplicationStrategy) + replicationScheme: ReplicationScheme) extends ActorRef with ScalaActorRef { protected[akka] val guard = new ReentrantGuard @@ -543,24 +543,39 @@ class LocalActorRef private[akka] ( protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } - private val isReplicated: Boolean = replicationStrategy match { - case Transient ⇒ false - case _ ⇒ true + private val isReplicated: Boolean = replicationScheme match { + case _: Transient | Transient ⇒ false + case _ ⇒ true } // FIXME how to get the matching serializerClassName? Now default is used. Needed for transaction log snapshot private val serializer = Actor.serializerFor(address, Format.defaultSerializerName) - private lazy val txLog: TransactionLog = { - val log = replicationStrategy match { - case Transient ⇒ throw new IllegalStateException("Can not replicate 'transient' actor [" + toString + "]") - case WriteThrough ⇒ transactionLog.newLogFor(_uuid.toString, false, replicationStrategy, serializer) - case WriteBehind ⇒ transactionLog.newLogFor(_uuid.toString, true, replicationStrategy, serializer) + private lazy val replicationStorage: Either[TransactionLog, AnyRef] = { + replicationScheme match { + case _: Transient | Transient ⇒ + throw new IllegalStateException("Can not replicate 'transient' actor [" + toString + "]") + + case Replication(storage, strategy) ⇒ + val isWriteBehind = strategy match { + case _: WriteBehind | WriteBehind ⇒ true + case _: WriteThrough | WriteThrough ⇒ false + } + + storage match { + case _: DeploymentConfig.TransactionLog | DeploymentConfig.TransactionLog ⇒ + EventHandler.debug(this, + "Creating a transaction log for Actor [%s] with replication strategy [%s]" + .format(address, replicationScheme)) + Left(transactionLog.newLogFor(_uuid.toString, isWriteBehind, replicationScheme, serializer)) + + case _: DeploymentConfig.DataGrid | DeploymentConfig.DataGrid ⇒ + throw new ConfigurationException("Replication storage type \"data-grid\" is not yet supported") + + case unknown ⇒ + throw new ConfigurationException("Unknown replication storage type [" + unknown + "]") + } } - EventHandler.debug(this, - "Creating a transaction log for Actor [%s] with replication strategy [%s]" - .format(address, replicationStrategy)) - log } //If it was started inside "newActor", initialize it @@ -576,7 +591,7 @@ class LocalActorRef private[akka] ( __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], __factory: () ⇒ Actor, - __replicationStrategy: ReplicationStrategy) = { + __replicationStrategy: ReplicationScheme) = { this(__factory, __address, __replicationStrategy) @@ -652,7 +667,9 @@ class LocalActorRef private[akka] ( } } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") - if (isReplicated) txLog.delete() + if (isReplicated) { + if (replicationStorage.isLeft) replicationStorage.left.get.delete() + } } } @@ -774,7 +791,7 @@ class LocalActorRef private[akka] ( } finally { guard.lock.unlock() if (isReplicated) { - txLog.recordEntry(messageHandle, this) + if (replicationStorage.isLeft) replicationStorage.left.get.recordEntry(messageHandle, this) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 07b258cd88..bfbf1441ef 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -62,8 +62,8 @@ object DeploymentConfig { sealed trait Scope case class Clustered( home: Home = Host("localhost"), - replication: Replication = NoReplicas, - state: State = Stateless) extends Scope + replicas: Replicas = NoReplicas, + replication: ReplicationScheme = Transient) extends Scope // For Java API case class Local() extends Scope @@ -80,34 +80,34 @@ object DeploymentConfig { case class IP(ipAddress: String) extends Home // -------------------------------- - // --- Replication + // --- Replicas // -------------------------------- - sealed trait Replication - case class Replicate(factor: Int) extends Replication { - if (factor < 1) throw new IllegalArgumentException("Replication factor can not be negative or zero") + sealed trait Replicas + case class Replicate(factor: Int) extends Replicas { + if (factor < 1) throw new IllegalArgumentException("Replicas factor can not be negative or zero") } // For Java API - case class AutoReplicate() extends Replication - case class NoReplicas() extends Replication + case class AutoReplicate() extends Replicas + case class NoReplicas() extends Replicas // For Scala API - case object AutoReplicate extends Replication - case object NoReplicas extends Replication + case object AutoReplicate extends Replicas + case object NoReplicas extends Replicas // -------------------------------- - // --- State + // --- Replication // -------------------------------- - sealed trait State + sealed trait ReplicationScheme // For Java API - case class Stateless() extends State + case class Transient() extends ReplicationScheme // For Scala API - case object Stateless extends State - case class Stateful( + case object Transient extends ReplicationScheme + case class Replication( storage: ReplicationStorage, - strategy: ReplicationStrategy) extends State + strategy: ReplicationStrategy) extends ReplicationScheme // -------------------------------- // --- ReplicationStorage @@ -130,12 +130,10 @@ object DeploymentConfig { // For Java API case class WriteBehind() extends ReplicationStrategy case class WriteThrough() extends ReplicationStrategy - case class Transient() extends ReplicationStrategy // For Scala API case object WriteBehind extends ReplicationStrategy case object WriteThrough extends ReplicationStrategy - case object Transient extends ReplicationStrategy // -------------------------------- // --- Helper methods for parsing @@ -147,7 +145,7 @@ object DeploymentConfig { case Node(nodename) ⇒ nodename == Config.nodename } - def replicaValueFor(replication: Replication): Int = replication match { + def replicaValueFor(replicas: Replicas): Int = replicas match { case Replicate(replicas) ⇒ replicas case AutoReplicate ⇒ -1 case AutoReplicate() ⇒ -1 @@ -170,6 +168,11 @@ object DeploymentConfig { case LeastMessages() ⇒ RouterType.LeastMessages case c: CustomRouter ⇒ throw new UnsupportedOperationException("routerTypeFor: " + c) } + + def isReplicationAsync(strategy: ReplicationStrategy): Boolean = strategy match { + case _: WriteBehind | WriteBehind ⇒ true + case _: WriteThrough | WriteThrough ⇒ false + } } /** @@ -375,27 +378,30 @@ object Deployer { } // -------------------------------- - // akka.actor.deployment.
.clustered.stateful + // akka.actor.deployment.
.clustered.replication // -------------------------------- - clusteredConfig.getSection("stateful") match { + clusteredConfig.getSection("replication") match { case None ⇒ - Some(Deploy(address, router, format, Clustered(home, replicas, Stateless))) + Some(Deploy(address, router, format, Clustered(home, replicas, Transient))) - case Some(statefulConfig) ⇒ - val storage = statefulConfig.getString("replication-storage", "transaction-log") match { + case Some(replicationConfig) ⇒ + val storage = replicationConfig.getString("storage", "transaction-log") match { case "transaction-log" ⇒ TransactionLog case "data-grid" ⇒ DataGrid case unknown ⇒ throw new ConfigurationException("Config option [" + addressPath + - ".clustered.stateful.replication-storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + + ".clustered.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + unknown + "]") } - val strategy = statefulConfig.getString("replication-strategy", "write-through") match { + val strategy = replicationConfig.getString("strategy", "write-through") match { case "write-through" ⇒ WriteThrough case "write-behind" ⇒ WriteBehind - case unknown ⇒ Transient + case unknown ⇒ + throw new ConfigurationException("Config option [" + addressPath + + ".clustered.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + + unknown + "]") } - Some(Deploy(address, router, format, Clustered(home, replicas, Stateful(storage, strategy)))) + Some(Deploy(address, router, format, Clustered(home, replicas, Replication(storage, strategy)))) } } } @@ -459,7 +465,8 @@ object Address { def validate(address: String) { if (validAddressPattern.matcher(address).matches) true else { - val e = new IllegalArgumentException("Address [" + address + "] is not valid, need to follow pattern [0-9a-zA-Z\\-\\_\\$]+") + val e = new IllegalArgumentException( + "Address [" + address + "] is not valid, need to follow pattern [0-9a-zA-Z\\-\\_\\$]+") EventHandler.error(e, this, e.getMessage) throw e } diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 7920121e95..82155ebc90 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -185,7 +185,7 @@ trait ClusterNode { * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store[T <: Actor](address: String, actorClass: Class[T], replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode + def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, format: Serializer): ClusterNode /** * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated @@ -199,7 +199,7 @@ trait ClusterNode { * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode + def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode /** * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated @@ -213,7 +213,7 @@ trait ClusterNode { * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store[T <: Actor](address: String, actorClass: Class[T], replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode + def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode /** * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated @@ -227,7 +227,7 @@ trait ClusterNode { * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode + def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated @@ -241,7 +241,7 @@ trait ClusterNode { * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store(actorRef: ActorRef, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode + def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated @@ -255,7 +255,7 @@ trait ClusterNode { * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store(actorRef: ActorRef, replicationFactor: Int, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode + def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated @@ -269,7 +269,7 @@ trait ClusterNode { * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store(actorRef: ActorRef, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode + def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode /** * Needed to have reflection through structural typing work. @@ -279,7 +279,7 @@ trait ClusterNode { /** * Needed to have reflection through structural typing work. */ - def store(actorRef: ActorRef, replicationFactor: Int, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: AnyRef): ClusterNode + def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: AnyRef): ClusterNode /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated @@ -293,7 +293,7 @@ trait ClusterNode { * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store(actorRef: ActorRef, replicationFactor: Int, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode + def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode /** * Removes actor with uuid from the cluster. diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index c7374a1b12..42fd88a78f 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -8,7 +8,7 @@ import akka.dispatch.{ Future, Promise, MessageInvocation } import akka.config.{ Config, ModuleNotAvailableException } import akka.remoteinterface.RemoteSupport import akka.actor._ -import DeploymentConfig.{ Deploy, ReplicationStrategy } +import DeploymentConfig.{ Deploy, ReplicationScheme, ReplicationStrategy } import akka.event.EventHandler import akka.serialization.Format import akka.cluster.ClusterNode @@ -111,13 +111,13 @@ object ReflectiveAccess { def newLogFor( id: String, isAsync: Boolean, - replicationStrategy: ReplicationStrategy, + replicationScheme: ReplicationScheme, format: Serializer): TransactionLog def logFor( id: String, isAsync: Boolean, - replicationStrategy: ReplicationStrategy, + replicationScheme: ReplicationScheme, format: Serializer): TransactionLog def shutdown() diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index b516aa6e47..a5b76b646c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -29,7 +29,7 @@ import Helpers._ import akka.actor._ import Actor._ import Status._ -import DeploymentConfig.{ ReplicationStrategy, Transient, WriteThrough, WriteBehind } +import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind } import akka.event.EventHandler import akka.dispatch.{ Dispatchers, Future } import akka.remoteinterface._ @@ -471,8 +471,8 @@ class DefaultClusterNode private[akka] ( * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store[T <: Actor](address: String, actorClass: Class[T], replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode = - store(Actor.actorOf(actorClass, address).start, 0, replicationStrategy, false, format) + def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, format: Serializer): ClusterNode = + store(Actor.actorOf(actorClass, address).start, 0, replicationScheme, false, format) /** * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated @@ -487,8 +487,8 @@ class DefaultClusterNode private[akka] ( * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode = - store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationStrategy, false, format) + def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode = + store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationScheme, false, format) /** * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated @@ -503,8 +503,8 @@ class DefaultClusterNode private[akka] ( * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store[T <: Actor](address: String, actorClass: Class[T], replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode = - store(Actor.actorOf(actorClass, address).start, 0, replicationStrategy, serializeMailbox, format) + def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode = + store(Actor.actorOf(actorClass, address).start, 0, replicationScheme, serializeMailbox, format) /** * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated @@ -519,8 +519,8 @@ class DefaultClusterNode private[akka] ( * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode = - store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationStrategy, serializeMailbox, format) + def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode = + store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationScheme, serializeMailbox, format) /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated @@ -535,8 +535,8 @@ class DefaultClusterNode private[akka] ( * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store(actorRef: ActorRef, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode = - store(actorRef, 0, replicationStrategy, false, format) + def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode = + store(actorRef, 0, replicationScheme, false, format) /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated @@ -551,8 +551,8 @@ class DefaultClusterNode private[akka] ( * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store(actorRef: ActorRef, replicationFactor: Int, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode = - store(actorRef, replicationFactor, replicationStrategy, false, format) + def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode = + store(actorRef, replicationFactor, replicationScheme, false, format) /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated @@ -575,14 +575,14 @@ class DefaultClusterNode private[akka] ( * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store(actorRef: ActorRef, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode = - store(actorRef, 0, replicationStrategy, serializeMailbox, format) + def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode = + store(actorRef, 0, replicationScheme, serializeMailbox, format) /** * Needed to have reflection through structural typing work. */ - def store(actorRef: ActorRef, replicationFactor: Int, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: AnyRef): ClusterNode = - store(actorRef, replicationFactor, replicationStrategy, serializeMailbox, format.asInstanceOf[Serializer]) + def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: AnyRef): ClusterNode = + store(actorRef, replicationFactor, replicationScheme, serializeMailbox, format.asInstanceOf[Serializer]) /** * Needed to have reflection through structural typing work. @@ -598,7 +598,7 @@ class DefaultClusterNode private[akka] ( def store( actorRef: ActorRef, replicationFactor: Int, - replicationStrategy: ReplicationStrategy, + replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode = if (isConnected.isOn) { @@ -612,8 +612,8 @@ class DefaultClusterNode private[akka] ( "Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid)) val actorBytes = - if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox, replicationStrategy)(format)) - else toBinary(actorRef, serializeMailbox, replicationStrategy)(format) + if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox, replicationScheme)(format)) + else toBinary(actorRef, serializeMailbox, replicationScheme)(format) val actorRegistryPath = actorRegistryPathFor(uuid) diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index 7efec7df23..281d2f91e5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -14,7 +14,7 @@ import akka.config._ import Config._ import akka.util._ import akka.actor._ -import DeploymentConfig.{ ReplicationStrategy, Transient, WriteThrough, WriteBehind } +import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind } import akka.event.EventHandler import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation } import akka.remote.MessageSerializer @@ -52,7 +52,7 @@ class TransactionLog private ( ledger: LedgerHandle, val id: String, val isAsync: Boolean, - replicationStrategy: ReplicationStrategy, + replicationScheme: ReplicationScheme, format: Serializer) { import TransactionLog._ @@ -71,8 +71,8 @@ class TransactionLog private ( if (nrOfEntries.incrementAndGet % snapshotFrequency == 0) { val snapshot = // FIXME ReplicationStrategy Transient is always used - if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationStrategy)(format)) - else toBinary(actorRef, false, replicationStrategy)(format) + if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationScheme)(format)) + else toBinary(actorRef, false, replicationScheme)(format) recordSnapshot(snapshot) } recordEntry(MessageSerializer.serialize(messageHandle.message).toByteArray) @@ -371,9 +371,9 @@ object TransactionLog { ledger: LedgerHandle, id: String, isAsync: Boolean, - replicationStrategy: ReplicationStrategy, + replicationScheme: ReplicationScheme, format: Serializer) = - new TransactionLog(ledger, id, isAsync, replicationStrategy, format) + new TransactionLog(ledger, id, isAsync, replicationScheme, format) /** * Shuts down the transaction log. @@ -397,7 +397,7 @@ object TransactionLog { def newLogFor( id: String, isAsync: Boolean, - replicationStrategy: ReplicationStrategy, + replicationScheme: ReplicationScheme, format: Serializer): TransactionLog = { val txLogPath = transactionLogNode + "/" + id @@ -443,7 +443,7 @@ object TransactionLog { } EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id)) - TransactionLog(ledger, id, isAsync, replicationStrategy, format) + TransactionLog(ledger, id, isAsync, replicationScheme, format) } /** @@ -452,7 +452,7 @@ object TransactionLog { def logFor( id: String, isAsync: Boolean, - replicationStrategy: ReplicationStrategy, + replicationScheme: ReplicationScheme, format: Serializer): TransactionLog = { val txLogPath = transactionLogNode + "/" + id @@ -493,7 +493,7 @@ object TransactionLog { case e ⇒ handleError(e) } - TransactionLog(ledger, id, isAsync, replicationStrategy, format) + TransactionLog(ledger, id, isAsync, replicationScheme, format) } private[akka] def await[T](future: Promise[T]): T = { diff --git a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java index 29e490103e..b643a9a750 100644 --- a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java @@ -77,25 +77,94 @@ public final class RemoteProtocol { // @@protoc_insertion_point(enum_scope:CommandType) } - public enum ReplicationStrategyType + public enum ReplicationStorageType implements com.google.protobuf.ProtocolMessageEnum { TRANSIENT(0, 1), - WRITE_THROUGH(1, 2), - WRITE_BEHIND(2, 3), + TRANSACTION_LOG(1, 2), + DATA_GRID(2, 3), ; public static final int TRANSIENT_VALUE = 1; - public static final int WRITE_THROUGH_VALUE = 2; - public static final int WRITE_BEHIND_VALUE = 3; + public static final int TRANSACTION_LOG_VALUE = 2; + public static final int DATA_GRID_VALUE = 3; + + + public final int getNumber() { return value; } + + public static ReplicationStorageType valueOf(int value) { + switch (value) { + case 1: return TRANSIENT; + case 2: return TRANSACTION_LOG; + case 3: return DATA_GRID; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public ReplicationStorageType findValueByNumber(int number) { + return ReplicationStorageType.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return akka.remote.protocol.RemoteProtocol.getDescriptor().getEnumTypes().get(1); + } + + private static final ReplicationStorageType[] VALUES = { + TRANSIENT, TRANSACTION_LOG, DATA_GRID, + }; + + public static ReplicationStorageType valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private ReplicationStorageType(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:ReplicationStorageType) + } + + public enum ReplicationStrategyType + implements com.google.protobuf.ProtocolMessageEnum { + WRITE_THROUGH(0, 1), + WRITE_BEHIND(1, 2), + ; + + public static final int WRITE_THROUGH_VALUE = 1; + public static final int WRITE_BEHIND_VALUE = 2; public final int getNumber() { return value; } public static ReplicationStrategyType valueOf(int value) { switch (value) { - case 1: return TRANSIENT; - case 2: return WRITE_THROUGH; - case 3: return WRITE_BEHIND; + case 1: return WRITE_THROUGH; + case 2: return WRITE_BEHIND; default: return null; } } @@ -122,11 +191,11 @@ public final class RemoteProtocol { } public static final com.google.protobuf.Descriptors.EnumDescriptor getDescriptor() { - return akka.remote.protocol.RemoteProtocol.getDescriptor().getEnumTypes().get(1); + return akka.remote.protocol.RemoteProtocol.getDescriptor().getEnumTypes().get(2); } private static final ReplicationStrategyType[] VALUES = { - TRANSIENT, WRITE_THROUGH, WRITE_BEHIND, + WRITE_THROUGH, WRITE_BEHIND, }; public static ReplicationStrategyType valueOf( @@ -200,7 +269,7 @@ public final class RemoteProtocol { } public static final com.google.protobuf.Descriptors.EnumDescriptor getDescriptor() { - return akka.remote.protocol.RemoteProtocol.getDescriptor().getEnumTypes().get(2); + return akka.remote.protocol.RemoteProtocol.getDescriptor().getEnumTypes().get(3); } private static final SerializationSchemeType[] VALUES = { @@ -269,7 +338,7 @@ public final class RemoteProtocol { } public static final com.google.protobuf.Descriptors.EnumDescriptor getDescriptor() { - return akka.remote.protocol.RemoteProtocol.getDescriptor().getEnumTypes().get(3); + return akka.remote.protocol.RemoteProtocol.getDescriptor().getEnumTypes().get(4); } private static final LifeCycleType[] VALUES = { @@ -3459,11 +3528,15 @@ public final class RemoteProtocol { boolean hasHotswapStack(); com.google.protobuf.ByteString getHotswapStack(); - // optional .ReplicationStrategyType replicationStrategy = 11; + // optional .ReplicationStorageType replicationStorage = 11; + boolean hasReplicationStorage(); + akka.remote.protocol.RemoteProtocol.ReplicationStorageType getReplicationStorage(); + + // optional .ReplicationStrategyType replicationStrategy = 12; boolean hasReplicationStrategy(); akka.remote.protocol.RemoteProtocol.ReplicationStrategyType getReplicationStrategy(); - // repeated .RemoteMessageProtocol messages = 12; + // repeated .RemoteMessageProtocol messages = 13; java.util.List getMessagesList(); akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol getMessages(int index); @@ -3677,18 +3750,28 @@ public final class RemoteProtocol { return hotswapStack_; } - // optional .ReplicationStrategyType replicationStrategy = 11; - public static final int REPLICATIONSTRATEGY_FIELD_NUMBER = 11; + // optional .ReplicationStorageType replicationStorage = 11; + public static final int REPLICATIONSTORAGE_FIELD_NUMBER = 11; + private akka.remote.protocol.RemoteProtocol.ReplicationStorageType replicationStorage_; + public boolean hasReplicationStorage() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + public akka.remote.protocol.RemoteProtocol.ReplicationStorageType getReplicationStorage() { + return replicationStorage_; + } + + // optional .ReplicationStrategyType replicationStrategy = 12; + public static final int REPLICATIONSTRATEGY_FIELD_NUMBER = 12; private akka.remote.protocol.RemoteProtocol.ReplicationStrategyType replicationStrategy_; public boolean hasReplicationStrategy() { - return ((bitField0_ & 0x00000400) == 0x00000400); + return ((bitField0_ & 0x00000800) == 0x00000800); } public akka.remote.protocol.RemoteProtocol.ReplicationStrategyType getReplicationStrategy() { return replicationStrategy_; } - // repeated .RemoteMessageProtocol messages = 12; - public static final int MESSAGES_FIELD_NUMBER = 12; + // repeated .RemoteMessageProtocol messages = 13; + public static final int MESSAGES_FIELD_NUMBER = 13; private java.util.List messages_; public java.util.List getMessagesList() { return messages_; @@ -3719,7 +3802,8 @@ public final class RemoteProtocol { lifeCycle_ = akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.getDefaultInstance(); supervisor_ = akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance(); hotswapStack_ = com.google.protobuf.ByteString.EMPTY; - replicationStrategy_ = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.TRANSIENT; + replicationStorage_ = akka.remote.protocol.RemoteProtocol.ReplicationStorageType.TRANSIENT; + replicationStrategy_ = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.WRITE_THROUGH; messages_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; @@ -3799,10 +3883,13 @@ public final class RemoteProtocol { output.writeBytes(10, hotswapStack_); } if (((bitField0_ & 0x00000400) == 0x00000400)) { - output.writeEnum(11, replicationStrategy_.getNumber()); + output.writeEnum(11, replicationStorage_.getNumber()); + } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + output.writeEnum(12, replicationStrategy_.getNumber()); } for (int i = 0; i < messages_.size(); i++) { - output.writeMessage(12, messages_.get(i)); + output.writeMessage(13, messages_.get(i)); } getUnknownFields().writeTo(output); } @@ -3855,11 +3942,15 @@ public final class RemoteProtocol { } if (((bitField0_ & 0x00000400) == 0x00000400)) { size += com.google.protobuf.CodedOutputStream - .computeEnumSize(11, replicationStrategy_.getNumber()); + .computeEnumSize(11, replicationStorage_.getNumber()); + } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(12, replicationStrategy_.getNumber()); } for (int i = 0; i < messages_.size(); i++) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(12, messages_.get(i)); + .computeMessageSize(13, messages_.get(i)); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -4021,11 +4112,13 @@ public final class RemoteProtocol { bitField0_ = (bitField0_ & ~0x00000100); hotswapStack_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000200); - replicationStrategy_ = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.TRANSIENT; + replicationStorage_ = akka.remote.protocol.RemoteProtocol.ReplicationStorageType.TRANSIENT; bitField0_ = (bitField0_ & ~0x00000400); + replicationStrategy_ = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.WRITE_THROUGH; + bitField0_ = (bitField0_ & ~0x00000800); if (messagesBuilder_ == null) { messages_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000800); + bitField0_ = (bitField0_ & ~0x00001000); } else { messagesBuilder_.clear(); } @@ -4122,11 +4215,15 @@ public final class RemoteProtocol { if (((from_bitField0_ & 0x00000400) == 0x00000400)) { to_bitField0_ |= 0x00000400; } + result.replicationStorage_ = replicationStorage_; + if (((from_bitField0_ & 0x00000800) == 0x00000800)) { + to_bitField0_ |= 0x00000800; + } result.replicationStrategy_ = replicationStrategy_; if (messagesBuilder_ == null) { - if (((bitField0_ & 0x00000800) == 0x00000800)) { + if (((bitField0_ & 0x00001000) == 0x00001000)) { messages_ = java.util.Collections.unmodifiableList(messages_); - bitField0_ = (bitField0_ & ~0x00000800); + bitField0_ = (bitField0_ & ~0x00001000); } result.messages_ = messages_; } else { @@ -4178,6 +4275,9 @@ public final class RemoteProtocol { if (other.hasHotswapStack()) { setHotswapStack(other.getHotswapStack()); } + if (other.hasReplicationStorage()) { + setReplicationStorage(other.getReplicationStorage()); + } if (other.hasReplicationStrategy()) { setReplicationStrategy(other.getReplicationStrategy()); } @@ -4185,7 +4285,7 @@ public final class RemoteProtocol { if (!other.messages_.isEmpty()) { if (messages_.isEmpty()) { messages_ = other.messages_; - bitField0_ = (bitField0_ & ~0x00000800); + bitField0_ = (bitField0_ & ~0x00001000); } else { ensureMessagesIsMutable(); messages_.addAll(other.messages_); @@ -4198,7 +4298,7 @@ public final class RemoteProtocol { messagesBuilder_.dispose(); messagesBuilder_ = null; messages_ = other.messages_; - bitField0_ = (bitField0_ & ~0x00000800); + bitField0_ = (bitField0_ & ~0x00001000); messagesBuilder_ = com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? getMessagesFieldBuilder() : null; @@ -4336,16 +4436,27 @@ public final class RemoteProtocol { } case 88: { int rawValue = input.readEnum(); - akka.remote.protocol.RemoteProtocol.ReplicationStrategyType value = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.valueOf(rawValue); + akka.remote.protocol.RemoteProtocol.ReplicationStorageType value = akka.remote.protocol.RemoteProtocol.ReplicationStorageType.valueOf(rawValue); if (value == null) { unknownFields.mergeVarintField(11, rawValue); } else { bitField0_ |= 0x00000400; + replicationStorage_ = value; + } + break; + } + case 96: { + int rawValue = input.readEnum(); + akka.remote.protocol.RemoteProtocol.ReplicationStrategyType value = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(12, rawValue); + } else { + bitField0_ |= 0x00000800; replicationStrategy_ = value; } break; } - case 98: { + case 106: { akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol.Builder subBuilder = akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol.newBuilder(); input.readMessage(subBuilder, extensionRegistry); addMessages(subBuilder.buildPartial()); @@ -4825,11 +4936,35 @@ public final class RemoteProtocol { return this; } - // optional .ReplicationStrategyType replicationStrategy = 11; - private akka.remote.protocol.RemoteProtocol.ReplicationStrategyType replicationStrategy_ = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.TRANSIENT; - public boolean hasReplicationStrategy() { + // optional .ReplicationStorageType replicationStorage = 11; + private akka.remote.protocol.RemoteProtocol.ReplicationStorageType replicationStorage_ = akka.remote.protocol.RemoteProtocol.ReplicationStorageType.TRANSIENT; + public boolean hasReplicationStorage() { return ((bitField0_ & 0x00000400) == 0x00000400); } + public akka.remote.protocol.RemoteProtocol.ReplicationStorageType getReplicationStorage() { + return replicationStorage_; + } + public Builder setReplicationStorage(akka.remote.protocol.RemoteProtocol.ReplicationStorageType value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000400; + replicationStorage_ = value; + onChanged(); + return this; + } + public Builder clearReplicationStorage() { + bitField0_ = (bitField0_ & ~0x00000400); + replicationStorage_ = akka.remote.protocol.RemoteProtocol.ReplicationStorageType.TRANSIENT; + onChanged(); + return this; + } + + // optional .ReplicationStrategyType replicationStrategy = 12; + private akka.remote.protocol.RemoteProtocol.ReplicationStrategyType replicationStrategy_ = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.WRITE_THROUGH; + public boolean hasReplicationStrategy() { + return ((bitField0_ & 0x00000800) == 0x00000800); + } public akka.remote.protocol.RemoteProtocol.ReplicationStrategyType getReplicationStrategy() { return replicationStrategy_; } @@ -4837,25 +4972,25 @@ public final class RemoteProtocol { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000400; + bitField0_ |= 0x00000800; replicationStrategy_ = value; onChanged(); return this; } public Builder clearReplicationStrategy() { - bitField0_ = (bitField0_ & ~0x00000400); - replicationStrategy_ = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.TRANSIENT; + bitField0_ = (bitField0_ & ~0x00000800); + replicationStrategy_ = akka.remote.protocol.RemoteProtocol.ReplicationStrategyType.WRITE_THROUGH; onChanged(); return this; } - // repeated .RemoteMessageProtocol messages = 12; + // repeated .RemoteMessageProtocol messages = 13; private java.util.List messages_ = java.util.Collections.emptyList(); private void ensureMessagesIsMutable() { - if (!((bitField0_ & 0x00000800) == 0x00000800)) { + if (!((bitField0_ & 0x00001000) == 0x00001000)) { messages_ = new java.util.ArrayList(messages_); - bitField0_ |= 0x00000800; + bitField0_ |= 0x00001000; } } @@ -4971,7 +5106,7 @@ public final class RemoteProtocol { public Builder clearMessages() { if (messagesBuilder_ == null) { messages_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000800); + bitField0_ = (bitField0_ & ~0x00001000); onChanged(); } else { messagesBuilder_.clear(); @@ -5027,7 +5162,7 @@ public final class RemoteProtocol { messagesBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol, akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol.Builder, akka.remote.protocol.RemoteProtocol.RemoteMessageProtocolOrBuilder>( messages_, - ((bitField0_ & 0x00000800) == 0x00000800), + ((bitField0_ & 0x00001000) == 0x00001000), getParentForChildren(), isClean()); messages_ = null; @@ -8909,38 +9044,41 @@ public final class RemoteProtocol { "\013commandType\030\002 \002(\0162\014.CommandType\"U\n\026Remo" + "teActorRefProtocol\022\017\n\007address\030\001 \002(\t\022\031\n\021i" + "netSocketAddress\030\002 \002(\014\022\017\n\007timeout\030\003 \001(\004\"" + - "\212\003\n\032SerializedActorRefProtocol\022\033\n\004uuid\030\001" + + "\277\003\n\032SerializedActorRefProtocol\022\033\n\004uuid\030\001" + " \002(\0132\r.UuidProtocol\022\017\n\007address\030\002 \002(\t\022\026\n\016" + "actorClassname\030\003 \002(\t\022\025\n\ractorInstance\030\004 " + "\001(\014\022\033\n\023serializerClassname\030\005 \001(\t\022\017\n\007time" + "out\030\006 \001(\004\022\026\n\016receiveTimeout\030\007 \001(\004\022%\n\tlif", "eCycle\030\010 \001(\0132\022.LifeCycleProtocol\022+\n\nsupe" + "rvisor\030\t \001(\0132\027.RemoteActorRefProtocol\022\024\n" + - "\014hotswapStack\030\n \001(\014\0225\n\023replicationStrate" + - "gy\030\013 \001(\0162\030.ReplicationStrategyType\022(\n\010me" + - "ssages\030\014 \003(\0132\026.RemoteMessageProtocol\"g\n\037" + - "SerializedTypedActorRefProtocol\022-\n\010actor" + - "Ref\030\001 \002(\0132\033.SerializedActorRefProtocol\022\025" + - "\n\rinterfaceName\030\002 \002(\t\"r\n\017MessageProtocol" + - "\0225\n\023serializationScheme\030\001 \002(\0162\030.Serializ" + - "ationSchemeType\022\017\n\007message\030\002 \002(\014\022\027\n\017mess", - "ageManifest\030\003 \001(\014\"R\n\021ActorInfoProtocol\022\033" + - "\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022\017\n\007timeout\030\002" + - " \002(\004\022\017\n\007address\030\003 \001(\t\")\n\014UuidProtocol\022\014\n" + - "\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntr" + - "yProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6\n" + - "\021LifeCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016." + - "LifeCycleType\"1\n\017AddressProtocol\022\020\n\010host" + - "name\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionPro" + - "tocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(" + - "\t*(\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOW", - "N\020\002*M\n\027ReplicationStrategyType\022\r\n\tTRANSI" + - "ENT\020\001\022\021\n\rWRITE_THROUGH\020\002\022\020\n\014WRITE_BEHIND" + - "\020\003*]\n\027SerializationSchemeType\022\010\n\004JAVA\020\001\022" + - "\013\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSO" + - "N\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPE" + - "RMANENT\020\001\022\r\n\tTEMPORARY\020\002B\030\n\024akka.remote." + - "protocolH\001" + "\014hotswapStack\030\n \001(\014\0223\n\022replicationStorag" + + "e\030\013 \001(\0162\027.ReplicationStorageType\0225\n\023repl" + + "icationStrategy\030\014 \001(\0162\030.ReplicationStrat" + + "egyType\022(\n\010messages\030\r \003(\0132\026.RemoteMessag" + + "eProtocol\"g\n\037SerializedTypedActorRefProt" + + "ocol\022-\n\010actorRef\030\001 \002(\0132\033.SerializedActor" + + "RefProtocol\022\025\n\rinterfaceName\030\002 \002(\t\"r\n\017Me" + + "ssageProtocol\0225\n\023serializationScheme\030\001 \002", + "(\0162\030.SerializationSchemeType\022\017\n\007message\030" + + "\002 \002(\014\022\027\n\017messageManifest\030\003 \001(\014\"R\n\021ActorI" + + "nfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol" + + "\022\017\n\007timeout\030\002 \002(\004\022\017\n\007address\030\003 \001(\t\")\n\014Uu" + + "idProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n" + + "\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005v" + + "alue\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeC" + + "ycle\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressPro" + + "tocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n" + + "\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n", + "\007message\030\002 \002(\t*(\n\013CommandType\022\013\n\007CONNECT" + + "\020\001\022\014\n\010SHUTDOWN\020\002*K\n\026ReplicationStorageTy" + + "pe\022\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r" + + "\n\tDATA_GRID\020\003*>\n\027ReplicationStrategyType" + + "\022\021\n\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002*]\n" + + "\027SerializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SB" + + "INARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014" + + "\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANE" + + "NT\020\001\022\r\n\tTEMPORARY\020\002B\030\n\024akka.remote.proto" + + "colH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -8984,7 +9122,7 @@ public final class RemoteProtocol { internal_static_SerializedActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SerializedActorRefProtocol_descriptor, - new java.lang.String[] { "Uuid", "Address", "ActorClassname", "ActorInstance", "SerializerClassname", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "ReplicationStrategy", "Messages", }, + new java.lang.String[] { "Uuid", "Address", "ActorClassname", "ActorInstance", "SerializerClassname", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "ReplicationStorage", "ReplicationStrategy", "Messages", }, akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class, akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class); internal_static_SerializedTypedActorRefProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 25376004ca..03ea1abc5d 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -46,13 +46,21 @@ enum CommandType { SHUTDOWN = 2; } +/** + * Defines the type of the ReplicationStorage + */ +enum ReplicationStorageType { + TRANSIENT = 1; + TRANSACTION_LOG = 2; + DATA_GRID = 3; +} + /** * Defines the type of the ReplicationStrategy */ enum ReplicationStrategyType { - TRANSIENT = 1; - WRITE_THROUGH = 2; - WRITE_BEHIND = 3; + WRITE_THROUGH = 1; + WRITE_BEHIND = 2; } /** @@ -81,8 +89,9 @@ message SerializedActorRefProtocol { optional LifeCycleProtocol lifeCycle = 8; optional RemoteActorRefProtocol supervisor = 9; optional bytes hotswapStack = 10; - optional ReplicationStrategyType replicationStrategy = 11; - repeated RemoteMessageProtocol messages = 12; + optional ReplicationStorageType replicationStorage = 11; + optional ReplicationStrategyType replicationStrategy = 12; + repeated RemoteMessageProtocol messages = 13; } /** diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index da9ff2c6fc..61e9f69a17 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -7,7 +7,7 @@ package akka.serialization import akka.config.Supervision._ import akka.actor.{ uuidFrom, newUuid } import akka.actor._ -import DeploymentConfig.{ ReplicationStrategy, Transient, WriteThrough, WriteBehind } +import DeploymentConfig._ import akka.dispatch.MessageInvocation import akka.util.ReflectiveAccess import akka.remote.{ RemoteClientSettings, MessageSerializer } @@ -35,8 +35,8 @@ object ActorSerialization { def toBinary[T <: Actor]( a: ActorRef, serializeMailBox: Boolean = true, - replicationStrategy: ReplicationStrategy = Transient)(implicit format: Serializer): Array[Byte] = - toSerializedActorRefProtocol(a, format, serializeMailBox, replicationStrategy).toByteArray + replicationScheme: ReplicationScheme = Transient)(implicit format: Serializer): Array[Byte] = + toSerializedActorRefProtocol(a, format, serializeMailBox, replicationScheme).toByteArray // wrapper for implicits to be used by Java def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Serializer): ActorRef = @@ -47,14 +47,15 @@ object ActorSerialization { a: ActorRef, format: Serializer, srlMailBox: Boolean, - replicationStrategy: ReplicationStrategy): Array[Byte] = - toBinary(a, srlMailBox, replicationStrategy)(format) + replicationScheme: ReplicationScheme): Array[Byte] = + toBinary(a, srlMailBox, replicationScheme)(format) private[akka] def toSerializedActorRefProtocol[T <: Actor]( actorRef: ActorRef, format: Serializer, serializeMailBox: Boolean, - replicationStrategy: ReplicationStrategy): SerializedActorRefProtocol = { + replicationScheme: ReplicationScheme): SerializedActorRefProtocol = { + val lifeCycleProtocol: Option[LifeCycleProtocol] = { actorRef.lifeCycle match { case Permanent ⇒ Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.PERMANENT).build) @@ -63,18 +64,29 @@ object ActorSerialization { } } - val replicationStrategyType = replicationStrategy match { - case WriteBehind ⇒ ReplicationStrategyType.WRITE_BEHIND - case WriteThrough ⇒ ReplicationStrategyType.WRITE_THROUGH - case Transient ⇒ ReplicationStrategyType.TRANSIENT - } - val builder = SerializedActorRefProtocol.newBuilder .setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build) .setAddress(actorRef.address) .setActorClassname(actorRef.actorInstance.get.getClass.getName) .setTimeout(actorRef.timeout) - .setReplicationStrategy(replicationStrategyType) + + replicationScheme match { + case _: Transient | Transient ⇒ + builder.setReplicationStorage(ReplicationStorageType.TRANSIENT) + + case Replication(storage, strategy) ⇒ + val storageType = storage match { + case _: TransactionLog | TransactionLog ⇒ ReplicationStorageType.TRANSACTION_LOG + case _: DataGrid | DataGrid ⇒ ReplicationStorageType.DATA_GRID + } + builder.setReplicationStorage(storageType) + + val strategyType = strategy match { + case _: WriteBehind | WriteBehind ⇒ ReplicationStrategyType.WRITE_BEHIND + case _: WriteThrough | WriteThrough ⇒ ReplicationStrategyType.WRITE_THROUGH + } + builder.setReplicationStrategy(strategyType) + } if (serializeMailBox == true) { if (actorRef.mailbox eq null) throw new IllegalActorStateException("Can't serialize an actor that has not been started.") @@ -133,13 +145,26 @@ object ActorSerialization { if (protocol.hasSupervisor) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) else None + import ReplicationStorageType._ import ReplicationStrategyType._ - val replicationStrategy = - if (protocol.hasReplicationStrategy) { - protocol.getReplicationStrategy match { - case TRANSIENT ⇒ Transient - case WRITE_THROUGH ⇒ WriteThrough - case WRITE_BEHIND ⇒ WriteBehind + + val replicationScheme = + if (protocol.hasReplicationStorage) { + protocol.getReplicationStorage match { + case TRANSIENT ⇒ Transient + case store ⇒ + val storage = store match { + case TRANSACTION_LOG ⇒ TransactionLog + case DATA_GRID ⇒ DataGrid + } + val strategy = if (protocol.hasReplicationStrategy) { + protocol.getReplicationStrategy match { + case WRITE_THROUGH ⇒ WriteThrough + case WRITE_BEHIND ⇒ WriteBehind + } + } else throw new IllegalActorStateException( + "Expected replication strategy for replication storage [" + storage + "]") + Replication(storage, strategy) } } else Transient @@ -172,7 +197,7 @@ object ActorSerialization { supervisor, hotswap, factory, - replicationStrategy) + replicationScheme) val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] messages.foreach(message ⇒ ar ! MessageSerializer.deserialize(message.getMessage)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index ec98b66e83..6eb218bc06 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -55,7 +55,7 @@ akka { clustered { # makes the actor available in the cluster registry # default (if omitted) is local non-clustered actor - home = "node:node1" # defines the hostname, IP-address or node name of the "home" node for clustered actor + home = "node:node1" # hostname, IP-address or node name of the "home" node for clustered actor # available: "host:", "ip:" and "node:" # default is "host:localhost" @@ -64,12 +64,16 @@ akka { # if "auto" is used then 'home' has no meaning # default is '0', meaning no replicas; - stateful { # stateful or stateless? - replication-storage = "transaction-log" # storage model for replication + replication { # use replication or not? + + # FIXME should we have this config option here? If so, implement it all through. + serialize-mailbox = on # should the actor mailbox be part of the serialized snapshot? + + storage = "transaction-log" # storage model for replication # available: "transaction-log" and "data-grid" # default is "transaction-log" - replication-strategy = "write-through" # guaranteees for replication + strategy = "write-through" # guaranteees for replication # available: "write-through" and "write-behind" # default is "write-through"