Merge branch 'ActorRefSerialization'
This commit is contained in:
commit
494e443334
21 changed files with 914 additions and 532 deletions
|
|
@ -121,15 +121,15 @@ object Publish {
|
|||
def forConsumer(actor: ActorRef): Option[Publish] =
|
||||
forConsumeAnnotated(actor) orElse forConsumerType(actor)
|
||||
|
||||
private def forConsumeAnnotated(actorId: ActorRef): Option[Publish] = {
|
||||
val annotation = actorId.actorClass.getAnnotation(classOf[consume])
|
||||
private def forConsumeAnnotated(actorRef: ActorRef): Option[Publish] = {
|
||||
val annotation = actorRef.actorClass.getAnnotation(classOf[consume])
|
||||
if (annotation eq null) None
|
||||
else if (actorId.remoteAddress.isDefined) None // do not publish proxies
|
||||
else Some(Publish(annotation.value, actorId.id, false))
|
||||
else if (actorRef.remoteAddress.isDefined) None // do not publish proxies
|
||||
else Some(Publish(annotation.value, actorRef.id, false))
|
||||
}
|
||||
|
||||
private def forConsumerType(actorId: ActorRef): Option[Publish] =
|
||||
if (!actorId.actor.isInstanceOf[Consumer]) None
|
||||
else if (actorId.remoteAddress.isDefined) None
|
||||
else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true))
|
||||
private def forConsumerType(actorRef: ActorRef): Option[Publish] =
|
||||
if (!actorRef.actor.isInstanceOf[Consumer]) None
|
||||
else if (actorRef.remoteAddress.isDefined) None
|
||||
else Some(Publish(actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true))
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -10,7 +10,15 @@ package se.scalablesolutions.akka.remote.protobuf;
|
|||
protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out .
|
||||
*/
|
||||
|
||||
message RemoteRequest {
|
||||
message ActorRefProtocol {
|
||||
required string uuid = 1;
|
||||
required string actorClassName = 2;
|
||||
required string sourceHostname = 3;
|
||||
required uint32 sourcePort = 4;
|
||||
required uint64 timeout = 5;
|
||||
}
|
||||
|
||||
message RemoteRequestProtocol {
|
||||
required uint64 id = 1;
|
||||
required uint32 protocol = 2;
|
||||
required bytes message = 3;
|
||||
|
|
@ -23,13 +31,10 @@ message RemoteRequest {
|
|||
required bool isActor = 10;
|
||||
required bool isOneWay = 11;
|
||||
required bool isEscaped = 12;
|
||||
optional string sourceHostname = 13;
|
||||
optional uint32 sourcePort = 14;
|
||||
optional string sourceTarget = 15;
|
||||
optional string sourceUuid = 16;
|
||||
optional ActorRefProtocol sender = 13;
|
||||
}
|
||||
|
||||
message RemoteReply {
|
||||
message RemoteReplyProtocol {
|
||||
required uint64 id = 1;
|
||||
optional uint32 protocol = 2;
|
||||
optional bytes message = 3;
|
||||
|
|
|
|||
|
|
@ -5,8 +5,8 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.config.FaultHandlingStrategy
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
|
||||
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol
|
||||
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory}
|
||||
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
|
|
@ -284,9 +284,9 @@ object ActiveObject {
|
|||
actor.initialize(target, proxy)
|
||||
actor.timeout = timeout
|
||||
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
|
||||
val actorId = new ActorRef(() => actor)
|
||||
AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout))
|
||||
actorId.start
|
||||
val actorRef = new ActorRef(() => actor)
|
||||
AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout))
|
||||
actorRef.start
|
||||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
||||
|
|
@ -295,9 +295,9 @@ object ActiveObject {
|
|||
actor.initialize(target.getClass, target)
|
||||
actor.timeout = timeout
|
||||
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
|
||||
val actorId = new ActorRef(() => actor)
|
||||
AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout))
|
||||
actorId.start
|
||||
val actorRef = new ActorRef(() => actor)
|
||||
AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout))
|
||||
actorRef.start
|
||||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
||||
|
|
@ -388,10 +388,10 @@ private[akka] object AspectInitRegistry {
|
|||
|
||||
private[akka] sealed case class AspectInit(
|
||||
val target: Class[_],
|
||||
val actorId: ActorRef,
|
||||
val actorRef: ActorRef,
|
||||
val remoteAddress: Option[InetSocketAddress],
|
||||
val timeout: Long) {
|
||||
def this(target: Class[_], actorId: ActorRef, timeout: Long) = this(target, actorId, None, timeout)
|
||||
def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -405,7 +405,7 @@ private[akka] sealed case class AspectInit(
|
|||
private[akka] sealed class ActiveObjectAspect {
|
||||
@volatile private var isInitialized = false
|
||||
private var target: Class[_] = _
|
||||
private var actorId: ActorRef = _
|
||||
private var actorRef: ActorRef = _
|
||||
private var remoteAddress: Option[InetSocketAddress] = _
|
||||
private var timeout: Long = _
|
||||
|
||||
|
|
@ -414,7 +414,7 @@ private[akka] sealed class ActiveObjectAspect {
|
|||
if (!isInitialized) {
|
||||
val init = AspectInitRegistry.initFor(joinPoint.getThis)
|
||||
target = init.target
|
||||
actorId = init.actorId
|
||||
actorRef = init.actorRef
|
||||
remoteAddress = init.remoteAddress
|
||||
timeout = init.timeout
|
||||
isInitialized = true
|
||||
|
|
@ -430,10 +430,10 @@ private[akka] sealed class ActiveObjectAspect {
|
|||
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||
if (isOneWay(rtti)) {
|
||||
(actorId ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef]
|
||||
(actorRef ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef]
|
||||
}
|
||||
else {
|
||||
val result = actorId !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
|
||||
val result = actorRef !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
|
||||
if (result.isDefined) result.get
|
||||
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
|
||||
}
|
||||
|
|
@ -443,17 +443,17 @@ private[akka] sealed class ActiveObjectAspect {
|
|||
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||
val oneWay_? = isOneWay(rtti) || isVoid(rtti)
|
||||
val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues)
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
val requestBuilder = RemoteRequestProtocol.newBuilder
|
||||
.setId(RemoteRequestProtocolIdFactory.nextId)
|
||||
.setMethod(rtti.getMethod.getName)
|
||||
.setTarget(target.getName)
|
||||
.setUuid(actorId.uuid)
|
||||
.setUuid(actorRef.uuid)
|
||||
.setTimeout(timeout)
|
||||
.setIsActor(false)
|
||||
.setIsOneWay(oneWay_?)
|
||||
.setIsEscaped(false)
|
||||
RemoteProtocolBuilder.setMessage(message, requestBuilder)
|
||||
val id = actorId.actor.registerSupervisorAsRemoteActor
|
||||
val id = actorRef.actor.registerSupervisorAsRemoteActor
|
||||
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
||||
val remoteMessage = requestBuilder.build
|
||||
val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None)
|
||||
|
|
|
|||
|
|
@ -11,10 +11,10 @@ import se.scalablesolutions.akka.config.ScalaConfig._
|
|||
import se.scalablesolutions.akka.stm.Transaction.Global._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
|
||||
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestIdFactory}
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol
|
||||
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteActorProxy, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory}
|
||||
|
||||
import org.multiverse.api.ThreadLocalTransaction._
|
||||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||
|
|
@ -228,24 +228,54 @@ object Actor extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
/** Starts the specified actor and returns it, useful for:
|
||||
* <pre>val actor = new FooActor
|
||||
/**
|
||||
* Starts the specified actor and returns it, useful for simplifying code such as:
|
||||
* <pre>
|
||||
* val actor = new FooActor
|
||||
* actor.start
|
||||
* //Gets replaced by
|
||||
* </pre>
|
||||
* can be replaced with:
|
||||
* <pre>
|
||||
* import Actor._
|
||||
*
|
||||
* val actor = start(new FooActor)
|
||||
* </pre>
|
||||
* </pre>
|
||||
*/
|
||||
def start[T <: Actor](actor : T) : T = {
|
||||
actor.start
|
||||
actor
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The ActorRef object can be used to create ActorRef instances out of its binary
|
||||
* protobuf based representation.
|
||||
* <pre>
|
||||
* val actorRef = ActorRef.fromBinary(bytes)
|
||||
* actorRef ! message // send message to remote actor through its reference
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object ActorRef {
|
||||
def fromBinary(bytes: Array[Byte]): ActorRef =
|
||||
fromProtocol(RemoteProtocol.ActorRefProtocol.newBuilder.mergeFrom(bytes).build)
|
||||
|
||||
def fromProtocol(protocol: RemoteProtocol.ActorRefProtocol): ActorRef =
|
||||
RemoteActorProxy(
|
||||
protocol.getUuid,
|
||||
protocol.getActorClassName,
|
||||
protocol.getSourceHostname,
|
||||
protocol.getSourcePort,
|
||||
protocol.getTimeout)
|
||||
}
|
||||
|
||||
/**
|
||||
* ActorRef is an immutable and serializable handle to an Actor.
|
||||
* <p/>
|
||||
* Create an ActorRef for an Actor by using the factory method on the Actor object.
|
||||
* Here is an example:
|
||||
* <p/>
|
||||
* Here is an example on how to create an actor with a default constructor.
|
||||
* <pre>
|
||||
* import Actor._
|
||||
*
|
||||
|
|
@ -254,24 +284,23 @@ object Actor extends Logging {
|
|||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
* Here is an example on how to create an actor with a non-default constructor.
|
||||
* <pre>
|
||||
* import Actor._
|
||||
*
|
||||
* val actor = newActor(() => new MyActor(...))
|
||||
* actor.start
|
||||
* actor ! message
|
||||
* actor.stop
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
final class ActorRef private[akka] () {
|
||||
private[akka] var newActorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)
|
||||
private[akka] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)
|
||||
|
||||
private[akka] def this(clazz: Class[_ <: Actor]) = {
|
||||
this()
|
||||
newActorFactory = Left(Some(clazz))
|
||||
}
|
||||
|
||||
private[akka] def this(factory: () => Actor) = {
|
||||
this()
|
||||
newActorFactory = Right(Some(factory))
|
||||
}
|
||||
|
||||
private[akka] lazy val actor: Actor = {
|
||||
val actor = newActorFactory match {
|
||||
val actor = actorFactory match {
|
||||
case Left(Some(clazz)) =>
|
||||
try {
|
||||
clazz.newInstance
|
||||
|
|
@ -290,6 +319,39 @@ final class ActorRef private[akka] () {
|
|||
actor
|
||||
}
|
||||
|
||||
private[akka] def this(clazz: Class[_ <: Actor]) = {
|
||||
this()
|
||||
actorFactory = Left(Some(clazz))
|
||||
}
|
||||
|
||||
private[akka] def this(factory: () => Actor) = {
|
||||
this()
|
||||
actorFactory = Right(Some(factory))
|
||||
}
|
||||
|
||||
def toProtocol: RemoteProtocol.ActorRefProtocol = {
|
||||
val (host, port) = actor._replyToAddress.map(address =>
|
||||
(address.getHostName, address.getPort))
|
||||
.getOrElse((Actor.HOSTNAME, Actor.PORT))
|
||||
|
||||
if (!actor._registeredInRemoteNodeDuringSerialization) {
|
||||
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
|
||||
if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this)
|
||||
actor._registeredInRemoteNodeDuringSerialization = true
|
||||
}
|
||||
|
||||
RemoteProtocol.ActorRefProtocol.newBuilder
|
||||
.setUuid(uuid)
|
||||
.setActorClassName(actorClass.getName)
|
||||
.setSourceHostname(host)
|
||||
.setSourcePort(port)
|
||||
.setTimeout(timeout)
|
||||
.build
|
||||
}
|
||||
|
||||
def toBinary: Array[Byte] = toProtocol.toByteArray
|
||||
|
||||
/**
|
||||
* Returns the class for the Actor instance that is managed by the ActorRef.
|
||||
*/
|
||||
|
|
@ -397,7 +459,7 @@ final class ActorRef private[akka] () {
|
|||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !: Option[T] = !
|
||||
def !(implicit sender: Option[ActorRef] = None): Option[T] = !
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||
|
|
@ -425,7 +487,7 @@ final class ActorRef private[akka] () {
|
|||
if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
|
||||
if (actor.isRunning) {
|
||||
sender.get.actor.replyTo match {
|
||||
case Some(Left(actorID)) => actor.postMessageToMailbox(message, Some(actorID))
|
||||
case Some(Left(actorRef)) => actor.postMessageToMailbox(message, Some(actorRef))
|
||||
case Some(Right(future)) => actor.postMessageToMailboxAndCreateFutureResultWithTimeout(message, actor.timeout, Some(future))
|
||||
case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
|
||||
}
|
||||
|
|
@ -546,6 +608,7 @@ trait Actor extends TransactionManagement with Logging {
|
|||
@volatile private[this] var _isSuspended = true
|
||||
@volatile private[this] var _isShutDown = false
|
||||
@volatile private[akka] var _isKilled = false
|
||||
@volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false
|
||||
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
|
||||
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
|
||||
private[akka] var _linkedActors: Option[HashSet[ActorRef]] = None
|
||||
|
|
@ -755,6 +818,7 @@ trait Actor extends TransactionManagement with Logging {
|
|||
shutdown
|
||||
ActorRegistry.unregister(self)
|
||||
_remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
|
||||
RemoteNode.unregister(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -849,7 +913,8 @@ trait Actor extends TransactionManagement with Logging {
|
|||
|
||||
|
||||
/**
|
||||
* Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
|
||||
* Set the contact address for this actor. This is used for replying to messages sent
|
||||
* asynchronously when no reply channel exists.
|
||||
*/
|
||||
def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port))
|
||||
|
||||
|
|
@ -879,12 +944,12 @@ trait Actor extends TransactionManagement with Logging {
|
|||
* <p/>
|
||||
* To be invoked from within the actor itself.
|
||||
*/
|
||||
protected[this] def link(actorId: ActorRef) = {
|
||||
if (actorId.supervisor.isDefined) throw new IllegalStateException(
|
||||
"Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails")
|
||||
getLinkedActors.add(actorId)
|
||||
actorId.supervisor = Some(self)
|
||||
Actor.log.debug("Linking actor [%s] to actor [%s]", actorId, this)
|
||||
protected[this] def link(actorRef: ActorRef) = {
|
||||
if (actorRef.supervisor.isDefined) throw new IllegalStateException(
|
||||
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
|
||||
getLinkedActors.add(actorRef)
|
||||
actorRef.supervisor = Some(self)
|
||||
Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -892,12 +957,12 @@ trait Actor extends TransactionManagement with Logging {
|
|||
* <p/>
|
||||
* To be invoked from within the actor itself.
|
||||
*/
|
||||
protected[this] def unlink(actorId: ActorRef) = {
|
||||
if (!getLinkedActors.contains(actorId)) throw new IllegalStateException(
|
||||
"Actor [" + actorId + "] is not a linked actor, can't unlink")
|
||||
getLinkedActors.remove(actorId)
|
||||
actorId.supervisor = None
|
||||
Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorId, this)
|
||||
protected[this] def unlink(actorRef: ActorRef) = {
|
||||
if (!getLinkedActors.contains(actorRef)) throw new IllegalStateException(
|
||||
"Actor [" + actorRef + "] is not a linked actor, can't unlink")
|
||||
getLinkedActors.remove(actorRef)
|
||||
actorRef.supervisor = None
|
||||
Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -905,11 +970,11 @@ trait Actor extends TransactionManagement with Logging {
|
|||
* <p/>
|
||||
* To be invoked from within the actor itself.
|
||||
*/
|
||||
protected[this] def startLink(actorId: ActorRef) = {
|
||||
protected[this] def startLink(actorRef: ActorRef) = {
|
||||
try {
|
||||
actorId.start
|
||||
actorRef.start
|
||||
} finally {
|
||||
link(actorId)
|
||||
link(actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -918,12 +983,12 @@ trait Actor extends TransactionManagement with Logging {
|
|||
* <p/>
|
||||
* To be invoked from within the actor itself.
|
||||
*/
|
||||
protected[this] def startLinkRemote(actorId: ActorRef, hostname: String, port: Int) = {
|
||||
protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = {
|
||||
try {
|
||||
actorId.makeRemote(hostname, port)
|
||||
actorId.start
|
||||
actorRef.makeRemote(hostname, port)
|
||||
actorRef.start
|
||||
} finally {
|
||||
link(actorId)
|
||||
link(actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -933,9 +998,9 @@ trait Actor extends TransactionManagement with Logging {
|
|||
* To be invoked from within the actor itself.
|
||||
*/
|
||||
protected[this] def spawn[T <: Actor : Manifest]: ActorRef = {
|
||||
val actorId = spawnButDoNotStart[T]
|
||||
actorId.start
|
||||
actorId
|
||||
val actorRef = spawnButDoNotStart[T]
|
||||
actorRef.start
|
||||
actorRef
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1007,15 +1072,15 @@ trait Actor extends TransactionManagement with Logging {
|
|||
new ActorRef(() => actor)
|
||||
}
|
||||
|
||||
protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorRef]): Unit = {
|
||||
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
|
||||
joinTransaction(message)
|
||||
|
||||
if (_remoteAddress.isDefined) {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder
|
||||
.setId(RemoteRequestProtocolIdFactory.nextId)
|
||||
.setTarget(this.getClass.getName)
|
||||
.setTimeout(this.timeout)
|
||||
.setUuid(this.id)
|
||||
.setUuid(this.uuid)
|
||||
.setIsActor(true)
|
||||
.setIsOneWay(true)
|
||||
.setIsEscaped(false)
|
||||
|
|
@ -1023,30 +1088,12 @@ trait Actor extends TransactionManagement with Logging {
|
|||
val id = registerSupervisorAsRemoteActor
|
||||
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
||||
|
||||
// set the source fields used to reply back to the original sender
|
||||
// (i.e. not the remote proxy actor)
|
||||
if (sender.isDefined) {
|
||||
val s = sender.get.actor
|
||||
requestBuilder.setSourceTarget(s.getClass.getName)
|
||||
requestBuilder.setSourceUuid(s.uuid)
|
||||
senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
|
||||
|
||||
val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
|
||||
|
||||
Actor.log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
|
||||
|
||||
requestBuilder.setSourceHostname(host)
|
||||
requestBuilder.setSourcePort(port)
|
||||
|
||||
if (RemoteServer.serverFor(host, port).isEmpty) {
|
||||
val server = new RemoteServer
|
||||
server.start(host, port)
|
||||
}
|
||||
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.id, sender.get)
|
||||
}
|
||||
RemoteProtocolBuilder.setMessage(message, requestBuilder)
|
||||
RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None)
|
||||
} else {
|
||||
val invocation = new MessageInvocation(self, message, sender.map(Left(_)), transactionSet.get)
|
||||
val invocation = new MessageInvocation(self, message, senderOption.map(Left(_)), transactionSet.get)
|
||||
if (messageDispatcher.usesActorMailbox) {
|
||||
_mailbox.add(invocation)
|
||||
if (_isSuspended) invocation.send
|
||||
|
|
@ -1062,28 +1109,30 @@ trait Actor extends TransactionManagement with Logging {
|
|||
joinTransaction(message)
|
||||
|
||||
if (_remoteAddress.isDefined) {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder
|
||||
.setId(RemoteRequestProtocolIdFactory.nextId)
|
||||
.setTarget(this.getClass.getName)
|
||||
.setTimeout(this.timeout)
|
||||
.setUuid(this.id)
|
||||
.setUuid(this.uuid)
|
||||
.setIsActor(true)
|
||||
.setIsOneWay(false)
|
||||
.setIsEscaped(false)
|
||||
|
||||
//senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
|
||||
RemoteProtocolBuilder.setMessage(message, requestBuilder)
|
||||
|
||||
val id = registerSupervisorAsRemoteActor
|
||||
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
||||
|
||||
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
|
||||
if (future.isDefined) future.get
|
||||
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
||||
} else {
|
||||
val future = if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultCompletableFuture[T](timeout)
|
||||
val invocation = new MessageInvocation(self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
|
||||
|
||||
if (messageDispatcher.usesActorMailbox)
|
||||
_mailbox.add(invocation)
|
||||
|
||||
val invocation = new MessageInvocation(
|
||||
self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
|
||||
if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation)
|
||||
invocation.send
|
||||
future
|
||||
}
|
||||
|
|
@ -1217,8 +1266,8 @@ trait Actor extends TransactionManagement with Logging {
|
|||
|
||||
private[this] def restartLinkedActors(reason: Throwable) = {
|
||||
getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach {
|
||||
actorId =>
|
||||
val actor = actorId.actor
|
||||
actorRef =>
|
||||
val actor = actorRef.actor
|
||||
if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent))
|
||||
actor.lifeCycle.get match {
|
||||
case LifeCycle(scope, _) => {
|
||||
|
|
@ -1228,7 +1277,7 @@ trait Actor extends TransactionManagement with Logging {
|
|||
case Temporary =>
|
||||
Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id)
|
||||
actor.stop
|
||||
getLinkedActors.remove(actorId) // remove the temporary actor
|
||||
getLinkedActors.remove(actorRef) // remove the temporary actor
|
||||
// if last temporary actor is gone, then unlink me from supervisor
|
||||
if (getLinkedActors.isEmpty) {
|
||||
Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" +
|
||||
|
|
@ -1314,7 +1363,7 @@ object DispatcherType {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ActorMessageInvoker private[akka] (val actorId: ActorRef) extends MessageInvoker {
|
||||
def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle)
|
||||
class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker {
|
||||
def invoke(handle: MessageInvocation) = actorRef.actor.invoke(handle)
|
||||
}
|
||||
|
||||
|
|
|
|||
0
akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto
Normal file
0
akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto
Normal file
|
|
@ -57,9 +57,9 @@ object ActorRegistry extends Logging {
|
|||
val all = new ListBuffer[ActorRef]
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val actorId = elements.nextElement
|
||||
if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) {
|
||||
all += actorId
|
||||
val actorRef = elements.nextElement
|
||||
if (manifest.erasure.isAssignableFrom(actorRef.actor.getClass)) {
|
||||
all += actorRef
|
||||
}
|
||||
}
|
||||
all.toList
|
||||
|
|
@ -92,24 +92,24 @@ object ActorRegistry extends Logging {
|
|||
/**
|
||||
* Registers an actor in the ActorRegistry.
|
||||
*/
|
||||
def register(actorId: ActorRef) = {
|
||||
def register(actorRef: ActorRef) = {
|
||||
// UUID
|
||||
actorsByUUID.put(actorId.uuid, actorId)
|
||||
actorsByUUID.put(actorRef.uuid, actorRef)
|
||||
|
||||
// ID
|
||||
val id = actorId.id
|
||||
if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId)
|
||||
if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id))
|
||||
else actorsById.put(id, actorId :: Nil)
|
||||
val id = actorRef.id
|
||||
if (id eq null) throw new IllegalStateException("Actor.id is null " + actorRef)
|
||||
if (actorsById.containsKey(id)) actorsById.put(id, actorRef :: actorsById.get(id))
|
||||
else actorsById.put(id, actorRef :: Nil)
|
||||
|
||||
// Class name
|
||||
val className = actorId.actor.getClass.getName
|
||||
val className = actorRef.actor.getClass.getName
|
||||
if (actorsByClassName.containsKey(className)) {
|
||||
actorsByClassName.put(className, actorId :: actorsByClassName.get(className))
|
||||
} else actorsByClassName.put(className, actorId :: Nil)
|
||||
actorsByClassName.put(className, actorRef :: actorsByClassName.get(className))
|
||||
} else actorsByClassName.put(className, actorRef :: Nil)
|
||||
|
||||
// notify listeners
|
||||
foreachListener(_ ! ActorRegistered(actorId))
|
||||
foreachListener(_ ! ActorRegistered(actorRef))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -58,9 +58,9 @@ object Scheduler extends Actor {
|
|||
|
||||
def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
|
||||
|
||||
def stopSupervising(actorId: ActorRef) = {
|
||||
unlink(actorId)
|
||||
schedulers.remove(actorId)
|
||||
def stopSupervising(actorRef: ActorRef) = {
|
||||
unlink(actorRef)
|
||||
schedulers.remove(actorRef)
|
||||
}
|
||||
|
||||
override def shutdown = {
|
||||
|
|
|
|||
|
|
@ -103,9 +103,9 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
|
|||
|
||||
override def stop = synchronized {
|
||||
super[Actor].stop
|
||||
getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorId =>
|
||||
actorId.stop
|
||||
log.info("Shutting actor down: %s", actorId)
|
||||
getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef =>
|
||||
actorRef.stop
|
||||
log.info("Shutting actor down: %s", actorRef)
|
||||
}
|
||||
log.info("Stopping supervisor: %s", this)
|
||||
}
|
||||
|
|
@ -119,19 +119,19 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
|
|||
case SupervisorConfig(_, servers) =>
|
||||
servers.map(server =>
|
||||
server match {
|
||||
case Supervise(actorId, lifeCycle, remoteAddress) =>
|
||||
val className = actorId.actor.getClass.getName
|
||||
case Supervise(actorRef, lifeCycle, remoteAddress) =>
|
||||
val className = actorRef.actor.getClass.getName
|
||||
val currentActors = {
|
||||
val list = actors.get(className)
|
||||
if (list eq null) List[ActorRef]()
|
||||
else list
|
||||
}
|
||||
actors.put(className, actorId :: currentActors)
|
||||
actorId.actor.lifeCycle = Some(lifeCycle)
|
||||
startLink(actorId)
|
||||
actors.put(className, actorRef :: currentActors)
|
||||
actorRef.actor.lifeCycle = Some(lifeCycle)
|
||||
startLink(actorRef)
|
||||
remoteAddress.foreach(address => RemoteServer.actorsFor(
|
||||
RemoteServer.Address(address.hostname, address.port))
|
||||
.actors.put(actorId.id, actorId))
|
||||
.actors.put(actorRef.id, actorRef))
|
||||
|
||||
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
|
||||
val supervisor = {
|
||||
|
|
|
|||
|
|
@ -25,13 +25,13 @@ object ScalaConfig {
|
|||
|
||||
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
|
||||
|
||||
class Supervise(val actorId: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
|
||||
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
|
||||
val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress)
|
||||
}
|
||||
object Supervise {
|
||||
def apply(actorId: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress)
|
||||
def apply(actorId: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null)
|
||||
def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress))
|
||||
def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress)
|
||||
def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null)
|
||||
def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress))
|
||||
}
|
||||
|
||||
case class RestartStrategy(
|
||||
|
|
@ -227,8 +227,8 @@ object JavaConfig {
|
|||
intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher,
|
||||
if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
|
||||
|
||||
def newSupervised(actorId: ActorRef) =
|
||||
se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform)
|
||||
def newSupervised(actorRef: ActorRef) =
|
||||
se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -18,14 +18,14 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten
|
|||
|
||||
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
|
||||
|
||||
override def register(actorId: ActorRef) = synchronized {
|
||||
messageInvokers.put(actorId, new ActorMessageInvoker(actorId))
|
||||
super.register(actorId)
|
||||
override def register(actorRef: ActorRef) = synchronized {
|
||||
messageInvokers.put(actorRef, new ActorMessageInvoker(actorRef))
|
||||
super.register(actorRef)
|
||||
}
|
||||
|
||||
override def unregister(actorId: ActorRef) = synchronized {
|
||||
messageInvokers.remove(actorId)
|
||||
super.unregister(actorId)
|
||||
override def unregister(actorRef: ActorRef) = synchronized {
|
||||
messageInvokers.remove(actorRef)
|
||||
super.unregister(actorRef)
|
||||
}
|
||||
|
||||
def shutdown = if (active) {
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
|||
class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder {
|
||||
@volatile private var active: Boolean = false
|
||||
|
||||
implicit def actorId2actor(actorId: ActorRef): Actor = actorId.actor
|
||||
implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor
|
||||
|
||||
/** Type of the actors registered in this dispatcher. */
|
||||
private var actorType:Option[Class[_]] = None
|
||||
|
|
@ -193,15 +193,15 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
|
|||
|
||||
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
|
||||
|
||||
override def register(actorId: ActorRef) = {
|
||||
verifyActorsAreOfSameType(actorId)
|
||||
pooledActors.add(actorId)
|
||||
super.register(actorId)
|
||||
override def register(actorRef: ActorRef) = {
|
||||
verifyActorsAreOfSameType(actorRef)
|
||||
pooledActors.add(actorRef)
|
||||
super.register(actorRef)
|
||||
}
|
||||
|
||||
override def unregister(actorId: ActorRef) = {
|
||||
pooledActors.remove(actorId)
|
||||
super.unregister(actorId)
|
||||
override def unregister(actorRef: ActorRef) = {
|
||||
pooledActors.remove(actorRef)
|
||||
super.unregister(actorRef)
|
||||
}
|
||||
|
||||
def usesActorMailbox = true
|
||||
|
|
|
|||
|
|
@ -60,9 +60,9 @@ trait MessageDispatcher extends Logging {
|
|||
def dispatch(invocation: MessageInvocation)
|
||||
def start
|
||||
def shutdown
|
||||
def register(actorId: ActorRef) = references.put(actorId.uuid, actorId)
|
||||
def unregister(actorId: ActorRef) = {
|
||||
references.remove(actorId.uuid)
|
||||
def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef)
|
||||
def unregister(actorRef: ActorRef) = {
|
||||
references.remove(actorRef.uuid)
|
||||
if (canBeShutDown)
|
||||
shutdown // shut down in the dispatcher's references is zero
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.remote
|
||||
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol}
|
||||
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
|
||||
import se.scalablesolutions.akka.util.{UUID, Logging}
|
||||
|
|
@ -31,7 +31,7 @@ import scala.collection.mutable.{HashSet, HashMap}
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object RemoteRequestIdFactory {
|
||||
object RemoteRequestProtocolIdFactory {
|
||||
private val nodeId = UUID.newUuid
|
||||
private val id = new AtomicLong
|
||||
|
||||
|
|
@ -64,23 +64,15 @@ private[akka] class RemoteActorProxy private (
|
|||
val remoteClient = RemoteClient.clientFor(hostname, port)
|
||||
|
||||
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
val requestBuilder = RemoteRequestProtocol.newBuilder
|
||||
.setId(RemoteRequestProtocolIdFactory.nextId)
|
||||
.setTarget(className)
|
||||
.setTimeout(timeOut)
|
||||
.setUuid(uuid)
|
||||
.setIsActor(true)
|
||||
.setIsOneWay(true)
|
||||
.setIsEscaped(false)
|
||||
if (senderOption.isDefined) {
|
||||
val sender = senderOption.get.actor
|
||||
requestBuilder.setSourceTarget(sender.getClass.getName)
|
||||
requestBuilder.setSourceUuid(sender.uuid)
|
||||
val (host, port) = sender._replyToAddress.map(address =>
|
||||
(address.getHostName, address.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
|
||||
requestBuilder.setSourceHostname(host)
|
||||
requestBuilder.setSourcePort(port)
|
||||
}
|
||||
senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
|
||||
RemoteProtocolBuilder.setMessage(message, requestBuilder)
|
||||
remoteClient.send[Any](requestBuilder.build, None)
|
||||
}
|
||||
|
|
@ -89,14 +81,15 @@ private[akka] class RemoteActorProxy private (
|
|||
message: Any,
|
||||
timeout: Long,
|
||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
val requestBuilder = RemoteRequestProtocol.newBuilder
|
||||
.setId(RemoteRequestProtocolIdFactory.nextId)
|
||||
.setTarget(className)
|
||||
.setTimeout(timeout)
|
||||
.setUuid(uuid)
|
||||
.setIsActor(true)
|
||||
.setIsOneWay(false)
|
||||
.setIsEscaped(false)
|
||||
//senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
|
||||
RemoteProtocolBuilder.setMessage(message, requestBuilder)
|
||||
val future = remoteClient.send(requestBuilder.build, senderFuture)
|
||||
if (future.isDefined) future.get
|
||||
|
|
@ -121,14 +114,14 @@ object RemoteClient extends Logging {
|
|||
def actorFor(className: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(className, className, 5000L, hostname, port)
|
||||
|
||||
def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(actorId, className, 5000L, hostname, port)
|
||||
def actorFor(actorRef: String, className: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(actorRef, className, 5000L, hostname, port)
|
||||
|
||||
def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
actorFor(className, className, timeout, hostname, port)
|
||||
|
||||
def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
RemoteActorProxy(actorId, className, hostname, port, timeout)
|
||||
def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
RemoteActorProxy(actorRef, className, hostname, port, timeout)
|
||||
|
||||
def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))
|
||||
|
||||
|
|
@ -237,7 +230,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
def send[T](request: RemoteRequest, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) {
|
||||
def send[T](request: RemoteRequestProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) {
|
||||
if (request.getIsOneWay) {
|
||||
connection.getChannel.write(request)
|
||||
None
|
||||
|
|
@ -256,17 +249,17 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
|
|||
throw exception
|
||||
}
|
||||
|
||||
def registerSupervisorForActor(actorId: ActorRef) =
|
||||
if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision")
|
||||
else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId)
|
||||
def registerSupervisorForActor(actorRef: ActorRef) =
|
||||
if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorRef + " since it is not under supervision")
|
||||
else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef)
|
||||
|
||||
def deregisterSupervisorForActor(actorId: ActorRef) =
|
||||
if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision")
|
||||
else supervisors.remove(actorId.supervisor.get.uuid)
|
||||
def deregisterSupervisorForActor(actorRef: ActorRef) =
|
||||
if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision")
|
||||
else supervisors.remove(actorRef.supervisor.get.uuid)
|
||||
|
||||
def registerListener(actorId: ActorRef) = listeners.add(actorId)
|
||||
def registerListener(actorRef: ActorRef) = listeners.add(actorRef)
|
||||
|
||||
def deregisterListener(actorId: ActorRef) = listeners.remove(actorId)
|
||||
def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -283,7 +276,7 @@ class RemoteClientPipelineFactory(name: String,
|
|||
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance)
|
||||
val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
|
||||
|
|
@ -323,9 +316,9 @@ class RemoteClientHandler(val name: String,
|
|||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
|
||||
try {
|
||||
val result = event.getMessage
|
||||
if (result.isInstanceOf[RemoteReply]) {
|
||||
val reply = result.asInstanceOf[RemoteReply]
|
||||
log.debug("Remote client received RemoteReply[\n%s]", reply.toString)
|
||||
if (result.isInstanceOf[RemoteReplyProtocol]) {
|
||||
val reply = result.asInstanceOf[RemoteReplyProtocol]
|
||||
log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString)
|
||||
val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]]
|
||||
if (reply.getIsSuccessful) {
|
||||
val message = RemoteProtocolBuilder.getMessage(reply)
|
||||
|
|
@ -388,7 +381,7 @@ class RemoteClientHandler(val name: String,
|
|||
event.getChannel.close
|
||||
}
|
||||
|
||||
private def parseException(reply: RemoteReply) = {
|
||||
private def parseException(reply: RemoteReplyProtocol) = {
|
||||
val exception = reply.getException
|
||||
val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$')))
|
||||
val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length)
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote
|
|||
|
||||
import se.scalablesolutions.akka.serialization.Serializable.SBinary
|
||||
import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol}
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol}
|
||||
|
||||
import com.google.protobuf.{Message, ByteString}
|
||||
|
||||
|
|
@ -23,7 +23,7 @@ object RemoteProtocolBuilder {
|
|||
SERIALIZER_SCALA_JSON.classLoader = Some(cl)
|
||||
}
|
||||
|
||||
def getMessage(request: RemoteRequest): Any = {
|
||||
def getMessage(request: RemoteRequestProtocol): Any = {
|
||||
request.getProtocol match {
|
||||
case SerializationProtocol.JAVA =>
|
||||
unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
|
||||
|
|
@ -42,7 +42,7 @@ object RemoteProtocolBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
def getMessage(reply: RemoteReply): Any = {
|
||||
def getMessage(reply: RemoteReplyProtocol): Any = {
|
||||
reply.getProtocol match {
|
||||
case SerializationProtocol.JAVA =>
|
||||
unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
|
||||
|
|
@ -61,7 +61,7 @@ object RemoteProtocolBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
def setMessage(message: Any, builder: RemoteRequest.Builder) = {
|
||||
def setMessage(message: Any, builder: RemoteRequestProtocol.Builder) = {
|
||||
if (message.isInstanceOf[Serializable.SBinary[_]]) {
|
||||
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
|
||||
builder.setProtocol(SerializationProtocol.SBINARY)
|
||||
|
|
@ -89,7 +89,7 @@ object RemoteProtocolBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
def setMessage(message: Any, builder: RemoteReply.Builder) = {
|
||||
def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = {
|
||||
if (message.isInstanceOf[Serializable.SBinary[_]]) {
|
||||
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
|
||||
builder.setProtocol(SerializationProtocol.SBINARY)
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import java.util.{Map => JMap}
|
|||
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap
|
||||
|
|
@ -199,25 +199,54 @@ class RemoteServer extends Logging {
|
|||
/**
|
||||
* Register Remote Actor by the Actor's 'id' field.
|
||||
*/
|
||||
def register(actor: ActorRef) = synchronized {
|
||||
def register(actorRef: ActorRef) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actorRef.id, actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register Remote Actor by a specific 'id' passed as argument.
|
||||
* <p/>
|
||||
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
|
||||
*/
|
||||
def register(id: String, actor: ActorRef) = synchronized {
|
||||
def register(id: String, actorRef: ActorRef) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
|
||||
log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister Remote Actor that is registered using its 'id' field (not custom ID).
|
||||
*/
|
||||
def unregister(actorRef: ActorRef) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id)
|
||||
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
|
||||
server.actors.remove(actorRef.id)
|
||||
if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister Remote Actor by specific 'id'.
|
||||
* <p/>
|
||||
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
||||
*/
|
||||
def unregister(id: String) = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote actor with id [%s]", id)
|
||||
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
|
||||
val actorRef = server.actors.get(id)
|
||||
server.actors.remove(id)
|
||||
if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class Codec(encoder : ChannelHandler, decoder : ChannelHandler)
|
||||
case class Codec(encoder: ChannelHandler, decoder: ChannelHandler)
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -233,7 +262,7 @@ class RemoteServerPipelineFactory(
|
|||
def getPipeline: ChannelPipeline = {
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequest.getDefaultInstance)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
|
||||
|
|
@ -283,50 +312,37 @@ class RemoteServerHandler(
|
|||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
|
||||
val message = event.getMessage
|
||||
if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
|
||||
if (message.isInstanceOf[RemoteRequest]) {
|
||||
handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
|
||||
if (message.isInstanceOf[RemoteRequestProtocol]) {
|
||||
handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel)
|
||||
}
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
event.getCause.printStackTrace
|
||||
log.error(event.getCause, "Unexpected exception from remote downstream")
|
||||
event.getChannel.close
|
||||
}
|
||||
|
||||
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
|
||||
log.debug("Received RemoteRequest[\n%s]", request.toString)
|
||||
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
|
||||
if (request.getIsActor) dispatchToActor(request, channel)
|
||||
else dispatchToActiveObject(request, channel)
|
||||
}
|
||||
|
||||
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
|
||||
log.debug("Dispatching to remote actor [%s]", request.getTarget)
|
||||
val actorId = createActor(request.getTarget, request.getUuid, request.getTimeout)
|
||||
actorId.start
|
||||
|
||||
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid)
|
||||
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
|
||||
actorRef.start
|
||||
val message = RemoteProtocolBuilder.getMessage(request)
|
||||
if (request.getIsOneWay) {
|
||||
if (request.hasSourceHostname && request.hasSourcePort) {
|
||||
// re-create the sending actor
|
||||
val targetClass = if (request.hasSourceTarget) request.getSourceTarget
|
||||
else request.getTarget
|
||||
|
||||
val remoteActorId = createActor(targetClass, request.getSourceUuid, request.getTimeout)
|
||||
if (!remoteActorId.isRunning) {
|
||||
remoteActorId.makeRemote(request.getSourceHostname, request.getSourcePort)
|
||||
remoteActorId.start
|
||||
}
|
||||
actorId.!(message)(Some(remoteActorId))
|
||||
} else {
|
||||
// couldn't find a way to reply, send the message without a source/sender
|
||||
actorId ! message
|
||||
}
|
||||
val sender = request.getSender
|
||||
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender)))
|
||||
} else {
|
||||
try {
|
||||
val resultOrNone = actorId !! message
|
||||
val resultOrNone = actorRef !! message
|
||||
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(true)
|
||||
|
|
@ -337,7 +353,7 @@ class RemoteServerHandler(
|
|||
} catch {
|
||||
case e: Throwable =>
|
||||
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getClass.getName + "$" + e.getMessage)
|
||||
.setIsSuccessful(false)
|
||||
|
|
@ -349,7 +365,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
}
|
||||
|
||||
private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = {
|
||||
private def dispatchToActiveObject(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val activeObject = createActiveObject(request.getTarget, request.getTimeout)
|
||||
|
||||
|
|
@ -365,7 +381,7 @@ class RemoteServerHandler(
|
|||
else {
|
||||
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
|
||||
log.debug("Returning result from remote active object invocation [%s]", result)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(false)
|
||||
|
|
@ -377,7 +393,7 @@ class RemoteServerHandler(
|
|||
} catch {
|
||||
case e: InvocationTargetException =>
|
||||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
|
||||
.setIsSuccessful(false)
|
||||
|
|
@ -387,7 +403,7 @@ class RemoteServerHandler(
|
|||
channel.write(replyMessage)
|
||||
case e: Throwable =>
|
||||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReply.newBuilder
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getClass.getName + "$" + e.getMessage)
|
||||
.setIsSuccessful(false)
|
||||
|
|
@ -442,8 +458,9 @@ class RemoteServerHandler(
|
|||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(name: String, uuid: String, timeout: Long): ActorRef = {
|
||||
val actorIdOrNull = actors.get(uuid)
|
||||
if (actorIdOrNull eq null) {
|
||||
val actorRefOrNull = actors.get(uuid)
|
||||
println("----------- ACTOR " + actorRefOrNull + " " + uuid)
|
||||
if (actorRefOrNull eq null) {
|
||||
try {
|
||||
log.info("Creating a new remote actor [%s:%s]", name, uuid)
|
||||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
|
|
@ -452,14 +469,14 @@ class RemoteServerHandler(
|
|||
newInstance._uuid = uuid
|
||||
newInstance.timeout = timeout
|
||||
newInstance._remoteAddress = None
|
||||
val actorId = new ActorRef(() => newInstance)
|
||||
actors.put(uuid, actorId)
|
||||
actorId
|
||||
val actorRef = new ActorRef(() => newInstance)
|
||||
actors.put(uuid, actorRef)
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote actor instance")
|
||||
throw e
|
||||
}
|
||||
} else actorIdOrNull
|
||||
} else actorRefOrNull
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
|
|||
* Necessary to keep from being implicitly converted to Iterable in for comprehensions.
|
||||
*/
|
||||
def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
|
||||
|
||||
|
||||
class WithFilter(p: T => Boolean) {
|
||||
def map[B](f: T => B): TransactionalRef[B] = self filter p map f
|
||||
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ object FutureSpec {
|
|||
class FutureSpec extends JUnitSuite {
|
||||
import FutureSpec._
|
||||
|
||||
@Test def shouldActorReplyResultThroughExplicitFuture = {
|
||||
@Test def shouldActorReplyResultThroughExplicitFuture {
|
||||
val actor = newActor[TestActor]
|
||||
actor.start
|
||||
val future = actor !!! "Hello"
|
||||
|
|
@ -30,7 +30,7 @@ class FutureSpec extends JUnitSuite {
|
|||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldActorReplyExceptionThroughExplicitFuture = {
|
||||
@Test def shouldActorReplyExceptionThroughExplicitFuture {
|
||||
val actor = newActor[TestActor]
|
||||
actor.start
|
||||
val future = actor !!! "Failure"
|
||||
|
|
|
|||
|
|
@ -1,36 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import Actor._
|
||||
|
||||
class MemoryFootprintSpec extends JUnitSuite {
|
||||
class Mem extends Actor {
|
||||
def receive = {
|
||||
case _ => {}
|
||||
}
|
||||
}
|
||||
|
||||
val NR_OF_ACTORS = 100000
|
||||
val MAX_MEMORY_FOOTPRINT_PER_ACTOR = 700
|
||||
|
||||
@Test
|
||||
def actorsShouldHaveLessMemoryFootprintThan700Bytes = {
|
||||
println("============== MEMORY FOOTPRINT TEST ==============")
|
||||
// warm up
|
||||
(1 until 10000).foreach(i => new Mem)
|
||||
|
||||
// Actors are put in AspectRegistry when created so they won't be GCd here
|
||||
|
||||
val totalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
|
||||
println("Memory before " + totalMem)
|
||||
(1 until NR_OF_ACTORS).foreach(i => new Mem)
|
||||
|
||||
val newTotalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
|
||||
println("Memory aftor " + newTotalMem)
|
||||
val memPerActor = (newTotalMem - totalMem) / NR_OF_ACTORS
|
||||
|
||||
println("Memory footprint per actor is : " + memPerActor)
|
||||
assert(memPerActor < MAX_MEMORY_FOOTPRINT_PER_ACTOR) // memory per actor should be less than 630 bytes
|
||||
}
|
||||
}
|
||||
|
|
@ -105,7 +105,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test
|
||||
def shouldSendRemoteReply = {
|
||||
def shouldSendRemoteReplyProtocol = {
|
||||
implicit val timeout = 500000000L
|
||||
val actor = RemoteClient.actorFor(
|
||||
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",
|
||||
|
|
|
|||
|
|
@ -98,9 +98,9 @@ class TransactionalRefSpec extends Spec with ShouldMatchers {
|
|||
var result = 0
|
||||
|
||||
atomic {
|
||||
for (value <- ref) {
|
||||
result += value
|
||||
}
|
||||
for (value <- ref) {
|
||||
result += value
|
||||
}
|
||||
}
|
||||
|
||||
result should be(3)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue