Removing deployId from config, should be replaced with patterns in deployment configuration that is checked towards the address

This commit is contained in:
Viktor Klang 2011-09-19 15:20:52 +02:00
parent f993219f91
commit b66d45ec6c
14 changed files with 43 additions and 99 deletions

View file

@ -21,8 +21,6 @@ class ClusterSpec extends WordSpec with MustMatchers {
getInt("akka.cluster.connection-timeout") must equal(Some(60))
getInt("akka.cluster.remote-daemon-ack-timeout") must equal(Some(30))
getBool("akka.cluster.include-ref-node-in-replica-set") must equal(Some(true))
getString("akka.cluster.compression-scheme") must equal(Some(""))
getInt("akka.cluster.zlib-compression-level") must equal(Some(6))
getString("akka.cluster.layer") must equal(Some("akka.cluster.netty.NettyRemoteSupport"))
getString("akka.cluster.secure-cookie") must equal(Some(""))
getString("akka.cluster.log-directory") must equal(Some("_akka_cluster"))

View file

@ -19,11 +19,10 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
val shutdownLatch = new CountDownLatch(1)
val props = RoutedProps()
.withDeployId("foo")
.withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props)
val actor = Routing.actorOf(props, "foo")
actor ! Broadcast(Stop(Some(0)))
@ -36,12 +35,11 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
val shutdownLatch = new CountDownLatch(2)
val props = RoutedProps.apply()
.withDeployId("foo")
val props = RoutedProps()
.withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props)
val actor = Routing.actorOf(props, "foo")
actor ! Broadcast(Stop())
@ -55,48 +53,44 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
"return the first response from connections, when all of them replied" in {
val props = RoutedProps.apply()
.withDeployId("foo")
val props = RoutedProps()
.withConnections(List(newActor(0), newActor(1)))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props)
val actor = Routing.actorOf(props, "foo")
(actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0)
}
"return the first response from connections, when some of them failed to reply" in {
val props = RoutedProps.apply()
.withDeployId("foo")
val props = RoutedProps()
.withConnections(List(newActor(0), newActor(1)))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props)
val actor = Routing.actorOf(props, "foo")
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
}
"be started when constructed" in {
val props = RoutedProps.apply()
.withDeployId("foo")
val props = RoutedProps()
.withConnections(List(newActor(0)))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props)
val actor = Routing.actorOf(props, "foo")
actor.isRunning must be(true)
}
"throw IllegalArgumentException at construction when no connections" in {
val props = RoutedProps.apply()
.withDeployId("foo")
val props = RoutedProps()
.withConnections(List())
.withRouter(() new ScatterGatherFirstCompletedRouter())
try {
Routing.actorOf(props)
Routing.actorOf(props, "foo")
fail()
} catch {
case e: IllegalArgumentException
@ -122,12 +116,11 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
connections = connections :+ connection
}
val props = RoutedProps.apply()
.withDeployId("foo")
val props = RoutedProps()
.withConnections(connections)
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props)
val actor = Routing.actorOf(props, "foo")
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
@ -165,11 +158,10 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
})
val props = RoutedProps.apply()
.withDeployId("foo")
.withConnections(List(connection1, connection2))
.withRouter(() new ScatterGatherFirstCompletedRouter())
val actor = Routing.actorOf(props)
val actor = Routing.actorOf(props, "foo")
actor ! Broadcast(1)
actor ! Broadcast("end")

View file

@ -44,8 +44,8 @@ private[akka] class ActorRefProviders(
}
//FIXME Implement support for configuring by deployment ID etc
//FIXME If deployId matches an already created actor (Ahead-of-time deployed) return that actor
//FIXME If deployId exists in config, it will override the specified Props (should we attempt to merge?)
//FIXME If address matches an already created actor (Ahead-of-time deployed) return that actor
//FIXME If address exists in config, it will override the specified Props (should we attempt to merge?)
def actorOf(props: Props, address: String): ActorRef = {
@ -54,7 +54,7 @@ private[akka] class ActorRefProviders(
providers match {
case Nil None
case provider :: rest
provider.actorOf(props, address) match {
provider.actorOf(props, address) match { //WARNING FIXME RACE CONDITION NEEDS TO BE SOLVED
case None actorOf(props, address, rest) // recur
case ref ref
}
@ -112,13 +112,8 @@ class LocalActorRefProvider extends ActorRefProvider {
case None // it is not -> create it
// if 'Props.deployId' is not specified then use 'address' as 'deployId'
val deployId = props.deployId match {
case Props.`defaultDeployId` | null address
case other other
}
Deployer.lookupDeploymentFor(deployId) match { // see if the deployment already exists, if so use it, if not create actor
//WARNING FIXME HUGE RACE CONDITION THAT NEEDS GETTING FIXED
Deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor
case Some(Deploy(_, _, router, _, Local))
// FIXME create RoutedActorRef if 'router' is specified

View file

@ -66,7 +66,6 @@ object Props {
* ActorRef configuration object, this is thread safe and fully sharable
*/
case class Props(creator: () Actor = Props.defaultCreator,
deployId: String = Props.defaultDeployId,
@transient dispatcher: MessageDispatcher = Props.defaultDispatcher,
timeout: Timeout = Props.defaultTimeout,
faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler,
@ -77,7 +76,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator,
*/
def this() = this(
creator = Props.defaultCreator,
deployId = Props.defaultDeployId,
dispatcher = Props.defaultDispatcher,
timeout = Props.defaultTimeout,
faultHandler = Props.defaultFaultHandler,
@ -95,12 +93,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator,
*/
def withCreator(c: Creator[Actor]) = copy(creator = () c.create)
/**
* Returns a new Props with the specified deployId set
* Java and Scala API
*/
def withDeployId(id: String) = copy(deployId = if (id eq null) "" else id)
/**
* Returns a new Props with the specified dispatcher set
* Java API

View file

@ -73,12 +73,15 @@ abstract class MessageDispatcher extends Serializable {
* Attaches the specified actor instance to this dispatcher
*/
final def attach(actor: ActorInstance) {
guard withGuard {
var promise = new ActorPromise(Timeout.never)(this)
guard.lock.lock()
try {
register(actor)
val promise = new ActorPromise(Timeout.never)(this)
dispatchMessage(new MessageInvocation(actor, Init, promise))
promise
}.get
} finally {
guard.lock.unlock()
}
promise.get
}
/**

View file

@ -73,7 +73,6 @@ object RoutedProps {
final val defaultTimeout = Actor.TIMEOUT
final val defaultRouterFactory = () new RoundRobinRouter
final val defaultDeployId = ""
final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled
final val defaultFailureDetectorFactory = (connections: Map[InetSocketAddress, ActorRef]) new RemoveConnectionOnFirstFailureLocalFailureDetector(connections.values)
@ -100,7 +99,6 @@ object RoutedProps {
case class RoutedProps(
routerFactory: () Router,
failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) FailureDetector,
deployId: String,
connections: Iterable[ActorRef],
timeout: Timeout,
localOnly: Boolean) {
@ -108,18 +106,10 @@ case class RoutedProps(
def this() = this(
routerFactory = RoutedProps.defaultRouterFactory,
failureDetectorFactory = RoutedProps.defaultFailureDetectorFactory,
deployId = RoutedProps.defaultDeployId,
connections = List(),
timeout = RoutedProps.defaultTimeout,
localOnly = RoutedProps.defaultLocalOnly)
/**
* Returns a new RoutedProps with the specified deployId set
*
* Java and Scala API
*/
def withDeployId(id: String): RoutedProps = copy(deployId = if (id eq null) "" else id)
/**
* Returns a new RoutedProps configured with a random router.
*

View file

@ -134,10 +134,10 @@ object Routing {
/**
* FIXME: will very likely be moved to the ActorRef.
*/
def actorOf(props: RoutedProps): ActorRef = {
def actorOf(props: RoutedProps, address: String = newUuid().toString): ActorRef = {
//TODO Implement support for configuring by deployment ID etc
//TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor
//TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?)
//TODO If address matches an already created actor (Ahead-of-time deployed) return that actor
//TODO If address exists in config, it will override the specified Props (should we attempt to merge?)
//TODO If the actor deployed uses a different config, then ignore or throw exception?
val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
@ -146,10 +146,10 @@ object Routing {
if (clusteringEnabled && !props.localOnly)
ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
else {
if (props.connections.isEmpty)
if (props.connections.isEmpty) //FIXME Shouldn't this be checked when instance is created so that it works with linking instead of barfing?
throw new IllegalArgumentException("A routed actorRef can't have an empty connection set")
new RoutedActorRef(props)
new RoutedActorRef(props, address)
}
}
@ -188,9 +188,9 @@ object Routing {
new RoutedProps(
() router,
RoutedProps.defaultFailureDetectorFactory,
actorAddress,
connections,
RoutedProps.defaultTimeout, true))
RoutedProps.defaultTimeout, true),
actorAddress)
}
}
@ -201,8 +201,6 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte
val router = props.routerFactory()
def address = props.deployId
override def postMessageToMailbox(message: Any, channel: UntypedChannel) = {
val sender = channel match {
case ref: ActorRef Some(ref)
@ -225,7 +223,7 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
* on (or more) of these actors.
*/
private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends AbstractRoutedActorRef(routedProps) {
private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: String) extends AbstractRoutedActorRef(routedProps) {
@volatile
private var running: Boolean = true

View file

@ -57,10 +57,10 @@ object ClusterActorRef {
new ClusterActorRef(
RoutedProps()
.withDeployId(actorAddress)
.withTimeout(timeout)
.withRouter(routerFactory)
.withFailureDetector(failureDetectorFactory))
.withFailureDetector(failureDetectorFactory),
actorAddress)
}
/**
@ -80,7 +80,7 @@ object ClusterActorRef {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class ClusterActorRef(props: RoutedProps) extends AbstractRoutedActorRef(props) {
private[akka] class ClusterActorRef(props: RoutedProps, val address: String) extends AbstractRoutedActorRef(props) {
import ClusterActorRef._

View file

@ -65,12 +65,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker])
// wrap them with a load-balancing router
val router = Routing.actorOf(
RoutedProps.apply
.withRoundRobinRouter
.withConnections(workers)
.withDeployId("pi")
)
val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi")
loadBalancerActor(CyclicIterator(workers))
//#create-workers

View file

@ -49,8 +49,7 @@ object HttpConcurrencyTestStress {
startCamelService
val workers = for (i 1 to 8) yield actorOf[HttpServerWorker]
val balancer = Routing.actorOf(
RoutedProps.apply.withRoundRobinRouter.withConnections(workers).withDeployId("loadbalancer"))
val balancer = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "loadbalancer")
//service.get.awaitEndpointActivation(1) {
// actorOf(new HttpServerActor(balancer))
//}

View file

@ -109,12 +109,7 @@ public class Pi {
workers.add(worker);
}
router = Routing.actorOf(
RoutedProps.apply()
.withRoundRobinRouter()
.withConnections(workers)
.withDeployId("pi")
);
router = Routing.actorOf(RoutedProps.apply().withRoundRobinRouter().withConnections(workers), "pi");
}
// message handler

View file

@ -58,11 +58,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker])
// wrap them with a load-balancing router
val router = Routing.actorOf(
RoutedProps.default
.withRoundRobinRouter
.withConnections(workers)
.withDeployId("pi"))
val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi")
// message handler
def receive = {

View file

@ -103,12 +103,7 @@ public class Pi {
workers.add(worker);
}
router = Routing.actorOf(
RoutedProps.apply()
.withConnections(workers)
.withRoundRobinRouter()
.withDeployId("pi")
);
router = Routing.actorOf(RoutedProps.apply().withConnections(workers).withRoundRobinRouter(), "pi");
}
@Override

View file

@ -53,11 +53,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker])
// wrap them with a load-balancing router
val router = Routing.actorOf(
RoutedProps.apply()
.withConnections(workers)
.withRoundRobinRouter
.withDeployId("pi"))
val router = Routing.actorOf(RoutedProps().withConnections(workers).withRoundRobinRouter, "pi")
// phase 1, can accept a Calculate message
def scatter: Receive = {