Cleaned up RoutedProps and removed all actorOf methods with RoutedProps.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-11-11 19:57:27 +01:00
parent a9049ec9e5
commit e88d07305d
16 changed files with 375 additions and 927 deletions

View file

@ -3,7 +3,7 @@ package akka.routing
import akka.routing._
import akka.config.ConfigurationException
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{ ActorRef, Actor }
import akka.actor._
import collection.mutable.LinkedList
import akka.routing.Routing.Broadcast
import java.util.concurrent.{ CountDownLatch, TimeUnit }
@ -28,21 +28,11 @@ class RoutingSpec extends AkkaSpec {
"be started when constructed" in {
val actor1 = actorOf[TestActor]
val props = RoutedProps().withDirectRouter.withLocalConnections(List(actor1))
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new DirectRouter, connectionManager = new LocalConnectionManager(List(actor1)))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor.isShutdown must be(false)
}
"throw ConfigurationException at construction when no connections" in {
try {
val props = RoutedProps().withDirectRouter
app.actorOf(props, "foo")
fail()
} catch {
case e: ConfigurationException
}
}
"send message to connection" in {
val doneLatch = new CountDownLatch(1)
@ -54,8 +44,8 @@ class RoutingSpec extends AkkaSpec {
}
})
val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1))
val routedActor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1)))
val routedActor = new RoutedActorRef(app, props, app.guardian, "foo")
routedActor ! "hello"
routedActor ! "end"
@ -75,8 +65,8 @@ class RoutingSpec extends AkkaSpec {
}
})
val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1))
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1)))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor ! Broadcast(1)
actor ! "end"
@ -92,21 +82,11 @@ class RoutingSpec extends AkkaSpec {
"be started when constructed" in {
val actor1 = actorOf[TestActor]
val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(actor1))
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(actor1)))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor.isShutdown must be(false)
}
"throw ConfigurationException at construction when no connections" in {
try {
val props = RoutedProps().withRoundRobinRouter
app.actorOf(props, "foo")
fail()
} catch {
case e: ConfigurationException
}
}
//In this test a bunch of actors are created and each actor has its own counter.
//to test round robin, the routed actor receives the following sequence of messages 1 2 3 .. 1 2 3 .. 1 2 3 which it
//uses to increment his counter.
@ -132,8 +112,8 @@ class RoutingSpec extends AkkaSpec {
}
//create the routed actor.
val props = RoutedProps().withRoundRobinRouter.withLocalConnections(connections)
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(connections))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
//send messages to the actor.
for (i 0 until iterationCount) {
@ -171,8 +151,8 @@ class RoutingSpec extends AkkaSpec {
}
})
val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1, connection2))
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor ! Broadcast(1)
actor ! Broadcast("end")
@ -194,7 +174,8 @@ class RoutingSpec extends AkkaSpec {
}
})
val actor = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1)), "foo")
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1)))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
intercept[RoutingException] { actor ? Broadcast(1) }
@ -210,25 +191,11 @@ class RoutingSpec extends AkkaSpec {
val actor1 = actorOf[TestActor]
val props = RoutedProps().withRandomRouter.withLocalConnections(List(actor1))
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new RandomRouter, connectionManager = new LocalConnectionManager(List(actor1)))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor.isShutdown must be(false)
}
"throw ConfigurationException at construction when no connections" in {
try {
val props = RoutedProps().withRandomRouter
app.actorOf(props, "foo")
fail()
} catch {
case e: ConfigurationException
}
}
"deliver messages in a random fashion" ignore {
}
"deliver a broadcast message" in {
val doneLatch = new CountDownLatch(2)
@ -248,8 +215,8 @@ class RoutingSpec extends AkkaSpec {
}
})
val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1, connection2))
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor ! Broadcast(1)
actor ! Broadcast("end")
@ -271,8 +238,8 @@ class RoutingSpec extends AkkaSpec {
}
})
val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1))
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1)))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
try {
actor ? Broadcast(1)
@ -293,11 +260,9 @@ class RoutingSpec extends AkkaSpec {
val shutdownLatch = new TestLatch(1)
val props = RoutedProps()
.withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val props = RoutedProps(routerFactory = () new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))))
val actor = app.actorOf(props, "foo")
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor ! Broadcast(Stop(Some(0)))
@ -310,11 +275,9 @@ class RoutingSpec extends AkkaSpec {
val shutdownLatch = new TestLatch(2)
val props = RoutedProps()
.withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val props = RoutedProps(routerFactory = () new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))))
val actor = app.actorOf(props, "foo")
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor ! Broadcast(Stop())
@ -328,47 +291,27 @@ class RoutingSpec extends AkkaSpec {
"return the first response from connections, when all of them replied" in {
val props = RoutedProps()
.withLocalConnections(List(newActor(0), newActor(1)))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val props = RoutedProps(routerFactory = () new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1))))
val actor = app.actorOf(props, "foo")
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
(actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0)
}
"return the first response from connections, when some of them failed to reply" in {
val props = RoutedProps()
.withLocalConnections(List(newActor(0), newActor(1)))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val props = RoutedProps(routerFactory = () new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1))))
val actor = app.actorOf(props, "foo")
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
}
"be started when constructed" in {
val props = RoutedProps()
.withLocalConnections(List(newActor(0)))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = app.actorOf(props, "foo")
val props = RoutedProps(routerFactory = () new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0))))
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor.isShutdown must be(false)
}
"throw ConfigurationException at construction when no connections" in {
val props = RoutedProps()
.withLocalConnections(List())
.withRouter(() new ScatterGatherFirstCompletedRouter())
try {
app.actorOf(props, "foo")
fail()
} catch {
case e: ConfigurationException
}
}
"deliver one-way messages in a round robin fashion" in {
@ -390,11 +333,9 @@ class RoutingSpec extends AkkaSpec {
connections = connections :+ connection
}
val props = RoutedProps()
.withLocalConnections(connections)
.withRouter(() new ScatterGatherFirstCompletedRouter())
val props = RoutedProps(routerFactory = () new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(connections))
val actor = app.actorOf(props, "foo")
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
@ -431,11 +372,9 @@ class RoutingSpec extends AkkaSpec {
}
})
val props = RoutedProps.apply()
.withLocalConnections(List(connection1, connection2))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val props = RoutedProps(routerFactory = () new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
val actor = app.actorOf(props, "foo")
val actor = new RoutedActorRef(app, props, app.guardian, "foo")
actor ! Broadcast(1)
actor ! Broadcast("end")