diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 2bb7575d2b..8fea2d6f26 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -3,7 +3,7 @@ package akka.routing import akka.routing._ import akka.config.ConfigurationException import java.util.concurrent.atomic.AtomicInteger -import akka.actor.{ ActorRef, Actor } +import akka.actor._ import collection.mutable.LinkedList import akka.routing.Routing.Broadcast import java.util.concurrent.{ CountDownLatch, TimeUnit } @@ -28,21 +28,11 @@ class RoutingSpec extends AkkaSpec { "be started when constructed" in { val actor1 = actorOf[TestActor] - val props = RoutedProps().withDirectRouter.withLocalConnections(List(actor1)) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(actor1))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor.isShutdown must be(false) } - "throw ConfigurationException at construction when no connections" in { - try { - val props = RoutedProps().withDirectRouter - app.actorOf(props, "foo") - fail() - } catch { - case e: ConfigurationException ⇒ - } - } - "send message to connection" in { val doneLatch = new CountDownLatch(1) @@ -54,8 +44,8 @@ class RoutingSpec extends AkkaSpec { } }) - val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1)) - val routedActor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1))) + val routedActor = new RoutedActorRef(app, props, app.guardian, "foo") routedActor ! "hello" routedActor ! "end" @@ -75,8 +65,8 @@ class RoutingSpec extends AkkaSpec { } }) - val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1)) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor ! Broadcast(1) actor ! "end" @@ -92,21 +82,11 @@ class RoutingSpec extends AkkaSpec { "be started when constructed" in { val actor1 = actorOf[TestActor] - val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(actor1)) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(actor1))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor.isShutdown must be(false) } - "throw ConfigurationException at construction when no connections" in { - try { - val props = RoutedProps().withRoundRobinRouter - app.actorOf(props, "foo") - fail() - } catch { - case e: ConfigurationException ⇒ - } - } - //In this test a bunch of actors are created and each actor has its own counter. //to test round robin, the routed actor receives the following sequence of messages 1 2 3 .. 1 2 3 .. 1 2 3 which it //uses to increment his counter. @@ -132,8 +112,8 @@ class RoutingSpec extends AkkaSpec { } //create the routed actor. - val props = RoutedProps().withRoundRobinRouter.withLocalConnections(connections) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(connections)) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") //send messages to the actor. for (i ← 0 until iterationCount) { @@ -171,8 +151,8 @@ class RoutingSpec extends AkkaSpec { } }) - val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1, connection2)) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor ! Broadcast(1) actor ! Broadcast("end") @@ -194,7 +174,8 @@ class RoutingSpec extends AkkaSpec { } }) - val actor = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1)), "foo") + val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") intercept[RoutingException] { actor ? Broadcast(1) } @@ -210,25 +191,11 @@ class RoutingSpec extends AkkaSpec { val actor1 = actorOf[TestActor] - val props = RoutedProps().withRandomRouter.withLocalConnections(List(actor1)) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(actor1))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor.isShutdown must be(false) } - "throw ConfigurationException at construction when no connections" in { - try { - val props = RoutedProps().withRandomRouter - app.actorOf(props, "foo") - fail() - } catch { - case e: ConfigurationException ⇒ - } - } - - "deliver messages in a random fashion" ignore { - - } - "deliver a broadcast message" in { val doneLatch = new CountDownLatch(2) @@ -248,8 +215,8 @@ class RoutingSpec extends AkkaSpec { } }) - val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1, connection2)) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor ! Broadcast(1) actor ! Broadcast("end") @@ -271,8 +238,8 @@ class RoutingSpec extends AkkaSpec { } }) - val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1)) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") try { actor ? Broadcast(1) @@ -293,11 +260,9 @@ class RoutingSpec extends AkkaSpec { val shutdownLatch = new TestLatch(1) - val props = RoutedProps() - .withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))) - val actor = app.actorOf(props, "foo") + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor ! Broadcast(Stop(Some(0))) @@ -310,11 +275,9 @@ class RoutingSpec extends AkkaSpec { val shutdownLatch = new TestLatch(2) - val props = RoutedProps() - .withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))) - val actor = app.actorOf(props, "foo") + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor ! Broadcast(Stop()) @@ -328,47 +291,27 @@ class RoutingSpec extends AkkaSpec { "return the first response from connections, when all of them replied" in { - val props = RoutedProps() - .withLocalConnections(List(newActor(0), newActor(1))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1)))) - val actor = app.actorOf(props, "foo") + val actor = new RoutedActorRef(app, props, app.guardian, "foo") (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) } "return the first response from connections, when some of them failed to reply" in { - val props = RoutedProps() - .withLocalConnections(List(newActor(0), newActor(1))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1)))) - val actor = app.actorOf(props, "foo") + val actor = new RoutedActorRef(app, props, app.guardian, "foo") (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) } "be started when constructed" in { - val props = RoutedProps() - .withLocalConnections(List(newActor(0))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = app.actorOf(props, "foo") + val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0)))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor.isShutdown must be(false) - - } - - "throw ConfigurationException at construction when no connections" in { - val props = RoutedProps() - .withLocalConnections(List()) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - - try { - app.actorOf(props, "foo") - fail() - } catch { - case e: ConfigurationException ⇒ - } } "deliver one-way messages in a round robin fashion" in { @@ -390,11 +333,9 @@ class RoutingSpec extends AkkaSpec { connections = connections :+ connection } - val props = RoutedProps() - .withLocalConnections(connections) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(connections)) - val actor = app.actorOf(props, "foo") + val actor = new RoutedActorRef(app, props, app.guardian, "foo") for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { @@ -431,11 +372,9 @@ class RoutingSpec extends AkkaSpec { } }) - val props = RoutedProps.apply() - .withLocalConnections(List(connection1, connection2)) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = app.actorOf(props, "foo") + val actor = new RoutedActorRef(app, props, app.guardian, "foo") actor ! Broadcast(1) actor ! Broadcast("end") diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index b644e5e184..7e1c881359 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -22,8 +22,6 @@ trait ActorRefProvider { def actorOf(props: Props, supervisor: ActorRef, name: String): ActorRef = actorOf(props, supervisor, name, false) - def actorOf(props: RoutedProps, supervisor: ActorRef, name: String): ActorRef - def actorFor(path: Iterable[String]): Option[ActorRef] /** @@ -87,10 +85,6 @@ trait ActorRefFactory { def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() ⇒ creator.create())) - def actorOf(props: RoutedProps): ActorRef = actorOf(props, Props.randomName) - - def actorOf(props: RoutedProps, name: String): ActorRef = provider.actorOf(props, guardian, name) - def actorFor(path: String): Option[ActorRef] = actorFor(ActorPath.split(path)) def actorFor(path: Iterable[String]): Option[ActorRef] = provider.actorFor(path) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 86c47522b0..31dd6fb55d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -247,10 +247,9 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF def /(actorName: String): ActorPath = guardian.path / actorName // TODO shutdown all that other stuff, whatever that may be - def stop(): Unit = { + def stop() { guardian.stop() } terminationFuture.onComplete(_ ⇒ dispatcher.shutdown()) - } diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 0d005f6363..0184ad9fef 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -5,8 +5,7 @@ package akka.actor import akka.util.Duration -import akka.routing.{ RouterType, FailureDetectorType } -import akka.routing.FailureDetectorType._ +import akka.routing.RouterType import akka.remote.RemoteAddress object DeploymentConfig { diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 400c8e8de1..37c75716d5 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -414,7 +414,7 @@ trait ClusterNode { /** * Creates an ActorRef with a Router to a set of clustered actors. */ - def ref(actorAddress: String, router: RouterType, failureDetector: FailureDetectorType): ActorRef + def ref(actorAddress: String, router: RouterType): ActorRef /** * Returns the addresses of all actors checked out on this node. diff --git a/akka-actor/src/main/scala/akka/routing/RoutedProps.scala b/akka-actor/src/main/scala/akka/routing/RoutedProps.scala index 9492824cc6..7f2e094ea3 100644 --- a/akka-actor/src/main/scala/akka/routing/RoutedProps.scala +++ b/akka-actor/src/main/scala/akka/routing/RoutedProps.scala @@ -4,27 +4,8 @@ package akka.routing -import akka.util.Duration import akka.actor._ -import akka.util.{ ReflectiveAccess, Duration } - -import java.net.InetSocketAddress - -import scala.collection.JavaConversions.{ iterableAsScalaIterable, mapAsScalaMap } - -sealed trait FailureDetectorType - -/** - * Used for declarative configuration of failure detection. - * - * @author Jonas Bonér - */ -object FailureDetectorType { - case object NoOp extends FailureDetectorType - case object RemoveConnectionOnFirstFailure extends FailureDetectorType - case class BannagePeriod(timeToBan: Duration) extends FailureDetectorType - case class Custom(className: String) extends FailureDetectorType -} +import akka.util.Duration sealed trait RouterType @@ -71,115 +52,21 @@ object RouterType { * A user-defined custom RouterType. */ case class Custom(implClass: String) extends RouterType - } /** * Contains the configuration to create local and clustered routed actor references. - * * Routed ActorRef configuration object, this is thread safe and fully sharable. - * - * Because the Routers are stateful, a new Router instance needs to be created for every ActorRef that relies on routing - * (currently the ClusterActorRef and the RoutedActorRef). That is why a Router factory is used (a function that returns - * a new Router instance) instead of a single Router instance. This makes sharing the same RoutedProps between multiple - * threads safe. - * - * This configuration object makes it possible to either. */ -case class RoutedProps( - routerFactory: () ⇒ Router, - connectionManager: ConnectionManager, +case class RoutedProps private[akka] ( + routerFactory: () ⇒ Router = RoutedProps.defaultRouterFactory, + connectionManager: ConnectionManager = new LocalConnectionManager(List()), timeout: Timeout = RoutedProps.defaultTimeout, localOnly: Boolean = RoutedProps.defaultLocalOnly) { - - def this() = this(RoutedProps.defaultRouterFactory, new LocalConnectionManager(List())) - - /** - * Returns a new RoutedProps configured with a random router. - * - * Java and Scala API. - */ - def withRandomRouter: RoutedProps = copy(routerFactory = () ⇒ new RandomRouter) - - /** - * Returns a new RoutedProps configured with a round robin router. - * - * Java and Scala API. - */ - def withRoundRobinRouter: RoutedProps = copy(routerFactory = () ⇒ new RoundRobinRouter) - - /** - * Returns a new RoutedProps configured with a direct router. - * - * Java and Scala API. - */ - def withDirectRouter: RoutedProps = copy(routerFactory = () ⇒ new DirectRouter) - - /** - * Makes it possible to change the default behavior in a clustered environment that a clustered actor ref is created. - * In some cases you just want to have local actor references, even though the Cluster Module is up and running. - * - * Java and Scala API. - */ - def withLocalOnly(l: Boolean = true) = copy(localOnly = l) - - /** - * Sets the Router factory method to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new - * Router instance is needed for every 'routed' ActorRef. That is why a 'factory' function is used to create new - * instances. - * - * Scala API. - */ - def withRouter(f: () ⇒ Router): RoutedProps = copy(routerFactory = f) - - /** - * Sets the RouterFactory to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new - * Router instance is needed for every 'routed' ActorRef. That is why a RouterFactory interface is used to create new - * instances. - * - * Java API. - */ - def withRouter(f: RouterFactory): RoutedProps = copy(routerFactory = () ⇒ f.newRouter()) - - /** - * - */ - def withTimeout(t: Timeout): RoutedProps = copy(timeout = t) - - /** - * Sets the connections to use. - * - * Scala API. - */ - def withLocalConnections(c: Iterable[ActorRef]): RoutedProps = copy(connectionManager = new LocalConnectionManager(c)) - - /** - * Sets the connections to use. - * - * Java API. - */ - def withLocalConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = copy(connectionManager = new LocalConnectionManager(iterableAsScalaIterable(c))) - - /** - * Sets the connections to use. - * - * Scala API. - */ - // def withRemoteConnections(c: Map[InetSocketAddress, ActorRef]): RoutedProps = copy(connectionManager = new RemoteConnectionManager(c)) - - /** - * Sets the connections to use. - * - * Java API. - */ - // def withRemoteConnections(c: java.util.collection.Map[InetSocketAddress, ActorRef]): RoutedProps = copy(connectionManager = new RemoteConnectionManager(mapAsScalaMap(c))) } object RoutedProps { final val defaultTimeout = Timeout(Duration.MinusInf) final val defaultRouterFactory = () ⇒ new RoundRobinRouter final val defaultLocalOnly = false - - def apply() = new RoutedProps() } - diff --git a/akka-docs/intro/code/tutorials/first/Pi.scala b/akka-docs/intro/code/tutorials/first/Pi.scala index 91ed9be29c..6bbf05ee27 100644 --- a/akka-docs/intro/code/tutorials/first/Pi.scala +++ b/akka-docs/intro/code/tutorials/first/Pi.scala @@ -1,131 +1,132 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ +// /** +// * Copyright (C) 2009-2011 Typesafe Inc. +// */ -//#imports -package akka.tutorial.first.scala +// //#imports +// package akka.tutorial.first.scala -import akka.actor.{ Actor, ActorSystem, PoisonPill } -import akka.routing.Routing.Broadcast -import akka.routing.{ RoutedProps, Routing } -import java.util.concurrent.CountDownLatch -//#imports +// import akka.actor.{ Actor, ActorSystem, PoisonPill } +// import akka.routing.Routing.Broadcast +// import akka.routing.{ RoutedProps, Routing } +// import java.util.concurrent.CountDownLatch +// //#imports -//#app -object Pi extends App { +// //#app +// object Pi extends App { - val app = ActorSystem() +// val app = ActorSystem() - calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) +// calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) - //#actors-and-messages - // ==================== - // ===== Messages ===== - // ==================== - //#messages - sealed trait PiMessage +// //#actors-and-messages +// // ==================== +// // ===== Messages ===== +// // ==================== +// //#messages +// sealed trait PiMessage - case object Calculate extends PiMessage +// case object Calculate extends PiMessage - case class Work(start: Int, nrOfElements: Int) extends PiMessage +// case class Work(start: Int, nrOfElements: Int) extends PiMessage - case class Result(value: Double) extends PiMessage - //#messages +// case class Result(value: Double) extends PiMessage +// //#messages - // ================== - // ===== Worker ===== - // ================== - //#worker - class Worker extends Actor { +// // ================== +// // ===== Worker ===== +// // ================== +// //#worker +// class Worker extends Actor { - // define the work - //#calculatePiFor - def calculatePiFor(start: Int, nrOfElements: Int): Double = { - var acc = 0.0 - for (i ← start until (start + nrOfElements)) - acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) - acc - } - //#calculatePiFor +// // define the work +// //#calculatePiFor +// def calculatePiFor(start: Int, nrOfElements: Int): Double = { +// var acc = 0.0 +// for (i ← start until (start + nrOfElements)) +// acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) +// acc +// } +// //#calculatePiFor - def receive = { - case Work(start, nrOfElements) ⇒ sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work - } - } - //#worker +// def receive = { +// case Work(start, nrOfElements) ⇒ sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work +// } +// } +// //#worker - // ================== - // ===== Master ===== - // ================== - //#master - class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) extends Actor { +// // ================== +// // ===== Master ===== +// // ================== +// //#master +// class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) extends Actor { - var pi: Double = _ - var nrOfResults: Int = _ - var start: Long = _ +// var pi: Double = _ +// var nrOfResults: Int = _ +// var start: Long = _ - //#create-workers - // create the workers - val workers = Vector.fill(nrOfWorkers)(app.actorOf[Worker]) +// //#create-workers +// // create the workers +// val workers = Vector.fill(nrOfWorkers)(app.actorOf[Worker]) - // wrap them with a load-balancing router - val router = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") - //#create-workers +// // wrap them with a load-balancing router +// val router = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") +// //#create-workers - //#master-receive - // message handler - def receive = { - //#handle-messages - case Calculate ⇒ - // schedule work - for (i ← 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) +// //#master-receive +// // message handler +// def receive = { +// //#handle-messages +// case Calculate ⇒ +// // schedule work +// for (i ← 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) - // send a PoisonPill to all workers telling them to shut down themselves - router ! Broadcast(PoisonPill) +// // send a PoisonPill to all workers telling them to shut down themselves +// router ! Broadcast(PoisonPill) - // send a PoisonPill to the router, telling him to shut himself down - router ! PoisonPill +// // send a PoisonPill to the router, telling him to shut himself down +// router ! PoisonPill - case Result(value) ⇒ - // handle result from the worker - pi += value - nrOfResults += 1 - if (nrOfResults == nrOfMessages) self.stop() - //#handle-messages - } - //#master-receive +// case Result(value) ⇒ +// // handle result from the worker +// pi += value +// nrOfResults += 1 +// if (nrOfResults == nrOfMessages) self.stop() +// //#handle-messages +// } +// //#master-receive - override def preStart() { - start = System.currentTimeMillis - } +// override def preStart() { +// start = System.currentTimeMillis +// } - override def postStop() { - // tell the world that the calculation is complete - println( - "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" - .format(pi, (System.currentTimeMillis - start))) - latch.countDown() - } - } - //#master - //#actors-and-messages +// override def postStop() { +// // tell the world that the calculation is complete +// println( +// "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" +// .format(pi, (System.currentTimeMillis - start))) +// latch.countDown() +// } +// } +// //#master +// //#actors-and-messages - // ================== - // ===== Run it ===== - // ================== - def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { +// // ================== +// // ===== Run it ===== +// // ================== +// def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - // this latch is only plumbing to know when the calculation is completed - val latch = new CountDownLatch(1) +// // this latch is only plumbing to know when the calculation is completed +// val latch = new CountDownLatch(1) - // create the master - val master = app.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)) +// // create the master +// val master = app.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)) - // start the calculation - master ! Calculate +// // start the calculation +// master ! Calculate + +// // wait for master to shut down +// latch.await() +// } +// } +// //#app - // wait for master to shut down - latch.await() - } -} -//#app diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index 21e257bc63..f73550f16a 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -1,182 +1,182 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ +// * +// * Copyright (C) 2009-2011 Typesafe Inc. -package akka.tutorial.first.java; -import static akka.actor.Actors.poisonPill; -import static java.util.Arrays.asList; +// package akka.tutorial.first.java; -import akka.actor.ActorRef; -import akka.actor.Actors; -import akka.actor.ActorSystem; -import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; -import akka.routing.RoutedProps; -import akka.routing.RouterType; -import akka.routing.LocalConnectionManager; -import akka.routing.Routing; -import akka.routing.Routing.Broadcast; -import scala.collection.JavaConversions; +// import static akka.actor.Actors.poisonPill; +// import static java.util.Arrays.asList; -import java.util.LinkedList; -import java.util.concurrent.CountDownLatch; +// import akka.actor.ActorRef; +// import akka.actor.Actors; +// import akka.actor.ActorSystem; +// import akka.actor.UntypedActor; +// import akka.actor.UntypedActorFactory; +// import akka.routing.RoutedProps; +// import akka.routing.RouterType; +// import akka.routing.LocalConnectionManager; +// import akka.routing.Routing; +// import akka.routing.Routing.Broadcast; +// import scala.collection.JavaConversions; -public class Pi { +// import java.util.LinkedList; +// import java.util.concurrent.CountDownLatch; - private static final ActorSystem app = new ActorSystem(); +// public class Pi { - public static void main(String[] args) throws Exception { - Pi pi = new Pi(); - pi.calculate(4, 10000, 10000); - } +// private static final ActorSystem app = new ActorSystem(); - // ==================== - // ===== Messages ===== - // ==================== - static class Calculate {} +// public static void main(String[] args) throws Exception { +// Pi pi = new Pi(); +// pi.calculate(4, 10000, 10000); +// } - static class Work { - private final int start; - private final int nrOfElements; +// // ==================== +// // ===== Messages ===== +// // ==================== +// static class Calculate {} - public Work(int start, int nrOfElements) { - this.start = start; - this.nrOfElements = nrOfElements; - } +// static class Work { +// private final int start; +// private final int nrOfElements; - public int getStart() { return start; } - public int getNrOfElements() { return nrOfElements; } - } +// public Work(int start, int nrOfElements) { +// this.start = start; +// this.nrOfElements = nrOfElements; +// } - static class Result { - private final double value; +// public int getStart() { return start; } +// public int getNrOfElements() { return nrOfElements; } +// } - public Result(double value) { - this.value = value; - } +// static class Result { +// private final double value; - public double getValue() { return value; } - } +// public Result(double value) { +// this.value = value; +// } - // ================== - // ===== Worker ===== - // ================== - static class Worker extends UntypedActor { +// public double getValue() { return value; } +// } - // define the work - private double calculatePiFor(int start, int nrOfElements) { - double acc = 0.0; - for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) { - acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1); - } - return acc; - } +// // ================== +// // ===== Worker ===== +// // ================== +// static class Worker extends UntypedActor { - // message handler - public void onReceive(Object message) { - if (message instanceof Work) { - Work work = (Work) message; +// // define the work +// private double calculatePiFor(int start, int nrOfElements) { +// double acc = 0.0; +// for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) { +// acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1); +// } +// return acc; +// } - // perform the work - double result = calculatePiFor(work.getStart(), work.getNrOfElements()); +// // message handler +// public void onReceive(Object message) { +// if (message instanceof Work) { +// Work work = (Work) message; - // reply with the result - getSender().tell(new Result(result)); +// // perform the work +// double result = calculatePiFor(work.getStart(), work.getNrOfElements()); - } else throw new IllegalArgumentException("Unknown message [" + message + "]"); - } - } +// // reply with the result +// getSender().tell(new Result(result)); - // ================== - // ===== Master ===== - // ================== - static class Master extends UntypedActor { - private final int nrOfMessages; - private final int nrOfElements; - private final CountDownLatch latch; +// } else throw new IllegalArgumentException("Unknown message [" + message + "]"); +// } +// } - private double pi; - private int nrOfResults; - private long start; +// // ================== +// // ===== Master ===== +// // ================== +// static class Master extends UntypedActor { +// private final int nrOfMessages; +// private final int nrOfElements; +// private final CountDownLatch latch; - private ActorRef router; +// private double pi; +// private int nrOfResults; +// private long start; - public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) { - this.nrOfMessages = nrOfMessages; - this.nrOfElements = nrOfElements; - this.latch = latch; +// private ActorRef router; - LinkedList workers = new LinkedList(); - for (int i = 0; i < nrOfWorkers; i++) { - ActorRef worker = app.actorOf(Worker.class); - workers.add(worker); - } +// public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) { +// this.nrOfMessages = nrOfMessages; +// this.nrOfElements = nrOfElements; +// this.latch = latch; - router = app.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi"); - } +// LinkedList workers = new LinkedList(); +// for (int i = 0; i < nrOfWorkers; i++) { +// ActorRef worker = app.actorOf(Worker.class); +// workers.add(worker); +// } - // message handler - public void onReceive(Object message) { +// router = app.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi"); +// } - if (message instanceof Calculate) { - // schedule work - for (int start = 0; start < nrOfMessages; start++) { - router.tell(new Work(start, nrOfElements), getSelf()); - } +// // message handler +// public void onReceive(Object message) { - // send a PoisonPill to all workers telling them to shut down themselves - router.tell(new Broadcast(poisonPill())); +// if (message instanceof Calculate) { +// // schedule work +// for (int start = 0; start < nrOfMessages; start++) { +// router.tell(new Work(start, nrOfElements), getSelf()); +// } - // send a PoisonPill to the router, telling him to shut himself down - router.tell(poisonPill()); +// // send a PoisonPill to all workers telling them to shut down themselves +// router.tell(new Broadcast(poisonPill())); - } else if (message instanceof Result) { +// // send a PoisonPill to the router, telling him to shut himself down +// router.tell(poisonPill()); - // handle result from the worker - Result result = (Result) message; - pi += result.getValue(); - nrOfResults += 1; - if (nrOfResults == nrOfMessages) getSelf().stop(); +// } else if (message instanceof Result) { - } else throw new IllegalArgumentException("Unknown message [" + message + "]"); - } +// // handle result from the worker +// Result result = (Result) message; +// pi += result.getValue(); +// nrOfResults += 1; +// if (nrOfResults == nrOfMessages) getSelf().stop(); - @Override - public void preStart() { - start = System.currentTimeMillis(); - } +// } else throw new IllegalArgumentException("Unknown message [" + message + "]"); +// } - @Override - public void postStop() { - // tell the world that the calculation is complete - System.out.println(String.format( - "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", - pi, (System.currentTimeMillis() - start))); - latch.countDown(); - } - } +// @Override +// public void preStart() { +// start = System.currentTimeMillis(); +// } - // ================== - // ===== Run it ===== - // ================== - public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) - throws Exception { +// @Override +// public void postStop() { +// // tell the world that the calculation is complete +// System.out.println(String.format( +// "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", +// pi, (System.currentTimeMillis() - start))); +// latch.countDown(); +// } +// } - // this latch is only plumbing to know when the calculation is completed - final CountDownLatch latch = new CountDownLatch(1); +// // ================== +// // ===== Run it ===== +// // ================== +// public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) +// throws Exception { - // create the master - ActorRef master = app.actorOf(new UntypedActorFactory() { - public UntypedActor create() { - return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); - } - }); +// // this latch is only plumbing to know when the calculation is completed +// final CountDownLatch latch = new CountDownLatch(1); - // start the calculation - master.tell(new Calculate()); +// // create the master +// ActorRef master = app.actorOf(new UntypedActorFactory() { +// public UntypedActor create() { +// return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); +// } +// }); - // wait for master to shut down - latch.await(); - } -} +// // start the calculation +// master.tell(new Calculate()); + +// // wait for master to shut down +// latch.await(); +// } +// } diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 3950455c15..3ea86ae6d6 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -1,113 +1,113 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ +// /** +// * Copyright (C) 2009-2011 Typesafe Inc. +// */ -package akka.tutorial.first.scala +// package akka.tutorial.first.scala -import akka.actor.{ Actor, PoisonPill, ActorSystem } -import Actor._ -import java.util.concurrent.CountDownLatch -import akka.routing.Routing.Broadcast -import akka.routing.{ RoutedProps, Routing } +// import akka.actor.{ Actor, PoisonPill, ActorSystem } +// import Actor._ +// import java.util.concurrent.CountDownLatch +// import akka.routing.Routing.Broadcast +// import akka.routing.{ RoutedProps, Routing } -object Pi extends App { +// object Pi extends App { - val app = ActorSystem() +// val app = ActorSystem() - calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) +// calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) - // ==================== - // ===== Messages ===== - // ==================== - sealed trait PiMessage +// // ==================== +// // ===== Messages ===== +// // ==================== +// sealed trait PiMessage - case object Calculate extends PiMessage +// case object Calculate extends PiMessage - case class Work(start: Int, nrOfElements: Int) extends PiMessage +// case class Work(start: Int, nrOfElements: Int) extends PiMessage - case class Result(value: Double) extends PiMessage +// case class Result(value: Double) extends PiMessage - // ================== - // ===== Worker ===== - // ================== - class Worker extends Actor { +// // ================== +// // ===== Worker ===== +// // ================== +// class Worker extends Actor { - // define the work - def calculatePiFor(start: Int, nrOfElements: Int): Double = { - var acc = 0.0 - for (i ← start until (start + nrOfElements)) - acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) - acc - } +// // define the work +// def calculatePiFor(start: Int, nrOfElements: Int): Double = { +// var acc = 0.0 +// for (i ← start until (start + nrOfElements)) +// acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) +// acc +// } - def receive = { - case Work(start, nrOfElements) ⇒ sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work - } - } +// def receive = { +// case Work(start, nrOfElements) ⇒ sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work +// } +// } - // ================== - // ===== Master ===== - // ================== - class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) - extends Actor { +// // ================== +// // ===== Master ===== +// // ================== +// class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) +// extends Actor { - var pi: Double = _ - var nrOfResults: Int = _ - var start: Long = _ +// var pi: Double = _ +// var nrOfResults: Int = _ +// var start: Long = _ - // create the workers - val workers = Vector.fill(nrOfWorkers)(app.actorOf[Worker]) +// // create the workers +// val workers = Vector.fill(nrOfWorkers)(app.actorOf[Worker]) - // wrap them with a load-balancing router - val router = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") +// // wrap them with a load-balancing router +// val router = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") - // message handler - def receive = { - case Calculate ⇒ - // schedule work - for (i ← 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) +// // message handler +// def receive = { +// case Calculate ⇒ +// // schedule work +// for (i ← 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) - // send a PoisonPill to all workers telling them to shut down themselves - router ! Broadcast(PoisonPill) +// // send a PoisonPill to all workers telling them to shut down themselves +// router ! Broadcast(PoisonPill) - // send a PoisonPill to the router, telling him to shut himself down - router ! PoisonPill +// // send a PoisonPill to the router, telling him to shut himself down +// router ! PoisonPill - case Result(value) ⇒ - // handle result from the worker - pi += value - nrOfResults += 1 - if (nrOfResults == nrOfMessages) self.stop() - } +// case Result(value) ⇒ +// // handle result from the worker +// pi += value +// nrOfResults += 1 +// if (nrOfResults == nrOfMessages) self.stop() +// } - override def preStart() { - start = System.currentTimeMillis - } +// override def preStart() { +// start = System.currentTimeMillis +// } - override def postStop() { - // tell the world that the calculation is complete - println( - "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" - .format(pi, (System.currentTimeMillis - start))) - latch.countDown() - } - } +// override def postStop() { +// // tell the world that the calculation is complete +// println( +// "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" +// .format(pi, (System.currentTimeMillis - start))) +// latch.countDown() +// } +// } - // ================== - // ===== Run it ===== - // ================== - def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { +// // ================== +// // ===== Run it ===== +// // ================== +// def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - // this latch is only plumbing to know when the calculation is completed - val latch = new CountDownLatch(1) +// // this latch is only plumbing to know when the calculation is completed +// val latch = new CountDownLatch(1) - // create the master - val master = app.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)) +// // create the master +// val master = app.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)) - // start the calculation - master ! Calculate +// // start the calculation +// master ! Calculate - // wait for master to shut down - latch.await() - } -} +// // wait for master to shut down +// latch.await() +// } +// } diff --git a/akka-tutorials/akka-tutorial-second/README b/akka-tutorials/akka-tutorial-second/README deleted file mode 100644 index ed31bbca22..0000000000 --- a/akka-tutorials/akka-tutorial-second/README +++ /dev/null @@ -1,7 +0,0 @@ -================= - Second Tutorial -================= - -This is the source code for the second tutorial. - -See the Akka Documentation for information about this tutorial. diff --git a/akka-tutorials/akka-tutorial-second/pom.xml b/akka-tutorials/akka-tutorial-second/pom.xml deleted file mode 100644 index e5793c90cc..0000000000 --- a/akka-tutorials/akka-tutorial-second/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - 4.0.0 - - akka-tutorial-second-java - akka.tutorial.second.java - akka-tutorial-second-java - jar - 2.0-SNAPSHOT - http://akka.io - - - - se.scalablesolutions.akka - akka-actor - 2.0-SNAPSHOT - - - - - - Akka - Akka Maven2 Repository - http://akka.io/repository/ - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 2.3.2 - - 1.6 - 1.6 - - - - - diff --git a/akka-tutorials/akka-tutorial-second/project/build.properties b/akka-tutorials/akka-tutorial-second/project/build.properties deleted file mode 100644 index fd354dd902..0000000000 --- a/akka-tutorials/akka-tutorial-second/project/build.properties +++ /dev/null @@ -1,5 +0,0 @@ -project.organization=se.scalablesolutions.akka -project.name=akka-tutorial-second -project.version=2.0-SNAPSHOT -build.scala.versions=2.9.0 -sbt.version=0.7.7 diff --git a/akka-tutorials/akka-tutorial-second/project/build/Project.scala b/akka-tutorials/akka-tutorial-second/project/build/Project.scala deleted file mode 100644 index 1d0b230149..0000000000 --- a/akka-tutorials/akka-tutorial-second/project/build/Project.scala +++ /dev/null @@ -1,3 +0,0 @@ -import sbt._ - -class TutorialTwoProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject diff --git a/akka-tutorials/akka-tutorial-second/project/plugins/Plugins.scala b/akka-tutorials/akka-tutorial-second/project/plugins/Plugins.scala deleted file mode 100644 index fb121fcd3e..0000000000 --- a/akka-tutorials/akka-tutorial-second/project/plugins/Plugins.scala +++ /dev/null @@ -1,6 +0,0 @@ -import sbt._ - -class Plugins(info: ProjectInfo) extends PluginDefinition(info) { - val akkaRepo = "Akka Repo" at "http://akka.io/repository" - val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "2.0-SNAPSHOT" -} diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java deleted file mode 100644 index f078eb3ab7..0000000000 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.tutorial.java.second; - -import static akka.actor.Actors.poisonPill; -import static java.lang.System.currentTimeMillis; -import static java.util.Arrays.asList; - -import akka.routing.RoutedProps; -import akka.routing.Routing; -import akka.routing.LocalConnectionManager; -import scala.Option; -import akka.actor.ActorRef; -import akka.actor.Actors; -import akka.actor.ActorSystem; -import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; -import akka.dispatch.Future; -import akka.japi.Procedure; -import akka.routing.Routing.Broadcast; -import scala.collection.JavaConversions; - -import java.util.LinkedList; - -public class Pi { - - private static final ActorSystem app = new ActorSystem(); - - public static void main(String[] args) throws Exception { - Pi pi = new Pi(); - pi.calculate(4, 10000, 10000); - } - - // ==================== - // ===== Messages ===== - // ==================== - static class Calculate {} - - static class Work { - private final int arg; - private final int nrOfElements; - - public Work(int arg, int nrOfElements) { - this.arg = arg; - this.nrOfElements = nrOfElements; - } - - public int getArg() { return arg; } - public int getNrOfElements() { return nrOfElements; } - } - - static class Result { - private final double value; - - public Result(double value) { - this.value = value; - } - - public double getValue() { return value; } - } - - // ================== - // ===== Worker ===== - // ================== - static class Worker extends UntypedActor { - - // define the work - private double calculatePiFor(int arg, int nrOfElements) { - double acc = 0.0; - for (int i = arg * nrOfElements; i <= ((arg + 1) * nrOfElements - 1); i++) { - acc += 4 * Math.pow(-1, i) / (2 * i + 1); - } - return acc; - } - - // message handler - public void onReceive(Object message) { - if (message instanceof Work) { - Work work = (Work) message; - getSender().tell(new Result(calculatePiFor(work.getArg(), work.getNrOfElements()))); // perform the work - } else throw new IllegalArgumentException("Unknown message [" + message + "]"); - } - } - - // ================== - // ===== Master ===== - // ================== - static class Master extends UntypedActor { - private final int nrOfMessages; - private final int nrOfElements; - - private double pi; - private int nrOfResults; - - private ActorRef router; - - public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements) { - this.nrOfMessages = nrOfMessages; - this.nrOfElements = nrOfElements; - - LinkedList workers = new LinkedList(); - for (int i = 0; i < nrOfWorkers; i++) { - ActorRef worker = app.actorOf(Worker.class); - workers.add(worker); - } - - router = app.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi"); - } - - @Override - public void preStart() { - become(scatter); - } - - // message handler - public void onReceive(Object message) { - throw new IllegalStateException("Should be gather or scatter"); - } - - private final Procedure scatter = new Procedure() { - public void apply(Object msg) { - // schedule work - for (int arg = 0; arg < nrOfMessages; arg++) { - router.tell(new Work(arg, nrOfElements), getSelf()); - } - // Assume the gathering behavior - become(gather(getSender())); - } - }; - - private Procedure gather(final ActorRef recipient) { - return new Procedure() { - public void apply(Object msg) { - // handle result from the worker - Result result = (Result) msg; - pi += result.getValue(); - nrOfResults += 1; - if (nrOfResults == nrOfMessages) { - // send the pi result back to the guy who started the calculation - recipient.tell(pi); - // shut ourselves down, we're done - getSelf().stop(); - } - } - }; - } - - @Override - public void postStop() { - // send a PoisonPill to all workers telling them to shut down themselves - router.tell(new Broadcast(poisonPill())); - // send a PoisonPill to the router, telling him to shut himself down - router.tell(poisonPill()); - } - } - - // ================== - // ===== Run it ===== - // ================== - public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) throws Exception { - - // create the master - ActorRef master = app.actorOf(new UntypedActorFactory() { - public UntypedActor create() { - return new Master(nrOfWorkers, nrOfMessages, nrOfElements); - } - }); - - // start the calculation - long start = currentTimeMillis(); - - // send calculate message - long timeout = 60000; - Future replyFuture = master.ask(new Calculate(), timeout); - Option result = replyFuture.await().resultOrException(); - if (result.isDefined()) { - double pi = (Double) result.get(); - // TODO java api for EventHandler? -// EventHandler.info(this, String.format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", pi, (currentTimeMillis() - start))); - System.out.println(String.format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", pi, (currentTimeMillis() - start))); - } else { - // TODO java api for EventHandler? -// EventHandler.error(this, "Pi calculation did not complete within the timeout."); - System.out.println("Pi calculation did not complete within the timeout."); - } - - } -} diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala deleted file mode 100644 index 4b07eedf1e..0000000000 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.tutorial.second - -import akka.actor.Actor._ -import akka.event.Logging -import System.{ currentTimeMillis ⇒ now } -import akka.routing.Routing.Broadcast -import akka.routing._ -import akka.actor.{ ActorRef, Timeout, Actor, PoisonPill, ActorSystem } - -object Pi extends App { - - val app = ActorSystem() - val log = Logging(app, this) - - calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) - - // ==================== - // ===== Messages ===== - // ==================== - sealed trait PiMessage - case object Calculate extends PiMessage - case class Work(arg: Int, nrOfElements: Int) extends PiMessage - case class Result(value: Double) extends PiMessage - - // ================== - // ===== Worker ===== - // ================== - class Worker() extends Actor { - // define the work - val calculatePiFor = (arg: Int, nrOfElements: Int) ⇒ { - val range = (arg * nrOfElements) to ((arg + 1) * nrOfElements - 1) - var acc = 0.0D - range foreach (i ⇒ acc += 4 * math.pow(-1, i) / (2 * i + 1)) - acc - //range map (j => 4 * math.pow(-1, j) / (2 * j + 1)) sum - } - - def receive = { - case Work(arg, nrOfElements) ⇒ sender ! Result(calculatePiFor(arg, nrOfElements)) // perform the work - } - } - - // ================== - // ===== Master ===== - // ================== - case class Master(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) extends Actor { - var pi: Double = _ - var nrOfResults: Int = _ - - // create the workers - val workers = Vector.fill(nrOfWorkers)(app.actorOf[Worker]) - - // wrap them with a load-balancing router - val router = app.actorOf(RoutedProps( - routerFactory = () ⇒ new RoundRobinRouter, - connectionManager = new LocalConnectionManager(workers)), "pi") - - // phase 1, can accept a Calculate message - def scatter: Receive = { - case Calculate ⇒ - // schedule work - for (arg ← 0 until nrOfMessages) router ! Work(arg, nrOfElements) - - //Assume the gathering behavior - this become gather(sender) - } - - // phase 2, aggregate the results of the Calculation - def gather(recipient: ActorRef): Receive = { - case Result(value) ⇒ - // handle result from the worker - pi += value - nrOfResults += 1 - if (nrOfResults == nrOfMessages) { - // send the pi result back to the guy who started the calculation - recipient ! pi - // shut ourselves down, we're done - self.stop() - } - } - - // message handler starts at the scattering behavior - def receive = scatter - - // when we are stopped, stop our team of workers and our router - override def postStop() { - // send a PoisonPill to all workers telling them to shut down themselves - router ! Broadcast(PoisonPill) - // send a PoisonPill to the router, telling him to shut himself down - router ! PoisonPill - } - } - - // ================== - // ===== Run it ===== - // ================== - def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - // create the master - val master = app.actorOf(new Master(nrOfWorkers, nrOfElements, nrOfMessages)) - - //start the calculation - val start = now - - //send calculate message - master.?(Calculate, Timeout(60000)). - await.resultOrException match { //wait for the result, with a 60 seconds timeout - case Some(pi) ⇒ - log.info("\n\tPi estimate: \t\t{}\n\tCalculation time: \t{} millis", pi, now - start) - case None ⇒ - log.error("Pi calculation did not complete within the timeout.") - } - } -}