diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 0c2995eaa2..65bff78409 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -13,6 +13,7 @@ import java.util.concurrent.TimeoutException object ReceiveTimeoutSpec { case object Tick + case object TransperentTick extends NotInfluenceReceiveTimeout } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -89,5 +90,24 @@ class ReceiveTimeoutSpec extends AkkaSpec { intercept[TimeoutException] { Await.ready(timeoutLatch, 1 second) } system.stop(timeoutActor) } + + "get timeout while receiving NotInfluenceReceiveTimeout messages" in { + val timeoutLatch = TestLatch() + + val timeoutActor = system.actorOf(Props(new Actor { + context.setReceiveTimeout(1 second) + + def receive = { + case ReceiveTimeout ⇒ timeoutLatch.open + case TransperentTick ⇒ + } + })) + + val ticks = system.scheduler.schedule(100.millis, 100.millis, timeoutActor, TransperentTick)(system.dispatcher) + + Await.ready(timeoutLatch, TestLatch.DefaultTimeout) + ticks.cancel() + system.stop(timeoutActor) + } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 9b4d6dbffc..7f4d07546c 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -126,6 +126,11 @@ case object ReceiveTimeout extends ReceiveTimeout { def getInstance = this } +/** + * Marker trait to indicate that a message should not reset the receive timeout. + */ +trait NotInfluenceReceiveTimeout + /** * IllegalActorStateException is thrown when a core invariant in the Actor implementation has been violated. * For instance, if you try to create an Actor that doesn't extend Actor. diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 97f8bf63e8..117ee5ef3d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -67,6 +67,10 @@ trait ActorContext extends ActorRefFactory { * * Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity * periods). Pass in `Duration.Undefined` to switch off this feature. + * + * Messages marked with [[NotInfluenceReceiveTimeout]] will not reset the timer. This can be useful when + * `ReceiveTimeout` should be fired by external inactivity but not influenced by internal activity, + * e.g. scheduled tick messages. */ def setReceiveTimeout(timeout: Duration): Unit @@ -479,18 +483,23 @@ private[akka] class ActorCell( } //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status - final def invoke(messageHandle: Envelope): Unit = try { - currentMessage = messageHandle - cancelReceiveTimeout() // FIXME: leave this here??? - messageHandle.message match { - case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ receiveMessage(msg) + final def invoke(messageHandle: Envelope): Unit = { + val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout] + try { + currentMessage = messageHandle + if (influenceReceiveTimeout) + cancelReceiveTimeout() + messageHandle.message match { + case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + case msg ⇒ receiveMessage(msg) + } + currentMessage = null // reset current message after successful invocation + } catch handleNonFatalOrInterruptedException { e ⇒ + handleInvokeFailure(Nil, e) + } finally { + if (influenceReceiveTimeout) + checkReceiveTimeout // Reschedule receive timeout } - currentMessage = null // reset current message after successful invocation - } catch handleNonFatalOrInterruptedException { e ⇒ - handleInvokeFailure(Nil, e) - } finally { - checkReceiveTimeout // Reschedule receive timeout } def autoReceiveMessage(msg: Envelope): Unit = { diff --git a/akka-docs/rst/java/lambda-actors.rst b/akka-docs/rst/java/lambda-actors.rst index 02e638b5ff..cd43b3f8e1 100644 --- a/akka-docs/rst/java/lambda-actors.rst +++ b/akka-docs/rst/java/lambda-actors.rst @@ -658,6 +658,10 @@ periods). Pass in `Duration.Undefined` to switch off this feature. .. includecode:: code/docs/actorlambda/ActorDocTest.java#receive-timeout +Messages marked with ``NotInfluenceReceiveTimeout`` will not reset the timer. This can be useful when +``ReceiveTimeout`` should be fired by external inactivity but not influenced by internal activity, +e.g. scheduled tick messages. + .. _stopping-actors-lambda: Stopping actors diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index 7caee71283..ca3c353569 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -603,6 +603,10 @@ periods). Pass in `Duration.Undefined` to switch off this feature. .. includecode:: code/docs/actor/MyReceiveTimeoutUntypedActor.java#receive-timeout +Messages marked with ``NotInfluenceReceiveTimeout`` will not reset the timer. This can be useful when +``ReceiveTimeout`` should be fired by external inactivity but not influenced by internal activity, +e.g. scheduled tick messages. + .. _stopping-actors-java: Stopping actors diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index ebe4e17b82..8f7e56c9b0 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -653,6 +653,10 @@ periods). Pass in `Duration.Undefined` to switch off this feature. .. includecode:: code/docs/actor/ActorDocSpec.scala#receive-timeout +Messages marked with ``NotInfluenceReceiveTimeout`` will not reset the timer. This can be useful when +``ReceiveTimeout`` should be fired by external inactivity but not influenced by internal activity, +e.g. scheduled tick messages. + .. _stopping-actors-scala: Stopping actors diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index 85bf375a1c..c141424f1e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -7,7 +7,7 @@ import scala.annotation.tailrec import scala.collection.breakOut import scala.collection.immutable import scala.concurrent.duration.FiniteDuration -import akka.actor.{ ActorSelection, Actor, ActorPath } +import akka.actor.{ ActorSelection, Actor, ActorPath, NotInfluenceReceiveTimeout } import akka.persistence.serialization.Message object AtLeastOnceDelivery { @@ -67,7 +67,7 @@ object AtLeastOnceDelivery { */ private[akka] object Internal { case class Delivery(destination: ActorPath, message: Any, timestamp: Long, attempt: Int) - case object RedeliveryTick + case object RedeliveryTick extends NotInfluenceReceiveTimeout } }