diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 5f06543382..5781b16963 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -8,10 +8,11 @@ import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.util.Helpers.{ narrow, narrowSilently } +import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently} import se.scalablesolutions.akka.util.Logging import com.google.protobuf.Message +import java.util.concurrent.TimeUnit /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -57,6 +58,7 @@ trait StatelessSerializableActor extends SerializableActor */ trait StatefulSerializerSerializableActor extends SerializableActor { val serializer: Serializer + def toBinary: Array[Byte] } @@ -68,6 +70,7 @@ trait StatefulSerializerSerializableActor extends SerializableActor { */ trait StatefulWrappedSerializableActor extends SerializableActor { def toBinary: Array[Byte] + def fromBinary(bytes: Array[Byte]) } @@ -79,10 +82,13 @@ trait StatefulWrappedSerializableActor extends SerializableActor { */ trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor { def toBinary: Array[Byte] = toProtobuf.toByteArray + def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T]) val clazz: Class[T] + def toProtobuf: T + def fromProtobuf(message: T): Unit } @@ -94,6 +100,7 @@ trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializabl */ trait JavaSerializableActor extends StatefulSerializerSerializableActor { @transient val serializer = Serializer.Java + def toBinary: Array[Byte] = serializer.toBinary(this) } @@ -105,6 +112,7 @@ trait JavaSerializableActor extends StatefulSerializerSerializableActor { */ trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor { val serializer = Serializer.JavaJSON + def toBinary: Array[Byte] = serializer.toBinary(this) } @@ -116,6 +124,7 @@ trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor { */ trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor { val serializer = Serializer.ScalaJSON + def toBinary: Array[Byte] = serializer.toBinary(this) } @@ -131,6 +140,8 @@ case class Unlink(child: ActorRef) extends LifeCycleMessage case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage case object Kill extends LifeCycleMessage +case object ReceiveTimeout + // Exceptions for Actors class ActorStartException private[akka](message: String) extends RuntimeException(message) class ActorKilledException private[akka](message: String) extends RuntimeException(message) @@ -142,7 +153,8 @@ class ActorInitializationException private[akka](message: String) extends Runtim * @author Jonas Bonér */ object Actor extends Logging { - val TIMEOUT = config.getInt("akka.actor.timeout", 5000) + val TIMEOUT = config.getInt("akka.actor.timeout", 5000) + val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) /** @@ -167,7 +179,7 @@ object Actor extends Logging { * val actor = actorOf[MyActor].start * */ - def actorOf[T <: Actor: Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + def actorOf[T <: Actor : Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) /** * Creates a Actor.actorOf out of the Actor. Allows you to pass in a factory function @@ -200,7 +212,7 @@ object Actor extends Logging { *
* import Actor._
*
- * val a = actor {
+ * val a = actor {
* case msg => ... // handle message
* }
*
@@ -222,7 +234,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * val a = transactor {
+ * val a = transactor {
* case msg => ... // handle message
* }
*
@@ -242,7 +254,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * val a = temporaryActor {
+ * val a = temporaryActor {
* case msg => ... // handle message
* }
*
@@ -262,9 +274,9 @@ object Actor extends Logging {
* The actor is started when created.
* Example:
*
- * val a = Actor.init {
+ * val a = Actor.init {
* ... // init stuff
- * } receive {
+ * } receive {
* case msg => ... // handle message
* }
*
@@ -292,7 +304,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * spawn {
+ * spawn {
* ... // do stuff
* }
*
@@ -350,7 +362,7 @@ object Actor extends Logging {
* However, for convenience you can import these functions and fields like below, which will allow you do
* drop the 'self' prefix:
*
- * class MyActor extends Actor {
+ * class MyActor extends Actor {
* import self._
* id = ...
* dispatcher = ...
@@ -370,23 +382,23 @@ trait Actor extends Logging {
*/
type Receive = Actor.Receive
- /*
- * Option[ActorRef] representation of the 'self' ActorRef reference.
- *
- * Mainly for internal use, functions as the implicit sender references when invoking
- * one of the message send functions ('!', '!!' and '!!!').
- */
+ /*
+ * Option[ActorRef] representation of the 'self' ActorRef reference.
+ *
+ * Mainly for internal use, functions as the implicit sender references when invoking
+ * one of the message send functions ('!', '!!' and '!!!').
+ */
@transient implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
Actor.actorRefInCreation.value = None
if (ref.isEmpty) throw new ActorInitializationException(
- "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
- "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
- "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
- "\n\tEither use:" +
- "\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
- "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
- "\n\t\t'val actor = Actor.actor { case msg => .. } }'")
+ "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
+ "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
+ "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
+ "\n\tEither use:" +
+ "\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
+ "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
+ "\n\t\t'val actor = Actor.actor { case msg => .. } }'")
else ref
}
@@ -439,7 +451,7 @@ trait Actor extends Logging {
*
* Example code:
*
- * def receive = {
+ * def receive = {
* case Ping =>
* log.info("got a 'Ping' message")
* self.reply("pong")
@@ -506,8 +518,8 @@ trait Actor extends Logging {
def reply_?(message: Any): Boolean = self.reply_?(message)
/**
- * Is the actor able to handle the message passed in as arguments?
- */
+ * Is the actor able to handle the message passed in as arguments?
+ */
def isDefinedAt(message: Any): Boolean = base.isDefinedAt(message)
// =========================================
@@ -515,6 +527,7 @@ trait Actor extends Logging {
// =========================================
private[akka] def base: Receive = try {
+ cancelReceiveTimeout
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalStateException(
@@ -522,13 +535,31 @@ trait Actor extends Logging {
}
private val lifeCycles: Receive = {
- case HotSwap(code) => self.hotswap = code
- case Restart(reason) => self.restart(reason)
- case Exit(dead, reason) => self.handleTrapExit(dead, reason)
- case Link(child) => self.link(child)
- case Unlink(child) => self.unlink(child)
+ case HotSwap(code) => self.hotswap = code; checkReceiveTimeout
+ case Restart(reason) => self.restart(reason)
+ case Exit(dead, reason) => self.handleTrapExit(dead, reason)
+ case Link(child) => self.link(child)
+ case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
- case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
+ case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
+ }
+
+ @volatile protected[akka] var timeoutActor: Option[ActorRef] = None
+
+ private[akka] def cancelReceiveTimeout = {
+ timeoutActor.foreach {
+ x =>
+ Scheduler.unschedule(x)
+ timeoutActor = None
+ log.debug("Timeout canceled")
+ }
+ }
+
+ private[akka] def checkReceiveTimeout = {
+ if (self.isDefinedAt(ReceiveTimeout)) {
+ log.debug("Scheduling timeout for Actor [" + toString + "]")
+ timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS))
+ }
}
}
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index d07e18a314..e94f4c7b9e 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -216,6 +216,14 @@ trait ActorRef extends TransactionManagement {
*/
@volatile var timeout: Long = Actor.TIMEOUT
+ /**
+ * User overridable callback/setting.
+ *
+ * Defines the default timeout for an initial receive invocation.
+ * Used if the receive (or HotSwap) contains a case handling ReceiveTimeout.
+ */
+ @volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT
+
/**
* User overridable callback/setting.
*
@@ -1415,6 +1423,7 @@ sealed class LocalActorRef private[akka](
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
+ actor.checkReceiveTimeout
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
diff --git a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala
new file mode 100644
index 0000000000..60c5df934f
--- /dev/null
+++ b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala
@@ -0,0 +1,68 @@
+package se.scalablesolutions.akka.actor
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+import Actor._
+import java.util.concurrent.TimeUnit
+import org.multiverse.api.latches.StandardLatch
+
+class ReceiveTimeoutSpec extends JUnitSuite {
+
+ @Test def receiveShouldGetTimeout= {
+
+ val timeoutLatch = new StandardLatch
+
+ val timeoutActor = actorOf(new Actor {
+ self.receiveTimeout = 500
+
+ protected def receive = {
+ case ReceiveTimeout => timeoutLatch.open
+ }
+ }).start
+
+ // after max 1 second the timeout should already been sent
+ assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS))
+ }
+
+ @Test def swappedReceiveShouldAlsoGetTimout = {
+ val timeoutLatch = new StandardLatch
+
+ val timeoutActor = actorOf(new Actor {
+ self.receiveTimeout = 500
+
+ protected def receive = {
+ case ReceiveTimeout => timeoutLatch.open
+ }
+ }).start
+
+ // after max 1 second the timeout should already been sent
+ assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS))
+
+ val swappedLatch = new StandardLatch
+ timeoutActor ! HotSwap(Some{
+ case ReceiveTimeout => swappedLatch.open
+ })
+
+ // after max 1 second the timeout should already been sent
+ assert(swappedLatch.tryAwait(1, TimeUnit.SECONDS))
+ }
+
+ @Test def timeoutShouldBeCancelledAfterRegularReceive = {
+
+ val timeoutLatch = new StandardLatch
+ case object Tick
+ val timeoutActor = actorOf(new Actor {
+ self.receiveTimeout = 500
+
+ protected def receive = {
+ case Tick => ()
+ case ReceiveTimeout => timeoutLatch.open
+ }
+ }).start
+ timeoutActor ! Tick
+
+ // timeout already after 500 ms, so 1 second wait should be enough
+ assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
+ }
+}
\ No newline at end of file