diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index f88f40fecf..870b74edd2 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -780,7 +780,7 @@ class LocalActorRef private[akka] ( if (isShutdown) throw new ActorStartException( "Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!isRunning) { - dispatcher.register(this) + dispatcher.attach(this) if (isTransactor) transactorConfig = transactorConfig.copy(factory = Some(TransactionFactory(transactorConfig.config, id))) @@ -802,7 +802,7 @@ class LocalActorRef private[akka] ( if (isRunning) { receiveTimeout = None cancelReceiveTimeout - dispatcher.unregister(this) + dispatcher.detach(this) transactorConfig = transactorConfig.copy(factory = None) _status = ActorRefInternals.SHUTDOWN actor.postStop diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 49e94afc6f..2b0a7cbfe0 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -132,16 +132,15 @@ class ExecutorBasedEventDrivenDispatcher( case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } - def start: Unit = if (active.isOff) active switchOn { + protected def start: Unit = active switchOn { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) } - def shutdown: Unit = if (active.isOn) active switchOff { + protected def shutdown: Unit = active switchOff { val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) if (old ne null) { log.debug("Shutting down %s", toString) old.shutdownNow() - uuids.clear } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 53296cce71..a6e40d2f50 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -170,16 +170,15 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( } else false } - def start = active switchOn { + protected def start = active switchOn { log.debug("Starting up %s",toString) } - def shutdown: Unit = if (active.isOn) active switchOff { + protected def shutdown: Unit = active switchOff { val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) if (old ne null) { log.debug("Shutting down %s", toString) old.shutdownNow() - uuids.clear } } @@ -194,9 +193,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( executorService.get() execute mbox } - def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException( - "Can't build a new thread pool for a dispatcher that is already up and running") - override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match { diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index c84a56e78d..1e18f057cf 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -9,7 +9,7 @@ import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Uuid, ActorInit import org.multiverse.commitbarriers.CountDownCommitBarrier import java.util.concurrent._ -import se.scalablesolutions.akka.util. {Logging, HashCode} +import se.scalablesolutions.akka.util. {ReentrantGuard, Logging, HashCode} /** * @author Jonas Bonér @@ -58,24 +58,33 @@ final class MessageInvocation(val receiver: ActorRef, trait MessageDispatcher extends MailboxFactory with Logging { protected val uuids = new ConcurrentSkipListSet[Uuid] + protected val guard = new ReentrantGuard - def register(actorRef: ActorRef) { - start + final def attach(actorRef: ActorRef): Unit = guard withGuard { + register(actorRef) + } + + final def detach(actorRef: ActorRef): Unit = guard withGuard { + unregister(actorRef) + } + + protected def register(actorRef: ActorRef) { + if (uuids.isEmpty()) start if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) uuids add actorRef.uuid } - def unregister(actorRef: ActorRef) = { - uuids remove actorRef.uuid - actorRef.mailbox = null - if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero + protected def unregister(actorRef: ActorRef) = { + if (uuids remove actorRef.uuid) { + actorRef.mailbox = null + if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero + } } def stopAllLinkedActors { val i = uuids.iterator while(i.hasNext()) { val uuid = i.next() - i.remove() ActorRegistry.actorFor(uuid) match { case Some(actor) => actor.stop case None => log.warn("stopAllLinkedActors couldn't find linked actor: " + uuid) diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index ef53e81eac..0e1a1ceafb 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -9,8 +9,7 @@ import java.util.concurrent._ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy -import se.scalablesolutions.akka.actor.IllegalActorStateException -import se.scalablesolutions.akka.util. {Duration, Logger, Logging} +import se.scalablesolutions.akka.util. {Duration, Logging} object ThreadPoolConfig { type Bounds = Int @@ -194,7 +193,7 @@ class MonitorableThread(runnable: Runnable, name: String) /** * @author Jonas Bonér */ -class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate with Logging { +class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate { protected val semaphore = new Semaphore(bound) override def execute(command: Runnable) = { @@ -219,7 +218,7 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend } } -trait ExecutorServiceDelegate extends ExecutorService { +trait ExecutorServiceDelegate extends ExecutorService with Logging { def executor: ExecutorService @@ -254,7 +253,10 @@ trait LazyExecutorService extends ExecutorServiceDelegate { def createExecutor: ExecutorService - lazy val executor = createExecutor + lazy val executor = { + log.info("Lazily initializing ExecutorService for ",this) + createExecutor + } } class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService { diff --git a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala index a5ae40ff16..fac5358fc3 100644 --- a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala @@ -138,8 +138,8 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { } unpackExecutorService(dispatcher match { - case e: ExecutorBasedEventDrivenDispatcher => e.start; e.executorService.get() - case e: ExecutorBasedEventDrivenWorkStealingDispatcher => e.start; e.executorService.get() + case e: ExecutorBasedEventDrivenDispatcher => e.executorService.get() + case e: ExecutorBasedEventDrivenWorkStealingDispatcher => e.executorService.get() case x => throw new IllegalStateException("Illegal dispatcher type: " + x) }).asInstanceOf[ThreadPoolExecutor] }