closing ticket #426
This commit is contained in:
parent
bfb612908b
commit
5a1e8f5235
7 changed files with 72 additions and 20 deletions
|
|
@ -1358,7 +1358,7 @@ object RemoteActorSystemMessage {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
private[akka] case class RemoteActorRef private[akka] (
|
||||
uuuid: String,
|
||||
classOrServiceName: String,
|
||||
val className: String,
|
||||
val hostname: String,
|
||||
val port: Int,
|
||||
|
|
@ -1369,7 +1369,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
|
||||
ensureRemotingEnabled
|
||||
|
||||
_uuid = uuuid
|
||||
id = classOrServiceName
|
||||
timeout = _timeout
|
||||
|
||||
start
|
||||
|
|
|
|||
|
|
@ -292,7 +292,7 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
/**
|
||||
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
|
||||
*/
|
||||
def register(actorRef: ActorRef): Unit = register(actorRef.id,actorRef)
|
||||
def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef)
|
||||
|
||||
/**
|
||||
* Register Remote Actor by a specific 'id' passed as argument.
|
||||
|
|
@ -555,6 +555,32 @@ class RemoteServerHandler(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a registered actor by ID (default) or UUID.
|
||||
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
|
||||
*/
|
||||
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
|
||||
val registeredActors = server.actors()
|
||||
var actorRefOrNull = registeredActors get id
|
||||
if (actorRefOrNull eq null) {
|
||||
actorRefOrNull = registeredActors get uuid
|
||||
}
|
||||
actorRefOrNull
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a registered typed actor by ID (default) or UUID.
|
||||
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
|
||||
*/
|
||||
private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = {
|
||||
val registeredActors = server.typedActors()
|
||||
var actorRefOrNull = registeredActors get id
|
||||
if (actorRefOrNull eq null) {
|
||||
actorRefOrNull = registeredActors get uuid
|
||||
}
|
||||
actorRefOrNull
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
|
||||
*
|
||||
|
|
@ -563,12 +589,14 @@ class RemoteServerHandler(
|
|||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val ids = actorInfo.getUuid.split(':')
|
||||
val uuid = ids(0)
|
||||
val id = ids(1)
|
||||
|
||||
val name = actorInfo.getTarget
|
||||
val timeout = actorInfo.getTimeout
|
||||
|
||||
val registeredActors = server.actors()
|
||||
val actorRefOrNull = registeredActors get uuid
|
||||
val actorRefOrNull = findActorByIdOrUuid(id, uuid)
|
||||
|
||||
if (actorRefOrNull eq null) {
|
||||
try {
|
||||
|
|
@ -577,9 +605,10 @@ class RemoteServerHandler(
|
|||
else Class.forName(name)
|
||||
val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor])
|
||||
actorRef.uuid = uuid
|
||||
actorRef.id = id
|
||||
actorRef.timeout = timeout
|
||||
actorRef.remoteAddress = None
|
||||
registeredActors.put(uuid, actorRef)
|
||||
server.actors.put(id, actorRef) // register by id
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
|
|
@ -591,9 +620,11 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val registeredTypedActors = server.typedActors()
|
||||
val typedActorOrNull = registeredTypedActors get uuid
|
||||
val ids = actorInfo.getUuid.split(':')
|
||||
val uuid = ids(0)
|
||||
val id = ids(1)
|
||||
|
||||
val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid)
|
||||
|
||||
if (typedActorOrNull eq null) {
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
|
|
@ -610,7 +641,7 @@ class RemoteServerHandler(
|
|||
|
||||
val newInstance = TypedActor.newInstance(
|
||||
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
|
||||
registeredTypedActors.put(uuid, newInstance)
|
||||
server.typedActors.put(id, newInstance) // register by id
|
||||
newInstance
|
||||
} catch {
|
||||
case e =>
|
||||
|
|
|
|||
|
|
@ -230,7 +230,7 @@ object RemoteActorSerialization {
|
|||
}
|
||||
|
||||
RemoteActorRefProtocol.newBuilder
|
||||
.setUuid(uuid)
|
||||
.setUuid(uuid + ":" + id)
|
||||
.setActorClassname(actorClass.getName)
|
||||
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
||||
.setTimeout(timeout)
|
||||
|
|
@ -248,7 +248,7 @@ object RemoteActorSerialization {
|
|||
import actorRef._
|
||||
|
||||
val actorInfoBuilder = ActorInfoProtocol.newBuilder
|
||||
.setUuid(uuid)
|
||||
.setUuid(uuid + ":" + actorRef.id)
|
||||
.setTarget(actorClassName)
|
||||
.setTimeout(timeout)
|
||||
|
||||
|
|
|
|||
|
|
@ -93,6 +93,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
|
|||
actor.stop
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
def shouldSendOneWayAndReceiveReply = {
|
||||
val actor = actorOf[SendOneWayAndReplyReceiverActor]
|
||||
|
|
@ -134,6 +135,6 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
|
|||
assert("Expected exception; to test fault-tolerance" === e.getMessage())
|
||||
}
|
||||
actor.stop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,10 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor.remote
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.Assertions
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
|
|
@ -19,6 +16,7 @@ import se.scalablesolutions.akka.actor._
|
|||
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
|
||||
|
||||
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
|
||||
import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll}
|
||||
|
||||
object RemoteTypedActorSpec {
|
||||
val HOSTNAME = "localhost"
|
||||
|
|
@ -40,7 +38,7 @@ object RemoteTypedActorLog {
|
|||
class RemoteTypedActorSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
BeforeAndAfterEach with BeforeAndAfterAll {
|
||||
|
||||
import RemoteTypedActorLog._
|
||||
import RemoteTypedActorSpec._
|
||||
|
|
@ -82,6 +80,10 @@ class RemoteTypedActorSpec extends
|
|||
ActorRegistry.shutdownAll
|
||||
}
|
||||
|
||||
override def afterEach() {
|
||||
server.typedActors.clear
|
||||
}
|
||||
|
||||
describe("Remote Typed Actor ") {
|
||||
|
||||
it("should receive one-way message") {
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
def shouldSendWithBang {
|
||||
val actor = RemoteClient.actorFor(
|
||||
|
|
@ -139,11 +140,29 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
|
|||
server.register(actorOf[RemoteActorSpecActorUnidirectional])
|
||||
val actor = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT)
|
||||
val numberOfActorsInRegistry = ActorRegistry.actors.length
|
||||
val result = actor ! "OneWay"
|
||||
actor ! "OneWay"
|
||||
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
|
||||
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldUseServiceNameAsIdForRemoteActorRef {
|
||||
server.register(actorOf[RemoteActorSpecActorUnidirectional])
|
||||
server.register("my-service", actorOf[RemoteActorSpecActorUnidirectional])
|
||||
val actor1 = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT)
|
||||
val actor2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
|
||||
val actor3 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
|
||||
|
||||
actor1 ! "OneWay"
|
||||
actor2 ! "OneWay"
|
||||
actor3 ! "OneWay"
|
||||
|
||||
assert(actor1.uuid != actor2.uuid)
|
||||
assert(actor1.uuid != actor3.uuid)
|
||||
assert(actor1.id != actor2.id)
|
||||
assert(actor2.id == actor3.id)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -122,7 +122,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
|||
|
||||
remoteAddress.foreach { address =>
|
||||
actorRef.makeRemote(remoteAddress.get)
|
||||
RemoteServerModule.registerTypedActor(address, implementationClass.getName, proxy)
|
||||
}
|
||||
|
||||
AspectInitRegistry.register(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue