From 5baf86fd380c5c28b40898a0d325f1eea925b8db Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 5 Jul 2010 13:45:03 +0200 Subject: [PATCH 1/3] - moved receive timeout logic to ActorRef - receivetimeout now only inititiated when receiveTimeout property is set --- akka-core/src/main/scala/actor/Actor.scala | 23 +-------- akka-core/src/main/scala/actor/ActorRef.scala | 48 ++++++++++++++----- .../src/test/scala/ReceiveTimeoutSpec.scala | 20 ++++++-- 3 files changed, 52 insertions(+), 39 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 33bf9bf998..b38401a4a6 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -62,7 +62,6 @@ class ActorInitializationException private[akka](message: String) extends Runtim */ object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) - val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) /** @@ -435,7 +434,6 @@ trait Actor extends Logging { // ========================================= private[akka] def base: Receive = try { - cancelReceiveTimeout lifeCycles orElse (self.hotswap getOrElse receive) } catch { case e: NullPointerException => throw new IllegalActorStateException( @@ -443,7 +441,7 @@ trait Actor extends Logging { } private val lifeCycles: Receive = { - case HotSwap(code) => self.hotswap = code; checkReceiveTimeout + case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap? case Restart(reason) => self.restart(reason) case Exit(dead, reason) => self.handleTrapExit(dead, reason) case Link(child) => self.link(child) @@ -451,25 +449,6 @@ trait Actor extends Logging { case UnlinkAndStop(child) => self.unlink(child); child.stop case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } - - @volatile protected[akka] var timeoutActor: Option[ActorRef] = None - - private[akka] def cancelReceiveTimeout = { - timeoutActor.foreach { - x => - Scheduler.unschedule(x) - timeoutActor = None - log.debug("Timeout canceled") - } - } - - private[akka] def checkReceiveTimeout = { - //if ((self.hotswap getOrElse receive).isDefinedAt(ReceiveTimeout)) { // FIXME use when 'self' is safe to use, throws NPE sometimes - if ((receive ne null) && receive.isDefinedAt(ReceiveTimeout)) { - log.debug("Scheduling timeout for Actor [" + toString + "]") - timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS)) - } - } } private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) { diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index e10a55e3b1..8fbec1ed85 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -24,12 +24,12 @@ import jsr166x.{Deque, ConcurrentLinkedDeque} import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.ConcurrentHashMap import java.util.{Map => JMap} import java.lang.reflect.Field import RemoteActorSerialization._ import com.google.protobuf.ByteString +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -72,6 +72,8 @@ trait ActorRef extends TransactionManagement { @volatile protected[akka] var _isBeingRestarted = false @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT) + @volatile protected[akka] var _timeoutActor: Option[ActorRef] = None + @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false protected[this] val guard = new ReentrantGuard @@ -100,9 +102,9 @@ trait ActorRef extends TransactionManagement { * User overridable callback/setting. *

* Defines the default timeout for an initial receive invocation. - * Used if the receive (or HotSwap) contains a case handling ReceiveTimeout. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. */ - @volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT + @volatile var receiveTimeout: Option[Long] = None /** * User overridable callback/setting. @@ -386,7 +388,7 @@ trait ActorRef extends TransactionManagement { * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired(): Unit + def makeTransactionRequired: Unit /** * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. @@ -429,12 +431,12 @@ trait ActorRef extends TransactionManagement { * Shuts down the actor its dispatcher and message queue. * Alias for 'stop'. */ - def exit() = stop() + def exit = stop /** * Shuts down the actor its dispatcher and message queue. */ - def stop(): Unit + def stop: Unit /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will @@ -510,7 +512,7 @@ trait ActorRef extends TransactionManagement { /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors(): Unit + def shutdownLinkedActors: Unit protected[akka] def invoke(messageHandle: MessageInvocation): Unit @@ -551,6 +553,24 @@ trait ActorRef extends TransactionManagement { } override def toString = "Actor[" + id + ":" + uuid + "]" + + protected[akka] def cancelReceiveTimeout = { + _timeoutActor.foreach { + x => + Scheduler.unschedule(x) + _timeoutActor = None + log.debug("Timeout canceled") + } + } + + protected [akka] def checkReceiveTimeout = { + cancelReceiveTimeout + receiveTimeout.foreach { timeout => + log.debug("Scheduling timeout for Actor [" + toString + "]") + _timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS)) + } + } + } /** @@ -680,7 +700,7 @@ sealed class LocalActorRef private[akka]( * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired() = guard.withGuard { + def makeTransactionRequired = guard.withGuard { if (!isRunning || isBeingRestarted) isTransactor = true else throw new ActorInitializationException( "Can not make actor transaction required after it has been started") @@ -736,6 +756,7 @@ sealed class LocalActorRef private[akka]( */ def stop = guard.withGuard { if (isRunning) { + cancelReceiveTimeout dispatcher.unregister(this) _transactionFactory = None _isRunning = false @@ -873,7 +894,7 @@ sealed class LocalActorRef private[akka]( /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors(): Unit = guard.withGuard { + def shutdownLinkedActors: Unit = guard.withGuard { linkedActorsAsList.foreach(_.stop) linkedActors.clear } @@ -1000,6 +1021,7 @@ sealed class LocalActorRef private[akka]( setTransactionSet(txSet) try { + cancelReceiveTimeout // FIXME: leave this here? if (isTransactor) { val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) atomic(txFactory) { @@ -1158,7 +1180,7 @@ sealed class LocalActorRef private[akka]( ActorRegistry.register(this) if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name) clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body - actor.checkReceiveTimeout + checkReceiveTimeout } private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { @@ -1221,7 +1243,7 @@ private[akka] case class RemoteActorRef private[akka] ( this } - def stop(): Unit = { + def stop: Unit = { _isRunning = false _isShutDown = true } @@ -1237,7 +1259,7 @@ private[akka] case class RemoteActorRef private[akka] ( def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher: MessageDispatcher = unsupported - def makeTransactionRequired(): Unit = unsupported + def makeTransactionRequired: Unit = unsupported def transactionConfig_=(config: TransactionConfig): Unit = unsupported def transactionConfig: TransactionConfig = unsupported def makeRemote(hostname: String, port: Int): Unit = unsupported @@ -1254,7 +1276,7 @@ private[akka] case class RemoteActorRef private[akka] ( def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported def mailboxSize: Int = unsupported def supervisor: Option[ActorRef] = unsupported - def shutdownLinkedActors(): Unit = unsupported + def shutdownLinkedActors: Unit = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported diff --git a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala index 938cb08d43..5c50337894 100644 --- a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala +++ b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala @@ -14,7 +14,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case ReceiveTimeout => timeoutLatch.open @@ -28,7 +28,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case ReceiveTimeout => timeoutLatch.open @@ -51,7 +51,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { val timeoutLatch = new StandardLatch case object Tick val timeoutActor = actorOf(new Actor { - self.receiveTimeout = 500 + self.receiveTimeout = Some(500L) protected def receive = { case Tick => () @@ -60,6 +60,18 @@ class ReceiveTimeoutSpec extends JUnitSuite { }).start timeoutActor ! Tick - assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false) + assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false) + } + + @Test def timeoutShouldNotBeSentWhenNotSpecified = { + val timeoutLatch = new StandardLatch + val timeoutActor = actorOf(new Actor { + + protected def receive = { + case ReceiveTimeout => timeoutLatch.open + } + }).start + + assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false) } } From da275f45ddd38af36c335105230381e21afe9769 Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 5 Jul 2010 13:57:53 +0200 Subject: [PATCH 2/3] set emtpy parens back --- akka-core/src/main/scala/actor/ActorRef.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 8fbec1ed85..447931019a 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -388,7 +388,7 @@ trait ActorRef extends TransactionManagement { * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired: Unit + def makeTransactionRequired(): Unit /** * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. @@ -431,12 +431,12 @@ trait ActorRef extends TransactionManagement { * Shuts down the actor its dispatcher and message queue. * Alias for 'stop'. */ - def exit = stop + def exit() = stop() /** * Shuts down the actor its dispatcher and message queue. */ - def stop: Unit + def stop(): Unit /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will @@ -512,7 +512,7 @@ trait ActorRef extends TransactionManagement { /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors: Unit + def shutdownLinkedActors(): Unit protected[akka] def invoke(messageHandle: MessageInvocation): Unit @@ -700,7 +700,7 @@ sealed class LocalActorRef private[akka]( * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. */ - def makeTransactionRequired = guard.withGuard { + def makeTransactionRequired() = guard.withGuard { if (!isRunning || isBeingRestarted) isTransactor = true else throw new ActorInitializationException( "Can not make actor transaction required after it has been started") @@ -754,7 +754,7 @@ sealed class LocalActorRef private[akka]( /** * Shuts down the actor its dispatcher and message queue. */ - def stop = guard.withGuard { + def stop() = guard.withGuard { if (isRunning) { cancelReceiveTimeout dispatcher.unregister(this) @@ -894,7 +894,7 @@ sealed class LocalActorRef private[akka]( /** * Shuts down and removes all linked actors. */ - def shutdownLinkedActors: Unit = guard.withGuard { + def shutdownLinkedActors(): Unit = guard.withGuard { linkedActorsAsList.foreach(_.stop) linkedActors.clear } From 2deb9faf3d04e64a341d49f14a0803f825bb3367 Mon Sep 17 00:00:00 2001 From: Johan Rask Date: Mon, 5 Jul 2010 15:53:49 +0200 Subject: [PATCH 3/3] #304 Fixed Support for ApplicationContextAware in akka-spring --- .../src/main/scala/ActiveObjectFactoryBean.scala | 10 ++++++++-- .../se/scalablesolutions/akka/spring/SampleBean.java | 11 ++++++++++- akka-spring/src/test/resources/appContext.xml | 4 +++- .../src/test/scala/ActiveObjectFactoryBeanTest.scala | 4 ++++ 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index 4f6ea37148..1e669114ce 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -12,6 +12,7 @@ import org.springframework.beans.BeanWrapper import org.springframework.beans.BeanUtils import org.springframework.util.ReflectionUtils import org.springframework.util.StringUtils +import org.springframework.context.{ApplicationContext,ApplicationContextAware} import org.springframework.beans.factory.BeanFactory import org.springframework.beans.factory.config.AbstractFactoryBean import se.scalablesolutions.akka.actor.ActiveObject @@ -26,7 +27,7 @@ import se.scalablesolutions.akka.util.Logging * @author michaelkober * @author Johan Rask */ -class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { +class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware { import StringReflect._ import AkkaSpringConfigurationTags._ @@ -42,6 +43,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { @BeanProperty var dispatcher: DispatcherProperties = _ @BeanProperty var scope:String = VAL_SCOPE_SINGLETON @BeanProperty var property:PropertyEntries = _ + @BeanProperty var applicationContext:ApplicationContext = _ /* * @see org.springframework.beans.factory.FactoryBean#getObjectType() @@ -78,7 +80,11 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging { private def setProperties(ref:AnyRef) : AnyRef = { log.debug("Processing properties and dependencies for target class %s",target) val beanWrapper = new BeanWrapperImpl(ref); - for(entry <- property.entryList) { + if(ref.isInstanceOf[ApplicationContextAware]) { + log.debug("Setting application context") + beanWrapper.setPropertyValue("applicationContext",applicationContext) + } + for(entry <- property.entryList) { val propertyDescriptor = BeanUtils.getPropertyDescriptor(ref.getClass,entry.name) val method = propertyDescriptor.getWriteMethod(); diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java index 37953173ec..7e29f5588d 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/SampleBean.java @@ -1,6 +1,15 @@ package se.scalablesolutions.akka.spring; -public class SampleBean { +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationContext; + +public class SampleBean implements ApplicationContextAware { + + public boolean gotApplicationContext = false; + + public void setApplicationContext(ApplicationContext context) { + gotApplicationContext = true; + } public String foo(String s) { return "hello " + s; diff --git a/akka-spring/src/test/resources/appContext.xml b/akka-spring/src/test/resources/appContext.xml index 5648fa2fdf..2da3ae0c00 100644 --- a/akka-spring/src/test/resources/appContext.xml +++ b/akka-spring/src/test/resources/appContext.xml @@ -15,7 +15,9 @@ - + + + \ No newline at end of file diff --git a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala index f1c11d0eee..39544804da 100644 --- a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala @@ -69,6 +69,10 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers { var ctx = new ClassPathXmlApplicationContext("appContext.xml"); val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor] assert(target.getSource === "someString") + + val sampleBean = ctx.getBean("sample").asInstanceOf[SampleBean]; + Thread.sleep(300) + assert(sampleBean.gotApplicationContext) } } }