Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
e7cf4850e2
15 changed files with 375 additions and 254 deletions
|
|
@ -1,18 +1,25 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
|
||||
import akka.config.Supervision._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
import akka.testing._
|
||||
import akka.testing.Testing.sleepFor
|
||||
import akka.util.duration._
|
||||
|
||||
import akka.dispatch.Dispatchers
|
||||
import Actor._
|
||||
import akka.config.Supervision._
|
||||
import akka.dispatch.Dispatchers
|
||||
|
||||
import akka.Testing
|
||||
|
||||
object ActorFireForgetRequestReplySpec {
|
||||
class ReplyActor extends Actor {
|
||||
|
||||
class ReplyActor extends Actor {
|
||||
def receive = {
|
||||
case "Send" =>
|
||||
self.reply("Reply")
|
||||
|
|
@ -32,7 +39,6 @@ object ActorFireForgetRequestReplySpec {
|
|||
}
|
||||
|
||||
class SenderActor(replyActor: ActorRef) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case "Init" =>
|
||||
replyActor ! "Send"
|
||||
|
|
@ -50,44 +56,42 @@ object ActorFireForgetRequestReplySpec {
|
|||
|
||||
object state {
|
||||
var s = "NIL"
|
||||
val finished = new CyclicBarrier(2)
|
||||
val finished = TestBarrier(2)
|
||||
}
|
||||
}
|
||||
|
||||
class ActorFireForgetRequestReplySpec extends JUnitSuite {
|
||||
class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
||||
import ActorFireForgetRequestReplySpec._
|
||||
|
||||
@Test
|
||||
def shouldReplyToBangMessageUsingReply = {
|
||||
override def beforeEach() = {
|
||||
state.finished.reset
|
||||
val replyActor = actorOf[ReplyActor].start
|
||||
val senderActor = actorOf(new SenderActor(replyActor)).start
|
||||
senderActor ! "Init"
|
||||
try { state.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
assert("Reply" === state.s)
|
||||
}
|
||||
}
|
||||
|
||||
"An Actor" must {
|
||||
|
||||
@Test
|
||||
def shouldReplyToBangMessageUsingImplicitSender = {
|
||||
state.finished.reset
|
||||
val replyActor = actorOf[ReplyActor].start
|
||||
val senderActor = actorOf(new SenderActor(replyActor)).start
|
||||
senderActor ! "InitImplicit"
|
||||
try { state.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
assert("ReplyImplicit" === state.s)
|
||||
}
|
||||
"reply to bang message using reply" in {
|
||||
val replyActor = actorOf[ReplyActor].start
|
||||
val senderActor = actorOf(new SenderActor(replyActor)).start
|
||||
senderActor ! "Init"
|
||||
state.finished.await
|
||||
state.s must be ("Reply")
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldShutdownCrashedTemporaryActor = {
|
||||
state.finished.reset
|
||||
val actor = actorOf[CrashingTemporaryActor].start
|
||||
assert(actor.isRunning)
|
||||
actor ! "Die"
|
||||
try { state.finished.await(10L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
Thread.sleep(Testing.time(500))
|
||||
assert(actor.isShutdown)
|
||||
"reply to bang message using implicit sender" in {
|
||||
val replyActor = actorOf[ReplyActor].start
|
||||
val senderActor = actorOf(new SenderActor(replyActor)).start
|
||||
senderActor ! "InitImplicit"
|
||||
state.finished.await
|
||||
state.s must be ("ReplyImplicit")
|
||||
}
|
||||
|
||||
"should shutdown crashed temporary actor" in {
|
||||
val actor = actorOf[CrashingTemporaryActor].start
|
||||
actor.isRunning must be (true)
|
||||
actor ! "Die"
|
||||
state.finished.await
|
||||
sleepFor(1 second)
|
||||
actor.isShutdown must be (true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,19 +4,20 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
import akka.util.duration._
|
||||
import akka.testing.Testing.sleepFor
|
||||
|
||||
import akka.actor._
|
||||
import akka.dispatch.Future
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
||||
|
||||
object ActorRefSpec {
|
||||
|
||||
var latch = new CountDownLatch(4)
|
||||
val latch = TestLatch(4)
|
||||
|
||||
class ReplyActor extends Actor {
|
||||
var replyTo: Channel[Any] = null
|
||||
|
|
@ -49,7 +50,7 @@ object ActorRefSpec {
|
|||
}
|
||||
|
||||
private def work {
|
||||
Thread.sleep(1000)
|
||||
sleepFor(1 second)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -69,16 +70,12 @@ object ActorRefSpec {
|
|||
}
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ActorRefSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
class ActorRefSpec extends WordSpec with MustMatchers {
|
||||
import ActorRefSpec._
|
||||
|
||||
describe("ActorRef") {
|
||||
it("should support to reply via channel") {
|
||||
"An ActorRef" must {
|
||||
|
||||
"support reply via channel" in {
|
||||
val serverRef = Actor.actorOf[ReplyActor].start
|
||||
val clientRef = Actor.actorOf(new SenderActor(serverRef)).start
|
||||
|
||||
|
|
@ -86,18 +83,23 @@ class ActorRefSpec extends
|
|||
clientRef ! "simple"
|
||||
clientRef ! "simple"
|
||||
clientRef ! "simple"
|
||||
assert(latch.await(4L, TimeUnit.SECONDS))
|
||||
latch = new CountDownLatch(4)
|
||||
|
||||
latch.await
|
||||
|
||||
latch.reset
|
||||
|
||||
clientRef ! "complex2"
|
||||
clientRef ! "simple"
|
||||
clientRef ! "simple"
|
||||
clientRef ! "simple"
|
||||
assert(latch.await(4L, TimeUnit.SECONDS))
|
||||
|
||||
latch.await
|
||||
|
||||
clientRef.stop
|
||||
serverRef.stop
|
||||
}
|
||||
|
||||
it("should stop when sent a poison pill") {
|
||||
"stop when sent a poison pill" in {
|
||||
val ref = Actor.actorOf(
|
||||
new Actor {
|
||||
def receive = {
|
||||
|
|
@ -115,11 +117,11 @@ class ActorRefSpec extends
|
|||
fail("shouldn't get here")
|
||||
}
|
||||
|
||||
assert(ffive.resultOrException.get == "five")
|
||||
assert(fnull.resultOrException.get == "null")
|
||||
ffive.resultOrException.get must be ("five")
|
||||
fnull.resultOrException.get must be ("null")
|
||||
|
||||
assert(ref.isRunning == false)
|
||||
assert(ref.isShutdown == true)
|
||||
ref.isRunning must be (false)
|
||||
ref.isShutdown must be (true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,34 +4,31 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
|
||||
import FSM._
|
||||
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
|
||||
import akka.Testing
|
||||
|
||||
object FSMActorSpec {
|
||||
|
||||
|
||||
val unlockedLatch = new StandardLatch
|
||||
val lockedLatch = new StandardLatch
|
||||
val unhandledLatch = new StandardLatch
|
||||
val terminatedLatch = new StandardLatch
|
||||
val transitionLatch = new StandardLatch
|
||||
val initialStateLatch = new StandardLatch
|
||||
val transitionCallBackLatch = new StandardLatch
|
||||
val unlockedLatch = TestLatch()
|
||||
val lockedLatch = TestLatch()
|
||||
val unhandledLatch = TestLatch()
|
||||
val terminatedLatch = TestLatch()
|
||||
val transitionLatch = TestLatch()
|
||||
val initialStateLatch = TestLatch()
|
||||
val transitionCallBackLatch = TestLatch()
|
||||
|
||||
sealed trait LockState
|
||||
case object Locked extends LockState
|
||||
case object Open extends LockState
|
||||
|
||||
class Lock(code: String, timeout: (Long, TimeUnit)) extends Actor with FSM[LockState, CodeState] {
|
||||
class Lock(code: String, timeout: Duration) extends Actor with FSM[LockState, CodeState] {
|
||||
|
||||
startWith(Locked, CodeState("", code))
|
||||
|
||||
|
|
@ -94,53 +91,53 @@ object FSMActorSpec {
|
|||
case class CodeState(soFar: String, code: String)
|
||||
}
|
||||
|
||||
class FSMActorSpec extends JUnitSuite {
|
||||
class FSMActorSpec extends WordSpec with MustMatchers {
|
||||
import FSMActorSpec._
|
||||
|
||||
"An FSM Actor" must {
|
||||
|
||||
@Test
|
||||
def unlockTheLock = {
|
||||
"unlock the lock" in {
|
||||
|
||||
// lock that locked after being open for 1 sec
|
||||
val lock = Actor.actorOf(new Lock("33221", 1 second)).start
|
||||
|
||||
// lock that locked after being open for 1 sec
|
||||
val lock = Actor.actorOf(new Lock("33221", (Testing.time(1), TimeUnit.SECONDS))).start
|
||||
val transitionTester = Actor.actorOf(new Actor { def receive = {
|
||||
case Transition(_, _, _) => transitionCallBackLatch.open
|
||||
case CurrentState(_, Locked) => initialStateLatch.open
|
||||
}}).start
|
||||
|
||||
val transitionTester = Actor.actorOf(new Actor { def receive = {
|
||||
case Transition(_, _, _) => transitionCallBackLatch.open
|
||||
case CurrentState(_, Locked) => initialStateLatch.open
|
||||
}}).start
|
||||
lock ! SubscribeTransitionCallBack(transitionTester)
|
||||
initialStateLatch.await
|
||||
|
||||
lock ! SubscribeTransitionCallBack(transitionTester)
|
||||
assert(initialStateLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
|
||||
lock ! '3'
|
||||
lock ! '3'
|
||||
lock ! '2'
|
||||
lock ! '2'
|
||||
lock ! '1'
|
||||
|
||||
lock ! '3'
|
||||
lock ! '3'
|
||||
lock ! '2'
|
||||
lock ! '2'
|
||||
lock ! '1'
|
||||
unlockedLatch.await
|
||||
transitionLatch.await
|
||||
transitionCallBackLatch.await
|
||||
lockedLatch.await
|
||||
|
||||
assert(unlockedLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
|
||||
assert(transitionLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
|
||||
assert(transitionCallBackLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
|
||||
assert(lockedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
|
||||
lock ! "not_handled"
|
||||
unhandledLatch.await
|
||||
|
||||
val answerLatch = TestLatch()
|
||||
object Hello
|
||||
object Bye
|
||||
val tester = Actor.actorOf(new Actor {
|
||||
protected def receive = {
|
||||
case Hello => lock ! "hello"
|
||||
case "world" => answerLatch.open
|
||||
case Bye => lock ! "bye"
|
||||
}
|
||||
}).start
|
||||
tester ! Hello
|
||||
answerLatch.await
|
||||
|
||||
lock ! "not_handled"
|
||||
assert(unhandledLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
|
||||
|
||||
val answerLatch = new StandardLatch
|
||||
object Hello
|
||||
object Bye
|
||||
val tester = Actor.actorOf(new Actor {
|
||||
protected def receive = {
|
||||
case Hello => lock ! "hello"
|
||||
case "world" => answerLatch.open
|
||||
case Bye => lock ! "bye"
|
||||
}
|
||||
}).start
|
||||
tester ! Hello
|
||||
assert(answerLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
|
||||
|
||||
tester ! Bye
|
||||
assert(terminatedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
|
||||
tester ! Bye
|
||||
terminatedLatch.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,16 +1,13 @@
|
|||
package akka.actor
|
||||
|
||||
import akka.testkit.TestKit
|
||||
import akka.util.duration._
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
class FSMTimingSpec
|
||||
extends WordSpec
|
||||
with MustMatchers
|
||||
with TestKit {
|
||||
import akka.testkit.TestKit
|
||||
import akka.util.duration._
|
||||
|
||||
|
||||
class FSMTimingSpec extends WordSpec with MustMatchers with TestKit {
|
||||
import FSMTimingSpec._
|
||||
import FSM._
|
||||
|
||||
|
|
@ -140,4 +137,3 @@ object FSMTimingSpec {
|
|||
|
||||
}
|
||||
|
||||
// vim: set ts=2 sw=2 et:
|
||||
|
|
|
|||
|
|
@ -1,18 +1,25 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
import akka.util.duration._
|
||||
|
||||
import Actor._
|
||||
|
||||
|
||||
object ForwardActorSpec {
|
||||
object ForwardState {
|
||||
var sender: Option[ActorRef] = None
|
||||
}
|
||||
|
||||
class ReceiverActor extends Actor {
|
||||
val latch = new CountDownLatch(1)
|
||||
val latch = TestLatch()
|
||||
def receive = {
|
||||
case "SendBang" => {
|
||||
ForwardState.sender = self.sender
|
||||
|
|
@ -42,7 +49,7 @@ object ForwardActorSpec {
|
|||
}
|
||||
|
||||
class BangBangSenderActor extends Actor {
|
||||
val latch = new CountDownLatch(1)
|
||||
val latch = TestLatch()
|
||||
val forwardActor = actorOf[ForwardActor]
|
||||
forwardActor.start
|
||||
(forwardActor !! "SendBangBang") match {
|
||||
|
|
@ -55,27 +62,27 @@ object ForwardActorSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class ForwardActorSpec extends JUnitSuite {
|
||||
class ForwardActorSpec extends WordSpec with MustMatchers {
|
||||
import ForwardActorSpec._
|
||||
|
||||
@Test
|
||||
def shouldForwardActorReferenceWhenInvokingForwardOnBang {
|
||||
val senderActor = actorOf[BangSenderActor]
|
||||
val latch = senderActor.actor.asInstanceOf[BangSenderActor]
|
||||
"A Forward Actor" must {
|
||||
"forward actor reference when invoking forward on bang" in {
|
||||
val senderActor = actorOf[BangSenderActor]
|
||||
val latch = senderActor.actor.asInstanceOf[BangSenderActor]
|
||||
.forwardActor.actor.asInstanceOf[ForwardActor]
|
||||
.receiverActor.actor.asInstanceOf[ReceiverActor]
|
||||
.latch
|
||||
senderActor.start
|
||||
assert(latch.await(1L, TimeUnit.SECONDS))
|
||||
assert(ForwardState.sender ne null)
|
||||
assert(senderActor.toString === ForwardState.sender.get.toString)
|
||||
}
|
||||
senderActor.start
|
||||
latch.await
|
||||
ForwardState.sender must not be (null)
|
||||
senderActor.toString must be (ForwardState.sender.get.toString)
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldForwardActorReferenceWhenInvokingForwardOnBangBang {
|
||||
val senderActor = actorOf[BangBangSenderActor]
|
||||
senderActor.start
|
||||
val latch = senderActor.actor.asInstanceOf[BangBangSenderActor].latch
|
||||
assert(latch.await(1L, TimeUnit.SECONDS))
|
||||
"forward actor reference when invoking forward on bang bang" in {
|
||||
val senderActor = actorOf[BangBangSenderActor]
|
||||
senderActor.start
|
||||
val latch = senderActor.actor.asInstanceOf[BangBangSenderActor].latch
|
||||
latch.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,17 +1,23 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
|
||||
import Actor._
|
||||
|
||||
import java.util.concurrent.CyclicBarrier
|
||||
|
||||
class HotSwapSpec extends WordSpec with MustMatchers {
|
||||
|
||||
"An Actor" should {
|
||||
"An Actor" must {
|
||||
|
||||
"be able to hotswap its behavior with HotSwap(..)" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val barrier = TestBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actorOf( new Actor {
|
||||
def receive = { case _ => _log += "default" }
|
||||
|
|
@ -27,7 +33,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"be able to hotswap its behavior with become(..)" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val barrier = TestBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actorOf(new Actor {
|
||||
def receive = {
|
||||
|
|
@ -55,7 +61,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"be able to revert hotswap its behavior with RevertHotSwap(..)" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val barrier = TestBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actorOf( new Actor {
|
||||
def receive = {
|
||||
|
|
@ -100,7 +106,7 @@ class HotSwapSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"be able to revert hotswap its behavior with unbecome" in {
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val barrier = TestBarrier(2)
|
||||
@volatile var _log = ""
|
||||
val a = actorOf(new Actor {
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -1,108 +1,120 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testing._
|
||||
import akka.util.duration._
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import Actor._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
class ReceiveTimeoutSpec extends JUnitSuite {
|
||||
|
||||
@Test def receiveShouldGetTimeout= {
|
||||
class ReceiveTimeoutSpec extends WordSpec with MustMatchers {
|
||||
import Actor._
|
||||
|
||||
val timeoutLatch = new StandardLatch
|
||||
"An actor with receive timeout" must {
|
||||
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
"get timeout" in {
|
||||
val timeoutLatch = TestLatch()
|
||||
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
|
||||
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
|
||||
timeoutActor.stop
|
||||
}
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
@Test def swappedReceiveShouldAlsoGetTimout = {
|
||||
val timeoutLatch = new StandardLatch
|
||||
timeoutLatch.await
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
"get timeout when swapped" in {
|
||||
val timeoutLatch = TestLatch()
|
||||
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
|
||||
// after max 1 second the timeout should already been sent
|
||||
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
val swappedLatch = new StandardLatch
|
||||
timeoutActor ! HotSwap(self => {
|
||||
case ReceiveTimeout => swappedLatch.open
|
||||
})
|
||||
timeoutLatch.await
|
||||
|
||||
assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS))
|
||||
timeoutActor.stop
|
||||
}
|
||||
val swappedLatch = TestLatch()
|
||||
|
||||
@Test def timeoutShouldBeRescheduledAfterRegularReceive = {
|
||||
timeoutActor ! HotSwap(self => {
|
||||
case ReceiveTimeout => swappedLatch.open
|
||||
})
|
||||
|
||||
val timeoutLatch = new StandardLatch
|
||||
case object Tick
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
swappedLatch.await
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
"reschedule timeout after regular receive" in {
|
||||
val timeoutLatch = TestLatch()
|
||||
case object Tick
|
||||
|
||||
protected def receive = {
|
||||
case Tick => ()
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
timeoutActor ! Tick
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
|
||||
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
|
||||
timeoutActor.stop
|
||||
}
|
||||
protected def receive = {
|
||||
case Tick => ()
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
@Test def timeoutShouldBeTurnedOffIfDesired = {
|
||||
val count = new AtomicInteger(0)
|
||||
val timeoutLatch = new StandardLatch
|
||||
case object Tick
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
timeoutActor ! Tick
|
||||
|
||||
protected def receive = {
|
||||
case Tick => ()
|
||||
case ReceiveTimeout =>
|
||||
count.incrementAndGet
|
||||
timeoutLatch.open
|
||||
self.receiveTimeout = None
|
||||
}
|
||||
}).start
|
||||
timeoutActor ! Tick
|
||||
timeoutLatch.await
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
|
||||
assert(count.get === 1)
|
||||
timeoutActor.stop
|
||||
}
|
||||
"be able to turn off timeout if desired" in {
|
||||
val count = new AtomicInteger(0)
|
||||
val timeoutLatch = TestLatch()
|
||||
case object Tick
|
||||
|
||||
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
|
||||
val timeoutLatch = new StandardLatch
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
self.receiveTimeout = Some(500L)
|
||||
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
protected def receive = {
|
||||
case Tick => ()
|
||||
case ReceiveTimeout =>
|
||||
count.incrementAndGet
|
||||
timeoutLatch.open
|
||||
self.receiveTimeout = None
|
||||
}
|
||||
}).start
|
||||
|
||||
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
|
||||
timeoutActor.stop
|
||||
}
|
||||
timeoutActor ! Tick
|
||||
|
||||
@Test def ActorsReceiveTimeoutShouldBeReceiveTimeout {
|
||||
assert(akka.actor.Actors.receiveTimeout() eq ReceiveTimeout)
|
||||
timeoutLatch.await
|
||||
count.get must be (1)
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
"not receive timeout message when not specified" in {
|
||||
val timeoutLatch = TestLatch()
|
||||
|
||||
val timeoutActor = actorOf(new Actor {
|
||||
protected def receive = {
|
||||
case ReceiveTimeout => timeoutLatch.open
|
||||
}
|
||||
}).start
|
||||
|
||||
timeoutLatch.awaitTimeout(1 second) // timeout expected
|
||||
timeoutActor.stop
|
||||
}
|
||||
|
||||
"have ReceiveTimeout eq to Actors ReceiveTimeout" in {
|
||||
akka.actor.Actors.receiveTimeout() must be theSameInstanceAs (ReceiveTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.actor.dispatch
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.scalatest.Assertions._
|
||||
import akka.testing._
|
||||
import akka.dispatch._
|
||||
import akka.actor.{ActorRef, Actor}
|
||||
import akka.actor.Actor._
|
||||
|
|
@ -13,7 +14,6 @@ import java.util.concurrent.atomic.AtomicLong
|
|||
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
|
||||
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
|
||||
import akka.util.{Duration, Switch}
|
||||
import akka.Testing
|
||||
|
||||
object ActorModelSpec {
|
||||
|
||||
|
|
@ -225,13 +225,13 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
a.start
|
||||
|
||||
a ! CountDown(start)
|
||||
assertCountDown(start, Testing.time(3000), "Should process first message within 3 seconds")
|
||||
assertCountDown(start, Testing.testTime(3000), "Should process first message within 3 seconds")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)
|
||||
|
||||
a ! Wait(1000)
|
||||
a ! CountDown(oneAtATime)
|
||||
// in case of serialization violation, restart would happen instead of count down
|
||||
assertCountDown(oneAtATime, Testing.time(1500) ,"Processed message when allowed")
|
||||
assertCountDown(oneAtATime, Testing.testTime(1500) ,"Processed message when allowed")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
|
||||
|
||||
a.stop
|
||||
|
|
@ -246,7 +246,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
|
||||
def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } }
|
||||
for (i <- 1 to 10) { start }
|
||||
assertCountDown(counter, Testing.time(3000), "Should process 200 messages")
|
||||
assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
|
||||
|
||||
a.stop
|
||||
|
|
@ -264,10 +264,10 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
val aStart,aStop,bParallel = new CountDownLatch(1)
|
||||
|
||||
a ! Meet(aStart,aStop)
|
||||
assertCountDown(aStart, Testing.time(3000), "Should process first message within 3 seconds")
|
||||
assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds")
|
||||
|
||||
b ! CountDown(bParallel)
|
||||
assertCountDown(bParallel, Testing.time(3000), "Should process other actors in parallel")
|
||||
assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
|
||||
|
||||
aStop.countDown()
|
||||
a.stop
|
||||
|
|
@ -282,7 +282,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
val done = new CountDownLatch(1)
|
||||
a ! Restart
|
||||
a ! CountDown(done)
|
||||
assertCountDown(done, Testing.time(3000), "Should be suspended+resumed and done with next message within 3 seconds")
|
||||
assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds")
|
||||
a.stop
|
||||
assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2,
|
||||
msgsProcessed = 2, suspensions = 1, resumes = 1)
|
||||
|
|
@ -298,7 +298,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
|
||||
|
||||
dispatcher.resume(a)
|
||||
assertCountDown(done, Testing.time(3000), "Should resume processing of messages when resumed")
|
||||
assertCountDown(done, Testing.testTime(3000), "Should resume processing of messages when resumed")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
|
||||
suspensions = 1, resumes = 1)
|
||||
|
||||
|
|
@ -315,7 +315,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
(1 to num) foreach {
|
||||
_ => newTestActor.start ! cachedMessage
|
||||
}
|
||||
assertCountDown(cachedMessage.latch, Testing.time(10000), "Should process " + num + " countdowns")
|
||||
assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
|
||||
}
|
||||
for(run <- 1 to 3) {
|
||||
flood(10000)
|
||||
|
|
|
|||
|
|
@ -468,7 +468,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
|||
val pool2 = actorOf(new TestPool2).start
|
||||
pool2 ! "a"
|
||||
pool2 ! "b"
|
||||
done = latch.await(1,TimeUnit.SECONDS)
|
||||
done = latch.await(1, TimeUnit.SECONDS)
|
||||
done must be (true)
|
||||
delegates.size must be (2)
|
||||
pool2 stop
|
||||
|
|
|
|||
40
akka-actor/src/test/scala/akka/testing/TestBarrier.scala
Normal file
40
akka-actor/src/test/scala/akka/testing/TestBarrier.scala
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.testing
|
||||
|
||||
import akka.util.Duration
|
||||
import java.util.concurrent.{CyclicBarrier, TimeUnit, TimeoutException}
|
||||
|
||||
|
||||
class TestBarrierTimeoutException(message: String) extends RuntimeException(message)
|
||||
|
||||
/**
|
||||
* A cyclic barrier wrapper for use in testing.
|
||||
* It always uses a timeout when waiting and timeouts are specified as durations.
|
||||
* Timeouts will always throw an exception. The default timeout is 5 seconds.
|
||||
* Timeouts are multiplied by the testing time factor for Jenkins builds.
|
||||
*/
|
||||
object TestBarrier {
|
||||
val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
|
||||
|
||||
def apply(count: Int) = new TestBarrier(count)
|
||||
}
|
||||
|
||||
class TestBarrier(count: Int) {
|
||||
private val barrier = new CyclicBarrier(count)
|
||||
|
||||
def await(): Unit = await(TestBarrier.DefaultTimeout)
|
||||
|
||||
def await(timeout: Duration): Unit = {
|
||||
try {
|
||||
barrier.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
|
||||
} catch {
|
||||
case e: TimeoutException =>
|
||||
throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" format (timeout.toString, Testing.timeFactor))
|
||||
}
|
||||
}
|
||||
|
||||
def reset = barrier.reset
|
||||
}
|
||||
55
akka-actor/src/test/scala/akka/testing/TestLatch.scala
Normal file
55
akka-actor/src/test/scala/akka/testing/TestLatch.scala
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.testing
|
||||
|
||||
import akka.util.Duration
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
||||
|
||||
class TestLatchTimeoutException(message: String) extends RuntimeException(message)
|
||||
class TestLatchNoTimeoutException(message: String) extends RuntimeException(message)
|
||||
|
||||
/**
|
||||
* A count down latch wrapper for use in testing.
|
||||
* It always uses a timeout when waiting and timeouts are specified as durations.
|
||||
* There's a default timeout of 5 seconds and the default count is 1.
|
||||
* Timeouts will always throw an exception (no need to wrap in assert in tests).
|
||||
* Timeouts are multiplied by the testing time factor for Jenkins builds.
|
||||
*/
|
||||
object TestLatch {
|
||||
val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
|
||||
|
||||
def apply(count: Int = 1) = new TestLatch(count)
|
||||
}
|
||||
|
||||
class TestLatch(count: Int = 1) {
|
||||
private var latch = new CountDownLatch(count)
|
||||
|
||||
def countDown = latch.countDown
|
||||
|
||||
def open = countDown
|
||||
|
||||
def await(): Boolean = await(TestLatch.DefaultTimeout)
|
||||
|
||||
def await(timeout: Duration): Boolean = {
|
||||
val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
|
||||
if (!opened) throw new TestLatchTimeoutException(
|
||||
"Timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor))
|
||||
opened
|
||||
}
|
||||
|
||||
/**
|
||||
* Timeout is expected. Throws exception if latch is opened before timeout.
|
||||
*/
|
||||
def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = {
|
||||
val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
|
||||
if (opened) throw new TestLatchNoTimeoutException(
|
||||
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor))
|
||||
opened
|
||||
}
|
||||
|
||||
def reset = latch = new CountDownLatch(count)
|
||||
}
|
||||
|
||||
|
|
@ -2,7 +2,9 @@
|
|||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka
|
||||
package akka.testing
|
||||
|
||||
import akka.util.Duration
|
||||
|
||||
/**
|
||||
* Multiplying numbers used in test timeouts by a factor, set by system property.
|
||||
|
|
@ -18,8 +20,10 @@ object Testing {
|
|||
}
|
||||
}
|
||||
|
||||
def time(t: Int): Int = (timeFactor * t).toInt
|
||||
def time(t: Long): Long = (timeFactor * t).toLong
|
||||
def time(t: Float): Float = (timeFactor * t).toFloat
|
||||
def time(t: Double): Double = timeFactor * t
|
||||
def testTime(t: Int): Int = (timeFactor * t).toInt
|
||||
def testTime(t: Long): Long = (timeFactor * t).toLong
|
||||
def testTime(t: Float): Float = (timeFactor * t).toFloat
|
||||
def testTime(t: Double): Double = timeFactor * t
|
||||
|
||||
def sleepFor(duration: Duration) = Thread.sleep(testTime(duration.toMillis))
|
||||
}
|
||||
|
|
@ -5,7 +5,7 @@ import org.scalatest.matchers.MustMatchers
|
|||
|
||||
class Ticket001Spec extends WordSpec with MustMatchers {
|
||||
|
||||
"An XXX" should {
|
||||
"An XXX" must {
|
||||
"do YYY" in {
|
||||
1 must be (1)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import akka.actor._
|
|||
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
|
||||
import akka.config. {RemoteAddress, Config, TypedActorConfigurator}
|
||||
|
||||
import akka.Testing
|
||||
import akka.testing._
|
||||
|
||||
object RemoteTypedActorLog {
|
||||
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
|
||||
|
|
@ -39,13 +39,13 @@ class RemoteTypedActorSpec extends AkkaRemoteTest {
|
|||
classOf[RemoteTypedActorOne],
|
||||
classOf[RemoteTypedActorOneImpl],
|
||||
Permanent,
|
||||
Testing.time(10000),
|
||||
Testing.testTime(20000),
|
||||
RemoteAddress(host,port)),
|
||||
new SuperviseTypedActor(
|
||||
classOf[RemoteTypedActorTwo],
|
||||
classOf[RemoteTypedActorTwoImpl],
|
||||
Permanent,
|
||||
Testing.time(10000),
|
||||
Testing.testTime(20000),
|
||||
RemoteAddress(host,port))
|
||||
).toArray).supervise
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,8 +12,6 @@ import akka.config.Supervision._
|
|||
import java.util.concurrent.CountDownLatch
|
||||
import akka.config.TypedActorConfigurator
|
||||
|
||||
import akka.Testing
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
|
|
@ -97,7 +95,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
|||
}
|
||||
|
||||
it("should be stopped when supervision cannot handle the problem in") {
|
||||
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
|
||||
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), 30000)
|
||||
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise
|
||||
try {
|
||||
val first = conf.getInstance(classOf[TypedActorFailer])
|
||||
|
|
@ -123,7 +121,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
|||
}
|
||||
|
||||
it("should be restarted when supervision handles the problem in") {
|
||||
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
|
||||
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), 30000)
|
||||
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(classOf[Throwable] :: Nil, 3, 500000), Array(actorSupervision)).inject.supervise
|
||||
try {
|
||||
val first = conf.getInstance(classOf[TypedActorFailer])
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue