- moved receive timeout logic to ActorRef

- receivetimeout now only inititiated when receiveTimeout property is set
This commit is contained in:
momania 2010-07-05 13:45:03 +02:00
parent ae6bf7aaa0
commit 5baf86fd38
3 changed files with 52 additions and 39 deletions

View file

@ -62,7 +62,6 @@ class ActorInitializationException private[akka](message: String) extends Runtim
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**
@ -435,7 +434,6 @@ trait Actor extends Logging {
// =========================================
private[akka] def base: Receive = try {
cancelReceiveTimeout
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalActorStateException(
@ -443,7 +441,7 @@ trait Actor extends Logging {
}
private val lifeCycles: Receive = {
case HotSwap(code) => self.hotswap = code; checkReceiveTimeout
case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap?
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
@ -451,25 +449,6 @@ trait Actor extends Logging {
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
@volatile protected[akka] var timeoutActor: Option[ActorRef] = None
private[akka] def cancelReceiveTimeout = {
timeoutActor.foreach {
x =>
Scheduler.unschedule(x)
timeoutActor = None
log.debug("Timeout canceled")
}
}
private[akka] def checkReceiveTimeout = {
//if ((self.hotswap getOrElse receive).isDefinedAt(ReceiveTimeout)) { // FIXME use when 'self' is safe to use, throws NPE sometimes
if ((receive ne null) && receive.isDefinedAt(ReceiveTimeout)) {
log.debug("Scheduling timeout for Actor [" + toString + "]")
timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS))
}
}
}
private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {

View file

@ -24,12 +24,12 @@ import jsr166x.{Deque, ConcurrentLinkedDeque}
import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
import java.lang.reflect.Field
import RemoteActorSerialization._
import com.google.protobuf.ByteString
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -72,6 +72,8 @@ trait ActorRef extends TransactionManagement {
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
@volatile protected[akka] var _timeoutActor: Option[ActorRef] = None
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[this] val guard = new ReentrantGuard
@ -100,9 +102,9 @@ trait ActorRef extends TransactionManagement {
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* Used if the receive (or HotSwap) contains a case handling ReceiveTimeout.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
@volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT
@volatile var receiveTimeout: Option[Long] = None
/**
* User overridable callback/setting.
@ -386,7 +388,7 @@ trait ActorRef extends TransactionManagement {
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
*/
def makeTransactionRequired(): Unit
def makeTransactionRequired: Unit
/**
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
@ -429,12 +431,12 @@ trait ActorRef extends TransactionManagement {
* Shuts down the actor its dispatcher and message queue.
* Alias for 'stop'.
*/
def exit() = stop()
def exit = stop
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop(): Unit
def stop: Unit
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
@ -510,7 +512,7 @@ trait ActorRef extends TransactionManagement {
/**
* Shuts down and removes all linked actors.
*/
def shutdownLinkedActors(): Unit
def shutdownLinkedActors: Unit
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
@ -551,6 +553,24 @@ trait ActorRef extends TransactionManagement {
}
override def toString = "Actor[" + id + ":" + uuid + "]"
protected[akka] def cancelReceiveTimeout = {
_timeoutActor.foreach {
x =>
Scheduler.unschedule(x)
_timeoutActor = None
log.debug("Timeout canceled")
}
}
protected [akka] def checkReceiveTimeout = {
cancelReceiveTimeout
receiveTimeout.foreach { timeout =>
log.debug("Scheduling timeout for Actor [" + toString + "]")
_timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS))
}
}
}
/**
@ -680,7 +700,7 @@ sealed class LocalActorRef private[akka](
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
*/
def makeTransactionRequired() = guard.withGuard {
def makeTransactionRequired = guard.withGuard {
if (!isRunning || isBeingRestarted) isTransactor = true
else throw new ActorInitializationException(
"Can not make actor transaction required after it has been started")
@ -736,6 +756,7 @@ sealed class LocalActorRef private[akka](
*/
def stop = guard.withGuard {
if (isRunning) {
cancelReceiveTimeout
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
@ -873,7 +894,7 @@ sealed class LocalActorRef private[akka](
/**
* Shuts down and removes all linked actors.
*/
def shutdownLinkedActors(): Unit = guard.withGuard {
def shutdownLinkedActors: Unit = guard.withGuard {
linkedActorsAsList.foreach(_.stop)
linkedActors.clear
}
@ -1000,6 +1021,7 @@ sealed class LocalActorRef private[akka](
setTransactionSet(txSet)
try {
cancelReceiveTimeout // FIXME: leave this here?
if (isTransactor) {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
@ -1158,7 +1180,7 @@ sealed class LocalActorRef private[akka](
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
actor.checkReceiveTimeout
checkReceiveTimeout
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
@ -1221,7 +1243,7 @@ private[akka] case class RemoteActorRef private[akka] (
this
}
def stop(): Unit = {
def stop: Unit = {
_isRunning = false
_isShutDown = true
}
@ -1237,7 +1259,7 @@ private[akka] case class RemoteActorRef private[akka] (
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def makeTransactionRequired(): Unit = unsupported
def makeTransactionRequired: Unit = unsupported
def transactionConfig_=(config: TransactionConfig): Unit = unsupported
def transactionConfig: TransactionConfig = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
@ -1254,7 +1276,7 @@ private[akka] case class RemoteActorRef private[akka] (
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors(): Unit = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported

View file

@ -14,7 +14,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
@ -28,7 +28,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
@ -51,7 +51,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
case object Tick
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case Tick => ()
@ -60,6 +60,18 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
timeoutActor ! Tick
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false)
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
}
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
}
}).start
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
}
}