From 3e3cf86bdf41c4b53b87983fa749b149d2974d29 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 23 Oct 2011 18:40:00 +0200 Subject: [PATCH] Removing futures from the remoting --- .../scala/akka/remote/RemoteInterface.scala | 16 +- .../src/main/scala/akka/remote/Remote.scala | 2 +- .../akka/remote/RemoteActorRefProvider.scala | 6 +- .../main/scala/akka/remote/RemoteConfig.scala | 1 - .../remote/netty/NettyRemoteSupport.scala | 136 ++--------- .../serialization/SerializationProtocol.scala | 220 +----------------- .../serialization/ActorSerializeSpec.scala | 158 ------------- 7 files changed, 30 insertions(+), 509 deletions(-) delete mode 100644 akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index c1050c7842..99cfbccc0c 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -321,16 +321,10 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒ trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒ def actorFor(address: String, hostname: String, port: Int): ActorRef = - actorFor(address, app.AkkaConfig.ActorTimeoutMillis, hostname, port, None) + actorFor(address, hostname, port, None) def actorFor(address: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(address, app.AkkaConfig.ActorTimeoutMillis, hostname, port, Some(loader)) - - def actorFor(address: String, timeout: Long, hostname: String, port: Int): ActorRef = - actorFor(address, timeout, hostname, port, None) - - def actorFor(address: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(address, timeout, hostname, port, Some(loader)) + actorFor(address, hostname, port, Some(loader)) /** * Clean-up all open connections. @@ -349,13 +343,11 @@ trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒ /** Methods that needs to be implemented by a transport **/ - protected[akka] def actorFor(address: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef + protected[akka] def actorFor(address: String, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, - isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[Promise[T]] + loader: Option[ClassLoader]): Unit } diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 82c3ea823d..2d78b2f306 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -13,7 +13,7 @@ import akka.util._ import akka.util.duration._ import akka.util.Helpers._ import akka.actor.DeploymentConfig._ -import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression } +import akka.serialization.{ Serialization, Serializer, Compression } import akka.serialization.Compression.LZF import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index c65c05bff6..4981e5244a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -13,7 +13,7 @@ import akka.dispatch._ import akka.util.duration._ import akka.config.ConfigurationException import akka.event.{ DeathWatch, EventHandler } -import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression } +import akka.serialization.{ Serialization, Serializer, Compression } import akka.serialization.Compression.LZF import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ @@ -247,7 +247,7 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported def postMessageToMailbox(message: Any, sender: ActorRef) { - remote.send[Any](message, Some(sender), None, remoteAddress, true, this, loader) + remote.send[Any](message, Some(sender), remoteAddress, this, loader) } def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout) @@ -260,7 +260,7 @@ private[akka] case class RemoteActorRef private[akka] ( synchronized { if (running) { running = false - postMessageToMailbox(new Terminate(), remote.app.deadLetters) + remote.send[Any](new Terminate(), None, remoteAddress, this, loader) } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala index a1af20f8bc..4cae594a68 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala @@ -21,7 +21,6 @@ class RemoteClientSettings(val app: AkkaApplication) { val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), DefaultTimeUnit).toMillis val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 3600), DefaultTimeUnit) val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), DefaultTimeUnit) - val REAP_FUTURES_DELAY = Duration(config.getInt("akka.remote.client.reap-futures-delay", 5), DefaultTimeUnit) val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576) } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index ac29e0d5b1..16c910dfcd 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -57,14 +57,10 @@ trait NettyRemoteClientModule extends RemoteClientModule { protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, - isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[Promise[T]] = - withClientFor(remoteAddress, loader) { client ⇒ - client.send[T](message, senderOption, senderFuture, remoteAddress, isOneWay, actorRef) - } + loader: Option[ClassLoader]): Unit = + withClientFor(remoteAddress, loader) { _.send[T](message, senderOption, remoteAddress, actorRef) } private[akka] def withClientFor[T]( address: InetSocketAddress, loader: Option[ClassLoader])(body: RemoteClient ⇒ T): T = { @@ -125,9 +121,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { } def shutdownRemoteClients() = lock withWriteGuard { - remoteClients.foreach({ - case (addr, client) ⇒ client.shutdown() - }) + remoteClients foreach { case (_, client) ⇒ client.shutdown() } remoteClients.clear() } } @@ -149,8 +143,6 @@ abstract class RemoteClient private[akka] ( val serialization = new RemoteActorSerialization(remoteSupport) - protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] - private[remote] val runSwitch = new Switch() private[remote] def isRunning = runSwitch.isOn @@ -166,74 +158,28 @@ abstract class RemoteClient private[akka] ( /** * Converts the message to the wireprotocol and sends the message across the wire */ - def send[T]( - message: Any, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]], - remoteAddress: InetSocketAddress, - isOneWay: Boolean, - actorRef: ActorRef): Option[Promise[T]] = { - val messageProtocol = serialization.createRemoteMessageProtocolBuilder( - Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), isOneWay, senderOption).build - send(messageProtocol, senderFuture) + def send[T](message: Any, senderOption: Option[ActorRef], remoteAddress: InetSocketAddress, actorRef: ActorRef) { + val messageProtocol = serialization.createRemoteMessageProtocolBuilder(Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), senderOption).build + send(messageProtocol) } /** * Sends the message across the wire */ - def send[T]( - request: RemoteMessageProtocol, - senderFuture: Option[Promise[T]]): Option[Promise[T]] = { - - if (isRunning) { + def send[T](request: RemoteMessageProtocol) { + if (isRunning) { //TODO FIXME RACY app.eventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request)) // tell - if (request.getOneWay) { - try { - val future = currentChannel.write(RemoteEncoder.encode(request)) - future.awaitUninterruptibly() - if (!future.isCancelled && !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } catch { - case e: Exception ⇒ notifyListeners(RemoteClientError(e, module, remoteAddress)) + try { + val future = currentChannel.write(RemoteEncoder.encode(request)) + future.awaitUninterruptibly() //TODO FIXME SWITCH TO NONBLOCKING WRITE + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) } - None - - // ask - } else { - val futureResult = - if (senderFuture.isDefined) senderFuture.get - else new DefaultPromise[T](request.getActorInfo.getTimeout)(app.dispatcher) - - val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) - futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails - - def handleRequestReplyError(future: ChannelFuture) = { - val f = futures.remove(futureUuid) // Clean up future - if (f ne null) f.completeWithException(future.getCause) - } - - var future: ChannelFuture = null - try { - // try to send the original one - future = currentChannel.write(RemoteEncoder.encode(request)) - future.awaitUninterruptibly() - - if (future.isCancelled || !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - handleRequestReplyError(future) - } - - } catch { - case e: Exception ⇒ - notifyListeners(RemoteClientWriteFailed(request, e, module, remoteAddress)) - handleRequestReplyError(future) - } - Some(futureResult) + } catch { + case e: Exception ⇒ notifyListeners(RemoteClientError(e, module, remoteAddress)) } - } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", module, remoteAddress) notifyListeners(RemoteClientError(exception, module, remoteAddress)) @@ -314,7 +260,7 @@ class ActiveRemoteClient private[akka] ( timer = new HashedWheelTimer bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(app, settings, name, futures, bootstrap, remoteAddress, timer, this)) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(app, settings, name, bootstrap, remoteAddress, timer, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -329,23 +275,8 @@ class ActiveRemoteClient private[akka] ( notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) app.eventHandler.error(connection.getCause, this, "Remote client connection to [%s] has failed".format(remoteAddress)) false - } else { sendSecureCookie(connection) - - //Add a task that does GCing of expired Futures - timer.newTimeout(new TimerTask() { - def run(timeout: Timeout) = { - if (isRunning) { - val i = futures.entrySet.iterator - while (i.hasNext) { - val e = i.next - if (e.getValue.isExpired) - futures.remove(e.getKey) - } - } - } - }, REAP_FUTURES_DELAY.length, REAP_FUTURES_DELAY.unit) notifyListeners(RemoteClientStarted(module, remoteAddress)) true } @@ -400,7 +331,6 @@ class ActiveRemoteClientPipelineFactory( app: AkkaApplication, val settings: RemoteClientSettings, name: String, - futures: ConcurrentMap[Uuid, Promise[_]], bootstrap: ClientBootstrap, remoteAddress: InetSocketAddress, timer: HashedWheelTimer, @@ -414,7 +344,7 @@ class ActiveRemoteClientPipelineFactory( val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val remoteClient = new ActiveRemoteClientHandler(app, settings, name, futures, bootstrap, remoteAddress, timer, client) + val remoteClient = new ActiveRemoteClientHandler(app, settings, name, bootstrap, remoteAddress, timer, client) new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient) } @@ -428,7 +358,6 @@ class ActiveRemoteClientHandler( val app: AkkaApplication, val settings: RemoteClientSettings, val name: String, - val futures: ConcurrentMap[Uuid, Promise[_]], val bootstrap: ClientBootstrap, val remoteAddress: InetSocketAddress, val timer: HashedWheelTimer, @@ -453,20 +382,7 @@ class ActiveRemoteClientHandler( val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) app.eventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]\nTrying to map back to future [%s]".format(reply, replyUuid)) - futures.remove(replyUuid).asInstanceOf[Promise[Any]] match { - case null ⇒ - client.notifyListeners(RemoteClientError( - new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist"), client.module, - client.remoteAddress)) - - case future ⇒ - if (reply.hasMessage) { - val message = MessageSerializer.deserialize(app, reply.getMessage) - future.completeWithResult(message) - } else { - future.completeWithException(parseException(reply, client.loader)) - } - } + //TODO FIXME DOESN'T DO ANYTHING ANYMORE case other ⇒ throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress) @@ -556,12 +472,7 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with def optimizeLocalScoped_?() = optimizeLocal.get - protected[akka] def actorFor( - actorAddress: String, - timeout: Long, - host: String, - port: Int, - loader: Option[ClassLoader]): ActorRef = { + protected[akka] def actorFor(actorAddress: String, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = { val homeInetSocketAddress = this.address if (optimizeLocalScoped_?) { @@ -1023,14 +934,7 @@ class RemoteServerHandler( private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = { val actorInfo = request.getActorInfo - val messageBuilder = serialization.createRemoteMessageProtocolBuilder( - None, - Right(request.getUuid), - actorInfo.getAddress, - actorInfo.getTimeout, - Left(exception), - true, - None) + val messageBuilder = serialization.createRemoteMessageProtocolBuilder(None, Right(request.getUuid), actorInfo.getAddress, actorInfo.getTimeout, Left(exception), None) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) RemoteEncoder.encode(messageBuilder.build) } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index c49e8c5dbe..75415b9be1 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -22,205 +22,6 @@ import com.google.protobuf.ByteString import com.eaio.uuid.UUID -/** - * Module for local actor serialization. - */ -class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { - implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default - - val remoteActorSerialization = new RemoteActorSerialization(remote) - - def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = - fromBinaryToLocalActorRef(bytes, None, Some(homeAddress)) - - def fromBinary[T <: Actor](bytes: Array[Byte], uuid: UUID): ActorRef = - fromBinaryToLocalActorRef(bytes, Some(uuid), None) - - def fromBinary[T <: Actor](bytes: Array[Byte]): ActorRef = - fromBinaryToLocalActorRef(bytes, None, None) - - def toBinary[T <: Actor]( - a: ActorRef, - serializeMailBox: Boolean = true, - replicationScheme: ReplicationScheme = Transient): Array[Byte] = - toSerializedActorRefProtocol(a, serializeMailBox, replicationScheme).toByteArray - - // wrapper for implicits to be used by Java - def fromBinaryJ[T <: Actor](bytes: Array[Byte]): ActorRef = - fromBinary(bytes) - - // wrapper for implicits to be used by Java - def toBinaryJ[T <: Actor]( - a: ActorRef, - srlMailBox: Boolean, - replicationScheme: ReplicationScheme): Array[Byte] = - toBinary(a, srlMailBox, replicationScheme) - - @deprecated("BROKEN, REMOVE ME", "NOW") - private[akka] def toSerializedActorRefProtocol[T <: Actor]( - actorRef: ActorRef, - serializeMailBox: Boolean, - replicationScheme: ReplicationScheme): SerializedActorRefProtocol = { - - val localRef: Option[LocalActorRef] = actorRef match { - case l: LocalActorRef ⇒ Some(l) - case _ ⇒ None - } - - val builder = SerializedActorRefProtocol.newBuilder - .setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build) - .setAddress(actorRef.address) - .setTimeout(app.AkkaConfig.ActorTimeoutMillis) - - replicationScheme match { - case _: Transient | Transient ⇒ - builder.setReplicationStorage(ReplicationStorageType.TRANSIENT) - - case Replication(storage, strategy) ⇒ - val storageType = storage match { - case _: TransactionLog | TransactionLog ⇒ ReplicationStorageType.TRANSACTION_LOG - case _: DataGrid | DataGrid ⇒ ReplicationStorageType.DATA_GRID - } - builder.setReplicationStorage(storageType) - - val strategyType = strategy match { - case _: WriteBehind ⇒ ReplicationStrategyType.WRITE_BEHIND - case _: WriteThrough ⇒ ReplicationStrategyType.WRITE_THROUGH - } - builder.setReplicationStrategy(strategyType) - } - - localRef foreach { l ⇒ - if (serializeMailBox) { - l.underlying.mailbox match { - case null ⇒ throw new IllegalActorStateException("Can't serialize an actor that has not been started.") - case q: java.util.Queue[_] ⇒ - val l = new scala.collection.mutable.ListBuffer[Envelope] - val it = q.iterator - while (it.hasNext) l += it.next.asInstanceOf[Envelope] - - l map { m ⇒ - remoteActorSerialization.createRemoteMessageProtocolBuilder( - localRef, - Left(actorRef.uuid), - actorRef.address, - app.AkkaConfig.ActorTimeoutMillis, - Right(m.message), - false, - m.sender match { - case a: ActorRef ⇒ Some(a) - case _ ⇒ None - }) - } foreach { - builder.addMessages(_) - } - } - } - - l.underlying.receiveTimeout.foreach(builder.setReceiveTimeout(_)) - val actorInstance = l.underlyingActorInstance - app.serialization.serialize(actorInstance.asInstanceOf[T]) match { - case Right(bytes) ⇒ builder.setActorInstance(ByteString.copyFrom(bytes)) - case Left(exception) ⇒ throw new Exception("Error serializing : " + actorInstance.getClass.getName) - } - val stack = l.underlying.hotswap - if (!stack.isEmpty) - builder.setHotswapStack(ByteString.copyFrom(akka.serialization.JavaSerializer.toBinary(stack))) - } - - builder.build - } - - private def fromBinaryToLocalActorRef[T <: Actor]( - bytes: Array[Byte], - uuid: Option[UUID], - homeAddress: Option[InetSocketAddress]): ActorRef = { - val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes) - fromProtobufToLocalActorRef(builder.build, uuid, None) - } - - private[akka] def fromProtobufToLocalActorRef[T <: Actor]( - protocol: SerializedActorRefProtocol, - overriddenUuid: Option[UUID], - loader: Option[ClassLoader]): ActorRef = { - - app.eventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol)) - - // import ReplicationStorageType._ - // import ReplicationStrategyType._ - // val replicationScheme = - // if (protocol.hasReplicationStorage) { - // protocol.getReplicationStorage match { - // case TRANSIENT ⇒ Transient - // case store ⇒ - // val storage = store match { - // case TRANSACTION_LOG ⇒ TransactionLog - // case DATA_GRID ⇒ DataGrid - // } - // val strategy = if (protocol.hasReplicationStrategy) { - // protocol.getReplicationStrategy match { - // case WRITE_THROUGH ⇒ WriteThrough - // case WRITE_BEHIND ⇒ WriteBehind - // } - // } else throw new IllegalActorStateException( - // "Expected replication strategy for replication storage [" + storage + "]") - // Replication(storage, strategy) - // } - // } else Transient - - val storedHotswap = - try { - app.serialization.deserialize( - protocol.getHotswapStack.toByteArray, - classOf[Stack[PartialFunction[Any, Unit]]], - loader) match { - case Right(r) ⇒ r.asInstanceOf[Stack[PartialFunction[Any, Unit]]] - case Left(ex) ⇒ throw new Exception("Cannot de-serialize hotswapstack") - } - } catch { - case e: Exception ⇒ Stack[PartialFunction[Any, Unit]]() - } - - val storedSupervisor = - if (protocol.hasSupervisor) Some(remoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) - else None - - val classLoader = loader.getOrElse(this.getClass.getClassLoader) - val bytes = protocol.getActorInstance.toByteArray - val actorClass = classLoader.loadClass(protocol.getActorClassname) - val factory = () ⇒ { - app.serialization.deserialize(bytes, actorClass, loader) match { - case Right(r) ⇒ r.asInstanceOf[Actor] - case Left(ex) ⇒ throw new Exception("Cannot de-serialize : " + actorClass) - } - } - - val actorUuid = overriddenUuid match { - case Some(uuid) ⇒ uuid - case None ⇒ uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow) - } - - val props = Props(creator = factory, - timeout = if (protocol.hasTimeout) protocol.getTimeout else app.AkkaConfig.ActorTimeout //TODO what dispatcher should it use? - //TODO what faultHandler should it use? - ) - - val receiveTimeout = if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None //TODO FIXME, I'm expensive and slow - - // FIXME: what to do if storedSupervisor is empty? - val ar = new LocalActorRef(app, props, storedSupervisor getOrElse app.guardian, protocol.getAddress, false, actorUuid, receiveTimeout, storedHotswap) - - //Deserialize messages - { - val iterator = protocol.getMessagesList.iterator() - while (iterator.hasNext()) - ar ! MessageSerializer.deserialize(app, iterator.next().getMessage, Some(classLoader)) //TODO This is broken, why aren't we preserving the sender? - } - - ar - } -} - class RemoteActorSerialization(remote: RemoteSupport) { /** @@ -281,7 +82,6 @@ class RemoteActorSerialization(remote: RemoteSupport) { actorAddress: String, timeout: Long, message: Either[Throwable, Any], - isOneWay: Boolean, senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { val uuidProtocol = replyUuid match { @@ -301,7 +101,7 @@ class RemoteActorSerialization(remote: RemoteSupport) { UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build }) .setActorInfo(actorInfo) - .setOneWay(isOneWay) + .setOneWay(true) message match { case Right(message) ⇒ @@ -309,26 +109,10 @@ class RemoteActorSerialization(remote: RemoteSupport) { case Left(exception) ⇒ messageBuilder.setException(ExceptionProtocol.newBuilder .setClassname(exception.getClass.getName) - .setMessage(empty(exception.getMessage)) + .setMessage(Option(exception.getMessage).getOrElse("")) .build) } - def empty(s: String): String = s match { - case null ⇒ "" - case s ⇒ s - } - - /* TODO invent new supervision strategy - actorRef.foreach { ref => - ref.registerSupervisorAsRemoteActor.foreach { id => - messageBuilder.setSupervisorUuid( - UuidProtocol.newBuilder - .setHigh(id.getTime) - .setLow(id.getClockSeqAndNode) - .build) - } - } */ - if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) diff --git a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala deleted file mode 100644 index a7d8b374e7..0000000000 --- a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala +++ /dev/null @@ -1,158 +0,0 @@ -package akka.serialization - -import org.scalatest.BeforeAndAfterAll -import com.google.protobuf.Message -import akka.actor._ -import akka.remote._ -import akka.testkit.AkkaSpec -import akka.serialization.SerializeSpec.Person - -case class MyMessage(id: Long, name: String, status: Boolean) - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll { - - lazy val remote: Remote = { - app.provider match { - case r: RemoteActorRefProvider ⇒ r.remote - case _ ⇒ throw new Exception("Remoting is not enabled") - } - } - - lazy val serialization = new ActorSerialization(app, remote.server) - - "Serializable actor" must { - "must be able to serialize and de-serialize a stateful actor with a given serializer" ignore { - - val actor1 = new LocalActorRef(app, Props[MyJavaSerializableActor], app.guardian, Props.randomAddress, systemService = true) - - (actor1 ? "hello").get must equal("world 1") - (actor1 ? "hello").get must equal("world 2") - - val bytes = serialization.toBinary(actor1) - val actor2 = serialization.fromBinary(bytes).asInstanceOf[LocalActorRef] - (actor2 ? "hello").get must equal("world 3") - - actor2.underlying.receiveTimeout must equal(Some(1000)) - actor1.stop() - actor2.stop() - } - - "must be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox" ignore { - - val actor1 = new LocalActorRef(app, Props[MyStatelessActorWithMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true) - for (i ← 1 to 10) actor1 ! "hello" - - actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) - val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef] - Thread.sleep(1000) - actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) - (actor2 ? "hello-reply").get must equal("world") - - val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef] - Thread.sleep(1000) - actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0) - (actor3 ? "hello-reply").get must equal("world") - } - - "must be able to serialize and deserialize a PersonActorWithMessagesInMailbox" ignore { - - val p1 = Person("debasish ghosh", 25, SerializeSpec.Address("120", "Monroe Street", "Santa Clara", "95050")) - val actor1 = new LocalActorRef(app, Props[PersonActorWithMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true) - (actor1 ! p1) - (actor1 ! p1) - (actor1 ! p1) - (actor1 ! p1) - (actor1 ! p1) - (actor1 ! p1) - (actor1 ! p1) - (actor1 ! p1) - (actor1 ! p1) - (actor1 ! p1) - actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) - val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef] - Thread.sleep(1000) - actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) - (actor2 ? "hello-reply").get must equal("hello") - - val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef] - Thread.sleep(1000) - actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0) - (actor3 ? "hello-reply").get must equal("hello") - } - } - - "serialize protobuf" must { - "must serialize" ignore { - val msg = MyMessage(123, "debasish ghosh", true) - - val ser = new Serialization(app) - - val b = ser.serialize(ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - val in = ser.deserialize(b, classOf[ProtobufProtocol.MyMessage], None) match { - case Left(exception) ⇒ fail(exception) - case Right(i) ⇒ i - } - val m = in.asInstanceOf[ProtobufProtocol.MyMessage] - MyMessage(m.getId, m.getName, m.getStatus) must equal(msg) - } - } - - "serialize actor that accepts protobuf message" ignore { - "must serialize" ignore { - - val actor1 = new LocalActorRef(app, Props[MyActorWithProtobufMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true) - val msg = MyMessage(123, "debasish ghosh", true) - val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build - for (i ← 1 to 10) actor1 ! b - actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) - val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef] - Thread.sleep(1000) - actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) - (actor2 ? "hello-reply").get must equal("world") - - val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef] - Thread.sleep(1000) - actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0) - (actor3 ? "hello-reply").get must equal("world") - } - } -} - -class MyJavaSerializableActor extends Actor with scala.Serializable { - var count = 0 - receiveTimeout = Some(1000) - - def receive = { - case "hello" ⇒ - count = count + 1 - sender ! "world " + count - } -} - -class MyStatelessActorWithMessagesInMailbox extends Actor with scala.Serializable { - def receive = { - case "hello" ⇒ - Thread.sleep(500) - case "hello-reply" ⇒ sender ! "world" - } -} - -class MyActorWithProtobufMessagesInMailbox extends Actor with scala.Serializable { - def receive = { - case m: Message ⇒ - Thread.sleep(500) - case "hello-reply" ⇒ sender ! "world" - } -} - -class PersonActorWithMessagesInMailbox extends Actor with scala.Serializable { - def receive = { - case p: Person ⇒ - Thread.sleep(500) - case "hello-reply" ⇒ sender ! "hello" - } -}