diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 5cbb6b3c8b..d565f4d55b 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -94,7 +94,7 @@ class ExecutorBasedEventDrivenDispatcher( private[akka] val threadFactory = new MonitorableThreadFactory(name) private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) - protected def dispatch(invocation: MessageInvocation) = { + private[akka] def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation registerForExecution(mbox) @@ -131,9 +131,9 @@ class ExecutorBasedEventDrivenDispatcher( case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } - protected def start= log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) + private[akka] def start = log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) - protected def shutdown { + private[akka] def shutdown { val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) if (old ne null) { log.debug("Shutting down %s", toString) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index f357ff4ab3..0f850be6f7 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -61,7 +61,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - protected def dispatch(invocation: MessageInvocation) { + private[akka] def dispatch(invocation: MessageInvocation) { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation executorService.get() execute mbox @@ -167,9 +167,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( } else false } - protected def start = log.debug("Starting up %s",toString) + private[akka] def start = log.debug("Starting up %s",toString) - protected def shutdown { + private[akka] def shutdown { val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) if (old ne null) { log.debug("Shutting down %s", toString) @@ -190,7 +190,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" - def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match { + private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match { case UnboundedMailbox(blocking) => // FIXME make use of 'blocking' in work stealer ConcurrentLinkedDeque new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable { def enqueue(handle: MessageInvocation): Unit = this.add(handle) @@ -220,7 +220,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * Creates and returns a durable mailbox for the given actor. */ - protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { + private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { // FIXME make generic (work for TypedActor as well) case FileBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("FileBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") case ZooKeeperBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("ZooKeeperBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index 50ae2da19e..8f001084b7 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -144,11 +144,11 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = val mailboxType: Option[MailboxType] = None - protected def start { retainNonDaemon } + private[akka] def start { retainNonDaemon } - protected def shutdown { releaseNonDaemon } + private[akka] def shutdown { releaseNonDaemon } - protected def dispatch(invocation: MessageInvocation){ + private[akka] def dispatch(invocation: MessageInvocation){ mailbox(invocation.receiver).dispatch(invocation) } @@ -166,12 +166,12 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend def resume(actorRef:ActorRef) = mailbox(actorRef).resume - def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef] + private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef] /** * Creates and returns a durable mailbox for the given actor. */ - protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = null.asInstanceOf[AnyRef] + private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = null.asInstanceOf[AnyRef] override def toString = "HawtDispatcher" } diff --git a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala index 68fbebb0e7..d363e76382 100644 --- a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala @@ -90,7 +90,7 @@ trait MailboxFactory { /** * Creates a MessageQueue (Mailbox) with the specified properties. */ - protected def createMailbox(actorRef: ActorRef): AnyRef = + private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType.getOrElse(throw new IllegalStateException("No mailbox type defined")) match { case mb: TransientMailboxType => createTransientMailbox(actorRef, mb) case mb: DurableMailboxType => createDurableMailbox(actorRef, mb) @@ -99,10 +99,10 @@ trait MailboxFactory { /** * Creates and returns a transient mailbox for the given actor. */ - protected def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef + private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef /** * Creates and returns a durable mailbox for the given actor. */ - protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef + private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef } \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index dbbdc84b5c..9465d79897 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -86,7 +86,7 @@ trait MessageDispatcher extends MailboxFactory with Logging { dispatch(invocation) } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") - protected def register(actorRef: ActorRef) { + private[akka] def register(actorRef: ActorRef) { if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) uuids add actorRef.uuid if (active.isOff) { @@ -96,7 +96,7 @@ trait MessageDispatcher extends MailboxFactory with Logging { } } - protected def unregister(actorRef: ActorRef) = { + private[akka] def unregister(actorRef: ActorRef) = { if (uuids remove actorRef.uuid) { actorRef.mailbox = null if (uuids.isEmpty){ @@ -145,7 +145,7 @@ trait MessageDispatcher extends MailboxFactory with Logging { } } - protected def timeoutMs: Long = 1000 + private[akka] def timeoutMs: Long = 1000 /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference @@ -160,17 +160,17 @@ trait MessageDispatcher extends MailboxFactory with Logging { /** * Will be called when the dispatcher is to queue an invocation for execution */ - protected def dispatch(invocation: MessageInvocation): Unit + private[akka] def dispatch(invocation: MessageInvocation): Unit /** * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown */ - protected def start: Unit + private[akka] def start: Unit /** * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached */ - protected def shutdown: Unit + private[akka] def shutdown: Unit /** * Returns the size of the mailbox for the specified actor diff --git a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala new file mode 100644 index 0000000000..c79da02244 --- /dev/null +++ b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala @@ -0,0 +1,222 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.actor.dispatch + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import org.scalatest.Assertions._ +import se.scalablesolutions.akka.dispatch._ +import se.scalablesolutions.akka.actor.{ActorRef, Actor} +import se.scalablesolutions.akka.actor.Actor._ +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor + +object ActorModelSpec { + + sealed trait ActorModelMessage + case class Reply_?(expect: Any) extends ActorModelMessage + case class Reply(expect: Any) extends ActorModelMessage + case class Forward(to: ActorRef,msg: Any) extends ActorModelMessage + case class CountDown(latch: CountDownLatch) extends ActorModelMessage + case class Increment(counter: AtomicLong) extends ActorModelMessage + case class Await(latch: CountDownLatch) extends ActorModelMessage + case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage + case object Restart extends ActorModelMessage + + val Ping = "Ping" + val Pong = "Pong" + + class DispatcherActor(dispatcher: MessageDispatcherInterceptor) extends Actor { + self.dispatcher = dispatcher.asInstanceOf[MessageDispatcher] + + def ack { dispatcher.getStats(self).msgsProcessed.incrementAndGet() } + + override def postRestart(reason: Throwable) { + dispatcher.getStats(self).restarts.incrementAndGet() + } + + def receive = { + case Await(latch) => ack; latch.await() + case Meet(sign, wait) => ack; sign.countDown(); wait.await() + case Reply(msg) => ack; self.reply(msg) + case Reply_?(msg) => ack; self.reply_?(msg) + case Forward(to,msg) => ack; to.forward(msg) + case CountDown(latch) => ack; latch.countDown() + case Increment(count) => ack; count.incrementAndGet() + case Restart => ack; throw new Exception("Restart requested") + } + } + + class InterceptorStats { + val suspensions = new AtomicLong(0) + val resumes = new AtomicLong(0) + val registers = new AtomicLong(0) + val unregisters = new AtomicLong(0) + val msgsReceived = new AtomicLong(0) + val msgsProcessed = new AtomicLong(0) + val restarts = new AtomicLong(0) + } + + trait MessageDispatcherInterceptor extends MessageDispatcher { + val stats = new ConcurrentHashMap[ActorRef,InterceptorStats] + val starts = new AtomicLong(0) + val stops = new AtomicLong(0) + + def getStats(actorRef: ActorRef) = { + stats.putIfAbsent(actorRef,new InterceptorStats) + stats.get(actorRef) + } + + abstract override def suspend(actorRef: ActorRef) { + super.suspend(actorRef) + getStats(actorRef).suspensions.incrementAndGet() + } + + abstract override def resume(actorRef: ActorRef) { + super.resume(actorRef) + getStats(actorRef).resumes.incrementAndGet() + } + + private[akka] abstract override def register(actorRef: ActorRef) { + super.register(actorRef) + getStats(actorRef).registers.incrementAndGet() + } + + private[akka] abstract override def unregister(actorRef: ActorRef) { + super.unregister(actorRef) + getStats(actorRef).unregisters.incrementAndGet() + } + + private[akka] abstract override def dispatch(invocation: MessageInvocation) { + super.dispatch(invocation) + getStats(invocation.receiver).msgsReceived.incrementAndGet() + } + + private[akka] abstract override def start { + super.start + starts.incrementAndGet() + } + + private[akka] abstract override def shutdown { + super.shutdown + stops.incrementAndGet() + } + } + + def assertDispatcher(dispatcher: MessageDispatcherInterceptor)( + starts: Long = dispatcher.starts.get(), + stops: Long = dispatcher.stops.get() + ) { + assert(starts === dispatcher.starts.get(), "Dispatcher starts") + assert(stops === dispatcher.stops.get(), "Dispatcher stops") + } + + def assertCountDown(latch: CountDownLatch,wait: Long,hint: AnyRef){ + assert(latch.await(wait,TimeUnit.MILLISECONDS) === true) + } + + def assertNoCountDown(latch: CountDownLatch,wait: Long,hint: AnyRef){ + assert(latch.await(wait,TimeUnit.MILLISECONDS) === false) + } + + def statsFor(actorRef: ActorRef, dispatcher: MessageDispatcher = null) = + dispatcher.asInstanceOf[MessageDispatcherInterceptor].getStats(actorRef) + + def assertRefDefaultZero(actorRef: ActorRef,dispatcher: MessageDispatcher = null)( + suspensions: Long = 0, + resumes: Long = 0, + registers: Long = 0, + unregisters: Long = 0, + msgsReceived: Long = 0, + msgsProcessed: Long = 0, + restarts: Long = 0) { + assertRef(actorRef,dispatcher)( + suspensions, + resumes, + registers, + unregisters, + msgsReceived, + msgsProcessed, + restarts + ) + } + + def assertRef(actorRef: ActorRef,dispatcher: MessageDispatcher = null)( + suspensions: Long = statsFor(actorRef).suspensions.get(), + resumes: Long = statsFor(actorRef).resumes.get(), + registers: Long = statsFor(actorRef).registers.get(), + unregisters: Long = statsFor(actorRef).unregisters.get(), + msgsReceived: Long = statsFor(actorRef).msgsReceived.get(), + msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(), + restarts: Long = statsFor(actorRef).restarts.get() + ) { + val stats = statsFor(actorRef,if (dispatcher eq null) actorRef.dispatcher else dispatcher) + assert(stats.suspensions.get() === suspensions, "Suspensions") + assert(stats.resumes.get() === resumes, "Resumes") + assert(stats.registers.get() === registers, "Registers") + assert(stats.unregisters.get() === unregisters, "Unregisters") + assert(stats.msgsReceived.get() === msgsReceived, "Received") + assert(stats.msgsProcessed.get() === msgsProcessed, "Processed") + assert(stats.restarts.get() === restarts, "Restarts") + } + + def newTestActor(implicit d: MessageDispatcherInterceptor) = actorOf(new DispatcherActor(d)) +} + +abstract class ActorModelSpec extends JUnitSuite { + import ActorModelSpec._ + + protected def newInterceptedDispatcher: MessageDispatcherInterceptor + + @Test def dispatcherShouldDynamicallyHandleItsOwnLifeCycle { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor + assertDispatcher(dispatcher)(starts = 0, stops = 0) + a.start + assertDispatcher(dispatcher)(starts = 1, stops = 0) + a.stop + Thread.sleep(dispatcher.timeoutMs + 100) + assertDispatcher(dispatcher)(starts = 1, stops = 1) + assertRef(a,dispatcher)( + suspensions = 0, + resumes = 0, + registers = 1, + unregisters = 1, + msgsReceived = 0, + msgsProcessed = 0, + restarts = 0 + ) + } + + @Test def dispatcherShouldProcessMessagesOneAtATime { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor + val start,step1,step2,oneAtATime = new CountDownLatch(1) + val counter = new AtomicLong(0) + a.start + + a ! CountDown(start) + assertCountDown(start,3000, "Should process first message within 3 seconds") + assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1) + + a ! Meet(step1,step2) + assertCountDown(step1,3000, "Didn't process the Meet message in 3 seocnds") + assertRefDefaultZero(a)(registers = 1, msgsReceived = 2, msgsProcessed = 2) + + a ! CountDown(oneAtATime) + assertNoCountDown(oneAtATime,500,"Processed message when not allowed to") + step2.countDown() + assertCountDown(oneAtATime,500,"Processed message when allowed") + assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) + + a.stop + assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3) + } +} + +class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec { + def newInterceptedDispatcher = + new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor +} \ No newline at end of file