diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 33bf9bf998..b38401a4a6 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -62,7 +62,6 @@ class ActorInitializationException private[akka](message: String) extends Runtim */ object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) - val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) /** @@ -435,7 +434,6 @@ trait Actor extends Logging { // ========================================= private[akka] def base: Receive = try { - cancelReceiveTimeout lifeCycles orElse (self.hotswap getOrElse receive) } catch { case e: NullPointerException => throw new IllegalActorStateException( @@ -443,7 +441,7 @@ trait Actor extends Logging { } private val lifeCycles: Receive = { - case HotSwap(code) => self.hotswap = code; checkReceiveTimeout + case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap? case Restart(reason) => self.restart(reason) case Exit(dead, reason) => self.handleTrapExit(dead, reason) case Link(child) => self.link(child) @@ -451,25 +449,6 @@ trait Actor extends Logging { case UnlinkAndStop(child) => self.unlink(child); child.stop case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } - - @volatile protected[akka] var timeoutActor: Option[ActorRef] = None - - private[akka] def cancelReceiveTimeout = { - timeoutActor.foreach { - x => - Scheduler.unschedule(x) - timeoutActor = None - log.debug("Timeout canceled") - } - } - - private[akka] def checkReceiveTimeout = { - //if ((self.hotswap getOrElse receive).isDefinedAt(ReceiveTimeout)) { // FIXME use when 'self' is safe to use, throws NPE sometimes - if ((receive ne null) && receive.isDefinedAt(ReceiveTimeout)) { - log.debug("Scheduling timeout for Actor [" + toString + "]") - timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS)) - } - } } private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) { diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index e10a55e3b1..447931019a 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -24,12 +24,12 @@ import jsr166x.{Deque, ConcurrentLinkedDeque} import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.ConcurrentHashMap import java.util.{Map => JMap} import java.lang.reflect.Field import RemoteActorSerialization._ import com.google.protobuf.ByteString +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -72,6 +72,8 @@ trait ActorRef extends TransactionManagement { @volatile protected[akka] var _isBeingRestarted = false @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT) + @volatile protected[akka] var _timeoutActor: Option[ActorRef] = None + @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false protected[this] val guard = new ReentrantGuard @@ -100,9 +102,9 @@ trait ActorRef extends TransactionManagement { * User overridable callback/setting. *
* Defines the default timeout for an initial receive invocation. - * Used if the receive (or HotSwap) contains a case handling ReceiveTimeout. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. */ - @volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT + @volatile var receiveTimeout: Option[Long] = None /** * User overridable callback/setting. @@ -551,6 +553,24 @@ trait ActorRef extends TransactionManagement { } override def toString = "Actor[" + id + ":" + uuid + "]" + + protected[akka] def cancelReceiveTimeout = { + _timeoutActor.foreach { + x => + Scheduler.unschedule(x) + _timeoutActor = None + log.debug("Timeout canceled") + } + } + + protected [akka] def checkReceiveTimeout = { + cancelReceiveTimeout + receiveTimeout.foreach { timeout => + log.debug("Scheduling timeout for Actor [" + toString + "]") + _timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS)) + } + } + } /** @@ -734,8 +754,9 @@ sealed class LocalActorRef private[akka]( /** * Shuts down the actor its dispatcher and message queue. */ - def stop = guard.withGuard { + def stop() = guard.withGuard { if (isRunning) { + cancelReceiveTimeout dispatcher.unregister(this) _transactionFactory = None _isRunning = false @@ -1000,6 +1021,7 @@ sealed class LocalActorRef private[akka]( setTransactionSet(txSet) try { + cancelReceiveTimeout // FIXME: leave this here? if (isTransactor) { val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) atomic(txFactory) { @@ -1158,7 +1180,7 @@ sealed class LocalActorRef private[akka]( ActorRegistry.register(this) if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name) clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body - actor.checkReceiveTimeout + checkReceiveTimeout } private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { @@ -1221,7 +1243,7 @@ private[akka] case class RemoteActorRef private[akka] ( this } - def stop(): Unit = { + def stop: Unit = { _isRunning = false _isShutDown = true } @@ -1237,7 +1259,7 @@ private[akka] case class RemoteActorRef private[akka] ( def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher: MessageDispatcher = unsupported - def makeTransactionRequired(): Unit = unsupported + def makeTransactionRequired: Unit = unsupported def transactionConfig_=(config: TransactionConfig): Unit = unsupported def transactionConfig: TransactionConfig = unsupported def makeRemote(hostname: String, port: Int): Unit = unsupported @@ -1254,7 +1276,7 @@ private[akka] case class RemoteActorRef private[akka] ( def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported def mailboxSize: Int = unsupported def supervisor: Option[ActorRef] = unsupported - def shutdownLinkedActors(): Unit = unsupported + def shutdownLinkedActors: Unit = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported diff --git a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala index 938cb08d43..5c50337894 100644 --- a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala +++ b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala @@ -14,7 +14,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case ReceiveTimeout => timeoutLatch.open @@ -28,7 +28,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case ReceiveTimeout => timeoutLatch.open @@ -51,7 +51,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch case object Tick val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case Tick => () @@ -60,6 +60,18 @@ class ReceiveTimeoutSpec extends JUnitSuite { }).start timeoutActor ! Tick - assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false) + assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false) + } + + @Test def timeoutShouldNotBeSentWhenNotSpecified = { + val timeoutLatch = new StandardLatch + val timeoutActor = actorOf(new Actor { + + protected def receive = { + case ReceiveTimeout => timeoutLatch.open + } + }).start + + assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false) } } diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index d86567c5a6..bc4286cec1 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -14,6 +14,7 @@ import org.springframework.beans.BeanWrapper import org.springframework.beans.BeanUtils import org.springframework.beans.factory.BeanFactory import org.springframework.beans.factory.config.AbstractFactoryBean +import org.springframework.context.{ApplicationContext,ApplicationContextAware} import org.springframework.util.ReflectionUtils import org.springframework.util.StringUtils @@ -29,7 +30,7 @@ import se.scalablesolutions.akka.util.Logging * @author Johan Rask * @author Martin Krasser */ -class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { +class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware { import StringReflect._ import AkkaSpringConfigurationTags._ @@ -46,6 +47,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { @BeanProperty var dispatcher: DispatcherProperties = _ @BeanProperty var scope:String = VAL_SCOPE_SINGLETON @BeanProperty var property:PropertyEntries = _ + @BeanProperty var applicationContext:ApplicationContext = _ /* * @see org.springframework.beans.factory.FactoryBean#getObjectType() @@ -87,7 +89,11 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { private def setProperties(ref:AnyRef) : AnyRef = { log.debug("Processing properties and dependencies for target class %s",target) val beanWrapper = new BeanWrapperImpl(ref); - for(entry <- property.entryList) { + if(ref.isInstanceOf[ApplicationContextAware]) { + log.debug("Setting application context") + beanWrapper.setPropertyValue("applicationContext",applicationContext) + } + for(entry <- property.entryList) { val propertyDescriptor = BeanUtils.getPropertyDescriptor(ref.getClass,entry.name) val method = propertyDescriptor.getWriteMethod(); diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java index e8adaa38e7..ce8cf1fd70 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java @@ -1,13 +1,23 @@ package se.scalablesolutions.akka.spring; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationContext; + import se.scalablesolutions.akka.actor.annotation.shutdown; -public class SampleBean { +public class SampleBean implements ApplicationContextAware { public boolean down; + public boolean gotApplicationContext; + public SampleBean() { down = false; + gotApplicationContext = false; + } + + public void setApplicationContext(ApplicationContext context) { + gotApplicationContext = true; } public String foo(String s) { diff --git a/akka-spring/src/test/resources/appContext.xml b/akka-spring/src/test/resources/appContext.xml index bcb1a7f525..b299b3e363 100644 --- a/akka-spring/src/test/resources/appContext.xml +++ b/akka-spring/src/test/resources/appContext.xml @@ -7,6 +7,7 @@ http://www.akkasource.org/schema/akka http://scalablesolutions.se/akka/akka-0.10.xsd"> +