diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index e2dc9f098e..cb30653d8f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -96,7 +96,7 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { "receive and cancel a repeated timer" in { fsm ! TestRepeatedTimer - val seq = receiveWhile(1 second) { + val seq = receiveWhile(2 seconds) { case Tick ⇒ Tick } seq must have length 5 @@ -184,8 +184,9 @@ object FSMTimingSpec { setTimer("hallo", Tock, 1 milli, false) TestKit.awaitCond(context.hasMessages, 1 second) cancelTimer("hallo") + sender ! Tick setTimer("hallo", Tock, 500 millis, false) - stay replying Tick + stay case Ev(Tock) ⇒ tester ! Tock stay diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index afd255a27c..6e2f672507 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -173,7 +173,7 @@ object ScanningEventBusSpec { class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] { protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b - protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b) + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b) protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier @@ -200,7 +200,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") { object LookupEventBusSpec { class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] { protected def classify(event: Event): Classifier = event.toString - protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b) + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b) protected def mapSize = 32 protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index f81fae21e0..521e3d3d7e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -95,6 +95,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF try java.lang.Double.parseDouble(System.getProperty("akka.test.timefactor")) catch { case _: Exception ⇒ getDouble("akka.test.timefactor", 1.0) } + val SingleExpectDefaultTimeout = Duration(getDouble("akka.test.single-expect-default", 1), DefaultTimeUnit) val TestEventFilterLeeway = Duration(getDouble("akka.test.filter-leeway", 0.5), DefaultTimeUnit) val LogLevel = getString("akka.loglevel", "INFO") diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 144110db4b..2611ae6926 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -222,8 +222,9 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { if (mailBox.hasSystemMessages) { var message = mailBox.systemDrain() while (message ne null) { + val next = message.next deadLetterMailbox.systemEnqueue(message) - message = message.next + message = next } } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index a366a246b6..33545d551f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -37,18 +37,7 @@ class BalancingDispatcher( _timeoutMs: Long) extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { - private val buddies = new ConcurrentSkipListSet[ActorCell]( - new Comparator[ActorCell] { - def compare(a: ActorCell, b: ActorCell): Int = { - /* - * make sure that there is no overflow or underflow in comparisons, so - * that the ordering is actually consistent and you cannot have a - * sequence which cyclically is monotone without end. - */ - val diff = ((System.identityHashCode(a) & 0xffffffffL) - (System.identityHashCode(b) & 0xffffffffL)) - if (diff > 0) 1 else if (diff < 0) -1 else 0 - } - }) + private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) protected val messageQueue: MessageQueue = mailboxType match { case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index ae54c5bf50..34729914c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -244,6 +244,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(message: SystemMessage): Unit = { + assert(message.next eq null) if (Mailbox.debug) println(actor + " having enqueued " + message) val head = systemQueueGet /* @@ -253,7 +254,10 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ * Hence, SystemMessage.next does not need to be volatile. */ message.next = head - if (!systemQueuePut(head, message)) systemEnqueue(message) + if (!systemQueuePut(head, message)) { + message.next = null + systemEnqueue(message) + } } @tailrec diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala index 26d863d5a9..fbb27526f1 100644 --- a/akka-actor/src/main/scala/akka/util/Duration.scala +++ b/akka-actor/src/main/scala/akka/util/Duration.scala @@ -121,6 +121,20 @@ object Duration { } val Zero: Duration = new FiniteDuration(0, NANOSECONDS) + val Undefined: Duration = new Duration with Infinite { + override def toString = "Duration.Undefined" + override def equals(other: Any) = other.asInstanceOf[AnyRef] eq this + override def +(other: Duration): Duration = throw new IllegalArgumentException("cannot add Undefined duration") + override def -(other: Duration): Duration = throw new IllegalArgumentException("cannot subtract Undefined duration") + override def *(factor: Double): Duration = throw new IllegalArgumentException("cannot multiply Undefined duration") + override def /(factor: Double): Duration = throw new IllegalArgumentException("cannot divide Undefined duration") + override def /(other: Duration): Double = throw new IllegalArgumentException("cannot divide Undefined duration") + def >(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration") + def >=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration") + def <(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration") + def <=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration") + def unary_- : Duration = throw new IllegalArgumentException("cannot negate Undefined duration") + } trait Infinite { this: Duration ⇒ diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 0975081d97..14af35523c 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -4,6 +4,7 @@ package akka.util import java.io.{ PrintWriter, StringWriter } +import java.util.Comparator /** * @author Jonas Bonér @@ -12,6 +13,20 @@ object Helpers { implicit def null2Option[T](t: T): Option[T] = Option(t) + def compareIdentityHash(a: AnyRef, b: AnyRef): Int = { + /* + * make sure that there is no overflow or underflow in comparisons, so + * that the ordering is actually consistent and you cannot have a + * sequence which cyclically is monotone without end. + */ + val diff = ((System.identityHashCode(a) & 0xffffffffL) - (System.identityHashCode(b) & 0xffffffffL)) + if (diff > 0) 1 else if (diff < 0) -1 else 0 + } + + val IdentityHashComparator = new Comparator[AnyRef] { + def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b) + } + def intToBytes(value: Int): Array[Byte] = { val bytes = new Array[Byte](4) bytes(0) = (value >>> 24).asInstanceOf[Byte] diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index d939302100..c6facdda52 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -102,7 +102,7 @@ class TestKit(_app: ActorSystem) { val testActor: ActorRef = app.systemActorOf(Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)), "testActor" + TestKit.testActorId.incrementAndGet) - private var end: Duration = Duration.Inf + private var end: Duration = Duration.Undefined /** * if last assertion was expectNoMsg, disable timing failure upon within() @@ -147,9 +147,11 @@ class TestKit(_app: ActorSystem) { def now: Duration = System.nanoTime.nanos /** - * Obtain time remaining for execution of the innermost enclosing `within` block. + * Obtain time remaining for execution of the innermost enclosing `within` + * block or missing that it returns the properly dilated default for this + * case from AkkaConfig (key "akka.test.single-expect-default"). */ - def remaining: Duration = end - now + def remaining: Duration = if (end == Duration.Undefined) app.AkkaConfig.SingleExpectDefaultTimeout.dilated else end - now /** * Query queue status. @@ -165,8 +167,8 @@ class TestKit(_app: ActorSystem) { * * Note that the timeout is scaled using Duration.timeFactor. */ - def awaitCond(p: ⇒ Boolean, max: Duration = Duration.MinusInf, interval: Duration = 100.millis) { - val _max = if (max eq Duration.MinusInf) remaining else max.dilated + def awaitCond(p: ⇒ Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis) { + val _max = if (max eq Duration.Undefined) remaining else max.dilated val stop = now + _max @tailrec @@ -200,7 +202,7 @@ class TestKit(_app: ActorSystem) { def within[T](min: Duration, max: Duration)(f: ⇒ T): T = { val _max = max.dilated val start = now - val rem = end - start + val rem = if (end == Duration.Undefined) Duration.Inf else end - start assert(rem >= min, "required min time " + min + " not possible, only " + format(min.unit, rem) + " left") lastWasNoMsg = false @@ -241,7 +243,7 @@ class TestKit(_app: ActorSystem) { private def expectMsg_internal[T](max: Duration, obj: T): T = { val o = receiveOne(max) - assert(o ne null, "timeout during expectMsg while waiting for " + obj) + assert(o ne null, "timeout (" + max + ") during expectMsg while waiting for " + obj) assert(obj == o, "expected " + obj + ", found " + o) o.asInstanceOf[T] } @@ -256,10 +258,10 @@ class TestKit(_app: ActorSystem) { * * @return the received object as transformed by the partial function */ - def expectMsgPF[T](max: Duration = Duration.MinusInf, hint: String = "")(f: PartialFunction[Any, T]): T = { - val _max = if (max eq Duration.MinusInf) remaining else max.dilated + def expectMsgPF[T](max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, T]): T = { + val _max = if (max eq Duration.Undefined) remaining else max.dilated val o = receiveOne(_max) - assert(o ne null, "timeout during expectMsg: " + hint) + assert(o ne null, "timeout (" + max + ") during expectMsg: " + hint) assert(f.isDefinedAt(o), "expected: " + hint + " but got unexpected message " + o) f(o) } @@ -272,13 +274,13 @@ class TestKit(_app: ActorSystem) { * @return the last received messsage, i.e. the first one for which the * partial function returned true */ - def fishForMessage(max: Duration = Duration.MinusInf, hint: String = "")(f: PartialFunction[Any, Boolean]): Any = { - val _max = if (max eq Duration.MinusInf) remaining else max.dilated + def fishForMessage(max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, Boolean]): Any = { + val _max = if (max eq Duration.Undefined) remaining else max.dilated val end = now + _max @tailrec def recv: Any = { val o = receiveOne(end - now) - assert(o ne null, "timeout during fishForMessage, hint: " + hint) + assert(o ne null, "timeout (" + max + ") during fishForMessage, hint: " + hint) assert(f.isDefinedAt(o), "fishForMessage(" + hint + ") found unexpected message " + o) if (f(o)) o else recv } @@ -315,7 +317,7 @@ class TestKit(_app: ActorSystem) { private def expectMsgClass_internal[C](max: Duration, c: Class[C]): C = { val o = receiveOne(max) - assert(o ne null, "timeout during expectMsgClass waiting for " + c) + assert(o ne null, "timeout (" + max + ") during expectMsgClass waiting for " + c) assert(c isInstance o, "expected " + c + ", found " + o.getClass) o.asInstanceOf[C] } @@ -336,7 +338,7 @@ class TestKit(_app: ActorSystem) { private def expectMsgAnyOf_internal[T](max: Duration, obj: T*): T = { val o = receiveOne(max) - assert(o ne null, "timeout during expectMsgAnyOf waiting for " + obj.mkString("(", ", ", ")")) + assert(o ne null, "timeout (" + max + ") during expectMsgAnyOf waiting for " + obj.mkString("(", ", ", ")")) assert(obj exists (_ == o), "found unexpected " + o) o.asInstanceOf[T] } @@ -357,7 +359,7 @@ class TestKit(_app: ActorSystem) { private def expectMsgAnyClassOf_internal[C](max: Duration, obj: Class[_ <: C]*): C = { val o = receiveOne(max) - assert(o ne null, "timeout during expectMsgAnyClassOf waiting for " + obj.mkString("(", ", ", ")")) + assert(o ne null, "timeout (" + max + ") during expectMsgAnyClassOf waiting for " + obj.mkString("(", ", ", ")")) assert(obj exists (_ isInstance o), "found unexpected " + o) o.asInstanceOf[C] } @@ -470,8 +472,8 @@ class TestKit(_app: ActorSystem) { * assert(series == (1 to 7).toList) * */ - def receiveWhile[T](max: Duration = Duration.MinusInf, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): Seq[T] = { - val stop = now + (if (max eq Duration.MinusInf) remaining else max.dilated) + def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): Seq[T] = { + val stop = now + (if (max eq Duration.Undefined) remaining else max.dilated) var msg: Message = NullMessage @tailrec @@ -513,7 +515,7 @@ class TestKit(_app: ActorSystem) { for { x ← 1 to n } yield { val timeout = stop - now val o = receiveOne(timeout) - assert(o ne null, "timeout while expecting " + n + " messages") + assert(o ne null, "timeout (" + max + ") while expecting " + n + " messages") o } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala index a8b50bc43a..3dd2415e20 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala @@ -15,8 +15,9 @@ class TestTimeSpec extends AkkaSpec(Configuration("akka.test.timefactor" -> 2.0) val now = System.nanoTime intercept[AssertionError] { probe.awaitCond(false, Duration("1 second")) } val diff = System.nanoTime - now - diff must be > 1700000000l - diff must be < 3000000000l + val target = (1000000000l * app.AkkaConfig.TestTimeFactor).toLong + diff must be > (target - 300000000l) + diff must be < (target + 1000000000l) } }