From 3d97ca33681947ef06ffb02465159abc5b3b2a3f Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 1 Jul 2010 13:58:05 +0200 Subject: [PATCH] Added ReceiveTimeout behaviour --- akka-core/src/main/scala/actor/Actor.scala | 95 ++++++++++++------- akka-core/src/main/scala/actor/ActorRef.scala | 9 ++ .../src/test/scala/ReceiveTimeoutSpec.scala | 68 +++++++++++++ 3 files changed, 140 insertions(+), 32 deletions(-) create mode 100644 akka-core/src/test/scala/ReceiveTimeoutSpec.scala diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 5f06543382..5781b16963 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -8,10 +8,11 @@ import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.util.Helpers.{ narrow, narrowSilently } +import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently} import se.scalablesolutions.akka.util.Logging import com.google.protobuf.Message +import java.util.concurrent.TimeUnit /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -57,6 +58,7 @@ trait StatelessSerializableActor extends SerializableActor */ trait StatefulSerializerSerializableActor extends SerializableActor { val serializer: Serializer + def toBinary: Array[Byte] } @@ -68,6 +70,7 @@ trait StatefulSerializerSerializableActor extends SerializableActor { */ trait StatefulWrappedSerializableActor extends SerializableActor { def toBinary: Array[Byte] + def fromBinary(bytes: Array[Byte]) } @@ -79,10 +82,13 @@ trait StatefulWrappedSerializableActor extends SerializableActor { */ trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor { def toBinary: Array[Byte] = toProtobuf.toByteArray + def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T]) val clazz: Class[T] + def toProtobuf: T + def fromProtobuf(message: T): Unit } @@ -94,6 +100,7 @@ trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializabl */ trait JavaSerializableActor extends StatefulSerializerSerializableActor { @transient val serializer = Serializer.Java + def toBinary: Array[Byte] = serializer.toBinary(this) } @@ -105,6 +112,7 @@ trait JavaSerializableActor extends StatefulSerializerSerializableActor { */ trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor { val serializer = Serializer.JavaJSON + def toBinary: Array[Byte] = serializer.toBinary(this) } @@ -116,6 +124,7 @@ trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor { */ trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor { val serializer = Serializer.ScalaJSON + def toBinary: Array[Byte] = serializer.toBinary(this) } @@ -131,6 +140,8 @@ case class Unlink(child: ActorRef) extends LifeCycleMessage case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage case object Kill extends LifeCycleMessage +case object ReceiveTimeout + // Exceptions for Actors class ActorStartException private[akka](message: String) extends RuntimeException(message) class ActorKilledException private[akka](message: String) extends RuntimeException(message) @@ -142,7 +153,8 @@ class ActorInitializationException private[akka](message: String) extends Runtim * @author Jonas Bonér */ object Actor extends Logging { - val TIMEOUT = config.getInt("akka.actor.timeout", 5000) + val TIMEOUT = config.getInt("akka.actor.timeout", 5000) + val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) /** @@ -167,7 +179,7 @@ object Actor extends Logging { * val actor = actorOf[MyActor].start * */ - def actorOf[T <: Actor: Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + def actorOf[T <: Actor : Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) /** * Creates a Actor.actorOf out of the Actor. Allows you to pass in a factory function @@ -200,7 +212,7 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * val a = actor {
+   * val a = actor  {
    *   case msg => ... // handle message
    * }
    * 
@@ -222,7 +234,7 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * val a = transactor {
+   * val a = transactor  {
    *   case msg => ... // handle message
    * }
    * 
@@ -242,7 +254,7 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * val a = temporaryActor {
+   * val a = temporaryActor  {
    *   case msg => ... // handle message
    * }
    * 
@@ -262,9 +274,9 @@ object Actor extends Logging { * The actor is started when created. * Example: *
-   * val a = Actor.init {
+   * val a = Actor.init  {
    *   ... // init stuff
-   * } receive  {
+   * } receive   {
    *   case msg => ... // handle message
    * }
    * 
@@ -292,7 +304,7 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * spawn {
+   * spawn  {
    *   ... // do stuff
    * }
    * 
@@ -350,7 +362,7 @@ object Actor extends Logging { * However, for convenience you can import these functions and fields like below, which will allow you do * drop the 'self' prefix: *
- * class MyActor extends Actor {
+ * class MyActor extends Actor  {
  *   import self._
  *   id = ...
  *   dispatcher = ...
@@ -370,23 +382,23 @@ trait Actor extends Logging {
    */
   type Receive = Actor.Receive
 
-   /*
-    * Option[ActorRef] representation of the 'self' ActorRef reference.
-    * 

- * Mainly for internal use, functions as the implicit sender references when invoking - * one of the message send functions ('!', '!!' and '!!!'). - */ + /* + * Option[ActorRef] representation of the 'self' ActorRef reference. + *

+ * Mainly for internal use, functions as the implicit sender references when invoking + * one of the message send functions ('!', '!!' and '!!!'). + */ @transient implicit val optionSelf: Option[ActorRef] = { val ref = Actor.actorRefInCreation.value Actor.actorRefInCreation.value = None if (ref.isEmpty) throw new ActorInitializationException( - "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + - "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + - "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + - "\n\tEither use:" + - "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + - "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" + - "\n\t\t'val actor = Actor.actor { case msg => .. } }'") + "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + + "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + + "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + + "\n\tEither use:" + + "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" + + "\n\t\t'val actor = Actor.actor { case msg => .. } }'") else ref } @@ -439,7 +451,7 @@ trait Actor extends Logging { *

* Example code: *

-   *   def receive = {
+   *   def receive =  {
    *     case Ping =>
    *       log.info("got a 'Ping' message")
    *       self.reply("pong")
@@ -506,8 +518,8 @@ trait Actor extends Logging {
   def reply_?(message: Any): Boolean = self.reply_?(message)
 
   /**
-  * Is the actor able to handle the message passed in as arguments?
-  */
+   * Is the actor able to handle the message passed in as arguments?
+   */
   def isDefinedAt(message: Any): Boolean = base.isDefinedAt(message)
 
   // =========================================
@@ -515,6 +527,7 @@ trait Actor extends Logging {
   // =========================================
 
   private[akka] def base: Receive = try {
+    cancelReceiveTimeout
     lifeCycles orElse (self.hotswap getOrElse receive)
   } catch {
     case e: NullPointerException => throw new IllegalStateException(
@@ -522,13 +535,31 @@ trait Actor extends Logging {
   }
 
   private val lifeCycles: Receive = {
-    case HotSwap(code) =>        self.hotswap = code
-    case Restart(reason) =>      self.restart(reason)
-    case Exit(dead, reason) =>   self.handleTrapExit(dead, reason)
-    case Link(child) =>          self.link(child)
-    case Unlink(child) =>        self.unlink(child)
+    case HotSwap(code) => self.hotswap = code; checkReceiveTimeout
+    case Restart(reason) => self.restart(reason)
+    case Exit(dead, reason) => self.handleTrapExit(dead, reason)
+    case Link(child) => self.link(child)
+    case Unlink(child) => self.unlink(child)
     case UnlinkAndStop(child) => self.unlink(child); child.stop
-    case Kill =>                 throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
+    case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
+  }
+
+  @volatile protected[akka] var timeoutActor: Option[ActorRef] = None
+
+  private[akka] def cancelReceiveTimeout = {
+    timeoutActor.foreach {
+      x =>
+        Scheduler.unschedule(x)
+        timeoutActor = None
+        log.debug("Timeout canceled")
+    }
+  }
+
+  private[akka] def checkReceiveTimeout = {
+    if (self.isDefinedAt(ReceiveTimeout)) {
+      log.debug("Scheduling timeout for Actor [" + toString + "]")
+      timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS))
+    }
   }
 }
 
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index d07e18a314..e94f4c7b9e 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -216,6 +216,14 @@ trait ActorRef extends TransactionManagement {
    */
   @volatile var timeout: Long = Actor.TIMEOUT
 
+  /**
+     * User overridable callback/setting.
+     * 

+ * Defines the default timeout for an initial receive invocation. + * Used if the receive (or HotSwap) contains a case handling ReceiveTimeout. + */ + @volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT + /** * User overridable callback/setting. * @@ -1415,6 +1423,7 @@ sealed class LocalActorRef private[akka]( ActorRegistry.register(this) if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name) clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body + actor.checkReceiveTimeout } private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { diff --git a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala new file mode 100644 index 0000000000..60c5df934f --- /dev/null +++ b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala @@ -0,0 +1,68 @@ +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +import Actor._ +import java.util.concurrent.TimeUnit +import org.multiverse.api.latches.StandardLatch + +class ReceiveTimeoutSpec extends JUnitSuite { + + @Test def receiveShouldGetTimeout= { + + val timeoutLatch = new StandardLatch + + val timeoutActor = actorOf(new Actor { + self.receiveTimeout = 500 + + protected def receive = { + case ReceiveTimeout => timeoutLatch.open + } + }).start + + // after max 1 second the timeout should already been sent + assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS)) + } + + @Test def swappedReceiveShouldAlsoGetTimout = { + val timeoutLatch = new StandardLatch + + val timeoutActor = actorOf(new Actor { + self.receiveTimeout = 500 + + protected def receive = { + case ReceiveTimeout => timeoutLatch.open + } + }).start + + // after max 1 second the timeout should already been sent + assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS)) + + val swappedLatch = new StandardLatch + timeoutActor ! HotSwap(Some{ + case ReceiveTimeout => swappedLatch.open + }) + + // after max 1 second the timeout should already been sent + assert(swappedLatch.tryAwait(1, TimeUnit.SECONDS)) + } + + @Test def timeoutShouldBeCancelledAfterRegularReceive = { + + val timeoutLatch = new StandardLatch + case object Tick + val timeoutActor = actorOf(new Actor { + self.receiveTimeout = 500 + + protected def receive = { + case Tick => () + case ReceiveTimeout => timeoutLatch.open + } + }).start + timeoutActor ! Tick + + // timeout already after 500 ms, so 1 second wait should be enough + assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false) + } +} \ No newline at end of file