Merge branch 'master' of git@github.com:jboner/akka

commit 698fbf95b2

12 changed files with 272 additions and 327 deletions
@@ -196,19 +196,10 @@ trait ActorRef extends
    */
   @volatile private[akka] var _transactionFactory: Option[TransactionFactory] = None

-  /**
-   * This lock ensures thread safety in the dispatching: only one message can
-   * be dispatched at once on the actor.
-   */
-  protected[akka] val dispatcherLock = new ReentrantLock
-
   /**
    * This is a reference to the message currently being processed by the actor
    */
-  protected[akka] var _currentMessage: Option[MessageInvocation] = None
-
-  protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg }
-  protected[akka] def currentMessage = guard.withGuard { _currentMessage }
+  @volatile protected[akka] var currentMessage: MessageInvocation = null

   /**
    * Comparison only takes uuid into account.
@@ -978,12 +969,12 @@ class LocalActorRef private[akka](
   protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
     joinTransaction(message)

-    if (isRemotingEnabled && remoteAddress.isDefined) {
+    if (remoteAddress.isDefined && isRemotingEnabled) {
       RemoteClientModule.send[Any](
         message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
     } else {
       val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
-      invocation.send
+      dispatcher dispatch invocation
     }
   }
@@ -994,7 +985,7 @@ class LocalActorRef private[akka](
     senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
     joinTransaction(message)

-    if (isRemotingEnabled && remoteAddress.isDefined) {
+    if (remoteAddress.isDefined && isRemotingEnabled) {
       val future = RemoteClientModule.send[T](
         message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor)
       if (future.isDefined) future.get
@@ -1004,7 +995,7 @@ class LocalActorRef private[akka](
       else new DefaultCompletableFuture[T](timeout)
       val invocation = new MessageInvocation(
         this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get)
-      invocation.send
+      dispatcher dispatch invocation
       future
     }
   }
@@ -1016,7 +1007,7 @@ class LocalActorRef private[akka](
     if (isShutdown)
       Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
     else {
-      currentMessage = Option(messageHandle)
+      currentMessage = messageHandle
       try {
         dispatch(messageHandle)
       } catch {
@@ -1024,7 +1015,7 @@ class LocalActorRef private[akka](
         Actor.log.error(e, "Could not invoke actor [%s]", this)
         throw e
       } finally {
-        currentMessage = None //TODO: Don't reset this, we might want to resend the message
+        currentMessage = null //TODO: Don't reset this, we might want to resend the message
       }
     }
   }
@@ -1188,7 +1179,7 @@ class LocalActorRef private[akka](
   }

   private def dispatch[T](messageHandle: MessageInvocation) = {
-    Actor.log.trace("Invoking actor with message:\n" + messageHandle)
+    Actor.log.trace("Invoking actor with message: %s\n",messageHandle)
     val message = messageHandle.message //serializeMessage(messageHandle.message)
     var topLevelTransaction = false
     val txSet: Option[CountDownCommitBarrier] =
@@ -1535,10 +1526,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
    * Is defined if the message was sent from another Actor, else None.
    */
   def sender: Option[ActorRef] = {
-    // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
     val msg = currentMessage
-    if (msg.isEmpty) None
-    else msg.get.sender
+    if (msg eq null) None
+    else msg.sender
   }

   /**
@@ -1546,10 +1536,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
    * Is defined if the message was sent with sent with '!!' or '!!!', else None.
    */
   def senderFuture(): Option[CompletableFuture[Any]] = {
-    // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
     val msg = currentMessage
-    if (msg.isEmpty) None
-    else msg.get.senderFuture
+    if (msg eq null) None
+    else msg.senderFuture
   }

   /**
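The two hunks above replace the guarded Option field with the new @volatile, null-backed currentMessage. A minimal standalone sketch of the pattern, with a hypothetical Handle type standing in for MessageInvocation, so this is not the Akka source itself:

object NullBackedCurrentMessage {
  final class Handle(val sender: Option[String])

  // A volatile nullable field: no Some allocation and no guard lock on the hot path.
  @volatile var currentMessage: Handle = null

  def sender: Option[String] = {
    val msg = currentMessage // read the volatile field once
    if (msg eq null) None
    else msg.sender
  }
}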
@@ -125,7 +125,7 @@ object ActorRegistry extends ListenerManagement {
     actorsByUUID.put(actor.uuid, actor)

     // notify listeners
-    foreachListener(_ ! ActorRegistered(actor))
+    notifyListeners(ActorRegistered(actor))
   }

   /**
@@ -137,7 +137,7 @@ object ActorRegistry extends ListenerManagement {
     actorsById.remove(actor.id,actor)

     // notify listeners
-    foreachListener(_ ! ActorUnregistered(actor))
+    notifyListeners(ActorUnregistered(actor))
   }

   /**
@@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
 import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}

 import java.util.Queue
-import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
+import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}

 /**
  * Default settings are:
@@ -64,7 +64,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
  */
 class ExecutorBasedEventDrivenDispatcher(
   _name: String,
-  throughput: Int = Dispatchers.THROUGHPUT,
+  val throughput: Int = Dispatchers.THROUGHPUT,
   mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG,
   config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
@@ -80,71 +80,84 @@ class ExecutorBasedEventDrivenDispatcher(
   val name = "akka:event-driven:dispatcher:" + _name
   init

+  /**
+   * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox
+   */
+  trait ExecutableMailbox extends Runnable { self: MessageQueue =>
+    final def run = {
+
+      val reschedule = try {
+        processMailbox()
+      } finally {
+        dispatcherLock.unlock()
+      }
+
+      if (reschedule || !self.isEmpty)
+        registerForExecution(self)
+    }
+
+    /**
+     * Process the messages in the mailbox
+     *
+     * @return true if the processing finished before the mailbox was empty, due to the throughput constraint
+     */
+    final def processMailbox(): Boolean = {
+      val throttle = throughput > 0
+      var processedMessages = 0
+      var nextMessage = self.dequeue
+      if (nextMessage ne null) {
+        do {
+          nextMessage.invoke
+
+          if(throttle) { //Will be elided when false
+            processedMessages += 1
+            if (processedMessages >= throughput) //If we're throttled, break out
+              return !self.isEmpty
+          }
+          nextMessage = self.dequeue
+        }
+        while (nextMessage ne null)
+      }
+
+      false
+    }
+  }
+
   def dispatch(invocation: MessageInvocation) = {
-    getMailbox(invocation.receiver) enqueue invocation
-    dispatch(invocation.receiver)
+    val mbox = getMailbox(invocation.receiver)
+    mbox enqueue invocation
+    registerForExecution(mbox)
   }

+  protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) {
+    if (mailbox.dispatcherLock.tryLock()) {
+      try {
+        executor execute mailbox
+      } catch {
+        case e: RejectedExecutionException =>
+          mailbox.dispatcherLock.unlock()
+          throw e
+      }
+    }
+  } else {
+    log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
+  }
+
   /**
    * @return the mailbox associated with the actor
    */
-  private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue]
+  private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]

   override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size

-  override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false)
-
-  def dispatch(receiver: ActorRef): Unit = if (active) {
-
-    executor.execute(new Runnable() {
-      def run = {
-        var lockAcquiredOnce = false
-        var finishedBeforeMailboxEmpty = false
-        val lock = receiver.dispatcherLock
-        val mailbox = getMailbox(receiver)
-        // this do-while loop is required to prevent missing new messages between the end of the inner while
-        // loop and releasing the lock
-        do {
-          if (lock.tryLock) {
-            // Only dispatch if we got the lock. Otherwise another thread is already dispatching.
-            lockAcquiredOnce = true
-            try {
-              finishedBeforeMailboxEmpty = processMailbox(receiver)
-            } finally {
-              lock.unlock
-              if (finishedBeforeMailboxEmpty) dispatch(receiver)
-            }
-          }
-        } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))
-      }
-    })
-  } else {
-    log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver)
+  override def createMailbox(actorRef: ActorRef): AnyRef = {
+    if (mailboxCapacity > 0)
+      new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with ExecutableMailbox
+    else
+      new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox
   }

-  /**
-   * Process the messages in the mailbox of the given actor.
-   *
-   * @return true if the processing finished before the mailbox was empty, due to the throughput constraint
-   */
-  def processMailbox(receiver: ActorRef): Boolean = {
-    var processedMessages = 0
-    val mailbox = getMailbox(receiver)
-    var messageInvocation = mailbox.dequeue
-    while (messageInvocation != null) {
-      messageInvocation.invoke
-      processedMessages += 1
-      // check if we simply continue with other messages, or reached the throughput limit
-      if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue
-      else {
-        messageInvocation = null
-        return !mailbox.isEmpty
-      }
-    }
-    false
-  }
-
   def start = if (!active) {
     log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
     active = true
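In miniature, the scheduling contract the new ExecutableMailbox sets up: enqueue then try to submit, only the thread that wins the per-mailbox lock runs it, and run re-registers the mailbox when it was throttled out early or messages arrived while draining. A standalone sketch under hypothetical names (an AtomicBoolean stands in for the dispatcher lock; this is not the Akka class):

import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService}
import java.util.concurrent.atomic.AtomicBoolean

class SketchMailbox(throughput: Int, executor: ExecutorService) extends Runnable {
  private val queue = new ConcurrentLinkedQueue[() => Unit]
  private val dispatcherLock = new AtomicBoolean(false)

  def enqueue(task: () => Unit): Unit = {
    queue.offer(task)
    registerForExecution()
  }

  // Submit the mailbox only if we win the lock; otherwise the current
  // holder is already draining and will see our message.
  private def registerForExecution(): Unit =
    if (dispatcherLock.compareAndSet(false, true)) executor.execute(this)

  def run(): Unit = {
    val reschedule =
      try processMailbox()
      finally dispatcherLock.set(false)
    // Re-register if we stopped early or new messages arrived while draining.
    if (reschedule || !queue.isEmpty) registerForExecution()
  }

  // Drain up to `throughput` messages; true means we stopped before empty.
  private def processMailbox(): Boolean = {
    var processed = 0
    var next = queue.poll()
    while (next ne null) {
      next()
      processed += 1
      if (throughput > 0 && processed >= throughput) return !queue.isEmpty
      next = queue.poll()
    }
    false
  }
}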
@@ -157,8 +170,10 @@ class ExecutorBasedEventDrivenDispatcher(
     uuids.clear
   }

-  def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
-    "Can't build a new thread pool for a dispatcher that is already up and running")
+  def ensureNotActive(): Unit = if (active) {
+    throw new IllegalActorStateException(
+      "Can't build a new thread pool for a dispatcher that is already up and running")
+  }

   override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]"
@@ -56,21 +56,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
   /**
    * @return the mailbox associated with the actor
    */
-  private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation]]
+  private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable]

   override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size

   def dispatch(invocation: MessageInvocation) = if (active) {
-    getMailbox(invocation.receiver).add(invocation)
-    executor.execute(new Runnable() {
-      def run = {
-        if (!tryProcessMailbox(invocation.receiver)) {
-          // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
-          // to another actor and then process his mailbox in stead.
-          findThief(invocation.receiver).foreach( tryDonateAndProcessMessages(invocation.receiver,_) )
-        }
-      }
-    })
+    val mbox = getMailbox(invocation.receiver)
+    mbox enqueue invocation
+    executor execute mbox
   } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")

   /**
@@ -79,22 +72,21 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
    *
    * @return true if the mailbox was processed, false otherwise
    */
-  private def tryProcessMailbox(receiver: ActorRef): Boolean = {
+  private def tryProcessMailbox(mailbox: MessageQueue): Boolean = {
     var lockAcquiredOnce = false
-    val lock = receiver.dispatcherLock

     // this do-wile loop is required to prevent missing new messages between the end of processing
     // the mailbox and releasing the lock
     do {
-      if (lock.tryLock) {
+      if (mailbox.dispatcherLock.tryLock) {
         lockAcquiredOnce = true
         try {
-          processMailbox(receiver)
+          processMailbox(mailbox)
         } finally {
-          lock.unlock
+          mailbox.dispatcherLock.unlock
         }
       }
-    } while ((lockAcquiredOnce && !getMailbox(receiver).isEmpty))
+    } while ((lockAcquiredOnce && !mailbox.isEmpty))

     lockAcquiredOnce
   }
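The do-while above closes a race: a producer can enqueue after processMailbox drains the last message but before the lock is released, and without the re-check that message would sit unprocessed until the next dispatch. A tiny standalone illustration of the idiom (hypothetical names, AtomicBoolean in place of the dispatcher lock):

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

object LockRetryIdiom {
  val mailbox = new ConcurrentLinkedQueue[String]
  val lock = new AtomicBoolean(false)

  def tryProcess(): Boolean = {
    var lockAcquiredOnce = false
    do {
      if (lock.compareAndSet(false, true)) {
        lockAcquiredOnce = true
        try {
          var m = mailbox.poll()
          while (m ne null) { println(m); m = mailbox.poll() }
        } finally lock.set(false)
      }
      // Re-check after unlocking: a producer may have slipped in during the gap.
    } while (lockAcquiredOnce && !mailbox.isEmpty)
    lockAcquiredOnce
  }
}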
@@ -102,12 +94,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
   /**
    * Process the messages in the mailbox of the given actor.
    */
-  private def processMailbox(receiver: ActorRef) = {
-    val mailbox = getMailbox(receiver)
-    var messageInvocation = mailbox.poll
-    while (messageInvocation != null) {
+  private def processMailbox(mailbox: MessageQueue) = {
+    var messageInvocation = mailbox.dequeue
+    while (messageInvocation ne null) {
       messageInvocation.invoke
-      messageInvocation = mailbox.poll
+      messageInvocation = mailbox.dequeue
     }
   }
@@ -145,11 +136,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
    * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox.
    */
   private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = {
-    if (thief.dispatcherLock.tryLock) {
+    val mailbox = getMailbox(thief)
+    if (mailbox.dispatcherLock.tryLock) {
       try {
-        while(donateMessage(receiver, thief)) processMailbox(thief)
+        while(donateMessage(receiver, thief)) processMailbox(mailbox)
       } finally {
-        thief.dispatcherLock.unlock
+        mailbox.dispatcherLock.unlock
       }
     }
   }
@@ -191,18 +183,44 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
   }

   protected override def createMailbox(actorRef: ActorRef): AnyRef = {
-    if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation]
-    else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
+    if (mailboxCapacity <= 0) {
+      new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable {
+        def enqueue(handle: MessageInvocation): Unit = this.add(handle)
+        def dequeue: MessageInvocation = this.poll()
+
+        def run = {
+          if (!tryProcessMailbox(this)) {
+            // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
+            // to another actor and then process his mailbox in stead.
+            findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
+          }
+        }
+      }
+    }
+    else {
+      new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) with MessageQueue with Runnable {
+        def enqueue(handle: MessageInvocation): Unit = this.add(handle)
+        def dequeue: MessageInvocation = this.poll()
+
+        def run = {
+          if (!tryProcessMailbox(this)) {
+            // we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
+            // to another actor and then process his mailbox in stead.
+            findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
+          }
+        }
+      }
+    }
   }

   override def register(actorRef: ActorRef) = {
     verifyActorsAreOfSameType(actorRef)
-    pooledActors.add(actorRef)
+    pooledActors add actorRef
     super.register(actorRef)
   }

   override def unregister(actorRef: ActorRef) = {
-    pooledActors.remove(actorRef)
+    pooledActors remove actorRef
     super.unregister(actorRef)
   }
@@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce

 import org.multiverse.commitbarriers.CountDownCommitBarrier
 import se.scalablesolutions.akka.AkkaException
-import se.scalablesolutions.akka.util.{Duration, HashCode, Logging}
 import java.util.{Queue, List}
 import java.util.concurrent._
 import concurrent.forkjoin.LinkedTransferQueue
+import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging}

 /**
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
@@ -30,8 +30,6 @@ final class MessageInvocation(val receiver: ActorRef,
       "Don't call 'self ! message' in the Actor's constructor (e.g. body of the class).")
   }

-  def send = receiver.dispatcher.dispatch(this)
-
   override def hashCode(): Int = synchronized {
     var result = HashCode.SEED
     result = HashCode.hash(result, receiver.actor)
@@ -63,6 +61,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
  */
 trait MessageQueue {
+  val dispatcherLock = new SimpleLock
   def enqueue(handle: MessageInvocation)
   def dequeue(): MessageInvocation
   def size: Int
@@ -84,40 +83,36 @@ case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingD
    */
   def newMailbox(bounds: Int = capacity,
                  pushTime: Option[Duration] = pushTimeOut,
-                 blockDequeue: Boolean = blockingDequeue) : MessageQueue = {
-    if (bounds <= 0) { //UNBOUNDED: Will never block enqueue and optionally blocking dequeue
-      new LinkedTransferQueue[MessageInvocation] with MessageQueue {
-        def enqueue(handle: MessageInvocation): Unit = this add handle
-        def dequeue(): MessageInvocation = {
-          if(blockDequeue) this.take()
-          else this.poll()
-        }
-      }
-    }
-    else if (pushTime.isDefined) { //BOUNDED: Timeouted enqueue with MessageQueueAppendFailedException and optionally blocking dequeue
-      val time = pushTime.get
-      new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue {
-        def enqueue(handle: MessageInvocation) {
-          if (!this.offer(handle,time.length,time.unit))
-            throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString)
-        }
-        def dequeue(): MessageInvocation = {
-          if (blockDequeue) this.take()
-          else this.poll()
-        }
-      }
-    }
-    else { //BOUNDED: Blocking enqueue and optionally blocking dequeue
-      new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue {
-        def enqueue(handle: MessageInvocation): Unit = this put handle
-        def dequeue(): MessageInvocation = {
-          if(blockDequeue) this.take()
-          else this.poll()
-        }
-      }
-    }
-  }
+                 blockDequeue: Boolean = blockingDequeue) : MessageQueue =
+    if (capacity > 0) new DefaultBoundedMessageQueue(bounds,pushTime,blockDequeue)
+    else new DefaultUnboundedMessageQueue(blockDequeue)
 }

+class DefaultUnboundedMessageQueue(blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation] with MessageQueue {
+  final def enqueue(handle: MessageInvocation) {
+    this add handle
+  }
+
+  final def dequeue(): MessageInvocation =
+    if (blockDequeue) this.take()
+    else this.poll()
+}
+
+class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Option[Duration], blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation](capacity) with MessageQueue {
+  final def enqueue(handle: MessageInvocation) {
+    if (pushTimeOut.isDefined) {
+      if(!this.offer(handle,pushTimeOut.get.length,pushTimeOut.get.unit))
+        throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
+    }
+    else {
+      this put handle
+    }
+  }
+
+  final def dequeue(): MessageInvocation =
+    if (blockDequeue) this.take()
+    else this.poll()
+
+}

 /**
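A usage sketch for the mailbox selection above. Note that newMailbox branches on the case-class field capacity, not on its bounds argument; the Duration(length, unit) factory shown is an assumption inferred from the length/unit accessors used elsewhere in this diff:

import se.scalablesolutions.akka.util.Duration
import java.util.concurrent.TimeUnit

// capacity <= 0: unbounded mailbox, enqueue never blocks.
val unbounded =
  MailboxConfig(capacity = -1, pushTimeOut = None, blockingDequeue = false).newMailbox()

// capacity > 0 with a push timeout: enqueue throws MessageQueueAppendFailedException
// when the queue stays full past the timeout; with pushTimeOut = None it blocks (put).
val bounded = MailboxConfig(
  capacity = 1000,
  pushTimeOut = Some(Duration(10, TimeUnit.MILLISECONDS)), // assumed factory shape
  blockingDequeue = false).newMailbox()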
@@ -139,7 +134,7 @@ trait MessageDispatcher extends Logging {
   }
   def unregister(actorRef: ActorRef) = {
     uuids remove actorRef.uuid
-    //actorRef.mailbox = null //FIXME should we null out the mailbox here?
+    actorRef.mailbox = null
     if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
   }
@@ -156,14 +151,4 @@ trait MessageDispatcher extends Logging {
    * Creates and returns a mailbox for the given actor
    */
   protected def createMailbox(actorRef: ActorRef): AnyRef = null
 }
-
-/**
- * @author <a href="http://jonasboner.com">Jonas Bonér</a>
- */
-trait MessageDemultiplexer {
-  def select
-  def wakeUp
-  def acquireSelectedInvocations: List[MessageInvocation]
-  def releaseSelectedInvocations
-}
@@ -1,141 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package se.scalablesolutions.akka.dispatch
-
-import concurrent.forkjoin.LinkedTransferQueue
-import java.util.concurrent.{TimeUnit, Semaphore}
-import java.util.Iterator
-import se.scalablesolutions.akka.util.Logger
-
-class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] {
-  require(capacity > 0)
-
-  protected val guard = new Semaphore(capacity)
-
-  override def take(): E = {
-    val e = super.take
-    if (e ne null) guard.release
-    e
-  }
-
-  override def poll(): E = {
-    val e = super.poll
-    if (e ne null) guard.release
-    e
-  }
-
-  override def poll(timeout: Long, unit: TimeUnit): E = {
-    val e = super.poll(timeout,unit)
-    if (e ne null) guard.release
-    e
-  }
-
-  override def remainingCapacity = guard.availablePermits
-
-  override def remove(o: AnyRef): Boolean = {
-    if (super.remove(o)) {
-      guard.release
-      true
-    } else {
-      false
-    }
-  }
-
-  override def offer(e: E): Boolean = {
-    if (guard.tryAcquire) {
-      val result = try {
-        super.offer(e)
-      } catch {
-        case e => guard.release; throw e
-      }
-      if (!result) guard.release
-      result
-    } else
-      false
-  }
-
-  override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
-    if (guard.tryAcquire(timeout,unit)) {
-      val result = try {
-        super.offer(e)
-      } catch {
-        case e => guard.release; throw e
-      }
-      if (!result) guard.release
-      result
-    } else
-      false
-  }
-
-  override def add(e: E): Boolean = {
-    if (guard.tryAcquire) {
-      val result = try {
-        super.add(e)
-      } catch {
-        case e => guard.release; throw e
-      }
-      if (!result) guard.release
-      result
-    } else
-      false
-  }
-
-  override def put(e :E): Unit = {
-    guard.acquire
-    try {
-      super.put(e)
-    } catch {
-      case e => guard.release; throw e
-    }
-  }
-
-  override def tryTransfer(e: E): Boolean = {
-    if (guard.tryAcquire) {
-      val result = try {
-        super.tryTransfer(e)
-      } catch {
-        case e => guard.release; throw e
-      }
-      if (!result) guard.release
-      result
-    } else
-      false
-  }
-
-  override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
-    if (guard.tryAcquire(timeout,unit)) {
-      val result = try {
-        super.tryTransfer(e)
-      } catch {
-        case e => guard.release; throw e
-      }
-      if (!result) guard.release
-      result
-    } else
-      false
-  }
-
-  override def transfer(e: E): Unit = {
-    if (guard.tryAcquire) {
-      try {
-        super.transfer(e)
-      } catch {
-        case e => guard.release; throw e
-      }
-    }
-  }
-
-  override def iterator: Iterator[E] = {
-    val it = super.iterator
-    new Iterator[E] {
-      def hasNext = it.hasNext
-      def next = it.next
-      def remove {
-        it.remove
-        guard.release //Assume remove worked if no exception was thrown
-      }
-    }
-  }
-}
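For reference, the core trick of the deleted class was to bound an intrinsically unbounded queue with a counting Semaphore: acquire a permit before inserting, release one whenever an element actually leaves. A condensed standalone sketch of that pattern (not the removed file itself):

import java.util.concurrent.{ConcurrentLinkedQueue, Semaphore}

class SemaphoreBoundedQueue[E <: AnyRef](capacity: Int) {
  require(capacity > 0)
  private val guard = new Semaphore(capacity)
  private val queue = new ConcurrentLinkedQueue[E]

  // Take a permit before inserting; give it back if the insert fails.
  def offer(e: E): Boolean =
    guard.tryAcquire() && {
      val inserted = queue.offer(e)
      if (!inserted) guard.release()
      inserted
    }

  // Release a permit whenever an element leaves the queue.
  def poll(): E = {
    val e = queue.poll()
    if (e ne null) guard.release()
    e
  }

  def remainingCapacity: Int = guard.availablePermits
}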
@@ -40,6 +40,23 @@ trait ListenerManagement extends Logging {
     if (manageLifeCycleOfListeners) listener.stop
   }

+  /*
+   * Returns whether there are any listeners currently
+   */
+  def hasListeners: Boolean = !listeners.isEmpty
+
+  protected def notifyListeners(message: => Any) {
+    if (hasListeners) {
+      val msg = message
+      val iterator = listeners.iterator
+      while (iterator.hasNext) {
+        val listener = iterator.next
+        if (listener.isRunning) listener ! msg
+        else log.warning("Can't notify [%s] since it is not running.", listener)
+      }
+    }
+  }
+
   /**
    * Execute <code>f</code> with each listener as argument.
    */
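The new notifyListeners takes its message by name (message: => Any), so the event value is only constructed when at least one listener is registered. A standalone sketch of that evaluation behaviour, using a hypothetical ByNameDemo object rather than the real trait:

object ByNameDemo {
  var hasListeners = false

  def notifyListeners(message: => Any): Unit =
    if (hasListeners) println("notified: " + message)

  def main(args: Array[String]): Unit = {
    notifyListeners { println("building event"); "event" } // argument never evaluated
    hasListeners = true
    notifyListeners { println("building event"); "event" } // built and delivered
  }
}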
@@ -5,6 +5,7 @@
 package se.scalablesolutions.akka.util

 import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
+import java.util.concurrent.atomic.AtomicBoolean

 /**
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
@@ -58,3 +59,56 @@ class ReadWriteGuard {
   }
 }

+/**
+ * A very simple lock that uses CCAS (Compare Compare-And-Swap)
+ * Does not keep track of the owner and isn't Reentrant, so don't nest and try to stick to the if*-methods
+ */
+class SimpleLock {
+  val acquired = new AtomicBoolean(false)
+
+  def ifPossible(perform: () => Unit): Boolean = {
+    if (tryLock()) {
+      try {
+        perform
+      } finally {
+        unlock()
+      }
+      true
+    } else false
+  }
+
+  def ifPossibleYield[T](perform: () => T): Option[T] = {
+    if (tryLock()) {
+      try {
+        Some(perform())
+      } finally {
+        unlock()
+      }
+    } else None
+  }
+
+  def ifPossibleApply[T,R](value: T)(function: (T) => R): Option[R] = {
+    if (tryLock()) {
+      try {
+        Some(function(value))
+      } finally {
+        unlock()
+      }
+    } else None
+  }
+
+  def tryLock() = {
+    if (acquired.get) false
+    else acquired.compareAndSet(false,true)
+  }
+
+  def tryUnlock() = {
+    acquired.compareAndSet(true,false)
+  }
+
+  def locked = acquired.get
+
+  def unlock() {
+    acquired.set(false)
+  }
+}
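A usage sketch for the new SimpleLock (hypothetical call site). It is not reentrant and tracks no owner, so a nested tryLock by the same thread simply fails; pair tryLock with unlock yourself, or use the if*-helpers:

val lock = new SimpleLock

if (lock.tryLock()) {
  try {
    // critical section: at most one thread at a time gets here
  } finally lock.unlock()
}

// ifPossibleYield runs the thunk only when the lock is free right now.
val answer: Option[Int] = lock.ifPossibleYield(() => 21 * 2) // Some(42), or None if contended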
@@ -5,11 +5,10 @@ import org.scalatest.junit.JUnitSuite

 import org.junit.Test

-import se.scalablesolutions.akka.dispatch.Dispatchers
-
 import java.util.concurrent.{TimeUnit, CountDownLatch}
 import se.scalablesolutions.akka.actor.{IllegalActorStateException, Actor}
 import Actor._
+import se.scalablesolutions.akka.dispatch.{MessageQueue, Dispatchers}

 object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
   val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
@@ -18,7 +17,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {

 class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
   self.dispatcher = delayableActorDispatcher
-  var invocationCount = 0
+  @volatile var invocationCount = 0
   self.id = name

   def receive = {
@@ -61,10 +60,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with
     val slow = actorOf(new DelayableActor("slow", 50, finishedCounter)).start
     val fast = actorOf(new DelayableActor("fast", 10, finishedCounter)).start

+    var sentToFast = 0
+
     for (i <- 1 to 100) {
       // send most work to slow actor
-      if (i % 20 == 0)
+      if (i % 20 == 0) {
         fast ! i
+        sentToFast += 1
+      }
       else
         slow ! i
     }
@@ -72,13 +75,18 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with
     // now send some messages to actors to keep the dispatcher dispatching messages
     for (i <- 1 to 10) {
       Thread.sleep(150)
-      if (i % 2 == 0)
+      if (i % 2 == 0) {
         fast ! i
+        sentToFast += 1
+      }
       else
         slow ! i
     }

     finishedCounter.await(5, TimeUnit.SECONDS)
+    fast.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
+    slow.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
+    fast.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
     fast.actor.asInstanceOf[DelayableActor].invocationCount must be >
       (slow.actor.asInstanceOf[DelayableActor].invocationCount)
     slow.stop
@@ -220,10 +220,10 @@ class RemoteClient private[akka] (
         val channel = connection.awaitUninterruptibly.getChannel
         openChannels.add(channel)
         if (!connection.isSuccess) {
-          foreachListener(_ ! RemoteClientError(connection.getCause, this))
+          notifyListeners(RemoteClientError(connection.getCause, this))
           log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
         }
-        foreachListener(_ ! RemoteClientStarted(this))
+        notifyListeners(RemoteClientStarted(this))
         isRunning = true
       }
     }
@@ -232,7 +232,7 @@ class RemoteClient private[akka] (
     log.info("Shutting down %s", name)
     if (isRunning) {
       isRunning = false
-      foreachListener(_ ! RemoteClientShutdown(this))
+      notifyListeners(RemoteClientShutdown(this))
       timer.stop
       timer = null
       openChannels.close.awaitUninterruptibly
@@ -250,7 +250,7 @@ class RemoteClient private[akka] (
   @deprecated("Use removeListener instead")
   def deregisterListener(actorRef: ActorRef) = removeListener(actorRef)

-  override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
+  override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)

   protected override def manageLifeCycleOfListeners = false
@@ -287,7 +287,7 @@ class RemoteClient private[akka] (
     } else {
       val exception = new RemoteClientException(
         "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
-      foreachListener(l => l ! RemoteClientError(exception, this))
+      notifyListeners(RemoteClientError(exception, this))
       throw exception
     }
@@ -403,12 +403,12 @@ class RemoteClientHandler(
           futures.remove(reply.getId)
         } else {
           val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
-          client.foreachListener(_ ! RemoteClientError(exception, client))
+          client.notifyListeners(RemoteClientError(exception, client))
           throw exception
         }
       } catch {
         case e: Exception =>
-          client.foreachListener(_ ! RemoteClientError(e, client))
+          client.notifyListeners(RemoteClientError(e, client))
           log.error("Unexpected exception in remote client handler: %s", e)
           throw e
       }
@@ -423,7 +423,7 @@ class RemoteClientHandler(
       client.connection = bootstrap.connect(remoteAddress)
       client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
       if (!client.connection.isSuccess) {
-        client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client))
+        client.notifyListeners(RemoteClientError(client.connection.getCause, client))
         log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
       }
     }
@@ -433,7 +433,7 @@ class RemoteClientHandler(

   override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
     def connect = {
-      client.foreachListener(_ ! RemoteClientConnected(client))
+      client.notifyListeners(RemoteClientConnected(client))
       log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
       client.resetReconnectionTimeWindow
     }
@@ -450,12 +450,12 @@ class RemoteClientHandler(
   }

   override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
-    client.foreachListener(_ ! RemoteClientDisconnected(client))
+    client.notifyListeners(RemoteClientDisconnected(client))
     log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
   }

   override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
-    client.foreachListener(_ ! RemoteClientError(event.getCause, client))
+    client.notifyListeners(RemoteClientError(event.getCause, client))
     log.error(event.getCause, "Unexpected exception from downstream in remote client")
     event.getChannel.close
   }
@@ -245,12 +245,12 @@ class RemoteServer extends Logging with ListenerManagement {
         openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
         _isRunning = true
         Cluster.registerLocalNode(hostname, port)
-        foreachListener(_ ! RemoteServerStarted(this))
+        notifyListeners(RemoteServerStarted(this))
       }
     } catch {
       case e =>
         log.error(e, "Could not start up remote server")
-        foreachListener(_ ! RemoteServerError(e, this))
+        notifyListeners(RemoteServerError(e, this))
     }
     this
   }
@@ -263,7 +263,7 @@ class RemoteServer extends Logging with ListenerManagement {
       openChannels.close.awaitUninterruptibly
       bootstrap.releaseExternalResources
       Cluster.deregisterLocalNode(hostname, port)
-      foreachListener(_ ! RemoteServerShutdown(this))
+      notifyListeners(RemoteServerShutdown(this))
     } catch {
       case e: java.nio.channels.ClosedChannelException => {}
       case e => log.warning("Could not close remote server channel in a graceful way")
@@ -323,7 +323,7 @@ class RemoteServer extends Logging with ListenerManagement {

   protected override def manageLifeCycleOfListeners = false

-  protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
+  protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)

   private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = {
     RemoteServer.actorsFor(address).actors
@@ -413,18 +413,18 @@ class RemoteServerHandler(
         def operationComplete(future: ChannelFuture): Unit = {
           if (future.isSuccess) {
             openChannels.add(future.getChannel)
-            server.foreachListener(_ ! RemoteServerClientConnected(server))
+            server.notifyListeners(RemoteServerClientConnected(server))
           } else future.getChannel.close
         }
       })
     } else {
-      server.foreachListener(_ ! RemoteServerClientConnected(server))
+      server.notifyListeners(RemoteServerClientConnected(server))
     }
   }

   override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
     log.debug("Remote client disconnected from [%s]", server.name)
-    server.foreachListener(_ ! RemoteServerClientDisconnected(server))
+    server.notifyListeners(RemoteServerClientDisconnected(server))
   }

   override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
@@ -446,7 +446,7 @@ class RemoteServerHandler(
   override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
     log.error(event.getCause, "Unexpected exception from remote downstream")
     event.getChannel.close
-    server.foreachListener(_ ! RemoteServerError(event.getCause, server))
+    server.notifyListeners(RemoteServerError(event.getCause, server))
   }

   private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
@@ -491,7 +491,7 @@ class RemoteServerHandler(
     } catch {
       case e: Throwable =>
         channel.write(createErrorReplyMessage(e, request, true))
-        server.foreachListener(_ ! RemoteServerError(e, server))
+        server.notifyListeners(RemoteServerError(e, server))
     }
   }
 }
@@ -523,10 +523,10 @@ class RemoteServerHandler(
     } catch {
       case e: InvocationTargetException =>
         channel.write(createErrorReplyMessage(e.getCause, request, false))
-        server.foreachListener(_ ! RemoteServerError(e, server))
+        server.notifyListeners(RemoteServerError(e, server))
       case e: Throwable =>
         channel.write(createErrorReplyMessage(e, request, false))
-        server.foreachListener(_ ! RemoteServerError(e, server))
+        server.notifyListeners(RemoteServerError(e, server))
     }
   }
@@ -559,7 +559,7 @@ class RemoteServerHandler(
     } catch {
       case e =>
         log.error(e, "Could not create remote actor instance")
-        server.foreachListener(_ ! RemoteServerError(e, server))
+        server.notifyListeners(RemoteServerError(e, server))
         throw e
     }
   } else actorRefOrNull
@@ -590,7 +590,7 @@ class RemoteServerHandler(
     } catch {
       case e =>
         log.error(e, "Could not create remote typed actor instance")
-        server.foreachListener(_ ! RemoteServerError(e, server))
+        server.notifyListeners(RemoteServerError(e, server))
         throw e
     }
   } else typedActorOrNull
@@ -674,7 +674,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {

   def register(proxy: AnyRef, init: AspectInit) = {
     val res = initializations.put(proxy, init)
-    foreachListener(_ ! AspectInitRegistered(proxy, init))
+    notifyListeners(AspectInitRegistered(proxy, init))
     res
   }
@@ -683,7 +683,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
    */
   def unregister(proxy: AnyRef): AspectInit = {
     val init = initializations.remove(proxy)
-    foreachListener(_ ! AspectInitUnregistered(proxy, init))
+    notifyListeners(AspectInitUnregistered(proxy, init))
     init.actorRef.stop
     init
   }