Added ReceiveTimeout behaviour

This commit is contained in:
momania 2010-07-01 13:58:05 +02:00
parent 23033b4d4a
commit 3d97ca3368
3 changed files with 140 additions and 32 deletions

View file

@ -8,10 +8,11 @@ import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.Helpers.{ narrow, narrowSilently }
import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently}
import se.scalablesolutions.akka.util.Logging
import com.google.protobuf.Message
import java.util.concurrent.TimeUnit
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
@ -57,6 +58,7 @@ trait StatelessSerializableActor extends SerializableActor
*/
trait StatefulSerializerSerializableActor extends SerializableActor {
val serializer: Serializer
def toBinary: Array[Byte]
}
@ -68,6 +70,7 @@ trait StatefulSerializerSerializableActor extends SerializableActor {
*/
trait StatefulWrappedSerializableActor extends SerializableActor {
def toBinary: Array[Byte]
def fromBinary(bytes: Array[Byte])
}
@ -79,10 +82,13 @@ trait StatefulWrappedSerializableActor extends SerializableActor {
*/
trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor {
def toBinary: Array[Byte] = toProtobuf.toByteArray
def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
val clazz: Class[T]
def toProtobuf: T
def fromProtobuf(message: T): Unit
}
@ -94,6 +100,7 @@ trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializabl
*/
trait JavaSerializableActor extends StatefulSerializerSerializableActor {
@transient val serializer = Serializer.Java
def toBinary: Array[Byte] = serializer.toBinary(this)
}
@ -105,6 +112,7 @@ trait JavaSerializableActor extends StatefulSerializerSerializableActor {
*/
trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor {
val serializer = Serializer.JavaJSON
def toBinary: Array[Byte] = serializer.toBinary(this)
}
@ -116,6 +124,7 @@ trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor {
*/
trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor {
val serializer = Serializer.ScalaJSON
def toBinary: Array[Byte] = serializer.toBinary(this)
}
@ -131,6 +140,8 @@ case class Unlink(child: ActorRef) extends LifeCycleMessage
case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
case object Kill extends LifeCycleMessage
case object ReceiveTimeout
// Exceptions for Actors
class ActorStartException private[akka](message: String) extends RuntimeException(message)
class ActorKilledException private[akka](message: String) extends RuntimeException(message)
@ -142,7 +153,8 @@ class ActorInitializationException private[akka](message: String) extends Runtim
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**
@ -167,7 +179,7 @@ object Actor extends Logging {
* val actor = actorOf[MyActor].start
* </pre>
*/
def actorOf[T <: Actor: Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
def actorOf[T <: Actor : Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Creates a Actor.actorOf out of the Actor. Allows you to pass in a factory function
@ -200,7 +212,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor {
* val a = actor {
* case msg => ... // handle message
* }
* </pre>
@ -222,7 +234,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = transactor {
* val a = transactor {
* case msg => ... // handle message
* }
* </pre>
@ -242,7 +254,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = temporaryActor {
* val a = temporaryActor {
* case msg => ... // handle message
* }
* </pre>
@ -262,9 +274,9 @@ object Actor extends Logging {
* The actor is started when created.
* Example:
* <pre>
* val a = Actor.init {
* val a = Actor.init {
* ... // init stuff
* } receive {
* } receive {
* case msg => ... // handle message
* }
* </pre>
@ -292,7 +304,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* spawn {
* spawn {
* ... // do stuff
* }
* </pre>
@ -350,7 +362,7 @@ object Actor extends Logging {
* However, for convenience you can import these functions and fields like below, which will allow you do
* drop the 'self' prefix:
* <pre>
* class MyActor extends Actor {
* class MyActor extends Actor {
* import self._
* id = ...
* dispatcher = ...
@ -370,23 +382,23 @@ trait Actor extends Logging {
*/
type Receive = Actor.Receive
/*
* Option[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!', '!!' and '!!!').
*/
/*
* Option[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!', '!!' and '!!!').
*/
@transient implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
Actor.actorRefInCreation.value = None
if (ref.isEmpty) throw new ActorInitializationException(
"ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
"\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
"\n\tEither use:" +
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
"\n\t\t'val actor = Actor.actor { case msg => .. } }'")
"ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
"\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
"\n\tEither use:" +
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
"\n\t\t'val actor = Actor.actor { case msg => .. } }'")
else ref
}
@ -439,7 +451,7 @@ trait Actor extends Logging {
* <p/>
* Example code:
* <pre>
* def receive = {
* def receive = {
* case Ping =&gt;
* log.info("got a 'Ping' message")
* self.reply("pong")
@ -506,8 +518,8 @@ trait Actor extends Logging {
def reply_?(message: Any): Boolean = self.reply_?(message)
/**
* Is the actor able to handle the message passed in as arguments?
*/
* Is the actor able to handle the message passed in as arguments?
*/
def isDefinedAt(message: Any): Boolean = base.isDefinedAt(message)
// =========================================
@ -515,6 +527,7 @@ trait Actor extends Logging {
// =========================================
private[akka] def base: Receive = try {
cancelReceiveTimeout
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalStateException(
@ -522,13 +535,31 @@ trait Actor extends Logging {
}
private val lifeCycles: Receive = {
case HotSwap(code) => self.hotswap = code
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case HotSwap(code) => self.hotswap = code; checkReceiveTimeout
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
@volatile protected[akka] var timeoutActor: Option[ActorRef] = None
private[akka] def cancelReceiveTimeout = {
timeoutActor.foreach {
x =>
Scheduler.unschedule(x)
timeoutActor = None
log.debug("Timeout canceled")
}
}
private[akka] def checkReceiveTimeout = {
if (self.isDefinedAt(ReceiveTimeout)) {
log.debug("Scheduling timeout for Actor [" + toString + "]")
timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS))
}
}
}

View file

@ -216,6 +216,14 @@ trait ActorRef extends TransactionManagement {
*/
@volatile var timeout: Long = Actor.TIMEOUT
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* Used if the receive (or HotSwap) contains a case handling ReceiveTimeout.
*/
@volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT
/**
* User overridable callback/setting.
*
@ -1415,6 +1423,7 @@ sealed class LocalActorRef private[akka](
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
actor.checkReceiveTimeout
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {

View file

@ -0,0 +1,68 @@
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
import java.util.concurrent.TimeUnit
import org.multiverse.api.latches.StandardLatch
class ReceiveTimeoutSpec extends JUnitSuite {
@Test def receiveShouldGetTimeout= {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
}
}).start
// after max 1 second the timeout should already been sent
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS))
}
@Test def swappedReceiveShouldAlsoGetTimout = {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
}
}).start
// after max 1 second the timeout should already been sent
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS))
val swappedLatch = new StandardLatch
timeoutActor ! HotSwap(Some{
case ReceiveTimeout => swappedLatch.open
})
// after max 1 second the timeout should already been sent
assert(swappedLatch.tryAwait(1, TimeUnit.SECONDS))
}
@Test def timeoutShouldBeCancelledAfterRegularReceive = {
val timeoutLatch = new StandardLatch
case object Tick
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
protected def receive = {
case Tick => ()
case ReceiveTimeout => timeoutLatch.open
}
}).start
timeoutActor ! Tick
// timeout already after 500 ms, so 1 second wait should be enough
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
}
}