Partial migration to M5

This commit is contained in:
Viktor Klang 2012-07-17 17:21:08 +02:00
parent b8e7569e56
commit 1bf0fe4448
23 changed files with 88 additions and 105 deletions

View file

@ -5,20 +5,23 @@
package akka.dispatch
import java.util.Collection
import scala.concurrent.{ Awaitable, BlockContext }
import scala.concurrent.util.Duration
import scala.concurrent.forkjoin._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.{
ArrayBlockingQueue,
BlockingQueue,
Callable,
ExecutorService,
LinkedBlockingQueue,
RejectedExecutionHandler,
RejectedExecutionException,
SynchronousQueue,
TimeUnit,
ThreadFactory,
ThreadPoolExecutor
}
import java.util.concurrent.atomic.{ AtomicReference, AtomicLong }
object ThreadPoolConfig {
type QueueFactory = () BlockingQueue[Runnable]
@ -154,6 +157,22 @@ case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
object MonitorableThreadFactory {
val doNothing: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable) = () }
private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
val result = new AtomicReference[Option[T]](None)
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
def block(): Boolean = {
result.set(Some(awaitable.result(atMost)(scala.concurrent.impl.InternalFutureUtil.canAwaitEvidence)))
// FIXME replace with
//result.set(Some(BlockContext.DefaultBlockContext.internalBlockingCall(awaitable, atMost)))
true
}
def isReleasable = result.get.isDefined
})
result.get.get // Exception intended if None
}
}
}
case class MonitorableThreadFactory(name: String,
@ -164,7 +183,7 @@ case class MonitorableThreadFactory(name: String,
protected val counter = new AtomicLong
def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
val t = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool))
// Name of the threads for the ForkJoinPool are not customizable. Change it here.
t.setName(name + "-" + counter.incrementAndGet())
t