Fixed remaining issues in pluggable serializers (cluster impl)

Jonas Bonér 2011-06-14 19:35:18 +02:00
parent e0e96960b8
commit bf0515b8e5
14 changed files with 122 additions and 157 deletions
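
The diff below repeatedly replaces the hard-wired Serializers.Java calls with the pluggable Serialization.serialize entry point, which reports failure as the Left branch of an Either instead of throwing directly. The following is a minimal, self-contained sketch of that Either-based pattern; the SerializationSketch object and its hard-wired Java serialization are illustrative stand-ins, not the actual Akka API.

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }

// Stand-in for a pluggable serialization entry point. The real akka.serialization.Serialization
// picks a Serializer per class from configuration; this sketch hard-wires Java serialization
// so it compiles on its own.
object SerializationSketch {

  def serialize(obj: AnyRef): Either[Exception, Array[Byte]] =
    try {
      val bos = new ByteArrayOutputStream
      val out = new ObjectOutputStream(bos)
      out.writeObject(obj)
      out.close()
      Right(bos.toByteArray)
    } catch {
      case e: Exception ⇒ Left(e)
    }

  def deserialize(bytes: Array[Byte]): Either[Exception, AnyRef] =
    try {
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
      try Right(in.readObject()) finally in.close()
    } catch {
      case e: Exception ⇒ Left(e)
    }
}

object SerializationSketchUsage extends App {
  // Same Left/Right handling the cluster code adopts below: fail fast on Left,
  // wrap the bytes into a message on Right.
  SerializationSketch.serialize("some payload") match {
    case Left(error)  ⇒ throw error
    case Right(bytes) ⇒ println("serialized " + bytes.length + " bytes")
  }
}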


@@ -12,6 +12,7 @@ import org.I0Itec.zkclient._
import org.I0Itec.zkclient.serialize._
import org.I0Itec.zkclient.exception._
import java.util.{ List ⇒ JList }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference, AtomicInteger }
import java.util.concurrent.{ ConcurrentSkipListSet, CopyOnWriteArrayList, Callable, ConcurrentHashMap }
import java.net.InetSocketAddress
@@ -26,18 +27,22 @@ import RemoteDaemonMessageType._
import akka.util._
import Helpers._
import akka.actor._
import Actor._
import Status._
import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind }
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.remoteinterface._
import akka.routing.RouterType
import akka.config.{ Config, Supervision }
import Supervision._
import Config._
import akka.serialization.{ Format, Serializer, Compression }
import akka.serialization.{ Serialization, Serializer, Compression }
import Compression.LZF
import akka.AkkaException
@@ -47,7 +52,6 @@ import akka.cluster.ChangeListener._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
import java.util.{ List ⇒ JList }
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
// FIXME Provisioning data in ZK (file names etc) and files in S3 and on disk
@@ -612,8 +616,8 @@ class DefaultClusterNode private[akka] (
"Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid))
val actorBytes =
if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox, replicationScheme)(format))
else toBinary(actorRef, serializeMailbox, replicationScheme)(format)
if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox, replicationScheme))
else toBinary(actorRef, serializeMailbox, replicationScheme)
val actorRegistryPath = actorRegistryPathFor(uuid)
@@ -771,7 +775,7 @@ class DefaultClusterNode private[akka] (
}) match {
case Left(bytes) ⇒
locallyCheckedOutActors += (uuid -> bytes)
val actor = fromBinary[T](bytes, remoteServerAddress)(format)
val actor = fromBinary[T](bytes, remoteServerAddress)
EventHandler.debug(this,
"Checking out actor [%s] to be used on node [%s] as local actor"
.format(actor, nodeAddress.nodeName))
@@ -1040,11 +1044,15 @@ class DefaultClusterNode private[akka] (
* Send a function 'Function0[Unit]' to be invoked on a randomly selected set of nodes (the number of nodes is defined by the 'replicationFactor' argument).
*/
def send(f: Function0[Unit], replicationFactor: Int) {
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_UNIT)
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
.build
replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
Serialization.serialize(f) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_UNIT)
.setPayload(ByteString.copyFrom(bytes))
.build
replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
}
}
/**
@@ -1052,12 +1060,16 @@ class DefaultClusterNode private[akka] (
* Returns a 'List' with all the 'Future's from the computation.
*/
def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] = {
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_ANY)
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
.build
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
results.toList.asInstanceOf[List[Future[Any]]]
Serialization.serialize(f) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_ANY)
.setPayload(ByteString.copyFrom(bytes))
.build
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
results.toList.asInstanceOf[List[Future[Any]]]
}
}
/**
@@ -1065,11 +1077,15 @@ class DefaultClusterNode private[akka] (
* with the argument specified.
*/
def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int) {
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_UNIT)
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
.build
replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
Serialization.serialize((f, arg)) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_UNIT)
.setPayload(ByteString.copyFrom(bytes))
.build
replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
}
}
/**
@@ -1078,12 +1094,16 @@ class DefaultClusterNode private[akka] (
* Returns a 'List' with all the 'Future's from the computation.
*/
def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] = {
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_ANY)
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
.build
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
results.toList.asInstanceOf[List[Future[Any]]]
Serialization.serialize((f, arg)) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_ANY)
.setPayload(ByteString.copyFrom(bytes))
.build
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
results.toList.asInstanceOf[List[Future[Any]]]
}
}
// =======================================
@@ -1312,15 +1332,19 @@ class DefaultClusterNode private[akka] (
// notify all available nodes that they should fail-over all connections from 'from' to 'to'
val from = nodeNameToAddress.get(failedNodeName)
val to = remoteServerAddress
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FAIL_OVER_CONNECTIONS)
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary((from, to))))
.build
membershipNodes foreach { node ⇒
replicaConnections.get(node) foreach {
case (_, connection) ⇒
connection ! command
}
Serialization.serialize((from, to)) match {
case Left(error) ⇒ throw error
case Right(bytes) ⇒
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FAIL_OVER_CONNECTIONS)
.setPayload(ByteString.copyFrom(bytes))
.build
membershipNodes foreach { node ⇒
replicaConnections.get(node) foreach {
case (_, connection) ⇒
connection ! command
}
}
}
}
}
@@ -1673,6 +1697,6 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}
private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
Serializers.Java.fromBinary(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T]
Serialization.serialize(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T]
}
}
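
The last hunk touches payloadFor, the receiving end of the serialized closures sent by the send methods above. The round trip that machinery implements can be sketched in a self-contained way as follows; toBytes/fromBytes stand in for the pluggable serializer and the protobuf payload wrapping, and all names here are illustrative only, not the actual Akka API.

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }

// Round-trip sketch of the FUNCTION_FUN0_UNIT flow: the sender serializes a
// Function0[Unit] closure into a byte payload (the real code wraps the bytes in a
// RemoteDaemonMessageProtocol message), and the receiving daemon recovers the
// function from the payload and invokes it. Helper names are illustrative stand-ins.
object ClosureRoundTripSketch extends App {

  def toBytes(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    bos.toByteArray
  }

  def fromBytes(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  // Sender side: a Function0[Unit] becomes an opaque byte payload.
  val f: Function0[Unit] = () ⇒ println("invoked on the receiving node")
  val payload = toBytes(f)

  // Receiver side: mirrors payloadFor plus the FUNCTION_FUN0_UNIT handler, which
  // deserializes the payload back to a function and runs it.
  val g = fromBytes(payload).asInstanceOf[Function0[Unit]]
  g()
}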