This commit is contained in:
Vasil Remeniuk 2011-08-15 19:35:34 +03:00
commit ea201b4e1c
28 changed files with 632 additions and 381 deletions

View file

@ -65,6 +65,20 @@ class FSMTimingSpec extends WordSpec with MustMatchers with TestKit {
expectMsg(Transition(fsm, TestCancelTimer, Initial))
}
"not get confused between named and state timers" in {
fsm ! TestCancelStateTimerInNamedTimerMessage
fsm ! Tick
expectMsg(100 millis, Tick)
Thread.sleep(200)
fsm.dispatcher resume fsm
expectMsg(100 millis, Transition(fsm, TestCancelStateTimerInNamedTimerMessage, TestCancelStateTimerInNamedTimerMessage2))
fsm ! Cancel
within(100 millis) {
expectMsg(Cancel)
expectMsg(Transition(fsm, TestCancelStateTimerInNamedTimerMessage2, Initial))
}
}
"receive and cancel a repeated timer" in {
fsm ! TestRepeatedTimer
val seq = receiveWhile(600 millis) {
@ -113,6 +127,8 @@ object FSMTimingSpec {
case object TestRepeatedTimer extends State
case object TestUnhandled extends State
case object TestCancelTimer extends State
case object TestCancelStateTimerInNamedTimerMessage extends State
case object TestCancelStateTimerInNamedTimerMessage2 extends State
case object Tick
case object Tock
@ -170,6 +186,21 @@ object FSMTimingSpec {
stay using (remaining - 1)
}
}
when(TestCancelStateTimerInNamedTimerMessage) {
// FSM is suspended after processing this message and resumed 200ms later
case Ev(Tick)
self.dispatcher suspend self
setTimer("named", Tock, 10 millis, false)
stay forMax (100 millis) replying Tick
case Ev(Tock)
goto(TestCancelStateTimerInNamedTimerMessage2)
}
when(TestCancelStateTimerInNamedTimerMessage2) {
case Ev(StateTimeout)
goto(Initial)
case Ev(Cancel)
goto(Initial) replying Cancel
}
when(TestUnhandled) {
case Ev(SetHandler)
whenUnhandled {

View file

@ -144,14 +144,14 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
def newFooBar: Foo = newFooBar(Duration(2, "s"))
def newFooBar(timeout: Duration): Foo =
newFooBar(Configuration(timeout))
def newFooBar(d: Duration): Foo =
newFooBar(Props().withTimeout(Timeout(d)))
def newFooBar(config: Configuration): Foo =
typedActorOf(classOf[Foo], classOf[Bar], config)
def newFooBar(props: Props): Foo =
typedActorOf(classOf[Foo], classOf[Bar], props)
def newStacked(config: Configuration = Configuration(Duration(2, "s"))): Stacked =
typedActorOf(classOf[Stacked], classOf[StackedImpl], config)
def newStacked(props: Props = Props().withTimeout(Timeout(2000))): Stacked =
typedActorOf(classOf[Stacked], classOf[StackedImpl], props)
def mustStop(typedActor: AnyRef) = stop(typedActor) must be(true)
@ -295,7 +295,7 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
}
"be able to support implementation only typed actors" in {
val t = typedActorOf[Foo, Bar](Configuration())
val t = typedActorOf[Foo, Bar](Props())
val f = t.futurePigdog(200)
val f2 = t.futurePigdog(0)
f2.isCompleted must be(false)
@ -312,15 +312,15 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
}
"be able to use work-stealing dispatcher" in {
val config = Configuration(
Duration(6600, "ms"),
Dispatchers.newBalancingDispatcher("pooled-dispatcher")
val props = Props(
timeout = Timeout(6600),
dispatcher = Dispatchers.newBalancingDispatcher("pooled-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(60)
.setMaxPoolSize(60)
.build)
val thais = for (i 1 to 60) yield newFooBar(config)
val thais = for (i 1 to 60) yield newFooBar(props)
val iterator = new CyclicIterator(thais)
val results = for (i 1 to 120) yield (i, iterator.next.futurePigdog(200L, i))

View file

@ -20,10 +20,10 @@ class ConfigSpec extends WordSpec with MustMatchers {
getString("akka.time-unit") must equal(Some("seconds"))
getString("akka.version") must equal(Some("2.0-SNAPSHOT"))
getString("akka.actor.default-dispatcher.type") must equal(Some("GlobalDispatcher"))
getString("akka.actor.default-dispatcher.type") must equal(Some("Dispatcher"))
getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(Some(60))
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(1.0))
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(4.0))
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0))
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0))
getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1))
getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true))
getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs"))

View file

@ -28,8 +28,7 @@ object DispatchersSpec {
def typesAndValidators: Map[String, (MessageDispatcher) Boolean] = Map(
"BalancingDispatcher" -> ofType[BalancingDispatcher],
"Dispatcher" -> ofType[Dispatcher],
"GlobalDispatcher" -> instance(globalDispatcher))
"Dispatcher" -> ofType[Dispatcher])
def validTypes = typesAndValidators.keys.toList

View file

@ -371,6 +371,27 @@ object Actor {
createActor(address, () new LocalActorRef(() creator.create, address))
}
//TODO FIXME
def actorOf(props: Props): ActorRef = {
//TODO Implement support for configuring by deployment ID etc
//TODO If localOnly = true, never use the config file deployment and always create a new actor
//TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor
//TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?)
val address = props.deployId match { //TODO handle deployId separately from address?
case "" | null newUuid().toString
case other other
}
val newActor = new LocalActorRef(props.creator, address)
newActor.dispatcher = props.dispatcher
newActor.faultHandler = props.faultHandler
newActor.lifeCycle = props.lifeCycle
newActor.timeout = props.timeout.duration.toMillis
newActor.receiveTimeout = props.receiveTimeout.map(_.toMillis)
props.supervisor.foreach(newActor.link(_))
newActor.start
}
def localActorOf[T <: Actor: Manifest]: ActorRef = {
newLocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString)
}

View file

@ -47,12 +47,19 @@ private[akka] object ActorRefInternals {
* TODO document me
*/
object Props {
val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis"))
def defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
val noCreatorSpecified: () Actor = () throw new UnsupportedOperationException("No actorFactoryProvided!")
object Default {
val creator: () Actor = () throw new UnsupportedOperationException("No actor creator specified!")
val deployId: String = ""
val dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
val timeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis"))
val receiveTimeout: Option[Duration] = None
val lifeCycle: LifeCycle = Permanent
val faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy
val supervisor: Option[ActorRef] = None
val localOnly: Boolean = false
}
val default = new Props(creator = noCreatorSpecified)
def apply(): Props = default
val default = new Props()
def apply[T <: Actor: Manifest]: Props =
default.withCreator(() implicitly[Manifest[T]].erasure.asInstanceOf[Class[_ <: Actor]].newInstance)
@ -64,15 +71,26 @@ object Props {
/**
* ActorRef configuration object, this is thread safe and fully sharable
*/
case class Props(creator: () Actor,
deployId: String = "",
dispatcher: MessageDispatcher = Props.defaultDispatcher,
timeout: Timeout = Props.defaultTimeout,
receiveTimeout: Option[Duration] = None,
lifeCycle: LifeCycle = Permanent,
faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy,
supervisor: Option[ActorRef] = None,
localOnly: Boolean = false) {
case class Props(creator: () Actor = Props.Default.creator,
deployId: String = Props.Default.deployId,
dispatcher: MessageDispatcher = Props.Default.dispatcher,
timeout: Timeout = Props.Default.timeout,
receiveTimeout: Option[Duration] = Props.Default.receiveTimeout,
lifeCycle: LifeCycle = Props.Default.lifeCycle,
faultHandler: FaultHandlingStrategy = Props.Default.faultHandler,
supervisor: Option[ActorRef] = Props.Default.supervisor,
localOnly: Boolean = Props.Default.localOnly) {
def this() = this(
creator = Props.Default.creator,
deployId = Props.Default.deployId,
dispatcher = Props.Default.dispatcher,
timeout = Props.Default.timeout,
receiveTimeout = Props.Default.receiveTimeout,
lifeCycle = Props.Default.lifeCycle,
faultHandler = Props.Default.faultHandler,
supervisor = Props.Default.supervisor,
localOnly = Props.Default.localOnly)
/**
* Returns a new Props with the specified creator set
* Scala API
@ -581,13 +599,11 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
new AtomicReference[Actor](newActor)
}
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
"Could not create Serializer object [" + this.getClass.getName +
"]")
private val serializer: Serializer =
try { Serialization.serializerFor(this.getClass) } catch { case e: Exception serializerErrorDueTo(e.toString) }
private def serializer: Serializer = //TODO Is this used or needed?
try { Serialization.serializerFor(this.getClass) } catch {
case e: Exception throw new akka.config.ConfigurationException(
"Could not create Serializer object for [" + this.getClass.getName + "]")
}
private lazy val replicationStorage: Option[TransactionLog] = {
import DeploymentConfig._

View file

@ -61,10 +61,11 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
notifyListeners(ActorRegistered(address, actor, Option(typedActorsByUuid get actor.uuid)))
}
private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef) {
private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef): Unit =
typedActorsByUuid.put(actorRef.uuid, interface)
actorRef.start // register actorRef
}
private[akka] def unregisterTypedActor(actorRef: ActorRef, interface: AnyRef): Unit =
typedActorsByUuid.remove(actorRef.uuid, interface)
/**
* Unregisters an actor in the ActorRegistry.

View file

@ -454,8 +454,13 @@ trait FSM[S, D] extends ListenerManagement {
if (generation == gen) {
processMsg(StateTimeout, "state timeout")
}
case t @ Timer(name, msg, repeat, generation)
if ((timers contains name) && (timers(name).generation == generation)) {
case t @ Timer(name, msg, repeat, gen)
if ((timers contains name) && (timers(name).generation == gen)) {
if (timeoutFuture.isDefined) {
timeoutFuture.get.cancel(true)
timeoutFuture = None
}
generation += 1
processMsg(msg, t)
if (!repeat) {
timers -= name

View file

@ -5,6 +5,7 @@ package akka.actor
import akka.util.ByteString
import akka.dispatch.MessageInvocation
import akka.event.EventHandler
import java.net.InetSocketAddress
import java.io.IOException
@ -90,7 +91,7 @@ object IO {
case class Connect(socket: SocketHandle, address: InetSocketAddress) extends IOMessage
case class Connected(socket: SocketHandle) extends IOMessage
case class Close(handle: Handle) extends IOMessage
case class Closed(handle: Handle) extends IOMessage
case class Closed(handle: Handle, cause: Option[Exception]) extends IOMessage
case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage
case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage
@ -168,7 +169,7 @@ trait IO {
case Connected(socket)
state(socket).connected = true
run()
case msg @ Closed(handle)
case msg @ Closed(handle, _)
_state -= handle // TODO: clean up better
if (_receiveIO.isDefinedAt(msg)) {
_next = reset { _receiveIO(msg); Idle }
@ -322,7 +323,7 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) {
private val buffer = ByteBuffer.allocate(bufferSize)
private val thread = new Thread() {
private val thread = new Thread("io-worker") {
override def run(): Unit = {
while (selector.isOpen) {
selector select ()
@ -349,7 +350,7 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) {
writes += (handle -> queue.enqueue(data))
}
case Close(handle)
cleanup(handle)
cleanup(handle, None)
case Shutdown
channels.values foreach (_.close)
selector.close
@ -375,17 +376,18 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) {
try {
write(handle.asWritable, channel)
} catch {
case e: IOException // ignore, let it fail on read to ensure
// nothing left in read buffer.
case e: IOException
// ignore, let it fail on read to ensure nothing left in read buffer.
}
}
} catch {
case e: CancelledKeyException cleanup(handle)
case e: IOException cleanup(handle)
case e: CancelledKeyException cleanup(handle, Some(e))
case e: IOException cleanup(handle, Some(e))
case e: ActorInitializationException cleanup(handle, Some(e))
}
}
private def cleanup(handle: IO.Handle): Unit = {
private def cleanup(handle: IO.Handle, cause: Option[Exception]): Unit = {
handle match {
case server: IO.ServerHandle accepted -= server
case writable: IO.WriteHandle writes -= writable
@ -394,7 +396,12 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) {
case Some(channel)
channel.close
channels -= handle
handle.owner ! IO.Closed(handle)
try {
handle.owner ! IO.Closed(handle, cause)
} catch {
case e: ActorInitializationException
EventHandler debug (this, "IO.Handle's owner not running")
}
case None
}
}
@ -415,9 +422,12 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) {
}
private def connect(socket: IO.SocketHandle, channel: SocketChannel): Unit = {
channel.finishConnect
removeOps(socket, OP_CONNECT)
socket.owner ! IO.Connected(socket)
if (channel.finishConnect) {
removeOps(socket, OP_CONNECT)
socket.owner ! IO.Connected(socket)
} else {
cleanup(socket, None) // TODO: Add a cause
}
}
@tailrec
@ -436,7 +446,7 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) {
buffer.clear
val readLen = channel read buffer
if (readLen == -1) {
cleanup(handle)
cleanup(handle, None) // TODO: Add a cause
} else if (readLen > 0) {
buffer.flip
handle.owner ! IO.Read(handle, ByteString(buffer))

View file

@ -6,11 +6,11 @@ package akka.actor
import akka.japi.{ Creator, Option JOption }
import akka.actor.Actor._
import akka.dispatch.{ MessageDispatcher, Dispatchers, Future, FutureTimeoutException }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.serialization.{ Serializer, Serialization }
import akka.dispatch._
//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala
/**
@ -55,15 +55,6 @@ object TypedActor {
case some some
}
@deprecated("This should be replaced with the same immutable configuration that will be used for ActorRef.actorOf", "!!!")
object Configuration { //TODO: Replace this with the new ActorConfiguration when it exists
val defaultTimeout = Duration(Actor.TIMEOUT, "millis")
val defaultConfiguration = new Configuration(defaultTimeout, Dispatchers.defaultGlobalDispatcher)
def apply(): Configuration = defaultConfiguration
}
@deprecated("This should be replaced with the same immutable configuration that will be used for ActorRef.actorOf", "!!!")
case class Configuration(timeout: Duration = Configuration.defaultTimeout, dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
/**
* This class represents a Method call, and has a reference to the Method to be called and the parameters to supply
* It's sent to the ActorRef backing the TypedActor and can be serialized and deserialized
@ -122,46 +113,46 @@ object TypedActor {
}
/**
* Creates a new TypedActor proxy using the supplied configuration,
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R =
createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R =
createProxyAndTypedActor(interface, impl.newInstance, props, interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied configuration,
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R =
createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R =
createProxyAndTypedActor(interface, impl.create, props, interface.getClassLoader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.newInstance, config, loader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.newInstance, props, loader)
/**
* Creates a new TypedActor proxy using the supplied configuration,
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.create, config, loader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.create, props, loader)
/**
* Creates a new TypedActor proxy using the supplied configuration,
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(impl, impl.newInstance, config, loader)
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R =
createProxyAndTypedActor(impl, impl.newInstance, props, loader)
/**
* Creates a new TypedActor proxy using the supplied configuration,
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
val clazz = m.erasure.asInstanceOf[Class[T]]
createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader)
createProxyAndTypedActor(clazz, clazz.newInstance, props, if (loader eq null) clazz.getClassLoader else loader)
}
/**
@ -186,25 +177,25 @@ object TypedActor {
def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null
/**
* Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](constructor: Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R =
createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) constructor, config, if (loader eq null) m.erasure.getClassLoader else loader)
def createProxy[R <: AnyRef](constructor: Actor, props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[R]): R =
createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) constructor, props, if (loader eq null) m.erasure.getClassLoader else loader)
/**
* Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], config: Configuration, loader: ClassLoader): R =
createProxy(interfaces, (ref: AtomVar[R]) constructor.create, config, loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R =
createProxy(interfaces, (ref: AtomVar[R]) constructor.create, props, loader)
/**
* Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, config: Configuration, loader: ClassLoader): R =
createProxy[R](interfaces, (ref: AtomVar[R]) constructor, config, loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, props: Props, loader: ClassLoader): R =
createProxy[R](interfaces, (ref: AtomVar[R]) constructor, props, loader)
/* Internal API */
@ -219,44 +210,51 @@ object TypedActor {
}
else null
private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) Actor, config: Configuration, loader: ClassLoader): R = {
val proxyRef = new AtomVar[R]
configureAndProxyLocalActorRef[R](interfaces, proxyRef, constructor(proxyRef), config, loader)
private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) Actor, props: Props, loader: ClassLoader): R = {
val proxyVar = new AtomVar[R]
configureAndProxyLocalActorRef[R](interfaces, proxyVar, props.withCreator(() constructor(proxyVar)), loader)
}
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: T, config: Configuration, loader: ClassLoader): R =
createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) new TypedActor[R, T](ref, constructor), config, loader)
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: T, props: Props, loader: ClassLoader): R =
createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) new TypedActor[R, T](ref, constructor), props, loader)
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: Actor, config: Configuration, loader: ClassLoader): T = {
val ref = actorOf(actor)
ref.timeout = config.timeout.toMillis
ref.dispatcher = config.dispatcher
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T]
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
Actor.registry.registerTypedActor(ref, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
proxy
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, loader: ClassLoader): T = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null)
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)).asInstanceOf[T]
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
val ref = actorOf(props)
actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
proxyVar.get
}
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomVar[R], createInstance: T) extends Actor {
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: T) extends Actor {
override def preStart = Actor.registry.registerTypedActor(self, proxyVar.get) //Make sure actor registry knows about this actor
override def postStop = Actor.registry.unregisterTypedActor(self, proxyVar.get)
val me = createInstance
def receive = {
case m: MethodCall
selfReference set proxyRef.get
selfReference set proxyVar.get
try {
if (m.isOneWay) m(me)
else if (m.returnsFuture_?) self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]]
else self reply m(me)
else if (m.returnsFuture_?) {
self.channel match {
case p: ActorPromise p completeWith m(me).asInstanceOf[Future[Any]]
case _ throw new IllegalStateException("Future-returning TypedActor didn't use ?/ask so cannot reply")
}
} else self reply m(me)
} finally { selfReference set null }
}
}
private[akka] case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler {
private[akka] class TypedActorInvocationHandler(actorVar: AtomVar[ActorRef]) extends InvocationHandler {
def actor = actorVar.get
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
case "toString" actor.toString
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
@ -264,11 +262,8 @@ object TypedActor {
case _
implicit val timeout = Timeout(actor.timeout)
MethodCall(method, args) match {
case m if m.isOneWay
actor ! m
null
case m if m.returnsFuture_?
actor ? m
case m if m.isOneWay actor ! m; null //Null return value
case m if m.returnsFuture_? actor ? m
case m if m.returnsJOption_? || m.returnsOption_?
val f = actor ? m
try { f.await } catch { case _: FutureTimeoutException }

View file

@ -56,11 +56,8 @@ object Dispatchers {
val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox()
lazy val defaultGlobalDispatcher = {
config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalDispatcher)
}
object globalDispatcher extends Dispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
lazy val defaultGlobalDispatcher =
config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MAILBOX_TYPE).build
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
@ -171,11 +168,11 @@ object Dispatchers {
* Creates of obtains a dispatcher from a ConfigMap according to the format below
*
* default-dispatcher {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
* # GlobalExecutorBasedEventDriven
* type = "Dispatcher" # Must be one of the following
* # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
* # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
* keep-alive-time = 60 # Keep alive time for threads
* name = "MyDispatcher" # Optional, will be a generated UUID if omitted
* keep-alive-time = 60 # Keep alive time for threads in akka.time-unit
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
* executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
@ -188,18 +185,18 @@ object Dispatchers {
* Gotcha: Only configures the dispatcher if possible
* Returns: None if "type" isn't specified in the config
* Throws: IllegalArgumentException if the value of "type" is not valid
* IllegalArgumentException if it cannot
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
*/
def from(cfg: Configuration): Option[MessageDispatcher] = {
cfg.getString("type") map {
case "Dispatcher" new DispatcherConfigurator()
case "BalancingDispatcher" new BalancingDispatcherConfigurator()
case "GlobalDispatcher" GlobalDispatcherConfigurator
cfg.getString("type") flatMap {
case "Dispatcher" Some(new DispatcherConfigurator())
case "BalancingDispatcher" Some(new BalancingDispatcherConfigurator())
case "GlobalDispatcher" None //TODO FIXME remove this
case fqn
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
case Right(clazz)
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match {
case Right(configurator) configurator
case Right(configurator) Some(configurator)
case Left(exception)
throw new IllegalArgumentException(
"Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception)
@ -213,10 +210,6 @@ object Dispatchers {
}
}
object GlobalDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: Configuration): MessageDispatcher = Dispatchers.globalDispatcher
}
class DispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: Configuration): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig new Dispatcher(

View file

@ -374,6 +374,37 @@ sealed trait Future[+T] extends japi.Future[T] {
*/
def await(atMost: Duration): Future[T]
/**
* Await completion of this Future and return its value if it conforms to A's
* erased type. Will throw a ClassCastException if the value does not
* conform, or any exception the Future was completed with. Will return None
* in case of a timeout.
*/
def as[A](implicit m: Manifest[A]): Option[A] = {
try await catch { case _: FutureTimeoutException }
value match {
case None None
case Some(Left(ex)) throw ex
case Some(Right(v)) Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
}
}
/**
* Await completion of this Future and return its value if it conforms to A's
* erased type, None otherwise. Will throw any exception the Future was
* completed with. Will return None in case of a timeout.
*/
def asSilently[A](implicit m: Manifest[A]): Option[A] = {
try await catch { case _: FutureTimeoutException }
value match {
case None None
case Some(Left(ex)) throw ex
case Some(Right(v))
try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
catch { case _: ClassCastException None }
}
}
/**
* Tests whether this Future has been completed.
*/

View file

@ -1,39 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
import akka.dispatch.{ FutureTimeoutException, Future }
import akka.util.Helpers.{ narrow, narrowSilently }
package object akka {
/**
* Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
* to convert an Option[Any] to an Option[T].
*/
implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption)
/**
* Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
* to convert an Option[Any] to an Option[T].
* This means that the following code is equivalent:
* (actor ? "foo").as[Int] (Recommended)
*/
implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({
try { anyFuture.await } catch { case t: FutureTimeoutException }
anyFuture.resultOrException
})
private[akka] class AnyOptionAsTypedOption(anyOption: Option[Any]) {
/**
* Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException
* if the actual type is not assignable from the given one.
*/
def as[T]: Option[T] = narrow[T](anyOption)
/**
* Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible
* ClassCastException and return None in that case.
*/
def asSilently[T: Manifest]: Option[T] = narrowSilently[T](anyOption)
}
}

View file

@ -2,7 +2,8 @@ package akka.camel;
import akka.actor.Actor;
import akka.actor.TypedActor;
import akka.actor.TypedActor.Configuration;
import akka.actor.Props;
import akka.actor.Timeout;
import akka.dispatch.Dispatchers;
import akka.japi.SideEffect;
import akka.util.FiniteDuration;
@ -42,8 +43,7 @@ public class TypedConsumerJavaTestBase {
consumer = TypedActor.typedActorOf(
SampleErrorHandlingTypedConsumer.class,
SampleErrorHandlingTypedConsumerImpl.class,
new Configuration(new FiniteDuration(5000, "millis"), Dispatchers.defaultGlobalDispatcher()
));
(new Props()).withTimeout(new Timeout(new FiniteDuration(5000, "millis"))));
}
});
String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java-typed", "hello", String.class);

View file

@ -7,7 +7,6 @@ import org.scalatest.junit.JUnitSuite
import akka.actor._
import akka.actor.Actor._
import akka.actor.TypedActor.Configuration._
import akka.camel.TypedCamelTestSupport.{ SetExpectedMessageCount SetExpectedTestMessageCount, _ }
class TypedConsumerPublishRequestorTest extends JUnitSuite {
@ -41,7 +40,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
def shouldReceiveOneConsumerMethodRegisteredEvent = {
Actor.registry.addListener(requestor)
val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get
val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], defaultConfiguration)
val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], Props())
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = (publisher ? GetRetainedMessage).as[ConsumerMethodRegistered].get
assert(event.endpointUri === "direct:foo")
@ -51,7 +50,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerMethodUnregisteredEvent = {
val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], defaultConfiguration)
val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], Props())
val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get
Actor.registry.addListener(requestor)
TypedActor.stop(obj)
@ -66,7 +65,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
def shouldReceiveThreeConsumerMethodRegisteredEvents = {
Actor.registry.addListener(requestor)
val latch = (publisher ? SetExpectedTestMessageCount(3)).as[CountDownLatch].get
val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration)
val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props())
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val request = GetRetainedMessages(_.isInstanceOf[ConsumerMethodRegistered])
val events = (publisher ? request).as[List[ConsumerMethodRegistered]].get
@ -75,7 +74,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveThreeConsumerMethodUnregisteredEvents = {
val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration)
val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props())
val latch = (publisher ? SetExpectedTestMessageCount(3)).as[CountDownLatch].get
Actor.registry.addListener(requestor)
TypedActor.stop(obj)

View file

@ -7,7 +7,6 @@ import org.scalatest.matchers.MustMatchers
import akka.actor.Actor._
import akka.actor._
import akka.actor.TypedActor.Configuration._
/**
* @author Martin Krasser
@ -33,7 +32,7 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa
"started" must {
"support in-out message exchanges via its endpoints" in {
service.awaitEndpointActivation(3) {
actor = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration)
actor = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props())
} must be(true)
mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal("m2: x y")
mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal("m3: x y")
@ -63,7 +62,7 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa
"started" must {
"support in-out message exchanges via its endpoints" in {
service.awaitEndpointActivation(2) {
actor = TypedActor.typedActorOf(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl], defaultConfiguration)
actor = TypedActor.typedActorOf(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl], Props())
} must be(true)
mandatoryTemplate.requestBody("direct:publish-test-3", "x") must equal("foo: x")
mandatoryTemplate.requestBody("direct:publish-test-4", "x") must equal("bar: x")

View file

@ -5,8 +5,7 @@ import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.{ DefaultCamelContext, SimpleRegistry }
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec }
import akka.actor.{ Actor, TypedActor }
import akka.actor.TypedActor.Configuration._
import akka.actor.{ Actor, TypedActor, Props }
import akka.camel._
/**
@ -21,10 +20,10 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll
override protected def beforeAll = {
val typedActor = TypedActor.typedActorOf(
classOf[SampleTypedActor],
classOf[SampleTypedActorImpl], defaultConfiguration) // not a consumer
classOf[SampleTypedActorImpl], Props()) // not a consumer
val typedConsumer = TypedActor.typedActorOf(
classOf[SampleTypedConsumer],
classOf[SampleTypedConsumerImpl], defaultConfiguration)
classOf[SampleTypedConsumerImpl], Props())
typedConsumerUuid = TypedActor.getActorRefFor(typedConsumer).uuid.toString

View file

@ -403,13 +403,6 @@ class DefaultClusterNode private[akka] (
def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]]
private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = {
val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
if (includeRefNodeInReplicaSet)
conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
conns
}
// zookeeper listeners
private val stateListener = new StateListener(this)
private val membershipListener = new MembershipChildListener(this)
@ -420,6 +413,17 @@ class DefaultClusterNode private[akka] (
// Address -> ClusterActorRef
private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
// ============================================================================================================
// ========== WARNING: THESE FIELDS AND EVERYTHING USING THEM IN THE CONSTRUCTOR NEEDS TO BE LAZY =============
// ============================================================================================================
lazy private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = {
val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
if (includeRefNodeInReplicaSet)
conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
conns
}
// ZooKeeper client
lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
@ -441,6 +445,8 @@ class DefaultClusterNode private[akka] (
LEADER_ELECTION_PATH, null,
leaderElectionCallback)
// ============================================================================================================
if (enableJMX) createMBean
// =======================================
@ -476,6 +482,7 @@ class DefaultClusterNode private[akka] (
}
def shutdown() {
def shutdownNode() {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))

View file

@ -407,7 +407,7 @@ class ActiveRemoteClient private[akka] (
//Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients
def shutdown() = runSwitch switchOff {
EventHandler.info(this, "Shutting down [%s]".format(name))
EventHandler.info(this, "Shutting down remote client [%s]".format(name))
notifyListeners(RemoteClientShutdown(module, remoteAddress))
timer.stop()
@ -655,6 +655,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
serverModule.notifyListeners(RemoteServerStarted(serverModule))
def shutdown() {
EventHandler.info(this, "Shutting down remote server [%s]".format(name))
try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)

View file

@ -8,170 +8,14 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.actor._
import com.eaio.uuid.UUID
class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
private var bookKeeper: BookKeeper = _
private var localBookKeeper: LocalBookKeeper = _
"A synchronous used Transaction Log" should {
"be able to be deleted - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
txlog.delete()
txlog.close()
val zkClient = TransactionLog.zkClient
assert(zkClient.readData(txlog.snapshotPath, true) == null)
assert(zkClient.readData(txlog.txLogPath, true) == null)
}
"fail to be opened if non existing - synchronous" in {
val uuid = (new UUID).toString
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to be checked for existence - synchronous" in {
val uuid = (new UUID).toString
TransactionLog.exists(uuid) must be(false)
TransactionLog.newLogFor(uuid, false, null)
TransactionLog.exists(uuid) must be(true)
}
"be able to record entries - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
}
"be able to overweite an existing txlog if one already exists - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txLog2 = TransactionLog.newLogFor(uuid, false, null)
txLog2.latestSnapshotId.isDefined must be(false)
txLog2.latestEntryId must be(-1)
}
"be able to record and delete entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
txlog1.close
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entriesInRange(0, 1).map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
entries(1) must equal("hello")
txlog2.close
}
"be able to record entries and read entries with 'entries' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
// txlog1.close // should work without txlog.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entries.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
entries(1) must equal("hello")
entries(2) must equal("hello")
entries(3) must equal("hello")
txlog2.close
}
"be able to record a snapshot - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.close
}
"be able to record and read a snapshot and following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
entries(1) must equal("hello")
entries(2) must equal("hello")
entries(3) must equal("hello")
txlog2.close
}
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
entries(1) must equal("hello")
txlog2.close
}
}
"An asynchronous Transaction Log" should {
"be able to record entries - asynchronous" in {
val uuid = (new UUID).toString
@ -373,7 +217,11 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
}
override def afterAll() = {
Cluster.node.shutdown()
Cluster.shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
Actor.registry.local.shutdownAll()
Scheduler.shutdown()
}
}

View file

@ -0,0 +1,190 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.apache.bookkeeper.client.BookKeeper
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.actor._
import com.eaio.uuid.UUID
class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
private var bookKeeper: BookKeeper = _
private var localBookKeeper: LocalBookKeeper = _
"A synchronous used Transaction Log" should {
"be able to be deleted - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
txlog.delete()
txlog.close()
val zkClient = TransactionLog.zkClient
assert(zkClient.readData(txlog.snapshotPath, true) == null)
assert(zkClient.readData(txlog.txLogPath, true) == null)
}
"fail to be opened if non existing - synchronous" in {
val uuid = (new UUID).toString
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to be checked for existence - synchronous" in {
val uuid = (new UUID).toString
TransactionLog.exists(uuid) must be(false)
TransactionLog.newLogFor(uuid, false, null)
TransactionLog.exists(uuid) must be(true)
}
"be able to record entries - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
}
"be able to overweite an existing txlog if one already exists - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txLog2 = TransactionLog.newLogFor(uuid, false, null)
txLog2.latestSnapshotId.isDefined must be(false)
txLog2.latestEntryId must be(-1)
}
"be able to record and delete entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
txlog1.close
// intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entriesInRange(0, 1).map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
entries(1) must equal("hello")
txlog2.close
}
"be able to record entries and read entries with 'entries' - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close // should work without txlog.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entries.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
entries(1) must equal("hello")
entries(2) must equal("hello")
entries(3) must equal("hello")
txlog2.close
}
"be able to record a snapshot - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.close
}
"be able to record and read a snapshot and following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
entries(1) must equal("hello")
entries(2) must equal("hello")
entries(3) must equal("hello")
txlog2.close
}
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txlog2 = TransactionLog.logFor(uuid, false, null)
val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
entries(1) must equal("hello")
txlog2.close
}
}
override def beforeAll() = {
Cluster.startLocalCluster()
LocalBookKeeperEnsemble.start()
}
override def afterAll() = {
Cluster.node.shutdown()
Cluster.shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
Actor.registry.local.shutdownAll()
Scheduler.shutdown()
}
}

View file

@ -1,15 +1,124 @@
Release Notes
==============
Changes listed in no particular order.
Release 1.2
-----------
1.1
----------------------------------------
This release, while containing several substantial improvements, focuses on
paving the way for the upcoming 2.0 release. A selection of changes is
presented in the following, for the full list of tickets closed during the
development cycle please refer to
`the issue tracker <https://www.assembla.com/spaces/akka/milestones/356697-1-2>`_.
- **UPD** - improve FSM DSL: make onTransition syntax nicer (Roland Kuhn)
- **Actor:**
Release 1.1-M1
--------------------
- unified :class:`Channel` abstraction for :class:`Promise` & :class:`Actor`
- reintegrate invocation tracing (to be enabled per class and globally)
- make last message available during :meth:`preRestart()`
- experimental :meth:`freshInstance()` life-cycle hook for priming the new instance during restart
- new textual primitives :meth:`tell` (``!``) and :meth:`ask` (``?``, formerly ``!!!``)
- timeout for :meth:`ask` Futures taken from implicit argument (currently with fallback to deprecated ``ActorRef.timeout``
- **durable mailboxes:**
- beanstalk, file, mongo, redis
- **Future:**
- :meth:`onTimeout` callback
- select dispatcher for execution by implicit argument
- add safer cast methods :meth:`as[T]: T` and :meth:`mapTo[T]: Future[T]`
- **TestKit:**
- add :class:`TestProbe` (can receive, reply and forward messages, supports all :class:`TestKit` assertions)
- add :meth:`TestKit.awaitCond`
- support global time-factor for all timing assertions (for running on busy CI servers)
- **FSM:**
- add :class:`TestFSMRef`
- add :class:`LoggingFSM` (transition tracing, rolling event log)
- updated dependencies:
- Jackson 1.8.0
- Netty 3.2.5
- Protobuf 2.4.1
- ScalaTest 1.6.1
- various fixes, small improvements and documentation updates
- several **deprecations** in preparation for 2.0
================================ =====================
Method Replacement
================================ =====================
Actor.preRestart(cause) Actor.preRestart(cause, lastMsg)
ActorRef.sendOneWay ActorRef.tell
ActorRef.sendOneWaySafe ActorRef.tryTell
ActorRef.sendRequestReply ActorRef.ask(...).get()
ActorRef.sendRequestReplyFuture ActorRef.ask(...).get()
ActorRef.replyUnsafe ActorRef.reply
ActorRef.replySafe ActorRef.tryReply
ActorRef.mailboxSize ActorRef.dispatcher.mailboxSize(actorRef)
ActorRef.sender/senderFuture ActorRef.channel
ActorRef.!! ActorRef.?(...).as[T]
ActorRef.!!! ActorRef.?
ActorRef.reply\_? ActorRef.tryReply
Future.receive Future.onResult
Future.collect Future.map
Future.failure Future.recover
MessageDispatcher.pendingFutures MessageDispatcher.tasks
RemoteClientModule.*Listener(s) EventHandler.<X>
TestKit.expectMsg(pf) TestKit.expectMsgPF
TestKit.receiveWhile(pf) TestKit.receiveWhile()(pf)
================================ =====================
Trivia
^^^^^^
This release contains changes to 213 files, with 16053 insertions and 3624
deletions. The authorship of the corresponding commits is distributed as shown
below; the listing should not be taken too seriously, though, it has just been
done using ``git log --shortstat`` and summing up the numbers, so it certainly
misses details like who originally authored changes which were then back-ported
from the master branch (do not fear, you will be correctly attributed when the
stats for 2.0 are made).
======= ========== ========= =========
Commits Insertions Deletions Author
======= ========== ========= =========
69 11805 170 Viktor Klang
34 9694 97 Patrik Nordwall
72 3563 179 Roland Kuhn
27 1749 115 Peter Vlugter
7 238 22 Derek Williams
4 86 25 Peter Veentjer
1 17 5 Debasish Ghosh
2 15 5 Jonas Bonér
======= ========== ========= =========
.. note::
Release notes of previous releases consisted of ticket or change listings in
no particular order
Release 1.1
-----------
- **ADD** - #647 Extract an akka-camel-typed module out of akka-camel for optional typed actor support (Martin Krasser)
- **ADD** - #654 Allow consumer actors to acknowledge in-only message exchanges (Martin Krasser)

View file

@ -287,8 +287,11 @@ processing another message on this actor).
For this purpose, there is the method :meth:`Future.as[T]` which waits until
either the future is completed or its timeout expires, whichever comes first.
The result is then inspected and returned as :class:`Some[T]` if it was
normally completed and the answers runtime type matches the desired type; in
all other cases :class:`None` is returned.
normally completed and the answers runtime type matches the desired type; if
the future contains an exception or the value cannot be cast to the desired
type, it will throw the exception or a :class:`ClassCastException` (if you want
to get :obj:`None` in the latter case, use :meth:`Future.asSilently[T]`). In
case of a timeout, :obj:`None` is returned.
.. code-block:: scala

View file

@ -7,8 +7,8 @@ import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import akka.actor.Actor._
import akka.actor.Props
import akka.actor.TypedActor
import akka.actor.TypedActor.Configuration._
import akka.camel.CamelContextManager
import akka.config.Supervision._
@ -91,7 +91,7 @@ class Boot {
// -----------------------------------------------------------------------
// TODO: investigate why this consumer is not published
TypedActor.typedActorOf(classOf[TypedConsumer1], classOf[TypedConsumer1Impl], defaultConfiguration)
TypedActor.typedActorOf(classOf[TypedConsumer1], classOf[TypedConsumer1Impl], Props())
}
/**

View file

@ -2,8 +2,7 @@ package sample.camel
import akka.actor.Actor._
import akka.camel.CamelServiceManager
import akka.actor.TypedActor
import akka.actor.TypedActor.Configuration._
import akka.actor.{ TypedActor, Props }
/**
* @author Martin Krasser
@ -18,7 +17,7 @@ object ServerApplication extends App {
val ua = actorOf[RemoteActor2].start
val ta = TypedActor.typedActorOf(
classOf[RemoteTypedConsumer2],
classOf[RemoteTypedConsumer2Impl], defaultConfiguration)
classOf[RemoteTypedConsumer2Impl], Props())
remote.start("localhost", 7777)
remote.register("remote2", ua)

View file

@ -5,8 +5,7 @@ import org.apache.camel.builder.RouteBuilder
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import akka.actor.{ Actor, TypedActor }
import akka.actor.TypedActor.Configuration._
import akka.actor.{ Actor, TypedActor, Props }
import akka.camel._
/**
@ -18,7 +17,7 @@ object StandaloneApplication extends App {
// 'externally' register typed actors
val registry = new SimpleRegistry
registry.put("sample", TypedActor.typedActorOf(classOf[BeanIntf], classOf[BeanImpl], defaultConfiguration))
registry.put("sample", TypedActor.typedActorOf(classOf[BeanIntf], classOf[BeanImpl], Props()))
// customize CamelContext
CamelContextManager.init(new DefaultCamelContext(registry))
@ -31,7 +30,7 @@ object StandaloneApplication extends App {
mandatoryService.awaitEndpointActivation(1) {
// 'internally' register typed actor (requires CamelService)
TypedActor.typedActorOf(classOf[TypedConsumer2], classOf[TypedConsumer2Impl], defaultConfiguration)
TypedActor.typedActorOf(classOf[TypedConsumer2], classOf[TypedConsumer2Impl], Props())
}
// access 'internally' (automatically) registered typed-actors

View file

@ -81,13 +81,13 @@ akka {
}
default-dispatcher {
type = "GlobalDispatcher" # Must be one of the following, all "Global*" are non-configurable
# - Dispatcher
# - BalancingDispatcher
# - GlobalDispatcher
type = "Dispatcher" # Must be one of the following
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
name = "MyDispatcher" # Optional, will be a generated UUID if omitted
keep-alive-time = 60 # Keep alive time for threads
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor)
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
allow-core-timeout = on # Allow core threads to time out
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard

35
scripts/authors.pl Executable file
View file

@ -0,0 +1,35 @@
#!/usr/bin/perl
#
# This script can generate commit statistics from 'git log --shortstat -z tag1..tag2' output
#
use strict;
use warnings;
local $/ = "\x0";
my %auth;
our $commits;
our $insertions;
our $deletions;
our $author;
while (<>) {
($author) = /Author: (.*) </;
my ($insert, $delete) = /files changed, (\d+) insert.*(\d+) delet/;
next unless defined $insert;
$auth{$author} = [0, 0, 0] unless defined($auth{$author});
my @l = @{$auth{$author}};
$auth{$author} = [$l[0] + 1, $l[1] + $insert, $l[2] + $delete];
}
for $author (sort { $auth{$b}->[0] <=> $auth{$a}->[0] } keys %auth) {
($commits, $insertions, $deletions) = @{$auth{$author}};
write;
}
format STDOUT =
@#### @###### @###### @*
$commits, $insertions, $deletions, $author
.