Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
f0fa7bccb0
10 changed files with 67 additions and 39 deletions
|
|
@ -96,7 +96,7 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender {
|
|||
|
||||
"receive and cancel a repeated timer" in {
|
||||
fsm ! TestRepeatedTimer
|
||||
val seq = receiveWhile(1 second) {
|
||||
val seq = receiveWhile(2 seconds) {
|
||||
case Tick ⇒ Tick
|
||||
}
|
||||
seq must have length 5
|
||||
|
|
@ -184,8 +184,9 @@ object FSMTimingSpec {
|
|||
setTimer("hallo", Tock, 1 milli, false)
|
||||
TestKit.awaitCond(context.hasMessages, 1 second)
|
||||
cancelTimer("hallo")
|
||||
sender ! Tick
|
||||
setTimer("hallo", Tock, 500 millis, false)
|
||||
stay replying Tick
|
||||
stay
|
||||
case Ev(Tock) ⇒
|
||||
tester ! Tock
|
||||
stay
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ object ScanningEventBusSpec {
|
|||
|
||||
class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] {
|
||||
protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b)
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b)
|
||||
|
||||
protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier
|
||||
|
||||
|
|
@ -200,7 +200,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
|
|||
object LookupEventBusSpec {
|
||||
class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] {
|
||||
protected def classify(event: Event): Classifier = event.toString
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b)
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b)
|
||||
protected def mapSize = 32
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,6 +95,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
|||
try java.lang.Double.parseDouble(System.getProperty("akka.test.timefactor")) catch {
|
||||
case _: Exception ⇒ getDouble("akka.test.timefactor", 1.0)
|
||||
}
|
||||
val SingleExpectDefaultTimeout = Duration(getDouble("akka.test.single-expect-default", 1), DefaultTimeUnit)
|
||||
val TestEventFilterLeeway = Duration(getDouble("akka.test.filter-leeway", 0.5), DefaultTimeUnit)
|
||||
|
||||
val LogLevel = getString("akka.loglevel", "INFO")
|
||||
|
|
|
|||
|
|
@ -222,8 +222,9 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
|
|||
if (mailBox.hasSystemMessages) {
|
||||
var message = mailBox.systemDrain()
|
||||
while (message ne null) {
|
||||
val next = message.next
|
||||
deadLetterMailbox.systemEnqueue(message)
|
||||
message = message.next
|
||||
message = next
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,18 +37,7 @@ class BalancingDispatcher(
|
|||
_timeoutMs: Long)
|
||||
extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
|
||||
|
||||
private val buddies = new ConcurrentSkipListSet[ActorCell](
|
||||
new Comparator[ActorCell] {
|
||||
def compare(a: ActorCell, b: ActorCell): Int = {
|
||||
/*
|
||||
* make sure that there is no overflow or underflow in comparisons, so
|
||||
* that the ordering is actually consistent and you cannot have a
|
||||
* sequence which cyclically is monotone without end.
|
||||
*/
|
||||
val diff = ((System.identityHashCode(a) & 0xffffffffL) - (System.identityHashCode(b) & 0xffffffffL))
|
||||
if (diff > 0) 1 else if (diff < 0) -1 else 0
|
||||
}
|
||||
})
|
||||
private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
|
||||
|
||||
protected val messageQueue: MessageQueue = mailboxType match {
|
||||
case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
||||
|
|
|
|||
|
|
@ -244,6 +244,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
|
|||
|
||||
@tailrec
|
||||
final def systemEnqueue(message: SystemMessage): Unit = {
|
||||
assert(message.next eq null)
|
||||
if (Mailbox.debug) println(actor + " having enqueued " + message)
|
||||
val head = systemQueueGet
|
||||
/*
|
||||
|
|
@ -253,7 +254,10 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
|
|||
* Hence, SystemMessage.next does not need to be volatile.
|
||||
*/
|
||||
message.next = head
|
||||
if (!systemQueuePut(head, message)) systemEnqueue(message)
|
||||
if (!systemQueuePut(head, message)) {
|
||||
message.next = null
|
||||
systemEnqueue(message)
|
||||
}
|
||||
}
|
||||
|
||||
@tailrec
|
||||
|
|
|
|||
|
|
@ -121,6 +121,20 @@ object Duration {
|
|||
}
|
||||
|
||||
val Zero: Duration = new FiniteDuration(0, NANOSECONDS)
|
||||
val Undefined: Duration = new Duration with Infinite {
|
||||
override def toString = "Duration.Undefined"
|
||||
override def equals(other: Any) = other.asInstanceOf[AnyRef] eq this
|
||||
override def +(other: Duration): Duration = throw new IllegalArgumentException("cannot add Undefined duration")
|
||||
override def -(other: Duration): Duration = throw new IllegalArgumentException("cannot subtract Undefined duration")
|
||||
override def *(factor: Double): Duration = throw new IllegalArgumentException("cannot multiply Undefined duration")
|
||||
override def /(factor: Double): Duration = throw new IllegalArgumentException("cannot divide Undefined duration")
|
||||
override def /(other: Duration): Double = throw new IllegalArgumentException("cannot divide Undefined duration")
|
||||
def >(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
|
||||
def >=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
|
||||
def <(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
|
||||
def <=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
|
||||
def unary_- : Duration = throw new IllegalArgumentException("cannot negate Undefined duration")
|
||||
}
|
||||
|
||||
trait Infinite {
|
||||
this: Duration ⇒
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.util
|
||||
|
||||
import java.io.{ PrintWriter, StringWriter }
|
||||
import java.util.Comparator
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -12,6 +13,20 @@ object Helpers {
|
|||
|
||||
implicit def null2Option[T](t: T): Option[T] = Option(t)
|
||||
|
||||
def compareIdentityHash(a: AnyRef, b: AnyRef): Int = {
|
||||
/*
|
||||
* make sure that there is no overflow or underflow in comparisons, so
|
||||
* that the ordering is actually consistent and you cannot have a
|
||||
* sequence which cyclically is monotone without end.
|
||||
*/
|
||||
val diff = ((System.identityHashCode(a) & 0xffffffffL) - (System.identityHashCode(b) & 0xffffffffL))
|
||||
if (diff > 0) 1 else if (diff < 0) -1 else 0
|
||||
}
|
||||
|
||||
val IdentityHashComparator = new Comparator[AnyRef] {
|
||||
def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b)
|
||||
}
|
||||
|
||||
def intToBytes(value: Int): Array[Byte] = {
|
||||
val bytes = new Array[Byte](4)
|
||||
bytes(0) = (value >>> 24).asInstanceOf[Byte]
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ class TestKit(_app: ActorSystem) {
|
|||
val testActor: ActorRef = app.systemActorOf(Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)),
|
||||
"testActor" + TestKit.testActorId.incrementAndGet)
|
||||
|
||||
private var end: Duration = Duration.Inf
|
||||
private var end: Duration = Duration.Undefined
|
||||
|
||||
/**
|
||||
* if last assertion was expectNoMsg, disable timing failure upon within()
|
||||
|
|
@ -147,9 +147,11 @@ class TestKit(_app: ActorSystem) {
|
|||
def now: Duration = System.nanoTime.nanos
|
||||
|
||||
/**
|
||||
* Obtain time remaining for execution of the innermost enclosing `within` block.
|
||||
* Obtain time remaining for execution of the innermost enclosing `within`
|
||||
* block or missing that it returns the properly dilated default for this
|
||||
* case from AkkaConfig (key "akka.test.single-expect-default").
|
||||
*/
|
||||
def remaining: Duration = end - now
|
||||
def remaining: Duration = if (end == Duration.Undefined) app.AkkaConfig.SingleExpectDefaultTimeout.dilated else end - now
|
||||
|
||||
/**
|
||||
* Query queue status.
|
||||
|
|
@ -165,8 +167,8 @@ class TestKit(_app: ActorSystem) {
|
|||
*
|
||||
* Note that the timeout is scaled using Duration.timeFactor.
|
||||
*/
|
||||
def awaitCond(p: ⇒ Boolean, max: Duration = Duration.MinusInf, interval: Duration = 100.millis) {
|
||||
val _max = if (max eq Duration.MinusInf) remaining else max.dilated
|
||||
def awaitCond(p: ⇒ Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis) {
|
||||
val _max = if (max eq Duration.Undefined) remaining else max.dilated
|
||||
val stop = now + _max
|
||||
|
||||
@tailrec
|
||||
|
|
@ -200,7 +202,7 @@ class TestKit(_app: ActorSystem) {
|
|||
def within[T](min: Duration, max: Duration)(f: ⇒ T): T = {
|
||||
val _max = max.dilated
|
||||
val start = now
|
||||
val rem = end - start
|
||||
val rem = if (end == Duration.Undefined) Duration.Inf else end - start
|
||||
assert(rem >= min, "required min time " + min + " not possible, only " + format(min.unit, rem) + " left")
|
||||
|
||||
lastWasNoMsg = false
|
||||
|
|
@ -241,7 +243,7 @@ class TestKit(_app: ActorSystem) {
|
|||
|
||||
private def expectMsg_internal[T](max: Duration, obj: T): T = {
|
||||
val o = receiveOne(max)
|
||||
assert(o ne null, "timeout during expectMsg while waiting for " + obj)
|
||||
assert(o ne null, "timeout (" + max + ") during expectMsg while waiting for " + obj)
|
||||
assert(obj == o, "expected " + obj + ", found " + o)
|
||||
o.asInstanceOf[T]
|
||||
}
|
||||
|
|
@ -256,10 +258,10 @@ class TestKit(_app: ActorSystem) {
|
|||
*
|
||||
* @return the received object as transformed by the partial function
|
||||
*/
|
||||
def expectMsgPF[T](max: Duration = Duration.MinusInf, hint: String = "")(f: PartialFunction[Any, T]): T = {
|
||||
val _max = if (max eq Duration.MinusInf) remaining else max.dilated
|
||||
def expectMsgPF[T](max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, T]): T = {
|
||||
val _max = if (max eq Duration.Undefined) remaining else max.dilated
|
||||
val o = receiveOne(_max)
|
||||
assert(o ne null, "timeout during expectMsg: " + hint)
|
||||
assert(o ne null, "timeout (" + max + ") during expectMsg: " + hint)
|
||||
assert(f.isDefinedAt(o), "expected: " + hint + " but got unexpected message " + o)
|
||||
f(o)
|
||||
}
|
||||
|
|
@ -272,13 +274,13 @@ class TestKit(_app: ActorSystem) {
|
|||
* @return the last received messsage, i.e. the first one for which the
|
||||
* partial function returned true
|
||||
*/
|
||||
def fishForMessage(max: Duration = Duration.MinusInf, hint: String = "")(f: PartialFunction[Any, Boolean]): Any = {
|
||||
val _max = if (max eq Duration.MinusInf) remaining else max.dilated
|
||||
def fishForMessage(max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, Boolean]): Any = {
|
||||
val _max = if (max eq Duration.Undefined) remaining else max.dilated
|
||||
val end = now + _max
|
||||
@tailrec
|
||||
def recv: Any = {
|
||||
val o = receiveOne(end - now)
|
||||
assert(o ne null, "timeout during fishForMessage, hint: " + hint)
|
||||
assert(o ne null, "timeout (" + max + ") during fishForMessage, hint: " + hint)
|
||||
assert(f.isDefinedAt(o), "fishForMessage(" + hint + ") found unexpected message " + o)
|
||||
if (f(o)) o else recv
|
||||
}
|
||||
|
|
@ -315,7 +317,7 @@ class TestKit(_app: ActorSystem) {
|
|||
|
||||
private def expectMsgClass_internal[C](max: Duration, c: Class[C]): C = {
|
||||
val o = receiveOne(max)
|
||||
assert(o ne null, "timeout during expectMsgClass waiting for " + c)
|
||||
assert(o ne null, "timeout (" + max + ") during expectMsgClass waiting for " + c)
|
||||
assert(c isInstance o, "expected " + c + ", found " + o.getClass)
|
||||
o.asInstanceOf[C]
|
||||
}
|
||||
|
|
@ -336,7 +338,7 @@ class TestKit(_app: ActorSystem) {
|
|||
|
||||
private def expectMsgAnyOf_internal[T](max: Duration, obj: T*): T = {
|
||||
val o = receiveOne(max)
|
||||
assert(o ne null, "timeout during expectMsgAnyOf waiting for " + obj.mkString("(", ", ", ")"))
|
||||
assert(o ne null, "timeout (" + max + ") during expectMsgAnyOf waiting for " + obj.mkString("(", ", ", ")"))
|
||||
assert(obj exists (_ == o), "found unexpected " + o)
|
||||
o.asInstanceOf[T]
|
||||
}
|
||||
|
|
@ -357,7 +359,7 @@ class TestKit(_app: ActorSystem) {
|
|||
|
||||
private def expectMsgAnyClassOf_internal[C](max: Duration, obj: Class[_ <: C]*): C = {
|
||||
val o = receiveOne(max)
|
||||
assert(o ne null, "timeout during expectMsgAnyClassOf waiting for " + obj.mkString("(", ", ", ")"))
|
||||
assert(o ne null, "timeout (" + max + ") during expectMsgAnyClassOf waiting for " + obj.mkString("(", ", ", ")"))
|
||||
assert(obj exists (_ isInstance o), "found unexpected " + o)
|
||||
o.asInstanceOf[C]
|
||||
}
|
||||
|
|
@ -470,8 +472,8 @@ class TestKit(_app: ActorSystem) {
|
|||
* assert(series == (1 to 7).toList)
|
||||
* </pre>
|
||||
*/
|
||||
def receiveWhile[T](max: Duration = Duration.MinusInf, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): Seq[T] = {
|
||||
val stop = now + (if (max eq Duration.MinusInf) remaining else max.dilated)
|
||||
def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): Seq[T] = {
|
||||
val stop = now + (if (max eq Duration.Undefined) remaining else max.dilated)
|
||||
var msg: Message = NullMessage
|
||||
|
||||
@tailrec
|
||||
|
|
@ -513,7 +515,7 @@ class TestKit(_app: ActorSystem) {
|
|||
for { x ← 1 to n } yield {
|
||||
val timeout = stop - now
|
||||
val o = receiveOne(timeout)
|
||||
assert(o ne null, "timeout while expecting " + n + " messages")
|
||||
assert(o ne null, "timeout (" + max + ") while expecting " + n + " messages")
|
||||
o
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,8 +15,9 @@ class TestTimeSpec extends AkkaSpec(Configuration("akka.test.timefactor" -> 2.0)
|
|||
val now = System.nanoTime
|
||||
intercept[AssertionError] { probe.awaitCond(false, Duration("1 second")) }
|
||||
val diff = System.nanoTime - now
|
||||
diff must be > 1700000000l
|
||||
diff must be < 3000000000l
|
||||
val target = (1000000000l * app.AkkaConfig.TestTimeFactor).toLong
|
||||
diff must be > (target - 300000000l)
|
||||
diff must be < (target + 1000000000l)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue