diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala index 45aa0514f6..722f4e428e 100644 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -121,15 +121,15 @@ object Publish { def forConsumer(actor: ActorRef): Option[Publish] = forConsumeAnnotated(actor) orElse forConsumerType(actor) - private def forConsumeAnnotated(actorId: ActorRef): Option[Publish] = { - val annotation = actorId.actorClass.getAnnotation(classOf[consume]) + private def forConsumeAnnotated(actorRef: ActorRef): Option[Publish] = { + val annotation = actorRef.actorClass.getAnnotation(classOf[consume]) if (annotation eq null) None - else if (actorId.remoteAddress.isDefined) None // do not publish proxies - else Some(Publish(annotation.value, actorId.id, false)) + else if (actorRef.remoteAddress.isDefined) None // do not publish proxies + else Some(Publish(annotation.value, actorRef.id, false)) } - private def forConsumerType(actorId: ActorRef): Option[Publish] = - if (!actorId.actor.isInstanceOf[Consumer]) None - else if (actorId.remoteAddress.isDefined) None - else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true)) + private def forConsumerType(actorRef: ActorRef): Option[Publish] = + if (!actorRef.actor.isInstanceOf[Consumer]) None + else if (actorRef.remoteAddress.isDefined) None + else Some(Publish(actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true)) } diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 187d770a07..7a81953403 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -5,8 +5,8 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.config.FaultHandlingStrategy -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest -import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol +import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer @@ -284,9 +284,9 @@ object ActiveObject { actor.initialize(target, proxy) actor.timeout = timeout if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorId = new ActorRef(() => actor) - AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout)) - actorId.start + val actorRef = new ActorRef(() => actor) + AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) + actorRef.start proxy.asInstanceOf[T] } @@ -295,9 +295,9 @@ object ActiveObject { actor.initialize(target.getClass, target) actor.timeout = timeout if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorId = new ActorRef(() => actor) - AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout)) - actorId.start + val actorRef = new ActorRef(() => actor) + AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) + actorRef.start proxy.asInstanceOf[T] } @@ -388,10 +388,10 @@ private[akka] object AspectInitRegistry { private[akka] sealed case class AspectInit( val target: Class[_], - val actorId: ActorRef, + val actorRef: ActorRef, val remoteAddress: Option[InetSocketAddress], val timeout: Long) { - def this(target: Class[_], actorId: ActorRef, timeout: Long) = this(target, actorId, None, timeout) + def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout) } /** @@ -405,7 +405,7 @@ private[akka] sealed case class AspectInit( private[akka] sealed class ActiveObjectAspect { @volatile private var isInitialized = false private var target: Class[_] = _ - private var actorId: ActorRef = _ + private var actorRef: ActorRef = _ private var remoteAddress: Option[InetSocketAddress] = _ private var timeout: Long = _ @@ -414,7 +414,7 @@ private[akka] sealed class ActiveObjectAspect { if (!isInitialized) { val init = AspectInitRegistry.initFor(joinPoint.getThis) target = init.target - actorId = init.actorId + actorRef = init.actorRef remoteAddress = init.remoteAddress timeout = init.timeout isInitialized = true @@ -430,10 +430,10 @@ private[akka] sealed class ActiveObjectAspect { private def localDispatch(joinPoint: JoinPoint): AnyRef = { val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] if (isOneWay(rtti)) { - (actorId ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] + (actorRef ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] } else { - val result = actorId !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) + val result = actorRef !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) if (result.isDefined) result.get else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]") } @@ -443,17 +443,17 @@ private[akka] sealed class ActiveObjectAspect { val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] val oneWay_? = isOneWay(rtti) || isVoid(rtti) val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues) - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setMethod(rtti.getMethod.getName) .setTarget(target.getName) - .setUuid(actorId.uuid) + .setUuid(actorRef.uuid) .setTimeout(timeout) .setIsActor(false) .setIsOneWay(oneWay_?) .setIsEscaped(false) RemoteProtocolBuilder.setMessage(message, requestBuilder) - val id = actorId.actor.registerSupervisorAsRemoteActor + val id = actorRef.actor.registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) val remoteMessage = requestBuilder.build val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 3d6332f729..76fcfa0f15 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -11,12 +11,10 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol -import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, - RemoteActorProxy, RemoteProtocolBuilder, - RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol +import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteActorProxy, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier @@ -250,31 +248,32 @@ object Actor extends Logging { } /** - * The ActorID object can be used to create ActorID instances out of its binary + * The ActorRef object can be used to create ActorRef instances out of its binary * protobuf based representation. *
- *   val actorRef = ActorID.fromBinary(bytes)
+ *   val actorRef = ActorRef.fromBinary(bytes)
  *   actorRef ! message // send message to remote actor through its reference
  * 
* * @author Jonas Bonér */ -object ActorID { - def fromBinary(bytes: Array[Byte]): ActorID = { - val actorRefProto = RemoteProtocol.ActorRef.newBuilder.mergeFrom(bytes).build +object ActorRef { + def fromBinary(bytes: Array[Byte]): ActorRef = + fromProtocol(RemoteProtocol.ActorRefProtocol.newBuilder.mergeFrom(bytes).build) + + def fromProtocol(protocol: RemoteProtocol.ActorRefProtocol): ActorRef = RemoteActorProxy( - actorRefProto.getUuid, - actorRefProto.getActorClassName, - actorRefProto.getSourceHostname, - actorRefProto.getSourcePort, - actorRefProto.getTimeout) - } + protocol.getUuid, + protocol.getActorClassName, + protocol.getSourceHostname, + protocol.getSourcePort, + protocol.getTimeout) } /** - * ActorID is an immutable and serializable handle to an Actor. + * ActorRef is an immutable and serializable handle to an Actor. *

- * Create an ActorID for an Actor by using the factory method on the Actor object. + * Create an ActorRef for an Actor by using the factory method on the Actor object. *

* Here is an example on how to create an actor with a default constructor. *

@@ -297,6 +296,7 @@ object ActorID {
  * 
  * @author Jonas Bonér
  */
+final class ActorRef private[akka] () {
   private[akka] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)
 
   private[akka] lazy val actor: Actor = {
@@ -329,20 +329,29 @@ object ActorID {
     actorFactory = Right(Some(factory))
   }
   
-  def toBinary: Array[Byte] = {
+  def toProtocol: RemoteProtocol.ActorRefProtocol = {
+    val (host, port) = actor._replyToAddress.map(address => 
+      (address.getHostName, address.getPort))
+      .getOrElse((Actor.HOSTNAME, Actor.PORT))
+
     if (!actor._registeredInRemoteNodeDuringSerialization) { 
-      RemoteNode.register(uuid, this)
+      Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
+      if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port)
+      RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this)
       actor._registeredInRemoteNodeDuringSerialization = true
     }
-    RemoteProtocol.ActorRef.newBuilder
+    
+    RemoteProtocol.ActorRefProtocol.newBuilder
       .setUuid(uuid)
       .setActorClassName(actorClass.getName)
-      .setSourceHostname(RemoteServer.HOSTNAME)
-      .setSourcePort(RemoteServer.PORT)
+      .setSourceHostname(host)
+      .setSourcePort(port)
       .setTimeout(timeout)
-      .build.toByteArray
+      .build
   }
   
+  def toBinary: Array[Byte] = toProtocol.toByteArray
+  
   /**
    * Returns the class for the Actor instance that is managed by the ActorRef.
    */
@@ -450,7 +459,7 @@ object ActorID {
    * If you are sending messages using !! then you have to use reply(..)
    * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
    */
-  def !![T](message: Any): Option[T] = !![T](message, actor.timeout)
+  def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, actor.timeout)
 
   /**
    * Sends a message asynchronously returns a future holding the eventual reply message.
@@ -478,7 +487,7 @@ object ActorID {
     if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
     if (actor.isRunning) {
       sender.get.actor.replyTo match {
-        case Some(Left(actorID)) => actor.postMessageToMailbox(message, Some(actorID))
+        case Some(Left(actorRef)) => actor.postMessageToMailbox(message, Some(actorRef))
         case Some(Right(future)) => actor.postMessageToMailboxAndCreateFutureResultWithTimeout(message, actor.timeout, Some(future))
         case _                   => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
       }
@@ -599,7 +608,7 @@ trait Actor extends TransactionManagement with Logging {
   @volatile private[this] var _isSuspended = true
   @volatile private[this] var _isShutDown = false
   @volatile private[akka] var _isKilled = false
-  @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = true
+  @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false
   private var _hotswap: Option[PartialFunction[Any, Unit]] = None
   private[akka] var _remoteAddress: Option[InetSocketAddress] = None
   private[akka] var _linkedActors: Option[HashSet[ActorRef]] = None
@@ -904,7 +913,8 @@ trait Actor extends TransactionManagement with Logging {
 
 
   /**
-   * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
+   * Set the contact address for this actor. This is used for replying to messages sent 
+   * asynchronously when no reply channel exists.
    */
   def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port))
 
@@ -934,12 +944,12 @@ trait Actor extends TransactionManagement with Logging {
    * 

* To be invoked from within the actor itself. */ - protected[this] def link(actorId: ActorRef) = { - if (actorId.supervisor.isDefined) throw new IllegalStateException( - "Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails") - getLinkedActors.add(actorId) - actorId.supervisor = Some(self) - Actor.log.debug("Linking actor [%s] to actor [%s]", actorId, this) + protected[this] def link(actorRef: ActorRef) = { + if (actorRef.supervisor.isDefined) throw new IllegalStateException( + "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") + getLinkedActors.add(actorRef) + actorRef.supervisor = Some(self) + Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this) } /** @@ -947,12 +957,12 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def unlink(actorId: ActorRef) = { - if (!getLinkedActors.contains(actorId)) throw new IllegalStateException( - "Actor [" + actorId + "] is not a linked actor, can't unlink") - getLinkedActors.remove(actorId) - actorId.supervisor = None - Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorId, this) + protected[this] def unlink(actorRef: ActorRef) = { + if (!getLinkedActors.contains(actorRef)) throw new IllegalStateException( + "Actor [" + actorRef + "] is not a linked actor, can't unlink") + getLinkedActors.remove(actorRef) + actorRef.supervisor = None + Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this) } /** @@ -960,11 +970,11 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def startLink(actorId: ActorRef) = { + protected[this] def startLink(actorRef: ActorRef) = { try { - actorId.start + actorRef.start } finally { - link(actorId) + link(actorRef) } } @@ -973,12 +983,12 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def startLinkRemote(actorId: ActorRef, hostname: String, port: Int) = { + protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = { try { - actorId.makeRemote(hostname, port) - actorId.start + actorRef.makeRemote(hostname, port) + actorRef.start } finally { - link(actorId) + link(actorRef) } } @@ -988,9 +998,9 @@ trait Actor extends TransactionManagement with Logging { * To be invoked from within the actor itself. */ protected[this] def spawn[T <: Actor : Manifest]: ActorRef = { - val actorId = spawnButDoNotStart[T] - actorId.start - actorId + val actorRef = spawnButDoNotStart[T] + actorRef.start + actorRef } /** @@ -1062,15 +1072,15 @@ trait Actor extends TransactionManagement with Logging { new ActorRef(() => actor) } - protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorRef]): Unit = { + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteProtocol.RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) - .setUuid(this.id) + .setUuid(this.uuid) .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) @@ -1078,30 +1088,12 @@ trait Actor extends TransactionManagement with Logging { val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - // set the source fields used to reply back to the original sender - // (i.e. not the remote proxy actor) - if (sender.isDefined) { - val s = sender.get.actor - requestBuilder.setSourceTarget(s.getClass.getName) - requestBuilder.setSourceUuid(s.uuid) + senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) - val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - - Actor.log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) - - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - - if (RemoteServer.serverFor(host, port).isEmpty) { - val server = new RemoteServer - server.start(host, port) - } - RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.id, sender.get) - } RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) } else { - val invocation = new MessageInvocation(self, message, sender.map(Left(_)), transactionSet.get) + val invocation = new MessageInvocation(self, message, senderOption.map(Left(_)), transactionSet.get) if (messageDispatcher.usesActorMailbox) { _mailbox.add(invocation) if (_isSuspended) invocation.send @@ -1117,17 +1109,21 @@ trait Actor extends TransactionManagement with Logging { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteProtocol.RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) - .setUuid(this.id) + .setUuid(this.uuid) .setIsActor(true) .setIsOneWay(false) .setIsEscaped(false) + + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) + val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) + val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) @@ -1136,10 +1132,7 @@ trait Actor extends TransactionManagement with Logging { else new DefaultCompletableFuture[T](timeout) val invocation = new MessageInvocation( self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) - - if (messageDispatcher.usesActorMailbox) - _mailbox.add(invocation) - + if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) invocation.send future } @@ -1273,8 +1266,8 @@ trait Actor extends TransactionManagement with Logging { private[this] def restartLinkedActors(reason: Throwable) = { getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { - actorId => - val actor = actorId.actor + actorRef => + val actor = actorRef.actor if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent)) actor.lifeCycle.get match { case LifeCycle(scope, _) => { @@ -1284,7 +1277,7 @@ trait Actor extends TransactionManagement with Logging { case Temporary => Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id) actor.stop - getLinkedActors.remove(actorId) // remove the temporary actor + getLinkedActors.remove(actorRef) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor if (getLinkedActors.isEmpty) { Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" + @@ -1370,7 +1363,7 @@ object DispatcherType { * * @author Jonas Bonér */ -class ActorMessageInvoker private[akka] (val actorId: ActorRef) extends MessageInvoker { - def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle) +class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker { + def invoke(handle: MessageInvocation) = actorRef.actor.invoke(handle) } diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index c72c588937..c52c59b2ae 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -57,9 +57,9 @@ object ActorRegistry extends Logging { val all = new ListBuffer[ActorRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { - val actorId = elements.nextElement - if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) { - all += actorId + val actorRef = elements.nextElement + if (manifest.erasure.isAssignableFrom(actorRef.actor.getClass)) { + all += actorRef } } all.toList @@ -92,24 +92,24 @@ object ActorRegistry extends Logging { /** * Registers an actor in the ActorRegistry. */ - def register(actorId: ActorRef) = { + def register(actorRef: ActorRef) = { // UUID - actorsByUUID.put(actorId.uuid, actorId) + actorsByUUID.put(actorRef.uuid, actorRef) // ID - val id = actorId.id - if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId) - if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id)) - else actorsById.put(id, actorId :: Nil) + val id = actorRef.id + if (id eq null) throw new IllegalStateException("Actor.id is null " + actorRef) + if (actorsById.containsKey(id)) actorsById.put(id, actorRef :: actorsById.get(id)) + else actorsById.put(id, actorRef :: Nil) // Class name - val className = actorId.actor.getClass.getName + val className = actorRef.actor.getClass.getName if (actorsByClassName.containsKey(className)) { - actorsByClassName.put(className, actorId :: actorsByClassName.get(className)) - } else actorsByClassName.put(className, actorId :: Nil) + actorsByClassName.put(className, actorRef :: actorsByClassName.get(className)) + } else actorsByClassName.put(className, actorRef :: Nil) // notify listeners - foreachListener(_ ! ActorRegistered(actorId)) + foreachListener(_ ! ActorRegistered(actorRef)) } /** diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index 905cd6c8fe..2b31bea60c 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -58,9 +58,9 @@ object Scheduler extends Actor { def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - def stopSupervising(actorId: ActorRef) = { - unlink(actorId) - schedulers.remove(actorId) + def stopSupervising(actorRef: ActorRef) = { + unlink(actorRef) + schedulers.remove(actorRef) } override def shutdown = { diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index f4b7b2b012..c0023ae44b 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -103,9 +103,9 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep override def stop = synchronized { super[Actor].stop - getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorId => - actorId.stop - log.info("Shutting actor down: %s", actorId) + getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef => + actorRef.stop + log.info("Shutting actor down: %s", actorRef) } log.info("Stopping supervisor: %s", this) } @@ -119,19 +119,19 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep case SupervisorConfig(_, servers) => servers.map(server => server match { - case Supervise(actorId, lifeCycle, remoteAddress) => - val className = actorId.actor.getClass.getName + case Supervise(actorRef, lifeCycle, remoteAddress) => + val className = actorRef.actor.getClass.getName val currentActors = { val list = actors.get(className) if (list eq null) List[ActorRef]() else list } - actors.put(className, actorId :: currentActors) - actorId.actor.lifeCycle = Some(lifeCycle) - startLink(actorId) + actors.put(className, actorRef :: currentActors) + actorRef.actor.lifeCycle = Some(lifeCycle) + startLink(actorRef) remoteAddress.foreach(address => RemoteServer.actorsFor( RemoteServer.Address(address.hostname, address.port)) - .actors.put(actorId.id, actorId)) + .actors.put(actorRef.id, actorRef)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val supervisor = { diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index 7e1daa5935..7f1cd308b3 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -25,13 +25,13 @@ object ScalaConfig { case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server - class Supervise(val actorId: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) } object Supervise { - def apply(actorId: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress) - def apply(actorId: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null) - def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress)) + def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) + def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) + def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress)) } case class RestartStrategy( @@ -227,8 +227,8 @@ object JavaConfig { intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher, if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null) - def newSupervised(actorId: ActorRef) = - se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform) + def newSupervised(actorRef: ActorRef) = + se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) } } \ No newline at end of file diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index f5fd490df9..8d87272ef0 100644 --- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -18,14 +18,14 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten def dispatch(invocation: MessageInvocation) = queue.append(invocation) - override def register(actorId: ActorRef) = synchronized { - messageInvokers.put(actorId, new ActorMessageInvoker(actorId)) - super.register(actorId) + override def register(actorRef: ActorRef) = synchronized { + messageInvokers.put(actorRef, new ActorMessageInvoker(actorRef)) + super.register(actorRef) } - override def unregister(actorId: ActorRef) = synchronized { - messageInvokers.remove(actorId) - super.unregister(actorId) + override def unregister(actorRef: ActorRef) = synchronized { + messageInvokers.remove(actorRef) + super.unregister(actorRef) } def shutdown = if (active) { diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 23adfed05f..4edf6651c0 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -31,7 +31,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder { @volatile private var active: Boolean = false - implicit def actorId2actor(actorId: ActorRef): Actor = actorId.actor + implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor /** Type of the actors registered in this dispatcher. */ private var actorType:Option[Class[_]] = None @@ -193,15 +193,15 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool - override def register(actorId: ActorRef) = { - verifyActorsAreOfSameType(actorId) - pooledActors.add(actorId) - super.register(actorId) + override def register(actorRef: ActorRef) = { + verifyActorsAreOfSameType(actorRef) + pooledActors.add(actorRef) + super.register(actorRef) } - override def unregister(actorId: ActorRef) = { - pooledActors.remove(actorId) - super.unregister(actorId) + override def unregister(actorRef: ActorRef) = { + pooledActors.remove(actorRef) + super.unregister(actorRef) } def usesActorMailbox = true diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 9d5c049495..6c9fc0f842 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -60,9 +60,9 @@ trait MessageDispatcher extends Logging { def dispatch(invocation: MessageInvocation) def start def shutdown - def register(actorId: ActorRef) = references.put(actorId.uuid, actorId) - def unregister(actorId: ActorRef) = { - references.remove(actorId.uuid) + def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef) + def unregister(actorRef: ActorRef) = { + references.remove(actorRef.uuid) if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index f977260cf2..38e068b9a1 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.remote -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol} import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.util.{UUID, Logging} @@ -31,7 +31,7 @@ import scala.collection.mutable.{HashSet, HashMap} * * @author Jonas Bonér */ -object RemoteRequestIdFactory { +object RemoteRequestProtocolIdFactory { private val nodeId = UUID.newUuid private val id = new AtomicLong @@ -64,23 +64,15 @@ private[akka] class RemoteActorProxy private ( val remoteClient = RemoteClient.clientFor(hostname, port) override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(className) .setTimeout(timeOut) .setUuid(uuid) .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) - if (senderOption.isDefined) { - val sender = senderOption.get.actor - requestBuilder.setSourceTarget(sender.getClass.getName) - requestBuilder.setSourceUuid(sender.uuid) - val (host, port) = sender._replyToAddress.map(address => - (address.getHostName, address.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - } + senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) remoteClient.send[Any](requestBuilder.build, None) } @@ -89,14 +81,15 @@ private[akka] class RemoteActorProxy private ( message: Any, timeout: Long, senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(className) .setTimeout(timeout) .setUuid(uuid) .setIsActor(true) .setIsOneWay(false) .setIsEscaped(false) + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) val future = remoteClient.send(requestBuilder.build, senderFuture) if (future.isDefined) future.get @@ -121,14 +114,14 @@ object RemoteClient extends Logging { def actorFor(className: String, hostname: String, port: Int): ActorRef = actorFor(className, className, 5000L, hostname, port) - def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorRef = - actorFor(actorId, className, 5000L, hostname, port) + def actorFor(actorRef: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(actorRef, className, 5000L, hostname, port) def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef = actorFor(className, className, timeout, hostname, port) - def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = - RemoteActorProxy(actorId, className, hostname, port, timeout) + def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + RemoteActorProxy(actorRef, className, hostname, port, timeout) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) @@ -237,7 +230,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { } } - def send[T](request: RemoteRequest, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { + def send[T](request: RemoteRequestProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { if (request.getIsOneWay) { connection.getChannel.write(request) None @@ -256,17 +249,17 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { throw exception } - def registerSupervisorForActor(actorId: ActorRef) = - if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision") - else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId) + def registerSupervisorForActor(actorRef: ActorRef) = + if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorRef + " since it is not under supervision") + else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) - def deregisterSupervisorForActor(actorId: ActorRef) = - if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision") - else supervisors.remove(actorId.supervisor.get.uuid) + def deregisterSupervisorForActor(actorRef: ActorRef) = + if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision") + else supervisors.remove(actorRef.supervisor.get.uuid) - def registerListener(actorId: ActorRef) = listeners.add(actorId) + def registerListener(actorRef: ActorRef) = listeners.add(actorRef) - def deregisterListener(actorId: ActorRef) = listeners.remove(actorId) + def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef) } /** @@ -283,7 +276,7 @@ class RemoteClientPipelineFactory(name: String, val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance) + val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder val zipCodec = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) @@ -323,9 +316,9 @@ class RemoteClientHandler(val name: String, override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { try { val result = event.getMessage - if (result.isInstanceOf[RemoteReply]) { - val reply = result.asInstanceOf[RemoteReply] - log.debug("Remote client received RemoteReply[\n%s]", reply.toString) + if (result.isInstanceOf[RemoteReplyProtocol]) { + val reply = result.asInstanceOf[RemoteReplyProtocol] + log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString) val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]] if (reply.getIsSuccessful) { val message = RemoteProtocolBuilder.getMessage(reply) @@ -388,7 +381,7 @@ class RemoteClientHandler(val name: String, event.getChannel.close } - private def parseException(reply: RemoteReply) = { + private def parseException(reply: RemoteReplyProtocol) = { val exception = reply.getException val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$'))) val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length) diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index b95ac210f5..b9c446d3d3 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.serialization.Serializable.SBinary import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol} -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol} import com.google.protobuf.{Message, ByteString} @@ -23,7 +23,7 @@ object RemoteProtocolBuilder { SERIALIZER_SCALA_JSON.classLoader = Some(cl) } - def getMessage(request: RemoteRequest): Any = { + def getMessage(request: RemoteRequestProtocol): Any = { request.getProtocol match { case SerializationProtocol.JAVA => unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) @@ -42,7 +42,7 @@ object RemoteProtocolBuilder { } } - def getMessage(reply: RemoteReply): Any = { + def getMessage(reply: RemoteReplyProtocol): Any = { reply.getProtocol match { case SerializationProtocol.JAVA => unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) @@ -61,7 +61,7 @@ object RemoteProtocolBuilder { } } - def setMessage(message: Any, builder: RemoteRequest.Builder) = { + def setMessage(message: Any, builder: RemoteRequestProtocol.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) @@ -89,7 +89,7 @@ object RemoteProtocolBuilder { } } - def setMessage(message: Any, builder: RemoteReply.Builder) = { + def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index bfe7e6355a..6703d677c0 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -11,7 +11,7 @@ import java.util.{Map => JMap} import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.util._ -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteReply, RemoteRequest} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._ import se.scalablesolutions.akka.config.Config.config import org.jboss.netty.bootstrap.ServerBootstrap @@ -199,37 +199,54 @@ class RemoteServer extends Logging { /** * Register Remote Actor by the Actor's 'id' field. */ - def register(actor: ActorRef) = synchronized { + def register(actorRef: ActorRef) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) + log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actorRef.id, actorRef) } } /** * Register Remote Actor by a specific 'id' passed as argument. + *

+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ - def register(id: String, actor: ActorRef) = synchronized { + def register(id: String, actorRef: ActorRef) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id) - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) + log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actorRef) } } /** - * Unregister Remote Actor. + * Unregister Remote Actor that is registered using its 'id' field (not custom ID). */ - def unregister(actor: ActorID) = synchronized { + def unregister(actorRef: ActorRef) = synchronized { if (_isRunning) { - log.info("Unregistering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) + log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) - server.actors.put(actor.id, actor) - if (actor.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actor.uuid) + server.actors.remove(actorRef.id) + if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) + } + } + + /** + * Unregister Remote Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregister(id: String) = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote actor with id [%s]", id) + val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) + val actorRef = server.actors.get(id) + server.actors.remove(id) + if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) } } } -case class Codec(encoder : ChannelHandler, decoder : ChannelHandler) +case class Codec(encoder: ChannelHandler, decoder: ChannelHandler) /** * @author Jonas Bonér @@ -245,7 +262,7 @@ class RemoteServerPipelineFactory( def getPipeline: ChannelPipeline = { val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteRequest.getDefaultInstance) + val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder val zipCodec = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) @@ -295,50 +312,37 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { val message = event.getMessage if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event) - if (message.isInstanceOf[RemoteRequest]) { - handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel) + if (message.isInstanceOf[RemoteRequestProtocol]) { + handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel) } } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + event.getCause.printStackTrace log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close } - private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = { - log.debug("Received RemoteRequest[\n%s]", request.toString) + private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { + log.debug("Received RemoteRequestProtocol[\n%s]", request.toString) if (request.getIsActor) dispatchToActor(request, channel) else dispatchToActiveObject(request, channel) } - private def dispatchToActor(request: RemoteRequest, channel: Channel) = { - log.debug("Dispatching to remote actor [%s]", request.getTarget) - val actorId = createActor(request.getTarget, request.getUuid, request.getTimeout) - actorId.start - + private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = { + log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid) + val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout) + actorRef.start val message = RemoteProtocolBuilder.getMessage(request) if (request.getIsOneWay) { - if (request.hasSourceHostname && request.hasSourcePort) { - // re-create the sending actor - val targetClass = if (request.hasSourceTarget) request.getSourceTarget - else request.getTarget - - val remoteActorId = createActor(targetClass, request.getSourceUuid, request.getTimeout) - if (!remoteActorId.isRunning) { - remoteActorId.makeRemote(request.getSourceHostname, request.getSourcePort) - remoteActorId.start - } - actorId.!(message)(Some(remoteActorId)) - } else { - // couldn't find a way to reply, send the message without a source/sender - actorId ! message - } + val sender = request.getSender + if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender))) } else { try { - val resultOrNone = actorId !! message + val resultOrNone = actorRef !! message val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null log.debug("Returning result from actor invocation [%s]", result) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setIsSuccessful(true) .setIsActor(true) @@ -349,7 +353,7 @@ class RemoteServerHandler( } catch { case e: Throwable => log.error(e, "Could not invoke remote actor [%s]", request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getClass.getName + "$" + e.getMessage) .setIsSuccessful(false) @@ -361,7 +365,7 @@ class RemoteServerHandler( } } - private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = { + private def dispatchToActiveObject(request: RemoteRequestProtocol, channel: Channel) = { log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget) val activeObject = createActiveObject(request.getTarget, request.getTimeout) @@ -377,7 +381,7 @@ class RemoteServerHandler( else { val result = messageReceiver.invoke(activeObject, unescapedArgs: _*) log.debug("Returning result from remote active object invocation [%s]", result) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setIsSuccessful(true) .setIsActor(false) @@ -389,7 +393,7 @@ class RemoteServerHandler( } catch { case e: InvocationTargetException => log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage) .setIsSuccessful(false) @@ -399,7 +403,7 @@ class RemoteServerHandler( channel.write(replyMessage) case e: Throwable => log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getClass.getName + "$" + e.getMessage) .setIsSuccessful(false) @@ -454,8 +458,9 @@ class RemoteServerHandler( * Does not start the actor. */ private def createActor(name: String, uuid: String, timeout: Long): ActorRef = { - val actorIdOrNull = actors.get(uuid) - if (actorIdOrNull eq null) { + val actorRefOrNull = actors.get(uuid) + println("----------- ACTOR " + actorRefOrNull + " " + uuid) + if (actorRefOrNull eq null) { try { log.info("Creating a new remote actor [%s:%s]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) @@ -464,14 +469,14 @@ class RemoteServerHandler( newInstance._uuid = uuid newInstance.timeout = timeout newInstance._remoteAddress = None - val actorId = new ActorRef(() => newInstance) - actors.put(uuid, actorId) - actorId + val actorRef = new ActorRef(() => newInstance) + actors.put(uuid, actorRef) + actorRef } catch { case e => log.error(e, "Could not create remote actor instance") throw e } - } else actorIdOrNull + } else actorRefOrNull } } diff --git a/akka-core/src/test/scala/FutureSpec.scala b/akka-core/src/test/scala/FutureSpec.scala index 59822d9ea7..4f9da6572f 100644 --- a/akka-core/src/test/scala/FutureSpec.scala +++ b/akka-core/src/test/scala/FutureSpec.scala @@ -20,7 +20,7 @@ object FutureSpec { class FutureSpec extends JUnitSuite { import FutureSpec._ - @Test def shouldActorReplyResultThroughExplicitFuture = { + @Test def shouldActorReplyResultThroughExplicitFuture { val actor = newActor[TestActor] actor.start val future = actor !!! "Hello" @@ -30,7 +30,7 @@ class FutureSpec extends JUnitSuite { actor.stop } - @Test def shouldActorReplyExceptionThroughExplicitFuture = { + @Test def shouldActorReplyExceptionThroughExplicitFuture { val actor = newActor[TestActor] actor.start val future = actor !!! "Failure" diff --git a/akka-core/src/test/scala/MemoryFootprintSpec.scala b/akka-core/src/test/scala/MemoryFootprintSpec.scala deleted file mode 100644 index 9efb8270cb..0000000000 --- a/akka-core/src/test/scala/MemoryFootprintSpec.scala +++ /dev/null @@ -1,36 +0,0 @@ -package se.scalablesolutions.akka.actor - -import org.scalatest.junit.JUnitSuite -import org.junit.Test -import Actor._ - -class MemoryFootprintSpec extends JUnitSuite { - class Mem extends Actor { - def receive = { - case _ => {} - } - } - - val NR_OF_ACTORS = 100000 - val MAX_MEMORY_FOOTPRINT_PER_ACTOR = 700 - - @Test - def actorsShouldHaveLessMemoryFootprintThan700Bytes = { - println("============== MEMORY FOOTPRINT TEST ==============") - // warm up - (1 until 10000).foreach(i => new Mem) - - // Actors are put in AspectRegistry when created so they won't be GCd here - - val totalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory - println("Memory before " + totalMem) - (1 until NR_OF_ACTORS).foreach(i => new Mem) - - val newTotalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory - println("Memory aftor " + newTotalMem) - val memPerActor = (newTotalMem - totalMem) / NR_OF_ACTORS - - println("Memory footprint per actor is : " + memPerActor) - assert(memPerActor < MAX_MEMORY_FOOTPRINT_PER_ACTOR) // memory per actor should be less than 630 bytes - } -} diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala index 6ecefca1e3..51d1e0876f 100644 --- a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala @@ -105,7 +105,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendRemoteReply = { + def shouldSendRemoteReplyProtocol = { implicit val timeout = 500000000L val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java index 724978ef12..69a50a4bd2 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java @@ -7,28 +7,28 @@ public final class RemoteProtocol { public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } - public static final class ActorRef extends + public static final class ActorRefProtocol extends com.google.protobuf.GeneratedMessage { - // Use ActorRef.newBuilder() to construct. - private ActorRef() {} + // Use ActorRefProtocol.newBuilder() to construct. + private ActorRefProtocol() {} - private static final ActorRef defaultInstance = new ActorRef(); - public static ActorRef getDefaultInstance() { + private static final ActorRefProtocol defaultInstance = new ActorRefProtocol(); + public static ActorRefProtocol getDefaultInstance() { return defaultInstance; } - public ActorRef getDefaultInstanceForType() { + public ActorRefProtocol getDefaultInstanceForType() { return defaultInstance; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable; } // required string uuid = 1; @@ -126,57 +126,57 @@ public final class RemoteProtocol { return size; } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom(byte[] data) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseDelimitedFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseDelimitedFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -186,25 +186,25 @@ public final class RemoteProtocol { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef prototype) { + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef result; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol result; - // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.newBuilder() + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder() private Builder() {} private static Builder create() { Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol(); return builder; } - protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef internalGetResult() { + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol internalGetResult() { return result; } @@ -213,7 +213,7 @@ public final class RemoteProtocol { throw new IllegalStateException( "Cannot call clear() after build()."); } - result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef(); + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol(); return this; } @@ -223,24 +223,24 @@ public final class RemoteProtocol { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDescriptor(); + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDescriptor(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef getDefaultInstanceForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDefaultInstance(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); } public boolean isInitialized() { return result.isInitialized(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef build() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol build() { if (result != null && !isInitialized()) { throw newUninitializedMessageException(result); } return buildPartial(); } - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef buildParsed() + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { if (!isInitialized()) { throw newUninitializedMessageException( @@ -249,27 +249,27 @@ public final class RemoteProtocol { return buildPartial(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef buildPartial() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol buildPartial() { if (result == null) { throw new IllegalStateException( "build() has already been called on this Builder."); } - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef returnMe = result; + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol returnMe = result; result = null; return returnMe; } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef) { - return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef)other); + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef other) { - if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDefaultInstance()) return this; + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) return this; if (other.hasUuid()) { setUuid(other.getUuid()); } @@ -444,28 +444,28 @@ public final class RemoteProtocol { } } - public static final class RemoteRequest extends + public static final class RemoteRequestProtocol extends com.google.protobuf.GeneratedMessage { - // Use RemoteRequest.newBuilder() to construct. - private RemoteRequest() {} + // Use RemoteRequestProtocol.newBuilder() to construct. + private RemoteRequestProtocol() {} - private static final RemoteRequest defaultInstance = new RemoteRequest(); - public static RemoteRequest getDefaultInstance() { + private static final RemoteRequestProtocol defaultInstance = new RemoteRequestProtocol(); + public static RemoteRequestProtocol getDefaultInstance() { return defaultInstance; } - public RemoteRequest getDefaultInstanceForType() { + public RemoteRequestProtocol getDefaultInstanceForType() { return defaultInstance; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable; } // required uint64 id = 1; @@ -552,33 +552,12 @@ public final class RemoteProtocol { public boolean hasIsEscaped() { return hasIsEscaped; } public boolean getIsEscaped() { return isEscaped_; } - // optional string sourceHostname = 13; - public static final int SOURCEHOSTNAME_FIELD_NUMBER = 13; - private boolean hasSourceHostname; - private java.lang.String sourceHostname_ = ""; - public boolean hasSourceHostname() { return hasSourceHostname; } - public java.lang.String getSourceHostname() { return sourceHostname_; } - - // optional uint32 sourcePort = 14; - public static final int SOURCEPORT_FIELD_NUMBER = 14; - private boolean hasSourcePort; - private int sourcePort_ = 0; - public boolean hasSourcePort() { return hasSourcePort; } - public int getSourcePort() { return sourcePort_; } - - // optional string sourceTarget = 15; - public static final int SOURCETARGET_FIELD_NUMBER = 15; - private boolean hasSourceTarget; - private java.lang.String sourceTarget_ = ""; - public boolean hasSourceTarget() { return hasSourceTarget; } - public java.lang.String getSourceTarget() { return sourceTarget_; } - - // optional string sourceUuid = 16; - public static final int SOURCEUUID_FIELD_NUMBER = 16; - private boolean hasSourceUuid; - private java.lang.String sourceUuid_ = ""; - public boolean hasSourceUuid() { return hasSourceUuid; } - public java.lang.String getSourceUuid() { return sourceUuid_; } + // optional .se.scalablesolutions.akka.remote.protobuf.ActorRefProtocol sender = 13; + public static final int SENDER_FIELD_NUMBER = 13; + private boolean hasSender; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol sender_ = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); + public boolean hasSender() { return hasSender; } + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getSender() { return sender_; } public final boolean isInitialized() { if (!hasId) return false; @@ -590,6 +569,9 @@ public final class RemoteProtocol { if (!hasIsActor) return false; if (!hasIsOneWay) return false; if (!hasIsEscaped) return false; + if (hasSender()) { + if (!getSender().isInitialized()) return false; + } return true; } @@ -631,17 +613,8 @@ public final class RemoteProtocol { if (hasIsEscaped()) { output.writeBool(12, getIsEscaped()); } - if (hasSourceHostname()) { - output.writeString(13, getSourceHostname()); - } - if (hasSourcePort()) { - output.writeUInt32(14, getSourcePort()); - } - if (hasSourceTarget()) { - output.writeString(15, getSourceTarget()); - } - if (hasSourceUuid()) { - output.writeString(16, getSourceUuid()); + if (hasSender()) { + output.writeMessage(13, getSender()); } getUnknownFields().writeTo(output); } @@ -700,78 +673,66 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeBoolSize(12, getIsEscaped()); } - if (hasSourceHostname()) { + if (hasSender()) { size += com.google.protobuf.CodedOutputStream - .computeStringSize(13, getSourceHostname()); - } - if (hasSourcePort()) { - size += com.google.protobuf.CodedOutputStream - .computeUInt32Size(14, getSourcePort()); - } - if (hasSourceTarget()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(15, getSourceTarget()); - } - if (hasSourceUuid()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(16, getSourceUuid()); + .computeMessageSize(13, getSender()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom(byte[] data) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseDelimitedFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseDelimitedFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -781,25 +742,25 @@ public final class RemoteProtocol { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest prototype) { + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest result; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol result; - // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.newBuilder() + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.newBuilder() private Builder() {} private static Builder create() { Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol(); return builder; } - protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest internalGetResult() { + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol internalGetResult() { return result; } @@ -808,7 +769,7 @@ public final class RemoteProtocol { throw new IllegalStateException( "Cannot call clear() after build()."); } - result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest(); + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol(); return this; } @@ -818,24 +779,24 @@ public final class RemoteProtocol { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDescriptor(); + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDescriptor(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest getDefaultInstanceForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDefaultInstance(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDefaultInstance(); } public boolean isInitialized() { return result.isInitialized(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest build() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol build() { if (result != null && !isInitialized()) { throw newUninitializedMessageException(result); } return buildPartial(); } - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest buildParsed() + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { if (!isInitialized()) { throw newUninitializedMessageException( @@ -844,27 +805,27 @@ public final class RemoteProtocol { return buildPartial(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest buildPartial() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol buildPartial() { if (result == null) { throw new IllegalStateException( "build() has already been called on this Builder."); } - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest returnMe = result; + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol returnMe = result; result = null; return returnMe; } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest) { - return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest)other); + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest other) { - if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDefaultInstance()) return this; + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDefaultInstance()) return this; if (other.hasId()) { setId(other.getId()); } @@ -901,17 +862,8 @@ public final class RemoteProtocol { if (other.hasIsEscaped()) { setIsEscaped(other.getIsEscaped()); } - if (other.hasSourceHostname()) { - setSourceHostname(other.getSourceHostname()); - } - if (other.hasSourcePort()) { - setSourcePort(other.getSourcePort()); - } - if (other.hasSourceTarget()) { - setSourceTarget(other.getSourceTarget()); - } - if (other.hasSourceUuid()) { - setSourceUuid(other.getSourceUuid()); + if (other.hasSender()) { + mergeSender(other.getSender()); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -987,19 +939,12 @@ public final class RemoteProtocol { break; } case 106: { - setSourceHostname(input.readString()); - break; - } - case 112: { - setSourcePort(input.readUInt32()); - break; - } - case 122: { - setSourceTarget(input.readString()); - break; - } - case 130: { - setSourceUuid(input.readString()); + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder(); + if (hasSender()) { + subBuilder.mergeFrom(getSender()); + } + input.readMessage(subBuilder, extensionRegistry); + setSender(subBuilder.buildPartial()); break; } } @@ -1241,84 +1186,40 @@ public final class RemoteProtocol { return this; } - // optional string sourceHostname = 13; - public boolean hasSourceHostname() { - return result.hasSourceHostname(); + // optional .se.scalablesolutions.akka.remote.protobuf.ActorRefProtocol sender = 13; + public boolean hasSender() { + return result.hasSender(); } - public java.lang.String getSourceHostname() { - return result.getSourceHostname(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getSender() { + return result.getSender(); } - public Builder setSourceHostname(java.lang.String value) { + public Builder setSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol value) { if (value == null) { - throw new NullPointerException(); - } - result.hasSourceHostname = true; - result.sourceHostname_ = value; + throw new NullPointerException(); + } + result.hasSender = true; + result.sender_ = value; return this; } - public Builder clearSourceHostname() { - result.hasSourceHostname = false; - result.sourceHostname_ = getDefaultInstance().getSourceHostname(); + public Builder setSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder builderForValue) { + result.hasSender = true; + result.sender_ = builderForValue.build(); return this; } - - // optional uint32 sourcePort = 14; - public boolean hasSourcePort() { - return result.hasSourcePort(); - } - public int getSourcePort() { - return result.getSourcePort(); - } - public Builder setSourcePort(int value) { - result.hasSourcePort = true; - result.sourcePort_ = value; + public Builder mergeSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol value) { + if (result.hasSender() && + result.sender_ != se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) { + result.sender_ = + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder(result.sender_).mergeFrom(value).buildPartial(); + } else { + result.sender_ = value; + } + result.hasSender = true; return this; } - public Builder clearSourcePort() { - result.hasSourcePort = false; - result.sourcePort_ = 0; - return this; - } - - // optional string sourceTarget = 15; - public boolean hasSourceTarget() { - return result.hasSourceTarget(); - } - public java.lang.String getSourceTarget() { - return result.getSourceTarget(); - } - public Builder setSourceTarget(java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasSourceTarget = true; - result.sourceTarget_ = value; - return this; - } - public Builder clearSourceTarget() { - result.hasSourceTarget = false; - result.sourceTarget_ = getDefaultInstance().getSourceTarget(); - return this; - } - - // optional string sourceUuid = 16; - public boolean hasSourceUuid() { - return result.hasSourceUuid(); - } - public java.lang.String getSourceUuid() { - return result.getSourceUuid(); - } - public Builder setSourceUuid(java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasSourceUuid = true; - result.sourceUuid_ = value; - return this; - } - public Builder clearSourceUuid() { - result.hasSourceUuid = false; - result.sourceUuid_ = getDefaultInstance().getSourceUuid(); + public Builder clearSender() { + result.hasSender = false; + result.sender_ = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); return this; } } @@ -1332,28 +1233,28 @@ public final class RemoteProtocol { } } - public static final class RemoteReply extends + public static final class RemoteReplyProtocol extends com.google.protobuf.GeneratedMessage { - // Use RemoteReply.newBuilder() to construct. - private RemoteReply() {} + // Use RemoteReplyProtocol.newBuilder() to construct. + private RemoteReplyProtocol() {} - private static final RemoteReply defaultInstance = new RemoteReply(); - public static RemoteReply getDefaultInstance() { + private static final RemoteReplyProtocol defaultInstance = new RemoteReplyProtocol(); + public static RemoteReplyProtocol getDefaultInstance() { return defaultInstance; } - public RemoteReply getDefaultInstanceForType() { + public RemoteReplyProtocol getDefaultInstanceForType() { return defaultInstance; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable; } // required uint64 id = 1; @@ -1491,57 +1392,57 @@ public final class RemoteProtocol { return size; } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom(byte[] data) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseDelimitedFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseDelimitedFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -1551,25 +1452,25 @@ public final class RemoteProtocol { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply prototype) { + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply result; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol result; - // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.newBuilder() + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.newBuilder() private Builder() {} private static Builder create() { Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol(); return builder; } - protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply internalGetResult() { + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol internalGetResult() { return result; } @@ -1578,7 +1479,7 @@ public final class RemoteProtocol { throw new IllegalStateException( "Cannot call clear() after build()."); } - result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply(); + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol(); return this; } @@ -1588,24 +1489,24 @@ public final class RemoteProtocol { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDescriptor(); + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDescriptor(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply getDefaultInstanceForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDefaultInstance(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDefaultInstance(); } public boolean isInitialized() { return result.isInitialized(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply build() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol build() { if (result != null && !isInitialized()) { throw newUninitializedMessageException(result); } return buildPartial(); } - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply buildParsed() + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { if (!isInitialized()) { throw newUninitializedMessageException( @@ -1614,27 +1515,27 @@ public final class RemoteProtocol { return buildPartial(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply buildPartial() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol buildPartial() { if (result == null) { throw new IllegalStateException( "build() has already been called on this Builder."); } - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply returnMe = result; + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol returnMe = result; result = null; return returnMe; } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply) { - return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply)other); + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply other) { - if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDefaultInstance()) return this; + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDefaultInstance()) return this; if (other.hasId()) { setId(other.getId()); } @@ -1888,20 +1789,20 @@ public final class RemoteProtocol { } private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor; + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable; + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -1913,52 +1814,52 @@ public final class RemoteProtocol { java.lang.String[] descriptorData = { "\n>se/scalablesolutions/akka/remote/proto" + "buf/RemoteProtocol.proto\022)se.scalablesol" + - "utions.akka.remote.protobuf\"m\n\010ActorRef\022" + - "\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassName\030\002 \002(\t\022\026\n" + - "\016sourceHostname\030\003 \002(\t\022\022\n\nsourcePort\030\004 \002(" + - "\r\022\017\n\007timeout\030\005 \002(\004\"\272\002\n\rRemoteRequest\022\n\n\002" + - "id\030\001 \002(\004\022\020\n\010protocol\030\002 \002(\r\022\017\n\007message\030\003 " + - "\002(\014\022\027\n\017messageManifest\030\004 \001(\014\022\016\n\006method\030\005" + - " \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004uuid\030\007 \002(\t\022\017\n\007ti" + - "meout\030\010 \002(\004\022\026\n\016supervisorUuid\030\t \001(\t\022\017\n\007i", - "sActor\030\n \002(\010\022\020\n\010isOneWay\030\013 \002(\010\022\021\n\tisEsca" + - "ped\030\014 \002(\010\022\026\n\016sourceHostname\030\r \001(\t\022\022\n\nsou" + - "rcePort\030\016 \001(\r\022\024\n\014sourceTarget\030\017 \001(\t\022\022\n\ns" + - "ourceUuid\030\020 \001(\t\"\247\001\n\013RemoteReply\022\n\n\002id\030\001 " + - "\002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(\014\022\027" + - "\n\017messageManifest\030\004 \001(\014\022\021\n\texception\030\005 \001" + - "(\t\022\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor\030\007 " + - "\002(\010\022\024\n\014isSuccessful\030\010 \002(\010" + "utions.akka.remote.protobuf\"u\n\020ActorRefP" + + "rotocol\022\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassName\030" + + "\002 \002(\t\022\026\n\016sourceHostname\030\003 \002(\t\022\022\n\nsourceP" + + "ort\030\004 \002(\r\022\017\n\007timeout\030\005 \002(\004\"\271\002\n\025RemoteReq" + + "uestProtocol\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \002" + + "(\r\022\017\n\007message\030\003 \002(\014\022\027\n\017messageManifest\030\004" + + " \001(\014\022\016\n\006method\030\005 \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004" + + "uuid\030\007 \002(\t\022\017\n\007timeout\030\010 \002(\004\022\026\n\016superviso", + "rUuid\030\t \001(\t\022\017\n\007isActor\030\n \002(\010\022\020\n\010isOneWay" + + "\030\013 \002(\010\022\021\n\tisEscaped\030\014 \002(\010\022K\n\006sender\030\r \001(" + + "\0132;.se.scalablesolutions.akka.remote.pro" + + "tobuf.ActorRefProtocol\"\257\001\n\023RemoteReplyPr" + + "otocol\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007" + + "message\030\003 \001(\014\022\027\n\017messageManifest\030\004 \001(\014\022\021" + + "\n\texception\030\005 \001(\t\022\026\n\016supervisorUuid\030\006 \001(" + + "\t\022\017\n\007isActor\030\007 \002(\010\022\024\n\014isSuccessful\030\010 \002(\010" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor = + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor = getDescriptor().getMessageTypes().get(0); - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable = new + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor, + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor, new java.lang.String[] { "Uuid", "ActorClassName", "SourceHostname", "SourcePort", "Timeout", }, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.class, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.Builder.class); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor = + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder.class); + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor = getDescriptor().getMessageTypes().get(1); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable = new + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor, - new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "SourceHostname", "SourcePort", "SourceTarget", "SourceUuid", }, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.class, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.Builder.class); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor = + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor, + new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "Sender", }, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.Builder.class); + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor = getDescriptor().getMessageTypes().get(2); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable = new + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor, + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor, new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", }, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.class, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.Builder.class); + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.Builder.class); return null; } }; diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto index c2ccef040d..372691cd8b 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto @@ -10,7 +10,7 @@ package se.scalablesolutions.akka.remote.protobuf; protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out . */ -message ActorRef { +message ActorRefProtocol { required string uuid = 1; required string actorClassName = 2; required string sourceHostname = 3; @@ -18,7 +18,7 @@ message ActorRef { required uint64 timeout = 5; } -message RemoteRequest { +message RemoteRequestProtocol { required uint64 id = 1; required uint32 protocol = 2; required bytes message = 3; @@ -31,13 +31,10 @@ message RemoteRequest { required bool isActor = 10; required bool isOneWay = 11; required bool isEscaped = 12; - optional string sourceHostname = 13; - optional uint32 sourcePort = 14; - optional string sourceTarget = 15; - optional string sourceUuid = 16; + optional ActorRefProtocol sender = 13; } -message RemoteReply { +message RemoteReplyProtocol { required uint64 id = 1; optional uint32 protocol = 2; optional bytes message = 3;