Merge branch 'master' of git@github.com:jboner/akka
This commit is contained in:
commit
16eeb26ccb
19 changed files with 406 additions and 222 deletions
|
|
@ -109,18 +109,15 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Finds all actors that has a specific id.
|
||||
*/
|
||||
def actorsFor(id: String): Array[ActorRef] = {
|
||||
if (actorsById.containsKey(id)) {
|
||||
actorsById.get(id).toArray(Naught)
|
||||
} else Naught
|
||||
val set = actorsById get id
|
||||
if (set ne null) set toArray Naught
|
||||
else Naught
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the actor that has a specific UUID.
|
||||
*/
|
||||
def actorFor(uuid: String): Option[ActorRef] = {
|
||||
if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid))
|
||||
else None
|
||||
}
|
||||
def actorFor(uuid: String): Option[ActorRef] = Option(actorsByUUID get uuid)
|
||||
|
||||
/**
|
||||
* Registers an actor in the ActorRegistry.
|
||||
|
|
|
|||
|
|
@ -12,22 +12,11 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
|||
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
|
||||
@volatile protected var active: Boolean = false
|
||||
protected val queue = new ReactiveMessageQueue(name)
|
||||
protected val messageInvokers = new HashMap[ActorRef, ActorRef]
|
||||
protected var selectorThread: Thread = _
|
||||
protected val guard = new Object
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
|
||||
|
||||
override def register(actorRef: ActorRef) = synchronized {
|
||||
messageInvokers.put(actorRef, actorRef)
|
||||
super.register(actorRef)
|
||||
}
|
||||
|
||||
override def unregister(actorRef: ActorRef) = synchronized {
|
||||
messageInvokers.remove(actorRef)
|
||||
super.unregister(actorRef)
|
||||
}
|
||||
|
||||
def shutdown = if (active) {
|
||||
log.debug("Shutting down %s", toString)
|
||||
active = false
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import se.scalablesolutions.akka.config.Config.config
|
|||
import net.lag.configgy.ConfigMap
|
||||
import se.scalablesolutions.akka.util.UUID
|
||||
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Scala API. Dispatcher factory.
|
||||
|
|
@ -44,8 +45,8 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Dispatchers extends Logging {
|
||||
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
|
||||
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
|
||||
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
|
||||
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
|
||||
|
||||
lazy val defaultGlobalDispatcher = {
|
||||
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
|
||||
|
|
@ -75,6 +76,7 @@ object Dispatchers extends Logging {
|
|||
|
||||
/**
|
||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||
* Uses the default timeout
|
||||
* <p/>
|
||||
* E.g. each actor consumes its own thread.
|
||||
*/
|
||||
|
|
@ -82,11 +84,19 @@ object Dispatchers extends Logging {
|
|||
|
||||
/**
|
||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||
* Uses the default timeout
|
||||
* <p/>
|
||||
* E.g. each actor consumes its own thread.
|
||||
*/
|
||||
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity)
|
||||
|
||||
/**
|
||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||
* <p/>
|
||||
* E.g. each actor consumes its own thread.
|
||||
*/
|
||||
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeout: Long, pushTimeUnit: TimeUnit) = new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeout, pushTimeUnit)
|
||||
|
||||
/**
|
||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||
* <p/>
|
||||
|
|
|
|||
|
|
@ -89,12 +89,9 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
|
||||
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
|
||||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
if (actorRef.mailbox eq null ) {
|
||||
if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedQueue[MessageInvocation]
|
||||
else actorRef.mailbox = new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
|
||||
}
|
||||
super.register(actorRef)
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = {
|
||||
if (mailboxCapacity <= 0) new ConcurrentLinkedQueue[MessageInvocation]
|
||||
else new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
|
||||
}
|
||||
|
||||
def dispatch(receiver: ActorRef): Unit = if (active) {
|
||||
|
|
|
|||
|
|
@ -182,13 +182,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
|||
|
||||
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
|
||||
|
||||
protected override def createMailbox(actorRef: ActorRef): AnyRef = {
|
||||
if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation]
|
||||
else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
|
||||
}
|
||||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
verifyActorsAreOfSameType(actorRef)
|
||||
// The actor will need a ConcurrentLinkedDeque based mailbox
|
||||
if (actorRef.mailbox == null) {
|
||||
if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]
|
||||
else actorRef.mailbox = new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
|
||||
}
|
||||
pooledActors.add(actorRef)
|
||||
super.register(actorRef)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -185,16 +185,10 @@ class HawtDispatcher(val aggregate:Boolean=true, val parent:DispatchQueue=global
|
|||
// TODO: figure out if this can be optional in akka
|
||||
override def mailboxSize(actorRef: ActorRef) = 0
|
||||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
if( actorRef.mailbox == null ) {
|
||||
val queue = parent.createSerialQueue(actorRef.toString)
|
||||
if( aggregate ) {
|
||||
actorRef.mailbox = new AggregatingHawtDispatcherMailbox(queue)
|
||||
} else {
|
||||
actorRef.mailbox = new HawtDispatcherMailbox(queue)
|
||||
}
|
||||
}
|
||||
super.register(actorRef)
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = {
|
||||
val queue = parent.createSerialQueue(actorRef.toString)
|
||||
if (aggregate) new AggregatingHawtDispatcherMailbox(queue)
|
||||
else new HawtDispatcherMailbox(queue)
|
||||
}
|
||||
|
||||
override def toString = "HawtDispatchEventDrivenDispatcher"
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import se.scalablesolutions.akka.util.{HashCode, Logging}
|
|||
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
|
||||
|
||||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||
import se.scalablesolutions.akka.AkkaException
|
||||
import java.util.concurrent.{ConcurrentSkipListSet}
|
||||
|
||||
/**
|
||||
|
|
@ -56,6 +57,8 @@ final class MessageInvocation(val receiver: ActorRef,
|
|||
}
|
||||
}
|
||||
|
||||
class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -68,21 +71,41 @@ trait MessageQueue {
|
|||
*/
|
||||
trait MessageDispatcher extends Logging {
|
||||
protected val uuids = new ConcurrentSkipListSet[String]
|
||||
|
||||
def dispatch(invocation: MessageInvocation)
|
||||
|
||||
def start
|
||||
|
||||
def shutdown
|
||||
def register(actorRef: ActorRef) = uuids add actorRef.uuid
|
||||
|
||||
def register(actorRef: ActorRef) {
|
||||
if(actorRef.mailbox eq null)
|
||||
actorRef.mailbox = createMailbox(actorRef)
|
||||
uuids add actorRef.uuid
|
||||
}
|
||||
def unregister(actorRef: ActorRef) = {
|
||||
uuids remove actorRef.uuid
|
||||
//actorRef.mailbox = null //FIXME should we null out the mailbox here?
|
||||
if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
|
||||
}
|
||||
|
||||
def canBeShutDown: Boolean = uuids.isEmpty
|
||||
|
||||
def isShutdown: Boolean
|
||||
|
||||
/**
|
||||
* Returns the size of the mailbox for the specified actor
|
||||
*/
|
||||
def mailboxSize(actorRef: ActorRef):Int = 0
|
||||
|
||||
/**
|
||||
* Creates and returns a mailbox for the given actor
|
||||
*/
|
||||
protected def createMailbox(actorRef: ActorRef): AnyRef = null
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait MessageDemultiplexer {
|
||||
def select
|
||||
|
|
|
|||
148
akka-actor/src/main/scala/dispatch/Queues.scala
Normal file
148
akka-actor/src/main/scala/dispatch/Queues.scala
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.dispatch
|
||||
|
||||
import concurrent.forkjoin.LinkedTransferQueue
|
||||
import java.util.concurrent.{TimeUnit, Semaphore}
|
||||
import java.util.Iterator
|
||||
import se.scalablesolutions.akka.util.Logger
|
||||
|
||||
class BoundedTransferQueue[E <: AnyRef](
|
||||
val capacity: Int,
|
||||
val pushTimeout: Long,
|
||||
val pushTimeUnit: TimeUnit)
|
||||
extends LinkedTransferQueue[E] {
|
||||
require(capacity > 0)
|
||||
require(pushTimeout > 0)
|
||||
require(pushTimeUnit ne null)
|
||||
|
||||
protected val guard = new Semaphore(capacity)
|
||||
|
||||
override def take(): E = {
|
||||
val e = super.take
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
}
|
||||
|
||||
override def poll(): E = {
|
||||
val e = super.poll
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
}
|
||||
|
||||
override def poll(timeout: Long, unit: TimeUnit): E = {
|
||||
val e = super.poll(timeout,unit)
|
||||
if (e ne null) guard.release
|
||||
e
|
||||
}
|
||||
|
||||
override def remainingCapacity = guard.availablePermits
|
||||
|
||||
override def remove(o: AnyRef): Boolean = {
|
||||
if (super.remove(o)) {
|
||||
guard.release
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
override def offer(e: E): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
val result = try {
|
||||
super.offer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
}
|
||||
|
||||
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
val result = try {
|
||||
super.offer(e,timeout,unit)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
}
|
||||
|
||||
override def add(e: E): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
val result = try {
|
||||
super.add(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
}
|
||||
|
||||
override def put(e :E): Unit = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
try {
|
||||
super.put(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def tryTransfer(e: E): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
val result = try {
|
||||
super.tryTransfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
}
|
||||
|
||||
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
val result = try {
|
||||
super.tryTransfer(e,timeout,unit)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
if (!result) guard.release
|
||||
result
|
||||
} else
|
||||
false
|
||||
}
|
||||
|
||||
override def transfer(e: E): Unit = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
try {
|
||||
super.transfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def iterator: Iterator[E] = {
|
||||
val it = super.iterator
|
||||
new Iterator[E] {
|
||||
def hasNext = it.hasNext
|
||||
def next = it.next
|
||||
def remove {
|
||||
it.remove
|
||||
guard.release //Assume remove worked if no exception was thrown
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -29,8 +29,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
|
|||
val iter = selectedInvocations.iterator
|
||||
while (iter.hasNext) {
|
||||
val invocation = iter.next
|
||||
val invoker = messageInvokers.get(invocation.receiver)
|
||||
if (invoker ne null) invoker.invoke(invocation)
|
||||
val invoker = invocation.receiver
|
||||
if (invoker ne null) invoker invoke invocation
|
||||
iter.remove
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,14 +103,14 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
|
|||
|
||||
private def process(selectedInvocations: List[MessageInvocation]) = synchronized {
|
||||
var nrOfBusyMessages = 0
|
||||
val totalNrOfActors = messageInvokers.size
|
||||
val totalNrOfActors = uuids.size
|
||||
val totalNrOfBusyActors = busyActors.size
|
||||
val invocations = selectedInvocations.iterator
|
||||
while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) {
|
||||
val invocation = invocations.next
|
||||
if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]")
|
||||
if (!busyActors.contains(invocation.receiver)) {
|
||||
val invoker = messageInvokers.get(invocation.receiver)
|
||||
val invoker = invocation.receiver
|
||||
if (invoker eq null) throw new IllegalActorStateException(
|
||||
"Message invoker for invocation [" + invocation + "] is null")
|
||||
resume(invocation.receiver)
|
||||
|
|
|
|||
|
|
@ -4,27 +4,47 @@
|
|||
|
||||
package se.scalablesolutions.akka.dispatch
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.Queue
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
|
||||
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
|
||||
|
||||
/**
|
||||
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher {
|
||||
class ThreadBasedDispatcher(private val actor: ActorRef,
|
||||
val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY,
|
||||
val pushTimeout: Long = 10000,
|
||||
val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS
|
||||
) extends MessageDispatcher {
|
||||
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
|
||||
|
||||
private val name = actor.getClass.getName + ":" + actor.uuid
|
||||
private val threadName = "akka:thread-based:dispatcher:" + name
|
||||
private val queue = new BlockingMessageQueue(name, mailboxCapacity)
|
||||
private var selectorThread: Thread = _
|
||||
@volatile private var active: Boolean = false
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = {
|
||||
if (mailboxCapacity > 0)
|
||||
new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue
|
||||
else
|
||||
new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue
|
||||
}
|
||||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
if(actorRef != actor)
|
||||
throw new IllegalArgumentException("Cannot register to anyone but " + actor)
|
||||
|
||||
super.register(actorRef)
|
||||
}
|
||||
|
||||
def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue]
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = mailbox append invocation
|
||||
|
||||
def start = if (!active) {
|
||||
log.debug("Starting up %s", toString)
|
||||
|
|
@ -33,7 +53,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
|
|||
override def run = {
|
||||
while (active) {
|
||||
try {
|
||||
actor.invoke(queue.take)
|
||||
actor.invoke(mailbox.next)
|
||||
} catch { case e: InterruptedException => active = false }
|
||||
}
|
||||
}
|
||||
|
|
@ -53,12 +73,14 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
|
|||
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
|
||||
}
|
||||
|
||||
// FIXME: configure the LinkedBlockingQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher
|
||||
class BlockingMessageQueue(name: String, mailboxCapacity: Int) extends MessageQueue {
|
||||
private val queue = if (mailboxCapacity > 0) new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
|
||||
else new LinkedBlockingQueue[MessageInvocation]
|
||||
def append(invocation: MessageInvocation) = queue.put(invocation)
|
||||
def take: MessageInvocation = queue.take
|
||||
def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
|
||||
def interrupt = throw new UnsupportedOperationException
|
||||
trait ThreadMessageQueue extends MessageQueue { self: TransferQueue[MessageInvocation] =>
|
||||
|
||||
final def append(invocation: MessageInvocation): Unit = {
|
||||
if(!self.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
|
||||
if(!self.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
|
||||
throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
|
||||
}
|
||||
}
|
||||
|
||||
final def next: MessageInvocation = self.take
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,18 +99,26 @@ class Logger(val logger: SLFLogger) {
|
|||
warning(t,message(fmt,arg,argN:_*))
|
||||
}
|
||||
|
||||
def warn(t: Throwable, fmt: => String, arg: Any, argN: Any*) = warning(t, fmt, arg, argN)
|
||||
|
||||
def warning(t: Throwable, msg: => String) {
|
||||
if (warning_?) logger.warn(msg,t)
|
||||
}
|
||||
|
||||
def warn(t: Throwable, msg: => String) = warning(t, msg)
|
||||
|
||||
def warning(fmt: => String, arg: Any, argN: Any*) {
|
||||
warning(message(fmt,arg,argN:_*))
|
||||
}
|
||||
|
||||
def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN)
|
||||
|
||||
def warning(msg: => String) {
|
||||
if (warning_?) logger warn msg
|
||||
}
|
||||
|
||||
def warn(msg: => String) = warning(msg)
|
||||
|
||||
//Error
|
||||
def error(t: Throwable, fmt: => String, arg: Any, argN: Any*) {
|
||||
error(t,message(fmt,arg,argN:_*))
|
||||
|
|
|
|||
|
|
@ -1,92 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.dispatch
|
||||
|
||||
import concurrent.forkjoin.LinkedTransferQueue
|
||||
import java.util.concurrent.{TimeUnit, Semaphore}
|
||||
import java.util.Iterator
|
||||
import se.scalablesolutions.akka.util.Logger
|
||||
|
||||
class BoundedTransferQueue[E <: AnyRef](
|
||||
val capacity: Int,
|
||||
val pushTimeout: Long,
|
||||
val pushTimeUnit: TimeUnit)
|
||||
extends LinkedTransferQueue[E] {
|
||||
require(capacity > 0)
|
||||
require(pushTimeout > 0)
|
||||
require(pushTimeUnit ne null)
|
||||
|
||||
protected val guard = new Semaphore(capacity)
|
||||
|
||||
//Enqueue an item within the push timeout (acquire Semaphore)
|
||||
protected def enq(f: => Boolean): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
val result = try {
|
||||
f
|
||||
} catch {
|
||||
case e =>
|
||||
guard.release //If something broke, release
|
||||
throw e
|
||||
}
|
||||
if (!result) guard.release //Didn't add anything
|
||||
result
|
||||
} else
|
||||
false
|
||||
}
|
||||
|
||||
//Dequeue an item (release Semaphore)
|
||||
protected def deq(e: E): E = {
|
||||
if (e ne null) guard.release //Signal removal of item
|
||||
e
|
||||
}
|
||||
|
||||
override def take(): E = deq(super.take)
|
||||
override def poll(): E = deq(super.poll)
|
||||
override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit))
|
||||
|
||||
override def remainingCapacity = guard.availablePermits
|
||||
|
||||
override def remove(o: AnyRef): Boolean = {
|
||||
if (super.remove(o)) {
|
||||
guard.release
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
override def offer(e: E): Boolean =
|
||||
enq(super.offer(e))
|
||||
|
||||
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean =
|
||||
enq(super.offer(e,timeout,unit))
|
||||
|
||||
override def add(e: E): Boolean =
|
||||
enq(super.add(e))
|
||||
|
||||
override def put(e :E): Unit =
|
||||
enq({ super.put(e); true })
|
||||
|
||||
override def tryTransfer(e: E): Boolean =
|
||||
enq(super.tryTransfer(e))
|
||||
|
||||
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean =
|
||||
enq(super.tryTransfer(e,timeout,unit))
|
||||
|
||||
override def transfer(e: E): Unit =
|
||||
enq({ super.transfer(e); true })
|
||||
|
||||
override def iterator: Iterator[E] = {
|
||||
val it = super.iterator
|
||||
new Iterator[E] {
|
||||
def hasNext = it.hasNext
|
||||
def next = it.next
|
||||
def remove {
|
||||
it.remove
|
||||
guard.release //Assume remove worked if no exception was thrown
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -50,11 +50,16 @@ class AkkaServlet extends AtmosphereServlet with Logging {
|
|||
addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true")
|
||||
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
|
||||
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
|
||||
c.getInt("akka.rest.maxInactiveActivity").foreach { value =>
|
||||
|
||||
c.getInt("akka.rest.maxInactiveActivity") foreach { value =>
|
||||
log.info("MAX_INACTIVE:%s",value.toString)
|
||||
addInitParameter(CometSupport.MAX_INACTIVE,value.toString)
|
||||
}
|
||||
|
||||
c.getString("akka.rest.cometSupport") foreach { value =>
|
||||
addInitParameter("cometSupport",value)
|
||||
}
|
||||
|
||||
|
||||
val servlet = new AtmosphereRestServlet {
|
||||
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
|
||||
|
|
@ -86,14 +91,7 @@ class AkkaServlet extends AtmosphereServlet with Logging {
|
|||
import scala.collection.JavaConversions._
|
||||
|
||||
new DefaultCometSupportResolver(config) {
|
||||
type CS = CometSupport[_ <: AtmosphereResource[_,_]]
|
||||
override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = {
|
||||
available.filter(_ != classOf[GrizzlyCometSupport]).toList match {
|
||||
case Nil => new GrizzlyCometSupport(config)
|
||||
case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]])
|
||||
case _ => super.resolveMultipleNativeSupportConflict(available)
|
||||
}
|
||||
}
|
||||
type CS = CometSupport[_ <: AtmosphereResource[_,_]]
|
||||
|
||||
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = {
|
||||
val predef = config.getInitParameter("cometSupport")
|
||||
|
|
|
|||
|
|
@ -4,16 +4,16 @@
|
|||
|
||||
package se.scalablesolutions.akka.kernel
|
||||
|
||||
import com.sun.grizzly.http.SelectorThread
|
||||
import com.sun.grizzly.http.servlet.{ ServletAdapter }
|
||||
import com.sun.grizzly.standalone.StaticStreamAlgorithm
|
||||
|
||||
import javax.ws.rs.core.UriBuilder
|
||||
import javax.servlet.ServletConfig
|
||||
|
||||
import se.scalablesolutions.akka.actor.BootableActorLoaderService
|
||||
import se.scalablesolutions.akka.util.{Bootable, Logging}
|
||||
import se.scalablesolutions.akka.comet.{ AkkaServlet }
|
||||
import org.eclipse.jetty.xml.XmlConfiguration
|
||||
import java.io.File
|
||||
import org.eclipse.jetty.server.{Handler, Server}
|
||||
import org.eclipse.jetty.server.handler.{HandlerList, HandlerCollection, ContextHandler}
|
||||
|
||||
/**
|
||||
* Handles the Akka Comet Support (load/unload)
|
||||
|
|
@ -24,67 +24,46 @@ trait EmbeddedAppServer extends Bootable with Logging {
|
|||
import se.scalablesolutions.akka.config.Config._
|
||||
|
||||
val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
|
||||
val REST_URL = "http://" + REST_HOSTNAME
|
||||
val REST_PORT = config.getInt("akka.rest.port", 9998)
|
||||
|
||||
protected var jerseySelectorThread: Option[SelectorThread] = None
|
||||
protected var server: Option[Server] = None
|
||||
|
||||
abstract override def onLoad = {
|
||||
super.onLoad
|
||||
if (config.getBool("akka.rest.service", true)) {
|
||||
log.info("Attempting to start Akka REST service (Jersey)")
|
||||
|
||||
System.setProperty("jetty.port",REST_PORT.toString)
|
||||
System.setProperty("jetty.host",REST_HOSTNAME)
|
||||
System.setProperty("jetty.home",HOME.get + "/deploy/root")
|
||||
|
||||
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
|
||||
|
||||
val scheme = uri.getScheme
|
||||
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
|
||||
"The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
|
||||
|
||||
log.info("Attempting to start REST service on uri [%s]",uri)
|
||||
|
||||
val adapter = new ServletAdapter
|
||||
adapter.setHandleStaticResources(true)
|
||||
adapter.setServletInstance(new AkkaServlet {
|
||||
override def init(sc : ServletConfig): Unit = {
|
||||
val cl = Thread.currentThread.getContextClassLoader
|
||||
try {
|
||||
Thread.currentThread.setContextClassLoader(applicationLoader.get)
|
||||
super.init(sc)
|
||||
val configuration = new XmlConfiguration(new File(HOME.get + "/config/microkernel-server.xml").toURI.toURL)
|
||||
|
||||
server = Option(configuration.configure.asInstanceOf[Server]) map { s => //Set the correct classloader to our contexts
|
||||
applicationLoader foreach { loader =>
|
||||
//We need to provide the correct classloader to the servlets
|
||||
def setClassLoader(handlers: Seq[Handler]): Unit = {
|
||||
handlers foreach {
|
||||
case c: ContextHandler => c.setClassLoader(loader)
|
||||
case c: HandlerCollection => setClassLoader(c.getHandlers)
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
finally {
|
||||
Thread.currentThread.setContextClassLoader(cl)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
adapter.setContextPath(uri.getPath)
|
||||
adapter.addInitParameter("cometSupport",
|
||||
"org.atmosphere.container.GrizzlyCometSupport")
|
||||
adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass",
|
||||
"com.sun.jersey.api.core.PackagesResourceConfig")
|
||||
|
||||
if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")
|
||||
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolders, adapter.getContextPath)
|
||||
|
||||
val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
|
||||
ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
|
||||
jerseySelectorThread = Some(new SelectorThread).map { t =>
|
||||
t.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
|
||||
t.setPort(REST_PORT)
|
||||
t.setAdapter(adapter)
|
||||
t.setEnableAsyncExecution(true)
|
||||
t.setAsyncHandler(ah)
|
||||
t.listen
|
||||
t
|
||||
setClassLoader(s.getHandlers)
|
||||
}
|
||||
//Start the server
|
||||
s.start()
|
||||
s
|
||||
}
|
||||
log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
|
||||
log.info("Akka REST service started (Jersey)")
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def onUnload = {
|
||||
super.onUnload
|
||||
jerseySelectorThread foreach { (t) => {
|
||||
server foreach { t => {
|
||||
log.info("Shutting down REST service (Jersey)")
|
||||
t.stopEndpoint
|
||||
t.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
import _root_.org.mortbay.jetty.Connector
|
||||
import _root_.org.mortbay.jetty.Server
|
||||
import _root_.org.mortbay.jetty.webapp.WebAppContext
|
||||
import org.eclipse.jetty.webapp.WebAppContext
|
||||
import org.eclipse.jetty.server.Server
|
||||
|
||||
object RunWebApp extends Application {
|
||||
val server = new Server(8080)
|
||||
|
|
@ -9,7 +8,7 @@ object RunWebApp extends Application {
|
|||
context.setContextPath("/")
|
||||
context.setWar("src/main/webapp")
|
||||
|
||||
server.addHandler(context)
|
||||
server.setHandler(context)
|
||||
|
||||
try {
|
||||
println(">>> STARTING EMBEDDED JETTY SERVER, PRESS ANY KEY TO STOP")
|
||||
|
|
|
|||
|
|
@ -80,6 +80,7 @@ akka {
|
|||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9998
|
||||
#cometSupport = "org.atmosphere.container.Jetty7CometSupport" # Disregard autodetection, for valid values: http://doc.akkasource.org/comet
|
||||
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
|
||||
resource_packages = ["sample.rest.scala",
|
||||
"sample.rest.java",
|
||||
|
|
|
|||
100
config/microkernel-server.xml
Normal file
100
config/microkernel-server.xml
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
<?xml version="1.0"?>
|
||||
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure.dtd">
|
||||
|
||||
<!-- =============================================================== -->
|
||||
<!-- Configure the Jetty Server -->
|
||||
<!-- -->
|
||||
<!-- Documentation of this file format can be found at: -->
|
||||
<!-- http://wiki.eclipse.org/Jetty/Reference/jetty.xml_syntax -->
|
||||
<!-- -->
|
||||
<!-- Additional configuration files are available in $JETTY_HOME/etc -->
|
||||
<!-- and can be mixed in. For example: -->
|
||||
<!-- java -jar start.jar etc/jetty.xml etc/jetty-ssl.xml -->
|
||||
<!-- -->
|
||||
<!-- See start.ini file for the default configuraton files -->
|
||||
<!-- =============================================================== -->
|
||||
|
||||
|
||||
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||
|
||||
<!-- =========================================================== -->
|
||||
<!-- Server Thread Pool -->
|
||||
<!-- =========================================================== -->
|
||||
<Set name="ThreadPool">
|
||||
<!-- Default queued blocking threadpool -->
|
||||
<New class="org.eclipse.jetty.util.thread.QueuedThreadPool">
|
||||
<Set name="minThreads">10</Set>
|
||||
<Set name="maxThreads">200</Set>
|
||||
</New>
|
||||
</Set>
|
||||
|
||||
<!-- =========================================================== -->
|
||||
<!-- Set connectors -->
|
||||
<!-- =========================================================== -->
|
||||
|
||||
<Call name="addConnector">
|
||||
<Arg>
|
||||
<New class="org.eclipse.jetty.server.nio.SelectChannelConnector">
|
||||
<Set name="host"><SystemProperty name="jetty.host" /></Set>
|
||||
<Set name="port"><SystemProperty name="jetty.port" default="8080"/></Set>
|
||||
<Set name="maxIdleTime">300000</Set>
|
||||
<Set name="Acceptors">2</Set>
|
||||
<Set name="statsOn">false</Set>
|
||||
<Set name="confidentialPort">8443</Set>
|
||||
<Set name="lowResourcesConnections">20000</Set>
|
||||
<Set name="lowResourcesMaxIdleTime">5000</Set>
|
||||
</New>
|
||||
</Arg>
|
||||
</Call>
|
||||
|
||||
<!-- Uncomment this and enter your SSL config/credentials to enable https
|
||||
<Call name="addConnector">
|
||||
<Arg>
|
||||
<New class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
|
||||
<Set name="Port">8443</Set>
|
||||
<Set name="maxIdleTime">30000</Set>
|
||||
<Set name="Acceptors">2</Set>
|
||||
<Set name="AcceptQueueSize">100</Set>
|
||||
<Set name="Keystore"><SystemProperty name="jetty.home" default="." />/etc/keystore</Set>
|
||||
<Set name="Password">PASSWORD</Set>
|
||||
<Set name="KeyPassword">KEYPASSWORD</Set>
|
||||
<Set name="truststore"><SystemProperty name="jetty.home" default="." />/etc/keystore</Set>
|
||||
<Set name="trustPassword">TRUSTPASSWORD</Set>
|
||||
</New>
|
||||
</Arg>
|
||||
</Call>
|
||||
-->
|
||||
|
||||
<!-- =========================================================== -->
|
||||
<!-- Set handler Collection Structure -->
|
||||
<!-- =========================================================== -->
|
||||
<Set name="handler">
|
||||
<New id="Handlers" class="org.eclipse.jetty.server.handler.HandlerCollection">
|
||||
<Set name="handlers">
|
||||
<Array type="org.eclipse.jetty.server.Handler">
|
||||
<Item>
|
||||
<New id="AkkaRestHandler" class="org.eclipse.jetty.servlet.ServletContextHandler">
|
||||
<Set name="contextPath">/</Set>
|
||||
<Call name="addServlet">
|
||||
<Arg>se.scalablesolutions.akka.comet.AkkaServlet</Arg>
|
||||
<Arg>/*</Arg>
|
||||
</Call>
|
||||
</New>
|
||||
</Item>
|
||||
<Item>
|
||||
<New id="DefaultHandler" class="org.eclipse.jetty.server.handler.DefaultHandler"/>
|
||||
</Item>
|
||||
</Array>
|
||||
</Set>
|
||||
</New>
|
||||
</Set>
|
||||
|
||||
<!-- =========================================================== -->
|
||||
<!-- extra options -->
|
||||
<!-- =========================================================== -->
|
||||
<Set name="stopAtShutdown">true</Set>
|
||||
<Set name="sendServerVersion">true</Set>
|
||||
<Set name="sendDateHeader">true</Set>
|
||||
<Set name="gracefulShutdown">1000</Set>
|
||||
|
||||
</Configure>
|
||||
|
|
@ -60,7 +60,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
import Repositories._
|
||||
lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
|
||||
lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
|
||||
lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository)
|
||||
lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
|
||||
// lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo)
|
||||
lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
|
||||
|
|
@ -90,12 +90,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
lazy val JACKSON_VERSION = "1.2.1"
|
||||
lazy val JERSEY_VERSION = "1.2"
|
||||
lazy val LIFT_VERSION = "2.1-M1"
|
||||
lazy val MULTIVERSE_VERSION = "0.6"
|
||||
lazy val MULTIVERSE_VERSION = "0.6.1"
|
||||
lazy val SCALATEST_VERSION = "1.2-for-scala-2.8.0.final-SNAPSHOT"
|
||||
lazy val LOGBACK_VERSION = "0.9.24"
|
||||
lazy val SLF4J_VERSION = "1.6.0"
|
||||
lazy val SPRING_VERSION = "3.0.3.RELEASE"
|
||||
lazy val ASPECTWERKZ_VERSION = "2.2.1"
|
||||
lazy val JETTY_VERSION = "7.1.6.v20100715"
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// Dependencies
|
||||
|
|
@ -135,7 +136,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile"
|
||||
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile"
|
||||
|
||||
lazy val grizzly = "com.sun.grizzly" % "grizzly-comet-webserver" % "1.9.18-i" % "compile"
|
||||
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile"
|
||||
lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile"
|
||||
lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile"
|
||||
|
||||
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile"
|
||||
|
||||
|
|
@ -207,7 +210,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test"
|
||||
lazy val google_coll = "com.google.collections" % "google-collections" % "1.0" % "test"
|
||||
lazy val high_scale = "org.apache.cassandra" % "high-scale-lib" % CASSANDRA_VERSION % "test"
|
||||
lazy val jettyServer = "org.mortbay.jetty" % "jetty" % "6.1.22" % "test"
|
||||
lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test"
|
||||
lazy val testJettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test"
|
||||
|
||||
lazy val junit = "junit" % "junit" % "4.5" % "test"
|
||||
lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test"
|
||||
lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
|
||||
|
|
@ -417,7 +422,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
val atmo_runtime = Dependencies.atmo_runtime
|
||||
val atmo_tomcat = Dependencies.atmo_tomcat
|
||||
val atmo_weblogic = Dependencies.atmo_weblogic
|
||||
val grizzly = Dependencies.grizzly
|
||||
val jetty = Dependencies.jetty
|
||||
val jetty_util = Dependencies.jetty_util
|
||||
val jetty_xml = Dependencies.jetty_xml
|
||||
val jackson_core_asl = Dependencies.jackson_core_asl
|
||||
val jersey = Dependencies.jersey
|
||||
val jersey_contrib = Dependencies.jersey_contrib
|
||||
|
|
@ -656,8 +663,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
val servlet = Dependencies.servlet
|
||||
|
||||
// testing
|
||||
val jettyServer = Dependencies.jettyServer
|
||||
val junit = Dependencies.junit
|
||||
val testJetty = Dependencies.testJetty
|
||||
val testJettyWebApp = Dependencies.testJettyWebApp
|
||||
val junit = Dependencies.junit
|
||||
|
||||
def deployPath = AkkaParentProject.this.deployPath
|
||||
override def jarPath = warPath
|
||||
|
|
@ -758,10 +766,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
// ------------------------------------------------------------
|
||||
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject {
|
||||
lazy val sourceArtifact = Artifact(this.artifactID, "sources", "jar", Some("sources"), Nil, None)
|
||||
lazy val docsArtifact = Artifact(this.artifactID, "docs", "jar", Some("docs"), Nil, None)
|
||||
override def runClasspath = super.runClasspath +++ (AkkaParentProject.this.info.projectPath / "config")
|
||||
override def testClasspath = super.testClasspath +++ (AkkaParentProject.this.info.projectPath / "config")
|
||||
override def packageDocsJar = this.defaultJarPath("-docs.jar")
|
||||
override def packageSrcJar = this.defaultJarPath("-sources.jar")
|
||||
override def packageToPublishActions = super.packageToPublishActions ++ Seq(this.packageDocs, this.packageSrc)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue