diff --git a/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala index c014bc0ec5..42486dd4cc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala @@ -21,8 +21,6 @@ class ClusterSpec extends WordSpec with MustMatchers { getInt("akka.cluster.connection-timeout") must equal(Some(60)) getInt("akka.cluster.remote-daemon-ack-timeout") must equal(Some(30)) getBool("akka.cluster.include-ref-node-in-replica-set") must equal(Some(true)) - getString("akka.cluster.compression-scheme") must equal(Some("")) - getInt("akka.cluster.zlib-compression-level") must equal(Some(6)) getString("akka.cluster.layer") must equal(Some("akka.cluster.netty.NettyRemoteSupport")) getString("akka.cluster.secure-cookie") must equal(Some("")) getString("akka.cluster.log-directory") must equal(Some("_akka_cluster")) diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala index 64e1869eb8..ce22616396 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala @@ -19,11 +19,10 @@ class Ticket1111Spec extends WordSpec with MustMatchers { val shutdownLatch = new CountDownLatch(1) val props = RoutedProps() - .withDeployId("foo") .withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf(props) + val actor = Routing.actorOf(props, "foo") actor ! Broadcast(Stop(Some(0))) @@ -36,12 +35,11 @@ class Ticket1111Spec extends WordSpec with MustMatchers { val shutdownLatch = new CountDownLatch(2) - val props = RoutedProps.apply() - .withDeployId("foo") + val props = RoutedProps() .withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf(props) + val actor = Routing.actorOf(props, "foo") actor ! Broadcast(Stop()) @@ -55,48 +53,44 @@ class Ticket1111Spec extends WordSpec with MustMatchers { "return the first response from connections, when all of them replied" in { - val props = RoutedProps.apply() - .withDeployId("foo") + val props = RoutedProps() .withConnections(List(newActor(0), newActor(1))) .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf(props) + val actor = Routing.actorOf(props, "foo") (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) } "return the first response from connections, when some of them failed to reply" in { - val props = RoutedProps.apply() - .withDeployId("foo") + val props = RoutedProps() .withConnections(List(newActor(0), newActor(1))) .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf(props) + val actor = Routing.actorOf(props, "foo") (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) } "be started when constructed" in { - val props = RoutedProps.apply() - .withDeployId("foo") + val props = RoutedProps() .withConnections(List(newActor(0))) .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf(props) + val actor = Routing.actorOf(props, "foo") actor.isRunning must be(true) } "throw IllegalArgumentException at construction when no connections" in { - val props = RoutedProps.apply() - .withDeployId("foo") + val props = RoutedProps() .withConnections(List()) .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) try { - Routing.actorOf(props) + Routing.actorOf(props, "foo") fail() } catch { case e: IllegalArgumentException ⇒ @@ -122,12 +116,11 @@ class Ticket1111Spec extends WordSpec with MustMatchers { connections = connections :+ connection } - val props = RoutedProps.apply() - .withDeployId("foo") + val props = RoutedProps() .withConnections(connections) .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf(props) + val actor = Routing.actorOf(props, "foo") for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { @@ -165,11 +158,10 @@ class Ticket1111Spec extends WordSpec with MustMatchers { }) val props = RoutedProps.apply() - .withDeployId("foo") .withConnections(List(connection1, connection2)) .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf(props) + val actor = Routing.actorOf(props, "foo") actor ! Broadcast(1) actor ! Broadcast("end") diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index fe63d82a09..bb07051de5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -44,8 +44,8 @@ private[akka] class ActorRefProviders( } //FIXME Implement support for configuring by deployment ID etc - //FIXME If deployId matches an already created actor (Ahead-of-time deployed) return that actor - //FIXME If deployId exists in config, it will override the specified Props (should we attempt to merge?) + //FIXME If address matches an already created actor (Ahead-of-time deployed) return that actor + //FIXME If address exists in config, it will override the specified Props (should we attempt to merge?) def actorOf(props: Props, address: String): ActorRef = { @@ -54,7 +54,7 @@ private[akka] class ActorRefProviders( providers match { case Nil ⇒ None case provider :: rest ⇒ - provider.actorOf(props, address) match { + provider.actorOf(props, address) match { //WARNING FIXME RACE CONDITION NEEDS TO BE SOLVED case None ⇒ actorOf(props, address, rest) // recur case ref ⇒ ref } @@ -112,13 +112,8 @@ class LocalActorRefProvider extends ActorRefProvider { case None ⇒ // it is not -> create it - // if 'Props.deployId' is not specified then use 'address' as 'deployId' - val deployId = props.deployId match { - case Props.`defaultDeployId` | null ⇒ address - case other ⇒ other - } - - Deployer.lookupDeploymentFor(deployId) match { // see if the deployment already exists, if so use it, if not create actor + //WARNING FIXME HUGE RACE CONDITION THAT NEEDS GETTING FIXED + Deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor case Some(Deploy(_, _, router, _, Local)) ⇒ // FIXME create RoutedActorRef if 'router' is specified diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index b35c46b5db..a61d236fab 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -66,7 +66,6 @@ object Props { * ActorRef configuration object, this is thread safe and fully sharable */ case class Props(creator: () ⇒ Actor = Props.defaultCreator, - deployId: String = Props.defaultDeployId, @transient dispatcher: MessageDispatcher = Props.defaultDispatcher, timeout: Timeout = Props.defaultTimeout, faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, @@ -77,7 +76,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator, */ def this() = this( creator = Props.defaultCreator, - deployId = Props.defaultDeployId, dispatcher = Props.defaultDispatcher, timeout = Props.defaultTimeout, faultHandler = Props.defaultFaultHandler, @@ -95,12 +93,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator, */ def withCreator(c: Creator[Actor]) = copy(creator = () ⇒ c.create) - /** - * Returns a new Props with the specified deployId set - * Java and Scala API - */ - def withDeployId(id: String) = copy(deployId = if (id eq null) "" else id) - /** * Returns a new Props with the specified dispatcher set * Java API diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 36782d9712..0466587e10 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -73,12 +73,15 @@ abstract class MessageDispatcher extends Serializable { * Attaches the specified actor instance to this dispatcher */ final def attach(actor: ActorInstance) { - guard withGuard { + var promise = new ActorPromise(Timeout.never)(this) + guard.lock.lock() + try { register(actor) - val promise = new ActorPromise(Timeout.never)(this) dispatchMessage(new MessageInvocation(actor, Init, promise)) - promise - }.get + } finally { + guard.lock.unlock() + } + promise.get } /** diff --git a/akka-actor/src/main/scala/akka/routing/RoutedProps.scala b/akka-actor/src/main/scala/akka/routing/RoutedProps.scala index e00dada3aa..5d113d34c2 100644 --- a/akka-actor/src/main/scala/akka/routing/RoutedProps.scala +++ b/akka-actor/src/main/scala/akka/routing/RoutedProps.scala @@ -73,7 +73,6 @@ object RoutedProps { final val defaultTimeout = Actor.TIMEOUT final val defaultRouterFactory = () ⇒ new RoundRobinRouter - final val defaultDeployId = "" final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled final val defaultFailureDetectorFactory = (connections: Map[InetSocketAddress, ActorRef]) ⇒ new RemoveConnectionOnFirstFailureLocalFailureDetector(connections.values) @@ -100,7 +99,6 @@ object RoutedProps { case class RoutedProps( routerFactory: () ⇒ Router, failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) ⇒ FailureDetector, - deployId: String, connections: Iterable[ActorRef], timeout: Timeout, localOnly: Boolean) { @@ -108,18 +106,10 @@ case class RoutedProps( def this() = this( routerFactory = RoutedProps.defaultRouterFactory, failureDetectorFactory = RoutedProps.defaultFailureDetectorFactory, - deployId = RoutedProps.defaultDeployId, connections = List(), timeout = RoutedProps.defaultTimeout, localOnly = RoutedProps.defaultLocalOnly) - /** - * Returns a new RoutedProps with the specified deployId set - * - * Java and Scala API - */ - def withDeployId(id: String): RoutedProps = copy(deployId = if (id eq null) "" else id) - /** * Returns a new RoutedProps configured with a random router. * diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 32dcfc8438..c8a2e2261a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -134,10 +134,10 @@ object Routing { /** * FIXME: will very likely be moved to the ActorRef. */ - def actorOf(props: RoutedProps): ActorRef = { + def actorOf(props: RoutedProps, address: String = newUuid().toString): ActorRef = { //TODO Implement support for configuring by deployment ID etc - //TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor - //TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?) + //TODO If address matches an already created actor (Ahead-of-time deployed) return that actor + //TODO If address exists in config, it will override the specified Props (should we attempt to merge?) //TODO If the actor deployed uses a different config, then ignore or throw exception? val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled @@ -146,10 +146,10 @@ object Routing { if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props) else { - if (props.connections.isEmpty) + if (props.connections.isEmpty) //FIXME Shouldn't this be checked when instance is created so that it works with linking instead of barfing? throw new IllegalArgumentException("A routed actorRef can't have an empty connection set") - new RoutedActorRef(props) + new RoutedActorRef(props, address) } } @@ -188,9 +188,9 @@ object Routing { new RoutedProps( () ⇒ router, RoutedProps.defaultFailureDetectorFactory, - actorAddress, connections, - RoutedProps.defaultTimeout, true)) + RoutedProps.defaultTimeout, true), + actorAddress) } } @@ -201,8 +201,6 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte val router = props.routerFactory() - def address = props.deployId - override def postMessageToMailbox(message: Any, channel: UntypedChannel) = { val sender = channel match { case ref: ActorRef ⇒ Some(ref) @@ -225,7 +223,7 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to * on (or more) of these actors. */ -private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends AbstractRoutedActorRef(routedProps) { +private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: String) extends AbstractRoutedActorRef(routedProps) { @volatile private var running: Boolean = true diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index d677f7c663..43ea3586a9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -57,10 +57,10 @@ object ClusterActorRef { new ClusterActorRef( RoutedProps() - .withDeployId(actorAddress) .withTimeout(timeout) .withRouter(routerFactory) - .withFailureDetector(failureDetectorFactory)) + .withFailureDetector(failureDetectorFactory), + actorAddress) } /** @@ -80,7 +80,7 @@ object ClusterActorRef { * * @author Jonas Bonér */ -private[akka] class ClusterActorRef(props: RoutedProps) extends AbstractRoutedActorRef(props) { +private[akka] class ClusterActorRef(props: RoutedProps, val address: String) extends AbstractRoutedActorRef(props) { import ClusterActorRef._ diff --git a/akka-docs/disabled/examples/Pi.scala b/akka-docs/disabled/examples/Pi.scala index b70eb20f37..5d69a702d4 100644 --- a/akka-docs/disabled/examples/Pi.scala +++ b/akka-docs/disabled/examples/Pi.scala @@ -65,12 +65,7 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) // wrap them with a load-balancing router - val router = Routing.actorOf( - RoutedProps.apply - .withRoundRobinRouter - .withConnections(workers) - .withDeployId("pi") - ) + val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi") loadBalancerActor(CyclicIterator(workers)) //#create-workers diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala index 9b5afd4572..ee9532de17 100644 --- a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala +++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala @@ -49,8 +49,7 @@ object HttpConcurrencyTestStress { startCamelService val workers = for (i ← 1 to 8) yield actorOf[HttpServerWorker] - val balancer = Routing.actorOf( - RoutedProps.apply.withRoundRobinRouter.withConnections(workers).withDeployId("loadbalancer")) + val balancer = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "loadbalancer") //service.get.awaitEndpointActivation(1) { // actorOf(new HttpServerActor(balancer)) //} diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index ab29cf5f52..9eb219ec56 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -109,12 +109,7 @@ public class Pi { workers.add(worker); } - router = Routing.actorOf( - RoutedProps.apply() - .withRoundRobinRouter() - .withConnections(workers) - .withDeployId("pi") - ); + router = Routing.actorOf(RoutedProps.apply().withRoundRobinRouter().withConnections(workers), "pi"); } // message handler diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index c5a10e41a3..1c5b876536 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -58,11 +58,7 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) // wrap them with a load-balancing router - val router = Routing.actorOf( - RoutedProps.default - .withRoundRobinRouter - .withConnections(workers) - .withDeployId("pi")) + val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi") // message handler def receive = { diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java index 51ec382fd5..304e4f76a1 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java @@ -103,12 +103,7 @@ public class Pi { workers.add(worker); } - router = Routing.actorOf( - RoutedProps.apply() - .withConnections(workers) - .withRoundRobinRouter() - .withDeployId("pi") - ); + router = Routing.actorOf(RoutedProps.apply().withConnections(workers).withRoundRobinRouter(), "pi"); } @Override diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 2f3717227a..6fe8d222ba 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -53,11 +53,7 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) // wrap them with a load-balancing router - val router = Routing.actorOf( - RoutedProps.apply() - .withConnections(workers) - .withRoundRobinRouter - .withDeployId("pi")) + val router = Routing.actorOf(RoutedProps().withConnections(workers).withRoundRobinRouter, "pi") // phase 1, can accept a Calculate message def scatter: Receive = {