diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 40f94b4210..34b26629af 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -274,7 +274,7 @@ private[akka] class ActorCell( } @inline - final def dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) + final val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) /** * UntypedActorContext impl diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 6450d82f19..aa1907b093 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -61,7 +61,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc /** * Returns a dispatcher as specified in configuration, or if not defined it uses - * the default dispatcher. + * the default dispatcher. Please note that this method _may_ create and return a NEW dispatcher, + * _every_ call. */ def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher() @@ -189,8 +190,7 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP config.getString("id"), config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, - threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1), + mailboxType, threadPoolConfig, Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))).build /** @@ -212,6 +212,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer override def dispatcher(): MessageDispatcher = configureThreadPool(config, threadPoolConfig ⇒ new PinnedDispatcher(prerequisites, null, config.getString("name"), config.getString("id"), mailboxType, - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig)).build + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), + threadPoolConfig)).build } diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index afb6ed55c4..8e8f229c90 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -25,14 +25,14 @@ class PinnedDispatcher( _id: String, _mailboxType: MailboxType, _shutdownTimeout: Duration, - _threadPoolConfig: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)) + _threadPoolConfig: ThreadPoolConfig = ThreadPoolConfig()) extends Dispatcher(_prerequisites, _name, _id, Int.MaxValue, Duration.Zero, _mailboxType, - _threadPoolConfig, + _threadPoolConfig.copy(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1), _shutdownTimeout) { @volatile diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 220c6e613a..77669cfd1a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -90,7 +90,7 @@ object ThreadPoolConfigDispatcherBuilder { */ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) ⇒ MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder { import ThreadPoolConfig._ - def build = dispatcherFactory(config) + def build: MessageDispatcher = dispatcherFactory(config) def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(queueFactory = newQueueFactory)) @@ -202,55 +202,4 @@ class SaneRejectedExecutionHandler extends RejectedExecutionHandler { if (threadPoolExecutor.isShutdown) throw new RejectedExecutionException("Shutdown") else runnable.run() } -} - -/** - * Commented out pending discussion with Doug Lea - * - * case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider { - * final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { - * def createExecutorService: ExecutorService = { - * new ForkJoinPool(targetParallelism) with ExecutorService { - * setAsyncMode(true) - * setMaintainsParallelism(true) - * - * override final def execute(r: Runnable) { - * r match { - * case fjmbox: FJMailbox ⇒ - * //fjmbox.fjTask.reinitialize() - * Thread.currentThread match { - * case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒ - * fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected - * case _ ⇒ super.execute[Unit](fjmbox.fjTask) - * } - * case _ ⇒ - * super.execute(r) - * } - * } - * - * import java.util.{ Collection ⇒ JCollection } - * - * def invokeAny[T](callables: JCollection[_ <: Callable[T]]) = - * throw new UnsupportedOperationException("invokeAny. NOT!") - * - * def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = - * throw new UnsupportedOperationException("invokeAny. NOT!") - * - * def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = - * throw new UnsupportedOperationException("invokeAny. NOT!") - * } - * } - * } - * } - * - * trait FJMailbox { self: Mailbox ⇒ - * final val fjTask = new ForkJoinTask[Unit] with Runnable { - * private[this] var result: Unit = () - * final def getRawResult() = result - * final def setRawResult(v: Unit) { result = v } - * final def exec() = { self.run(); true } - * final def run() { invoke() } - * } - * } - * - */ +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index f76aa8e908..02d6d682b0 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -6,17 +6,11 @@ package akka.remote import akka.actor._ import akka.event._ -import akka.actor.Status._ import akka.util._ import akka.util.duration._ import akka.util.Helpers._ -import akka.serialization.Compression.LZF -import java.net.InetSocketAddress -import com.eaio.uuid.UUID -import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression, SerializationExtension } -import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher, MessageDispatcher } -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension } +import akka.dispatch.MessageDispatcher import akka.dispatch.SystemMessage import scala.annotation.tailrec import akka.remote.RemoteProtocol.{ ActorRefProtocol, AkkaRemoteProtocol, RemoteControlProtocol, RemoteMessageProtocol }