various code clean up + fixed kernel startup script

This commit is contained in:
Jonas Boner 2009-07-02 22:20:16 +02:00
parent 5c99b4ed8d
commit 35e3d97ef3
17 changed files with 549 additions and 1361 deletions

1238
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -15,11 +15,12 @@ do
CLASSPATH=$CLASSPATH:$FILE
done
CLASSPATH=$CLASSPATH:$BASE_DIR/config
CLASSPATH=$CLASSPATH:$BASE_DIR/kernel/build/classes
CLASSPATH=$CLASSPATH:$BASE_DIR/kernel/target/classes
STORAGE_OPTS=" \
-Dcassandra \
-Dstorage-config=$BASE_DIR/config/storage-conf.xml"
-Dstorage-config=$BASE_DIR/config/ \
-Dpidfile=akka.pid"
JVM_OPTS=" \
-server \
@ -39,4 +40,4 @@ JVM_OPTS=" \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.authenticate=false"
java $JVM_OPTS $STORAGE_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1}
/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/bin/java $JVM_OPTS $STORAGE_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1}

View file

@ -37,14 +37,14 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
protected void setUp() {
ThreadPoolBuilder builder = new ThreadPoolBuilder();
MessageDispatcher dispatcher = new EventBasedThreadPoolDispatcher(builder
.newThreadPoolWithBoundedBlockingQueue(100)
EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher();
dispatcher
.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
.build());
.buildThreadPool();
conf.addExternalGuiceModule(new AbstractModule() {
protected void configure() {

View file

@ -1,27 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.api;
import org.junit.*;
import static org.junit.Assert.*;
import junit.framework.TestSuite;
public class NioTest extends TestSuite {
@BeforeClass
public static void initialize() {
}
@AfterClass
public static void cleanup() {
}
@Test
public void simpleRequestReply() {
}
}

View file

@ -14,13 +14,13 @@ import com.sun.grizzly.standalone.StaticStreamAlgorithm;
import javax.ws.rs.core.UriBuilder;
import javax.servlet.Servlet;
import junit.framework.TestSuite;
import org.junit.*;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
import junit.framework.TestSuite;
import se.scalablesolutions.akka.kernel.config.ActiveObjectGuiceConfiguratorForJava;
import se.scalablesolutions.akka.kernel.config.JavaConfig;
@ -51,6 +51,11 @@ public class RestTest extends TestSuite {
}
@Test
public void dummy() {
assertTrue(true);
}
//@Test
public void simpleRequest() throws IOException, InstantiationException {
selector.start();
Client client = Client.create();

View file

@ -17,7 +17,7 @@ import kernel.util.Logging
object Boot extends Logging {
val HOME = try { System.getenv("AKKA_HOME") } catch { case e: NullPointerException => throw new IllegalStateException("AKKA_HOME system variable needs to be set") }
val CLASSES = HOME + "/classes"
val CLASSES = HOME + "/kernel/target/classes" // FIXME fix classes dir for dist wrap
val LIB = HOME + "/lib"
val CONFIG = HOME + "/config"

View file

@ -33,21 +33,10 @@ import kernel.util.Logging
object Kernel extends Logging {
val SERVER_URL = "localhost"
val JERSEY_SERVER_URL = "http://" + SERVER_URL + "/"
val JERSEY_SERVER_PORT = 9998
val JERSEY_REST_CLASSES_ROOT_PACKAGE = "se.scalablesolutions.akka.kernel"
val JERSEY_BASE_URI = UriBuilder.fromUri(JERSEY_SERVER_URL).port(getPort(JERSEY_SERVER_PORT)).build()
/*
val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
val VOLDEMORT_SERVER_PORT = 6666
val VOLDEMORT_BOOTSTRAP_URL = VOLDEMORT_SERVER_URL + ":" + VOLDEMORT_SERVER_PORT
val ZOO_KEEPER_SERVER_URL = SERVER_URL
val ZOO_KEEPER_SERVER_PORT = 9898
/*
private[this] var storageFactory: StoreClientFactory = _
private[this] var storageServer: VoldemortServer = _
*/
*/
private[this] var remoteServer: RemoteServer = _
@ -57,12 +46,7 @@ object Kernel extends Logging {
startCassandra
//cassandraBenchmark
//val threadSelector = startJersey
// TODO: handle shutdown of Jersey in separate thread
// TODO: spawn main in new thread an communicate using socket
//System.in.read
//threadSelector.stopEndpoint
//startJersey
//startZooKeeper
//startVoldemort
}
@ -76,22 +60,27 @@ object Kernel extends Logging {
}
})
remoteServerThread.start
Thread.sleep(1000) // wait for server to start up
}
private[akka] def startJersey: SelectorThread = {
val initParams = new java.util.HashMap[String, String]
initParams.put(
"com.sun.jersey.config.property.packages",
JERSEY_REST_CLASSES_ROOT_PACKAGE)
GrizzlyWebContainerFactory.create(JERSEY_BASE_URI, initParams)
}
private[akka] def startCassandra = {
CassandraNode.start
}
private[akka] def startJersey = {
val JERSEY_SERVER_URL = "http://" + SERVER_URL + "/"
val JERSEY_SERVER_PORT = 9998
val JERSEY_REST_CLASSES_ROOT_PACKAGE = "se.scalablesolutions.akka.kernel"
val JERSEY_BASE_URI = UriBuilder.fromUri(JERSEY_SERVER_URL).port(getPort(JERSEY_SERVER_PORT)).build()
val initParams = new java.util.HashMap[String, String]
initParams.put("com.sun.jersey.config.property.packages", JERSEY_REST_CLASSES_ROOT_PACKAGE)
val threadSelector = GrizzlyWebContainerFactory.create(JERSEY_BASE_URI, initParams)
// TODO: handle shutdown of Jersey in separate thread
// TODO: spawn main in new thread an communicate using socket
System.in.read
threadSelector.stopEndpoint
}
private def cassandraBenchmark = {
val NR_ENTRIES = 100000
@ -134,7 +123,11 @@ java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.Write
System.exit(0)
}
// private[akka] def startVoldemort = {
// val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
// val VOLDEMORT_SERVER_PORT = 6666
// val VOLDEMORT_BOOTSTRAP_URL = VOLDEMORT_SERVER_URL + ":" + VOLDEMORT_SERVER_PORT
// // Start Voldemort server
// val config = VoldemortConfig.loadFromVoldemortHome(Boot.HOME)
// storageServer = new VoldemortServer(config)
@ -166,6 +159,8 @@ java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.Write
// storageFactory.getStoreClient(storageName)
// private[akka] def startZooKeeper = {
// val ZOO_KEEPER_SERVER_URL = SERVER_URL
// val ZOO_KEEPER_SERVER_PORT = 9898
// try {
// ManagedUtil.registerLog4jMBeans
// ServerConfig.parse(args)

View file

@ -36,14 +36,6 @@ class ActiveObjectFactory {
// FIXME How to pass the MessageDispatcher on from active object to child???????
// FIXME call backs to @prerestart @postrestart methods
// FIXME dispatcher.newThreadPoolWith....build
// FIXME JMX enable configuration
// FIXME Configgy for config
def newInstance[T](target: Class[T], timeout: Long): T =
ActiveObject.newInstance(target, new Dispatcher, None, timeout)

View file

@ -81,8 +81,7 @@ trait Actor extends Logging with TransactionManagement {
* </pre>
*/
protected[kernel] var dispatcher: MessageDispatcher = {
val threadPool = ThreadPoolBuilder.newBuilder.newThreadPoolWithLinkedBlockingQueueWithCapacity(0).build
val dispatcher = new EventBasedThreadPoolDispatcher(threadPool)
val dispatcher = new EventBasedThreadPoolDispatcher
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageHandler(this))
dispatcher

View file

@ -2,23 +2,82 @@
* Copyright (C) 2009 Scalable Solutions.
*/
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
*
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ExecutorService
import java.util.{HashSet, LinkedList, Queue}
import java.util.concurrent._
import locks.ReentrantLock
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import java.util.{Collection, HashSet, LinkedList, Queue}
class EventBasedThreadPoolDispatcher(private val threadPool: ExecutorService) extends MessageDispatcherBase {
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
* See also this article: [http://today.java.net/cs/user/print/a/350].
* <p/>
* Default thread pool settings are:
* <pre/>
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
* - NR_START_THREADS = 16
* - NR_MAX_THREADS = 128
* - KEEP_ALIVE_TIME = 60000L // one minute
* </pre>
* <p/>
* The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
* There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
* <p/>
* Scala API.
* <p/>
* Example usage:
* <pre/>
* val dispatcher = EventBasedThreadPoolDispatcher
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy)
* .buildThreadPool
* </pre>
* <p/>
*
* Java API.
* <p/>
* Example usage:
* <pre/>
* EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher();
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy())
* .buildThreadPool();
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private val MILLISECONDS = TimeUnit.MILLISECONDS
private var inProcessOfBuilding = false
private var executor: ExecutorService = _
private var threadPoolBuilder: ThreadPoolExecutor = _
private val threadFactory = new MonitorableThreadFactory("akka")
private var boundedExecutorBound = -1
private val busyHandlers = new HashSet[AnyRef]
// build default thread pool
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
def start = if (!active) {
active = true
/**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(messageQueue)
selectorThread = new Thread {
override def run = {
@ -33,7 +92,7 @@ class EventBasedThreadPoolDispatcher(private val threadPool: ExecutorService) ex
val message = queue.peek
val messageHandler = getIfNotBusy(message.sender)
if (messageHandler.isDefined) {
threadPool.execute(new Runnable {
executor.execute(new Runnable {
override def run = {
messageHandler.get.handle(message)
free(message.sender)
@ -52,7 +111,7 @@ class EventBasedThreadPoolDispatcher(private val threadPool: ExecutorService) ex
selectorThread.start
}
override protected def doShutdown = threadPool.shutdownNow
override protected def doShutdown = executor.shutdownNow
private def getIfNotBusy(key: AnyRef): Option[MessageHandler] = guard.synchronized {
if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
@ -64,6 +123,121 @@ class EventBasedThreadPoolDispatcher(private val threadPool: ExecutorService) ex
private def free(key: AnyRef) = guard.synchronized {
busyHandlers.remove(key)
}
// ============ Code for configuration of thread pool =============
def buildThreadPool = synchronized {
ensureNotActive
inProcessOfBuilding = false
if (boundedExecutorBound > 0) {
val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound)
boundedExecutorBound = -1
executor = boundedExecutor
} else {
executor = threadPoolBuilder
}
}
def withNewThreadPoolWithQueue(queue: BlockingQueue[Runnable]): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
inProcessOfBuilding = false
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
this
}
/**
* Creates an new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeeded.
* <p/>
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed.
*/
def withNewThreadPoolWithBoundedBlockingQueue(bound: Int): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory)
boundedExecutorBound = bound
this
}
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable](capacity), threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new SynchronousQueue[Runnable](fair), threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new ArrayBlockingQueue[Runnable](capacity, fair), threadFactory, new CallerRunsPolicy)
this
}
/**
* Default is 16.
*/
def setCorePoolSize(size: Int): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyInConstructionPhase
threadPoolBuilder.setCorePoolSize(size)
this
}
/**
* Default is 128.
*/
def setMaxPoolSize(size: Int): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyInConstructionPhase
threadPoolBuilder.setMaximumPoolSize(size)
this
}
/**
* Default is 60000 (one minute).
*/
def setKeepAliveTimeInMillis(time: Long): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyInConstructionPhase
threadPoolBuilder.setKeepAliveTime(time, MILLISECONDS)
this
}
/**
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
*/
def setRejectionPolicy(policy: RejectedExecutionHandler): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyInConstructionPhase
threadPoolBuilder.setRejectedExecutionHandler(policy)
this
}
private def verifyNotInConstructionPhase = {
if (inProcessOfBuilding) throw new IllegalStateException("Is already in the process of building a thread pool")
inProcessOfBuilding = true
}
private def verifyInConstructionPhase = {
if (!inProcessOfBuilding) throw new IllegalStateException("Is not in the process of building a thread pool, start building one by invoking one of the 'newThreadPool*' methods")
}
private def ensureNotActive = if (active) throw new IllegalStateException("Can't build a new thread pool for a dispatcher that is already up and running")
}
class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer {
@ -89,3 +263,91 @@ class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue)
def wakeUp = messageQueue.interrupt
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService {
private val semaphore = new Semaphore(bound)
def execute(command: Runnable) = {
semaphore.acquire
try {
executor.execute(new Runnable() {
def run = {
try {
command.run
} finally {
semaphore.release
}
}
})
} catch {
case e: RejectedExecutionException =>
semaphore.release
}
}
// Delegating methods for the ExecutorService interface
def shutdown = executor.shutdown
def shutdownNow = executor.shutdownNow
def isShutdown = executor.isShutdown
def isTerminated = executor.isTerminated
def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
/*
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
*/
def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
private val counter = new AtomicLong
def newThread(runnable: Runnable) =
//new MonitorableThread(runnable, name)
new Thread(runnable, name + "-" + counter.getAndIncrement)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
val created = new AtomicInteger
val alive = new AtomicInteger
@volatile val debugLifecycle = false
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThread(runnable: Runnable, name: String)
extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {//with Logging {
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable) = {} //log.error("UNCAUGHT in thread [%s] cause [%s]", thread.getName, cause)
})
override def run = {
val debug = MonitorableThread.debugLifecycle
//if (debug) log.debug("Created %s", getName)
try {
MonitorableThread.alive.incrementAndGet
super.run
} finally {
MonitorableThread.alive.decrementAndGet
//if (debug) log.debug("Exiting %s", getName)
}
}
}

View file

@ -1,242 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger}
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
import java.util.Collection
/**
* Scala API.
* <p/>
* Example usage:
* <pre/>
* val threadPool = ThreadPoolBuilder.newBuilder
* .newThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy)
* .build
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ThreadPoolBuilder {
def newBuilder = new ThreadPoolBuilder
}
/**
* Java API.
* <p/>
* Example usage:
* <pre/>
* ThreadPoolBuilder builder = new ThreadPoolBuilder();
* Executor threadPool =
* builder
* .newThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy())
* .build();
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadPoolBuilder {
val NR_START_THREADS = 16
val NR_MAX_THREADS = 128
val KEEP_ALIVE_TIME = 60000L // default is one minute
val MILLISECONDS = TimeUnit.MILLISECONDS
private var inProcessOfBuilding = false
private var threadPool: ThreadPoolExecutor = _
private val threadFactory = new MonitorableThreadFactory("akka")
private var boundedExecutorBound = -1
def build: ExecutorService = synchronized {
inProcessOfBuilding = false
if (boundedExecutorBound > 0) {
val executor = new BoundedExecutorDecorator(threadPool, boundedExecutorBound)
boundedExecutorBound = -1
executor
} else threadPool
}
def newThreadPool(queue: BlockingQueue[Runnable]) = synchronized {
verifyNotInConstructionPhase
threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
this
}
/**
* Creates an new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeeded.
* <p/>
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed.
*/
def newThreadPoolWithBoundedBlockingQueue(bound: Int) = synchronized {
verifyNotInConstructionPhase
threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory)
boundedExecutorBound = bound
this
}
/**
* Negative or zero capacity creates an unbounded task queue.
*/
def newThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int) = synchronized {
verifyNotInConstructionPhase
val queue = if (capacity < 1) new LinkedBlockingQueue[Runnable] else new LinkedBlockingQueue[Runnable](capacity)
threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue, threadFactory, new CallerRunsPolicy)
this
}
def newThreadPoolWithSynchronousQueueWithFairness(fair: Boolean) = synchronized {
verifyNotInConstructionPhase
threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new SynchronousQueue[Runnable](fair), threadFactory, new CallerRunsPolicy)
this
}
def newThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean) = synchronized {
verifyNotInConstructionPhase
threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new ArrayBlockingQueue[Runnable](capacity, fair), threadFactory, new CallerRunsPolicy)
this
}
/**
* Default is 16.
*/
def setCorePoolSize(size: Int) = synchronized {
verifyInConstructionPhase
threadPool.setCorePoolSize(size)
this
}
/**
* Default is 128.
*/
def setMaxPoolSize(size: Int) = synchronized {
verifyInConstructionPhase
threadPool.setMaximumPoolSize(size)
this
}
/**
* Default is 60000 (one minute).
*/
def setKeepAliveTimeInMillis(time: Long) = synchronized {
verifyInConstructionPhase
threadPool.setKeepAliveTime(time, MILLISECONDS)
this
}
/**
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
*/
def setRejectionPolicy(policy: RejectedExecutionHandler) = synchronized {
verifyInConstructionPhase
threadPool.setRejectedExecutionHandler(policy)
this
}
private def verifyNotInConstructionPhase = {
if (inProcessOfBuilding) throw new IllegalStateException("Is already in the process of building a thread pool")
inProcessOfBuilding = true
}
private def verifyInConstructionPhase = {
if (!inProcessOfBuilding) throw new IllegalStateException("Is not in the process of building a thread pool, start building one by invoking one of the 'newThreadPool*' methods")
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService {
private val semaphore = new Semaphore(bound)
def execute(command: Runnable) = {
semaphore.acquire
try {
executor.execute(new Runnable() {
def run = {
try {
command.run
} finally {
semaphore.release
}
}
})
} catch {
case e: RejectedExecutionException =>
semaphore.release
}
}
// Delegating methods for the ExecutorService interface
def shutdown = executor.shutdown
def shutdownNow = executor.shutdownNow
def isShutdown = executor.isShutdown
def isTerminated = executor.isTerminated
def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
/*
def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
*/
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
private val counter = new AtomicLong
def newThread(runnable: Runnable) =
//new MonitorableThread(runnable, name)
new Thread(runnable, name + "-" + counter.getAndIncrement)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
val created = new AtomicInteger
val alive = new AtomicInteger
@volatile val debugLifecycle = false
}
import kernel.util.Logging
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThread(runnable: Runnable, name: String)
extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {//with Logging {
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable) = {} //log.error("UNCAUGHT in thread [%s] cause [%s]", thread.getName, cause)
})
override def run = {
val debug = MonitorableThread.debugLifecycle
//if (debug) log.debug("Created %s", getName)
try {
MonitorableThread.alive.incrementAndGet
super.run
} finally {
MonitorableThread.alive.decrementAndGet
//if (debug) log.debug("Exiting %s", getName)
}
}
}

View file

@ -32,9 +32,9 @@ class JerseySpec extends Spec with ShouldMatchers {
describe("A Jersey REST service") {
it("should ...") {
/*
val selector = startJersey
selector.start
/*
val conf = new ActiveObjectGuiceConfigurator
conf.configureActiveObjects(
RestartStrategy(AllForOne, 3, 5000),

View file

@ -37,14 +37,13 @@ class ThreadBasedDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val pool = ThreadPoolBuilder.newBuilder
.newThreadPoolWithBoundedBlockingQueue(100)
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.build
val dispatcher = new EventBasedThreadPoolDispatcher(pool)
.buildThreadPool
dispatcher.registerHandler(key, new MessageHandler {
def handle(message: MessageHandle) {
try {
@ -74,14 +73,13 @@ class ThreadBasedDispatcherTest extends TestCase {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val pool = ThreadPoolBuilder.newBuilder
.newThreadPoolWithBoundedBlockingQueue(100)
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.build
val dispatcher = new EventBasedThreadPoolDispatcher(pool)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageHandler {
def handle(message: MessageHandle) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
@ -105,14 +103,13 @@ class ThreadBasedDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val pool = ThreadPoolBuilder.newBuilder
.newThreadPoolWithBoundedBlockingQueue(100)
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.build
val dispatcher = new EventBasedThreadPoolDispatcher(pool)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageHandler {
var currentValue = -1;
def handle(message: MessageHandle) {

Binary file not shown.

Binary file not shown.

Binary file not shown.