Merge branch 'master' of git@github.com:jboner/akka and resolve conflicts in akka-spring

This commit is contained in:
Martin Krasser 2010-07-06 07:32:26 +02:00
commit ce84b386bf
7 changed files with 72 additions and 38 deletions

View file

@ -62,7 +62,6 @@ class ActorInitializationException private[akka](message: String) extends Runtim
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val RECEIVE_TIMEOUT = config.getInt("akka.actor.receive.timeout", 30000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**
@ -435,7 +434,6 @@ trait Actor extends Logging {
// =========================================
private[akka] def base: Receive = try {
cancelReceiveTimeout
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalActorStateException(
@ -443,7 +441,7 @@ trait Actor extends Logging {
}
private val lifeCycles: Receive = {
case HotSwap(code) => self.hotswap = code; checkReceiveTimeout
case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap?
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
@ -451,25 +449,6 @@ trait Actor extends Logging {
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
@volatile protected[akka] var timeoutActor: Option[ActorRef] = None
private[akka] def cancelReceiveTimeout = {
timeoutActor.foreach {
x =>
Scheduler.unschedule(x)
timeoutActor = None
log.debug("Timeout canceled")
}
}
private[akka] def checkReceiveTimeout = {
//if ((self.hotswap getOrElse receive).isDefinedAt(ReceiveTimeout)) { // FIXME use when 'self' is safe to use, throws NPE sometimes
if ((receive ne null) && receive.isDefinedAt(ReceiveTimeout)) {
log.debug("Scheduling timeout for Actor [" + toString + "]")
timeoutActor = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, self.receiveTimeout, TimeUnit.MILLISECONDS))
}
}
}
private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {

View file

@ -24,12 +24,12 @@ import jsr166x.{Deque, ConcurrentLinkedDeque}
import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
import java.lang.reflect.Field
import RemoteActorSerialization._
import com.google.protobuf.ByteString
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -72,6 +72,8 @@ trait ActorRef extends TransactionManagement {
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
@volatile protected[akka] var _timeoutActor: Option[ActorRef] = None
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[this] val guard = new ReentrantGuard
@ -100,9 +102,9 @@ trait ActorRef extends TransactionManagement {
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* Used if the receive (or HotSwap) contains a case handling ReceiveTimeout.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
@volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT
@volatile var receiveTimeout: Option[Long] = None
/**
* User overridable callback/setting.
@ -551,6 +553,24 @@ trait ActorRef extends TransactionManagement {
}
override def toString = "Actor[" + id + ":" + uuid + "]"
protected[akka] def cancelReceiveTimeout = {
_timeoutActor.foreach {
x =>
Scheduler.unschedule(x)
_timeoutActor = None
log.debug("Timeout canceled")
}
}
protected [akka] def checkReceiveTimeout = {
cancelReceiveTimeout
receiveTimeout.foreach { timeout =>
log.debug("Scheduling timeout for Actor [" + toString + "]")
_timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, timeout, TimeUnit.MILLISECONDS))
}
}
}
/**
@ -734,8 +754,9 @@ sealed class LocalActorRef private[akka](
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = guard.withGuard {
def stop() = guard.withGuard {
if (isRunning) {
cancelReceiveTimeout
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
@ -1000,6 +1021,7 @@ sealed class LocalActorRef private[akka](
setTransactionSet(txSet)
try {
cancelReceiveTimeout // FIXME: leave this here?
if (isTransactor) {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
@ -1158,7 +1180,7 @@ sealed class LocalActorRef private[akka](
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
actor.checkReceiveTimeout
checkReceiveTimeout
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
@ -1221,7 +1243,7 @@ private[akka] case class RemoteActorRef private[akka] (
this
}
def stop(): Unit = {
def stop: Unit = {
_isRunning = false
_isShutDown = true
}
@ -1237,7 +1259,7 @@ private[akka] case class RemoteActorRef private[akka] (
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def makeTransactionRequired(): Unit = unsupported
def makeTransactionRequired: Unit = unsupported
def transactionConfig_=(config: TransactionConfig): Unit = unsupported
def transactionConfig: TransactionConfig = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
@ -1254,7 +1276,7 @@ private[akka] case class RemoteActorRef private[akka] (
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors(): Unit = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported

View file

@ -14,7 +14,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
@ -28,7 +28,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
@ -51,7 +51,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
val timeoutLatch = new StandardLatch
case object Tick
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = 500
self.receiveTimeout = Some(500L)
protected def receive = {
case Tick => ()
@ -60,6 +60,18 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
timeoutActor ! Tick
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false)
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
}
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(new Actor {
protected def receive = {
case ReceiveTimeout => timeoutLatch.open
}
}).start
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
}
}

View file

@ -14,6 +14,7 @@ import org.springframework.beans.BeanWrapper
import org.springframework.beans.BeanUtils
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.config.AbstractFactoryBean
import org.springframework.context.{ApplicationContext,ApplicationContextAware}
import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
@ -29,7 +30,7 @@ import se.scalablesolutions.akka.util.Logging
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
* @author Martin Krasser
*/
class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware {
import StringReflect._
import AkkaSpringConfigurationTags._
@ -46,6 +47,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
@BeanProperty var dispatcher: DispatcherProperties = _
@BeanProperty var scope:String = VAL_SCOPE_SINGLETON
@BeanProperty var property:PropertyEntries = _
@BeanProperty var applicationContext:ApplicationContext = _
/*
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
@ -87,7 +89,11 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
private def setProperties(ref:AnyRef) : AnyRef = {
log.debug("Processing properties and dependencies for target class %s",target)
val beanWrapper = new BeanWrapperImpl(ref);
for(entry <- property.entryList) {
if(ref.isInstanceOf[ApplicationContextAware]) {
log.debug("Setting application context")
beanWrapper.setPropertyValue("applicationContext",applicationContext)
}
for(entry <- property.entryList) {
val propertyDescriptor = BeanUtils.getPropertyDescriptor(ref.getClass,entry.name)
val method = propertyDescriptor.getWriteMethod();

View file

@ -1,13 +1,23 @@
package se.scalablesolutions.akka.spring;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationContext;
import se.scalablesolutions.akka.actor.annotation.shutdown;
public class SampleBean {
public class SampleBean implements ApplicationContextAware {
public boolean down;
public boolean gotApplicationContext;
public SampleBean() {
down = false;
gotApplicationContext = false;
}
public void setApplicationContext(ApplicationContext context) {
gotApplicationContext = true;
}
public String foo(String s) {

View file

@ -7,6 +7,7 @@
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">
<akka:active-object id="sample" target="se.scalablesolutions.akka.spring.SampleBean" timeout="1000" />
<akka:active-object id="bean-singleton" target="se.scalablesolutions.akka.spring.SampleBean" timeout="1000"/>
<akka:active-object id="bean-prototype" target="se.scalablesolutions.akka.spring.SampleBean" timeout="1000" scope="prototype"/>
@ -21,4 +22,4 @@
<bean id="string" class="java.lang.String">
<constructor-arg value="someString"/>
</bean>
</beans>
</beans>

View file

@ -69,6 +69,10 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor]
assert(target.getSource === "someString")
val sampleBean = ctx.getBean("sample").asInstanceOf[SampleBean];
Thread.sleep(300)
assert(sampleBean.gotApplicationContext)
}
it("should stop the created active object when scope is singleton and the context is closed") {