fixed TX Vector and TX Ref plus added tests + rewrote Reactor impl + added custom Actor impl(currently not used though)

This commit is contained in:
Jonas Boner 2009-06-05 22:08:53 +02:00
parent 74bd8dea6d
commit 167b724671
15 changed files with 1148 additions and 274 deletions

View file

@ -102,19 +102,31 @@ sealed class TransactionalAroundAdvice(target: Class[_],
def invoke(joinpoint: JoinPoint): AnyRef = {
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val method = rtti.getMethod
if (method.isAnnotationPresent(Annotations.transactional)) {
tryToCommitTransaction
startNewTransaction
}
joinExistingTransaction
val result: AnyRef = if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint)
else handleResult(sendAndReceiveEventually(joinpoint))
tryToPrecommitTransaction
incrementTransaction
val result: AnyRef = try {
if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint)
else handleResult(sendAndReceiveEventually(joinpoint))
} finally {
decrementTransaction
tryToPrecommitTransaction
removeTransactionIfTopLevel
}
result
}
private def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
private def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
private def removeTransactionIfTopLevel =
if (activeTx.isDefined && activeTx.get.topLevel_?) {
activeTx = None
threadBoundTx.set(None)
}
private def startNewTransaction = {
val newTx = new Transaction
newTx.begin(server)
@ -131,10 +143,7 @@ sealed class TransactionalAroundAdvice(target: Class[_],
activeTx = threadBoundTx.get
}
private def tryToPrecommitTransaction = {
// FIXME: clear threadBoundTx on successful commit
if (activeTx.isDefined) activeTx.get.precommit(server)
}
private def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(server)
private def tryToCommitTransaction = if (activeTx.isDefined) {
val tx = activeTx.get

View file

@ -0,0 +1,202 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.actor
import java.util.concurrent.{ConcurrentSkipListSet, TimeUnit}
import kernel.reactor._
sealed abstract class LifecycleMessage
//case class Init(config: AnyRef) extends LifecycleMessage
//case class Shutdown(reason: AnyRef) extends LifecycleMessage
case class Stop(reason: AnyRef) extends LifecycleMessage
//case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifecycleMessage
case class Exit(dead: Actor, killer: Throwable) extends LifecycleMessage
case object Restart extends LifecycleMessage
sealed abstract class DispatcherType
case object EventBased extends DispatcherType
case object ThreadBased extends DispatcherType
class ActorMessageHandler(val actor: Actor) extends MessageHandler {
def handle(handle: MessageHandle) = actor.handle(handle.message, handle.future)
}
trait Actor {
private[this] val linkedActors = new ConcurrentSkipListSet[Actor]
private[this] var mailbox: MessageQueue = _
private[this] var senderFuture: Option[CompletableFutureResult] = None
@volatile private var isRunning: Boolean = false
private var hotswap: Option[PartialFunction[Any, Unit]] = None
private var config: Option[AnyRef] = None
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
// ====================================
/**
* Set dispatcher type to either EventBased or ThreadBased.
* Default is EventBased.
*/
protected[this] var dispatcherType: DispatcherType = EventBased
/**
* Set trapExit to true if actor should be able to trap linked actors exit messages.
*/
@volatile protected[this] var trapExit: Boolean = false
/**
* Partial function implementing the server logic.
* To be implemented by subclassing server.
* <p/>
* Example code:
* <pre>
* def receive: PartialFunction[Any, Unit] = {
* case Ping =>
* println("got a ping")
* reply("pong")
*
* case OneWay =>
* println("got a oneway")
*
* case _ =>
* println("unknown message, ignoring")
* }
* </pre>
*/
protected def receive: PartialFunction[Any, Unit]
/**
* Mandatory callback method that is called during restart and reinitialization after a server crash.
* To be implemented by subclassing actor.
*/
protected def restart(config: Option[AnyRef])
/**
* Optional callback method that is called during initialization.
* To be implemented by subclassing actor.
*/
protected def init(config: AnyRef) {}
/**
* Optional callback method that is called during termination.
* To be implemented by subclassing actor.
*/
protected def shutdown(reason: AnyRef) {}
// =============
// ==== API ====
// =============
def !(message: AnyRef) =
if (isRunning) mailbox.append(new MessageHandle(this, message, new NullFutureResult))
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
def !![T](message: AnyRef)(implicit timeout: Long): Option[T] = if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
future.await_?
getResultOrThrowException(future)
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
def !?[T](message: AnyRef): Option[T] = if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
future.await_!
getResultOrThrowException(future)
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
def link(actor: Actor) =
if (isRunning) linkedActors.add(actor)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
def unlink(actor: Actor) =
if (isRunning) linkedActors.remove(actor)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
def start = synchronized {
if (!isRunning) {
dispatcherType match {
case EventBased =>
mailbox = EventBasedDispatcher.messageQueue
EventBasedDispatcher.registerHandler(this, new ActorMessageHandler(this))
case ThreadBased =>
mailbox = ThreadBasedDispatcher.messageQueue
ThreadBasedDispatcher.registerHandler(this, new ActorMessageHandler(this))
}
isRunning = true
}
}
def stop =
if (isRunning) {
this ! Stop("Actor gracefully stopped")
dispatcherType match {
case EventBased => EventBasedDispatcher.unregisterHandler(this)
case ThreadBased => ThreadBasedDispatcher.unregisterHandler(this)
}
isRunning = false
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
protected def reply(message: AnyRef) = senderFuture match {
case None => throw new IllegalStateException("No sender future in scope, can't reply")
case Some(future) => future.completeWithResult(message)
}
// ================================
// ==== IMPLEMENTATION DETAILS ====
// ================================
private def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: AnyRef, timeout: Long): CompletableFutureResult = {
val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, future))
future
}
private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) throw future.exception.get
else future.result.asInstanceOf[Option[T]]
private[kernel] def handle(message: AnyRef, future: CompletableFutureResult) = {
try {
senderFuture = Some(future)
if (base.isDefinedAt(message)) base(message)
else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]")
} catch {
case e =>
future.completeWithException(e)
handleFailure(this, e)
}
/*
try {
val result = message.asInstanceOf[Invocation].joinpoint.proceed
future.completeWithResult(result)
} catch {
case e: Exception => future.completeWithException(e)
}
*/
}
private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)
private val lifeCycle: PartialFunction[Any, Unit] = {
case Init(config) => init(config)
case HotSwap(code) => hotswap = code
case Restart => restart(config)
case Stop(reason) => shutdown(reason); exit
case Exit(dead, reason) => handleFailure(dead, reason)
}
private[this] def handleFailure(dead: Actor, e: Throwable) = {
if (trapExit) {
restartLinkedActors
scheduleRestart
} else linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_ ! Exit(this, e))
}
private[this] def restartLinkedActors = linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.scheduleRestart)
private[Actor] def scheduleRestart = mailbox.prepend(new MessageHandle(this, Restart, new NullFutureResult))
}

View file

@ -4,8 +4,9 @@
package se.scalablesolutions.akka.kernel
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.collection.mutable.{HashSet, HashMap}
sealed abstract class TransactionStatus
object TransactionStatus {
case object New extends TransactionStatus
@ -39,8 +40,13 @@ class Transaction extends Logging {
private[this] var parent: Option[Transaction] = None
private[this] val participants = new HashSet[GenericServerContainer]
private[this] val precommitted = new HashSet[GenericServerContainer]
private[this] val depth = new AtomicInteger(0)
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
def increment = depth.incrementAndGet
def decrement = depth.decrementAndGet
def topLevel_? = depth.get == 0
def begin(server: GenericServerContainer) = synchronized {
ensureIsActiveOrNew
if (status == TransactionStatus.New) log.info("Server [%s] is starting NEW transaction [%s]", server.id, this)

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
*
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
package se.scalablesolutions.akka.kernel.reactor
object EventBasedDispatcher extends MessageDispatcherBase {
start
//def dispatch(messageQueue: MessageQueue) = if (!active) {
def start = if (!active) {
active = true
val messageDemultiplexer = new EventBasedDemultiplexer(messageQueue)
selectorThread = new Thread {
override def run = {
while (active) {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
val queue = messageDemultiplexer.acquireSelectedQueue
for (index <- 0 until queue.size) {
val handle = queue.remove
val handler = messageHandlers.get(handle.sender)
if (handler != null) handler.handle(handle)
}
}
}
}
selectorThread.start
}
}
class EventBasedDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer {
import java.util.{LinkedList, Queue}
private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle]
def select = messageQueue.read(selectedQueue)
def acquireSelectedQueue: Queue[MessageHandle] = selectedQueue
def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedDemultiplexer can't release its queue")
def wakeUp = throw new UnsupportedOperationException("EventBasedDemultiplexer can't be woken up")
}

View file

@ -1,67 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
*
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
import java.util.{LinkedList, Queue}
class EventDrivenDispatcher extends MessageDispatcher {
private val handlers = new ConcurrentHashMap[AnyRef, MessageHandler]
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
private val guard = new Object
def registerHandler(key: AnyRef, handler: MessageHandler) = handlers.put(key, handler)
def unregisterHandler(key: AnyRef) = handlers.remove(key)
def dispatch(messageQueue: MessageQueue) = if (!active) {
active = true
val messageDemultiplexer = new EventDrivenDemultiplexer(messageQueue)
selectorThread = new Thread {
override def run = {
while (active) {
guard.synchronized { /* empty */ }
messageDemultiplexer.select
val handles = messageDemultiplexer.acquireSelectedQueue
val handlesList = handles.toArray.toList.asInstanceOf[List[MessageHandle]]
for (index <- 0 to handles.size) {
val handle = handles.remove
val handler = handlers.get(handle.key)
if (handler != null) handler.handle(handle)
}
}
}
}
selectorThread.start
}
def shutdown = if (active) {
active = false
selectorThread.interrupt
}
}
class EventDrivenDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer {
private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle]
def select = messageQueue.read(selectedQueue)
def acquireSelectedQueue: Queue[MessageHandle] = selectedQueue
def releaseSelectedQueue = {
throw new UnsupportedOperationException
}
def wakeUp = {
throw new UnsupportedOperationException
}
}

View file

@ -11,43 +11,57 @@ import java.util.concurrent.locks.{Lock, Condition, ReentrantLock}
import java.util.concurrent.TimeUnit
sealed trait FutureResult {
def await
def await_?
def await_!
def isCompleted: Boolean
def isExpired: Boolean
def timeoutInNanos: Long
def result: AnyRef
def exception: Exception
def result: Option[AnyRef]
def exception: Option[Throwable]
}
trait CompletableFutureResult extends FutureResult {
def completeWithResult(result: AnyRef)
def completeWithException(exception: Exception)
def completeWithException(exception: Throwable)
}
class GenericFutureResult(val timeoutInNanos: Long) extends CompletableFutureResult {
class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureResult {
private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0)
val timeoutInNanos = TIME_UNIT.toNanos(timeout)
private val _startTimeInNanos = currentTimeInNanos
private val _lock = new ReentrantLock
private val _signal = _lock.newCondition
private var _completed: Boolean = _
private var _result: AnyRef = _
private var _exception: Exception = _
private var _result: Option[AnyRef] = None
private var _exception: Option[Throwable] = None
override def await = try {
override def await_? = try {
_lock.lock
var wait = timeoutInNanos - currentTimeInNanos - _startTimeInNanos
var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
while (!_completed && wait > 0) {
var start = currentTimeInNanos
try {
wait = _signal.awaitNanos(wait)
} catch {
case e: InterruptedException =>
wait = wait - currentTimeInNanos - start
wait = wait - (currentTimeInNanos - start)
}
}
} finally {
_lock.unlock
}
override def await_! = try {
_lock.lock
while (!_completed) {
_signal.await
}
} finally {
_lock.unlock
}
override def isCompleted: Boolean = try {
_lock.lock
_completed
@ -57,19 +71,19 @@ class GenericFutureResult(val timeoutInNanos: Long) extends CompletableFutureRes
override def isExpired: Boolean = try {
_lock.lock
timeoutInNanos - currentTimeInNanos - _startTimeInNanos <= 0
timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0
} finally {
_lock.unlock
}
override def result: AnyRef = try {
override def result: Option[AnyRef] = try {
_lock.lock
_result
} finally {
_lock.unlock
}
override def exception: Exception = try {
override def exception: Option[Throwable] = try {
_lock.lock
_exception
} finally {
@ -80,19 +94,18 @@ class GenericFutureResult(val timeoutInNanos: Long) extends CompletableFutureRes
_lock.lock
if (!_completed) {
_completed = true
_result = result
_result = Some(result)
}
} finally {
_signal.signalAll
_lock.unlock
}
override def completeWithException(exception: Exception) = try {
override def completeWithException(exception: Throwable) = try {
_lock.lock
if (!_completed) {
_completed = true
_exception = exception
_exception = Some(exception)
}
} finally {
@ -100,16 +113,17 @@ class GenericFutureResult(val timeoutInNanos: Long) extends CompletableFutureRes
_lock.unlock
}
private def currentTimeInNanos: Long = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis)
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
}
class NullFutureResult extends CompletableFutureResult {
override def completeWithResult(result: AnyRef) = {}
override def completeWithException(exception: Exception) = {}
override def await: Unit = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def completeWithException(exception: Throwable) = {}
override def await_? = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def await_! = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def isCompleted: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def isExpired: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def timeoutInNanos: Long = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def result: AnyRef = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def exception: Exception = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def result: Option[AnyRef] = None
override def exception: Option[Throwable] = None
}

View file

@ -0,0 +1,35 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
trait MessageDispatcherBase extends MessageDispatcher {
val messageQueue = new MessageQueue
protected val messageHandlers = new ConcurrentHashMap[AnyRef, MessageHandler]
protected var selectorThread: Thread = _
@volatile protected var active: Boolean = false
protected val guard = new Object
def registerHandler(key: AnyRef, handler: MessageHandler) = guard.synchronized {
messageHandlers.put(key, handler)
}
def unregisterHandler(key: AnyRef) = guard.synchronized {
messageHandlers.remove(key)
}
def shutdown = if (active) {
active = false
selectorThread.interrupt
doShutdown
}
/**
* Subclass callback. Override if additional shutdown behavior is needed.
*/
protected def doShutdown = {}
}

View file

@ -12,10 +12,14 @@ package se.scalablesolutions.akka.kernel.reactor
import java.util.{LinkedList, Queue}
trait MessageHandler {
def handle(message: MessageHandle)
}
trait MessageDispatcher {
def registerHandler(key: AnyRef, handler: MessageHandler)
def unregisterHandler(key: AnyRef)
def dispatch(messageQueue: MessageQueue)
def start
def shutdown
}
@ -26,11 +30,11 @@ trait MessageDemultiplexer {
def wakeUp
}
class MessageHandle(val key: AnyRef, val message: AnyRef, val future: CompletableFutureResult) {
class MessageHandle(val sender: AnyRef, val message: AnyRef, val future: CompletableFutureResult) {
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, key)
result = HashCode.hash(result, sender)
result = HashCode.hash(result, message)
result = HashCode.hash(result, future)
result
@ -39,40 +43,33 @@ class MessageHandle(val key: AnyRef, val message: AnyRef, val future: Completabl
override def equals(that: Any): Boolean =
that != null &&
that.isInstanceOf[MessageHandle] &&
that.asInstanceOf[MessageHandle].key == key &&
that.asInstanceOf[MessageHandle].sender == sender &&
that.asInstanceOf[MessageHandle].message == message &&
that.asInstanceOf[MessageHandle].future == future
}
trait MessageHandler {
def handle(message: MessageHandle)
}
class MessageQueue {
private val handles: Queue[MessageHandle] = new LinkedList[MessageHandle]
private val queue: Queue[MessageHandle] = new LinkedList[MessageHandle]
@volatile private var interrupted = false
def put(handle: MessageHandle) = handles.synchronized {
handles.offer(handle)
handles.notifyAll
def append(handle: MessageHandle) = queue.synchronized {
queue.offer(handle)
queue.notifyAll
}
def read(destination: Queue[MessageHandle]) = handles.synchronized {
while (handles.isEmpty && !interrupted) {
handles.wait
}
if (!interrupted) {
while (!handles.isEmpty) {
destination.offer(handles.remove)
}
} else {
interrupted = false
}
def prepend(handle: MessageHandle) = queue.synchronized {
queue.add(handle)
queue.notifyAll
}
def read(destination: Queue[MessageHandle]) = queue.synchronized {
while (queue.isEmpty && !interrupted) queue.wait
if (!interrupted) while (!queue.isEmpty) destination.offer(queue.remove)
else interrupted = false
}
def interrupt = handles.synchronized {
def interrupt = queue.synchronized {
interrupted = true
handles.notifyAll
queue.notifyAll
}
}

View file

@ -10,65 +10,59 @@
*/
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.locks.ReentrantLock
import java.util.{HashSet, LinkedList, Queue}
class ThreadBasedDispatcher(val threadPoolSize: Int) extends MessageDispatcher {
private val handlers = new ConcurrentHashMap[AnyRef, MessageHandler]
object ThreadBasedDispatcher extends MessageDispatcherBase {
import java.util.concurrent.Executors
import java.util.HashSet
// FIXME: make configurable using configgy + JMX
// FIXME: create one executor per invocation to dispatch(..), grab config settings for specific actor (set in registerHandler)
private val threadPoolSize: Int = 10
private val busyHandlers = new HashSet[AnyRef]
private val handlerExecutor = Executors.newFixedThreadPool(threadPoolSize)
@volatile private var selectorThread: Thread = null
@volatile private var active: Boolean = false
def registerHandler(key: AnyRef, handler: MessageHandler) = handlers.put(key, handler)
def unregisterHandler(key: AnyRef) = handlers.remove(key)
def dispatch(messageQueue: MessageQueue) = {
if (!active) {
active = true
val messageDemultiplexer = new ThreadBasedDemultiplexer(messageQueue)
selectorThread = new Thread {
override def run = {
while (active) {
start
def start = if (!active) {
active = true
val messageDemultiplexer = new ThreadBasedDemultiplexer(messageQueue)
selectorThread = new Thread {
override def run = {
while (active) {
try {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
try {
messageDemultiplexer.select
val handles = messageDemultiplexer.acquireSelectedQueue
for (index <- 0 to handles.size) {
val handle = handles.peek
val handler = checkIfNotBusyThenGet(handle.key)
if (handler.isDefined) {
handlerExecutor.execute(new Runnable {
override def run = {
handler.get.handle(handle)
free(handle.key)
messageDemultiplexer.wakeUp
}
})
handles.remove
}
} catch {case e: InterruptedException => active = false}
val queue = messageDemultiplexer.acquireSelectedQueue
for (index <- 0 until queue.size) {
val message = queue.peek
val messageHandler = getIfNotBusy(message.sender)
if (messageHandler.isDefined) {
handlerExecutor.execute(new Runnable {
override def run = {
messageHandler.get.handle(message)
free(message.sender)
messageDemultiplexer.wakeUp
}
})
queue.remove
}
} finally {
messageDemultiplexer.releaseSelectedQueue
}
} finally {
messageDemultiplexer.releaseSelectedQueue
}
}
};
selectorThread.start();
}
}
};
selectorThread.start
}
def shutdown = if (active) {
active = false
selectorThread.interrupt
handlerExecutor.shutdownNow
}
override protected def doShutdown = handlerExecutor.shutdownNow
private def checkIfNotBusyThenGet(key: AnyRef): Option[MessageHandler] = synchronized {
if (!busyHandlers.contains(key) && handlers.containsKey(key)) {
private def getIfNotBusy(key: AnyRef): Option[MessageHandler] = synchronized {
if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
busyHandlers.add(key)
Some(handlers.get(key))
Some(messageHandlers.get(key))
} else None
}
@ -76,6 +70,9 @@ class ThreadBasedDispatcher(val threadPoolSize: Int) extends MessageDispatcher {
}
class ThreadBasedDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer {
import java.util.concurrent.locks.ReentrantLock
import java.util.{LinkedList, Queue}
private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle]
private val selectedQueueLock = new ReentrantLock

View file

@ -0,0 +1,76 @@
package se.scalablesolutions.akka.kernel.actor
import concurrent.Lock
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import reactor._
import org.junit.{Test, Before}
import org.junit.Assert._
class ActorTest {
private val unit = TimeUnit.MILLISECONDS
class TestActor extends Actor {
def receive: PartialFunction[Any, Unit] = {
case "Hello" =>
println("Hello")
reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
def restart(config: Option[AnyRef]) = {}
}
@Test
def sendOneWay = {
implicit val timeout = 5000L
var oneWay = "nada"
val actor = new Actor {
def receive: PartialFunction[Any, Unit] = {
case "OneWay" => oneWay = "received"
}
def restart(config: Option[AnyRef]) = {}
}
actor.start
val result = actor ! "OneWay"
Thread.sleep(100)
assertEquals("received", oneWay)
//actor.stop
}
@Test
def sendReplySync = {
implicit val timeout = 5000L
val actor = new TestActor
actor.start
val result = actor !? "Hello"
assertEquals("World", result.get.asInstanceOf[String])
//actor.stop
}
@Test
def sendReplyAsync = {
implicit val timeout = 5000L
val actor = new TestActor
actor.start
val result = actor !! "Hello"
assertEquals("World", result.get.asInstanceOf[String])
//actor.stop
}
@Test
def sendReceiveException = {
implicit val timeout = 5000L
val actor = new TestActor
actor.start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assertEquals("expected", e.getMessage())
}
//actor.stop
}
}

View file

@ -0,0 +1,116 @@
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.BrokenBarrierException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import org.junit.{Test, Before}
import org.junit.Assert._
class EventBasedDispatcherTest {
private var threadingIssueDetected: AtomicBoolean = null
class TestMessageHandle(handleLatch: CountDownLatch) extends MessageHandler {
val guardLock: Lock = new ReentrantLock
def handle(message: MessageHandle) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
guardLock.unlock
}
}
}
@Before
def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test
def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def testMessagesDispatchedToDifferentHandlersAreExecutedSequentially = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedSequentially
}
@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
EventBasedDispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
EventBasedDispatcher.start
for (i <- 0 until 10) {
EventBasedDispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedSequentially: Unit = {
val handleLatch = new CountDownLatch(2)
val key1 = "key1"
val key2 = "key2"
EventBasedDispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
EventBasedDispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
EventBasedDispatcher.start
EventBasedDispatcher.messageQueue.append(new MessageHandle(key1, new Object, new NullFutureResult))
EventBasedDispatcher.messageQueue.append(new MessageHandle(key2, new Object, new NullFutureResult))
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
EventBasedDispatcher.registerHandler(key1, new MessageHandler {
var currentValue = -1;
def handle(message: MessageHandle) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
EventBasedDispatcher.registerHandler(key2, new MessageHandler {
var currentValue = -1;
def handle(message: MessageHandle) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
EventBasedDispatcher.start
for (i <- 0 until 100) {
EventBasedDispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult))
EventBasedDispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
}

View file

@ -0,0 +1,124 @@
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.BrokenBarrierException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import org.junit.Before
import org.junit.Test
import org.junit.Assert._
class ThreadBasedDispatcherTest {
private var threadingIssueDetected: AtomicBoolean = null
@Before
def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test
def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
ThreadBasedDispatcher.registerHandler(key, new MessageHandler {
def handle(message: MessageHandle) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
guardLock.unlock
}
}
})
ThreadBasedDispatcher.start
for (i <- 0 until 100) {
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
ThreadBasedDispatcher.registerHandler(key1, new MessageHandler {
def handle(message: MessageHandle) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
ThreadBasedDispatcher.registerHandler(key2, new MessageHandler {
def handle(message: MessageHandle) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
ThreadBasedDispatcher.start
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key1, new Object, new NullFutureResult))
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key2, new Object, new NullFutureResult))
handlersBarrier.await(1, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
ThreadBasedDispatcher.registerHandler(key1, new MessageHandler {
var currentValue = -1;
def handle(message: MessageHandle) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
ThreadBasedDispatcher.registerHandler(key2, new MessageHandler {
var currentValue = -1;
def handle(message: MessageHandle) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
ThreadBasedDispatcher.start
for (i <- 0 until 100) {
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult))
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
}