TestEventFilter overhaul

- make normal filters available for all four log levels
- allow filtering for
  + exact and complete message
  + start of message
  + regular expression
- keep count of occurrences if requested and verify correct count at end
  of filterEvents/filterExceptions block
- remove akka.testkit.Testing (sleepFor replaced by Duration.dilated.sleep)
- remove Duration.timeFactor (needs config -> AkkaApplication)
- TestLatch needs config -> AkkaApplication (was forgotten because used
  directly System.getProperty)
- lots of scaladoc for TestEventListener infrastructure
This commit is contained in:
Roland 2011-11-07 22:10:17 +01:00
parent 6559511bce
commit c1a9475015
23 changed files with 508 additions and 182 deletions

View file

@ -6,7 +6,6 @@ package akka.actor
import akka.testkit._
import org.scalatest.BeforeAndAfterEach
import akka.testkit.Testing.sleepFor
import akka.util.duration._
import akka.dispatch.Dispatchers
@ -84,7 +83,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach {
actor.isShutdown must be(false)
actor ! "Die"
state.finished.await
sleepFor(1 second)
1.second.dilated.sleep()
actor.isShutdown must be(true)
supervisor.stop()
}

View file

@ -9,7 +9,6 @@ import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.util.duration._
import akka.testkit.Testing.sleepFor
import java.lang.IllegalStateException
import akka.util.ReflectiveAccess
import akka.dispatch.{ DefaultPromise, Promise, Future }
@ -20,8 +19,6 @@ object ActorRefSpec {
case class ReplyTo(channel: Channel[Any])
val latch = TestLatch(4)
class ReplyActor extends Actor {
var replyTo: Channel[Any] = null
@ -53,11 +50,11 @@ object ActorRefSpec {
}
private def work {
sleepFor(1 second)
1.second.dilated.sleep
}
}
class SenderActor(replyActor: ActorRef) extends Actor {
class SenderActor(replyActor: ActorRef, latch: TestLatch) extends Actor {
def receive = {
case "complex" replyActor ! "complexRequest"
@ -343,8 +340,9 @@ class ActorRefSpec extends AkkaSpec {
}
"support reply via channel" in {
val latch = new TestLatch(4)
val serverRef = actorOf(Props[ReplyActor])
val clientRef = actorOf(Props(new SenderActor(serverRef)))
val clientRef = actorOf(Props(new SenderActor(serverRef, latch)))
clientRef ! "complex"
clientRef ! "simple"

View file

@ -103,7 +103,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
}
"fail a monitor which does not handle Terminated()" in {
filterEvents(EventFilter[ActorKilledException], EventFilter[DeathPactException]) {
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
case class FF(fail: Failed)
val supervisor = actorOf(Props[Supervisor]
.withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) {

View file

@ -18,19 +18,23 @@ import akka.config.Configuration
object FSMActorSpec {
val unlockedLatch = TestLatch()
val lockedLatch = TestLatch()
val unhandledLatch = TestLatch()
val terminatedLatch = TestLatch()
val transitionLatch = TestLatch()
val initialStateLatch = TestLatch()
val transitionCallBackLatch = TestLatch()
class Latches(implicit app: AkkaApplication) {
val unlockedLatch = TestLatch()
val lockedLatch = TestLatch()
val unhandledLatch = TestLatch()
val terminatedLatch = TestLatch()
val transitionLatch = TestLatch()
val initialStateLatch = TestLatch()
val transitionCallBackLatch = TestLatch()
}
sealed trait LockState
case object Locked extends LockState
case object Open extends LockState
class Lock(code: String, timeout: Duration) extends Actor with FSM[LockState, CodeState] {
class Lock(code: String, timeout: Duration, latches: Latches) extends Actor with FSM[LockState, CodeState] {
import latches._
startWith(Locked, CodeState("", code))
@ -107,8 +111,11 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true
"unlock the lock" in {
val latches = new Latches
import latches._
// lock that locked after being open for 1 sec
val lock = actorOf(new Lock("33221", 1 second))
val lock = actorOf(new Lock("33221", 1 second, latches))
val transitionTester = actorOf(new Actor {
def receive = {

View file

@ -6,7 +6,6 @@ package akka.actor
import akka.testkit._
import akka.util.duration._
import akka.testkit.Testing.sleepFor
import java.util.concurrent.{ TimeUnit, CountDownLatch }

View file

@ -5,7 +5,6 @@
package akka.actor
import org.scalatest.BeforeAndAfterEach
import akka.testkit.Testing.sleepFor
import akka.util.duration._
import akka.{ Die, Ping }
import akka.actor.Actor._
@ -17,7 +16,6 @@ import akka.testkit.AkkaSpec
object SupervisorSpec {
val Timeout = 5 seconds
val TimeoutMillis = Timeout.dilated.toMillis.toInt
// =====================================================
// Message logs
@ -67,6 +65,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
import SupervisorSpec._
val TimeoutMillis = Timeout.dilated.toMillis.toInt
// =====================================================
// Creating actors and supervisors
// =====================================================
@ -144,7 +144,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
master ! Die
expectMsg(3 seconds, "terminated")
sleepFor(1 second)
1.second.dilated.sleep
messageLogPoll must be(null)
}
@ -155,7 +155,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
(temporaryActor.?(Die, TimeoutMillis)).get
}
sleepFor(1 second)
1.second.dilated.sleep
messageLog.size must be(0)
}
@ -299,7 +299,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
}
// give time for restart
sleepFor(3 seconds)
3.seconds.dilated.sleep
(dyingActor.?(Ping, TimeoutMillis)).as[String].getOrElse("nil") must be === PongMessage

View file

@ -6,7 +6,6 @@ package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.util.duration._
import akka.testkit.Testing.sleepFor
import akka.dispatch.Dispatchers
import akka.actor.Actor._
import akka.testkit.{ TestKit, EventFilter, filterEvents, filterException }

View file

@ -4,7 +4,7 @@
package akka.actor.dispatch
import org.scalatest.Assertions._
import akka.testkit.{ Testing, filterEvents, EventFilter, AkkaSpec }
import akka.testkit.{ filterEvents, EventFilter, AkkaSpec }
import akka.dispatch._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
@ -285,13 +285,13 @@ abstract class ActorModelSpec extends AkkaSpec {
val a = newTestActor(dispatcher)
a ! CountDown(start)
assertCountDown(start, Testing.testTime(3000), "Should process first message within 3 seconds")
assertCountDown(start, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)
a ! Wait(1000)
a ! CountDown(oneAtATime)
// in case of serialization violation, restart would happen instead of count down
assertCountDown(oneAtATime, Testing.testTime(1500), "Processed message when allowed")
assertCountDown(oneAtATime, (1.5 seconds).dilated.toMillis, "Processed message when allowed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
a.stop()
@ -310,7 +310,7 @@ abstract class ActorModelSpec extends AkkaSpec {
}
}
}
assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages")
assertCountDown(counter, 3.seconds.dilated.toMillis, "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
a.stop()
@ -339,7 +339,7 @@ abstract class ActorModelSpec extends AkkaSpec {
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
a.resume
assertCountDown(done, Testing.testTime(3000), "Should resume processing of messages when resumed")
assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)
@ -360,7 +360,7 @@ abstract class ActorModelSpec extends AkkaSpec {
}).withDispatcher(wavesSupervisorDispatcher(dispatcher)))
boss ! "run"
try {
assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
assertCountDown(cachedMessage.latch, 10.seconds.dilated.toMillis, "Should process " + num + " countdowns")
} catch {
case e
System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num)
@ -374,7 +374,7 @@ abstract class ActorModelSpec extends AkkaSpec {
}
"continue to process messages when a thread gets interrupted" in {
filterEvents(EventFilter[InterruptedException], EventFilter[akka.event.Logging.EventHandlerException]) {
filterEvents(EventFilter[InterruptedException](), EventFilter[akka.event.Logging.EventHandlerException]()) {
implicit val dispatcher = newInterceptedDispatcher
implicit val timeout = Timeout(5 seconds)
val a = newTestActor(dispatcher)
@ -408,7 +408,7 @@ abstract class ActorModelSpec extends AkkaSpec {
}
"continue to process messages when exception is thrown" in {
filterEvents(EventFilter[IndexOutOfBoundsException], EventFilter[RemoteException]) {
filterEvents(EventFilter[IndexOutOfBoundsException](), EventFilter[RemoteException]()) {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor(dispatcher)
val f1 = a ? Reply("foo")
@ -467,10 +467,10 @@ class DispatcherModelSpec extends ActorModelSpec {
val a, b = newTestActor(dispatcher)
a ! Meet(aStart, aStop)
assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds")
assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel")
aStop.countDown()
@ -506,10 +506,10 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
val a, b = newTestActor(dispatcher)
a ! Meet(aStart, aStop)
assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds")
assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel")
aStop.countDown()

View file

@ -2,7 +2,6 @@ package akka.routing
import akka.dispatch.{ KeptPromise, Future }
import akka.actor._
import akka.testkit.Testing._
import akka.testkit.{ TestLatch, filterEvents, EventFilter, filterException }
import akka.util.duration._
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
@ -88,7 +87,7 @@ class ActorPoolSpec extends AkkaSpec {
def instance(p: Props) = actorOf(p.withCreator(new Actor {
def receive = {
case req: String {
sleepFor(10 millis)
(10 millis).dilated.sleep
channel.tryTell("Response")
}
}
@ -116,7 +115,7 @@ class ActorPoolSpec extends AkkaSpec {
def instance(p: Props) = actorOf(p.withCreator(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
(n millis).dilated.sleep
count.incrementAndGet
latch.countDown()
}
@ -142,7 +141,7 @@ class ActorPoolSpec extends AkkaSpec {
count.set(0)
for (m 0 until loops) {
pool ? t
sleepFor(50 millis)
(50 millis).dilated.sleep
}
}
@ -180,7 +179,7 @@ class ActorPoolSpec extends AkkaSpec {
def instance(p: Props) = actorOf(p.withCreator(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
(n millis).dilated.sleep
count.incrementAndGet
latch.countDown()
}
@ -291,7 +290,7 @@ class ActorPoolSpec extends AkkaSpec {
def instance(p: Props) = actorOf(p.withCreator(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
(n millis).dilated.sleep
latch.countDown()
}
}))
@ -311,7 +310,7 @@ class ActorPoolSpec extends AkkaSpec {
for (m 0 to 10) pool ! 250
sleepFor(5 millis)
(5 millis).dilated.sleep
val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size
@ -321,7 +320,7 @@ class ActorPoolSpec extends AkkaSpec {
for (m 0 to 3) {
pool ! 1
sleepFor(500 millis)
(500 millis).dilated.sleep
}
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
@ -414,7 +413,7 @@ class ActorPoolSpec extends AkkaSpec {
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(2 seconds).dilated.sleep
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
@ -425,7 +424,7 @@ class ActorPoolSpec extends AkkaSpec {
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(2 seconds).dilated.sleep
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
@ -438,7 +437,7 @@ class ActorPoolSpec extends AkkaSpec {
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(2 seconds).dilated.sleep
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
@ -449,7 +448,7 @@ class ActorPoolSpec extends AkkaSpec {
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(2 seconds).dilated.sleep
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
@ -461,7 +460,7 @@ class ActorPoolSpec extends AkkaSpec {
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(2 seconds).dilated.sleep
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
@ -472,7 +471,7 @@ class ActorPoolSpec extends AkkaSpec {
}
"support customizable supervision config of pooled actors" in {
filterEvents(EventFilter[IllegalStateException], EventFilter[RuntimeException]) {
filterEvents(EventFilter[IllegalStateException](), EventFilter[RuntimeException]()) {
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = new AtomicBoolean(false)
@ -510,7 +509,7 @@ class ActorPoolSpec extends AkkaSpec {
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(2 seconds).dilated.sleep
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
@ -520,7 +519,7 @@ class ActorPoolSpec extends AkkaSpec {
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(2 seconds).dilated.sleep
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)

View file

@ -90,11 +90,14 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
val ProviderClass = getString("akka.actor.provider", "akka.actor.LocalActorRefProvider")
val DefaultTimeUnit = getString("akka.time-unit", "seconds")
val DefaultTimeUnit = Duration.timeUnit(getString("akka.time-unit", "seconds"))
val ActorTimeout = Timeout(Duration(getInt("akka.actor.timeout", 5), DefaultTimeUnit))
val ActorTimeoutMillis = ActorTimeout.duration.toMillis
val SerializeAllMessages = getBool("akka.actor.serialize-messages", false)
val TestTimeFactor = getDouble("akka.test.timefactor", 1.0)
val TestEventFilterLeeway = Duration(getDouble("akka.test.filter-leeway", 0.5), DefaultTimeUnit)
val LogLevel = getString("akka.loglevel", "INFO")
val StdoutLogLevel = getString("akka.stdout-loglevel", LogLevel)
val EventHandlers = getList("akka.event-handlers")

View file

@ -1,7 +1,6 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
import akka.AkkaException

View file

@ -7,6 +7,7 @@ package akka.util
import java.util.concurrent.TimeUnit
import TimeUnit._
import java.lang.{ Long JLong, Double JDouble }
import akka.AkkaApplication
class TimerException(message: String) extends RuntimeException(message)
@ -119,15 +120,6 @@ object Duration {
case "ns" | "nano" | "nanos" | "nanosecond" | "nanoseconds" NANOSECONDS
}
/*
* Testing facilities
*/
val timeFactor: Double = {
val factor = System.getProperty("akka.test.timefactor", "1.0")
try { factor.toDouble }
catch { case e: java.lang.NumberFormatException 1.0 }
}
val Zero: Duration = new FiniteDuration(0, NANOSECONDS)
trait Infinite {
@ -272,9 +264,10 @@ abstract class Duration extends Serializable {
def /(other: Duration): Double
def unary_- : Duration
def finite_? : Boolean
def dilated: Duration = this * Duration.timeFactor
def dilated(implicit app: AkkaApplication): Duration = this * app.AkkaConfig.TestTimeFactor
def min(other: Duration): Duration = if (this < other) this else other
def max(other: Duration): Duration = if (this > other) this else other
def sleep(): Unit = Thread.sleep(toMillis)
// Java API
def lt(other: Duration) = this < other

View file

@ -82,9 +82,9 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll {
"increment no counters with a failing transaction" in {
val ignoreExceptions = Seq(
EventFilter[ExpectedFailureException],
EventFilter[CoordinatedTransactionException],
EventFilter[ActorTimeoutException])
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[ActorTimeoutException]())
filterEvents(ignoreExceptions) {
val (counters, failer) = actorOfs
val coordinated = Coordinated()

View file

@ -115,9 +115,9 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll {
"Coordinated fickle friends" should {
"eventually succeed to increment all counters by one" in {
val ignoreExceptions = Seq(
EventFilter[ExpectedFailureException],
EventFilter[CoordinatedTransactionException],
EventFilter[ActorTimeoutException])
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[ActorTimeoutException]())
filterEvents(ignoreExceptions) {
val (counters, coordinator) = actorOfs
val latch = new CountDownLatch(1)

View file

@ -105,9 +105,9 @@ class TransactorSpec extends AkkaSpec {
"increment no counters with a failing transaction" in {
val ignoreExceptions = Seq(
EventFilter[ExpectedFailureException],
EventFilter[CoordinatedTransactionException],
EventFilter[ActorTimeoutException])
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[ActorTimeoutException]())
filterEvents(ignoreExceptions) {
val (counters, failer) = createTransactors
val failLatch = TestLatch(numCounters)

View file

@ -6,6 +6,7 @@ package akka.testkit
import akka.util.Duration
import java.util.concurrent.{ CyclicBarrier, TimeUnit, TimeoutException }
import akka.AkkaApplication
class TestBarrierTimeoutException(message: String) extends RuntimeException(message)
@ -24,14 +25,15 @@ object TestBarrier {
class TestBarrier(count: Int) {
private val barrier = new CyclicBarrier(count)
def await(): Unit = await(TestBarrier.DefaultTimeout)
def await()(implicit app: AkkaApplication): Unit = await(TestBarrier.DefaultTimeout)
def await(timeout: Duration) {
def await(timeout: Duration)(implicit app: AkkaApplication) {
try {
barrier.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
barrier.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
} catch {
case e: TimeoutException
throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" format (timeout.toString, Duration.timeFactor))
throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s"
format (timeout.toString, app.AkkaConfig.TestTimeFactor))
}
}

View file

@ -1,10 +1,36 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit
import akka.event.Logging.{ LogEvent, Error, InitializeLogger }
import akka.actor.Actor
import scala.util.matching.Regex
import akka.actor.Actor
import akka.event.Logging._
import akka.event.Logging
import akka.util.Duration
import akka.AkkaApplication
/**
* Implementation helpers of the EventFilter facilities: send `Mute`
* to the TestEventListener to install a filter, and `UnMute` to
* deinstall it.
*
* You should always prefer the filter methods in the package object
* (see [[akka.testkit]] `filterEvents` and `filterException`) or on the
* EventFilter implementations.
*/
sealed trait TestEvent
/**
* Implementation helpers of the EventFilter facilities: send <code>Mute</code>
* to the TestEventFilter to install a filter, and <code>UnMute</code> to
* deinstall it.
*
* You should always prefer the filter methods in the package object
* (see [[akka.testkit]] `filterEvents` and `filterException`) or on the
* EventFilter implementations.
*/
object TestEvent {
object Mute {
def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq)
@ -16,69 +42,398 @@ object TestEvent {
case class UnMute(filters: Seq[EventFilter]) extends TestEvent
}
trait EventFilter {
def apply(event: LogEvent): Boolean
/**
* Facilities for selectively filtering out expected events from logging so
* that you can keep your test runs console output clean and do not miss real
* error messages.
*
* See the companion object for convenient factory methods.
*
* If the `occurrences` is set to Int.MaxValue, no tracking is done.
*/
abstract class EventFilter(occurrences: Int) {
private var todo = occurrences
/**
* This method decides whether to filter the event (<code>true</code>) or not
* (<code>false</code>).
*/
protected def matches(event: LogEvent): Boolean
final def apply(event: LogEvent): Boolean = {
if (matches(event)) {
if (todo != Int.MaxValue) todo -= 1
true
} else false
}
def awaitDone(max: Duration): Boolean = {
if (todo != Int.MaxValue && todo > 0) TestKit.awaitCond(todo == 0, max)
todo == Int.MaxValue || todo == 0
}
/**
* Apply this filter while executing the given code block. Care is taken to
* remove the filter when the block is finished or aborted.
*/
def intercept[T](code: T)(implicit app: AkkaApplication): T = {
app.mainbus publish TestEvent.Mute(this)
try {
val result = code
if (!awaitDone(app.AkkaConfig.TestEventFilterLeeway))
throw new AssertionError("Timeout waiting for " + todo + " messages on " + this)
result
} finally app.mainbus publish TestEvent.UnMute(this)
}
/*
* these default values are just there for easier subclassing
*/
protected val source: Option[AnyRef] = None
protected val message: Either[String, Regex] = Left("")
protected val complete: Boolean = false
/**
* internal implementation helper, no guaranteed API
*/
protected def doMatch(src: AnyRef, msg: Any) = {
val msgstr = if (msg != null) msg.toString else "null"
(source.isDefined && sourceMatch(src) || source.isEmpty) &&
(message match {
case Left(s) if (complete) msgstr == s else msgstr.startsWith(s)
case Right(p) p.findFirstIn(msgstr).isDefined
})
}
private def sourceMatch(src: AnyRef) = {
source.get match {
case c: Class[_] c isInstance src
case s src == s
}
}
}
/**
* Facilities for selectively filtering out expected events from logging so
* that you can keep your test runs console output clean and do not miss real
* error messages.
*
* '''Also have a look at the [[akka.testkit]] package objects `filterEvents` and
* `filterException` methods.'''
*
* The source filters do accept `Class[_]` arguments, matching any
* object which is an instance of the given class, e.g.
*
* {{{
* EventFilter.info(source = classOf[MyActor]) // will match Info events from any MyActor instance
* }}}
*
* The message object will be converted to a string before matching (`"null"` if it is `null`).
*/
object EventFilter {
def apply[A <: Throwable: Manifest](): EventFilter =
ErrorFilter(manifest[A].erasure)
/**
* Create a filter for Error events. Give up to one of <code>start</code> and <code>pattern</code>:
*
* {{{
* EventFilter[MyException]() // filter only on exception type
* EventFilter[MyException]("message") // filter on exactly matching message
* EventFilter[MyException](source = obj) // filter on event source
* EventFilter[MyException](start = "Expected") // filter on start of message
* EventFilter[MyException](source = obj, pattern = "weird.*message") // filter on pattern and message
* }}}
*
* ''Please note that filtering on the `source` being
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
def apply[A <: Throwable: Manifest](message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
ErrorFilter(manifest[A].erasure, Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
def apply[A <: Throwable: Manifest](message: String): EventFilter =
ErrorMessageFilter(manifest[A].erasure, message)
/**
* Create a filter for Warning events. Give up to one of <code>start</code> and <code>pattern</code>:
*
* {{{
* EventFilter.warning() // filter only on exception type
* EventFilter.warning(source = obj) // filter on event source
* EventFilter.warning(start = "Expected") // filter on start of message
* EventFilter.warning(source = obj, pattern = "weird.*message") // filter on pattern and message
* }}}
*
* ''Please note that filtering on the `source` being
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
def warning(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
WarningFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
def apply[A <: Throwable: Manifest](source: AnyRef): EventFilter =
ErrorSourceFilter(manifest[A].erasure, source)
/**
* Create a filter for Info events. Give up to one of <code>start</code> and <code>pattern</code>:
*
* {{{
* EventFilter.info() // filter only on exception type
* EventFilter.info(source = obj) // filter on event source
* EventFilter.info(start = "Expected") // filter on start of message
* EventFilter.info(source = obj, pattern = "weird.*message") // filter on pattern and message
* }}}
*
* ''Please note that filtering on the `source` being
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
def info(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
InfoFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
def apply[A <: Throwable: Manifest](source: AnyRef, message: String): EventFilter =
ErrorSourceMessageFilter(manifest[A].erasure, source, message)
/**
* Create a filter for Debug events. Give up to one of <code>start</code> and <code>pattern</code>:
*
* {{{
* EventFilter.debug() // filter only on exception type
* EventFilter.debug(source = obj) // filter on event source
* EventFilter.debug(start = "Expected") // filter on start of message
* EventFilter.debug(source = obj, pattern = "weird.*message") // filter on pattern and message
* }}}
*
* ''Please note that filtering on the `source` being
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
def debug(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
DebugFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
def custom(test: (LogEvent) Boolean): EventFilter =
CustomEventFilter(test)
/**
* Create a custom event filter. The filter will affect those events for
* which the supplied partial function is defined and returns
* `true`.
*
* {{{
* EventFilter.custom {
* case Warning(ref, "my warning") if ref == actor || ref == null => true
* }
* }}}
*/
def custom(test: PartialFunction[LogEvent, Boolean], occurrences: Int = Int.MaxValue): EventFilter =
CustomEventFilter(test)(occurrences)
}
case class ErrorFilter(throwable: Class[_]) extends EventFilter {
def apply(event: LogEvent) = event match {
case Error(cause, _, _) throwable isInstance cause
case _ false
/**
* Filter which matches Error events, if they satisfy the given criteria:
* <ul>
* <li><code>throwable</code> applies an upper bound on the type of exception contained in the Error event</li>
* <li><code>source</code>, if given, applies a filter on the events origin</li>
* <li><code>message</code> applies a filter on the events message (either
* with String.startsWith or Regex.findFirstIn().isDefined); if the message
* itself does not match, the match is retried with the contained Exceptions
* message; if both are <code>null</code>, the filter always matches if at
* the same time the Exceptions stack trace is empty (this catches
* JVM-omitted fast-throw exceptions)</li>
* </ul>
* If you want to match all Error events, the most efficient is to use <code>Left("")</code>.
*/
case class ErrorFilter(
throwable: Class[_],
override val source: Option[AnyRef],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
def matches(event: LogEvent) = {
event match {
case Error(cause, src, msg) if throwable isInstance cause
(msg == null && cause.getMessage == null && cause.getStackTrace.length == 0) ||
doMatch(src, msg) || doMatch(src, cause.getMessage)
case _ false
}
}
/**
* Java API
*
* @param source
* apply this filter only to events from the given source; do not filter on source if this is given as <code>null</code>
* @param message
* apply this filter only to events whose message matches; do not filter on message if this is given as <code>null</code>
* @param pattern
* if <code>false</code>, the message string must start with the given
* string, otherwise the <code>message</code> argument is treated as
* regular expression which is matched against the message (may match only
* a substring to filter)
* @param complete
* whether the events message must match the given message string or pattern completely
*/
def this(throwable: Class[_], source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(throwable, Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
else Left(message),
complete)(occurrences)
/**
* Java API: filter only on the given type of exception
*/
def this(throwable: Class[_]) = this(throwable, null, null, false, false, Int.MaxValue)
}
/**
* Filter which matches Warning events, if they satisfy the given criteria:
* <ul>
* <li><code>source</code>, if given, applies a filter on the events origin</li>
* <li><code>message</code> applies a filter on the events message (either with String.startsWith or Regex.findFirstIn().isDefined)</li>
* </ul>
* If you want to match all Warning events, the most efficient is to use <code>Left("")</code>.
*/
case class WarningFilter(
override val source: Option[AnyRef],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
def matches(event: LogEvent) = {
event match {
case Warning(src, msg) doMatch(src, msg)
case _ false
}
}
/**
* Java API
*
* @param source
* apply this filter only to events from the given source; do not filter on source if this is given as <code>null</code>
* @param message
* apply this filter only to events whose message matches; do not filter on message if this is given as <code>null</code>
* @param pattern
* if <code>false</code>, the message string must start with the given
* string, otherwise the <code>message</code> argument is treated as
* regular expression which is matched against the message (may match only
* a substring to filter)
* @param complete
* whether the events message must match the given message string or pattern completely
*/
def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
else Left(message),
complete)(occurrences)
}
/**
* Filter which matches Info events, if they satisfy the given criteria:
* <ul>
* <li><code>source</code>, if given, applies a filter on the events origin</li>
* <li><code>message</code> applies a filter on the events message (either with String.startsWith or Regex.findFirstIn().isDefined)</li>
* </ul>
* If you want to match all Info events, the most efficient is to use <code>Left("")</code>.
*/
case class InfoFilter(
override val source: Option[AnyRef],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
def matches(event: LogEvent) = {
event match {
case Info(src, msg) doMatch(src, msg)
case _ false
}
}
/**
* Java API
*
* @param source
* apply this filter only to events from the given source; do not filter on source if this is given as <code>null</code>
* @param message
* apply this filter only to events whose message matches; do not filter on message if this is given as <code>null</code>
* @param pattern
* if <code>false</code>, the message string must start with the given
* string, otherwise the <code>message</code> argument is treated as
* regular expression which is matched against the message (may match only
* a substring to filter)
* @param complete
* whether the events message must match the given message string or pattern completely
*/
def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
else Left(message),
complete)(occurrences)
}
/**
* Filter which matches Debug events, if they satisfy the given criteria:
* <ul>
* <li><code>source</code>, if given, applies a filter on the events origin</li>
* <li><code>message</code> applies a filter on the events message (either with String.startsWith or Regex.findFirstIn().isDefined)</li>
* </ul>
* If you want to match all Debug events, the most efficient is to use <code>Left("")</code>.
*/
case class DebugFilter(
override val source: Option[AnyRef],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
def matches(event: LogEvent) = {
event match {
case Debug(src, msg) doMatch(src, msg)
case _ false
}
}
/**
* Java API
*
* @param source
* apply this filter only to events from the given source; do not filter on source if this is given as <code>null</code>
* @param message
* apply this filter only to events whose message matches; do not filter on message if this is given as <code>null</code>
* @param pattern
* if <code>false</code>, the message string must start with the given
* string, otherwise the <code>message</code> argument is treated as
* regular expression which is matched against the message (may match only
* a substring to filter)
* @param complete
* whether the events message must match the given message string or pattern completely
*/
def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
else Left(message),
complete)(occurrences)
}
/**
* Custom event filter when the others do not fit the bill.
*
* If the partial function is defined and returns true, filter the event.
*/
case class CustomEventFilter(test: PartialFunction[LogEvent, Boolean])(occurrences: Int) extends EventFilter(occurrences) {
def matches(event: LogEvent) = {
test.isDefinedAt(event) && test(event)
}
}
case class ErrorMessageFilter(throwable: Class[_], message: String) extends EventFilter {
def apply(event: LogEvent) = event match {
case Error(cause, _, _) if !(throwable isInstance cause) false
case Error(cause, _, null) if cause.getMessage eq null cause.getStackTrace.length == 0
case Error(cause, _, null) cause.getMessage startsWith message
case Error(cause, _, msg)
(msg.toString startsWith message) || (cause.getMessage startsWith message)
case _ false
}
}
case class ErrorSourceFilter(throwable: Class[_], source: AnyRef) extends EventFilter {
def apply(event: LogEvent) = event match {
case Error(cause, instance, _) (throwable isInstance cause) && (source eq instance)
case _ false
}
}
case class ErrorSourceMessageFilter(throwable: Class[_], source: AnyRef, message: String) extends EventFilter {
def apply(event: LogEvent) = event match {
case Error(cause, instance, _) if !((throwable isInstance cause) && (source eq instance)) false
case Error(cause, _, null) if cause.getMessage eq null cause.getStackTrace.length == 0
case Error(cause, _, null) cause.getMessage startsWith message
case Error(cause, _, msg)
(msg.toString startsWith message) || (cause.getMessage startsWith message)
case _ false
}
}
case class CustomEventFilter(test: (LogEvent) Boolean) extends EventFilter {
def apply(event: LogEvent) = test(event)
}
class TestEventListener extends akka.event.Logging.DefaultLogger {
/**
* EventListener for running tests, which allows selectively filtering out
* expected messages. To use it, include something like this into
* <code>akka.test.conf</code> and run your tests with system property
* <code>"akka.mode"</code> set to <code>"test"</code>:
*
* <pre><code>
* akka {
* event-handlers = ["akka.testkit.TestEventListener"]
* }
* </code></pre>
*/
class TestEventListener extends Logging.DefaultLogger {
import TestEvent._
var filters: List[EventFilter] = Nil

View file

@ -452,12 +452,6 @@ class TestKit(_app: AkkaApplication) {
lastWasNoMsg = true
}
/**
* Same as `receiveWhile(remaining)(f)`, but correctly treating the timeFactor.
*/
@deprecated("insert empty first parameter list", "1.2")
def receiveWhile[T](f: PartialFunction[AnyRef, T]): Seq[T] = receiveWhile(remaining / Duration.timeFactor)(f)
/**
* Receive a series of messages until one does not match the given partial
* function or the idle timeout is met (disabled by default) or the overall

View file

@ -6,6 +6,7 @@ package akka.testkit
import akka.util.Duration
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.AkkaApplication
class TestLatchTimeoutException(message: String) extends RuntimeException(message)
class TestLatchNoTimeoutException(message: String) extends RuntimeException(message)
@ -20,10 +21,10 @@ class TestLatchNoTimeoutException(message: String) extends RuntimeException(mess
object TestLatch {
val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
def apply(count: Int = 1) = new TestLatch(count)
def apply(count: Int = 1)(implicit app: AkkaApplication) = new TestLatch(count)
}
class TestLatch(count: Int = 1) {
class TestLatch(count: Int = 1)(implicit app: AkkaApplication) {
private var latch = new CountDownLatch(count)
def countDown() = latch.countDown()
@ -33,9 +34,9 @@ class TestLatch(count: Int = 1) {
def await(): Boolean = await(TestLatch.DefaultTimeout)
def await(timeout: Duration): Boolean = {
val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
if (!opened) throw new TestLatchTimeoutException(
"Timeout of %s with time factor of %s" format (timeout.toString, Duration.timeFactor))
"Timeout of %s with time factor of %s" format (timeout.toString, app.AkkaConfig.TestTimeFactor))
opened
}
@ -43,9 +44,9 @@ class TestLatch(count: Int = 1) {
* Timeout is expected. Throws exception if latch is opened before timeout.
*/
def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = {
val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
if (opened) throw new TestLatchNoTimeoutException(
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, Duration.timeFactor))
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, app.AkkaConfig.TestTimeFactor))
opened
}

View file

@ -1,21 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit
import akka.util.Duration
import Duration.timeFactor
/**
* Multiplying numbers used in test timeouts by a factor, set by system property.
* Useful for Jenkins builds (where the machine may need more time).
*/
object Testing {
def testTime(t: Int): Int = (timeFactor * t).toInt
def testTime(t: Long): Long = (timeFactor * t).toLong
def testTime(t: Float): Float = (timeFactor * t).toFloat
def testTime(t: Double): Double = timeFactor * t
def sleepFor(duration: Duration) = Thread.sleep(testTime(duration.toMillis))
}

View file

@ -1,10 +1,22 @@
package akka
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
package object testkit {
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: T)(implicit app: AkkaApplication): T = {
def now = System.currentTimeMillis
app.mainbus.publish(TestEvent.Mute(eventFilters.toSeq))
try {
block
val result = block
val stop = now + app.AkkaConfig.TestEventFilterLeeway.toMillis
val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout waiting for " + _)
if (failed.nonEmpty)
throw new AssertionError("Filter completion error:\n" + failed.mkString("\n"))
result
} finally {
app.mainbus.publish(TestEvent.UnMute(eventFilters.toSeq))
}
@ -12,5 +24,5 @@ package object testkit {
def filterEvents[T](eventFilters: EventFilter*)(block: T)(implicit app: AkkaApplication): T = filterEvents(eventFilters.toSeq)(block)
def filterException[T <: Throwable](block: Unit)(implicit app: AkkaApplication, m: Manifest[T]): Unit = filterEvents(Seq(EventFilter[T]))(block)
def filterException[T <: Throwable](block: Unit)(implicit app: AkkaApplication, m: Manifest[T]): Unit = EventFilter[T]() intercept (block)
}

View file

@ -156,7 +156,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
}
"stop when sent a poison pill" in {
filterEvents(EventFilter[ActorKilledException]) {
EventFilter[ActorKilledException]() intercept {
val a = TestActorRef(Props[WorkerActor])
intercept[ActorKilledException] {
(a ? PoisonPill).get
@ -167,7 +167,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
}
"restart when Kill:ed" in {
filterEvents(EventFilter[ActorKilledException]) {
EventFilter[ActorKilledException]() intercept {
counter = 2
val boss = TestActorRef(Props(new TActor {

View file

@ -3,23 +3,10 @@ package akka.testkit
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.util.Duration
import akka.config.Configuration
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TestTimeSpec extends AkkaSpec with BeforeAndAfterEach {
val tf = Duration.timeFactor
override def beforeEach {
val f = Duration.getClass.getDeclaredField("timeFactor")
f.setAccessible(true)
f.setDouble(Duration, 2.0)
}
override def afterEach {
val f = Duration.getClass.getDeclaredField("timeFactor")
f.setAccessible(true)
f.setDouble(Duration, tf)
}
class TestTimeSpec extends AkkaSpec(Configuration("akka.test.timefactor" -> 2.0)) with BeforeAndAfterEach {
"A TestKit" must {