Removing futures from the remoting
This commit is contained in:
parent
1b730b5c82
commit
3e3cf86bdf
7 changed files with 30 additions and 509 deletions
|
|
@ -321,16 +321,10 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒
|
|||
trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒
|
||||
|
||||
def actorFor(address: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(address, app.AkkaConfig.ActorTimeoutMillis, hostname, port, None)
|
||||
actorFor(address, hostname, port, None)
|
||||
|
||||
def actorFor(address: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(address, app.AkkaConfig.ActorTimeoutMillis, hostname, port, Some(loader))
|
||||
|
||||
def actorFor(address: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
actorFor(address, timeout, hostname, port, None)
|
||||
|
||||
def actorFor(address: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(address, timeout, hostname, port, Some(loader))
|
||||
actorFor(address, hostname, port, Some(loader))
|
||||
|
||||
/**
|
||||
* Clean-up all open connections.
|
||||
|
|
@ -349,13 +343,11 @@ trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒
|
|||
|
||||
/** Methods that needs to be implemented by a transport **/
|
||||
|
||||
protected[akka] def actorFor(address: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef
|
||||
protected[akka] def actorFor(address: String, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef
|
||||
|
||||
protected[akka] def send[T](message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[Promise[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
isOneWay: Boolean,
|
||||
actorRef: ActorRef,
|
||||
loader: Option[ClassLoader]): Option[Promise[T]]
|
||||
loader: Option[ClassLoader]): Unit
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import akka.util._
|
|||
import akka.util.duration._
|
||||
import akka.util.Helpers._
|
||||
import akka.actor.DeploymentConfig._
|
||||
import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression }
|
||||
import akka.serialization.{ Serialization, Serializer, Compression }
|
||||
import akka.serialization.Compression.LZF
|
||||
import akka.remote.RemoteProtocol._
|
||||
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import akka.dispatch._
|
|||
import akka.util.duration._
|
||||
import akka.config.ConfigurationException
|
||||
import akka.event.{ DeathWatch, EventHandler }
|
||||
import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression }
|
||||
import akka.serialization.{ Serialization, Serializer, Compression }
|
||||
import akka.serialization.Compression.LZF
|
||||
import akka.remote.RemoteProtocol._
|
||||
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
||||
|
|
@ -247,7 +247,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported
|
||||
|
||||
def postMessageToMailbox(message: Any, sender: ActorRef) {
|
||||
remote.send[Any](message, Some(sender), None, remoteAddress, true, this, loader)
|
||||
remote.send[Any](message, Some(sender), remoteAddress, this, loader)
|
||||
}
|
||||
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout)
|
||||
|
|
@ -260,7 +260,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
synchronized {
|
||||
if (running) {
|
||||
running = false
|
||||
postMessageToMailbox(new Terminate(), remote.app.deadLetters)
|
||||
remote.send[Any](new Terminate(), None, remoteAddress, this, loader)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ class RemoteClientSettings(val app: AkkaApplication) {
|
|||
val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), DefaultTimeUnit).toMillis
|
||||
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 3600), DefaultTimeUnit)
|
||||
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), DefaultTimeUnit)
|
||||
val REAP_FUTURES_DELAY = Duration(config.getInt("akka.remote.client.reap-futures-delay", 5), DefaultTimeUnit)
|
||||
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -57,14 +57,10 @@ trait NettyRemoteClientModule extends RemoteClientModule {
|
|||
|
||||
protected[akka] def send[T](message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[Promise[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
isOneWay: Boolean,
|
||||
actorRef: ActorRef,
|
||||
loader: Option[ClassLoader]): Option[Promise[T]] =
|
||||
withClientFor(remoteAddress, loader) { client ⇒
|
||||
client.send[T](message, senderOption, senderFuture, remoteAddress, isOneWay, actorRef)
|
||||
}
|
||||
loader: Option[ClassLoader]): Unit =
|
||||
withClientFor(remoteAddress, loader) { _.send[T](message, senderOption, remoteAddress, actorRef) }
|
||||
|
||||
private[akka] def withClientFor[T](
|
||||
address: InetSocketAddress, loader: Option[ClassLoader])(body: RemoteClient ⇒ T): T = {
|
||||
|
|
@ -125,9 +121,7 @@ trait NettyRemoteClientModule extends RemoteClientModule {
|
|||
}
|
||||
|
||||
def shutdownRemoteClients() = lock withWriteGuard {
|
||||
remoteClients.foreach({
|
||||
case (addr, client) ⇒ client.shutdown()
|
||||
})
|
||||
remoteClients foreach { case (_, client) ⇒ client.shutdown() }
|
||||
remoteClients.clear()
|
||||
}
|
||||
}
|
||||
|
|
@ -149,8 +143,6 @@ abstract class RemoteClient private[akka] (
|
|||
|
||||
val serialization = new RemoteActorSerialization(remoteSupport)
|
||||
|
||||
protected val futures = new ConcurrentHashMap[Uuid, Promise[_]]
|
||||
|
||||
private[remote] val runSwitch = new Switch()
|
||||
|
||||
private[remote] def isRunning = runSwitch.isOn
|
||||
|
|
@ -166,74 +158,28 @@ abstract class RemoteClient private[akka] (
|
|||
/**
|
||||
* Converts the message to the wireprotocol and sends the message across the wire
|
||||
*/
|
||||
def send[T](
|
||||
message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[Promise[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
isOneWay: Boolean,
|
||||
actorRef: ActorRef): Option[Promise[T]] = {
|
||||
val messageProtocol = serialization.createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), isOneWay, senderOption).build
|
||||
send(messageProtocol, senderFuture)
|
||||
def send[T](message: Any, senderOption: Option[ActorRef], remoteAddress: InetSocketAddress, actorRef: ActorRef) {
|
||||
val messageProtocol = serialization.createRemoteMessageProtocolBuilder(Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), senderOption).build
|
||||
send(messageProtocol)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the message across the wire
|
||||
*/
|
||||
def send[T](
|
||||
request: RemoteMessageProtocol,
|
||||
senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
|
||||
|
||||
if (isRunning) {
|
||||
def send[T](request: RemoteMessageProtocol) {
|
||||
if (isRunning) { //TODO FIXME RACY
|
||||
app.eventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request))
|
||||
|
||||
// tell
|
||||
if (request.getOneWay) {
|
||||
try {
|
||||
val future = currentChannel.write(RemoteEncoder.encode(request))
|
||||
future.awaitUninterruptibly()
|
||||
future.awaitUninterruptibly() //TODO FIXME SWITCH TO NONBLOCKING WRITE
|
||||
if (!future.isCancelled && !future.isSuccess) {
|
||||
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒ notifyListeners(RemoteClientError(e, module, remoteAddress))
|
||||
}
|
||||
None
|
||||
|
||||
// ask
|
||||
} else {
|
||||
val futureResult =
|
||||
if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultPromise[T](request.getActorInfo.getTimeout)(app.dispatcher)
|
||||
|
||||
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
|
||||
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
|
||||
|
||||
def handleRequestReplyError(future: ChannelFuture) = {
|
||||
val f = futures.remove(futureUuid) // Clean up future
|
||||
if (f ne null) f.completeWithException(future.getCause)
|
||||
}
|
||||
|
||||
var future: ChannelFuture = null
|
||||
try {
|
||||
// try to send the original one
|
||||
future = currentChannel.write(RemoteEncoder.encode(request))
|
||||
future.awaitUninterruptibly()
|
||||
|
||||
if (future.isCancelled || !future.isSuccess) {
|
||||
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
|
||||
handleRequestReplyError(future)
|
||||
}
|
||||
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
notifyListeners(RemoteClientWriteFailed(request, e, module, remoteAddress))
|
||||
handleRequestReplyError(future)
|
||||
}
|
||||
Some(futureResult)
|
||||
}
|
||||
|
||||
} else {
|
||||
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", module, remoteAddress)
|
||||
notifyListeners(RemoteClientError(exception, module, remoteAddress))
|
||||
|
|
@ -314,7 +260,7 @@ class ActiveRemoteClient private[akka] (
|
|||
timer = new HashedWheelTimer
|
||||
|
||||
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
|
||||
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(app, settings, name, futures, bootstrap, remoteAddress, timer, this))
|
||||
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(app, settings, name, bootstrap, remoteAddress, timer, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
|
||||
|
|
@ -329,23 +275,8 @@ class ActiveRemoteClient private[akka] (
|
|||
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
|
||||
app.eventHandler.error(connection.getCause, this, "Remote client connection to [%s] has failed".format(remoteAddress))
|
||||
false
|
||||
|
||||
} else {
|
||||
sendSecureCookie(connection)
|
||||
|
||||
//Add a task that does GCing of expired Futures
|
||||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = {
|
||||
if (isRunning) {
|
||||
val i = futures.entrySet.iterator
|
||||
while (i.hasNext) {
|
||||
val e = i.next
|
||||
if (e.getValue.isExpired)
|
||||
futures.remove(e.getKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
}, REAP_FUTURES_DELAY.length, REAP_FUTURES_DELAY.unit)
|
||||
notifyListeners(RemoteClientStarted(module, remoteAddress))
|
||||
true
|
||||
}
|
||||
|
|
@ -400,7 +331,6 @@ class ActiveRemoteClientPipelineFactory(
|
|||
app: AkkaApplication,
|
||||
val settings: RemoteClientSettings,
|
||||
name: String,
|
||||
futures: ConcurrentMap[Uuid, Promise[_]],
|
||||
bootstrap: ClientBootstrap,
|
||||
remoteAddress: InetSocketAddress,
|
||||
timer: HashedWheelTimer,
|
||||
|
|
@ -414,7 +344,7 @@ class ActiveRemoteClientPipelineFactory(
|
|||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val remoteClient = new ActiveRemoteClientHandler(app, settings, name, futures, bootstrap, remoteAddress, timer, client)
|
||||
val remoteClient = new ActiveRemoteClientHandler(app, settings, name, bootstrap, remoteAddress, timer, client)
|
||||
|
||||
new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)
|
||||
}
|
||||
|
|
@ -428,7 +358,6 @@ class ActiveRemoteClientHandler(
|
|||
val app: AkkaApplication,
|
||||
val settings: RemoteClientSettings,
|
||||
val name: String,
|
||||
val futures: ConcurrentMap[Uuid, Promise[_]],
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: InetSocketAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
|
|
@ -453,20 +382,7 @@ class ActiveRemoteClientHandler(
|
|||
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
|
||||
app.eventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]\nTrying to map back to future [%s]".format(reply, replyUuid))
|
||||
|
||||
futures.remove(replyUuid).asInstanceOf[Promise[Any]] match {
|
||||
case null ⇒
|
||||
client.notifyListeners(RemoteClientError(
|
||||
new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist"), client.module,
|
||||
client.remoteAddress))
|
||||
|
||||
case future ⇒
|
||||
if (reply.hasMessage) {
|
||||
val message = MessageSerializer.deserialize(app, reply.getMessage)
|
||||
future.completeWithResult(message)
|
||||
} else {
|
||||
future.completeWithException(parseException(reply, client.loader))
|
||||
}
|
||||
}
|
||||
//TODO FIXME DOESN'T DO ANYTHING ANYMORE
|
||||
|
||||
case other ⇒
|
||||
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
|
||||
|
|
@ -556,12 +472,7 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with
|
|||
|
||||
def optimizeLocalScoped_?() = optimizeLocal.get
|
||||
|
||||
protected[akka] def actorFor(
|
||||
actorAddress: String,
|
||||
timeout: Long,
|
||||
host: String,
|
||||
port: Int,
|
||||
loader: Option[ClassLoader]): ActorRef = {
|
||||
protected[akka] def actorFor(actorAddress: String, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
|
||||
|
||||
val homeInetSocketAddress = this.address
|
||||
if (optimizeLocalScoped_?) {
|
||||
|
|
@ -1023,14 +934,7 @@ class RemoteServerHandler(
|
|||
|
||||
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {
|
||||
val actorInfo = request.getActorInfo
|
||||
val messageBuilder = serialization.createRemoteMessageProtocolBuilder(
|
||||
None,
|
||||
Right(request.getUuid),
|
||||
actorInfo.getAddress,
|
||||
actorInfo.getTimeout,
|
||||
Left(exception),
|
||||
true,
|
||||
None)
|
||||
val messageBuilder = serialization.createRemoteMessageProtocolBuilder(None, Right(request.getUuid), actorInfo.getAddress, actorInfo.getTimeout, Left(exception), None)
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
RemoteEncoder.encode(messageBuilder.build)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,205 +22,6 @@ import com.google.protobuf.ByteString
|
|||
|
||||
import com.eaio.uuid.UUID
|
||||
|
||||
/**
|
||||
* Module for local actor serialization.
|
||||
*/
|
||||
class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) {
|
||||
implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default
|
||||
|
||||
val remoteActorSerialization = new RemoteActorSerialization(remote)
|
||||
|
||||
def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef =
|
||||
fromBinaryToLocalActorRef(bytes, None, Some(homeAddress))
|
||||
|
||||
def fromBinary[T <: Actor](bytes: Array[Byte], uuid: UUID): ActorRef =
|
||||
fromBinaryToLocalActorRef(bytes, Some(uuid), None)
|
||||
|
||||
def fromBinary[T <: Actor](bytes: Array[Byte]): ActorRef =
|
||||
fromBinaryToLocalActorRef(bytes, None, None)
|
||||
|
||||
def toBinary[T <: Actor](
|
||||
a: ActorRef,
|
||||
serializeMailBox: Boolean = true,
|
||||
replicationScheme: ReplicationScheme = Transient): Array[Byte] =
|
||||
toSerializedActorRefProtocol(a, serializeMailBox, replicationScheme).toByteArray
|
||||
|
||||
// wrapper for implicits to be used by Java
|
||||
def fromBinaryJ[T <: Actor](bytes: Array[Byte]): ActorRef =
|
||||
fromBinary(bytes)
|
||||
|
||||
// wrapper for implicits to be used by Java
|
||||
def toBinaryJ[T <: Actor](
|
||||
a: ActorRef,
|
||||
srlMailBox: Boolean,
|
||||
replicationScheme: ReplicationScheme): Array[Byte] =
|
||||
toBinary(a, srlMailBox, replicationScheme)
|
||||
|
||||
@deprecated("BROKEN, REMOVE ME", "NOW")
|
||||
private[akka] def toSerializedActorRefProtocol[T <: Actor](
|
||||
actorRef: ActorRef,
|
||||
serializeMailBox: Boolean,
|
||||
replicationScheme: ReplicationScheme): SerializedActorRefProtocol = {
|
||||
|
||||
val localRef: Option[LocalActorRef] = actorRef match {
|
||||
case l: LocalActorRef ⇒ Some(l)
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
val builder = SerializedActorRefProtocol.newBuilder
|
||||
.setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build)
|
||||
.setAddress(actorRef.address)
|
||||
.setTimeout(app.AkkaConfig.ActorTimeoutMillis)
|
||||
|
||||
replicationScheme match {
|
||||
case _: Transient | Transient ⇒
|
||||
builder.setReplicationStorage(ReplicationStorageType.TRANSIENT)
|
||||
|
||||
case Replication(storage, strategy) ⇒
|
||||
val storageType = storage match {
|
||||
case _: TransactionLog | TransactionLog ⇒ ReplicationStorageType.TRANSACTION_LOG
|
||||
case _: DataGrid | DataGrid ⇒ ReplicationStorageType.DATA_GRID
|
||||
}
|
||||
builder.setReplicationStorage(storageType)
|
||||
|
||||
val strategyType = strategy match {
|
||||
case _: WriteBehind ⇒ ReplicationStrategyType.WRITE_BEHIND
|
||||
case _: WriteThrough ⇒ ReplicationStrategyType.WRITE_THROUGH
|
||||
}
|
||||
builder.setReplicationStrategy(strategyType)
|
||||
}
|
||||
|
||||
localRef foreach { l ⇒
|
||||
if (serializeMailBox) {
|
||||
l.underlying.mailbox match {
|
||||
case null ⇒ throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
|
||||
case q: java.util.Queue[_] ⇒
|
||||
val l = new scala.collection.mutable.ListBuffer[Envelope]
|
||||
val it = q.iterator
|
||||
while (it.hasNext) l += it.next.asInstanceOf[Envelope]
|
||||
|
||||
l map { m ⇒
|
||||
remoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
localRef,
|
||||
Left(actorRef.uuid),
|
||||
actorRef.address,
|
||||
app.AkkaConfig.ActorTimeoutMillis,
|
||||
Right(m.message),
|
||||
false,
|
||||
m.sender match {
|
||||
case a: ActorRef ⇒ Some(a)
|
||||
case _ ⇒ None
|
||||
})
|
||||
} foreach {
|
||||
builder.addMessages(_)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
l.underlying.receiveTimeout.foreach(builder.setReceiveTimeout(_))
|
||||
val actorInstance = l.underlyingActorInstance
|
||||
app.serialization.serialize(actorInstance.asInstanceOf[T]) match {
|
||||
case Right(bytes) ⇒ builder.setActorInstance(ByteString.copyFrom(bytes))
|
||||
case Left(exception) ⇒ throw new Exception("Error serializing : " + actorInstance.getClass.getName)
|
||||
}
|
||||
val stack = l.underlying.hotswap
|
||||
if (!stack.isEmpty)
|
||||
builder.setHotswapStack(ByteString.copyFrom(akka.serialization.JavaSerializer.toBinary(stack)))
|
||||
}
|
||||
|
||||
builder.build
|
||||
}
|
||||
|
||||
private def fromBinaryToLocalActorRef[T <: Actor](
|
||||
bytes: Array[Byte],
|
||||
uuid: Option[UUID],
|
||||
homeAddress: Option[InetSocketAddress]): ActorRef = {
|
||||
val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes)
|
||||
fromProtobufToLocalActorRef(builder.build, uuid, None)
|
||||
}
|
||||
|
||||
private[akka] def fromProtobufToLocalActorRef[T <: Actor](
|
||||
protocol: SerializedActorRefProtocol,
|
||||
overriddenUuid: Option[UUID],
|
||||
loader: Option[ClassLoader]): ActorRef = {
|
||||
|
||||
app.eventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol))
|
||||
|
||||
// import ReplicationStorageType._
|
||||
// import ReplicationStrategyType._
|
||||
// val replicationScheme =
|
||||
// if (protocol.hasReplicationStorage) {
|
||||
// protocol.getReplicationStorage match {
|
||||
// case TRANSIENT ⇒ Transient
|
||||
// case store ⇒
|
||||
// val storage = store match {
|
||||
// case TRANSACTION_LOG ⇒ TransactionLog
|
||||
// case DATA_GRID ⇒ DataGrid
|
||||
// }
|
||||
// val strategy = if (protocol.hasReplicationStrategy) {
|
||||
// protocol.getReplicationStrategy match {
|
||||
// case WRITE_THROUGH ⇒ WriteThrough
|
||||
// case WRITE_BEHIND ⇒ WriteBehind
|
||||
// }
|
||||
// } else throw new IllegalActorStateException(
|
||||
// "Expected replication strategy for replication storage [" + storage + "]")
|
||||
// Replication(storage, strategy)
|
||||
// }
|
||||
// } else Transient
|
||||
|
||||
val storedHotswap =
|
||||
try {
|
||||
app.serialization.deserialize(
|
||||
protocol.getHotswapStack.toByteArray,
|
||||
classOf[Stack[PartialFunction[Any, Unit]]],
|
||||
loader) match {
|
||||
case Right(r) ⇒ r.asInstanceOf[Stack[PartialFunction[Any, Unit]]]
|
||||
case Left(ex) ⇒ throw new Exception("Cannot de-serialize hotswapstack")
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒ Stack[PartialFunction[Any, Unit]]()
|
||||
}
|
||||
|
||||
val storedSupervisor =
|
||||
if (protocol.hasSupervisor) Some(remoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
||||
else None
|
||||
|
||||
val classLoader = loader.getOrElse(this.getClass.getClassLoader)
|
||||
val bytes = protocol.getActorInstance.toByteArray
|
||||
val actorClass = classLoader.loadClass(protocol.getActorClassname)
|
||||
val factory = () ⇒ {
|
||||
app.serialization.deserialize(bytes, actorClass, loader) match {
|
||||
case Right(r) ⇒ r.asInstanceOf[Actor]
|
||||
case Left(ex) ⇒ throw new Exception("Cannot de-serialize : " + actorClass)
|
||||
}
|
||||
}
|
||||
|
||||
val actorUuid = overriddenUuid match {
|
||||
case Some(uuid) ⇒ uuid
|
||||
case None ⇒ uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow)
|
||||
}
|
||||
|
||||
val props = Props(creator = factory,
|
||||
timeout = if (protocol.hasTimeout) protocol.getTimeout else app.AkkaConfig.ActorTimeout //TODO what dispatcher should it use?
|
||||
//TODO what faultHandler should it use?
|
||||
)
|
||||
|
||||
val receiveTimeout = if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None //TODO FIXME, I'm expensive and slow
|
||||
|
||||
// FIXME: what to do if storedSupervisor is empty?
|
||||
val ar = new LocalActorRef(app, props, storedSupervisor getOrElse app.guardian, protocol.getAddress, false, actorUuid, receiveTimeout, storedHotswap)
|
||||
|
||||
//Deserialize messages
|
||||
{
|
||||
val iterator = protocol.getMessagesList.iterator()
|
||||
while (iterator.hasNext())
|
||||
ar ! MessageSerializer.deserialize(app, iterator.next().getMessage, Some(classLoader)) //TODO This is broken, why aren't we preserving the sender?
|
||||
}
|
||||
|
||||
ar
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteActorSerialization(remote: RemoteSupport) {
|
||||
|
||||
/**
|
||||
|
|
@ -281,7 +82,6 @@ class RemoteActorSerialization(remote: RemoteSupport) {
|
|||
actorAddress: String,
|
||||
timeout: Long,
|
||||
message: Either[Throwable, Any],
|
||||
isOneWay: Boolean,
|
||||
senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = {
|
||||
|
||||
val uuidProtocol = replyUuid match {
|
||||
|
|
@ -301,7 +101,7 @@ class RemoteActorSerialization(remote: RemoteSupport) {
|
|||
UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build
|
||||
})
|
||||
.setActorInfo(actorInfo)
|
||||
.setOneWay(isOneWay)
|
||||
.setOneWay(true)
|
||||
|
||||
message match {
|
||||
case Right(message) ⇒
|
||||
|
|
@ -309,26 +109,10 @@ class RemoteActorSerialization(remote: RemoteSupport) {
|
|||
case Left(exception) ⇒
|
||||
messageBuilder.setException(ExceptionProtocol.newBuilder
|
||||
.setClassname(exception.getClass.getName)
|
||||
.setMessage(empty(exception.getMessage))
|
||||
.setMessage(Option(exception.getMessage).getOrElse(""))
|
||||
.build)
|
||||
}
|
||||
|
||||
def empty(s: String): String = s match {
|
||||
case null ⇒ ""
|
||||
case s ⇒ s
|
||||
}
|
||||
|
||||
/* TODO invent new supervision strategy
|
||||
actorRef.foreach { ref =>
|
||||
ref.registerSupervisorAsRemoteActor.foreach { id =>
|
||||
messageBuilder.setSupervisorUuid(
|
||||
UuidProtocol.newBuilder
|
||||
.setHigh(id.getTime)
|
||||
.setLow(id.getClockSeqAndNode)
|
||||
.build)
|
||||
}
|
||||
} */
|
||||
|
||||
if (senderOption.isDefined)
|
||||
messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
|
||||
|
||||
|
|
|
|||
|
|
@ -1,158 +0,0 @@
|
|||
package akka.serialization
|
||||
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import com.google.protobuf.Message
|
||||
import akka.actor._
|
||||
import akka.remote._
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.serialization.SerializeSpec.Person
|
||||
|
||||
case class MyMessage(id: Long, name: String, status: Boolean)
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll {
|
||||
|
||||
lazy val remote: Remote = {
|
||||
app.provider match {
|
||||
case r: RemoteActorRefProvider ⇒ r.remote
|
||||
case _ ⇒ throw new Exception("Remoting is not enabled")
|
||||
}
|
||||
}
|
||||
|
||||
lazy val serialization = new ActorSerialization(app, remote.server)
|
||||
|
||||
"Serializable actor" must {
|
||||
"must be able to serialize and de-serialize a stateful actor with a given serializer" ignore {
|
||||
|
||||
val actor1 = new LocalActorRef(app, Props[MyJavaSerializableActor], app.guardian, Props.randomAddress, systemService = true)
|
||||
|
||||
(actor1 ? "hello").get must equal("world 1")
|
||||
(actor1 ? "hello").get must equal("world 2")
|
||||
|
||||
val bytes = serialization.toBinary(actor1)
|
||||
val actor2 = serialization.fromBinary(bytes).asInstanceOf[LocalActorRef]
|
||||
(actor2 ? "hello").get must equal("world 3")
|
||||
|
||||
actor2.underlying.receiveTimeout must equal(Some(1000))
|
||||
actor1.stop()
|
||||
actor2.stop()
|
||||
}
|
||||
|
||||
"must be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox" ignore {
|
||||
|
||||
val actor1 = new LocalActorRef(app, Props[MyStatelessActorWithMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true)
|
||||
for (i ← 1 to 10) actor1 ! "hello"
|
||||
|
||||
actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0)
|
||||
val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef]
|
||||
Thread.sleep(1000)
|
||||
actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0)
|
||||
(actor2 ? "hello-reply").get must equal("world")
|
||||
|
||||
val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef]
|
||||
Thread.sleep(1000)
|
||||
actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0)
|
||||
(actor3 ? "hello-reply").get must equal("world")
|
||||
}
|
||||
|
||||
"must be able to serialize and deserialize a PersonActorWithMessagesInMailbox" ignore {
|
||||
|
||||
val p1 = Person("debasish ghosh", 25, SerializeSpec.Address("120", "Monroe Street", "Santa Clara", "95050"))
|
||||
val actor1 = new LocalActorRef(app, Props[PersonActorWithMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
(actor1 ! p1)
|
||||
actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0)
|
||||
val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef]
|
||||
Thread.sleep(1000)
|
||||
actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0)
|
||||
(actor2 ? "hello-reply").get must equal("hello")
|
||||
|
||||
val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef]
|
||||
Thread.sleep(1000)
|
||||
actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0)
|
||||
(actor3 ? "hello-reply").get must equal("hello")
|
||||
}
|
||||
}
|
||||
|
||||
"serialize protobuf" must {
|
||||
"must serialize" ignore {
|
||||
val msg = MyMessage(123, "debasish ghosh", true)
|
||||
|
||||
val ser = new Serialization(app)
|
||||
|
||||
val b = ser.serialize(ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
val in = ser.deserialize(b, classOf[ProtobufProtocol.MyMessage], None) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(i) ⇒ i
|
||||
}
|
||||
val m = in.asInstanceOf[ProtobufProtocol.MyMessage]
|
||||
MyMessage(m.getId, m.getName, m.getStatus) must equal(msg)
|
||||
}
|
||||
}
|
||||
|
||||
"serialize actor that accepts protobuf message" ignore {
|
||||
"must serialize" ignore {
|
||||
|
||||
val actor1 = new LocalActorRef(app, Props[MyActorWithProtobufMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true)
|
||||
val msg = MyMessage(123, "debasish ghosh", true)
|
||||
val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build
|
||||
for (i ← 1 to 10) actor1 ! b
|
||||
actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0)
|
||||
val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef]
|
||||
Thread.sleep(1000)
|
||||
actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0)
|
||||
(actor2 ? "hello-reply").get must equal("world")
|
||||
|
||||
val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef]
|
||||
Thread.sleep(1000)
|
||||
actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0)
|
||||
(actor3 ? "hello-reply").get must equal("world")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MyJavaSerializableActor extends Actor with scala.Serializable {
|
||||
var count = 0
|
||||
receiveTimeout = Some(1000)
|
||||
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
count = count + 1
|
||||
sender ! "world " + count
|
||||
}
|
||||
}
|
||||
|
||||
class MyStatelessActorWithMessagesInMailbox extends Actor with scala.Serializable {
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
Thread.sleep(500)
|
||||
case "hello-reply" ⇒ sender ! "world"
|
||||
}
|
||||
}
|
||||
|
||||
class MyActorWithProtobufMessagesInMailbox extends Actor with scala.Serializable {
|
||||
def receive = {
|
||||
case m: Message ⇒
|
||||
Thread.sleep(500)
|
||||
case "hello-reply" ⇒ sender ! "world"
|
||||
}
|
||||
}
|
||||
|
||||
class PersonActorWithMessagesInMailbox extends Actor with scala.Serializable {
|
||||
def receive = {
|
||||
case p: Person ⇒
|
||||
Thread.sleep(500)
|
||||
case "hello-reply" ⇒ sender ! "hello"
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue