diff --git a/.gitignore b/.gitignore index c0fa0f10b4..8613a0ba79 100755 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ run-codefellow multiverse.log .eprj .*.swp +akka-tutorials/akka-tutorial-pi-sbt/project/boot/ \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index e70b4a98ae..aef9055bc5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -6,15 +6,10 @@ package akka.actor import akka.dispatch._ import akka.config.Config._ -import akka.config.Supervision._ -import akka.config.ConfigurationException import akka.util.Helpers.{narrow, narrowSilently} import akka.util.ListenerManagement import akka.AkkaException -import java.util.concurrent.TimeUnit -import java.net.InetSocketAddress - import scala.reflect.BeanProperty import akka.util. {ReflectiveAccess, Duration} import akka.remoteinterface.RemoteSupport @@ -61,6 +56,8 @@ case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeC case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage +case object Kill extends AutoReceivedMessage with LifeCycleMessage + case object ReceiveTimeout extends LifeCycleMessage case class MaximumNumberOfRestartsWithinTimeRangeReached( @@ -277,9 +274,6 @@ object Actor extends ListenerManagement { * } * * - *
- * The Actor trait also has a 'log' member field that can be used for logging within the Actor. - * * @author Jonas Bonér */ trait Actor { @@ -353,7 +347,7 @@ trait Actor { * * Example code: *
- * def receive = {
+ * def receive = {
* case Ping =>
* println("got a 'Ping' message")
* self.reply("pong")
@@ -465,6 +459,7 @@ trait Actor {
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Restart(reason) => throw reason
+ case Kill => throw new ActorKilledException("Kill")
case PoisonPill =>
val f = self.senderFuture
self.stop
diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
index 185f0d2799..e9e4168995 100644
--- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
+++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
@@ -17,7 +17,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.io.{PrintWriter, PrintStream}
trait RemoteModule {
- val UUID_PREFIX = "uuid:"
+ val UUID_PREFIX = "uuid:".intern
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: => Any): Unit
@@ -84,7 +84,6 @@ case class RemoteClientWriteFailed(
@BeanProperty client: RemoteClientModule,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
-
/**
* Life-cycle events for RemoteServer.
*/
@@ -120,7 +119,12 @@ class RemoteClientException private[akka] (
val remoteAddress: InetSocketAddress) extends AkkaException(message)
/**
- * Returned when a remote exception sent over the wire cannot be loaded and instantiated
+ * Thrown when the remote server actor dispatching fails for some reason.
+ */
+class RemoteServerException private[akka] (message: String) extends AkkaException(message)
+
+/**
+ * Thrown when a remote exception sent over the wire cannot be loaded and instantiated
*/
case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String)
extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]"
@@ -140,6 +144,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
}
def shutdown {
+ eventHandler.stop
removeListener(eventHandler)
this.shutdownClientModule
this.shutdownServerModule
@@ -484,4 +489,4 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
@deprecated("Will be removed after 1.1")
private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit
-}
\ No newline at end of file
+}
diff --git a/akka-actor/src/main/scala/akka/routing/Iterators.scala b/akka-actor/src/main/scala/akka/routing/Iterators.scala
index a01cc6fe2d..f076ea00c1 100644
--- a/akka-actor/src/main/scala/akka/routing/Iterators.scala
+++ b/akka-actor/src/main/scala/akka/routing/Iterators.scala
@@ -10,15 +10,17 @@ import scala.collection.JavaConversions._
/**
* An Iterator that is either always empty or yields an infinite number of Ts.
*/
-trait InfiniteIterator[T] extends Iterator[T]
+trait InfiniteIterator[T] extends Iterator[T] {
+ val items: Seq[T]
+}
/**
* CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
*/
-class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
- def this(items: java.util.List[T]) = this(items.toList)
+case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] {
+ def this(items: java.util.List[T]) = this(items.toSeq)
- @volatile private[this] var current: List[T] = items
+ @volatile private[this] var current: Seq[T] = items
def hasNext = items != Nil
@@ -29,15 +31,14 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
}
override def exists(f: T => Boolean): Boolean = items.exists(f)
-
}
/**
* This InfiniteIterator always returns the Actor that has the currently smallest mailbox
* useful for work-stealing.
*/
-class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] {
- def this(items: java.util.List[ActorRef]) = this(items.toList)
+case class SmallestMailboxFirstIterator(val items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
+ def this(items: java.util.List[ActorRef]) = this(items.toSeq)
def hasNext = items != Nil
def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
diff --git a/akka-actor/src/main/scala/akka/routing/Routers.scala b/akka-actor/src/main/scala/akka/routing/Routers.scala
index b0283ce77d..57511076e8 100644
--- a/akka-actor/src/main/scala/akka/routing/Routers.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routers.scala
@@ -15,7 +15,11 @@ trait Dispatcher { this: Actor =>
protected def routes: PartialFunction[Any, ActorRef]
+ protected def broadcast(message: Any) {}
+
protected def dispatch: Receive = {
+ case Routing.Broadcast(message) =>
+ broadcast(message)
case a if routes.isDefinedAt(a) =>
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
else routes(a).!(transform(a))(None)
@@ -34,15 +38,19 @@ abstract class UntypedDispatcher extends UntypedActor {
protected def route(msg: Any): ActorRef
+ protected def broadcast(message: Any) {}
+
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
@throws(classOf[Exception])
def onReceive(msg: Any): Unit = {
- val r = route(msg)
- if(r eq null)
- throw new IllegalStateException("No route for " + msg + " defined!")
- if (isSenderDefined) r.forward(transform(msg))(someSelf)
- else r.!(transform(msg))(None)
+ if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message)
+ else {
+ val r = route(msg)
+ if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
+ if (isSenderDefined) r.forward(transform(msg))(someSelf)
+ else r.!(transform(msg))(None)
+ }
}
}
@@ -53,7 +61,11 @@ abstract class UntypedDispatcher extends UntypedActor {
trait LoadBalancer extends Dispatcher { self: Actor =>
protected def seq: InfiniteIterator[ActorRef]
- protected def routes = { case x if seq.hasNext => seq.next }
+ protected def routes = {
+ case x if seq.hasNext => seq.next
+ }
+
+ override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
}
@@ -69,5 +81,7 @@ abstract class UntypedLoadBalancer extends UntypedDispatcher {
if (seq.hasNext) seq.next
else null
+ override def broadcast(message: Any) = seq.items.foreach(_ ! message)
+
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
}
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index 1d43950f8b..2e041a4e35 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -9,6 +9,9 @@ import akka.actor.Actor._
object Routing {
+ sealed trait RoutingMessage
+ case class Broadcast(message: Any) extends RoutingMessage
+
type PF[A, B] = PartialFunction[A, B]
/**
@@ -31,7 +34,7 @@ object Routing {
/**
* Creates a LoadBalancer from the thunk-supplied InfiniteIterator.
*/
- def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
+ def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
actorOf(new Actor with LoadBalancer {
val seq = actors
}).start
@@ -39,7 +42,7 @@ object Routing {
/**
* Creates a Dispatcher given a routing and a message-transforming function.
*/
- def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
+ def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
actorOf(new Actor with Dispatcher {
override def transform(msg: Any) = msgTransformer(msg)
def routes = routing
@@ -48,7 +51,7 @@ object Routing {
/**
* Creates a Dispatcher given a routing.
*/
- def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher {
+ def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher {
def routes = routing
}).start
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 41d1106818..f4ceba6ebe 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -6,7 +6,6 @@ package akka.util
import akka.dispatch.{Future, CompletableFuture, MessageInvocation}
import akka.config.{Config, ModuleNotAvailableException}
-import akka.AkkaException
import java.net.InetSocketAddress
import akka.remoteinterface.RemoteSupport
@@ -45,13 +44,13 @@ object ReflectiveAccess {
def ensureEnabled = if (!isEnabled) {
val e = new ModuleNotAvailableException(
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
- EventHandler.warning(this, e.toString)
+ EventHandler.debug(this, e.toString)
throw e
}
val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT)
- protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] =
- remoteSupportClass map { remoteClass =>
+ protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] =
+ remoteSupportClass map { remoteClass =>
() => createInstance[RemoteSupport](
remoteClass,
Array[Class[_]](),
@@ -59,7 +58,7 @@ object ReflectiveAccess {
) getOrElse {
val e = new ModuleNotAvailableException(
"Can't instantiate [%s] - make sure that akka-remote.jar is on the classpath".format(remoteClass.getName))
- EventHandler.warning(this, e.toString)
+ EventHandler.debug(this, e.toString)
throw e
}
}
@@ -135,7 +134,7 @@ object ReflectiveAccess {
Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: Exception =>
- EventHandler.warning(this, e.toString)
+ EventHandler.debug(this, e.toString)
None
}
@@ -154,7 +153,7 @@ object ReflectiveAccess {
}
} catch {
case e: Exception =>
- EventHandler.warning(this, e.toString)
+ EventHandler.debug(this, e.toString)
None
}
@@ -168,7 +167,7 @@ object ReflectiveAccess {
}
} catch {
case e: ExceptionInInitializerError =>
- EventHandler.warning(this, e.toString)
+ EventHandler.debug(this, e.toString)
throw e
}
@@ -176,23 +175,23 @@ object ReflectiveAccess {
assert(fqn ne null)
// First, use the specified CL
- val first = try {
- Option(classloader.loadClass(fqn).asInstanceOf[Class[T]])
- } catch {
- case c: ClassNotFoundException =>
- EventHandler.warning(this, c.toString)
- None
- }
+ val first = try {
+ Option(classloader.loadClass(fqn).asInstanceOf[Class[T]])
+ } catch {
+ case c: ClassNotFoundException =>
+ EventHandler.debug(this, c.toString)
+ None
+ }
if (first.isDefined) first
- else {
+ else {
// Second option is to use the ContextClassLoader
- val second = try {
- Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
- } catch {
- case c: ClassNotFoundException =>
- EventHandler.warning(this, c.toString)
- None
+ val second = try {
+ Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
+ } catch {
+ case c: ClassNotFoundException =>
+ EventHandler.debug(this, c.toString)
+ None
}
if (second.isDefined) second
@@ -201,22 +200,22 @@ object ReflectiveAccess {
// Don't try to use "loader" if we got the default "classloader" parameter
if (classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]])
else None
- } catch {
- case c: ClassNotFoundException =>
- EventHandler.warning(this, c.toString)
- None
+ } catch {
+ case c: ClassNotFoundException =>
+ EventHandler.debug(this, c.toString)
+ None
}
if (third.isDefined) third
else {
// Last option is Class.forName
- try {
+ try {
Option(Class.forName(fqn).asInstanceOf[Class[T]])
- } catch {
- case c: ClassNotFoundException =>
- EventHandler.warning(this, c.toString)
- None
- }
+ } catch {
+ case c: ClassNotFoundException =>
+ EventHandler.debug(this, c.toString)
+ None
+ }
}
}
}
diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala
index 1eef7f068c..ea31d57287 100644
--- a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala
+++ b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala
@@ -1,18 +1,25 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
package akka.actor
-import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
-import akka.config.Supervision._
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.BeforeAndAfterEach
+
+import akka.testing._
+import akka.testing.Testing.sleepFor
+import akka.util.duration._
-import akka.dispatch.Dispatchers
import Actor._
+import akka.config.Supervision._
+import akka.dispatch.Dispatchers
-import akka.Testing
object ActorFireForgetRequestReplySpec {
- class ReplyActor extends Actor {
+ class ReplyActor extends Actor {
def receive = {
case "Send" =>
self.reply("Reply")
@@ -32,7 +39,6 @@ object ActorFireForgetRequestReplySpec {
}
class SenderActor(replyActor: ActorRef) extends Actor {
-
def receive = {
case "Init" =>
replyActor ! "Send"
@@ -50,44 +56,42 @@ object ActorFireForgetRequestReplySpec {
object state {
var s = "NIL"
- val finished = new CyclicBarrier(2)
+ val finished = TestBarrier(2)
}
}
-class ActorFireForgetRequestReplySpec extends JUnitSuite {
+class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
import ActorFireForgetRequestReplySpec._
- @Test
- def shouldReplyToBangMessageUsingReply = {
+ override def beforeEach() = {
state.finished.reset
- val replyActor = actorOf[ReplyActor].start
- val senderActor = actorOf(new SenderActor(replyActor)).start
- senderActor ! "Init"
- try { state.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- assert("Reply" === state.s)
- }
+ }
+
+ "An Actor" must {
- @Test
- def shouldReplyToBangMessageUsingImplicitSender = {
- state.finished.reset
- val replyActor = actorOf[ReplyActor].start
- val senderActor = actorOf(new SenderActor(replyActor)).start
- senderActor ! "InitImplicit"
- try { state.finished.await(1L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- assert("ReplyImplicit" === state.s)
- }
+ "reply to bang message using reply" in {
+ val replyActor = actorOf[ReplyActor].start
+ val senderActor = actorOf(new SenderActor(replyActor)).start
+ senderActor ! "Init"
+ state.finished.await
+ state.s must be ("Reply")
+ }
- @Test
- def shouldShutdownCrashedTemporaryActor = {
- state.finished.reset
- val actor = actorOf[CrashingTemporaryActor].start
- assert(actor.isRunning)
- actor ! "Die"
- try { state.finished.await(10L, TimeUnit.SECONDS) }
- catch { case e: TimeoutException => fail("Never got the message") }
- Thread.sleep(Testing.time(500))
- assert(actor.isShutdown)
+ "reply to bang message using implicit sender" in {
+ val replyActor = actorOf[ReplyActor].start
+ val senderActor = actorOf(new SenderActor(replyActor)).start
+ senderActor ! "InitImplicit"
+ state.finished.await
+ state.s must be ("ReplyImplicit")
+ }
+
+ "should shutdown crashed temporary actor" in {
+ val actor = actorOf[CrashingTemporaryActor].start
+ actor.isRunning must be (true)
+ actor ! "Die"
+ state.finished.await
+ sleepFor(1 second)
+ actor.isShutdown must be (true)
+ }
}
}
diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala
index d5d6b28edc..d379f3f98c 100644
--- a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala
+++ b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala
@@ -4,19 +4,20 @@
package akka.actor
-import org.scalatest.Spec
-import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.junit.JUnitRunner
-import org.junit.runner.RunWith
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import akka.testing._
+import akka.util.duration._
+import akka.testing.Testing.sleepFor
+import akka.config.Supervision.{OneForOneStrategy}
import akka.actor._
import akka.dispatch.Future
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{TimeUnit, CountDownLatch}
object ActorRefSpec {
- var latch = new CountDownLatch(4)
+ val latch = TestLatch(4)
class ReplyActor extends Actor {
var replyTo: Channel[Any] = null
@@ -49,7 +50,7 @@ object ActorRefSpec {
}
private def work {
- Thread.sleep(1000)
+ sleepFor(1 second)
}
}
@@ -69,16 +70,12 @@ object ActorRefSpec {
}
}
-@RunWith(classOf[JUnitRunner])
-class ActorRefSpec extends
- Spec with
- ShouldMatchers with
- BeforeAndAfterAll {
-
+class ActorRefSpec extends WordSpec with MustMatchers {
import ActorRefSpec._
- describe("ActorRef") {
- it("should support to reply via channel") {
+ "An ActorRef" must {
+
+ "support reply via channel" in {
val serverRef = Actor.actorOf[ReplyActor].start
val clientRef = Actor.actorOf(new SenderActor(serverRef)).start
@@ -86,18 +83,23 @@ class ActorRefSpec extends
clientRef ! "simple"
clientRef ! "simple"
clientRef ! "simple"
- assert(latch.await(4L, TimeUnit.SECONDS))
- latch = new CountDownLatch(4)
+
+ latch.await
+
+ latch.reset
+
clientRef ! "complex2"
clientRef ! "simple"
clientRef ! "simple"
clientRef ! "simple"
- assert(latch.await(4L, TimeUnit.SECONDS))
+
+ latch.await
+
clientRef.stop
serverRef.stop
}
- it("should stop when sent a poison pill") {
+ "stop when sent a poison pill" in {
val ref = Actor.actorOf(
new Actor {
def receive = {
@@ -115,11 +117,34 @@ class ActorRefSpec extends
fail("shouldn't get here")
}
- assert(ffive.resultOrException.get == "five")
- assert(fnull.resultOrException.get == "null")
+ ffive.resultOrException.get must be ("five")
+ fnull.resultOrException.get must be ("null")
- assert(ref.isRunning == false)
- assert(ref.isShutdown == true)
+ ref.isRunning must be (false)
+ ref.isShutdown must be (true)
+ }
+
+ "restart when Kill:ed" in {
+ val latch = new CountDownLatch(2)
+
+ val boss = Actor.actorOf(new Actor{
+ self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000))
+
+ val ref = Actor.actorOf(
+ new Actor {
+ def receive = { case _ => }
+ override def preRestart(reason: Throwable) = latch.countDown
+ override def postRestart(reason: Throwable) = latch.countDown
+ }
+ ).start
+
+ self link ref
+
+ protected def receive = { case "sendKill" => ref ! Kill }
+ }).start
+
+ boss ! "sendKill"
+ latch.await(5, TimeUnit.SECONDS) must be === true
}
}
}
diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala
index cf910925c8..91e6f92873 100644
--- a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala
+++ b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala
@@ -4,34 +4,31 @@
package akka.actor
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+
+import akka.testing._
+
import FSM._
-
-import org.multiverse.api.latches.StandardLatch
-
-import java.util.concurrent.TimeUnit
-
+import akka.util.Duration
import akka.util.duration._
-import akka.Testing
object FSMActorSpec {
-
- val unlockedLatch = new StandardLatch
- val lockedLatch = new StandardLatch
- val unhandledLatch = new StandardLatch
- val terminatedLatch = new StandardLatch
- val transitionLatch = new StandardLatch
- val initialStateLatch = new StandardLatch
- val transitionCallBackLatch = new StandardLatch
+ val unlockedLatch = TestLatch()
+ val lockedLatch = TestLatch()
+ val unhandledLatch = TestLatch()
+ val terminatedLatch = TestLatch()
+ val transitionLatch = TestLatch()
+ val initialStateLatch = TestLatch()
+ val transitionCallBackLatch = TestLatch()
sealed trait LockState
case object Locked extends LockState
case object Open extends LockState
- class Lock(code: String, timeout: (Long, TimeUnit)) extends Actor with FSM[LockState, CodeState] {
+ class Lock(code: String, timeout: Duration) extends Actor with FSM[LockState, CodeState] {
startWith(Locked, CodeState("", code))
@@ -94,53 +91,53 @@ object FSMActorSpec {
case class CodeState(soFar: String, code: String)
}
-class FSMActorSpec extends JUnitSuite {
+class FSMActorSpec extends WordSpec with MustMatchers {
import FSMActorSpec._
+ "An FSM Actor" must {
- @Test
- def unlockTheLock = {
+ "unlock the lock" in {
+
+ // lock that locked after being open for 1 sec
+ val lock = Actor.actorOf(new Lock("33221", 1 second)).start
- // lock that locked after being open for 1 sec
- val lock = Actor.actorOf(new Lock("33221", (Testing.time(1), TimeUnit.SECONDS))).start
+ val transitionTester = Actor.actorOf(new Actor { def receive = {
+ case Transition(_, _, _) => transitionCallBackLatch.open
+ case CurrentState(_, Locked) => initialStateLatch.open
+ }}).start
- val transitionTester = Actor.actorOf(new Actor { def receive = {
- case Transition(_, _, _) => transitionCallBackLatch.open
- case CurrentState(_, Locked) => initialStateLatch.open
- }}).start
+ lock ! SubscribeTransitionCallBack(transitionTester)
+ initialStateLatch.await
- lock ! SubscribeTransitionCallBack(transitionTester)
- assert(initialStateLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
+ lock ! '3'
+ lock ! '3'
+ lock ! '2'
+ lock ! '2'
+ lock ! '1'
- lock ! '3'
- lock ! '3'
- lock ! '2'
- lock ! '2'
- lock ! '1'
+ unlockedLatch.await
+ transitionLatch.await
+ transitionCallBackLatch.await
+ lockedLatch.await
- assert(unlockedLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
- assert(transitionLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
- assert(transitionCallBackLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
- assert(lockedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
+ lock ! "not_handled"
+ unhandledLatch.await
+ val answerLatch = TestLatch()
+ object Hello
+ object Bye
+ val tester = Actor.actorOf(new Actor {
+ protected def receive = {
+ case Hello => lock ! "hello"
+ case "world" => answerLatch.open
+ case Bye => lock ! "bye"
+ }
+ }).start
+ tester ! Hello
+ answerLatch.await
- lock ! "not_handled"
- assert(unhandledLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
-
- val answerLatch = new StandardLatch
- object Hello
- object Bye
- val tester = Actor.actorOf(new Actor {
- protected def receive = {
- case Hello => lock ! "hello"
- case "world" => answerLatch.open
- case Bye => lock ! "bye"
- }
- }).start
- tester ! Hello
- assert(answerLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
-
- tester ! Bye
- assert(terminatedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
+ tester ! Bye
+ terminatedLatch.await
+ }
}
}
diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala
index a59785ab7a..2ea9525a3d 100644
--- a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala
+++ b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala
@@ -1,16 +1,13 @@
package akka.actor
-import akka.testkit.TestKit
-import akka.util.duration._
-
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
-class FSMTimingSpec
- extends WordSpec
- with MustMatchers
- with TestKit {
+import akka.testkit.TestKit
+import akka.util.duration._
+
+class FSMTimingSpec extends WordSpec with MustMatchers with TestKit {
import FSMTimingSpec._
import FSM._
@@ -140,4 +137,3 @@ object FSMTimingSpec {
}
-// vim: set ts=2 sw=2 et:
diff --git a/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala
index 3a1efe1fe8..b52b7c6b14 100644
--- a/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala
+++ b/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala
@@ -1,18 +1,25 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
package akka.actor
-import java.util.concurrent.{TimeUnit, CountDownLatch}
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+
+import akka.testing._
+import akka.util.duration._
import Actor._
+
object ForwardActorSpec {
object ForwardState {
var sender: Option[ActorRef] = None
}
class ReceiverActor extends Actor {
- val latch = new CountDownLatch(1)
+ val latch = TestLatch()
def receive = {
case "SendBang" => {
ForwardState.sender = self.sender
@@ -42,7 +49,7 @@ object ForwardActorSpec {
}
class BangBangSenderActor extends Actor {
- val latch = new CountDownLatch(1)
+ val latch = TestLatch()
val forwardActor = actorOf[ForwardActor]
forwardActor.start
(forwardActor !! "SendBangBang") match {
@@ -55,27 +62,27 @@ object ForwardActorSpec {
}
}
-class ForwardActorSpec extends JUnitSuite {
+class ForwardActorSpec extends WordSpec with MustMatchers {
import ForwardActorSpec._
- @Test
- def shouldForwardActorReferenceWhenInvokingForwardOnBang {
- val senderActor = actorOf[BangSenderActor]
- val latch = senderActor.actor.asInstanceOf[BangSenderActor]
+ "A Forward Actor" must {
+ "forward actor reference when invoking forward on bang" in {
+ val senderActor = actorOf[BangSenderActor]
+ val latch = senderActor.actor.asInstanceOf[BangSenderActor]
.forwardActor.actor.asInstanceOf[ForwardActor]
.receiverActor.actor.asInstanceOf[ReceiverActor]
.latch
- senderActor.start
- assert(latch.await(1L, TimeUnit.SECONDS))
- assert(ForwardState.sender ne null)
- assert(senderActor.toString === ForwardState.sender.get.toString)
- }
+ senderActor.start
+ latch.await
+ ForwardState.sender must not be (null)
+ senderActor.toString must be (ForwardState.sender.get.toString)
+ }
- @Test
- def shouldForwardActorReferenceWhenInvokingForwardOnBangBang {
- val senderActor = actorOf[BangBangSenderActor]
- senderActor.start
- val latch = senderActor.actor.asInstanceOf[BangBangSenderActor].latch
- assert(latch.await(1L, TimeUnit.SECONDS))
+ "forward actor reference when invoking forward on bang bang" in {
+ val senderActor = actorOf[BangBangSenderActor]
+ senderActor.start
+ val latch = senderActor.actor.asInstanceOf[BangBangSenderActor].latch
+ latch.await
+ }
}
}
diff --git a/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala
index 011141c746..e985d383b6 100644
--- a/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala
+++ b/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala
@@ -1,17 +1,23 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
+
+import akka.testing._
+
import Actor._
-import java.util.concurrent.CyclicBarrier
class HotSwapSpec extends WordSpec with MustMatchers {
- "An Actor" should {
+ "An Actor" must {
"be able to hotswap its behavior with HotSwap(..)" in {
- val barrier = new CyclicBarrier(2)
+ val barrier = TestBarrier(2)
@volatile var _log = ""
val a = actorOf( new Actor {
def receive = { case _ => _log += "default" }
@@ -27,7 +33,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
}
"be able to hotswap its behavior with become(..)" in {
- val barrier = new CyclicBarrier(2)
+ val barrier = TestBarrier(2)
@volatile var _log = ""
val a = actorOf(new Actor {
def receive = {
@@ -55,7 +61,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
}
"be able to revert hotswap its behavior with RevertHotSwap(..)" in {
- val barrier = new CyclicBarrier(2)
+ val barrier = TestBarrier(2)
@volatile var _log = ""
val a = actorOf( new Actor {
def receive = {
@@ -100,7 +106,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
}
"be able to revert hotswap its behavior with unbecome" in {
- val barrier = new CyclicBarrier(2)
+ val barrier = TestBarrier(2)
@volatile var _log = ""
val a = actorOf(new Actor {
def receive = {
diff --git a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala
index 9e5fba863e..c48fbd6f9d 100644
--- a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala
+++ b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala
@@ -1,108 +1,120 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
package akka.actor
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+
+import akka.testing._
+import akka.util.duration._
-import java.util.concurrent.TimeUnit
-import org.multiverse.api.latches.StandardLatch
import Actor._
import java.util.concurrent.atomic.AtomicInteger
-class ReceiveTimeoutSpec extends JUnitSuite {
- @Test def receiveShouldGetTimeout= {
+class ReceiveTimeoutSpec extends WordSpec with MustMatchers {
+ import Actor._
- val timeoutLatch = new StandardLatch
+ "An actor with receive timeout" must {
- val timeoutActor = actorOf(new Actor {
- self.receiveTimeout = Some(500L)
+ "get timeout" in {
+ val timeoutLatch = TestLatch()
- protected def receive = {
- case ReceiveTimeout => timeoutLatch.open
- }
- }).start
+ val timeoutActor = actorOf(new Actor {
+ self.receiveTimeout = Some(500L)
- assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
- timeoutActor.stop
- }
+ protected def receive = {
+ case ReceiveTimeout => timeoutLatch.open
+ }
+ }).start
- @Test def swappedReceiveShouldAlsoGetTimout = {
- val timeoutLatch = new StandardLatch
+ timeoutLatch.await
+ timeoutActor.stop
+ }
- val timeoutActor = actorOf(new Actor {
- self.receiveTimeout = Some(500L)
+ "get timeout when swapped" in {
+ val timeoutLatch = TestLatch()
- protected def receive = {
- case ReceiveTimeout => timeoutLatch.open
- }
- }).start
+ val timeoutActor = actorOf(new Actor {
+ self.receiveTimeout = Some(500L)
- // after max 1 second the timeout should already been sent
- assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
+ protected def receive = {
+ case ReceiveTimeout => timeoutLatch.open
+ }
+ }).start
- val swappedLatch = new StandardLatch
- timeoutActor ! HotSwap(self => {
- case ReceiveTimeout => swappedLatch.open
- })
+ timeoutLatch.await
- assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS))
- timeoutActor.stop
- }
+ val swappedLatch = TestLatch()
- @Test def timeoutShouldBeRescheduledAfterRegularReceive = {
+ timeoutActor ! HotSwap(self => {
+ case ReceiveTimeout => swappedLatch.open
+ })
- val timeoutLatch = new StandardLatch
- case object Tick
- val timeoutActor = actorOf(new Actor {
- self.receiveTimeout = Some(500L)
+ swappedLatch.await
+ timeoutActor.stop
+ }
+
+ "reschedule timeout after regular receive" in {
+ val timeoutLatch = TestLatch()
+ case object Tick
- protected def receive = {
- case Tick => ()
- case ReceiveTimeout => timeoutLatch.open
- }
- }).start
- timeoutActor ! Tick
+ val timeoutActor = actorOf(new Actor {
+ self.receiveTimeout = Some(500L)
- assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
- timeoutActor.stop
- }
+ protected def receive = {
+ case Tick => ()
+ case ReceiveTimeout => timeoutLatch.open
+ }
+ }).start
- @Test def timeoutShouldBeTurnedOffIfDesired = {
- val count = new AtomicInteger(0)
- val timeoutLatch = new StandardLatch
- case object Tick
- val timeoutActor = actorOf(new Actor {
- self.receiveTimeout = Some(500L)
+ timeoutActor ! Tick
- protected def receive = {
- case Tick => ()
- case ReceiveTimeout =>
- count.incrementAndGet
- timeoutLatch.open
- self.receiveTimeout = None
- }
- }).start
- timeoutActor ! Tick
+ timeoutLatch.await
+ timeoutActor.stop
+ }
- assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
- assert(count.get === 1)
- timeoutActor.stop
- }
+ "be able to turn off timeout if desired" in {
+ val count = new AtomicInteger(0)
+ val timeoutLatch = TestLatch()
+ case object Tick
- @Test def timeoutShouldNotBeSentWhenNotSpecified = {
- val timeoutLatch = new StandardLatch
- val timeoutActor = actorOf(new Actor {
+ val timeoutActor = actorOf(new Actor {
+ self.receiveTimeout = Some(500L)
- protected def receive = {
- case ReceiveTimeout => timeoutLatch.open
- }
- }).start
+ protected def receive = {
+ case Tick => ()
+ case ReceiveTimeout =>
+ count.incrementAndGet
+ timeoutLatch.open
+ self.receiveTimeout = None
+ }
+ }).start
- assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
- timeoutActor.stop
- }
+ timeoutActor ! Tick
- @Test def ActorsReceiveTimeoutShouldBeReceiveTimeout {
- assert(akka.actor.Actors.receiveTimeout() eq ReceiveTimeout)
+ timeoutLatch.await
+ count.get must be (1)
+ timeoutActor.stop
+ }
+
+ "not receive timeout message when not specified" in {
+ val timeoutLatch = TestLatch()
+
+ val timeoutActor = actorOf(new Actor {
+ protected def receive = {
+ case ReceiveTimeout => timeoutLatch.open
+ }
+ }).start
+
+ timeoutLatch.awaitTimeout(1 second) // timeout expected
+ timeoutActor.stop
+ }
+
+ "have ReceiveTimeout eq to Actors ReceiveTimeout" in {
+ akka.actor.Actors.receiveTimeout() must be theSameInstanceAs (ReceiveTimeout)
+ }
}
}
diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala
index 6b154b42a9..e875ac87e0 100644
--- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala
+++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala
@@ -6,6 +6,7 @@ package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.scalatest.Assertions._
+import akka.testing._
import akka.dispatch._
import akka.actor.{ActorRef, Actor}
import akka.actor.Actor._
@@ -13,7 +14,6 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{Duration, Switch}
-import akka.Testing
object ActorModelSpec {
@@ -225,13 +225,13 @@ abstract class ActorModelSpec extends JUnitSuite {
a.start
a ! CountDown(start)
- assertCountDown(start, Testing.time(3000), "Should process first message within 3 seconds")
+ assertCountDown(start, Testing.testTime(3000), "Should process first message within 3 seconds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)
a ! Wait(1000)
a ! CountDown(oneAtATime)
// in case of serialization violation, restart would happen instead of count down
- assertCountDown(oneAtATime, Testing.time(1500) ,"Processed message when allowed")
+ assertCountDown(oneAtATime, Testing.testTime(1500) ,"Processed message when allowed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
a.stop
@@ -246,7 +246,7 @@ abstract class ActorModelSpec extends JUnitSuite {
def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } }
for (i <- 1 to 10) { start }
- assertCountDown(counter, Testing.time(3000), "Should process 200 messages")
+ assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
a.stop
@@ -264,10 +264,10 @@ abstract class ActorModelSpec extends JUnitSuite {
val aStart,aStop,bParallel = new CountDownLatch(1)
a ! Meet(aStart,aStop)
- assertCountDown(aStart, Testing.time(3000), "Should process first message within 3 seconds")
+ assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds")
b ! CountDown(bParallel)
- assertCountDown(bParallel, Testing.time(3000), "Should process other actors in parallel")
+ assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
aStop.countDown()
a.stop
@@ -282,7 +282,7 @@ abstract class ActorModelSpec extends JUnitSuite {
val done = new CountDownLatch(1)
a ! Restart
a ! CountDown(done)
- assertCountDown(done, Testing.time(3000), "Should be suspended+resumed and done with next message within 3 seconds")
+ assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds")
a.stop
assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2,
msgsProcessed = 2, suspensions = 1, resumes = 1)
@@ -298,7 +298,7 @@ abstract class ActorModelSpec extends JUnitSuite {
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
dispatcher.resume(a)
- assertCountDown(done, Testing.time(3000), "Should resume processing of messages when resumed")
+ assertCountDown(done, Testing.testTime(3000), "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)
@@ -315,7 +315,7 @@ abstract class ActorModelSpec extends JUnitSuite {
(1 to num) foreach {
_ => newTestActor.start ! cachedMessage
}
- assertCountDown(cachedMessage.latch, Testing.time(10000), "Should process " + num + " countdowns")
+ assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
}
for(run <- 1 to 3) {
flood(10000)
diff --git a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala
index 09e618e24c..975ef1ae52 100644
--- a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala
+++ b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala
@@ -468,7 +468,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
val pool2 = actorOf(new TestPool2).start
pool2 ! "a"
pool2 ! "b"
- done = latch.await(1,TimeUnit.SECONDS)
+ done = latch.await(1, TimeUnit.SECONDS)
done must be (true)
delegates.size must be (2)
pool2 stop
diff --git a/akka-actor/src/test/scala/akka/testing/TestBarrier.scala b/akka-actor/src/test/scala/akka/testing/TestBarrier.scala
new file mode 100644
index 0000000000..650ef7de79
--- /dev/null
+++ b/akka-actor/src/test/scala/akka/testing/TestBarrier.scala
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.testing
+
+import akka.util.Duration
+import java.util.concurrent.{CyclicBarrier, TimeUnit, TimeoutException}
+
+
+class TestBarrierTimeoutException(message: String) extends RuntimeException(message)
+
+/**
+ * A cyclic barrier wrapper for use in testing.
+ * It always uses a timeout when waiting and timeouts are specified as durations.
+ * Timeouts will always throw an exception. The default timeout is 5 seconds.
+ * Timeouts are multiplied by the testing time factor for Jenkins builds.
+ */
+object TestBarrier {
+ val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
+
+ def apply(count: Int) = new TestBarrier(count)
+}
+
+class TestBarrier(count: Int) {
+ private val barrier = new CyclicBarrier(count)
+
+ def await(): Unit = await(TestBarrier.DefaultTimeout)
+
+ def await(timeout: Duration): Unit = {
+ try {
+ barrier.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
+ } catch {
+ case e: TimeoutException =>
+ throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" format (timeout.toString, Testing.timeFactor))
+ }
+ }
+
+ def reset = barrier.reset
+}
diff --git a/akka-actor/src/test/scala/akka/testing/TestLatch.scala b/akka-actor/src/test/scala/akka/testing/TestLatch.scala
new file mode 100644
index 0000000000..b975145963
--- /dev/null
+++ b/akka-actor/src/test/scala/akka/testing/TestLatch.scala
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.testing
+
+import akka.util.Duration
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+
+class TestLatchTimeoutException(message: String) extends RuntimeException(message)
+class TestLatchNoTimeoutException(message: String) extends RuntimeException(message)
+
+/**
+ * A count down latch wrapper for use in testing.
+ * It always uses a timeout when waiting and timeouts are specified as durations.
+ * There's a default timeout of 5 seconds and the default count is 1.
+ * Timeouts will always throw an exception (no need to wrap in assert in tests).
+ * Timeouts are multiplied by the testing time factor for Jenkins builds.
+ */
+object TestLatch {
+ val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
+
+ def apply(count: Int = 1) = new TestLatch(count)
+}
+
+class TestLatch(count: Int = 1) {
+ private var latch = new CountDownLatch(count)
+
+ def countDown = latch.countDown
+
+ def open = countDown
+
+ def await(): Boolean = await(TestLatch.DefaultTimeout)
+
+ def await(timeout: Duration): Boolean = {
+ val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
+ if (!opened) throw new TestLatchTimeoutException(
+ "Timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor))
+ opened
+ }
+
+ /**
+ * Timeout is expected. Throws exception if latch is opened before timeout.
+ */
+ def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = {
+ val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
+ if (opened) throw new TestLatchNoTimeoutException(
+ "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor))
+ opened
+ }
+
+ def reset = latch = new CountDownLatch(count)
+}
+
diff --git a/akka-actor/src/test/scala/akka/Testing.scala b/akka-actor/src/test/scala/akka/testing/Testing.scala
similarity index 57%
rename from akka-actor/src/test/scala/akka/Testing.scala
rename to akka-actor/src/test/scala/akka/testing/Testing.scala
index afc0c4a05a..d957b26af7 100644
--- a/akka-actor/src/test/scala/akka/Testing.scala
+++ b/akka-actor/src/test/scala/akka/testing/Testing.scala
@@ -2,7 +2,9 @@
* Copyright (C) 2009-2011 Scalable Solutions AB
*/
-package akka
+package akka.testing
+
+import akka.util.Duration
/**
* Multiplying numbers used in test timeouts by a factor, set by system property.
@@ -18,8 +20,10 @@ object Testing {
}
}
- def time(t: Int): Int = (timeFactor * t).toInt
- def time(t: Long): Long = (timeFactor * t).toLong
- def time(t: Float): Float = (timeFactor * t).toFloat
- def time(t: Double): Double = timeFactor * t
+ def testTime(t: Int): Int = (timeFactor * t).toInt
+ def testTime(t: Long): Long = (timeFactor * t).toLong
+ def testTime(t: Float): Float = (timeFactor * t).toFloat
+ def testTime(t: Double): Double = timeFactor * t
+
+ def sleepFor(duration: Duration) = Thread.sleep(testTime(duration.toMillis))
}
diff --git a/akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala b/akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala
index d4de2675fb..e1a862e03c 100644
--- a/akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala
+++ b/akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala
@@ -5,7 +5,7 @@ import org.scalatest.matchers.MustMatchers
class Ticket001Spec extends WordSpec with MustMatchers {
- "An XXX" should {
+ "An XXX" must {
"do YYY" in {
1 must be (1)
}
diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index dd7a22df52..d9ded4b0e2 100644
--- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -1049,7 +1049,7 @@ class RemoteServerHandler(
throw firstException
targetMethod
- }
+ }
try {
val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses)
@@ -1230,7 +1230,14 @@ class RemoteServerHandler(
server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match {
case null => // the actor has not been registered globally. See if we have it in the session
createTypedSessionActor(actorInfo, channel) match {
- case null => createClientManagedTypedActor(actorInfo) //Maybe client managed actor?
+ case null =>
+ // FIXME this is broken, if a user tries to get a server-managed typed actor and that is not registered then a client-managed typed actor is created, but just throwing an exception here causes client-managed typed actors to fail
+
+/* val e = new RemoteServerException("Can't load remote Typed Actor for [" + actorInfo.getId + "]")
+ EventHandler.error(e, this, e.getMessage)
+ server.notifyListeners(RemoteServerError(e, server))
+ throw e
+*/ createClientManagedTypedActor(actorInfo) // client-managed actor
case sessionActor => sessionActor
}
case typedActor => typedActor
diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala
index c91565eec7..988236b85b 100644
--- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala
+++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala
@@ -10,7 +10,7 @@ import akka.actor._
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import akka.config. {RemoteAddress, Config, TypedActorConfigurator}
-import akka.Testing
+import akka.testing._
object RemoteTypedActorLog {
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
@@ -39,13 +39,13 @@ class RemoteTypedActorSpec extends AkkaRemoteTest {
classOf[RemoteTypedActorOne],
classOf[RemoteTypedActorOneImpl],
Permanent,
- Testing.time(10000),
+ Testing.testTime(20000),
RemoteAddress(host,port)),
new SuperviseTypedActor(
classOf[RemoteTypedActorTwo],
classOf[RemoteTypedActorTwoImpl],
Permanent,
- Testing.time(10000),
+ Testing.testTime(20000),
RemoteAddress(host,port))
).toArray).supervise
}
diff --git a/akka-tutorials/akka-tutorial-pi-sbt/project/build.properties b/akka-tutorials/akka-tutorial-pi-sbt/project/build.properties
new file mode 100644
index 0000000000..14f82f14d0
--- /dev/null
+++ b/akka-tutorials/akka-tutorial-pi-sbt/project/build.properties
@@ -0,0 +1,5 @@
+project.organization=se.scalablesolutions.akka
+project.name=Akka Tutorial 1 SBT
+project.version=1.0
+build.scala.versions=2.9.0.RC1
+sbt.version=0.7.5.RC1
diff --git a/akka-tutorials/akka-tutorial-pi-sbt/project/plugins/Plugins.scala b/akka-tutorials/akka-tutorial-pi-sbt/project/plugins/Plugins.scala
new file mode 100644
index 0000000000..74a3d6b705
--- /dev/null
+++ b/akka-tutorials/akka-tutorial-pi-sbt/project/plugins/Plugins.scala
@@ -0,0 +1,6 @@
+import sbt._
+
+class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
+ val akkaRepo = "Akka Repo" at "http://akka.io/repository"
+ val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "1.1-SNAPSHOT"
+}
diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
new file mode 100644
index 0000000000..1b84ae5008
--- /dev/null
+++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
@@ -0,0 +1,127 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.tutorial.sbt.pi
+
+import akka.actor.{Actor, ActorRef, PoisonPill}
+import Actor._
+import akka.routing.{Routing, CyclicIterator}
+import Routing._
+import akka.event.EventHandler
+import akka.dispatch.Dispatchers
+
+import System.{currentTimeMillis => now}
+import java.util.concurrent.CountDownLatch
+
+/**
+ * Sample for Akka, SBT an Scala tutorial.
+ *
+ * Calculates Pi.
+ *
+ * Run it in SBT:
+ *
+ * $ sbt
+ * > update
+ * > console
+ * > akka.tutorial.sbt.pi.Pi.calculate
+ * > ...
+ * > :quit
+ *
+ *
+ * @author Jonas Bonér
+ */
+object Pi {
+ val nrOfWorkers = 4
+ val nrOfMessages = 10000
+ val nrOfElements = 10000
+
+ // ====================
+ // ===== Messages =====
+ // ====================
+ sealed trait PiMessage
+ case class Calculate(nrOfMessages: Int, nrOfElements: Int) extends PiMessage
+ case class Work(arg: Int, fun: (Int) => Double) extends PiMessage
+ case class Result(value: Double) extends PiMessage
+
+ // ==================
+ // ===== Worker =====
+ // ==================
+ class Worker extends Actor {
+ def receive = {
+ case Work(arg, fun) => self reply Result(fun(arg))
+ }
+ }
+
+ // ==================
+ // ===== Master =====
+ // ==================
+ class Master(latch: CountDownLatch) extends Actor {
+ var pi: Double = _
+ var nrOfResults: Int = _
+ var start: Long = _
+
+ // create the workers
+ val workers = {
+ val ws = new Array[ActorRef](nrOfWorkers)
+ for (i <- 0 until nrOfWorkers) ws(i) = actorOf[Worker].start
+ ws
+ }
+
+ // wrap them with a load-balancing router
+ val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
+
+ // define the work
+ val algorithm = (i: Int) => {
+ val range = (i * nrOfElements) to ((i + 1) * nrOfElements - 1)
+ val results = for (j <- range) yield (4 * math.pow(-1, j) / (2 * j + 1))
+ results.sum
+ }
+
+ // message handler
+ def receive = {
+ case Calculate(nrOfMessages, nrOfElements) =>
+ // schedule work
+ for (arg <- 0 until nrOfMessages) router ! Work(arg, algorithm)
+
+ // send a PoisonPill to all workers telling them to shut down themselves
+ router ! Broadcast(PoisonPill)
+
+ case Result(value) =>
+ // handle result from the worker
+ pi += value
+ nrOfResults += 1
+ if (nrOfResults == nrOfMessages) self.stop
+ }
+
+ override def preStart = start = now
+
+ override def postStop = {
+ // tell the world that the calculation is complete
+ EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
+ latch.countDown
+ }
+ }
+
+ // ==================
+ // ===== Run it =====
+ // ==================
+ def calculate = {
+ // this latch is only plumbing to know when the calculation is completed
+ val latch = new CountDownLatch(1)
+
+ // create the master
+ val master = actorOf(new Master(latch)).start
+
+ // start the calculation
+ master ! Calculate(nrOfMessages, nrOfElements)
+
+ // wait for master to shut down
+ latch.await
+ }
+}
+
+// To be able to run it as a main application
+object Main extends App {
+ Pi.calculate
+}
diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
index 0e27557607..a9c7c694bf 100644
--- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
+++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
@@ -12,8 +12,6 @@ import akka.config.Supervision._
import java.util.concurrent.CountDownLatch
import akka.config.TypedActorConfigurator
-import akka.Testing
-
/**
* @author Martin Krasser
*/
@@ -97,7 +95,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
it("should be stopped when supervision cannot handle the problem in") {
- val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
+ val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), 30000)
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise
try {
val first = conf.getInstance(classOf[TypedActorFailer])
@@ -123,7 +121,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
it("should be restarted when supervision handles the problem in") {
- val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
+ val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), 30000)
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(classOf[Throwable] :: Nil, 3, 500000), Array(actorSupervision)).inject.supervise
try {
val first = conf.getInstance(classOf[TypedActorFailer])
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 1c3676ad31..df2c2c3e0d 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -13,7 +13,7 @@ akka {
time-unit = "seconds" # Time unit for all timeout properties throughout the config
event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
- event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
+ event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
# Can be used to bootstrap your application(s)