// Source: pekko/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala
// (repository viewer metadata: 222 lines, 8.1 KiB, Scala)

/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.scalatest.Assertions._
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import se.scalablesolutions.akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
/**
 * Shared fixtures for the dispatcher test suites: the message protocol, a test
 * actor that records what it processes, a stackable dispatcher trait that counts
 * lifecycle and message events, and assertion helpers over those counters.
 */
object ActorModelSpec {

  /** Protocol understood by [[DispatcherActor]]. */
  sealed trait ActorModelMessage

  case class Reply_?(expect: Any) extends ActorModelMessage

  case class Reply(expect: Any) extends ActorModelMessage

  case class Forward(to: ActorRef, msg: Any) extends ActorModelMessage

  case class CountDown(latch: CountDownLatch) extends ActorModelMessage

  case class Increment(counter: AtomicLong) extends ActorModelMessage

  case class Await(latch: CountDownLatch) extends ActorModelMessage

  case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage

  case object Restart extends ActorModelMessage

  val Ping = "Ping"
  val Pong = "Pong"

  /**
   * Test actor bound to the given intercepting dispatcher. Every handled message
   * first bumps the actor's `msgsProcessed` counter and then performs the action
   * the message describes.
   *
   * @param dispatcher the intercepting dispatcher that runs this actor and holds its stats
   */
  class DispatcherActor(dispatcher: MessageDispatcherInterceptor) extends Actor {
    // MessageDispatcherInterceptor extends MessageDispatcher, so no cast is required.
    self.dispatcher = dispatcher

    /** Records one processed message for this actor. */
    def ack: Unit = {
      dispatcher.getStats(self).msgsProcessed.incrementAndGet()
    }

    /** Records one restart for this actor. */
    override def postRestart(reason: Throwable): Unit = {
      dispatcher.getStats(self).restarts.incrementAndGet()
    }

    def receive = {
      case Await(latch)      => ack; latch.await()
      case Meet(sign, wait)  => ack; sign.countDown(); wait.await()
      case Reply(msg)        => ack; self.reply(msg)
      case Reply_?(msg)      => ack; self.reply_?(msg)
      case Forward(to, msg)  => ack; to.forward(msg)
      case CountDown(latch)  => ack; latch.countDown()
      case Increment(count)  => ack; count.incrementAndGet()
      case Restart           => ack; throw new Exception("Restart requested")
    }
  }

  /** Per-actor event counters maintained by [[MessageDispatcherInterceptor]] and [[DispatcherActor]]. */
  class InterceptorStats {
    val suspensions = new AtomicLong(0)
    val resumes = new AtomicLong(0)
    val registers = new AtomicLong(0)
    val unregisters = new AtomicLong(0)
    val msgsReceived = new AtomicLong(0)
    val msgsProcessed = new AtomicLong(0)
    val restarts = new AtomicLong(0)
  }

  /**
   * Stackable trait that delegates every intercepted call to the underlying
   * dispatcher and then increments the matching counter.
   */
  trait MessageDispatcherInterceptor extends MessageDispatcher {
    val stats = new ConcurrentHashMap[ActorRef, InterceptorStats]
    val starts = new AtomicLong(0)
    val stops = new AtomicLong(0)

    /**
     * Returns the stats entry for `actorRef`, creating it on first use.
     *
     * @param actorRef the actor whose counters are requested
     * @return the single `InterceptorStats` instance stored for that actor
     */
    def getStats(actorRef: ActorRef): InterceptorStats = {
      val existing = stats.get(actorRef)
      if (existing ne null) existing
      else {
        // Allocate only when the entry is missing; putIfAbsent returns the winner
        // (non-null) if another thread inserted first, or null if ours was stored.
        val fresh = new InterceptorStats
        val winner = stats.putIfAbsent(actorRef, fresh)
        if (winner eq null) fresh else winner
      }
    }

    abstract override def suspend(actorRef: ActorRef): Unit = {
      super.suspend(actorRef)
      getStats(actorRef).suspensions.incrementAndGet()
    }

    abstract override def resume(actorRef: ActorRef): Unit = {
      super.resume(actorRef)
      getStats(actorRef).resumes.incrementAndGet()
    }

    private[akka] abstract override def register(actorRef: ActorRef): Unit = {
      super.register(actorRef)
      getStats(actorRef).registers.incrementAndGet()
    }

    private[akka] abstract override def unregister(actorRef: ActorRef): Unit = {
      super.unregister(actorRef)
      getStats(actorRef).unregisters.incrementAndGet()
    }

    private[akka] abstract override def dispatch(invocation: MessageInvocation): Unit = {
      super.dispatch(invocation)
      getStats(invocation.receiver).msgsReceived.incrementAndGet()
    }

    private[akka] abstract override def start: Unit = {
      super.start
      starts.incrementAndGet()
    }

    private[akka] abstract override def shutdown: Unit = {
      super.shutdown
      stops.incrementAndGet()
    }
  }

  /**
   * Asserts the dispatcher's start/stop counters. Each expectation defaults to the
   * current counter value, so an omitted expectation always holds.
   *
   * @param dispatcher the intercepting dispatcher to inspect
   * @param starts     expected number of starts
   * @param stops      expected number of shutdowns
   */
  def assertDispatcher(dispatcher: MessageDispatcherInterceptor)(
    starts: Long = dispatcher.starts.get(),
    stops: Long = dispatcher.stops.get()
  ): Unit = {
    assert(starts === dispatcher.starts.get(), "Dispatcher starts")
    assert(stops === dispatcher.stops.get(), "Dispatcher stops")
  }

  /**
   * Asserts that `latch` reaches zero within `wait` milliseconds.
   *
   * @param latch the latch expected to open
   * @param wait  maximum time to wait, in milliseconds
   * @param hint  message reported when the assertion fails
   */
  def assertCountDown(latch: CountDownLatch, wait: Long, hint: AnyRef): Unit = {
    assert(latch.await(wait, TimeUnit.MILLISECONDS) === true, hint)
  }

  /**
   * Asserts that `latch` does NOT reach zero within `wait` milliseconds.
   *
   * @param latch the latch expected to stay closed
   * @param wait  time to wait before concluding it stayed closed, in milliseconds
   * @param hint  message reported when the assertion fails
   */
  def assertNoCountDown(latch: CountDownLatch, wait: Long, hint: AnyRef): Unit = {
    assert(latch.await(wait, TimeUnit.MILLISECONDS) === false, hint)
  }

  /**
   * Looks up the interceptor stats recorded for `actorRef`.
   *
   * @param actorRef   the actor whose counters are requested
   * @param dispatcher the dispatcher holding the stats; when null, the actor's own
   *                   dispatcher is used instead of dereferencing null
   * @return the stats entry for the actor
   */
  def statsFor(actorRef: ActorRef, dispatcher: MessageDispatcher = null): InterceptorStats = {
    val effective = if (dispatcher eq null) actorRef.dispatcher else dispatcher
    effective.asInstanceOf[MessageDispatcherInterceptor].getStats(actorRef)
  }

  /**
   * Same check as [[assertRef]], but every counter not named by the caller is
   * expected to be zero.
   */
  def assertRefDefaultZero(actorRef: ActorRef, dispatcher: MessageDispatcher = null)(
    suspensions: Long = 0,
    resumes: Long = 0,
    registers: Long = 0,
    unregisters: Long = 0,
    msgsReceived: Long = 0,
    msgsProcessed: Long = 0,
    restarts: Long = 0): Unit = {
    assertRef(actorRef, dispatcher)(
      suspensions = suspensions,
      resumes = resumes,
      registers = registers,
      unregisters = unregisters,
      msgsReceived = msgsReceived,
      msgsProcessed = msgsProcessed,
      restarts = restarts
    )
  }

  /**
   * Asserts the counters recorded for `actorRef`. Each expectation defaults to the
   * currently recorded value, so only the counters named by the caller are
   * effectively checked.
   *
   * @param actorRef   the actor whose counters are checked
   * @param dispatcher the dispatcher holding the stats; null means the actor's own dispatcher
   */
  def assertRef(actorRef: ActorRef, dispatcher: MessageDispatcher = null)(
    suspensions: Long = statsFor(actorRef, dispatcher).suspensions.get(),
    resumes: Long = statsFor(actorRef, dispatcher).resumes.get(),
    registers: Long = statsFor(actorRef, dispatcher).registers.get(),
    unregisters: Long = statsFor(actorRef, dispatcher).unregisters.get(),
    msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(),
    msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(),
    restarts: Long = statsFor(actorRef, dispatcher).restarts.get()
  ): Unit = {
    // statsFor resolves a null dispatcher to actorRef.dispatcher.
    val stats = statsFor(actorRef, dispatcher)
    assert(stats.suspensions.get() === suspensions, "Suspensions")
    assert(stats.resumes.get() === resumes, "Resumes")
    assert(stats.registers.get() === registers, "Registers")
    assert(stats.unregisters.get() === unregisters, "Unregisters")
    assert(stats.msgsReceived.get() === msgsReceived, "Received")
    assert(stats.msgsProcessed.get() === msgsProcessed, "Processed")
    assert(stats.restarts.get() === restarts, "Restarts")
  }

  /** Creates (but does not start) a [[DispatcherActor]] bound to the implicit dispatcher. */
  def newTestActor(implicit d: MessageDispatcherInterceptor) = actorOf(new DispatcherActor(d))
}
/**
 * Dispatcher-independent test cases. Concrete suites supply the dispatcher under
 * test through `newInterceptedDispatcher`.
 */
abstract class ActorModelSpec extends JUnitSuite {
  import ActorModelSpec._

  /** Creates a fresh dispatcher under test with interception counters mixed in. */
  protected def newInterceptedDispatcher: MessageDispatcherInterceptor

  /** Checks the start/stop and register/unregister counters across an actor's start and stop. */
  @Test def dispatcherShouldDynamicallyHandleItsOwnLifeCycle: Unit = {
    implicit val dispatcher = newInterceptedDispatcher
    val a = newTestActor
    assertDispatcher(dispatcher)(starts = 0, stops = 0)
    a.start
    assertDispatcher(dispatcher)(starts = 1, stops = 0)
    a.stop
    // NOTE(review): presumably the dispatcher shuts itself down once it has been
    // idle for timeoutMs; the extra 100 ms is slack - confirm against the dispatcher.
    Thread.sleep(dispatcher.timeoutMs + 100)
    assertDispatcher(dispatcher)(starts = 1, stops = 1)
    assertRef(a, dispatcher)(
      suspensions = 0,
      resumes = 0,
      registers = 1,
      unregisters = 1,
      msgsReceived = 0,
      msgsProcessed = 0,
      restarts = 0
    )
  }

  /** Checks that a second message is not processed while the actor is still blocked in the first. */
  @Test def dispatcherShouldProcessMessagesOneAtATime: Unit = {
    implicit val dispatcher = newInterceptedDispatcher
    val a = newTestActor
    val start, step1, step2, oneAtATime = new CountDownLatch(1)
    a.start

    a ! CountDown(start)
    assertCountDown(start, 3000, "Should process first message within 3 seconds")
    assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)

    // Meet opens step1 and then blocks the actor until step2 is opened.
    a ! Meet(step1, step2)
    assertCountDown(step1, 3000, "Didn't process the Meet message in 3 seconds")
    assertRefDefaultZero(a)(registers = 1, msgsReceived = 2, msgsProcessed = 2)

    // While the actor is parked inside Meet, the next message must stay queued.
    a ! CountDown(oneAtATime)
    assertNoCountDown(oneAtATime, 500, "Processed message when not allowed to")

    step2.countDown()
    assertCountDown(oneAtATime, 500, "Processed message when allowed")
    assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)

    a.stop
    assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3)
  }
}
/**
 * Runs the shared ActorModelSpec test cases against an
 * ExecutorBasedEventDrivenDispatcher with interception counters mixed in.
 */
class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec {

  /** Builds a new intercepted dispatcher named "foo" on every call. */
  def newInterceptedDispatcher: ExecutorBasedEventDrivenDispatcher with MessageDispatcherInterceptor = {
    val intercepted = new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor
    intercepted
  }
}