Added storage models to remote protocol and refactored all clustering to use it.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-06-10 16:31:24 +01:00
parent 7dbc5ac958
commit 8098a8d9be
12 changed files with 404 additions and 197 deletions

View file

@ -7,7 +7,7 @@ package akka.serialization
import akka.config.Supervision._
import akka.actor.{ uuidFrom, newUuid }
import akka.actor._
import DeploymentConfig.{ ReplicationStrategy, Transient, WriteThrough, WriteBehind }
import DeploymentConfig._
import akka.dispatch.MessageInvocation
import akka.util.ReflectiveAccess
import akka.remote.{ RemoteClientSettings, MessageSerializer }
@ -35,8 +35,8 @@ object ActorSerialization {
def toBinary[T <: Actor](
a: ActorRef,
serializeMailBox: Boolean = true,
replicationStrategy: ReplicationStrategy = Transient)(implicit format: Serializer): Array[Byte] =
toSerializedActorRefProtocol(a, format, serializeMailBox, replicationStrategy).toByteArray
replicationScheme: ReplicationScheme = Transient)(implicit format: Serializer): Array[Byte] =
toSerializedActorRefProtocol(a, format, serializeMailBox, replicationScheme).toByteArray
// wrapper for implicits to be used by Java
def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Serializer): ActorRef =
@ -47,14 +47,15 @@ object ActorSerialization {
a: ActorRef,
format: Serializer,
srlMailBox: Boolean,
replicationStrategy: ReplicationStrategy): Array[Byte] =
toBinary(a, srlMailBox, replicationStrategy)(format)
replicationScheme: ReplicationScheme): Array[Byte] =
toBinary(a, srlMailBox, replicationScheme)(format)
private[akka] def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef,
format: Serializer,
serializeMailBox: Boolean,
replicationStrategy: ReplicationStrategy): SerializedActorRefProtocol = {
replicationScheme: ReplicationScheme): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
actorRef.lifeCycle match {
case Permanent Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.PERMANENT).build)
@ -63,18 +64,29 @@ object ActorSerialization {
}
}
val replicationStrategyType = replicationStrategy match {
case WriteBehind ReplicationStrategyType.WRITE_BEHIND
case WriteThrough ReplicationStrategyType.WRITE_THROUGH
case Transient ReplicationStrategyType.TRANSIENT
}
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build)
.setAddress(actorRef.address)
.setActorClassname(actorRef.actorInstance.get.getClass.getName)
.setTimeout(actorRef.timeout)
.setReplicationStrategy(replicationStrategyType)
replicationScheme match {
case _: Transient | Transient
builder.setReplicationStorage(ReplicationStorageType.TRANSIENT)
case Replication(storage, strategy)
val storageType = storage match {
case _: TransactionLog | TransactionLog ReplicationStorageType.TRANSACTION_LOG
case _: DataGrid | DataGrid ReplicationStorageType.DATA_GRID
}
builder.setReplicationStorage(storageType)
val strategyType = strategy match {
case _: WriteBehind | WriteBehind ReplicationStrategyType.WRITE_BEHIND
case _: WriteThrough | WriteThrough ReplicationStrategyType.WRITE_THROUGH
}
builder.setReplicationStrategy(strategyType)
}
if (serializeMailBox == true) {
if (actorRef.mailbox eq null) throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
@ -133,13 +145,26 @@ object ActorSerialization {
if (protocol.hasSupervisor) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
import ReplicationStorageType._
import ReplicationStrategyType._
val replicationStrategy =
if (protocol.hasReplicationStrategy) {
protocol.getReplicationStrategy match {
case TRANSIENT Transient
case WRITE_THROUGH WriteThrough
case WRITE_BEHIND WriteBehind
val replicationScheme =
if (protocol.hasReplicationStorage) {
protocol.getReplicationStorage match {
case TRANSIENT Transient
case store
val storage = store match {
case TRANSACTION_LOG TransactionLog
case DATA_GRID DataGrid
}
val strategy = if (protocol.hasReplicationStrategy) {
protocol.getReplicationStrategy match {
case WRITE_THROUGH WriteThrough
case WRITE_BEHIND WriteBehind
}
} else throw new IllegalActorStateException(
"Expected replication strategy for replication storage [" + storage + "]")
Replication(storage, strategy)
}
} else Transient
@ -172,7 +197,7 @@ object ActorSerialization {
supervisor,
hotswap,
factory,
replicationStrategy)
replicationScheme)
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message ar ! MessageSerializer.deserialize(message.getMessage))