Fixing master: Unborking of BalancingDispatcher, fixing of PinnedDispatcher config, not creating one dispatcher per message for PinnedDispatcher ;)

This commit is contained in:
Viktor Klang 2012-01-19 12:15:49 +01:00
parent 158bbabb58
commit 97280ffeed
5 changed files with 12 additions and 68 deletions

View file

@ -274,7 +274,7 @@ private[akka] class ActorCell(
} }
@inline @inline
final def dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) final val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
/** /**
* UntypedActorContext impl * UntypedActorContext impl

View file

@ -61,7 +61,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
/** /**
* Returns a dispatcher as specified in configuration, or if not defined it uses * Returns a dispatcher as specified in configuration, or if not defined it uses
* the default dispatcher. * the default dispatcher. Please note that this method _may_ create and return a NEW dispatcher,
* _every_ call.
*/ */
def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher() def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher()
@ -189,8 +190,7 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP
config.getString("id"), config.getString("id"),
config.getInt("throughput"), config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, mailboxType, threadPoolConfig,
threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))).build Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))).build
/** /**
@ -212,6 +212,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
override def dispatcher(): MessageDispatcher = configureThreadPool(config, override def dispatcher(): MessageDispatcher = configureThreadPool(config,
threadPoolConfig threadPoolConfig
new PinnedDispatcher(prerequisites, null, config.getString("name"), config.getString("id"), mailboxType, new PinnedDispatcher(prerequisites, null, config.getString("name"), config.getString("id"), mailboxType,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig)).build Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
threadPoolConfig)).build
} }

View file

@ -25,14 +25,14 @@ class PinnedDispatcher(
_id: String, _id: String,
_mailboxType: MailboxType, _mailboxType: MailboxType,
_shutdownTimeout: Duration, _shutdownTimeout: Duration,
_threadPoolConfig: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)) _threadPoolConfig: ThreadPoolConfig = ThreadPoolConfig())
extends Dispatcher(_prerequisites, extends Dispatcher(_prerequisites,
_name, _name,
_id, _id,
Int.MaxValue, Int.MaxValue,
Duration.Zero, Duration.Zero,
_mailboxType, _mailboxType,
_threadPoolConfig, _threadPoolConfig.copy(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1),
_shutdownTimeout) { _shutdownTimeout) {
@volatile @volatile

View file

@ -90,7 +90,7 @@ object ThreadPoolConfigDispatcherBuilder {
*/ */
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder { case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
import ThreadPoolConfig._ import ThreadPoolConfig._
def build = dispatcherFactory(config) def build: MessageDispatcher = dispatcherFactory(config)
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory)) this.copy(config = config.copy(queueFactory = newQueueFactory))
@ -203,54 +203,3 @@ class SaneRejectedExecutionHandler extends RejectedExecutionHandler {
else runnable.run() else runnable.run()
} }
} }
/**
* Commented out pending discussion with Doug Lea
*
* case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider {
* final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
* def createExecutorService: ExecutorService = {
* new ForkJoinPool(targetParallelism) with ExecutorService {
* setAsyncMode(true)
* setMaintainsParallelism(true)
*
* override final def execute(r: Runnable) {
* r match {
* case fjmbox: FJMailbox
* //fjmbox.fjTask.reinitialize()
* Thread.currentThread match {
* case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this
* fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected
* case _ super.execute[Unit](fjmbox.fjTask)
* }
* case _
* super.execute(r)
* }
* }
*
* import java.util.{ Collection JCollection }
*
* def invokeAny[T](callables: JCollection[_ <: Callable[T]]) =
* throw new UnsupportedOperationException("invokeAny. NOT!")
*
* def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
* throw new UnsupportedOperationException("invokeAny. NOT!")
*
* def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
* throw new UnsupportedOperationException("invokeAny. NOT!")
* }
* }
* }
* }
*
* trait FJMailbox { self: Mailbox
* final val fjTask = new ForkJoinTask[Unit] with Runnable {
* private[this] var result: Unit = ()
* final def getRawResult() = result
* final def setRawResult(v: Unit) { result = v }
* final def exec() = { self.run(); true }
* final def run() { invoke() }
* }
* }
*
*/

View file

@ -6,17 +6,11 @@ package akka.remote
import akka.actor._ import akka.actor._
import akka.event._ import akka.event._
import akka.actor.Status._
import akka.util._ import akka.util._
import akka.util.duration._ import akka.util.duration._
import akka.util.Helpers._ import akka.util.Helpers._
import akka.serialization.Compression.LZF import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension }
import java.net.InetSocketAddress import akka.dispatch.MessageDispatcher
import com.eaio.uuid.UUID
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression, SerializationExtension }
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher, MessageDispatcher }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.dispatch.SystemMessage import akka.dispatch.SystemMessage
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.remote.RemoteProtocol.{ ActorRefProtocol, AkkaRemoteProtocol, RemoteControlProtocol, RemoteMessageProtocol } import akka.remote.RemoteProtocol.{ ActorRefProtocol, AkkaRemoteProtocol, RemoteControlProtocol, RemoteMessageProtocol }