Moved method for creating a RoutedActorRef from 'Routing.actorOf' to 'Actor.actorOf'

This commit is contained in:
Jonas Bonér 2011-10-11 11:55:59 +02:00
parent 84f4840926
commit e20866c982
11 changed files with 54 additions and 55 deletions

View file

@ -31,14 +31,14 @@ class RoutingSpec extends WordSpec with MustMatchers {
val actor1 = Actor.actorOf[TestActor] val actor1 = Actor.actorOf[TestActor]
val props = RoutedProps().withDirectRouter.withLocalConnections(List(actor1)) val props = RoutedProps().withDirectRouter.withLocalConnections(List(actor1))
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor.isShutdown must be(false) actor.isShutdown must be(false)
} }
"throw ConfigurationException at construction when no connections" in { "throw ConfigurationException at construction when no connections" in {
try { try {
val props = RoutedProps().withDirectRouter val props = RoutedProps().withDirectRouter
Routing.actorOf(props, "foo") Actor.actorOf(props, "foo")
fail() fail()
} catch { } catch {
case e: ConfigurationException case e: ConfigurationException
@ -57,7 +57,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}) })
val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1)) val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1))
val routedActor = Routing.actorOf(props, "foo") val routedActor = Actor.actorOf(props, "foo")
routedActor ! "hello" routedActor ! "hello"
routedActor ! "end" routedActor ! "end"
@ -78,7 +78,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}) })
val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1)) val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1))
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor ! Broadcast(1) actor ! Broadcast(1)
actor ! "end" actor ! "end"
@ -95,14 +95,14 @@ class RoutingSpec extends WordSpec with MustMatchers {
val actor1 = Actor.actorOf[TestActor] val actor1 = Actor.actorOf[TestActor]
val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(actor1)) val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(actor1))
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor.isShutdown must be(false) actor.isShutdown must be(false)
} }
"throw ConfigurationException at construction when no connections" in { "throw ConfigurationException at construction when no connections" in {
try { try {
val props = RoutedProps().withRoundRobinRouter val props = RoutedProps().withRoundRobinRouter
Routing.actorOf(props, "foo") Actor.actorOf(props, "foo")
fail() fail()
} catch { } catch {
case e: ConfigurationException case e: ConfigurationException
@ -135,7 +135,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
//create the routed actor. //create the routed actor.
val props = RoutedProps().withRoundRobinRouter.withLocalConnections(connections) val props = RoutedProps().withRoundRobinRouter.withLocalConnections(connections)
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
//send messages to the actor. //send messages to the actor.
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
@ -174,7 +174,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}) })
val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1, connection2)) val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1, connection2))
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor ! Broadcast(1) actor ! Broadcast(1)
actor ! Broadcast("end") actor ! Broadcast("end")
@ -197,7 +197,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}) })
val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1)) val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1))
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
try { try {
actor ? Broadcast(1) actor ? Broadcast(1)
@ -219,14 +219,14 @@ class RoutingSpec extends WordSpec with MustMatchers {
val actor1 = Actor.actorOf[TestActor] val actor1 = Actor.actorOf[TestActor]
val props = RoutedProps().withRandomRouter.withLocalConnections(List(actor1)) val props = RoutedProps().withRandomRouter.withLocalConnections(List(actor1))
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor.isShutdown must be(false) actor.isShutdown must be(false)
} }
"throw ConfigurationException at construction when no connections" in { "throw ConfigurationException at construction when no connections" in {
try { try {
val props = RoutedProps().withRandomRouter val props = RoutedProps().withRandomRouter
Routing.actorOf(props, "foo") Actor.actorOf(props, "foo")
fail() fail()
} catch { } catch {
case e: ConfigurationException case e: ConfigurationException
@ -257,7 +257,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}) })
val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1, connection2)) val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1, connection2))
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor ! Broadcast(1) actor ! Broadcast(1)
actor ! Broadcast("end") actor ! Broadcast("end")
@ -280,7 +280,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}) })
val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1)) val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1))
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
try { try {
actor ? Broadcast(1) actor ? Broadcast(1)
@ -305,7 +305,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
.withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) .withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))
.withRouter(() new ScatterGatherFirstCompletedRouter()) .withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor ! Broadcast(Stop(Some(0))) actor ! Broadcast(Stop(Some(0)))
@ -322,7 +322,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
.withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) .withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))
.withRouter(() new ScatterGatherFirstCompletedRouter()) .withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor ! Broadcast(Stop()) actor ! Broadcast(Stop())
@ -340,7 +340,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
.withLocalConnections(List(newActor(0), newActor(1))) .withLocalConnections(List(newActor(0), newActor(1)))
.withRouter(() new ScatterGatherFirstCompletedRouter()) .withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
(actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0)
@ -351,7 +351,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
.withLocalConnections(List(newActor(0), newActor(1))) .withLocalConnections(List(newActor(0), newActor(1)))
.withRouter(() new ScatterGatherFirstCompletedRouter()) .withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
} }
@ -360,7 +360,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val props = RoutedProps() val props = RoutedProps()
.withLocalConnections(List(newActor(0))) .withLocalConnections(List(newActor(0)))
.withRouter(() new ScatterGatherFirstCompletedRouter()) .withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor.isShutdown must be(false) actor.isShutdown must be(false)
@ -372,7 +372,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
.withRouter(() new ScatterGatherFirstCompletedRouter()) .withRouter(() new ScatterGatherFirstCompletedRouter())
try { try {
Routing.actorOf(props, "foo") Actor.actorOf(props, "foo")
fail() fail()
} catch { } catch {
case e: ConfigurationException case e: ConfigurationException
@ -402,7 +402,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
.withLocalConnections(connections) .withLocalConnections(connections)
.withRouter(() new ScatterGatherFirstCompletedRouter()) .withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
for (k 0 until connectionCount) { for (k 0 until connectionCount) {
@ -443,7 +443,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
.withLocalConnections(List(connection1, connection2)) .withLocalConnections(List(connection1, connection2))
.withRouter(() new ScatterGatherFirstCompletedRouter()) .withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props, "foo") val actor = Actor.actorOf(props, "foo")
actor ! Broadcast(1) actor ! Broadcast(1)
actor ! Broadcast("end") actor ! Broadcast("end")

View file

@ -25,14 +25,6 @@ public class Actors {
return Actor$.MODULE$.provider(); return Actor$.MODULE$.provider();
} }
/**
*
* @return The actor registry
*/
public static ActorRefProviders registry() {
return Actor$.MODULE$.provider();
}
/** /**
* *
* @return * @return

View file

@ -335,6 +335,11 @@ object Actor {
*/ */
def actorOf(props: Props, address: String): ActorRef = provider.actorOf(props, address) def actorOf(props: Props, address: String): ActorRef = provider.actorOf(props, address)
/**
* Creates (or fetches) a routed actor reference, configured by the 'props: RoutedProps' configuration.
*/
def actorOf(props: RoutedProps, address: String = newUuid().toString): ActorRef = provider.actorOf(props, address)
/** /**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when * Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed. * the block has been executed.

View file

@ -5,8 +5,10 @@
package akka.actor package akka.actor
import akka.event.EventHandler import akka.event.EventHandler
import akka.AkkaException import akka.config.ConfigurationException
import akka.util.ReflectiveAccess
import akka.routing._ import akka.routing._
import akka.AkkaException
/** /**
* Interface for all ActorRef providers to implement. * Interface for all ActorRef providers to implement.
@ -90,6 +92,22 @@ private[akka] class ActorRefProviders(
actorFor(address, providersAsList) actorFor(address, providersAsList)
} }
/**
* Creates (or fetches) a routed actor reference, configured by the 'props: RoutedProps' configuration.
*/
def actorOf(props: RoutedProps, address: String = newUuid().toString): ActorRef = {
//TODO Implement support for configuring by deployment ID etc
//TODO If address matches an already created actor (Ahead-of-time deployed) return that actor
//TODO If address exists in config, it will override the specified Props (should we attempt to merge?)
//TODO If the actor deployed uses a different config, then ignore or throw exception?
if (props.connectionManager.size == 0) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router")
val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
val localOnly = props.localOnly
if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
else new RoutedActorRef(props, address)
}
/** /**
* Returns true if the actor was in the provider's cache and evicted successfully, else false. * Returns true if the actor was in the provider's cache and evicted successfully, else false.
*/ */
@ -173,7 +191,7 @@ class LocalActorRefProvider extends ActorRefProvider {
Vector.fill(nrOfInstances.factor)(new LocalActorRef(props, new UUID().toString, systemService)) Vector.fill(nrOfInstances.factor)(new LocalActorRef(props, new UUID().toString, systemService))
else Nil else Nil
Some(Routing.actorOf(RoutedProps( Some(Actor.actorOf(RoutedProps(
routerFactory = routerFactory, routerFactory = routerFactory,
connectionManager = new LocalConnectionManager(connections)))) connectionManager = new LocalConnectionManager(connections))))

View file

@ -72,24 +72,6 @@ object Routing {
*/ */
case class Broadcast(message: Any) extends RoutingMessage case class Broadcast(message: Any) extends RoutingMessage
/**
* Creates (or fetches) a routed actor reference, configured by the 'props: RoutedProps' configuration.
*
* FIXME: will very likely be moved to the ActorRefProvider.
*/
def actorOf(props: RoutedProps, address: String = newUuid().toString): ActorRef = {
//TODO Implement support for configuring by deployment ID etc
//TODO If address matches an already created actor (Ahead-of-time deployed) return that actor
//TODO If address exists in config, it will override the specified Props (should we attempt to merge?)
//TODO If the actor deployed uses a different config, then ignore or throw exception?
if (props.connectionManager.size == 0) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router")
val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
val localOnly = props.localOnly
if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
else new RoutedActorRef(props, address)
}
def createCustomRouter(implClass: String): Router = { def createCustomRouter(implClass: String): Router = {
ReflectiveAccess.createInstance( ReflectiveAccess.createInstance(
implClass, implClass,

View file

@ -69,7 +69,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) val workers = Vector.fill(nrOfWorkers)(actorOf[Worker])
// wrap them with a load-balancing router // wrap them with a load-balancing router
val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") val router = Actor.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi")
//#create-workers //#create-workers
//#master-receive //#master-receive

View file

@ -107,7 +107,7 @@ class RemoteActorRefProvider extends ActorRefProvider {
connections.keys foreach { useActorOnNode(_, address, props.creator) } connections.keys foreach { useActorOnNode(_, address, props.creator) }
Some(Routing.actorOf(RoutedProps( Some(Actor.actorOf(RoutedProps(
routerFactory = routerFactory, routerFactory = routerFactory,
connectionManager = connectionManager))) connectionManager = connectionManager)))
} }

View file

@ -9,6 +9,7 @@ import static akka.actor.Actors.poisonPill;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Actors;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory; import akka.actor.UntypedActorFactory;
import akka.routing.RoutedProps; import akka.routing.RoutedProps;
@ -110,7 +111,7 @@ public class Pi {
workers.add(worker); workers.add(worker);
} }
router = Routing.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi"); router = Actors.provider().actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi");
} }
// message handler // message handler

View file

@ -58,7 +58,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) val workers = Vector.fill(nrOfWorkers)(actorOf[Worker])
// wrap them with a load-balancing router // wrap them with a load-balancing router
val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") val router = Actor.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi")
// message handler // message handler
def receive = { def receive = {

View file

@ -14,6 +14,7 @@ import akka.routing.Routing;
import akka.routing.LocalConnectionManager; import akka.routing.LocalConnectionManager;
import scala.Option; import scala.Option;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Actors;
import akka.actor.Channel; import akka.actor.Channel;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory; import akka.actor.UntypedActorFactory;
@ -104,7 +105,7 @@ public class Pi {
workers.add(worker); workers.add(worker);
} }
router = Routing.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi"); router = Actors.provider().actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi");
} }
@Override @Override

View file

@ -53,7 +53,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) val workers = Vector.fill(nrOfWorkers)(actorOf[Worker])
// wrap them with a load-balancing router // wrap them with a load-balancing router
val router = Routing.actorOf(RoutedProps( val router = Actor.actorOf(RoutedProps(
routerFactory = () new RoundRobinRouter, routerFactory = () new RoundRobinRouter,
connectionManager = new LocalConnectionManager(workers)), "pi") connectionManager = new LocalConnectionManager(workers)), "pi")