Merge remote branch 'upstream/master'

This commit is contained in:
David Greco 2010-09-20 19:12:24 +02:00
commit 9648ef9b68
5 changed files with 70 additions and 67 deletions

View file

@ -28,6 +28,17 @@ import java.lang.reflect.Field
import scala.reflect.BeanProperty
object ActorRefStatus {
/** LifeCycles for ActorRefs
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
}
/**
* ActorRef is an immutable and serializable handle to an Actor.
* <p/>
@ -68,9 +79,7 @@ trait ActorRef extends
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile protected[akka] var _uuid = UUID.newUuid.toString
@volatile protected[this] var _isRunning = false
@volatile protected[this] var _isShutDown = false
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
@volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
@ -229,17 +238,25 @@ trait ActorRef extends
/**
* Is the actor being restarted?
*/
def isBeingRestarted: Boolean = _isBeingRestarted
def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED
/**
* Is the actor running?
*/
def isRunning: Boolean = _isRunning
def isRunning: Boolean = _status match {
case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true
case _ => false
}
/**
* Is the actor shut down?
*/
def isShutdown: Boolean = _isShutDown
def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN
/**
* Is the actor ever started?
*/
def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED
/**
* Is the actor able to handle the message passed in as arguments?
@ -800,7 +817,7 @@ class LocalActorRef private[akka](
if (isTransactor) {
_transactionFactory = Some(TransactionFactory(_transactionConfig, id))
}
_isRunning = true
_status = ActorRefStatus.RUNNING
if (!isInInitialization) initializeActorInstance
else runActorInitialization = true
}
@ -815,8 +832,7 @@ class LocalActorRef private[akka](
cancelReceiveTimeout
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
actor.postStop
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
@ -1000,7 +1016,7 @@ class LocalActorRef private[akka](
}
/**
* Callback for the dispatcher. This is the ingle entry point to the user Actor implementation.
* Callback for the dispatcher. This is the single entry point to the user Actor implementation.
*/
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {
if (isShutdown)
@ -1067,7 +1083,7 @@ class LocalActorRef private[akka](
stop
} else {
_isBeingRestarted = true
_status = ActorRefStatus.BEING_RESTARTED
val failedActor = actorInstance.get
guard.withGuard {
lifeCycle match {
@ -1077,10 +1093,12 @@ class LocalActorRef private[akka](
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
Actor.log.debug("Restarting linked actors for actor [%s].", id)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason)
else restartActor(failedActor, reason)
_isBeingRestarted = false
else restartActor(failedActor, reason)
_status = ActorRefStatus.RUNNING
}
}
}
@ -1236,7 +1254,7 @@ class LocalActorRef private[akka](
private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
_isBeingRestarted = true
_status = ActorRefStatus.BEING_RESTARTED
// abort transaction set
if (isTransactionSetInScope) {
val txSet = getTransactionSetInScope
@ -1376,13 +1394,12 @@ private[akka] case class RemoteActorRef private[akka] (
}
def start: ActorRef = {
_isRunning = true
_status = ActorRefStatus.RUNNING
this
}
def stop: Unit = {
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}

View file

@ -189,3 +189,13 @@ class ExecutorBasedEventDrivenDispatcher(
buildThreadPool
}
}
/**
* Usable to create a single-threaded dispatcher
*/
object SingleThread extends Function1[ThreadPoolBuilder,Unit] {
def apply(b: ThreadPoolBuilder) {
b setCorePoolSize 1
b setMaxPoolSize 1
}
}

View file

@ -11,6 +11,14 @@ import se.scalablesolutions.akka.config.Config.config
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
object ThreadBasedDispatcher {
def oneThread(b: ThreadPoolBuilder) {
b setCorePoolSize 1
b setMaxPoolSize 1
b setAllowCoreThreadTimeout true
}
}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
@ -18,16 +26,14 @@ import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, Lin
*/
class ThreadBasedDispatcher(private val actor: ActorRef,
val mailboxConfig: MailboxConfig
) extends MessageDispatcher {
) extends ExecutorBasedEventDrivenDispatcher(
actor.getClass.getName + ":" + actor.uuid,
Dispatchers.THROUGHPUT,
-1,
mailboxConfig,
ThreadBasedDispatcher.oneThread) {
def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true))
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "akka:thread-based:dispatcher:" + name
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true)
override def register(actorRef: ActorRef) = {
if(actorRef != actor)
@ -36,35 +42,5 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
super.register(actorRef)
}
def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue]
def mailboxSize(a: ActorRef) = mailbox.size
def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation
def start = if (!active) {
log.debug("Starting up %s", toString)
active = true
selectorThread = new Thread(threadName) {
override def run = {
while (active) {
try {
actor.invoke(mailbox.dequeue)
} catch { case e: InterruptedException => active = false }
}
}
}
selectorThread.start
}
def isShutdown = !active
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false
selectorThread.interrupt
uuids.clear
}
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
override def toString = "ThreadBasedDispatcher[" + name + "]"
}

View file

@ -18,10 +18,10 @@ import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message
import CamelMessageConversion.toExchangeAdapter
import se.scalablesolutions.akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher}
import se.scalablesolutions.akka.stm.TransactionConfig
import se.scalablesolutions.akka.actor.{ScalaActorRef, ActorRegistry, Actor, ActorRef}
import se.scalablesolutions.akka.AkkaException
import scala.reflect.BeanProperty
import se.scalablesolutions.akka.actor._
/**
* Camel component for sending messages to and receiving replies from (untyped) actors.
@ -199,13 +199,12 @@ private[akka] object AsyncCallbackAdapter {
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef {
def start = {
_isRunning = true
_status = ActorRefStatus.RUNNING
this
}
def stop() = {
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
}
/**

View file

@ -383,9 +383,9 @@ class RemoteServer extends Logging with ListenerManagement {
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid
}
@ -508,11 +508,12 @@ class RemoteServerHandler(
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
val actorType = request.getActorInfo.getActorType
if (actorType == SCALA_ACTOR) dispatchToActor(request, channel)
else if (actorType == JAVA_ACTOR) throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
else if (actorType == TYPED_ACTOR) dispatchToTypedActor(request, channel)
else throw new IllegalActorStateException("Unknown ActorType [" + actorType + "]")
request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
case JAVA_ACTOR => throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
case other => throw new IllegalActorStateException("Unknown ActorType [" + other + "]")
}
}
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {