Fixing race-conditions, now works albeit inefficiently when adding/removing actors rapidly

This commit is contained in:
Viktor Klang 2010-10-24 15:22:28 +02:00
parent 594efe9464
commit b80fb9096a
6 changed files with 32 additions and 26 deletions

View file

@ -780,7 +780,7 @@ class LocalActorRef private[akka] (
if (isShutdown) throw new ActorStartException(
"Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
dispatcher.register(this)
dispatcher.attach(this)
if (isTransactor)
transactorConfig = transactorConfig.copy(factory = Some(TransactionFactory(transactorConfig.config, id)))
@ -802,7 +802,7 @@ class LocalActorRef private[akka] (
if (isRunning) {
receiveTimeout = None
cancelReceiveTimeout
dispatcher.unregister(this)
dispatcher.detach(this)
transactorConfig = transactorConfig.copy(factory = None)
_status = ActorRefInternals.SHUTDOWN
actor.postStop

View file

@ -132,16 +132,15 @@ class ExecutorBasedEventDrivenDispatcher(
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
def start: Unit = if (active.isOff) active switchOn {
protected def start: Unit = active switchOn {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
}
def shutdown: Unit = if (active.isOn) active switchOff {
protected def shutdown: Unit = active switchOff {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.debug("Shutting down %s", toString)
old.shutdownNow()
uuids.clear
}
}

View file

@ -170,16 +170,15 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
} else false
}
def start = active switchOn {
protected def start = active switchOn {
log.debug("Starting up %s",toString)
}
def shutdown: Unit = if (active.isOn) active switchOff {
protected def shutdown: Unit = active switchOff {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.debug("Shutting down %s", toString)
old.shutdownNow()
uuids.clear
}
}
@ -194,9 +193,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
executorService.get() execute mbox
}
def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match {

View file

@ -9,7 +9,7 @@ import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Uuid, ActorInit
import org.multiverse.commitbarriers.CountDownCommitBarrier
import java.util.concurrent._
import se.scalablesolutions.akka.util. {Logging, HashCode}
import se.scalablesolutions.akka.util. {ReentrantGuard, Logging, HashCode}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -58,24 +58,33 @@ final class MessageInvocation(val receiver: ActorRef,
trait MessageDispatcher extends MailboxFactory with Logging {
protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val guard = new ReentrantGuard
def register(actorRef: ActorRef) {
start
final def attach(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef)
}
final def detach(actorRef: ActorRef): Unit = guard withGuard {
unregister(actorRef)
}
protected def register(actorRef: ActorRef) {
if (uuids.isEmpty()) start
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
uuids add actorRef.uuid
}
def unregister(actorRef: ActorRef) = {
uuids remove actorRef.uuid
protected def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) {
actorRef.mailbox = null
if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero
}
}
def stopAllLinkedActors {
val i = uuids.iterator
while(i.hasNext()) {
val uuid = i.next()
i.remove()
ActorRegistry.actorFor(uuid) match {
case Some(actor) => actor.stop
case None => log.warn("stopAllLinkedActors couldn't find linked actor: " + uuid)

View file

@ -9,8 +9,7 @@ import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import se.scalablesolutions.akka.actor.IllegalActorStateException
import se.scalablesolutions.akka.util. {Duration, Logger, Logging}
import se.scalablesolutions.akka.util. {Duration, Logging}
object ThreadPoolConfig {
type Bounds = Int
@ -194,7 +193,7 @@ class MonitorableThread(runnable: Runnable, name: String)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate with Logging {
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate {
protected val semaphore = new Semaphore(bound)
override def execute(command: Runnable) = {
@ -219,7 +218,7 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
}
}
trait ExecutorServiceDelegate extends ExecutorService {
trait ExecutorServiceDelegate extends ExecutorService with Logging {
def executor: ExecutorService
@ -254,7 +253,10 @@ trait LazyExecutorService extends ExecutorServiceDelegate {
def createExecutor: ExecutorService
lazy val executor = createExecutor
lazy val executor = {
log.info("Lazily initializing ExecutorService for ",this)
createExecutor
}
}
class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService {

View file

@ -138,8 +138,8 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
}
unpackExecutorService(dispatcher match {
case e: ExecutorBasedEventDrivenDispatcher => e.start; e.executorService.get()
case e: ExecutorBasedEventDrivenWorkStealingDispatcher => e.start; e.executorService.get()
case e: ExecutorBasedEventDrivenDispatcher => e.executorService.get()
case e: ExecutorBasedEventDrivenWorkStealingDispatcher => e.executorService.get()
case x => throw new IllegalStateException("Illegal dispatcher type: " + x)
}).asInstanceOf[ThreadPoolExecutor]
}