Merge branch 'master' into ticket-622
This commit is contained in:
commit
5faabc362c
27 changed files with 625 additions and 318 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -45,3 +45,4 @@ run-codefellow
|
|||
multiverse.log
|
||||
.eprj
|
||||
.*.swp
|
||||
akka-tutorials/akka-tutorial-pi-sbt/project/boot/
|
||||
|
|
@ -6,15 +6,10 @@ package akka.actor
|
|||
|
||||
import akka.dispatch._
|
||||
import akka.config.Config._
|
||||
import akka.config.Supervision._
|
||||
import akka.config.ConfigurationException
|
||||
import akka.util.Helpers.{narrow, narrowSilently}
|
||||
import akka.util.ListenerManagement
|
||||
import akka.AkkaException
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import scala.reflect.BeanProperty
|
||||
import akka.util. {ReflectiveAccess, Duration}
|
||||
import akka.remoteinterface.RemoteSupport
|
||||
|
|
@ -61,6 +56,8 @@ case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeC
|
|||
|
||||
case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage
|
||||
|
||||
case object Kill extends AutoReceivedMessage with LifeCycleMessage
|
||||
|
||||
case object ReceiveTimeout extends LifeCycleMessage
|
||||
|
||||
case class MaximumNumberOfRestartsWithinTimeRangeReached(
|
||||
|
|
@ -277,9 +274,6 @@ object Actor extends ListenerManagement {
|
|||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <p/>
|
||||
* The Actor trait also has a 'log' member field that can be used for logging within the Actor.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Actor {
|
||||
|
|
@ -353,7 +347,7 @@ trait Actor {
|
|||
* <p/>
|
||||
* Example code:
|
||||
* <pre>
|
||||
* def receive = {
|
||||
* def receive = {
|
||||
* case Ping =>
|
||||
* println("got a 'Ping' message")
|
||||
* self.reply("pong")
|
||||
|
|
@ -465,6 +459,7 @@ trait Actor {
|
|||
case Unlink(child) => self.unlink(child)
|
||||
case UnlinkAndStop(child) => self.unlink(child); child.stop
|
||||
case Restart(reason) => throw reason
|
||||
case Kill => throw new ActorKilledException("Kill")
|
||||
case PoisonPill =>
|
||||
val f = self.senderFuture
|
||||
self.stop
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import java.util.concurrent.ConcurrentHashMap
|
|||
import java.io.{PrintWriter, PrintStream}
|
||||
|
||||
trait RemoteModule {
|
||||
val UUID_PREFIX = "uuid:"
|
||||
val UUID_PREFIX = "uuid:".intern
|
||||
|
||||
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
|
||||
protected[akka] def notifyListeners(message: => Any): Unit
|
||||
|
|
@ -84,7 +84,6 @@ case class RemoteClientWriteFailed(
|
|||
@BeanProperty client: RemoteClientModule,
|
||||
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
|
||||
|
||||
|
||||
/**
|
||||
* Life-cycle events for RemoteServer.
|
||||
*/
|
||||
|
|
@ -120,7 +119,12 @@ class RemoteClientException private[akka] (
|
|||
val remoteAddress: InetSocketAddress) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* Returned when a remote exception sent over the wire cannot be loaded and instantiated
|
||||
* Thrown when the remote server actor dispatching fails for some reason.
|
||||
*/
|
||||
class RemoteServerException private[akka] (message: String) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* Thrown when a remote exception sent over the wire cannot be loaded and instantiated
|
||||
*/
|
||||
case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String)
|
||||
extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]"
|
||||
|
|
@ -140,6 +144,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
|
|||
}
|
||||
|
||||
def shutdown {
|
||||
eventHandler.stop
|
||||
removeListener(eventHandler)
|
||||
this.shutdownClientModule
|
||||
this.shutdownServerModule
|
||||
|
|
@ -484,4 +489,4 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
|
|||
|
||||
@deprecated("Will be removed after 1.1")
|
||||
private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,15 +10,17 @@ import scala.collection.JavaConversions._
|
|||
/**
|
||||
* An Iterator that is either always empty or yields an infinite number of Ts.
|
||||
*/
|
||||
trait InfiniteIterator[T] extends Iterator[T]
|
||||
trait InfiniteIterator[T] extends Iterator[T] {
|
||||
val items: Seq[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
|
||||
*/
|
||||
class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
|
||||
def this(items: java.util.List[T]) = this(items.toList)
|
||||
case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] {
|
||||
def this(items: java.util.List[T]) = this(items.toSeq)
|
||||
|
||||
@volatile private[this] var current: List[T] = items
|
||||
@volatile private[this] var current: Seq[T] = items
|
||||
|
||||
def hasNext = items != Nil
|
||||
|
||||
|
|
@ -29,15 +31,14 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
|
|||
}
|
||||
|
||||
override def exists(f: T => Boolean): Boolean = items.exists(f)
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This InfiniteIterator always returns the Actor that has the currently smallest mailbox
|
||||
* useful for work-stealing.
|
||||
*/
|
||||
class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] {
|
||||
def this(items: java.util.List[ActorRef]) = this(items.toList)
|
||||
case class SmallestMailboxFirstIterator(val items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
|
||||
def this(items: java.util.List[ActorRef]) = this(items.toSeq)
|
||||
def hasNext = items != Nil
|
||||
|
||||
def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,11 @@ trait Dispatcher { this: Actor =>
|
|||
|
||||
protected def routes: PartialFunction[Any, ActorRef]
|
||||
|
||||
protected def broadcast(message: Any) {}
|
||||
|
||||
protected def dispatch: Receive = {
|
||||
case Routing.Broadcast(message) =>
|
||||
broadcast(message)
|
||||
case a if routes.isDefinedAt(a) =>
|
||||
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
|
||||
else routes(a).!(transform(a))(None)
|
||||
|
|
@ -34,15 +38,19 @@ abstract class UntypedDispatcher extends UntypedActor {
|
|||
|
||||
protected def route(msg: Any): ActorRef
|
||||
|
||||
protected def broadcast(message: Any) {}
|
||||
|
||||
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
|
||||
|
||||
@throws(classOf[Exception])
|
||||
def onReceive(msg: Any): Unit = {
|
||||
val r = route(msg)
|
||||
if(r eq null)
|
||||
throw new IllegalStateException("No route for " + msg + " defined!")
|
||||
if (isSenderDefined) r.forward(transform(msg))(someSelf)
|
||||
else r.!(transform(msg))(None)
|
||||
if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message)
|
||||
else {
|
||||
val r = route(msg)
|
||||
if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
|
||||
if (isSenderDefined) r.forward(transform(msg))(someSelf)
|
||||
else r.!(transform(msg))(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -53,7 +61,11 @@ abstract class UntypedDispatcher extends UntypedActor {
|
|||
trait LoadBalancer extends Dispatcher { self: Actor =>
|
||||
protected def seq: InfiniteIterator[ActorRef]
|
||||
|
||||
protected def routes = { case x if seq.hasNext => seq.next }
|
||||
protected def routes = {
|
||||
case x if seq.hasNext => seq.next
|
||||
}
|
||||
|
||||
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
|
||||
|
||||
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
|
||||
}
|
||||
|
|
@ -69,5 +81,7 @@ abstract class UntypedLoadBalancer extends UntypedDispatcher {
|
|||
if (seq.hasNext) seq.next
|
||||
else null
|
||||
|
||||
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
|
||||
|
||||
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,9 @@ import akka.actor.Actor._
|
|||
|
||||
object Routing {
|
||||
|
||||
sealed trait RoutingMessage
|
||||
case class Broadcast(message: Any) extends RoutingMessage
|
||||
|
||||
type PF[A, B] = PartialFunction[A, B]
|
||||
|
||||
/**
|
||||
|
|
@ -31,7 +34,7 @@ object Routing {
|
|||
/**
|
||||
* Creates a LoadBalancer from the thunk-supplied InfiniteIterator.
|
||||
*/
|
||||
def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
|
||||
def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
|
||||
actorOf(new Actor with LoadBalancer {
|
||||
val seq = actors
|
||||
}).start
|
||||
|
|
@ -39,7 +42,7 @@ object Routing {
|
|||
/**
|
||||
* Creates a Dispatcher given a routing and a message-transforming function.
|
||||
*/
|
||||
def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
|
||||
def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
|
||||
actorOf(new Actor with Dispatcher {
|
||||
override def transform(msg: Any) = msgTransformer(msg)
|
||||
def routes = routing
|
||||
|
|
@ -48,7 +51,7 @@ object Routing {
|
|||
/**
|
||||
* Creates a Dispatcher given a routing.
|
||||
*/
|
||||
def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher {
|
||||
def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher {
|
||||
def routes = routing
|
||||
}).start
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ package akka.util
|
|||
|
||||
import akka.dispatch.{Future, CompletableFuture, MessageInvocation}
|
||||
import akka.config.{Config, ModuleNotAvailableException}
|
||||
import akka.AkkaException
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import akka.remoteinterface.RemoteSupport
|
||||
|
|
@ -45,13 +44,13 @@ object ReflectiveAccess {
|
|||
def ensureEnabled = if (!isEnabled) {
|
||||
val e = new ModuleNotAvailableException(
|
||||
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
|
||||
EventHandler.warning(this, e.toString)
|
||||
EventHandler.debug(this, e.toString)
|
||||
throw e
|
||||
}
|
||||
val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT)
|
||||
|
||||
protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] =
|
||||
remoteSupportClass map { remoteClass =>
|
||||
protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] =
|
||||
remoteSupportClass map { remoteClass =>
|
||||
() => createInstance[RemoteSupport](
|
||||
remoteClass,
|
||||
Array[Class[_]](),
|
||||
|
|
@ -59,7 +58,7 @@ object ReflectiveAccess {
|
|||
) getOrElse {
|
||||
val e = new ModuleNotAvailableException(
|
||||
"Can't instantiate [%s] - make sure that akka-remote.jar is on the classpath".format(remoteClass.getName))
|
||||
EventHandler.warning(this, e.toString)
|
||||
EventHandler.debug(this, e.toString)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
@ -135,7 +134,7 @@ object ReflectiveAccess {
|
|||
Some(ctor.newInstance(args: _*).asInstanceOf[T])
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
EventHandler.warning(this, e.toString)
|
||||
EventHandler.debug(this, e.toString)
|
||||
None
|
||||
}
|
||||
|
||||
|
|
@ -154,7 +153,7 @@ object ReflectiveAccess {
|
|||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
EventHandler.warning(this, e.toString)
|
||||
EventHandler.debug(this, e.toString)
|
||||
None
|
||||
}
|
||||
|
||||
|
|
@ -168,7 +167,7 @@ object ReflectiveAccess {
|
|||
}
|
||||
} catch {
|
||||
case e: ExceptionInInitializerError =>
|
||||
EventHandler.warning(this, e.toString)
|
||||
EventHandler.debug(this, e.toString)
|
||||
throw e
|
||||
}
|
||||
|
||||
|
|
@ -176,23 +175,23 @@ object ReflectiveAccess {
|
|||
assert(fqn ne null)
|
||||
|
||||
// First, use the specified CL
|
||||
val first = try {
|
||||
Option(classloader.loadClass(fqn).asInstanceOf[Class[T]])
|
||||
} catch {
|
||||
case c: ClassNotFoundException =>
|
||||
EventHandler.warning(this, c.toString)
|
||||
None
|
||||
}
|
||||
val first = try {
|
||||
Option(classloader.loadClass(fqn).asInstanceOf[Class[T]])
|
||||
} catch {
|
||||
case c: ClassNotFoundException =>
|
||||
EventHandler.debug(this, c.toString)
|
||||
None
|
||||
}
|
||||
|
||||
if (first.isDefined) first
|
||||
else {
|
||||
else {
|
||||
// Second option is to use the ContextClassLoader
|
||||
val second = try {
|
||||
Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
|
||||
} catch {
|
||||
case c: ClassNotFoundException =>
|
||||
EventHandler.warning(this, c.toString)
|
||||
None
|
||||
val second = try {
|
||||
Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
|
||||
} catch {
|
||||
case c: ClassNotFoundException =>
|
||||
EventHandler.debug(this, c.toString)
|
||||
None
|
||||
}
|
||||
|
||||
if (second.isDefined) second
|
||||
|
|
@ -201,22 +200,22 @@ object ReflectiveAccess {
|
|||
// Don't try to use "loader" if we got the default "classloader" parameter
|
||||
if (classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]])
|
||||
else None
|
||||
} catch {
|
||||
case c: ClassNotFoundException =>
|
||||
EventHandler.warning(this, c.toString)
|
||||
None
|
||||
} catch {
|
||||
case c: ClassNotFoundException =>
|
||||
EventHandler.debug(this, c.toString)
|
||||
None
|
||||
}
|
||||
|
||||
if (third.isDefined) third
|
||||
else {
|
||||
// Last option is Class.forName
|
||||
try {
|
||||
try {
|
||||
Option(Class.forName(fqn).asInstanceOf[Class[T]])
|
||||
} catch {
|
||||
case c: ClassNotFoundException =>
|
||||
EventHandler.warning(this, c.toString)
|
||||
None
|
||||
}
|
||||
} catch {
|
||||
case c: ClassNotFoundException =>
|
||||
EventHandler.debug(this, c.toString)
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,18 +1,25 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
|
||||
import akka.config.Supervision._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
import akka.testing._
|
||||
import akka.testing.Testing.sleepFor
|
||||
import akka.util.duration._
|
||||
|
||||
import akka.dispatch.Dispatchers
|
||||
import Actor._
|
||||
import akka.config.Supervision._
|
||||
import akka.dispatch.Dispatchers
|
||||
|
||||
import akka.Testing
|
||||
|
||||
object ActorFireForgetRequestReplySpec {
|
||||
class ReplyActor extends Actor {
|
||||
|
||||
class ReplyActor extends Actor {
|
||||
def receive = {
|
||||
case "Send" =>
|
||||
self.reply("Reply")
|
||||
|
|
@ -32,7 +39,6 @@ object ActorFireForgetRequestReplySpec {
|
|||
}
|
||||
|
||||
class SenderActor(replyActor: ActorRef) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case "Init" =>
|
||||
replyActor ! "Send"
|
||||
|
|
@ -50,44 +56,42 @@ object ActorFireForgetRequestReplySpec {
|
|||
|
||||
object state {
|
||||
var s = "NIL"
|
||||
val finished = new CyclicBarrier(2)
|
||||
val finished = TestBarrier(2)
|
||||
}
|
||||
}
|
||||
|
||||
class ActorFireForgetRequestReplySpec extends JUnitSuite {
|
||||
class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
||||
import ActorFireForgetRequestReplySpec._
|
||||
|
||||
@Test
|
||||
def shouldReplyToBangMessageUsingReply = {
|
||||
override def beforeEach() = {
|
||||
state.finished.reset
|
||||
val replyActor = actorOf[ReplyActor].start
|
||||
val senderActor = actorOf(new SenderActor(replyActor)).start
|
||||
senderActor ! "Init"
|
||||
try { state.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
assert("Reply" === state.s)
|
||||
}
|
||||
}
|
||||
|
||||
"An Actor" must {
|
||||
|
||||
@Test
|
||||
def shouldReplyToBangMessageUsingImplicitSender = {
|
||||
state.finished.reset
|
||||
val replyActor = actorOf[ReplyActor].start
|
||||
val senderActor = actorOf(new SenderActor(replyActor)).start
|
||||
senderActor ! "InitImplicit"
|
||||
try { state.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
assert("ReplyImplicit" === state.s)
|
||||
}
|
||||
"reply to bang message using reply" in {
|
||||
val replyActor = actorOf[ReplyActor].start
|
||||
val senderActor = actorOf(new SenderActor(replyActor)).start
|
||||
senderActor ! "Init"
|
||||
state.finished.await
|
||||
state.s must be ("Reply")
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldShutdownCrashedTemporaryActor = {
|
||||
state.finished.reset
|
||||
val actor = actorOf[CrashingTemporaryActor].start
|
||||
assert(actor.isRunning)
|
||||
actor ! "Die"
|
||||
try { state.finished.await(10L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
Thread.sleep(Testing.time(500))
|
||||
assert(actor.isShutdown)
|
||||
"reply to bang message using implicit sender" in {
|
||||
val replyActor = actorOf[ReplyActor].start
|
||||
val senderActor = actorOf(new SenderActor(replyActor)).start
|
||||
senderActor ! "InitImplicit"
|
||||
state.finished.await
|
||||
state.s must be ("ReplyImplicit")
|
||||
}
|
||||
|
||||
"should shutdown crashed temporary actor" in {
|
||||
val actor = actorOf[CrashingTemporaryActor].start
|
||||
actor.isRunning must be (true)
|
||||
actor ! "Die"
|
||||
state.finished.await
|
||||
sleepFor(1 second)
|
||||
actor.isShutdown must be (true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,19 +4,20 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
import akka.util.duration._
|
||||
import akka.testing.Testing.sleepFor
|
||||
import akka.config.Supervision.{OneForOneStrategy}
|
||||
import akka.actor._
|
||||
import akka.dispatch.Future
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
|
||||
object ActorRefSpec {
|
||||
|
||||
var latch = new CountDownLatch(4)
|
||||
val latch = TestLatch(4)
|
||||
|
||||
class ReplyActor extends Actor {
|
||||
var replyTo: Channel[Any] = null
|
||||
|
|
@ -49,7 +50,7 @@ object ActorRefSpec {
|
|||
}
|
||||
|
||||
private def work {
|
||||
Thread.sleep(1000)
|
||||
sleepFor(1 second)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -69,16 +70,12 @@ object ActorRefSpec {
|
|||
}
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ActorRefSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
class ActorRefSpec extends WordSpec with MustMatchers {
|
||||
import ActorRefSpec._
|
||||
|
||||
describe("ActorRef") {
|
||||
it("should support to reply via channel") {
|
||||
"An ActorRef" must {
|
||||
|
||||
"support reply via channel" in {
|
||||
val serverRef = Actor.actorOf[ReplyActor].start
|
||||
val clientRef = Actor.actorOf(new SenderActor(serverRef)).start
|
||||
|
||||
|
|
@ -86,18 +83,23 @@ class ActorRefSpec extends
|
|||
clientRef ! "simple"
|
||||
clientRef ! "simple"
|
||||
clientRef ! "simple"
|
||||
assert(latch.await(4L, TimeUnit.SECONDS))
|
||||
latch = new CountDownLatch(4)
|
||||
|
||||
latch.await
|
||||
|
||||
latch.reset
|
||||
|
||||
clientRef ! "complex2"
|
||||
clientRef ! "simple"
|
||||
clientRef ! "simple"
|
||||
clientRef ! "simple"
|
||||
assert(latch.await(4L, TimeUnit.SECONDS))
|
||||
|
||||
latch.await
|
||||
|
||||
clientRef.stop
|
||||
serverRef.stop
|
||||
}
|
||||
|
||||
it("should stop when sent a poison pill") {
|
||||
"stop when sent a poison pill" in {
|
||||
val ref = Actor.actorOf(
|
||||
new Actor {
|
||||
def receive = {
|
||||
|
|
@ -115,11 +117,34 @@ class ActorRefSpec extends
|
|||
fail("shouldn't get here")
|
||||
}
|
||||
|
||||
assert(ffive.resultOrException.get == "five")
|
||||
assert(fnull.resultOrException.get == "null")
|
||||
ffive.resultOrException.get must be ("five")
|
||||
fnull.resultOrException.get must be ("null")
|
||||
|
||||
assert(ref.isRunning == false)
|
||||
assert(ref.isShutdown == true)
|
||||
ref.isRunning must be (false)
|
||||
ref.isShutdown must be (true)
|
||||
}
|
||||
|
||||
"restart when Kill:ed" in {
|
||||
val latch = new CountDownLatch(2)
|
||||
|
||||
val boss = Actor.actorOf(new Actor{
|
||||
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000))
|
||||
|
||||
val ref = Actor.actorOf(
|
||||
new Actor {
|
||||
def receive = { case _ => }
|
||||
override def preRestart(reason: Throwable) = latch.countDown
|
||||
override def postRestart(reason: Throwable) = latch.countDown
|
||||
}
|
||||
).start
|
||||
|
||||
self link ref
|
||||
|
||||
protected def receive = { case "sendKill" => ref ! Kill }
|
||||
}).start
|
||||
|
||||
boss ! "sendKill"
|
||||
latch.await(5, TimeUnit.SECONDS) must be === true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,34 +4,31 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
|
||||
import FSM._
|
||||
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
|
||||
import akka.Testing
|
||||
|
||||
object FSMActorSpec {
|
||||
|
||||
|
||||
val unlockedLatch = new StandardLatch
|
||||
val lockedLatch = new StandardLatch
|
||||
val unhandledLatch = new StandardLatch
|
||||
val terminatedLatch = new StandardLatch
|
||||
val transitionLatch = new StandardLatch
|
||||
val initialStateLatch = new StandardLatch
|
||||
val transitionCallBackLatch = new StandardLatch
|
||||
val unlockedLatch = TestLatch()
|
||||
val lockedLatch = TestLatch()
|
||||
val unhandledLatch = TestLatch()
|
||||
val terminatedLatch = TestLatch()
|
||||
val transitionLatch = TestLatch()
|
||||
val initialStateLatch = TestLatch()
|
||||
val transitionCallBackLatch = TestLatch()
|
||||
|
||||
sealed trait LockState
|
||||
case object Locked extends LockState
|
||||
case object Open extends LockState
|
||||
|
||||
class Lock(code: String, timeout: (Long, TimeUnit)) extends Actor with FSM[LockState, CodeState] {
|
||||
class Lock(code: String, timeout: Duration) extends Actor with FSM[LockState, CodeState] {
|
||||
|
||||
startWith(Locked, CodeState("", code))
|
||||
|
||||
|
|
@ -94,53 +91,53 @@ object FSMActorSpec {
|
|||
case class CodeState(soFar: String, code: String)
|
||||
}
|
||||
|
||||
class FSMActorSpec extends JUnitSuite {
|
||||
class FSMActorSpec extends WordSpec with MustMatchers {
|
||||
import FSMActorSpec._
|
||||
|
||||
"An FSM Actor" must {
|
||||
|
||||
@Test
|
||||
def unlockTheLock = {
|
||||
"unlock the lock" in {
|
||||
|
||||
// lock that locked after being open for 1 sec
|
||||
val lock = Actor.actorOf(new Lock("33221", 1 second)).start
|
||||
|
||||
// lock that locked after being open for 1 sec
|
||||
val lock = Actor.actorOf(new Lock("33221", (Testing.time(1), TimeUnit.SECONDS))).start
|
||||
val transitionTester = Actor.actorOf(new Actor { def receive = {
|
||||
case Transition(_, _, _) => transitionCallBackLatch.open
|
||||
case CurrentState(_, Locked) => initialStateLatch.open
|
||||
}}).start
|
||||
|
||||
val transitionTester = Actor.actorOf(new Actor { def receive = {
|
||||
case Transition(_, _, _) => transitionCallBackLatch.open
|
||||
case CurrentState(_, Locked) => initialStateLatch.open
|
||||
}}).start
|
||||
lock ! SubscribeTransitionCallBack(transitionTester)
|
||||
initialStateLatch.await
|
||||
|
||||
lock ! SubscribeTransitionCallBack(transitionTester)
|
||||
assert(initialStateLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
|
||||
lock ! '3'
|
||||
lock ! '3'
|
||||
lock ! '2'
|
||||
lock ! '2'
|
||||
lock ! '1'
|
||||
|
||||
lock ! '3'
|
||||
lock ! '3'
|
||||
lock ! '2'
|
||||
lock ! '2'
|
||||
lock ! '1'
|
||||
unlockedLatch.await
|
||||
transitionLatch.await
|
||||
transitionCallBackLatch.await
|
||||
lockedLatch.await
|
||||
|
||||
assert(unlockedLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
|
||||
assert(transitionLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
|
||||
assert(transitionCallBackLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
|
||||
assert(lockedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
|
||||
lock ! "not_handled"
|
||||
unhandledLatch.await
|
||||
|
||||
val answerLatch = TestLatch()
|
||||
object Hello
|
||||
object Bye
|
||||
val tester = Actor.actorOf(new Actor {
|
||||
protected def receive = {
|
||||
case Hello => lock ! "hello"
|
||||
case "world" => answerLatch.open
|
||||
case Bye => lock ! "bye"
|
||||
}
|
||||
}).start
|
||||
tester ! Hello
|
||||
answerLatch.await
|
||||
|
||||
lock ! "not_handled"
|
||||
assert(unhandledLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
|
||||
|
||||
val answerLatch = new StandardLatch
|
||||
object Hello
|
||||
object Bye
|
||||
val tester = Actor.actorOf(new Actor {
|
||||
protected def receive = {
|
||||
case Hello => lock ! "hello"
|
||||
case "world" => answerLatch.open
|
||||
case Bye => lock ! "bye"
|
||||
}
|
||||
}).start
|
||||
tester ! Hello
|
||||
assert(answerLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
|
||||
|
||||
tester ! Bye
|
||||
assert(terminatedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
|
||||
tester ! Bye
|
||||
terminatedLatch.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,16 +1,13 @@
|
|||
package akka.actor
|
||||
|
||||
import akka.testkit.TestKit
|
||||
import akka.util.duration._
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
class FSMTimingSpec
|
||||
extends WordSpec
|
||||
with MustMatchers
|
||||
with TestKit {
|
||||
import akka.testkit.TestKit
|
||||
import akka.util.duration._
|
||||
|
||||
|
||||
class FSMTimingSpec extends WordSpec with MustMatchers with TestKit {
|
||||
import FSMTimingSpec._
|
||||
import FSM._
|
||||
|
||||
|
|
@ -140,4 +137,3 @@ object FSMTimingSpec {
|
|||
|
||||
}
|
||||
|
||||
// vim: set ts=2 sw=2 et:
|
||||
|
|
|
|||
|
|
@ -1,18 +1,25 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
import akka.util.duration._
|
||||
|
||||
import Actor._
|
||||
|
||||
|
||||
object ForwardActorSpec {
|
||||
object ForwardState {
|
||||
var sender: Option[ActorRef] = None
|
||||
}
|
||||
|
||||
class ReceiverActor extends Actor {
|
||||
val latch = new CountDownLatch(1)
|
||||
val latch = TestLatch()
|
||||
def receive = {
|
||||
case "SendBang" => {
|
||||
ForwardState.sender = self.sender
|
||||
|
|
@ -42,7 +49,7 @@ object ForwardActorSpec {
|
|||
}
|
||||
|
||||
class BangBangSenderActor extends Actor {
|
||||
val latch = new CountDownLatch(1)
|
||||
val latch = TestLatch()
|
||||
val forwardActor = actorOf[ForwardActor]
|
||||
forwardActor.start
|
||||
(forwardActor !! "SendBangBang") match {
|
||||
|
|
@ -55,27 +62,27 @@ object ForwardActorSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class ForwardActorSpec extends JUnitSuite {
|
||||
class ForwardActorSpec extends WordSpec with MustMatchers {
|
||||
import ForwardActorSpec._
|
||||
|
||||
@Test
|
||||
def shouldForwardActorReferenceWhenInvokingForwardOnBang {
|
||||
val senderActor = actorOf[BangSenderActor]
|
||||
val latch = senderActor.actor.asInstanceOf[BangSenderActor]
|
||||
"A Forward Actor" must {
|
||||
"forward actor reference when invoking forward on bang" in {
|
||||
val senderActor = actorOf[BangSenderActor]
|
||||
val latch = senderActor.actor.asInstanceOf[BangSenderActor]
|
||||
.forwardActor.actor.asInstanceOf[ForwardActor]
|
||||
.receiverActor.actor.asInstanceOf[ReceiverActor]
|
||||
.latch
|
||||
senderActor.start
|
||||
assert(latch.await(1L, TimeUnit.SECONDS))
|
||||
assert(ForwardState.sender ne null)
|
||||
assert(senderActor.toString === ForwardState.sender.get.toString)
|
||||
}
|
||||
senderActor.start
|
||||
latch.await
|
||||
ForwardState.sender must not be (null)
|
||||
senderActor.toString must be (ForwardState.sender.get.toString)
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldForwardActorReferenceWhenInvokingForwardOnBangBang {
|
||||
val senderActor = actorOf[BangBangSenderActor]
|
||||
senderActor.start
|
||||
val latch = senderActor.actor.asInstanceOf[BangBangSenderActor].latch
|
||||
assert(latch.await(1L, TimeUnit.SECONDS))
|
||||
"forward actor reference when invoking forward on bang bang" in {
|
||||
val senderActor = actorOf[BangBangSenderActor]
|
||||
senderActor.start
|
||||
val latch = senderActor.actor.asInstanceOf[BangBangSenderActor].latch
|
||||
latch.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,17 +1,23 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
|
||||
import Actor._
|
||||
|
||||
import java.util.concurrent.CyclicBarrier
|
||||
|
||||
class HotSwapSpec extends WordSpec with MustMatchers {
|
||||
|
||||
"An Actor" should {
|
||||
"An Actor" must {
|
||||
|
||||
"be able to hotswap its behavior with HotSwap(..)" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val barrier = TestBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actorOf( new Actor {
|
||||
def receive = { case _ => _log += "default" }
|
||||
|
|
@ -27,7 +33,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"be able to hotswap its behavior with become(..)" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val barrier = TestBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actorOf(new Actor {
|
||||
def receive = {
|
||||
|
|
@ -55,7 +61,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"be able to revert hotswap its behavior with RevertHotSwap(..)" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val barrier = TestBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actorOf( new Actor {
|
||||
def receive = {
|
||||
|
|
@ -100,7 +106,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"be able to revert hotswap its behavior with unbecome" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val barrier = TestBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actorOf(new Actor {
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -1,108 +1,120 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
import akka.util.duration._
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import Actor._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
class ReceiveTimeoutSpec extends JUnitSuite {
|
||||
|
||||
@Test def receiveShouldGetTimeout= {
|
||||
class ReceiveTimeoutSpec extends WordSpec with MustMatchers {
|
||||
import Actor._
|
||||
|
||||
val timeoutLatch = new StandardLatch
|
||||
"An actor with receive timeout" must {
|
||||
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
"get timeout" in {
|
||||
val timeoutLatch = TestLatch()
|
||||
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
|
||||
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
|
||||
timeoutActor.stop
|
||||
}
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
@Test def swappedReceiveShouldAlsoGetTimout = {
|
||||
val timeoutLatch = new StandardLatch
|
||||
timeoutLatch.await
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
"get timeout when swapped" in {
|
||||
val timeoutLatch = TestLatch()
|
||||
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
|
||||
// after max 1 second the timeout should already been sent
|
||||
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
val swappedLatch = new StandardLatch
|
||||
timeoutActor ! HotSwap(self => {
|
||||
case ReceiveTimeout => swappedLatch.open
|
||||
})
|
||||
timeoutLatch.await
|
||||
|
||||
assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS))
|
||||
timeoutActor.stop
|
||||
}
|
||||
val swappedLatch = TestLatch()
|
||||
|
||||
@Test def timeoutShouldBeRescheduledAfterRegularReceive = {
|
||||
timeoutActor ! HotSwap(self => {
|
||||
case ReceiveTimeout => swappedLatch.open
|
||||
})
|
||||
|
||||
val timeoutLatch = new StandardLatch
|
||||
case object Tick
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
swappedLatch.await
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
"reschedule timeout after regular receive" in {
|
||||
val timeoutLatch = TestLatch()
|
||||
case object Tick
|
||||
|
||||
protected def receive = {
|
||||
case Tick => ()
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
timeoutActor ! Tick
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
|
||||
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
|
||||
timeoutActor.stop
|
||||
}
|
||||
protected def receive = {
|
||||
case Tick => ()
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
@Test def timeoutShouldBeTurnedOffIfDesired = {
|
||||
val count = new AtomicInteger(0)
|
||||
val timeoutLatch = new StandardLatch
|
||||
case object Tick
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
timeoutActor ! Tick
|
||||
|
||||
protected def receive = {
|
||||
case Tick => ()
|
||||
case ReceiveTimeout =>
|
||||
count.incrementAndGet
|
||||
timeoutLatch.open
|
||||
self.receiveTimeout = None
|
||||
}
|
||||
}).start
|
||||
timeoutActor ! Tick
|
||||
timeoutLatch.await
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
|
||||
assert(count.get === 1)
|
||||
timeoutActor.stop
|
||||
}
|
||||
"be able to turn off timeout if desired" in {
|
||||
val count = new AtomicInteger(0)
|
||||
val timeoutLatch = TestLatch()
|
||||
case object Tick
|
||||
|
||||
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
|
||||
val timeoutLatch = new StandardLatch
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
protected def receive = {
|
||||
case Tick => ()
|
||||
case ReceiveTimeout =>
|
||||
count.incrementAndGet
|
||||
timeoutLatch.open
|
||||
self.receiveTimeout = None
|
||||
}
|
||||
}).start
|
||||
|
||||
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
|
||||
timeoutActor.stop
|
||||
}
|
||||
timeoutActor ! Tick
|
||||
|
||||
@Test def ActorsReceiveTimeoutShouldBeReceiveTimeout {
|
||||
assert(akka.actor.Actors.receiveTimeout() eq ReceiveTimeout)
|
||||
timeoutLatch.await
|
||||
count.get must be (1)
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
"not receive timeout message when not specified" in {
|
||||
val timeoutLatch = TestLatch()
|
||||
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
timeoutLatch.awaitTimeout(1 second) // timeout expected
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
"have ReceiveTimeout eq to Actors ReceiveTimeout" in {
|
||||
akka.actor.Actors.receiveTimeout() must be theSameInstanceAs (ReceiveTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.actor.dispatch
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.Assertions._
|
||||
import akka.testing._
|
||||
import akka.dispatch._
|
||||
import akka.actor.{ActorRef, Actor}
|
||||
import akka.actor.Actor._
|
||||
|
|
@ -13,7 +14,6 @@ import java.util.concurrent.atomic.AtomicLong
|
|||
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
|
||||
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
|
||||
import akka.util.{Duration, Switch}
|
||||
import akka.Testing
|
||||
|
||||
object ActorModelSpec {
|
||||
|
||||
|
|
@ -225,13 +225,13 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
a.start
|
||||
|
||||
a ! CountDown(start)
|
||||
assertCountDown(start, Testing.time(3000), "Should process first message within 3 seconds")
|
||||
assertCountDown(start, Testing.testTime(3000), "Should process first message within 3 seconds")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)
|
||||
|
||||
a ! Wait(1000)
|
||||
a ! CountDown(oneAtATime)
|
||||
// in case of serialization violation, restart would happen instead of count down
|
||||
assertCountDown(oneAtATime, Testing.time(1500) ,"Processed message when allowed")
|
||||
assertCountDown(oneAtATime, Testing.testTime(1500) ,"Processed message when allowed")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
|
||||
|
||||
a.stop
|
||||
|
|
@ -246,7 +246,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
|
||||
def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } }
|
||||
for (i <- 1 to 10) { start }
|
||||
assertCountDown(counter, Testing.time(3000), "Should process 200 messages")
|
||||
assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
|
||||
|
||||
a.stop
|
||||
|
|
@ -264,10 +264,10 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
val aStart,aStop,bParallel = new CountDownLatch(1)
|
||||
|
||||
a ! Meet(aStart,aStop)
|
||||
assertCountDown(aStart, Testing.time(3000), "Should process first message within 3 seconds")
|
||||
assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds")
|
||||
|
||||
b ! CountDown(bParallel)
|
||||
assertCountDown(bParallel, Testing.time(3000), "Should process other actors in parallel")
|
||||
assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
|
||||
|
||||
aStop.countDown()
|
||||
a.stop
|
||||
|
|
@ -282,7 +282,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
val done = new CountDownLatch(1)
|
||||
a ! Restart
|
||||
a ! CountDown(done)
|
||||
assertCountDown(done, Testing.time(3000), "Should be suspended+resumed and done with next message within 3 seconds")
|
||||
assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds")
|
||||
a.stop
|
||||
assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2,
|
||||
msgsProcessed = 2, suspensions = 1, resumes = 1)
|
||||
|
|
@ -298,7 +298,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
|
||||
|
||||
dispatcher.resume(a)
|
||||
assertCountDown(done, Testing.time(3000), "Should resume processing of messages when resumed")
|
||||
assertCountDown(done, Testing.testTime(3000), "Should resume processing of messages when resumed")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
|
||||
suspensions = 1, resumes = 1)
|
||||
|
||||
|
|
@ -315,7 +315,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
(1 to num) foreach {
|
||||
_ => newTestActor.start ! cachedMessage
|
||||
}
|
||||
assertCountDown(cachedMessage.latch, Testing.time(10000), "Should process " + num + " countdowns")
|
||||
assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
|
||||
}
|
||||
for(run <- 1 to 3) {
|
||||
flood(10000)
|
||||
|
|
|
|||
|
|
@ -468,7 +468,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
|||
val pool2 = actorOf(new TestPool2).start
|
||||
pool2 ! "a"
|
||||
pool2 ! "b"
|
||||
done = latch.await(1,TimeUnit.SECONDS)
|
||||
done = latch.await(1, TimeUnit.SECONDS)
|
||||
done must be (true)
|
||||
delegates.size must be (2)
|
||||
pool2 stop
|
||||
|
|
|
|||
40
akka-actor/src/test/scala/akka/testing/TestBarrier.scala
Normal file
40
akka-actor/src/test/scala/akka/testing/TestBarrier.scala
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.testing
|
||||
|
||||
import akka.util.Duration
|
||||
import java.util.concurrent.{CyclicBarrier, TimeUnit, TimeoutException}
|
||||
|
||||
|
||||
class TestBarrierTimeoutException(message: String) extends RuntimeException(message)
|
||||
|
||||
/**
|
||||
* A cyclic barrier wrapper for use in testing.
|
||||
* It always uses a timeout when waiting and timeouts are specified as durations.
|
||||
* Timeouts will always throw an exception. The default timeout is 5 seconds.
|
||||
* Timeouts are multiplied by the testing time factor for Jenkins builds.
|
||||
*/
|
||||
object TestBarrier {
|
||||
val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
|
||||
|
||||
def apply(count: Int) = new TestBarrier(count)
|
||||
}
|
||||
|
||||
class TestBarrier(count: Int) {
|
||||
private val barrier = new CyclicBarrier(count)
|
||||
|
||||
def await(): Unit = await(TestBarrier.DefaultTimeout)
|
||||
|
||||
def await(timeout: Duration): Unit = {
|
||||
try {
|
||||
barrier.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
|
||||
} catch {
|
||||
case e: TimeoutException =>
|
||||
throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" format (timeout.toString, Testing.timeFactor))
|
||||
}
|
||||
}
|
||||
|
||||
def reset = barrier.reset
|
||||
}
|
||||
55
akka-actor/src/test/scala/akka/testing/TestLatch.scala
Normal file
55
akka-actor/src/test/scala/akka/testing/TestLatch.scala
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.testing
|
||||
|
||||
import akka.util.Duration
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
||||
|
||||
class TestLatchTimeoutException(message: String) extends RuntimeException(message)
|
||||
class TestLatchNoTimeoutException(message: String) extends RuntimeException(message)
|
||||
|
||||
/**
|
||||
* A count down latch wrapper for use in testing.
|
||||
* It always uses a timeout when waiting and timeouts are specified as durations.
|
||||
* There's a default timeout of 5 seconds and the default count is 1.
|
||||
* Timeouts will always throw an exception (no need to wrap in assert in tests).
|
||||
* Timeouts are multiplied by the testing time factor for Jenkins builds.
|
||||
*/
|
||||
object TestLatch {
|
||||
val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
|
||||
|
||||
def apply(count: Int = 1) = new TestLatch(count)
|
||||
}
|
||||
|
||||
class TestLatch(count: Int = 1) {
|
||||
private var latch = new CountDownLatch(count)
|
||||
|
||||
def countDown = latch.countDown
|
||||
|
||||
def open = countDown
|
||||
|
||||
def await(): Boolean = await(TestLatch.DefaultTimeout)
|
||||
|
||||
def await(timeout: Duration): Boolean = {
|
||||
val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
|
||||
if (!opened) throw new TestLatchTimeoutException(
|
||||
"Timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor))
|
||||
opened
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout is expected. Throws exception if latch is opened before timeout.
|
||||
*/
|
||||
def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = {
|
||||
val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
|
||||
if (opened) throw new TestLatchNoTimeoutException(
|
||||
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor))
|
||||
opened
|
||||
}
|
||||
|
||||
def reset = latch = new CountDownLatch(count)
|
||||
}
|
||||
|
||||
|
|
@ -2,7 +2,9 @@
|
|||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka
|
||||
package akka.testing
|
||||
|
||||
import akka.util.Duration
|
||||
|
||||
/**
|
||||
* Multiplying numbers used in test timeouts by a factor, set by system property.
|
||||
|
|
@ -18,8 +20,10 @@ object Testing {
|
|||
}
|
||||
}
|
||||
|
||||
def time(t: Int): Int = (timeFactor * t).toInt
|
||||
def time(t: Long): Long = (timeFactor * t).toLong
|
||||
def time(t: Float): Float = (timeFactor * t).toFloat
|
||||
def time(t: Double): Double = timeFactor * t
|
||||
def testTime(t: Int): Int = (timeFactor * t).toInt
|
||||
def testTime(t: Long): Long = (timeFactor * t).toLong
|
||||
def testTime(t: Float): Float = (timeFactor * t).toFloat
|
||||
def testTime(t: Double): Double = timeFactor * t
|
||||
|
||||
def sleepFor(duration: Duration) = Thread.sleep(testTime(duration.toMillis))
|
||||
}
|
||||
|
|
@ -5,7 +5,7 @@ import org.scalatest.matchers.MustMatchers
|
|||
|
||||
class Ticket001Spec extends WordSpec with MustMatchers {
|
||||
|
||||
"An XXX" should {
|
||||
"An XXX" must {
|
||||
"do YYY" in {
|
||||
1 must be (1)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1049,7 +1049,7 @@ class RemoteServerHandler(
|
|||
throw firstException
|
||||
|
||||
targetMethod
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses)
|
||||
|
|
@ -1230,7 +1230,14 @@ class RemoteServerHandler(
|
|||
server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match {
|
||||
case null => // the actor has not been registered globally. See if we have it in the session
|
||||
createTypedSessionActor(actorInfo, channel) match {
|
||||
case null => createClientManagedTypedActor(actorInfo) //Maybe client managed actor?
|
||||
case null =>
|
||||
// FIXME this is broken, if a user tries to get a server-managed typed actor and that is not registered then a client-managed typed actor is created, but just throwing an exception here causes client-managed typed actors to fail
|
||||
|
||||
/* val e = new RemoteServerException("Can't load remote Typed Actor for [" + actorInfo.getId + "]")
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
*/ createClientManagedTypedActor(actorInfo) // client-managed actor
|
||||
case sessionActor => sessionActor
|
||||
}
|
||||
case typedActor => typedActor
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import akka.actor._
|
|||
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
|
||||
import akka.config. {RemoteAddress, Config, TypedActorConfigurator}
|
||||
|
||||
import akka.Testing
|
||||
import akka.testing._
|
||||
|
||||
object RemoteTypedActorLog {
|
||||
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
|
||||
|
|
@ -39,13 +39,13 @@ class RemoteTypedActorSpec extends AkkaRemoteTest {
|
|||
classOf[RemoteTypedActorOne],
|
||||
classOf[RemoteTypedActorOneImpl],
|
||||
Permanent,
|
||||
Testing.time(10000),
|
||||
Testing.testTime(20000),
|
||||
RemoteAddress(host,port)),
|
||||
new SuperviseTypedActor(
|
||||
classOf[RemoteTypedActorTwo],
|
||||
classOf[RemoteTypedActorTwoImpl],
|
||||
Permanent,
|
||||
Testing.time(10000),
|
||||
Testing.testTime(20000),
|
||||
RemoteAddress(host,port))
|
||||
).toArray).supervise
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,5 @@
|
|||
project.organization=se.scalablesolutions.akka
|
||||
project.name=Akka Tutorial 1 SBT
|
||||
project.version=1.0
|
||||
build.scala.versions=2.9.0.RC1
|
||||
sbt.version=0.7.5.RC1
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
import sbt._
|
||||
|
||||
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
|
||||
val akkaRepo = "Akka Repo" at "http://akka.io/repository"
|
||||
val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "1.1-SNAPSHOT"
|
||||
}
|
||||
127
akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
Normal file
127
akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.tutorial.sbt.pi
|
||||
|
||||
import akka.actor.{Actor, ActorRef, PoisonPill}
|
||||
import Actor._
|
||||
import akka.routing.{Routing, CyclicIterator}
|
||||
import Routing._
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.Dispatchers
|
||||
|
||||
import System.{currentTimeMillis => now}
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
/**
|
||||
* Sample for Akka, SBT an Scala tutorial.
|
||||
* <p/>
|
||||
* Calculates Pi.
|
||||
* <p/>
|
||||
* Run it in SBT:
|
||||
* <pre>
|
||||
* $ sbt
|
||||
* > update
|
||||
* > console
|
||||
* > akka.tutorial.sbt.pi.Pi.calculate
|
||||
* > ...
|
||||
* > :quit
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Pi {
|
||||
val nrOfWorkers = 4
|
||||
val nrOfMessages = 10000
|
||||
val nrOfElements = 10000
|
||||
|
||||
// ====================
|
||||
// ===== Messages =====
|
||||
// ====================
|
||||
sealed trait PiMessage
|
||||
case class Calculate(nrOfMessages: Int, nrOfElements: Int) extends PiMessage
|
||||
case class Work(arg: Int, fun: (Int) => Double) extends PiMessage
|
||||
case class Result(value: Double) extends PiMessage
|
||||
|
||||
// ==================
|
||||
// ===== Worker =====
|
||||
// ==================
|
||||
class Worker extends Actor {
|
||||
def receive = {
|
||||
case Work(arg, fun) => self reply Result(fun(arg))
|
||||
}
|
||||
}
|
||||
|
||||
// ==================
|
||||
// ===== Master =====
|
||||
// ==================
|
||||
class Master(latch: CountDownLatch) extends Actor {
|
||||
var pi: Double = _
|
||||
var nrOfResults: Int = _
|
||||
var start: Long = _
|
||||
|
||||
// create the workers
|
||||
val workers = {
|
||||
val ws = new Array[ActorRef](nrOfWorkers)
|
||||
for (i <- 0 until nrOfWorkers) ws(i) = actorOf[Worker].start
|
||||
ws
|
||||
}
|
||||
|
||||
// wrap them with a load-balancing router
|
||||
val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
|
||||
|
||||
// define the work
|
||||
val algorithm = (i: Int) => {
|
||||
val range = (i * nrOfElements) to ((i + 1) * nrOfElements - 1)
|
||||
val results = for (j <- range) yield (4 * math.pow(-1, j) / (2 * j + 1))
|
||||
results.sum
|
||||
}
|
||||
|
||||
// message handler
|
||||
def receive = {
|
||||
case Calculate(nrOfMessages, nrOfElements) =>
|
||||
// schedule work
|
||||
for (arg <- 0 until nrOfMessages) router ! Work(arg, algorithm)
|
||||
|
||||
// send a PoisonPill to all workers telling them to shut down themselves
|
||||
router ! Broadcast(PoisonPill)
|
||||
|
||||
case Result(value) =>
|
||||
// handle result from the worker
|
||||
pi += value
|
||||
nrOfResults += 1
|
||||
if (nrOfResults == nrOfMessages) self.stop
|
||||
}
|
||||
|
||||
override def preStart = start = now
|
||||
|
||||
override def postStop = {
|
||||
// tell the world that the calculation is complete
|
||||
EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
|
||||
latch.countDown
|
||||
}
|
||||
}
|
||||
|
||||
// ==================
|
||||
// ===== Run it =====
|
||||
// ==================
|
||||
def calculate = {
|
||||
// this latch is only plumbing to know when the calculation is completed
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
// create the master
|
||||
val master = actorOf(new Master(latch)).start
|
||||
|
||||
// start the calculation
|
||||
master ! Calculate(nrOfMessages, nrOfElements)
|
||||
|
||||
// wait for master to shut down
|
||||
latch.await
|
||||
}
|
||||
}
|
||||
|
||||
// To be able to run it as a main application
|
||||
object Main extends App {
|
||||
Pi.calculate
|
||||
}
|
||||
|
|
@ -12,8 +12,6 @@ import akka.config.Supervision._
|
|||
import java.util.concurrent.CountDownLatch
|
||||
import akka.config.TypedActorConfigurator
|
||||
|
||||
import akka.Testing
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
|
|
@ -97,7 +95,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
|||
}
|
||||
|
||||
it("should be stopped when supervision cannot handle the problem in") {
|
||||
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
|
||||
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), 30000)
|
||||
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise
|
||||
try {
|
||||
val first = conf.getInstance(classOf[TypedActorFailer])
|
||||
|
|
@ -123,7 +121,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
|||
}
|
||||
|
||||
it("should be restarted when supervision handles the problem in") {
|
||||
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
|
||||
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), 30000)
|
||||
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(classOf[Throwable] :: Nil, 3, 500000), Array(actorSupervision)).inject.supervise
|
||||
try {
|
||||
val first = conf.getInstance(classOf[TypedActorFailer])
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ akka {
|
|||
time-unit = "seconds" # Time unit for all timeout properties throughout the config
|
||||
|
||||
event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
|
||||
event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
|
||||
event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG
|
||||
|
||||
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
|
||||
# Can be used to bootstrap your application(s)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue