diff --git a/.gitignore b/.gitignore index 857686b6aa..91eba2fc6b 100755 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.vim *~ *# src_managed diff --git a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala new file mode 100644 index 0000000000..1118daff1c --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala @@ -0,0 +1,61 @@ +package akka.actor + +import akka.testkit.AkkaSpec +import akka.dispatch.UnboundedMailbox +import akka.util.duration._ + +object ConsistencySpec { + class CacheMisaligned(var value: Long, var padding1: Long, var padding2: Long, var padding3: Int) //Vars, no final fences + + class ConsistencyCheckingActor extends Actor { + var left = new CacheMisaligned(42, 0, 0, 0) //var + var right = new CacheMisaligned(0, 0, 0, 0) //var + var lastStep = -1L + def receive = { + case step: Long ⇒ + + if (lastStep != (step - 1)) + sender.tell("Test failed: Last step %s, this step %s".format(lastStep, step)) + + var shouldBeFortyTwo = left.value + right.value + if (shouldBeFortyTwo != 42) + sender ! "Test failed: 42 failed" + else { + left.value += 1 + right.value -= 1 + } + + lastStep = step + case "done" ⇒ sender ! "done"; self.stop() + } + } +} + +class ConsistencySpec extends AkkaSpec { + import ConsistencySpec._ + "The Akka actor model implementation" must { + "provide memory consistency" in { + val noOfActors = 7 + val dispatcher = system + .dispatcherFactory + .newDispatcher("consistency-dispatcher", 1, UnboundedMailbox()) + .withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(noOfActors, true) + .setCorePoolSize(10) + .setMaxPoolSize(10) + .setKeepAliveTimeInMillis(1) + .setAllowCoreThreadTimeout(true) + .build + + val props = Props[ConsistencyCheckingActor].withDispatcher(dispatcher) + val actors = Vector.fill(noOfActors)(system.actorOf(props)) + + for (i ← 0L until 600000L) { + actors.foreach(_.tell(i, testActor)) + } + + for (a ← actors) { a.tell("done", testActor) } + + for (a ← actors) expectMsg(5 minutes, "done") + } + } +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 90e398e4cb..88b31f25d9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -11,6 +11,10 @@ import java.util.concurrent.atomic._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { + def startWatching(target: ActorRef) = actorOf(Props(new Actor { + watch(target) + def receive = { case x ⇒ testActor forward x } + })) "The Death Watch" must { def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") { @@ -19,8 +23,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with one Terminated message when an Actor is stopped" in { val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) - - testActor startsWatching terminal + startWatching(terminal) testActor ! "ping" expectMsg("ping") @@ -32,11 +35,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with all monitors with one Terminated message when an Actor is stopped" in { val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) - val monitor1, monitor2, monitor3 = - actorOf(Props(new Actor { - watch(terminal) - def receive = { case t: Terminated ⇒ testActor ! t } - })) + val monitor1, monitor2, monitor3 = startWatching(terminal) terminal ! PoisonPill @@ -51,11 +50,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with _current_ monitors with one Terminated message when an Actor is stopped" in { val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) - val monitor1, monitor3 = - actorOf(Props(new Actor { - watch(terminal) - def receive = { case t: Terminated ⇒ testActor ! t } - })) + val monitor1, monitor3 = startWatching(terminal) val monitor2 = actorOf(Props(new Actor { watch(terminal) unwatch(terminal) @@ -85,10 +80,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x }) val terminal = (supervisor ? terminalProps).as[ActorRef].get - val monitor = actorOf(Props(new Actor { - watch(terminal) - def receive = { case t: Terminated ⇒ testActor ! t } - })) + val monitor = startWatching(terminal) terminal ! Kill terminal ! Kill @@ -113,9 +105,13 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } })) - val failed, brother = (supervisor ? Props.empty).as[ActorRef].get - brother startsWatching failed - testActor startsWatching brother + val failed = (supervisor ? Props.empty).as[ActorRef].get + val brother = (supervisor ? Props(new Actor { + watch(failed) + def receive = Actor.emptyBehavior + })).as[ActorRef].get + + startWatching(brother) failed ! Kill val result = receiveWhile(3 seconds, messages = 3) { diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 917a65ec25..fdeabd2a47 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -58,7 +58,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { val forward = actorOf(new Forwarder(testActor)) val fsm = actorOf(new MyFSM(testActor)) val sup = actorOf(Props(new Actor { - self startsWatching fsm + watch(fsm) def receive = { case _ ⇒ } }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index f7ad0d34cb..dd9e9ac79f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -206,7 +206,7 @@ class RestartStrategySpec extends AkkaSpec { val boss = actorOf(Props(new Actor { def receive = { - case p: Props ⇒ sender ! context.actorOf(p) + case p: Props ⇒ sender ! watch(context.actorOf(p)) case t: Terminated ⇒ maxNoOfRestartsLatch.open } }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)))) @@ -228,8 +228,6 @@ class RestartStrategySpec extends AkkaSpec { }) val slave = (boss ? slaveProps).as[ActorRef].get - boss startsWatching slave - slave ! Ping slave ! Crash slave ! Ping diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index e3d2bf1eea..7b6299ab69 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -52,8 +52,7 @@ class SupervisorHierarchySpec extends AkkaSpec { val countDownMessages = new CountDownLatch(1) val countDownMax = new CountDownLatch(1) val boss = actorOf(Props(new Actor { - val crasher = context.actorOf(Props(new CountDownActor(countDownMessages))) - self startsWatching crasher + val crasher = watch(context.actorOf(Props(new CountDownActor(countDownMessages)))) protected def receive = { case "killCrasher" ⇒ crasher ! Kill diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index e21f965c51..f07ed9dfa1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -341,9 +341,11 @@ abstract class ActorModelSpec extends AkkaSpec { val cachedMessage = CountDownNStop(new CountDownLatch(num)) val stopLatch = new CountDownLatch(num) val waitTime = (30 seconds).dilated.toMillis - val boss = actorOf(Props(context ⇒ { - case "run" ⇒ for (_ ← 1 to num) (context.self startsWatching context.actorOf(props)) ! cachedMessage - case Terminated(child) ⇒ stopLatch.countDown() + val boss = actorOf(Props(new Actor { + def receive = { + case "run" ⇒ for (_ ← 1 to num) (watch(context.actorOf(props))) ! cachedMessage + case Terminated(child) ⇒ stopLatch.countDown() + } }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("boss"))) boss ! "run" try { @@ -492,3 +494,39 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { } } } + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FJDispatcherModelSpec extends ActorModelSpec { + import ActorModelSpec._ + + def newInterceptedDispatcher = + (new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput, + system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType, + new ForkJoinPoolConfig(), system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor).asInstanceOf[MessageDispatcherInterceptor] + + def dispatcherType = "FJDispatcher" + + "A " + dispatcherType must { + "process messages in parallel" in { + implicit val dispatcher = newInterceptedDispatcher + val aStart, aStop, bParallel = new CountDownLatch(1) + val a, b = newTestActor(dispatcher) + + a ! Meet(aStart, aStop) + assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds") + + b ! CountDown(bParallel) + assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel") + + aStop.countDown() + + a.stop + b.stop + + while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination + + assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) + assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java index 80cc4c9675..dbe87482dc 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java @@ -12,8 +12,8 @@ final class AbstractMailbox { static { try { - mailboxStatusOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_status")); - systemMessageOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_systemQueue")); + mailboxStatusOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_statusDoNotCallMeDirectly")); + systemMessageOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_systemQueueDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b8c0bbb327..8f613e02f5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -363,24 +363,24 @@ trait Actor { * Puts the behavior on top of the hotswap stack. * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack */ - def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) } + final def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) } /** * Reverts the Actor behavior to the previous one in the hotswap stack. */ - def unbecome() { context.unbecome() } + final def unbecome() { context.unbecome() } /** * Registers this actor as a Monitor for the provided ActorRef * @return the provided ActorRef */ - def watch(subject: ActorRef): ActorRef = self startsWatching subject + final def watch(subject: ActorRef): ActorRef = context startsWatching subject /** * Unregisters this actor as Monitor for the provided ActorRef * @return the provided ActorRef */ - def unwatch(subject: ActorRef): ActorRef = self stopsWatching subject + final def unwatch(subject: ActorRef): ActorRef = context stopsWatching subject // ========================================= // ==== INTERNAL IMPLEMENTATION DETAILS ==== @@ -395,6 +395,6 @@ trait Actor { } } - private val processingBehavior = receive //ProcessingBehavior is the original behavior + private[this] val processingBehavior = receive //ProcessingBehavior is the original behavior } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index afd462ff1e..704d78d38b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -47,6 +47,10 @@ trait ActorContext extends ActorRefFactory { def system: ActorSystem def parent: ActorRef + + def startsWatching(subject: ActorRef): ActorRef + + def stopsWatching(subject: ActorRef): ActorRef } private[akka] object ActorCell { @@ -136,13 +140,13 @@ private[akka] class ActorCell( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) - final def startsWatching(subject: ActorRef): ActorRef = { + override final def startsWatching(subject: ActorRef): ActorRef = { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ dispatcher.systemDispatch(this, Link(subject)) subject } - final def stopsWatching(subject: ActorRef): ActorRef = { + override final def stopsWatching(subject: ActorRef): ActorRef = { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ dispatcher.systemDispatch(this, Unlink(subject)) subject diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e62a04938a..30a20ad769 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -46,7 +46,7 @@ import akka.event.DeathWatch * @author Jonas Bonér */ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable { - scalaRef: ScalaActorRef ⇒ + scalaRef: ScalaActorRef with RefInternals ⇒ // Only mutable for RemoteServer in order to maintain identity across nodes /** @@ -108,16 +108,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender) - /** - * Suspends the actor. It will not process messages while suspended. - */ - def suspend(): Unit //TODO FIXME REMOVE THIS - - /** - * Resumes a suspended actor. - */ - def resume(): Unit //TODO FIXME REMOVE THIS - /** * Shuts down the actor its dispatcher and message queue. */ @@ -128,24 +118,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ def isTerminated: Boolean - /** - * Registers this actor to be a death monitor of the provided ActorRef - * This means that this actor will get a Terminated()-message when the provided actor - * is permanently terminated. - * - * @return the same ActorRef that is provided to it, to allow for cleaner invocations - */ - def startsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS - - /** - * Deregisters this actor from being a death monitor of the provided ActorRef - * This means that this actor will not get a Terminated()-message when the provided actor - * is permanently terminated. - * - * @return the same ActorRef that is provided to it, to allow for cleaner invocations - */ - def stopsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS - override def hashCode: Int = HashCode.hash(HashCode.SEED, address) override def equals(that: Any): Boolean = { @@ -169,7 +141,7 @@ class LocalActorRef private[akka] ( val systemService: Boolean = false, _receiveTimeout: Option[Long] = None, _hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) - extends ActorRef with ScalaActorRef { + extends ActorRef with ScalaActorRef with RefInternals { def name = path.name @@ -215,24 +187,6 @@ class LocalActorRef private[akka] ( */ def stop(): Unit = actorCell.stop() - /** - * Registers this actor to be a death monitor of the provided ActorRef - * This means that this actor will get a Terminated()-message when the provided actor - * is permanently terminated. - * - * @return the same ActorRef that is provided to it, to allow for cleaner invocations - */ - def startsWatching(subject: ActorRef): ActorRef = actorCell.startsWatching(subject) - - /** - * Deregisters this actor from being a death monitor of the provided ActorRef - * This means that this actor will not get a Terminated()-message when the provided actor - * is permanently terminated. - * - * @return the same ActorRef that is provided to it, to allow for cleaner invocations - */ - def stopsWatching(subject: ActorRef): ActorRef = actorCell.stopsWatching(subject) - // ========= AKKA PROTECTED FUNCTIONS ========= protected[akka] def underlying: ActorCell = actorCell @@ -296,7 +250,11 @@ trait ScalaActorRef { ref: ActorRef ⇒ * implicit timeout */ def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) +} +private[akka] trait RefInternals { + def resume(): Unit + def suspend(): Unit protected[akka] def restart(cause: Throwable): Unit } @@ -325,14 +283,11 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) { /** * Trait for ActorRef implementations where all methods contain default stubs. */ -trait MinimalActorRef extends ActorRef with ScalaActorRef { +trait MinimalActorRef extends ActorRef with ScalaActorRef with RefInternals { private[akka] val uuid: Uuid = newUuid() def name: String = uuid.toString - def startsWatching(actorRef: ActorRef): ActorRef = actorRef - def stopsWatching(actorRef: ActorRef): ActorRef = actorRef - def suspend(): Unit = () def resume(): Unit = () diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 764838cdb8..f33dd3f0c5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -409,7 +409,12 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { } } -class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) extends Scheduler { +/** + * Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher. + * Note that dispatcher is by-name parameter, because dispatcher might not be initialized + * when the scheduler is created. + */ +class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: ⇒ MessageDispatcher) extends Scheduler { def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) @@ -429,8 +434,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { - // FIXME: consider executing runnable inside main dispatcher to prevent blocking of scheduler - runnable.run() + dispatcher.dispatchTask(() ⇒ runnable.run()) } } @@ -444,7 +448,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) private def createSingleTask(f: () ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - f() + dispatcher.dispatchTask(f) } } @@ -456,7 +460,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) receiver ! message timeout.getTimer.newTimeout(this, delay) } else { - system.eventStream.publish(Warning(this.getClass.getSimpleName, "Could not reschedule message to be sent because receiving actor has been terminated.")) + log.warning("Could not reschedule message to be sent because receiving actor has been terminated.") } } } @@ -465,7 +469,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - f() + dispatcher.dispatchTask(f) timeout.getTimer.newTimeout(this, delay) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 249fbd343a..e5a67d9199 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -255,7 +255,7 @@ abstract class ActorSystem extends ActorRefFactory { * Register a block of code to run after all actors in this actor system have * been stopped. */ - def registerOnTermination(code: ⇒ Unit) + def registerOnTermination[T](code: ⇒ T) /** * Register a block of code to run after all actors in this actor system have @@ -312,7 +312,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor eventStream.startStdoutLogger(settings) val log = new BusLogging(eventStream, "ActorSystem") // “this” used only for .getClass in tagging messages - val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel), this) + val scheduler = createScheduler() val provider: ActorRefProvider = { val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { @@ -346,6 +346,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) + // TODO why implicit val dispatcher? implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher //FIXME Set this to a Failure when things bubble to the top @@ -374,16 +375,38 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def start() = _start - def registerOnTermination(code: ⇒ Unit) { terminationFuture onComplete (_ ⇒ code) } + def registerOnTermination[T](code: ⇒ T) { terminationFuture onComplete (_ ⇒ code) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ ⇒ code.run) } // TODO shutdown all that other stuff, whatever that may be def stop() { guardian.stop() - terminationFuture onComplete (_ ⇒ scheduler.stop()) + terminationFuture onComplete (_ ⇒ stopScheduler()) terminationFuture onComplete (_ ⇒ dispatcher.shutdown()) } + protected def createScheduler(): Scheduler = { + val threadFactory = new MonitorableThreadFactory("DefaultScheduler") + val hwt = new HashedWheelTimer(log, threadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel) + // note that dispatcher is by-name parameter in DefaultScheduler constructor, + // because dispatcher is not initialized when the scheduler is created + def safeDispatcher = { + if (dispatcher eq null) { + val exc = new IllegalStateException("Scheduler is using dispatcher before it has been initialized") + log.error(exc, exc.getMessage) + throw exc + } else { + dispatcher + } + } + new DefaultScheduler(hwt, log, safeDispatcher) + } + + protected def stopScheduler(): Unit = scheduler match { + case x: DefaultScheduler ⇒ x.stop() + case _ ⇒ + } + private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef] /** diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 4656f5a3e3..b079550998 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -122,12 +122,12 @@ abstract class FaultHandlingStrategy { def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) - children.foreach(_.suspend()) + children.foreach(_.asInstanceOf[RefInternals].suspend()) } def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) - children.foreach(_.restart(cause)) + children.foreach(_.asInstanceOf[RefInternals].restart(cause)) } /** @@ -136,7 +136,7 @@ abstract class FaultHandlingStrategy { def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate action match { - case Resume ⇒ child.resume(); true + case Resume ⇒ child.asInstanceOf[RefInternals].resume(); true case Restart ⇒ processFailure(true, child, cause, stats, children); true case Stop ⇒ processFailure(false, child, cause, stats, children); true case Escalate ⇒ false @@ -194,7 +194,7 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider, def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (children.nonEmpty) { if (restart && children.forall(_.requestRestartPermission(retriesWindow))) - children.foreach(_.child.restart(cause)) + children.foreach(_.child.asInstanceOf[RefInternals].restart(cause)) else children.foreach(_.child.stop()) } @@ -247,7 +247,7 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider, def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) - child.restart(cause) + child.asInstanceOf[RefInternals].restart(cause) else child.stop() //TODO optimization to drop child here already? } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index ee28fd586e..1a40ee23cd 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -6,12 +6,9 @@ package akka.dispatch import akka.event.Logging.Warning import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue } -import akka.actor.{ ActorCell, ActorKilledException } -import akka.actor.ActorSystem -import akka.event.EventStream -import akka.actor.Scheduler +import akka.actor.ActorCell import akka.util.Duration +import java.util.concurrent._ /** * Default settings are: @@ -156,4 +153,4 @@ abstract class PriorityGenerator extends java.util.Comparator[Envelope] { final def compare(thisMessage: Envelope, thatMessage: Envelope): Int = gen(thisMessage.message) - gen(thatMessage.message) -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 7d3b3c3d8b..49110c2974 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -40,13 +40,13 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes import Mailbox._ @volatile - protected var _status: Status = _ //0 by default + protected var _statusDoNotCallMeDirectly: Status = _ //0 by default @volatile - protected var _systemQueue: SystemMessage = _ //null by default + protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default @inline - final def status: Mailbox.Status = _status + final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) @inline final def shouldProcessMessage: Boolean = (status & 3) == Open @@ -65,7 +65,8 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus) @inline - protected final def setStatus(newStatus: Status): Unit = _status = newStatus + protected final def setStatus(newStatus: Status): Unit = + Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus) /** * set new primary status Open. Caller does not need to worry about whether @@ -130,7 +131,8 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes /* * AtomicReferenceFieldUpdater for system queue */ - protected final def systemQueueGet: SystemMessage = _systemQueue + protected final def systemQueueGet: SystemMessage = + Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage] protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new) diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index f543e5c016..c45cc74593 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -11,6 +11,9 @@ import akka.event.Logging.{ Warning, Error } import akka.actor.ActorSystem import java.util.concurrent._ import akka.event.EventStream +import concurrent.forkjoin.ForkJoinPool._ +import concurrent.forkjoin.{ ForkJoinTask, ForkJoinWorkerThread, ForkJoinPool } +import concurrent.forkjoin.ForkJoinTask._ object ThreadPoolConfig { type Bounds = Int @@ -184,6 +187,52 @@ class MonitorableThread(runnable: Runnable, name: String) } } +case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider { + final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { + def createExecutorService: ExecutorService = { + new ForkJoinPool(targetParallelism) with ExecutorService { + setAsyncMode(true) + setMaintainsParallelism(true) + + override final def execute(r: Runnable) { + r match { + case fjmbox: FJMailbox ⇒ + //fjmbox.fjTask.reinitialize() + Thread.currentThread match { + case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒ + fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected + case _ ⇒ super.execute[Unit](fjmbox.fjTask) + } + case _ ⇒ + super.execute(r) + } + } + + import java.util.{ Collection ⇒ JCollection } + + def invokeAny[T](callables: JCollection[_ <: Callable[T]]) = + throw new UnsupportedOperationException("invokeAny. NOT!") + + def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + throw new UnsupportedOperationException("invokeAny. NOT!") + + def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + throw new UnsupportedOperationException("invokeAny. NOT!") + } + } + } +} + +trait FJMailbox { self: Mailbox ⇒ + final val fjTask = new ForkJoinTask[Unit] with Runnable { + private[this] var result: Unit = () + final def getRawResult() = result + final def setRawResult(v: Unit) { result = v } + final def exec() = { self.run(); true } + final def run() { invoke() } + } +} + /** * As the name says */ diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala index b7655e376e..6e45a50cad 100644 --- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -11,6 +11,7 @@ import scala.annotation.tailrec import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } import java.net.InetSocketAddress import akka.remote.RemoteAddress +import collection.JavaConverters /** * An Iterable that also contains a version. @@ -85,6 +86,10 @@ trait ConnectionManager { */ class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends ConnectionManager { + def this(iterable: java.lang.Iterable[ActorRef]) { + this(JavaConverters.iterableAsScalaIterableConverter(iterable).asScala) + } + case class State(version: Long, connections: Iterable[ActorRef]) extends VersionedIterable[ActorRef] { def iterable = connections } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 293c1abb4b..bae2ee2f73 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -9,11 +9,11 @@ import akka.actor._ import akka.config.ConfigurationException import akka.dispatch.{ Future, MessageDispatcher } import akka.util.{ ReflectiveAccess, Duration } -import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } import scala.annotation.tailrec +import akka.japi.Creator sealed trait RouterType @@ -76,6 +76,12 @@ case class RoutedProps private[akka] ( connectionManager: ConnectionManager, timeout: Timeout = RoutedProps.defaultTimeout, localOnly: Boolean = RoutedProps.defaultLocalOnly) { + + // Java API + def this(creator: Creator[Router], connectionManager: ConnectionManager, timeout: Timeout, localOnly: Boolean) { + this(() ⇒ creator.create(), connectionManager, timeout, localOnly) + } + } object RoutedProps { diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index 795fbf5a54..c4ec7dcf31 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -293,9 +293,6 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall callback.done(false) } - def startsWatching(actorRef: ActorRef): ActorRef = unsupported - def stopsWatching(actorRef: ActorRef): ActorRef = unsupported - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName)))) def restart(reason: Throwable): Unit = unsupported diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index d9f32f38c9..dc2684f9d8 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -155,6 +155,15 @@ Creating a Dispatcher with a priority mailbox using PriorityGenerator: public class Main { // A simple Actor that just prints the messages it processes public static class MyActor extends UntypedActor { + public MyActor() { + self.tell("lowpriority"); + getSelf().tell("lowpriority"); + getSelf().tell("highpriority"); + getSelf().tell("pigdog"); + getSelf().tell("pigdog2"); + getSelf().tell("pigdog3"); + getSelf().tell("highpriority"); + } public void onReceive(Object message) throws Exception { System.out.println(message); } @@ -170,19 +179,9 @@ Creating a Dispatcher with a priority mailbox using PriorityGenerator: } }; // We create an instance of the actor that will print out the messages it processes - ActorRef ref = Actors.actorOf(MyActor.class); - // We create a new Priority dispatcher and seed it with the priority generator - ref.setDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen))); + // We create a new Priority dispatcher and seed it with the priority generator + ActorRef ref = Actors.actorOf((new Props()).withCreator(MyActor.class).withDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen)))); - ref.getDispatcher().suspend(ref); // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-) - ref.tell("lowpriority"); - ref.tell("lowpriority"); - ref.tell("highpriority"); - ref.tell("pigdog"); - ref.tell("pigdog2"); - ref.tell("pigdog3"); - ref.tell("highpriority"); - ref.getDispatcher().resume(ref); // Resuming the actor so it will start treating its messages } } diff --git a/akka-docs/java/remote-actors.rst b/akka-docs/java/remote-actors.rst index 745679c537..2147e03db1 100644 --- a/akka-docs/java/remote-actors.rst +++ b/akka-docs/java/remote-actors.rst @@ -178,10 +178,7 @@ The messages that it prevents are all that extends 'LifeCycleMessage': * case object ReceiveTimeout It also prevents the client from invoking any life-cycle and side-effecting methods, such as: -* start * stop -* startsWatching -* stopsWatching * etc. Using secure cookie for remote client authentication diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index fa00c746f5..e16c336753 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -155,23 +155,18 @@ Creating a Dispatcher using PriorityGenerator: val a = Actor.actorOf( // We create a new Actor that just prints out what it processes Props(new Actor { + self ! 'lowpriority + self ! 'lowpriority + self ! 'highpriority + self ! 'pigdog + self ! 'pigdog2 + self ! 'pigdog3 + self ! 'highpriority def receive = { case x => println(x) } }).withDispatcher(new Dispatcher("foo", 5, UnboundedPriorityMailbox(gen)))) // We create a new Priority dispatcher and seed it with the priority generator - a.dispatcher.suspend(a) // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-) - - a ! 'lowpriority - a ! 'lowpriority - a ! 'highpriority - a ! 'pigdog - a ! 'pigdog2 - a ! 'pigdog3 - a ! 'highpriority - - a.dispatcher.resume(a) // Resuming the actor so it will start treating its messages - Prints: 'highpriority diff --git a/akka-docs/scala/remote-actors.rst b/akka-docs/scala/remote-actors.rst index ab2fe345c8..c408436b1e 100644 --- a/akka-docs/scala/remote-actors.rst +++ b/akka-docs/scala/remote-actors.rst @@ -180,10 +180,7 @@ The messages that it prevents are all that extends 'LifeCycleMessage': * class ReceiveTimeout..) It also prevents the client from invoking any life-cycle and side-effecting methods, such as: -* start * stop -* startsWatching -* stopsWatching * etc. Using secure cookie for remote client authentication diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 358568e13c..385a27f29a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -285,7 +285,7 @@ private[akka] case class RemoteActorRef private[akka] ( remoteAddress: RemoteAddress, path: ActorPath, loader: Option[ClassLoader]) - extends ActorRef with ScalaActorRef { + extends ActorRef with ScalaActorRef with RefInternals { @volatile private var running: Boolean = true @@ -296,7 +296,7 @@ private[akka] case class RemoteActorRef private[akka] ( def isTerminated: Boolean = !running - protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported + protected[akka] def sendSystemMessage(message: SystemMessage): Unit = throw new UnsupportedOperationException("Not supported for RemoteActorRef") override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), remoteAddress, this, loader) @@ -318,11 +318,5 @@ private[akka] case class RemoteActorRef private[akka] ( @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = provider.serialize(this) - def startsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement - - def stopsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement - protected[akka] def restart(cause: Throwable): Unit = () - - private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d12f5ea7e4..2732cf1ebf 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -59,35 +59,34 @@ abstract class RemoteClient private[akka] ( /** * Converts the message to the wireprotocol and sends the message across the wire */ - def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = + def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build) + } else { + val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) + remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress)) + throw exception + } /** * Sends the message across the wire */ - def send(request: RemoteMessageProtocol) { - if (isRunning) { //TODO FIXME RACY - log.debug("Sending message: " + new RemoteMessage(request, remoteSupport)) + def send(request: RemoteMessageProtocol): Unit = { + log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport)) - try { - val payload = remoteSupport.createMessageSendEnvelope(request) - currentChannel.write(payload).addListener( - new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - //Not interesting at the moment - } else if (!future.isSuccess) { - remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress)) - } + try { + val payload = remoteSupport.createMessageSendEnvelope(request) + currentChannel.write(payload).addListener( + new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (future.isCancelled) { + //Not interesting at the moment + } else if (!future.isSuccess) { + remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress)) } - }) - } catch { - case e: Exception ⇒ remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress)) - } - } else { - val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) - remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress)) - throw exception + } + }) + } catch { + case e: Exception ⇒ remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress)) } } @@ -132,8 +131,7 @@ class ActiveRemoteClient private[akka] ( private[remote] var connection: ChannelFuture = _ @volatile private[remote] var openChannels: DefaultChannelGroup = _ - @volatile - private var timer: HashedWheelTimer = _ + @volatile private var reconnectionTimeWindowStart = 0L @@ -180,10 +178,9 @@ class ActiveRemoteClient private[akka] ( runSwitch switchOn { openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) - timer = new HashedWheelTimer bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, timer, this)) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -219,8 +216,6 @@ class ActiveRemoteClient private[akka] ( log.debug("Shutting down remote client [{}]", name) notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress)) - timer.stop() - timer = null openChannels.close.awaitUninterruptibly openChannels = null bootstrap.releaseExternalResources() @@ -253,18 +248,17 @@ class ActiveRemoteClientPipelineFactory( name: String, bootstrap: ClientBootstrap, remoteAddress: RemoteAddress, - timer: HashedWheelTimer, client: ActiveRemoteClient) extends ChannelPipelineFactory { import client.remoteSupport.clientSettings._ def getPipeline: ChannelPipeline = { - val timeout = new ReadTimeoutHandler(timer, ReadTimeout.length, ReadTimeout.unit) + val timeout = new ReadTimeoutHandler(client.remoteSupport.timer, ReadTimeout.length, ReadTimeout.unit) val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, timer, client) + val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client) new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient) } @@ -361,6 +355,10 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi val serverSettings = RemoteExtension(system).serverSettings val clientSettings = RemoteExtension(system).clientSettings + val timer: HashedWheelTimer = new HashedWheelTimer + + _system.registerOnTermination(timer.stop()) //Shut this guy down at the end + private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock @@ -519,6 +517,10 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio try { val shutdownSignal = { val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) + b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder + .setHostname(address.hostname) + .setPort(address.port) + .build) if (SecureCookie.nonEmpty) b.setCookie(SecureCookie.get) b.build @@ -648,7 +650,7 @@ class RemoteServerHandler( val inbound = RemoteAddress(origin.getHostname, origin.getPort) val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound) remoteSupport.bindClient(inbound, client) - case CommandType.SHUTDOWN ⇒ //TODO FIXME Dispose passive connection here + case CommandType.SHUTDOWN ⇒ //No need to do anything here case _ ⇒ //Unknown command } case _ ⇒ //ignore diff --git a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala index 7f7072b427..56a27079ea 100644 --- a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala @@ -20,7 +20,7 @@ trait NetworkFailureSpec { self: AkkaSpec ⇒ val BytesPerSecond = "60KByte/s" val DelayMillis = "350ms" - val PortRang = "1024-65535" + val PortRange = "1024-65535" def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = { Future { @@ -82,12 +82,12 @@ trait NetworkFailureSpec { self: AkkaSpec ⇒ def enableNetworkDrop() = { restoreIP() - assert(new ProcessBuilder("ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PortRang).start.waitFor == 0) + assert(new ProcessBuilder("ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PortRange).start.waitFor == 0) } def enableTcpReset() = { restoreIP() - assert(new ProcessBuilder("ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PortRang).start.waitFor == 0) + assert(new ProcessBuilder("ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PortRange).start.waitFor == 0) } def restoreIP() = { diff --git a/akka-samples/akka-sample-fsm/README b/akka-samples/akka-sample-fsm/README new file mode 100644 index 0000000000..1391071f0b --- /dev/null +++ b/akka-samples/akka-sample-fsm/README @@ -0,0 +1,28 @@ +FSM +=== + +Requirements +------------ + +To build and run FSM you need [Simple Build Tool][sbt] (sbt). + +Running +------- + +First time, 'sbt update' to get dependencies, then to run Ants use 'sbt run'. +Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run': +> cd $AKKA_HOME + +> % sbt + +> > project akka-sample-fsm + +> > run + +> > Choose 1 or 2 depending on what sample you wish to run + +Notice +------ + +[akka]: http://akka.io +[sbt]: http://code.google.com/p/simple-build-tool/ diff --git a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala index 0dcf33e401..d039609a98 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2010 Typesafe Inc. . + */ package sample.fsm.buncher import akka.actor.ActorRefFactory @@ -6,15 +9,15 @@ import akka.util.Duration import akka.actor.{ FSM, Actor, ActorRef } /* - * generic typed object buncher. - * - * To instantiate it, use the factory method like so: - * Buncher(100, 500)(x : List[AnyRef] => x foreach println) - * which will yield a fully functional ActorRef. - * The type of messages allowed is strongly typed to match the - * supplied processing method; other messages are discarded (and - * possibly logged). - */ +* generic typed object buncher. +* +* To instantiate it, use the factory method like so: +* Buncher(100, 500)(x : List[AnyRef] => x foreach println) +* which will yield a fully functional ActorRef. +* The type of messages allowed is strongly typed to match the +* supplied processing method; other messages are discarded (and +* possibly logged). +*/ object GenericBuncher { trait State case object Idle extends State diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 2c23940c9f..78449edc1b 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2010 Typesafe Inc. . + */ package sample.fsm.dining.become //Akka adaptation of @@ -7,8 +10,8 @@ import akka.actor.{ ActorRef, Actor, ActorSystem } import akka.util.duration._ /* - * First we define our messages, they basically speak for themselves - */ +* First we define our messages, they basically speak for themselves +*/ sealed trait DiningHakkerMessage case class Busy(chopstick: ActorRef) extends DiningHakkerMessage case class Put(hakker: ActorRef) extends DiningHakkerMessage @@ -18,9 +21,9 @@ object Eat extends DiningHakkerMessage object Think extends DiningHakkerMessage /* - * A Chopstick is an actor, it can be taken, and put back - */ -class Chopstick(name: String) extends Actor { +* A Chopstick is an actor, it can be taken, and put back +*/ +class Chopstick extends Actor { //When a Chopstick is taken by a hakker //It will refuse to be taken by other hakkers @@ -44,8 +47,8 @@ class Chopstick(name: String) extends Actor { } /* - * A hakker is an awesome dude or dudett who either thinks about hacking or has to eat ;-) - */ +* A hakker is an awesome dude or dudett who either thinks about hacking or has to eat ;-) +*/ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { //When a hakker is thinking it can become hungry @@ -75,7 +78,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { //back to think about how he should obtain his chopsticks :-) def waiting_for(chopstickToWaitFor: ActorRef, otherChopstick: ActorRef): Receive = { case Taken(`chopstickToWaitFor`) ⇒ - println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) + println("%s has picked up %s and %s and starts to eat".format(name, left.name, right.name)) become(eating) system.scheduler.scheduleOnce(self, Think, 5 seconds) @@ -105,27 +108,33 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { become(thinking) left ! Put(self) right ! Put(self) - println("%s puts down his chopsticks and starts to think", name) + println("%s puts down his chopsticks and starts to think".format(name)) system.scheduler.scheduleOnce(self, Eat, 5 seconds) } //All hakkers start in a non-eating state def receive = { case Think ⇒ - println("%s starts to think", name) + println("%s starts to think".format(name)) become(thinking) system.scheduler.scheduleOnce(self, Eat, 5 seconds) } } /* - * Alright, here's our test-harness - */ +* Alright, here's our test-harness +*/ object DiningHakkers { val system = ActorSystem() + + def main(args: Array[String]): Unit = { + run + } + def run { //Create 5 chopsticks - val chopsticks = for (i ← 1 to 5) yield system.actorOf(new Chopstick("Chopstick " + i)) + val chopsticks = for (i ← 1 to 5) yield system.actorOf[Chopstick]("Chopstick " + i) + //Create 5 awesome hakkers and assign them their left and right chopstick val hakkers = for { (name, i) ← List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 987f630784..b9cea1fe8f 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2010 Typesafe Inc. . + */ package sample.fsm.dining.fsm import akka.actor.{ ActorRef, Actor, FSM, ActorSystem } @@ -6,8 +9,8 @@ import akka.util.Duration import akka.util.duration._ /* - * Some messages for the chopstick - */ +* Some messages for the chopstick +*/ sealed trait ChopstickMessage object Take extends ChopstickMessage object Put extends ChopstickMessage @@ -27,9 +30,9 @@ case object Taken extends ChopstickState case class TakenBy(hakker: ActorRef) /* - * A chopstick is an actor, it can be taken, and put back - */ -class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { +* A chopstick is an actor, it can be taken, and put back +*/ +class Chopstick extends Actor with FSM[ChopstickState, TakenBy] { // A chopstick begins its existence as available and taken by no one startWith(Available, TakenBy(system.deadLetters)) @@ -77,8 +80,8 @@ case object Eating extends FSMHakkerState case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) /* - * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) - */ +* A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) +*/ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] { //All hakkers start waiting @@ -86,7 +89,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit when(Waiting) { case Event(Think, _) ⇒ - println("%s starts to think", name) + println("%s starts to think".format(name)) startThinking(5 seconds) } @@ -125,7 +128,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startEating(left: ActorRef, right: ActorRef): State = { - println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) + println("%s has picked up %s and %s and starts to eat".format(name, left.name, right.name)) goto(Eating) using TakenChopsticks(Some(left), Some(right)) forMax (5 seconds) } @@ -144,7 +147,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // then he puts down his chopsticks and starts to think when(Eating) { case Event(StateTimeout, _) ⇒ - println("%s puts down his chopsticks and starts to think", name) + println("%s puts down his chopsticks and starts to think".format(name)) left ! Put right ! Put startThinking(5 seconds) @@ -159,15 +162,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } /* - * Alright, here's our test-harness - */ +* Alright, here's our test-harness +*/ object DiningHakkersOnFsm { val system = ActorSystem() + def main(args: Array[String]): Unit = { + run + } + def run = { // Create 5 chopsticks - val chopsticks = for (i ← 1 to 5) yield system.actorOf(new Chopstick("Chopstick " + i)) + val chopsticks = for (i ← 1 to 5) yield system.actorOf[Chopstick]("Chopstick " + i) // Create 5 awesome fsm hakkers and assign them their left and right chopstick val hakkers = for { (name, i) ← List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex diff --git a/akka-samples/akka-sample-hello/README b/akka-samples/akka-sample-hello/README new file mode 100644 index 0000000000..81a6db8d3e --- /dev/null +++ b/akka-samples/akka-sample-hello/README @@ -0,0 +1,26 @@ +HELLO +===== + +Requirements +------------ + +To build and run FSM you need [Simple Build Tool][sbt] (sbt). + +Running +------- + +First time, 'sbt update' to get dependencies, then to run Ants use 'sbt run'. +Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run': +> cd $AKKA_HOME + +> % sbt + +> > project akka-sample-hello + +> > run + +Notice +------ + +[akka]: http://akka.io +[sbt]: http://code.google.com/p/simple-build-tool/ diff --git a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala new file mode 100644 index 0000000000..5df2661800 --- /dev/null +++ b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.hello + +import akka.actor.{ ActorSystem, Actor } + +case object Start + +object Main { + def main(args: Array[String]): Unit = { + val system = ActorSystem() + system.actorOf[HelloActor] ! Start + } +} + +class HelloActor extends Actor { + val worldActor = system.actorOf[WorldActor] + def receive = { + case Start ⇒ worldActor ! "Hello" + case s: String ⇒ + println("Received message: %s".format(s)) + system.stop() + } +} + +class WorldActor extends Actor { + def receive = { + case s: String ⇒ sender ! s.toUpperCase + " world!" + } +} + diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 6f03df59b2..18a181eb3f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -42,6 +42,24 @@ class TestActorRef[T <: Actor]( */ def underlyingActor: T = underlyingActorInstance.asInstanceOf[T] + /** + * Registers this actor to be a death monitor of the provided ActorRef + * This means that this actor will get a Terminated()-message when the provided actor + * is permanently terminated. + * + * @return the same ActorRef that is provided to it, to allow for cleaner invocations + */ + def startsWatching(subject: ActorRef): ActorRef = underlying.startsWatching(subject) + + /** + * Deregisters this actor from being a death monitor of the provided ActorRef + * This means that this actor will not get a Terminated()-message when the provided actor + * is permanently terminated. + * + * @return the same ActorRef that is provided to it, to allow for cleaner invocations + */ + def stopsWatching(subject: ActorRef): ActorRef = underlying.stopsWatching(subject) + override def toString = "TestActor[" + address + "]" override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].address == address diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index a6248ff63c..12096a61b1 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -155,7 +155,10 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { "stop when sent a poison pill" in { EventFilter[ActorKilledException]() intercept { val a = TestActorRef(Props[WorkerActor]) - testActor startsWatching a + val forwarder = actorOf(Props(new Actor { + watch(a) + def receive = { case x ⇒ testActor forward x } + })) a.!(PoisonPill)(testActor) expectMsgPF(5 seconds) { case Terminated(`a`) ⇒ true diff --git a/akka-tutorials/akka-tutorial-first/pom.xml b/akka-tutorials/akka-tutorial-first/pom.xml index 8e25d972f3..1cec835a9c 100644 --- a/akka-tutorials/akka-tutorial-first/pom.xml +++ b/akka-tutorials/akka-tutorial-first/pom.xml @@ -13,7 +13,7 @@ - se.scalablesolutions.akka + com.typesafe.akka akka-actor 2.0-SNAPSHOT diff --git a/akka-tutorials/akka-tutorial-first/project/TutorialBuild.scala b/akka-tutorials/akka-tutorial-first/project/TutorialBuild.scala new file mode 100644 index 0000000000..5e5ef32493 --- /dev/null +++ b/akka-tutorials/akka-tutorial-first/project/TutorialBuild.scala @@ -0,0 +1,22 @@ +import sbt._ +import Keys._ + +object TutorialBuild extends Build { + lazy val buildSettings = Seq( + organization := "com.typesafe.akka", + version := "2.0-SNAPSHOT", + scalaVersion := "2.9.1" + ) + + lazy val akka = Project( + id = "akka-tutorial-first", + base = file("."), + settings = Defaults.defaultSettings ++ Seq( + libraryDependencies ++= Seq( + "com.typesafe.akka" % "akka-actor" % "2.0-SNAPSHOT", + "junit" % "junit" % "4.5" % "test", + "org.scalatest" % "scalatest_2.9.0" % "1.6.1" % "test", + "com.typesafe.akka" % "akka-testkit" % "2.0-SNAPSHOT" % "test") + ) + ) +} \ No newline at end of file diff --git a/akka-tutorials/akka-tutorial-first/project/build.properties b/akka-tutorials/akka-tutorial-first/project/build.properties index 4981c1c2c3..c6158f7be4 100644 --- a/akka-tutorials/akka-tutorial-first/project/build.properties +++ b/akka-tutorials/akka-tutorial-first/project/build.properties @@ -1,5 +1 @@ -project.organization=se.scalablesolutions.akka -project.name=akka-tutorial-first -project.version=2.0-SNAPSHOT -build.scala.versions=2.9.0 -sbt.version=0.7.7 +sbt.version=0.11.0 \ No newline at end of file diff --git a/akka-tutorials/akka-tutorial-first/project/build/Project.scala b/akka-tutorials/akka-tutorial-first/project/build/Project.scala deleted file mode 100644 index 975f2ce970..0000000000 --- a/akka-tutorials/akka-tutorial-first/project/build/Project.scala +++ /dev/null @@ -1,3 +0,0 @@ -import sbt._ - -class TutorialOneProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject diff --git a/akka-tutorials/akka-tutorial-first/project/plugins/Plugins.scala b/akka-tutorials/akka-tutorial-first/project/plugins/Plugins.scala deleted file mode 100644 index fb121fcd3e..0000000000 --- a/akka-tutorials/akka-tutorial-first/project/plugins/Plugins.scala +++ /dev/null @@ -1,6 +0,0 @@ -import sbt._ - -class Plugins(info: ProjectInfo) extends PluginDefinition(info) { - val akkaRepo = "Akka Repo" at "http://akka.io/repository" - val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "2.0-SNAPSHOT" -} diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index ca8fe597f7..5a80699ade 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -1,182 +1,182 @@ -// * -// * Copyright (C) 2009-2011 Typesafe Inc. +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.tutorial.first.java; -// package akka.tutorial.first.java; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +import akka.actor.UntypedActorFactory; +import akka.japi.Creator; +import akka.routing.*; -// import static akka.actor.Actors.poisonPill; -// import static java.util.Arrays.asList; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; -// import akka.actor.ActorRef; -// import akka.actor.Actors; -// import akka.actor.ActorSystem; -// import akka.actor.UntypedActor; -// import akka.actor.UntypedActorFactory; -// import akka.routing.RoutedProps; -// import akka.routing.RouterType; -// import akka.routing.LocalConnectionManager; -// import akka.routing.Routing; -// import akka.routing.Routing.Broadcast; -// import scala.collection.JavaConversions; +public class Pi { -// import java.util.LinkedList; -// import java.util.concurrent.CountDownLatch; + public static void main(String[] args) throws Exception { + Pi pi = new Pi(); + pi.calculate(4, 10000, 10000); + } -// public class Pi { + // ==================== + // ===== Messages ===== + // ==================== + static class Calculate { + } -// private static final ActorSystem system = new ActorSystem(); + static class Work { + private final int start; + private final int nrOfElements; -// public static void main(String[] args) throws Exception { -// Pi pi = new Pi(); -// pi.calculate(4, 10000, 10000); -// } + public Work(int start, int nrOfElements) { + this.start = start; + this.nrOfElements = nrOfElements; + } -// // ==================== -// // ===== Messages ===== -// // ==================== -// static class Calculate {} + public int getStart() { + return start; + } -// static class Work { -// private final int start; -// private final int nrOfElements; + public int getNrOfElements() { + return nrOfElements; + } + } -// public Work(int start, int nrOfElements) { -// this.start = start; -// this.nrOfElements = nrOfElements; -// } + static class Result { + private final double value; -// public int getStart() { return start; } -// public int getNrOfElements() { return nrOfElements; } -// } + public Result(double value) { + this.value = value; + } -// static class Result { -// private final double value; + public double getValue() { + return value; + } + } -// public Result(double value) { -// this.value = value; -// } + // ================== + // ===== Worker ===== + // ================== + public static class Worker extends UntypedActor { -// public double getValue() { return value; } -// } + // define the work + private double calculatePiFor(int start, int nrOfElements) { + double acc = 0.0; + for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) { + acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1); + } + return acc; + } -// // ================== -// // ===== Worker ===== -// // ================== -// static class Worker extends UntypedActor { + // message handler + public void onReceive(Object message) { + if (message instanceof Work) { + Work work = (Work) message; -// // define the work -// private double calculatePiFor(int start, int nrOfElements) { -// double acc = 0.0; -// for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) { -// acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1); -// } -// return acc; -// } + // perform the work + double result = calculatePiFor(work.getStart(), work.getNrOfElements()); -// // message handler -// public void onReceive(Object message) { -// if (message instanceof Work) { -// Work work = (Work) message; + // reply with the result + getSender().tell(new Result(result)); -// // perform the work -// double result = calculatePiFor(work.getStart(), work.getNrOfElements()); + } else throw new IllegalArgumentException("Unknown message [" + message + "]"); + } + } -// // reply with the result -// getSender().tell(new Result(result)); + // ================== + // ===== Master ===== + // ================== + public static class Master extends UntypedActor { + private final int nrOfMessages; + private final int nrOfElements; + private final CountDownLatch latch; -// } else throw new IllegalArgumentException("Unknown message [" + message + "]"); -// } -// } + private double pi; + private int nrOfResults; + private long start; -// // ================== -// // ===== Master ===== -// // ================== -// static class Master extends UntypedActor { -// private final int nrOfMessages; -// private final int nrOfElements; -// private final CountDownLatch latch; + private ActorRef router; -// private double pi; -// private int nrOfResults; -// private long start; + public Master(final int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) { + this.nrOfMessages = nrOfMessages; + this.nrOfElements = nrOfElements; + this.latch = latch; + Creator routerCreator = new Creator() { + public Router create() { + return new RoundRobinRouter(dispatcher(), new akka.actor.Timeout(-1)); + } + }; + LinkedList actors = new LinkedList() { + { + for (int i = 0; i < nrOfWorkers; i++) add(context().actorOf(Worker.class)); + } + }; + RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true); + router = new RoutedActorRef(system(), props, getSelf(), "pi"); + } -// private ActorRef router; + // message handler + public void onReceive(Object message) { -// public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) { -// this.nrOfMessages = nrOfMessages; -// this.nrOfElements = nrOfElements; -// this.latch = latch; + if (message instanceof Calculate) { + // schedule work + for (int start = 0; start < nrOfMessages; start++) { + router.tell(new Work(start, nrOfElements), getSelf()); + } -// LinkedList workers = new LinkedList(); -// for (int i = 0; i < nrOfWorkers; i++) { -// ActorRef worker = system.actorOf(Worker.class); -// workers.add(worker); -// } + } else if (message instanceof Result) { -// router = system.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi"); -// } + // handle result from the worker + Result result = (Result) message; + pi += result.getValue(); + nrOfResults += 1; + if (nrOfResults == nrOfMessages) getSelf().stop(); -// // message handler -// public void onReceive(Object message) { + } else throw new IllegalArgumentException("Unknown message [" + message + "]"); + } -// if (message instanceof Calculate) { -// // schedule work -// for (int start = 0; start < nrOfMessages; start++) { -// router.tell(new Work(start, nrOfElements), getSelf()); -// } + @Override + public void preStart() { + start = System.currentTimeMillis(); + } -// // send a PoisonPill to all workers telling them to shut down themselves -// router.tell(new Broadcast(poisonPill())); + @Override + public void postStop() { + // tell the world that the calculation is complete + System.out.println(String.format( + "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", + pi, (System.currentTimeMillis() - start))); + latch.countDown(); + } + } -// // send a PoisonPill to the router, telling him to shut himself down -// router.tell(poisonPill()); + // ================== + // ===== Run it ===== + // ================== + public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) + throws Exception { + final ActorSystem system = ActorSystem.create(); -// } else if (message instanceof Result) { + // this latch is only plumbing to know when the calculation is completed + final CountDownLatch latch = new CountDownLatch(1); -// // handle result from the worker -// Result result = (Result) message; -// pi += result.getValue(); -// nrOfResults += 1; -// if (nrOfResults == nrOfMessages) getSelf().stop(); + // create the master + ActorRef master = system.actorOf(new UntypedActorFactory() { + public UntypedActor create() { + return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); + } + }); -// } else throw new IllegalArgumentException("Unknown message [" + message + "]"); -// } + // start the calculation + master.tell(new Calculate()); -// @Override -// public void preStart() { -// start = System.currentTimeMillis(); -// } + // wait for master to shut down + latch.await(); -// @Override -// public void postStop() { -// // tell the world that the calculation is complete -// System.out.println(String.format( -// "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", -// pi, (System.currentTimeMillis() - start))); -// latch.countDown(); -// } -// } - -// // ================== -// // ===== Run it ===== -// // ================== -// public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) -// throws Exception { - -// // this latch is only plumbing to know when the calculation is completed -// final CountDownLatch latch = new CountDownLatch(1); - -// // create the master -// ActorRef master = system.actorOf(new UntypedActorFactory() { -// public UntypedActor create() { -// return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); -// } -// }); - -// // start the calculation -// master.tell(new Calculate()); - -// // wait for master to shut down -// latch.await(); -// } -// } + // Shut down the system + system.stop(); + } +} diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 836f766e12..3283a591f4 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -1,113 +1,109 @@ -// /** -// * Copyright (C) 2009-2011 Typesafe Inc. -// */ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.tutorial.first.scala -// package akka.tutorial.first.scala +import java.util.concurrent.CountDownLatch +import akka.routing.{ RoutedActorRef, LocalConnectionManager, RoundRobinRouter, RoutedProps } +import akka.actor.{ ActorSystemImpl, Actor, ActorSystem } -// import akka.actor.{ Actor, PoisonPill, ActorSystem } -// import Actor._ -// import java.util.concurrent.CountDownLatch -// import akka.routing.Routing.Broadcast -// import akka.routing.{ RoutedProps, Routing } +object Pi extends App { -// object Pi extends App { + // Initiate the calculation + calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) -// val system = ActorSystem() + // ==================== + // ===== Messages ===== + // ==================== + sealed trait PiMessage -// calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) + case object Calculate extends PiMessage -// // ==================== -// // ===== Messages ===== -// // ==================== -// sealed trait PiMessage + case class Work(start: Int, nrOfElements: Int) extends PiMessage -// case object Calculate extends PiMessage + case class Result(value: Double) extends PiMessage -// case class Work(start: Int, nrOfElements: Int) extends PiMessage + // ================== + // ===== Worker ===== + // ================== + class Worker extends Actor { -// case class Result(value: Double) extends PiMessage + // define the work + def calculatePiFor(start: Int, nrOfElements: Int): Double = { + var acc = 0.0 + for (i ← start until (start + nrOfElements)) + acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) + acc + } -// // ================== -// // ===== Worker ===== -// // ================== -// class Worker extends Actor { + def receive = { + case Work(start, nrOfElements) ⇒ sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work + } + } -// // define the work -// def calculatePiFor(start: Int, nrOfElements: Int): Double = { -// var acc = 0.0 -// for (i ← start until (start + nrOfElements)) -// acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) -// acc -// } + // ================== + // ===== Master ===== + // ================== + class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) + extends Actor { -// def receive = { -// case Work(start, nrOfElements) ⇒ sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work -// } -// } + var pi: Double = _ + var nrOfResults: Int = _ + var start: Long = _ -// // ================== -// // ===== Master ===== -// // ================== -// class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) -// extends Actor { + // create the workers + val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker]) -// var pi: Double = _ -// var nrOfResults: Int = _ -// var start: Long = _ + // wrap them with a load-balancing router + val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers)) + val router = new RoutedActorRef(system, props, self, "pi") -// // create the workers -// val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker]) + // message handler + def receive = { + case Calculate ⇒ + // schedule work + for (i ← 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) + case Result(value) ⇒ + // handle result from the worker + pi += value + nrOfResults += 1 -// // wrap them with a load-balancing router -// val router = system.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") + // Stop this actor and all its supervised children + if (nrOfResults == nrOfMessages) self.stop() + } -// // message handler -// def receive = { -// case Calculate ⇒ -// // schedule work -// for (i ← 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) + override def preStart() { + start = System.currentTimeMillis + } -// // send a PoisonPill to all workers telling them to shut down themselves -// router ! Broadcast(PoisonPill) + override def postStop() { + // tell the world that the calculation is complete + println( + "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" + .format(pi, (System.currentTimeMillis - start))) + latch.countDown() + } + } -// // send a PoisonPill to the router, telling him to shut himself down -// router ! PoisonPill + // ================== + // ===== Run it ===== + // ================== + def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { + val system = ActorSystem() -// case Result(value) ⇒ -// // handle result from the worker -// pi += value -// nrOfResults += 1 -// if (nrOfResults == nrOfMessages) self.stop() -// } + // this latch is only plumbing to know when the calculation is completed + val latch = new CountDownLatch(1) -// override def preStart() { -// start = System.currentTimeMillis -// } + // create the master + val master = system.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)) -// override def postStop() { -// // tell the world that the calculation is complete -// println( -// "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" -// .format(pi, (System.currentTimeMillis - start))) -// latch.countDown() -// } -// } + // start the calculation + master ! Calculate -// // ================== -// // ===== Run it ===== -// // ================== -// def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { + // wait for master to shut down + latch.await() -// // this latch is only plumbing to know when the calculation is completed -// val latch = new CountDownLatch(1) - -// // create the master -// val master = system.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)) - -// // start the calculation -// master ! Calculate - -// // wait for master to shut down -// latch.await() -// } -// } + // Shut down the system + system.stop() + } +} diff --git a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala new file mode 100644 index 0000000000..de5851bfe7 --- /dev/null +++ b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.tutorial.first.scala + +import org.junit.runner.RunWith +import org.scalatest.matchers.MustMatchers +import org.scalatest.WordSpec +import akka.testkit.TestActorRef +import akka.tutorial.first.scala.Pi.Worker +import akka.actor.ActorSystem + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class WorkerSpec extends WordSpec with MustMatchers { + + implicit def system = ActorSystem() + + "Worker" must { + "calculate pi correctly" in { + val testActor = TestActorRef[Worker] + val actor = testActor.underlyingActor + actor.calculatePiFor(0, 0) must equal(0.0) + actor.calculatePiFor(1, 1) must be(-1.3333333333333333 plusOrMinus 0.0000000001) + } + } +} \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000000..13467e1654 --- /dev/null +++ b/build.sbt @@ -0,0 +1,7 @@ + seq(lsSettings:_*) + + (LsKeys.tags in LsKeys.lsync) := Seq("actors", "stm", "concurrency", "distributed", "fault-tolerance", "scala", "java", "futures", "dataflow", "remoting") + + (externalResolvers in LsKeys.lsync) := Seq("Akka Repository" at "http://akka.io/repository/") + + (description in LsKeys.lsync) := "Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM." diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index fdc8ecd877..5359a05e66 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -215,7 +215,7 @@ object AkkaBuild extends Build { id = "akka-samples", base = file("akka-samples"), settings = parentSettings, - aggregate = Seq(fsmSample) + aggregate = Seq(fsmSample, helloSample) ) lazy val fsmSample = Project( @@ -224,27 +224,36 @@ object AkkaBuild extends Build { dependencies = Seq(actor), settings = defaultSettings ) - + + lazy val helloSample = Project( + id = "akka-sample-hello", + base = file("akka-samples/akka-sample-hello"), + dependencies = Seq(actor), + settings = defaultSettings + ) + lazy val tutorials = Project( id = "akka-tutorials", base = file("akka-tutorials"), settings = parentSettings, - aggregate = Seq(firstTutorial, secondTutorial) + aggregate = Seq(firstTutorial) ) lazy val firstTutorial = Project( id = "akka-tutorial-first", base = file("akka-tutorials/akka-tutorial-first"), - dependencies = Seq(actor), - settings = defaultSettings + dependencies = Seq(actor, testkit), + settings = defaultSettings ++ Seq( + libraryDependencies ++= Dependencies.tutorials + ) ) - lazy val secondTutorial = Project( - id = "akka-tutorial-second", - base = file("akka-tutorials/akka-tutorial-second"), - dependencies = Seq(actor), - settings = defaultSettings - ) + // lazy val secondTutorial = Project( + // id = "akka-tutorial-second", + // base = file("akka-tutorials/akka-tutorial-second"), + // dependencies = Seq(actor), + // settings = defaultSettings + // ) lazy val docs = Project( id = "akka-docs", @@ -406,6 +415,8 @@ object Dependencies { // val sampleCamel = Seq(camelCore, camelSpring, commonsCodec, Runtime.camelJms, Runtime.activemq, Runtime.springJms, // Test.junit, Test.scalatest, Test.logback) + val tutorials = Seq(Test.scalatest, Test.junit) + val docs = Seq(Test.scalatest, Test.junit) } diff --git a/project/plugins.sbt b/project/plugins.sbt index 8b6d17a0c3..7140718543 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,3 +4,9 @@ resolvers += Classpaths.typesafeResolver addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.7") addSbtPlugin("com.typesafe.sbtscalariform" % "sbt-scalariform" % "0.1.4") + +resolvers ++= Seq( + "less is" at "http://repo.lessis.me", + "coda" at "http://repo.codahale.com") + +addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.0")