Completed RemoteActorRefProvider and parsing/management of 'remote' section akka.conf, now does provisioning and local instantiation of remote actor on its home node. Also changed command line option 'akka.cluster.port' to 'akka.remote.port'.
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
parent
db51115a0e
commit
af49b99d6f
69 changed files with 143 additions and 119 deletions
|
|
@ -46,10 +46,16 @@ class RemoteActorRefProvider extends ActorRefProvider {
|
|||
case Some(Deploy(_, _, router, _, RemoteScope(host, port))) ⇒
|
||||
// FIXME create RoutedActorRef if 'router' is specified
|
||||
|
||||
val remoteAddress = new InetSocketAddress(host, port)
|
||||
useActorOnNode(remoteAddress, address, props.creator)
|
||||
|
||||
Some(newRemoteActorRef(address, remoteAddress)) // create a remote actor
|
||||
val serverAddress = Remote.address
|
||||
if (serverAddress.getHostName == host && serverAddress.getPort == port) {
|
||||
// home node for this remote actor
|
||||
Some(new LocalActorRef(props, address, false)) // create a local actor
|
||||
} else {
|
||||
// not home node, need to provision it
|
||||
val remoteAddress = new InetSocketAddress(host, port)
|
||||
useActorOnNode(remoteAddress, deployId, props.creator)
|
||||
Some(RemoteActorRef(remoteAddress, address, Actor.TIMEOUT, None)) // create a remote actor
|
||||
}
|
||||
|
||||
case deploy ⇒ None // non-remote actor
|
||||
}
|
||||
|
|
@ -78,26 +84,22 @@ class RemoteActorRefProvider extends ActorRefProvider {
|
|||
.setPayload(ByteString.copyFrom(actorFactoryBytes))
|
||||
.build()
|
||||
|
||||
val connectionFactory = () ⇒ newRemoteActorRef(actorAddress, remoteAddress)
|
||||
val connectionFactory =
|
||||
() ⇒ Remote.server.actorFor(
|
||||
Remote.remoteDaemonServiceName, remoteAddress.getHostName, remoteAddress.getPort)
|
||||
|
||||
// try to get the connection for the remote address, if not already there then create it
|
||||
val connection = failureDetector.putIfAbsent(remoteAddress, connectionFactory)
|
||||
|
||||
sendCommandToRemoteNode(connection, command, async = false) // ensure we get an ACK on the USE command
|
||||
}
|
||||
|
||||
private def newRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
|
||||
RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
|
||||
sendCommandToRemoteNode(connection, command, withACK = true) // ensure we get an ACK on the USE command
|
||||
}
|
||||
|
||||
private def sendCommandToRemoteNode(
|
||||
connection: ActorRef,
|
||||
command: RemoteDaemonMessageProtocol,
|
||||
async: Boolean = true) {
|
||||
withACK: Boolean) {
|
||||
|
||||
if (async) {
|
||||
connection ! command
|
||||
} else {
|
||||
if (withACK) {
|
||||
try {
|
||||
(connection ? (command, Remote.remoteDaemonAckTimeout)).as[Status] match {
|
||||
case Some(Success(receiver)) ⇒
|
||||
|
|
@ -117,6 +119,8 @@ class RemoteActorRefProvider extends ActorRefProvider {
|
|||
EventHandler.error(e, this, "Could not send remote command to [%s] due to: %s".format(connection.address, e.toString))
|
||||
throw e
|
||||
}
|
||||
} else {
|
||||
connection ! command
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue