fix routing of remote messages bouncing nodes (there may be pathological cases ...)
- RemoteCommunicationSpec last test currently failing, lookup of deployment must be fixed to work zig-zag across nodes - commit mainly to merge with Henrik’s router work and get the surface clean
This commit is contained in:
parent
e5bd8b5f88
commit
a20aad4a5b
3 changed files with 120 additions and 66 deletions
|
|
@ -87,66 +87,66 @@ class RemoteActorRefProvider(
|
|||
new RemoteActorRef(this, remote.server, rpath, supervisor, None)
|
||||
}
|
||||
|
||||
// def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
|
||||
//
|
||||
// //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
|
||||
//
|
||||
// if (isReplicaNode) {
|
||||
// // we are on one of the replica node for this remote actor
|
||||
// local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?)
|
||||
// } else {
|
||||
//
|
||||
// implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
|
||||
// implicit val timeout = system.settings.ActorTimeout
|
||||
//
|
||||
// // we are on the single "reference" node uses the remote actors on the replica nodes
|
||||
// val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
|
||||
// case RouterType.Direct ⇒
|
||||
// if (remoteAddresses.size != 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new DirectRouter
|
||||
//
|
||||
// case RouterType.Broadcast ⇒
|
||||
// if (remoteAddresses.size != 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new BroadcastRouter
|
||||
//
|
||||
// case RouterType.Random ⇒
|
||||
// if (remoteAddresses.size < 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new RandomRouter
|
||||
//
|
||||
// case RouterType.RoundRobin ⇒
|
||||
// if (remoteAddresses.size < 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new RoundRobinRouter
|
||||
//
|
||||
// case RouterType.ScatterGather ⇒
|
||||
// if (remoteAddresses.size < 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout)
|
||||
//
|
||||
// case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
|
||||
// case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
|
||||
// case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
|
||||
// case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
|
||||
// }
|
||||
//
|
||||
// val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒
|
||||
// conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here
|
||||
// }
|
||||
//
|
||||
// val connectionManager = new RemoteConnectionManager(system, remote, connections)
|
||||
//
|
||||
// connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
|
||||
//
|
||||
// actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
|
||||
// }
|
||||
// def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
|
||||
//
|
||||
// //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
|
||||
//
|
||||
// if (isReplicaNode) {
|
||||
// // we are on one of the replica node for this remote actor
|
||||
// local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?)
|
||||
// } else {
|
||||
//
|
||||
// implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
|
||||
// implicit val timeout = system.settings.ActorTimeout
|
||||
//
|
||||
// // we are on the single "reference" node uses the remote actors on the replica nodes
|
||||
// val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
|
||||
// case RouterType.Direct ⇒
|
||||
// if (remoteAddresses.size != 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new DirectRouter
|
||||
//
|
||||
// case RouterType.Broadcast ⇒
|
||||
// if (remoteAddresses.size != 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new BroadcastRouter
|
||||
//
|
||||
// case RouterType.Random ⇒
|
||||
// if (remoteAddresses.size < 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new RandomRouter
|
||||
//
|
||||
// case RouterType.RoundRobin ⇒
|
||||
// if (remoteAddresses.size < 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new RoundRobinRouter
|
||||
//
|
||||
// case RouterType.ScatterGather ⇒
|
||||
// if (remoteAddresses.size < 1) throw new ConfigurationException(
|
||||
// "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
|
||||
// .format(name, remoteAddresses.mkString(", ")))
|
||||
// () ⇒ new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout)
|
||||
//
|
||||
// case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
|
||||
// case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
|
||||
// case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
|
||||
// case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
|
||||
// }
|
||||
//
|
||||
// val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒
|
||||
// conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here
|
||||
// }
|
||||
//
|
||||
// val connectionManager = new RemoteConnectionManager(system, remote, connections)
|
||||
//
|
||||
// connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
|
||||
//
|
||||
// actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
|
||||
// }
|
||||
case deploy ⇒ local.actorOf(system, props, supervisor, name, systemService)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue