Merge branch 'master' of github.com:akka/akka

Jonas Bonér committed 2012-05-08 14:24:53 +02:00
commit 4b5d107e66
30 changed files with 462 additions and 1312 deletions


@ -262,6 +262,25 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
expectMsg(1 second, IndexedSeq(LogEntry(1, 1, "log"), LogEntry(1, 1, "count"), LogEntry(1, 2, "log")))
}
"allow transforming of state results" in {
import akka.actor.FSM._
val fsmref = system.actorOf(Props(new Actor with FSM[Int, Int] {
startWith(0, 0)
when(0)(transform {
case Event("go", _) stay
} using {
case x goto(1)
})
when(1) {
case _ stay
}
}))
fsmref ! SubscribeTransitionCallBack(testActor)
fsmref ! "go"
expectMsg(CurrentState(fsmref, 0))
expectMsg(Transition(fsmref, 0, 1))
}
}
}


@ -67,6 +67,18 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender {
}
}
"resubmit single-shot timer" taggedAs TimingTest in {
within(2 seconds) {
within(500 millis, 1.5 second) {
fsm ! TestSingleTimerResubmit
expectMsg(Tick)
expectMsg(Tock)
expectMsg(Transition(fsm, TestSingleTimerResubmit, Initial))
}
expectNoMsg
}
}
"correctly cancel a named timer" taggedAs TimingTest in { "correctly cancel a named timer" taggedAs TimingTest in {
fsm ! TestCancelTimer fsm ! TestCancelTimer
within(500 millis) { within(500 millis) {
@ -106,8 +118,8 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender {
}
"notify unhandled messages" taggedAs TimingTest in {
filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm.toString, occurrences = 1),
EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm.toString, occurrences = 1)) {
filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm.path.toString, occurrences = 1),
EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm.path.toString, occurrences = 1)) {
fsm ! TestUnhandled
within(1 second) {
fsm ! Tick
@ -142,6 +154,7 @@ object FSMTimingSpec {
case object TestStateTimeout extends State
case object TestStateTimeoutOverride extends State
case object TestSingleTimer extends State
case object TestSingleTimerResubmit extends State
case object TestRepeatedTimer extends State
case object TestUnhandled extends State
case object TestCancelTimer extends State
@ -179,6 +192,13 @@ object FSMTimingSpec {
tester ! Tick
goto(Initial)
}
onTransition {
case Initial -> TestSingleTimerResubmit ⇒ setTimer("blah", Tick, 500 millis, false)
}
when(TestSingleTimerResubmit) {
case Event(Tick, _) ⇒ tester ! Tick; setTimer("blah", Tock, 500 millis, false)
case Event(Tock, _) ⇒ tester ! Tock; goto(Initial)
}
when(TestCancelTimer) {
case Event(Tick, _) ⇒
setTimer("hallo", Tock, 1 milli, false)


@ -86,9 +86,5 @@ class ReceiveTimeoutSpec extends AkkaSpec {
intercept[TimeoutException] { Await.ready(timeoutLatch, 1 second) }
system.stop(timeoutActor)
}
"have ReceiveTimeout eq to Actors ReceiveTimeout" in {
akka.actor.Actors.receiveTimeout must be theSameInstanceAs (ReceiveTimeout)
}
}
}


@ -428,6 +428,31 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
}
}
"be able to serialize and deserialize proxies" in {
import java.io._
JavaSerializer.currentSystem.withValue(system.asInstanceOf[ExtendedActorSystem]) {
val t = newFooBar(Duration(2, "s"))
t.optionPigdog() must be === Some("Pigdog")
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
out.writeObject(t)
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
val tNew = in.readObject().asInstanceOf[Foo]
tNew must be === t
tNew.optionPigdog() must be === Some("Pigdog")
mustStop(t)
}
}
"be able to override lifecycle callbacks" in { "be able to override lifecycle callbacks" in {
val latch = new CountDownLatch(16) val latch = new CountDownLatch(16)
val ta = TypedActor(system) val ta = TypedActor(system)


@ -1,145 +0,0 @@
package akka.dispatch
import Future.flow
import akka.util.cps._
import akka.util.Timeout
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
"A PromiseStream" must {
"work" in {
val a, b, c = Promise[Int]()
val q = PromiseStream[Int]()
flow { q << (1, 2, 3) }
flow {
a << q()
b << q
c << q()
}
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
}
"pend" in {
val a, b, c = Promise[Int]()
val q = PromiseStream[Int]()
flow {
a << q
b << q()
c << q
}
flow { q <<< List(1, 2, 3) }
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
}
"pend again" in {
val a, b, c, d = Promise[Int]()
val q1, q2 = PromiseStream[Int]()
val oneTwo = Future(List(1, 2))
flow {
a << q2
b << q2
q1 << 3 << 4
}
flow {
q2 <<< oneTwo
c << q1
d << q1
}
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
assert(Await.result(d, timeout.duration) === 4)
}
"enque" in {
val q = PromiseStream[Int]()
val a = q.dequeue()
val b = q.dequeue()
val c, d = Promise[Int]()
flow {
c << q
d << q
}
q ++= List(1, 2, 3, 4)
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
assert(Await.result(d, timeout.duration) === 4)
}
"map" in {
val qs = PromiseStream[String]()
val qi = qs.map(_.length)
val a, c = Promise[Int]()
val b = Promise[String]()
flow {
a << qi
b << qs
c << qi
}
flow {
qs << ("Hello", "World!", "Test")
}
assert(Await.result(a, timeout.duration) === 5)
assert(Await.result(b, timeout.duration) === "World!")
assert(Await.result(c, timeout.duration) === 4)
}
"map futures" in {
val q = PromiseStream[String]()
flow {
q << (Future("a"), Future("b"), Future("c"))
}
val a, b, c = q.dequeue
Await.result(a, timeout.duration) must be("a")
Await.result(b, timeout.duration) must be("b")
Await.result(c, timeout.duration) must be("c")
}
"not fail under concurrent stress" in {
implicit val timeout = Timeout(60 seconds)
val q = PromiseStream[Long](timeout.duration.toMillis)
flow {
var n = 0L
repeatC(50000) {
n += 1
q << n
}
}
val future = Future sequence {
List.fill(10) {
flow {
var total = 0L
repeatC(10000) {
val n = q()
total += n
}
total
}
}
} map (_.sum)
flow {
var n = 50000L
repeatC(50000) {
n += 1
q << n
}
}
assert(Await.result(future, timeout.duration) === (1L to 100000L).sum)
}
}
}


@ -1,51 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
/**
* JAVA API for - creating actors, - creating remote actors, - locating actors
*/
public class Actors {
/**
* The message that is sent when an Actor gets a receive timeout.
*
* <pre>
* if (message == receiveTimeout()) {
* // Timed out
* }
* </pre>
*
* @return the single instance of ReceiveTimeout
*/
public final static ReceiveTimeout$ receiveTimeout() {
return ReceiveTimeout$.MODULE$;
}
/**
* The message that when sent to an Actor kills it by throwing an exception.
*
* <pre>
* actor.tell(kill());
* </pre>
*
* @return the single instance of Kill
*/
public final static Kill$ kill() {
return Kill$.MODULE$;
}
/**
* The message that when sent to an Actor shuts it down by calling 'stop'.
*
* <pre>
* actor.tell(poisonPill());
* </pre>
*
* @return the single instance of PoisonPill
*/
public final static PoisonPill$ poisonPill() {
return PoisonPill$.MODULE$;
}
}


@ -509,7 +509,14 @@ private[akka] class ActorCell(
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case NonFatal(e) ⇒ throw ActorInitializationException(self, "exception during creation", e)
case NonFatal(i: InstantiationException) ⇒
throw ActorInitializationException(self,
"""exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new UntypedActorFactory ... )
or is missing an appropriate, reachable no-args constructor.
""", i)
case NonFatal(e) ⇒
throw ActorInitializationException(self, "exception during creation", e)
}
}
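The new error message points at a common pitfall: actor classes that cannot be instantiated reflectively. A minimal illustrative sketch (hypothetical Outer/Inner names, not part of this commit) of the failure mode and the Props(new ...) workaround the message suggests:

    import akka.actor.{ Actor, ActorSystem, Props }

    class Outer {
      // Inner needs a reference to its enclosing Outer instance, so it has no
      // reachable no-args constructor and reflective creation fails with the
      // InstantiationException handled above.
      class Inner extends Actor {
        def receive = { case msg ⇒ sender ! msg }
      }
    }

    object InnerActorExample extends App {
      val system = ActorSystem("demo")
      val outer = new Outer
      // Passing a creator by name sidesteps reflective instantiation:
      val ref = system.actorOf(Props(new outer.Inner), "inner")
      system.shutdown()
    }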


@ -436,11 +436,27 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable): Unit = {
log.error(cause, "Uncaught error from thread [{}]", thread.getName)
cause match {
case NonFatal(_) | _: InterruptedException ⇒
case _ if settings.JvmExitOnFatalError ⇒ System.exit(-1)
case _ ⇒ shutdown()
case NonFatal(_) | _: InterruptedException ⇒ log.error(cause, "Uncaught error from thread [{}]", thread.getName)
case _ ⇒
if (settings.JvmExitOnFatalError) {
try {
log.error(cause, "Uncaught error from thread [{}] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled", thread.getName)
import System.err
err.print("Uncaught error from thread [")
err.print(thread.getName)
err.print("] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[")
err.print(name)
err.println("]")
cause.printStackTrace(System.err)
System.err.flush()
} finally {
System.exit(-1)
}
} else {
log.error(cause, "Uncaught fatal error from thread [{}] shutting down ActorSystem [{}]", thread.getName, name)
shutdown()
}
}
}
}
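The branch above is controlled by a single configuration flag; a minimal sketch (assumed setup, not part of this commit) of overriding it so that a fatal error only shuts down the ActorSystem instead of exiting the JVM:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // Assumed example: disable akka.jvm-exit-on-fatal-error for one system.
    val config = ConfigFactory.parseString("akka.jvm-exit-on-fatal-error = off")
    val system = ActorSystem("demo", config.withFallback(ConfigFactory.load()))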


@ -172,7 +172,7 @@ object FSM {
* timerActive_? ("tock")
* </pre>
*/
trait FSM[S, D] extends Listeners {
trait FSM[S, D] extends Listeners with ActorLogging {
this: Actor ⇒
import FSM._
@ -186,8 +186,6 @@ trait FSM[S, D] extends Listeners {
val -> = FSM.->
val StateTimeout = FSM.StateTimeout
val log = Logging(context.system, this)
/**
* ****************************************
* DSL
@ -255,6 +253,13 @@ trait FSM[S, D] extends Listeners {
*/
protected final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason)
protected final class TransformHelper(func: StateFunction) {
def using(andThen: PartialFunction[State, State]): StateFunction =
func andThen (andThen orElse { case x ⇒ x })
}
protected final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)
/**
* Schedule named timer to deliver message after given delay, possibly repeating.
* @param name identifier to be used with cancelTimer()
@ -327,7 +332,7 @@ trait FSM[S, D] extends Listeners {
* Convenience wrapper for using a total function instead of a partial
* function literal. To be used with onTransition.
*/
implicit protected final def total2pf(transitionHandler: (S, S) ⇒ Unit) =
implicit protected final def total2pf(transitionHandler: (S, S) ⇒ Unit): TransitionHandler =
new TransitionHandler {
def isDefinedAt(in: (S, S)) = true
def apply(in: (S, S)) { transitionHandler(in._1, in._2) }
@ -336,7 +341,7 @@ trait FSM[S, D] extends Listeners {
/**
* Set handler which is called upon termination of this FSM actor.
*/
protected final def onTermination(terminationHandler: PartialFunction[StopEvent[S, D], Unit]): Unit =
protected final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit =
terminateEvent = terminationHandler
/**
@ -415,7 +420,7 @@ trait FSM[S, D] extends Listeners {
/*
* termination handling
*/
private var terminateEvent: PartialFunction[StopEvent[S, D], Unit] = NullFunction
private var terminateEvent: PartialFunction[StopEvent, Unit] = NullFunction
/*
* transition handling
@ -443,10 +448,10 @@ trait FSM[S, D] extends Listeners {
timeoutFuture = None
}
generation += 1
processMsg(msg, t)
if (!repeat) {
timers -= name
}
processMsg(msg, t)
}
case SubscribeTransitionCallBack(actorRef) ⇒
// TODO use DeathWatch to clean up list
@ -538,7 +543,7 @@ trait FSM[S, D] extends Listeners {
case class Event(event: Any, stateData: D)
case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D)
case class StopEvent(reason: Reason, currentState: S, stateData: D)
}
/**


@ -43,14 +43,14 @@ object Props {
* Scala API.
*/
def apply[T <: Actor: ClassManifest]: Props =
default.withCreator(implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[_ <: Actor]].newInstance)
default.withCreator(implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* of the supplied class using the default constructor.
*/
def apply(actorClass: Class[_ <: Actor]): Props =
default.withCreator(actorClass.newInstance)
default.withCreator(actorClass)
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
@ -70,7 +70,6 @@ object Props {
def apply(behavior: ActorContext ⇒ Actor.Receive): Props =
apply(new Actor { def receive = behavior(context) })
}
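For illustration, a small sketch (assumed actor class, not part of this commit) of the two creator styles described by the scaladoc above:

    import akka.actor.{ Actor, ActorSystem, Props }

    class Worker extends Actor { // has a reachable no-args constructor
      def receive = { case msg ⇒ sender ! msg }
    }

    object PropsUsage extends App {
      val system = ActorSystem("demo")
      val byClass   = system.actorOf(Props(classOf[Worker])) // Class-based creator
      val byCreator = system.actorOf(Props(new Worker))      // call-by-name creator
      system.shutdown()
    }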
/** /**


@ -8,12 +8,14 @@ import akka.japi.{ Creator, Option ⇒ JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Timeout, NonFatal }
import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.dispatch._
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.lang.IllegalStateException
import akka.util.Duration
import akka.actor.TypedActor.TypedActorInvocationHandler
import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension }
import java.io.ObjectStreamException
trait TypedActorFactory {
@ -124,7 +126,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
}
} catch { case i: InvocationTargetException ⇒ throw i.getTargetException }
private def writeReplace(): AnyRef = parameters match {
@throws(classOf[ObjectStreamException]) private def writeReplace(): AnyRef = parameters match {
case null ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null)
case ps if ps.length == 0 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array())
case ps ⇒
@ -148,7 +150,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = {
@throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = {
val system = akka.serialization.JavaSerializer.currentSystem.value
if (system eq null) throw new IllegalStateException(
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
@ -369,9 +371,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
def postRestart(reason: Throwable): Unit
}
private[akka] class TypedActorInvocationHandler(val extension: TypedActorExtension, val actorVar: AtomVar[ActorRef], val timeout: Timeout) extends InvocationHandler {
private[akka] class TypedActorInvocationHandler(@transient val extension: TypedActorExtension, @transient val actorVar: AtomVar[ActorRef], @transient val timeout: Timeout) extends InvocationHandler with Serializable {
def actor = actorVar.get
@throws(classOf[Throwable])
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
case "toString" ⇒ actor.toString
@ -392,6 +393,17 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case m ⇒ Await.result(ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef]
}
}
@throws(classOf[ObjectStreamException]) private def writeReplace(): AnyRef = SerializedTypedActorInvocationHandler(actor, timeout.duration)
}
private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: Duration) {
@throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match {
case null ⇒ throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value")
case some ⇒ toTypedActorInvocationHandler(some)
}
def toTypedActorInvocationHandler(system: ActorSystem): TypedActorInvocationHandler =
new TypedActorInvocationHandler(TypedActor(system), new AtomVar[ActorRef](actor), new Timeout(timeout))
}
}


@ -819,21 +819,6 @@ trait Promise[T] extends Future[T] {
}
fr
}
final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒
val fr = Promise[Any]()
val f = stream.dequeue(this)
f.onComplete { _ ⇒
try {
fr completeWith cont(f)
} catch {
case NonFatal(e) ⇒
executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e))
fr failure e
}
}
fr
}
}
//Companion object to FState, just to provide a cheap, immutable default entry


@ -1,260 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dispatch
import java.util.concurrent.atomic.AtomicReference
import scala.util.continuations._
import scala.annotation.tailrec
import akka.util.Timeout
object PromiseStream {
def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): PromiseStream[A] = new PromiseStream[A]
def apply[A](timeout: Long)(implicit dispatcher: MessageDispatcher): PromiseStream[A] = new PromiseStream[A]()(dispatcher, Timeout(timeout))
private sealed trait State
private case object Normal extends State
private case object Pending extends State
private case object Busy extends State
}
trait PromiseStreamOut[A] {
self ⇒
def dequeue(): Future[A]
def dequeue(promise: Promise[A]): Future[A]
def apply(): A @cps[Future[Any]]
def apply(promise: Promise[A]): A @cps[Future[Any]]
final def map[B](f: (A) ⇒ B)(implicit timeout: Timeout): PromiseStreamOut[B] = new PromiseStreamOut[B] {
def dequeue(): Future[B] = self.dequeue().map(f)
def dequeue(promise: Promise[B]): Future[B] = self.dequeue().flatMap(a ⇒ promise.complete(Right(f(a))))
def apply(): B @cps[Future[Any]] = this.dequeue().apply()
def apply(promise: Promise[B]): B @cps[Future[Any]] = this.dequeue(promise).apply()
}
}
trait PromiseStreamIn[A] {
def enqueue(elem: A): Unit
final def enqueue(elem1: A, elem2: A, elems: A*): Unit =
this += elem1 += elem2 ++= elems
final def enqueue(elem: Future[A]): Unit =
elem foreach (enqueue(_))
final def enqueue(elem1: Future[A], elem2: Future[A], elems: Future[A]*) {
this += elem1 += elem2
elems foreach (enqueue(_))
}
final def +=(elem: A): this.type = {
enqueue(elem)
this
}
final def +=(elem1: A, elem2: A, elems: A*): this.type = {
enqueue(elem1, elem2, elems: _*)
this
}
final def +=(elem: Future[A]): this.type = {
enqueue(elem)
this
}
final def +=(elem1: Future[A], elem2: Future[A], elems: Future[A]*): this.type = {
enqueue(elem1, elem2, elems: _*)
this
}
final def ++=(elem: Traversable[A]): this.type = {
elem foreach enqueue
this
}
final def ++=(elem: Future[Traversable[A]]): this.type = {
elem foreach (this ++= _)
this
}
def <<(elem: A): PromiseStreamIn[A] @cps[Future[Any]]
def <<(elem1: A, elem2: A, elems: A*): PromiseStreamIn[A] @cps[Future[Any]]
def <<(elem: Future[A]): PromiseStreamIn[A] @cps[Future[Any]]
def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStreamIn[A] @cps[Future[Any]]
def <<<(elems: Traversable[A]): PromiseStreamIn[A] @cps[Future[Any]]
def <<<(elems: Future[Traversable[A]]): PromiseStreamIn[A] @cps[Future[Any]]
}
class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: Timeout) extends PromiseStreamOut[A] with PromiseStreamIn[A] {
import PromiseStream.{ State, Normal, Pending, Busy }
private val _elemOut: AtomicReference[List[A]] = new AtomicReference(Nil)
private val _elemIn: AtomicReference[List[A]] = new AtomicReference(Nil)
private val _pendOut: AtomicReference[List[Promise[A]]] = new AtomicReference(null)
private val _pendIn: AtomicReference[List[Promise[A]]] = new AtomicReference(null)
private val _state: AtomicReference[State] = new AtomicReference(Normal)
@tailrec
final def apply(): A @cps[Future[Any]] =
if (_state.get eq Normal) {
val eo = _elemOut.get
if (eo eq null) apply()
else {
if (eo.nonEmpty) {
if (_elemOut.compareAndSet(eo, eo.tail)) shift { cont: (A ⇒ Future[Any]) ⇒ cont(eo.head) }
else apply()
} else apply(Promise[A])
}
} else apply(Promise[A])
final def apply(promise: Promise[A]): A @cps[Future[Any]] =
shift { cont: (A ⇒ Future[Any]) ⇒ dequeue(promise) flatMap cont }
@tailrec
final def enqueue(elem: A): Unit = _state.get match {
case Normal ⇒
val ei = _elemIn.get
if (ei eq null) enqueue(elem)
else if (!_elemIn.compareAndSet(ei, elem :: ei)) enqueue(elem)
case Pending ⇒
val po = _pendOut.get
if (po eq null) enqueue(elem)
else {
if (po.isEmpty) {
if (_state.compareAndSet(Pending, Busy)) {
var nextState: State = Pending
try {
val pi = _pendIn.get
if (pi ne null) {
if (pi.isEmpty) {
if (_pendIn.compareAndSet(Nil, null)) {
if (_pendOut.compareAndSet(Nil, null)) {
_elemIn.set(Nil)
_elemOut.set(List(elem))
nextState = Normal
} else {
_pendIn.set(Nil)
}
}
} else {
if (_pendOut.get eq Nil) _pendOut.set(_pendIn.getAndSet(Nil).reverse)
}
}
} finally {
_state.set(nextState)
}
if (nextState eq Pending) enqueue(elem)
} else enqueue(elem)
} else {
if (_pendOut.compareAndSet(po, po.tail)) {
po.head success elem
if (!po.head.isCompleted) enqueue(elem)
} else enqueue(elem)
}
}
case Busy ⇒
enqueue(elem)
}
@tailrec
final def dequeue(): Future[A] =
if (_state.get eq Normal) {
val eo = _elemOut.get
if (eo eq null) dequeue()
else {
if (eo.nonEmpty) {
if (_elemOut.compareAndSet(eo, eo.tail)) Promise.successful(eo.head)
else dequeue()
} else dequeue(Promise[A])
}
} else dequeue(Promise[A])
@tailrec
final def dequeue(promise: Promise[A]): Future[A] = _state.get match {
case Pending ⇒
val pi = _pendIn.get
if ((pi ne null) && _pendIn.compareAndSet(pi, promise :: pi)) promise else dequeue(promise)
case Normal ⇒
val eo = _elemOut.get
if (eo eq null) dequeue(promise)
else {
if (eo.isEmpty) {
if (_state.compareAndSet(Normal, Busy)) {
var nextState: State = Normal
try {
val ei = _elemIn.get
if (ei ne null) {
if (ei.isEmpty) {
if (_elemIn.compareAndSet(Nil, null)) {
if (_elemOut.compareAndSet(Nil, null)) {
_pendIn.set(Nil)
_pendOut.set(List(promise))
nextState = Pending
} else {
_elemIn.set(Nil)
}
}
} else {
if (_elemOut.get eq Nil) _elemOut.set(_elemIn.getAndSet(Nil).reverse)
}
}
} finally {
_state.set(nextState)
}
if (nextState eq Normal) dequeue(promise)
else promise
} else dequeue(promise)
} else {
if (_elemOut.compareAndSet(eo, eo.tail)) {
promise success eo.head
} else dequeue(promise)
}
}
case Busy ⇒
dequeue(promise)
}
final def <<(elem: A): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this += elem) }
final def <<(elem1: A, elem2: A, elems: A*): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this += (elem1, elem2, elems: _*)) }
final def <<(elem: Future[A]): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ elem map (a ⇒ cont(this += a)) }
final def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒
val seq = Future.sequence(elem1 +: elem2 +: elems)
seq map (a ⇒ cont(this ++= a))
}
final def <<<(elems: Traversable[A]): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this ++= elems) }
final def <<<(elems: Future[Traversable[A]]): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ elems map (as ⇒ cont(this ++= as)) }
}


@ -35,7 +35,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
anotherAwaiting.verifyActivated()
}
"ActivationTracker send activation message even if activation happened earlier" in {
"ActivationTracker send activation message even if activation happened earlier" taggedAs TimingTest in {
publish(EndpointActivated(actor.ref))
Thread.sleep(50)
awaiting.awaitActivation()
@ -43,7 +43,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
awaiting.verifyActivated()
}
"ActivationTracker send activation message even if actor is already deactivated" in {
"ActivationTracker send activation message even if actor is already deactivated" taggedAs TimingTest in {
publish(EndpointActivated(actor.ref))
publish(EndpointDeActivated(actor.ref))
Thread.sleep(50)
@ -52,7 +52,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
awaiting.verifyActivated()
}
"ActivationTracker forwards de-activation message to all awaiting parties" in {
"ActivationTracker forwards de-activation message to all awaiting parties" taggedAs TimingTest in {
given("Actor is activated")
publish(EndpointActivated(actor.ref))
given("Actor is deactivated")
@ -67,7 +67,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
anotherAwaiting.verifyDeActivated()
}
"ActivationTracker forwards de-activation message even if deactivation happened earlier" in {
"ActivationTracker forwards de-activation message even if deactivation happened earlier" taggedAs TimingTest in {
given("Actor is activated")
publish(EndpointActivated(actor.ref))
@ -81,7 +81,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
awaiting.verifyDeActivated()
}
"ActivationTracker forwards de-activation message even if someone awaits de-activation even before activation happens" in {
"ActivationTracker forwards de-activation message even if someone awaits de-activation even before activation happens" taggedAs TimingTest in {
given("Someone is awaiting de-activation")
val awaiting = new Awaiting(actor)
awaiting.awaitDeActivation()
@ -96,14 +96,14 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
awaiting.verifyDeActivated()
}
"ActivationTracker sends activation failure when failed to activate" in {
"ActivationTracker sends activation failure when failed to activate" taggedAs TimingTest in {
awaiting.awaitActivation()
publish(EndpointFailedToActivate(actor.ref, cause))
awaiting.verifyFailedToActivate()
}
"ActivationTracker sends de-activation failure when failed to de-activate" in {
"ActivationTracker sends de-activation failure when failed to de-activate" taggedAs TimingTest in {
publish(EndpointActivated(actor.ref))
awaiting.awaitDeActivation()
publish(EndpointFailedToDeActivate(actor.ref, cause))
@ -111,7 +111,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
awaiting.verifyFailedToDeActivate()
}
"ActivationTracker sends activation message even if it failed to de-activate" in {
"ActivationTracker sends activation message even if it failed to de-activate" taggedAs TimingTest in {
publish(EndpointActivated(actor.ref))
publish(EndpointFailedToDeActivate(actor.ref, cause))
awaiting.awaitActivation()


@ -385,6 +385,12 @@ prints the result and shuts down the ``ActorSystem``.
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#result-listener
Please note that shutting down the actor system should be done by that part of
the application which can safely determine that everything has been said and
done. In this case, it is the Listener actor, but in other scenarios it might
be the main thread or some other external service. It is by no means required
to call ``system.shutdown()`` from within that system.
Bootstrap the calculation
-------------------------


@ -417,6 +417,12 @@ prints the result and shuts down the ``ActorSystem``.
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala#result-listener
Please note that shutting down the actor system should be done by that part of
the application which can safely determine that everything has been said and
done. In this case, it is the Listener actor, but in other scenarios it might
be the main thread or some other external service. It is by no means required
to call ``system.shutdown()`` from within that system.
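For illustration, a minimal sketch of such a listener that shuts the system down itself once the final result arrives (the ``PiApproximation`` message is assumed from the tutorial, not shown here):

.. code-block:: scala

   class Listener extends Actor {
     def receive = {
       case PiApproximation(pi, duration) ⇒
         println("Pi approximation: %s (calculated in %s)".format(pi, duration))
         context.system.shutdown()
     }
   }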
Bootstrap the calculation
=========================


@ -5,7 +5,7 @@ package akka.docs.actor;
import akka.actor.ActorRef;
import akka.actor.Props;
import static akka.actor.Actors.*;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;
//#context-actorOf
@ -16,6 +16,6 @@ public class FirstUntypedActor extends UntypedActor {
public void onReceive(Object message) {
myActor.forward(message, getContext());
myActor.tell(poisonPill());
myActor.tell(PoisonPill.getInstance());
}
}


@ -4,7 +4,6 @@
package akka.docs.actor;
//#receive-timeout
import akka.actor.Actors;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import akka.util.Duration;
@ -18,7 +17,7 @@ public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
} else if (message == ReceiveTimeout.getInstance()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
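For comparison, a minimal Scala sketch (not part of this commit) of the same pattern; ReceiveTimeout is a singleton message, so it can be matched on directly:

    import akka.actor.{ Actor, ReceiveTimeout }
    import akka.util.duration._

    class MyReceiveTimeoutActor extends Actor {
      context.setReceiveTimeout(30 seconds) // assumed timeout value
      def receive = {
        case "Hello"        ⇒ sender ! "Hello world"
        case ReceiveTimeout ⇒ throw new RuntimeException("received timeout")
      }
    }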


@ -19,7 +19,8 @@ import akka.util.Timeout;
//#import-future
//#import-actors
import static akka.actor.Actors.*;
import akka.actor.PoisonPill;
import akka.actor.Kill;
//#import-actors
//#import-procedure
@ -158,7 +159,7 @@ public class UntypedActorDocTestBase {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class));
//#poison-pill
myActor.tell(poisonPill());
myActor.tell(PoisonPill.getInstance());
//#poison-pill
system.shutdown();
}
@ -168,7 +169,7 @@ public class UntypedActorDocTestBase {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef victim = system.actorOf(new Props(MyUntypedActor.class));
//#kill
victim.tell(kill());
victim.tell(Kill.getInstance());
//#kill
system.shutdown();
}


@ -72,7 +72,7 @@ public class FaultHandlingDocSample {
log.info("That's all, shutting down"); log.info("That's all, shutting down");
getContext().system().shutdown(); getContext().system().shutdown();
} }
} else if (msg == Actors.receiveTimeout()) { } else if (msg == ReceiveTimeout.getInstance()) {
// No progress within 15 seconds, ServiceUnavailable // No progress within 15 seconds, ServiceUnavailable
log.error("Shutting down due to unavailable service"); log.error("Shutting down due to unavailable service");
getContext().system().shutdown(); getContext().system().shutdown();


@ -4,27 +4,22 @@
package akka.docs.dispatcher;
//#imports
import akka.actor.*;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.MessageDispatcher;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
//#imports
//#imports-prio
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.actor.Actors;
import akka.event.Logging;
import akka.event.LoggingAdapter;
//#imports-prio
//#imports-prio-mailbox
import akka.actor.ActorContext;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import com.typesafe.config.Config;
//#imports-prio-mailbox
@ -37,7 +32,6 @@ import static org.junit.Assert.*;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.docs.actor.MyUntypedActor;
import akka.docs.actor.UntypedActorDocTestBase.MyActor;
import akka.testkit.AkkaSpec;
@ -93,7 +87,7 @@ public class DispatcherDocTestBase {
getSelf().tell("pigdog2"); getSelf().tell("pigdog2");
getSelf().tell("pigdog3"); getSelf().tell("pigdog3");
getSelf().tell("highpriority"); getSelf().tell("highpriority");
getSelf().tell(Actors.poisonPill()); getSelf().tell(PoisonPill.getInstance());
} }
public void onReceive(Object message) { public void onReceive(Object message) {
@ -133,7 +127,7 @@ public class DispatcherDocTestBase {
return 0; // 'highpriority messages should be treated first if possible
else if (message.equals("lowpriority"))
return 2; // 'lowpriority messages should be treated last if possible
else if (message.equals(Actors.poisonPill()))
else if (message.equals(PoisonPill.getInstance()))
return 3; // PoisonPill when no other left
else
return 1; // By default they go between high and low prio
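For illustration, a hedged Scala sketch (not part of this commit) of the same priority ordering expressed with PriorityGenerator; lower numbers are dequeued first:

    import akka.actor.PoisonPill
    import akka.dispatch.PriorityGenerator

    val priorityGenerator = PriorityGenerator {
      case "highpriority" ⇒ 0 // treated first if possible
      case "lowpriority"  ⇒ 2 // treated last if possible
      case PoisonPill     ⇒ 3 // PoisonPill when nothing else is left
      case _              ⇒ 1 // everything else in between
    }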


@ -5,6 +5,8 @@
Camel
#######
Additional Resources
====================
For an introduction to akka-camel 2, see also Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_.
For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_
@ -32,7 +34,9 @@ actor API, actors can now exchange messages with other systems over large number
of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a
few. At the moment, approximately 80 protocols and APIs are supported.
The akka-camel module is based on `Apache Camel`_, a powerful and leight-weight
Apache Camel
------------
The akka-camel module is based on `Apache Camel`_, a powerful and light-weight
integration framework for the JVM. For an introduction to Apache Camel you may
want to read this `Apache Camel article`_. Camel comes with a
large number of `components`_ that provide bindings to different protocols and
@ -43,6 +47,8 @@ APIs. The `camel-extra`_ project provides further components.
.. _components: http://camel.apache.org/components.html
.. _camel-extra: http://code.google.com/p/camel-extra/
Consumer
--------
Usage of Camel's integration components in Akka is essentially a
one-liner. Here's an example.
@ -60,16 +66,20 @@ component`_), only the actor's endpointUri method must be changed.
.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer
Producer
--------
Actors can also trigger message exchanges with external systems i.e. produce to
Camel endpoints.
.. includecode:: code/akka/docs/camel/Introduction.scala
:include: imports,Producer
In the above example, any message sent to this actor will be added (produced) to
the example JMS queue. Producer actors may choose from the same set of Camel
In the above example, any message sent to this actor will be sent to
the JMS queue ``orders``. Producer actors may choose from the same set of Camel
components as Consumer actors do.
CamelMessage
------------
The number of Camel components is constantly increasing. The akka-camel module
can support these in a plug-and-play manner. Just add them to your application's
classpath, define a component-specific endpoint URI and use it to exchange
@ -83,3 +93,66 @@ representations which are used by Consumer and Producer actors for pattern
matching, transformation, serialization or storage.
__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java
Dependencies
============
SBT
---
.. code-block:: scala
"com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT"
Maven
-----
.. code-block:: xml
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-camel</artifactId>
<version>2.1-SNAPSHOT</version>
</dependency>
.. _camel-consumer-actors:
Consumer Actors
================
For objects to receive messages, they must mixin the `Consumer`_
trait. For example, the following actor class (Consumer1) implements the
endpointUri method, which is declared in the Consumer trait, in order to receive
messages from the ``file:data/input/actor`` Camel endpoint.
.. _Consumer: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Consumer.scala
.. includecode:: code/akka/docs/camel/Consumers.scala#Consumer1
Whenever a file is put into the data/input/actor directory, its content is
picked up by the Camel `file component`_ and sent as message to the
actor. Messages consumed by actors from Camel endpoints are of type
`CamelMessage`_. These are immutable representations of Camel messages.
.. _file component: http://camel.apache.org/file2.html
.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
Here's another example that sets the endpointUri to
``jetty:http://localhost:8877/camel/default``. It causes Camel's `Jetty
component`_ to start an embedded `Jetty`_ server, accepting HTTP connections
from localhost on port 8877.
.. _Jetty component: http://camel.apache.org/jetty.html
.. _Jetty: http://www.eclipse.org/jetty/
.. includecode:: code/akka/docs/camel/Consumers.scala#Consumer2
After starting the actor, clients can send messages to that actor by POSTing to
``http://localhost:8877/camel/default``. The actor sends a response by using the
self.reply method (Scala). For returning a message body and headers to the HTTP
client the response type should be `Message`_. For any other response type, a
new Message object is created by akka-camel with the actor response as message
body.
.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala
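For illustration, a hedged sketch (not part of this commit) of a consumer behind such a Jetty endpoint that returns both a body and a header by replying with a ``CamelMessage``:

.. code-block:: scala

   import akka.camel.{ CamelMessage, Consumer }

   class HttpConsumer extends Consumer {
     def endpointUri = "jetty:http://localhost:8877/camel/default"
     def receive = {
       case msg: CamelMessage ⇒
         sender ! CamelMessage("Hello %s" format msg.bodyAs[String], Map("MyHeader" -> "example"))
     }
   }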


@ -37,11 +37,15 @@ class FSMDocSpec extends AkkaSpec {
//#simple-fsm
class Buncher extends Actor with FSM[State, Data] {
//#fsm-body
startWith(Idle, Uninitialized)
//#when-syntax
when(Idle) {
case Event(SetTarget(ref), Uninitialized) ⇒ stay using Todo(ref, Vector.empty)
case Event(SetTarget(ref), Uninitialized) ⇒
stay using Todo(ref, Vector.empty)
}
//#when-syntax
//#transition-elided
onTransition {
@ -51,10 +55,13 @@ class FSMDocSpec extends AkkaSpec {
}
}
//#transition-elided
//#when-syntax
when(Active, stateTimeout = 1 second) {
case Event(Flush | FSM.StateTimeout, t: Todo) ⇒ goto(Idle) using t.copy(queue = Vector.empty)
case Event(Flush | StateTimeout, t: Todo) ⇒
goto(Idle) using t.copy(queue = Vector.empty)
}
//#when-syntax
//#unhandled-elided
whenUnhandled {
@ -67,10 +74,116 @@ class FSMDocSpec extends AkkaSpec {
stay
}
//#unhandled-elided
//#fsm-body
initialize
}
//#simple-fsm
object DemoCode {
trait StateType
case object SomeState extends StateType
case object Processing extends StateType
case object Error extends StateType
case object Idle extends StateType
case object Active extends StateType
class Dummy extends Actor with FSM[StateType, Int] {
class X
val newData = 42
object WillDo
object Tick
//#modifier-syntax
when(SomeState) {
case Event(msg, _) ⇒
goto(Processing) using (newData) forMax (5 seconds) replying (WillDo)
}
//#modifier-syntax
//#transition-syntax
onTransition {
case Idle -> Active ⇒ setTimer("timeout", Tick, 1 second, true)
case Active -> _ ⇒ cancelTimer("timeout")
case x -> Idle ⇒ log.info("entering Idle from " + x)
}
//#transition-syntax
//#alt-transition-syntax
onTransition(handler _)
def handler(from: StateType, to: StateType) {
// handle it here ...
}
//#alt-transition-syntax
//#stop-syntax
when(Error) {
case Event("stop", _)
// do cleanup ...
stop()
}
//#stop-syntax
//#transform-syntax
when(SomeState)(transform {
case Event(bytes: Array[Byte], read) ⇒ stay using (read + bytes.length)
case Event(bytes: List[Byte], read) ⇒ stay using (read + bytes.size)
} using {
case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 ⇒
goto(Processing)
})
//#transform-syntax
//#alt-transform-syntax
val processingTrigger: PartialFunction[State, State] = {
case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 ⇒
goto(Processing)
}
when(SomeState)(transform {
case Event(bytes: Array[Byte], read) ⇒ stay using (read + bytes.length)
case Event(bytes: List[Byte], read) ⇒ stay using (read + bytes.size)
} using processingTrigger)
//#alt-transform-syntax
//#termination-syntax
onTermination {
case StopEvent(FSM.Normal, state, data) ⇒ // ...
case StopEvent(FSM.Shutdown, state, data) ⇒ // ...
case StopEvent(FSM.Failure(cause), state, data) ⇒ // ...
}
//#termination-syntax
//#unhandled-syntax
whenUnhandled {
case Event(x: X, data) ⇒
log.info("Received unhandled event: " + x)
stay
case Event(msg, _) ⇒
log.warning("Received unknown event: " + msg)
goto(Error)
}
//#unhandled-syntax
}
//#logging-fsm
import akka.actor.LoggingFSM
class MyFSM extends Actor with LoggingFSM[StateType, Data] {
//#body-elided
override def logDepth = 12
onTermination {
case StopEvent(FSM.Failure(_), state, data) ⇒
val lastEvents = getLog.mkString("\n\t")
log.warning("Failure in state " + state + " with data " + data + "\n" +
"Events leading up to this point:\n\t" + lastEvents)
}
// ...
//#body-elided
}
//#logging-fsm
}
//#fsm-code-elided
"batch correctly" in {


@ -0,0 +1,30 @@
package akka.docs.camel
object Consumers {
{
//#Consumer1
import akka.camel.{ CamelMessage, Consumer }
class Consumer1 extends Consumer {
def endpointUri = "file:data/input/actor"
def receive = {
case msg: CamelMessage ⇒ println("received %s" format msg.bodyAs[String])
}
}
//#Consumer1
}
{
//#Consumer2
import akka.camel.{ CamelMessage, Consumer }
class Consumer2 extends Consumer {
def endpointUri = "jetty:http://localhost:8877/camel/default"
def receive = {
case msg: CamelMessage ⇒ sender ! ("Hello %s" format msg.bodyAs[String])
}
}
//#Consumer2
}
}


@ -1,11 +1,11 @@
package akka.docs.camel
object wrapper {
object Introduction {
{
//#Consumer-mina
import akka.camel.{ CamelMessage, Consumer }
class MyActor extends Consumer {
class MinaClient extends Consumer {
def endpointUri = "mina:tcp://localhost:6200?textline=true"
def receive = {
@ -18,14 +18,14 @@ object wrapper {
import akka.actor.{ ActorSystem, Props }
val sys = ActorSystem("camel")
val myActor = sys.actorOf(Props[MyActor])
val mina = sys.actorOf(Props[MinaClient])
//#Consumer-mina
}
{
//#Consumer
import akka.camel.{ CamelMessage, Consumer }
class MyActor extends Consumer {
class JettyAdapter extends Consumer {
def endpointUri = "jetty:http://localhost:8877/example"
def receive = {
@ -39,10 +39,16 @@ object wrapper {
//#Producer
import akka.actor.Actor
import akka.camel.{ Producer, Oneway }
import akka.actor.{ ActorSystem, Props }
class MyActor extends Actor with Producer with Oneway {
class Orders extends Actor with Producer with Oneway {
def endpointUri = "jms:queue:example"
def endpointUri = "jms:queue:Orders"
}
val sys = ActorSystem("camel")
val orders = sys.actorOf(Props[Orders])
orders ! <order amount="100" currency="PLN" itemId="12345"/>
//#Producer
}
}


@ -118,19 +118,11 @@ The FSM Trait and Object
The :class:`FSM` trait may only be mixed into an :class:`Actor`. Instead of
extending :class:`Actor`, the self type approach was chosen in order to make it
obvious that an actor is actually created. Importing all members of the
obvious that an actor is actually created:
:obj:`FSM` object is recommended if you want to directly access the symbols
like :obj:`StateTimeout`. This import is usually placed inside the state
machine definition:
.. code-block:: scala
class MyFSM extends Actor with FSM[State, Data] {
import FSM._
...
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: simple-fsm
:exclude: fsm-body
The :class:`FSM` trait takes two type parameters:
@ -153,7 +145,7 @@ Defining States
A state is defined by one or more invocations of the method
:func:`when(<name>[, stateTimeout = <timeout>])(stateFunction)`.
The given name must be an object which is type-compatible with the first type
parameter given to the :class:`FSM` trait. This object is used as a hash key,
@ -165,27 +157,18 @@ If the :meth:`stateTimeout` parameter is given, then all transitions into this
state, including staying, receive this timeout by default. Initiating the
transition with an explicit timeout may be used to override this default, see
`Initiating Transitions`_ for more information. The state timeout of any state
may be changed during action processing with :func:`setStateTimeout(state,
duration)`. This enables runtime configuration e.g. via external message.
may be changed during action processing with
:func:`setStateTimeout(state, duration)`. This enables runtime configuration
e.g. via external message.
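For illustration, a hedged sketch (``SetRuntimeTimeout`` is a made-up message, not from this commit) of changing a state timeout at runtime:

.. code-block:: scala

   when(SomeState) {
     case Event(SetRuntimeTimeout(d), _) ⇒
       setStateTimeout(Active, d) // give state Active a new default timeout
       stay
   }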
The :meth:`stateFunction` argument is a :class:`PartialFunction[Event, State]`,
which is conveniently given using the partial function literal syntax as
demonstrated below:
.. code-block:: scala
when(Idle) {
case Event(Start(msg), _) =>
goto(Timer) using (msg, sender)
}
when(Timer, stateTimeout = 12 seconds) {
case Event(StateTimeout, (msg, sender)) =>
sender ! msg
goto(Idle)
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: when-syntax
The :class:`Event(msg: Any, data: D)` case class is parameterized with the data
type held by the FSM for convenient pattern matching.
Defining the Initial State
@ -193,7 +176,7 @@ Defining the Initial State
Each FSM needs a starting point, which is declared using
:func:`startWith(state, data[, timeout])`
The optionally given timeout argument overrides any specification given for the
desired initial state. If you want to cancel a default timeout, use
@ -206,16 +189,8 @@ If a state doesn't handle a received event a warning is logged. If you want to
do something else in this case you can specify that with
:func:`whenUnhandled(stateFunction)`:
.. code-block:: scala
whenUnhandled {
case Event(x : X, data) =>
log.info(this, "Received unhandled event: " + x)
stay
case Event(msg, _) =>
log.warn(this, "Received unknown event: " + x)
goto(Error)
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: unhandled-syntax
**IMPORTANT**: This handler is not stacked, meaning that each invocation of
:func:`whenUnhandled` replaces the previously installed handler.
@ -230,7 +205,8 @@ The state definition can either be the current state, as described by the
:func:`goto(state)`. The resulting object allows further qualification by way
of the modifiers described in the following:
:meth:`forMax(duration)`
* :meth:`forMax(duration)`
This modifier sets a state timeout on the next state. This means that a timer
is started which upon expiry sends a :obj:`StateTimeout` message to the FSM.
This timer is canceled upon reception of any other message in the meantime;
@ -241,23 +217,21 @@ of the modifiers described in the following:
specified for the target state. If you want to cancel the default timeout, specified for the target state. If you want to cancel the default timeout,
use :obj:`Duration.Inf`. use :obj:`Duration.Inf`.
:meth:`using(data)` * :meth:`using(data)`
This modifier replaces the old state data with the new data given. If you This modifier replaces the old state data with the new data given. If you
follow the advice :ref:`above <fsm-philosophy>`, this is the only place where follow the advice :ref:`above <fsm-philosophy>`, this is the only place where
internal state data are ever modified. internal state data are ever modified.
:meth:`replying(msg)` * :meth:`replying(msg)`
This modifier sends a reply to the currently processed message and otherwise This modifier sends a reply to the currently processed message and otherwise
does not modify the state transition. does not modify the state transition.
All modifiers can be chained to achieve a nice and concise description: All modifiers can be chained to achieve a nice and concise description:
.. code-block:: scala .. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: modifier-syntax
when(State) {
case Event(msg, _) =>
goto(Processing) using (msg) forMax (5 seconds) replying (WillDo)
}
The parentheses are not actually needed in all cases, but they visually The parentheses are not actually needed in all cases, but they visually
distinguish between modifiers and their arguments and therefore make the code distinguish between modifiers and their arguments and therefore make the code
@ -267,7 +241,7 @@ even more pleasant to read for foreigners.
Please note that the ``return`` statement may not be used in :meth:`when` Please note that the ``return`` statement may not be used in :meth:`when`
blocks or similar; this is a Scala restriction. Either refactor your code blocks or similar; this is a Scala restriction. Either refactor your code
using ``if () ... else ...`` or move it into a method definition. using ``if () ... else ...`` or move it into a method definition.
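
For example, instead of returning early from within a :meth:`when` block, the
decision can be expressed as a single expression (the ``alreadySeen`` helper and
the state names are made up for illustration):

.. code-block:: scala

  when(Idle) {
    case Event(msg, data) =>
      // 'return stay' would not work here; use an if/else expression instead
      if (alreadySeen(msg)) stay
      else goto(Active) using data
  }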
Monitoring Transitions Monitoring Transitions
---------------------- ----------------------
@ -293,13 +267,8 @@ The handler is a partial function which takes a pair of states as input; no
resulting state is needed as it is not possible to modify the transition in resulting state is needed as it is not possible to modify the transition in
progress. progress.
.. code-block:: scala .. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: transition-syntax
onTransition {
  case Idle -> Active => setTimer("timeout", Tick, 1 second, false) // Tick stands in for the timeout message
  case Active -> _    => cancelTimer("timeout")
  case x -> Idle      => log.info("entering Idle from " + x)
}
The convenience extractor :obj:`->` enables decomposition of the pair of states The convenience extractor :obj:`->` enables decomposition of the pair of states
with a clear visual reminder of the transition's direction. As usual in pattern with a clear visual reminder of the transition's direction. As usual in pattern
@ -311,13 +280,8 @@ It is also possible to pass a function object accepting two states to
:func:`onTransition`, in case your transition handling logic is implemented as :func:`onTransition`, in case your transition handling logic is implemented as
a method: a method:
.. code-block:: scala .. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: alt-transition-syntax
onTransition(handler _)
private def handler(from: State, to: State) {
...
}
The handlers registered with this method are stacked, so you can intersperse The handlers registered with this method are stacked, so you can intersperse
:func:`onTransition` blocks with :func:`when` blocks as suits your design. It :func:`onTransition` blocks with :func:`when` blocks as suits your design. It
@ -338,8 +302,8 @@ External Monitoring
External actors may be registered to be notified of state transitions by External actors may be registered to be notified of state transitions by
sending a message :class:`SubscribeTransitionCallBack(actorRef)`. The named sending a message :class:`SubscribeTransitionCallBack(actorRef)`. The named
actor will be sent a :class:`CurrentState(self, stateName)` message immediately actor will be sent a :class:`CurrentState(self, stateName)` message immediately
and will receive :class:`Transition(actorRef, oldState, newState)` messages and will receive :class:`Transition(actorRef, oldState, newState)` messages
whenever a new state is reached. External monitors may be unregistered by whenever a new state is reached. External monitors may be unregistered by
sending :class:`UnsubscribeTransitionCallBack(actorRef)` to the FSM actor. sending :class:`UnsubscribeTransitionCallBack(actorRef)` to the FSM actor.
@ -347,13 +311,31 @@ Registering a not-running listener generates a warning and fails gracefully.
Stopping a listener without unregistering will remove the listener from the Stopping a listener without unregistering will remove the listener from the
subscription list upon the next transition. subscription list upon the next transition.
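
A sketch of a monitoring actor built on these messages; the FSM reference is
assumed to be handed in from the outside:

.. code-block:: scala

  import akka.actor.{ Actor, ActorRef }
  import akka.actor.FSM.{ CurrentState, SubscribeTransitionCallBack, Transition }

  class TransitionMonitor(fsm: ActorRef) extends Actor {
    override def preStart() { fsm ! SubscribeTransitionCallBack(self) }
    def receive = {
      case CurrentState(_, state)  => println("FSM is currently in " + state)
      case Transition(_, from, to) => println("FSM changed from " + from + " to " + to)
    }
  }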
Transforming State
------------------
The partial functions supplied as arguments to the ``when()`` blocks can be
transformed using Scala's full supplement of functional programming tools. In
order to retain type inference, there is a helper function which may be used
when some common handling logic is to be applied to several different clauses:
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: transform-syntax
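
The shape of such a transformation is sketched below; the state and message
names as well as the ``tooManyJobs`` check are made up for illustration:

.. code-block:: scala

  when(Collecting)(transform {
    case Event(Job(task), pending) =>
      // the ordinary handling logic for this state
      stay using (pending :+ task)
  } using {
    // common post-processing applied to the resulting State
    case s if tooManyJobs(s) => goto(Overloaded)
  })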
The arguments to this method may of course also be stored and reused, e.g. when
applying the same transformation to several ``when()`` blocks:
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: alt-transform-syntax
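
A corresponding sketch of the stored variant, reusing one transformation for two
states (again with made-up names):

.. code-block:: scala

  val escalate: PartialFunction[State, State] = {
    case s if tooManyJobs(s) => goto(Overloaded)
  }

  when(Collecting)(transform {
    case Event(Job(task), pending) => stay using (pending :+ task)
  } using escalate)

  when(Flushing)(transform {
    case Event(Job(task), pending) => stay using (pending :+ task)
  } using escalate)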
Timers Timers
------ ------
Besides state timeouts, FSM manages timers identified by :class:`String` names. Besides state timeouts, FSM manages timers identified by :class:`String` names.
You may set a timer using You may set a timer using
:func:`setTimer(name, msg, interval, repeat)` :func:`setTimer(name, msg, interval, repeat)`
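
For example, a single-shot reminder could be scheduled as sketched here (the
``Go`` and ``Reminder`` messages and the ``Waiting`` state are made up for
illustration); the individual parameters are explained below:

.. code-block:: scala

  when(Waiting) {
    case Event(Go, _) =>
      // schedule a single Reminder message to ourselves, half a second from now
      setTimer("reminder", Reminder, 500 millis, false)
  }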
where :obj:`msg` is the message object which will be sent after the duration where :obj:`msg` is the message object which will be sent after the duration
:obj:`interval` has elapsed. If :obj:`repeat` is :obj:`true`, then the timer is :obj:`interval` has elapsed. If :obj:`repeat` is :obj:`true`, then the timer is
@ -376,7 +358,7 @@ Termination from Inside
The FSM is stopped by specifying the result state as The FSM is stopped by specifying the result state as
:func:`stop([reason[, data]])` :func:`stop([reason[, data]])`
The reason must be one of :obj:`Normal` (which is the default), :obj:`Shutdown` The reason must be one of :obj:`Normal` (which is the default), :obj:`Shutdown`
or :obj:`Failure(reason)`, and the second argument may be given to change the or :obj:`Failure(reason)`, and the second argument may be given to change the
@ -389,25 +371,15 @@ state data which is available during termination handling.
the same way as a state transition (but note that the ``return`` statement the same way as a state transition (but note that the ``return`` statement
may not be used within a :meth:`when` block). may not be used within a :meth:`when` block).
.. code-block:: scala .. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: stop-syntax
when(A) {
case Event(Stop, _) =>
doCleanup()
stop()
}
You can use :func:`onTermination(handler)` to specify custom code that is You can use :func:`onTermination(handler)` to specify custom code that is
executed when the FSM is stopped. The handler is a partial function which takes executed when the FSM is stopped. The handler is a partial function which takes
a :class:`StopEvent(reason, stateName, stateData)` as argument: a :class:`StopEvent(reason, stateName, stateData)` as argument:
.. code-block:: scala .. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: termination-syntax
onTermination {
case StopEvent(Normal, s, d) => ...
case StopEvent(Shutdown, _, _) => ...
case StopEvent(Failure(cause), s, d) => ...
}
As for the :func:`whenUnhandled` case, this handler is not stacked, so each As for the :func:`whenUnhandled` case, this handler is not stacked, so each
invocation of :func:`onTermination` replaces the previously installed handler. invocation of :func:`onTermination` replaces the previously installed handler.
@ -419,7 +391,7 @@ When an :class:`ActorRef` associated to a FSM is stopped using the
:meth:`stop()` method, its :meth:`postStop` hook will be executed. The default :meth:`stop()` method, its :meth:`postStop` hook will be executed. The default
implementation by the :class:`FSM` trait is to execute the implementation by the :class:`FSM` trait is to execute the
:meth:`onTermination` handler if that is prepared to handle a :meth:`onTermination` handler if that is prepared to handle a
:obj:`StopEvent(Shutdown, ...)`. :obj:`StopEvent(Shutdown, ...)`.
.. warning:: .. warning::
@ -438,11 +410,11 @@ Event Tracing
------------- -------------
The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an
event trace by :class:`LoggingFSM` instances:: event trace by :class:`LoggingFSM` instances:
class MyFSM extends Actor with LoggingFSM[X, Z] { .. includecode:: code/akka/docs/actor/FSMDocSpec.scala
... :include: logging-fsm
} :exclude: body-elided
This FSM will log at DEBUG level: This FSM will log at DEBUG level:
@ -459,17 +431,10 @@ Rolling Event Log
The :class:`LoggingFSM` trait adds one more feature to the FSM: a rolling event The :class:`LoggingFSM` trait adds one more feature to the FSM: a rolling event
log which may be used during debugging (for tracing how the FSM entered a log which may be used during debugging (for tracing how the FSM entered a
certain failure state) or for other creative uses:: certain failure state) or for other creative uses:
class MyFSM extends Actor with LoggingFSM[X, Z] { .. includecode:: code/akka/docs/actor/FSMDocSpec.scala
override def logDepth = 12 :include: logging-fsm
onTermination {
case StopEvent(Failure(_), state, data) =>
log.warning("Failure in state " + state + " with data " + data + "\n" +
"Events leading up to this point:\n\t"+getLog.mkString("\n\t"))
}
...
}
The :meth:`logDepth` defaults to zero, which turns off the event log. The :meth:`logDepth` defaults to zero, which turns off the event log.

View file

@ -80,147 +80,6 @@ public final class RemoteProtocol {
// @@protoc_insertion_point(enum_scope:CommandType) // @@protoc_insertion_point(enum_scope:CommandType)
} }
public enum ReplicationStorageType
implements com.google.protobuf.ProtocolMessageEnum {
TRANSIENT(0, 1),
TRANSACTION_LOG(1, 2),
DATA_GRID(2, 3),
;
public static final int TRANSIENT_VALUE = 1;
public static final int TRANSACTION_LOG_VALUE = 2;
public static final int DATA_GRID_VALUE = 3;
public final int getNumber() { return value; }
public static ReplicationStorageType valueOf(int value) {
switch (value) {
case 1: return TRANSIENT;
case 2: return TRANSACTION_LOG;
case 3: return DATA_GRID;
default: return null;
}
}
public static com.google.protobuf.Internal.EnumLiteMap<ReplicationStorageType>
internalGetValueMap() {
return internalValueMap;
}
private static com.google.protobuf.Internal.EnumLiteMap<ReplicationStorageType>
internalValueMap =
new com.google.protobuf.Internal.EnumLiteMap<ReplicationStorageType>() {
public ReplicationStorageType findValueByNumber(int number) {
return ReplicationStorageType.valueOf(number);
}
};
public final com.google.protobuf.Descriptors.EnumValueDescriptor
getValueDescriptor() {
return getDescriptor().getValues().get(index);
}
public final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptorForType() {
return getDescriptor();
}
public static final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptor() {
return akka.remote.RemoteProtocol.getDescriptor().getEnumTypes().get(1);
}
private static final ReplicationStorageType[] VALUES = {
TRANSIENT, TRANSACTION_LOG, DATA_GRID,
};
public static ReplicationStorageType valueOf(
com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
if (desc.getType() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"EnumValueDescriptor is not for this type.");
}
return VALUES[desc.getIndex()];
}
private final int index;
private final int value;
private ReplicationStorageType(int index, int value) {
this.index = index;
this.value = value;
}
// @@protoc_insertion_point(enum_scope:ReplicationStorageType)
}
public enum ReplicationStrategyType
implements com.google.protobuf.ProtocolMessageEnum {
WRITE_THROUGH(0, 1),
WRITE_BEHIND(1, 2),
;
public static final int WRITE_THROUGH_VALUE = 1;
public static final int WRITE_BEHIND_VALUE = 2;
public final int getNumber() { return value; }
public static ReplicationStrategyType valueOf(int value) {
switch (value) {
case 1: return WRITE_THROUGH;
case 2: return WRITE_BEHIND;
default: return null;
}
}
public static com.google.protobuf.Internal.EnumLiteMap<ReplicationStrategyType>
internalGetValueMap() {
return internalValueMap;
}
private static com.google.protobuf.Internal.EnumLiteMap<ReplicationStrategyType>
internalValueMap =
new com.google.protobuf.Internal.EnumLiteMap<ReplicationStrategyType>() {
public ReplicationStrategyType findValueByNumber(int number) {
return ReplicationStrategyType.valueOf(number);
}
};
public final com.google.protobuf.Descriptors.EnumValueDescriptor
getValueDescriptor() {
return getDescriptor().getValues().get(index);
}
public final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptorForType() {
return getDescriptor();
}
public static final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptor() {
return akka.remote.RemoteProtocol.getDescriptor().getEnumTypes().get(2);
}
private static final ReplicationStrategyType[] VALUES = {
WRITE_THROUGH, WRITE_BEHIND,
};
public static ReplicationStrategyType valueOf(
com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
if (desc.getType() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"EnumValueDescriptor is not for this type.");
}
return VALUES[desc.getIndex()];
}
private final int index;
private final int value;
private ReplicationStrategyType(int index, int value) {
this.index = index;
this.value = value;
}
// @@protoc_insertion_point(enum_scope:ReplicationStrategyType)
}
public interface AkkaRemoteProtocolOrBuilder public interface AkkaRemoteProtocolOrBuilder
extends com.google.protobuf.MessageOrBuilder { extends com.google.protobuf.MessageOrBuilder {
@ -4313,491 +4172,6 @@ public final class RemoteProtocol {
// @@protoc_insertion_point(class_scope:AddressProtocol) // @@protoc_insertion_point(class_scope:AddressProtocol)
} }
public interface ExceptionProtocolOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// required string classname = 1;
boolean hasClassname();
String getClassname();
// required string message = 2;
boolean hasMessage();
String getMessage();
}
public static final class ExceptionProtocol extends
com.google.protobuf.GeneratedMessage
implements ExceptionProtocolOrBuilder {
// Use ExceptionProtocol.newBuilder() to construct.
private ExceptionProtocol(Builder builder) {
super(builder);
}
private ExceptionProtocol(boolean noInit) {}
private static final ExceptionProtocol defaultInstance;
public static ExceptionProtocol getDefaultInstance() {
return defaultInstance;
}
public ExceptionProtocol getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.remote.RemoteProtocol.internal_static_ExceptionProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.remote.RemoteProtocol.internal_static_ExceptionProtocol_fieldAccessorTable;
}
private int bitField0_;
// required string classname = 1;
public static final int CLASSNAME_FIELD_NUMBER = 1;
private java.lang.Object classname_;
public boolean hasClassname() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public String getClassname() {
java.lang.Object ref = classname_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (com.google.protobuf.Internal.isValidUtf8(bs)) {
classname_ = s;
}
return s;
}
}
private com.google.protobuf.ByteString getClassnameBytes() {
java.lang.Object ref = classname_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
classname_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
// required string message = 2;
public static final int MESSAGE_FIELD_NUMBER = 2;
private java.lang.Object message_;
public boolean hasMessage() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public String getMessage() {
java.lang.Object ref = message_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (com.google.protobuf.Internal.isValidUtf8(bs)) {
message_ = s;
}
return s;
}
}
private com.google.protobuf.ByteString getMessageBytes() {
java.lang.Object ref = message_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
message_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private void initFields() {
classname_ = "";
message_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasClassname()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasMessage()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(1, getClassnameBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, getMessageBytes());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(1, getClassnameBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, getMessageBytes());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.remote.RemoteProtocol.ExceptionProtocol prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements akka.remote.RemoteProtocol.ExceptionProtocolOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.remote.RemoteProtocol.internal_static_ExceptionProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.remote.RemoteProtocol.internal_static_ExceptionProtocol_fieldAccessorTable;
}
// Construct using akka.remote.RemoteProtocol.ExceptionProtocol.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
classname_ = "";
bitField0_ = (bitField0_ & ~0x00000001);
message_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.remote.RemoteProtocol.ExceptionProtocol.getDescriptor();
}
public akka.remote.RemoteProtocol.ExceptionProtocol getDefaultInstanceForType() {
return akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance();
}
public akka.remote.RemoteProtocol.ExceptionProtocol build() {
akka.remote.RemoteProtocol.ExceptionProtocol result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private akka.remote.RemoteProtocol.ExceptionProtocol buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
akka.remote.RemoteProtocol.ExceptionProtocol result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public akka.remote.RemoteProtocol.ExceptionProtocol buildPartial() {
akka.remote.RemoteProtocol.ExceptionProtocol result = new akka.remote.RemoteProtocol.ExceptionProtocol(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.classname_ = classname_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.message_ = message_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof akka.remote.RemoteProtocol.ExceptionProtocol) {
return mergeFrom((akka.remote.RemoteProtocol.ExceptionProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.remote.RemoteProtocol.ExceptionProtocol other) {
if (other == akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance()) return this;
if (other.hasClassname()) {
setClassname(other.getClassname());
}
if (other.hasMessage()) {
setMessage(other.getMessage());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasClassname()) {
return false;
}
if (!hasMessage()) {
return false;
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
case 10: {
bitField0_ |= 0x00000001;
classname_ = input.readBytes();
break;
}
case 18: {
bitField0_ |= 0x00000002;
message_ = input.readBytes();
break;
}
}
}
}
private int bitField0_;
// required string classname = 1;
private java.lang.Object classname_ = "";
public boolean hasClassname() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public String getClassname() {
java.lang.Object ref = classname_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
classname_ = s;
return s;
} else {
return (String) ref;
}
}
public Builder setClassname(String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
classname_ = value;
onChanged();
return this;
}
public Builder clearClassname() {
bitField0_ = (bitField0_ & ~0x00000001);
classname_ = getDefaultInstance().getClassname();
onChanged();
return this;
}
void setClassname(com.google.protobuf.ByteString value) {
bitField0_ |= 0x00000001;
classname_ = value;
onChanged();
}
// required string message = 2;
private java.lang.Object message_ = "";
public boolean hasMessage() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public String getMessage() {
java.lang.Object ref = message_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
message_ = s;
return s;
} else {
return (String) ref;
}
}
public Builder setMessage(String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
message_ = value;
onChanged();
return this;
}
public Builder clearMessage() {
bitField0_ = (bitField0_ & ~0x00000002);
message_ = getDefaultInstance().getMessage();
onChanged();
return this;
}
void setMessage(com.google.protobuf.ByteString value) {
bitField0_ |= 0x00000002;
message_ = value;
onChanged();
}
// @@protoc_insertion_point(builder_scope:ExceptionProtocol)
}
static {
defaultInstance = new ExceptionProtocol(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:ExceptionProtocol)
}
public interface DurableMailboxMessageProtocolOrBuilder public interface DurableMailboxMessageProtocolOrBuilder
extends com.google.protobuf.MessageOrBuilder { extends com.google.protobuf.MessageOrBuilder {
@ -5496,11 +4870,6 @@ public final class RemoteProtocol {
private static private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_AddressProtocol_fieldAccessorTable; internal_static_AddressProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_ExceptionProtocol_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_ExceptionProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor private static com.google.protobuf.Descriptors.Descriptor
internal_static_DurableMailboxMessageProtocol_descriptor; internal_static_DurableMailboxMessageProtocol_descriptor;
private static private static
@ -5531,17 +4900,12 @@ public final class RemoteProtocol {
"anifest\030\003 \001(\014\"3\n\025MetadataEntryProtocol\022\013" + "anifest\030\003 \001(\014\"3\n\025MetadataEntryProtocol\022\013" +
"\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"A\n\017AddressPro" + "\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"A\n\017AddressPro" +
"tocol\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022" + "tocol\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022" +
"\014\n\004port\030\003 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tcl" + "\014\n\004port\030\003 \002(\r\"y\n\035DurableMailboxMessagePr" +
"assname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"y\n\035Durabl" + "otocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRefProt" +
"eMailboxMessageProtocol\022$\n\trecipient\030\001 \002" + "ocol\022!\n\006sender\030\002 \001(\0132\021.ActorRefProtocol\022" +
"(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 \001(\0132\021.", "\017\n\007message\030\003 \002(\014*7\n\013CommandType\022\013\n\007CONNE",
"ActorRefProtocol\022\017\n\007message\030\003 \002(\014*7\n\013Com" + "CT\020\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013akk" +
"mandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tH" + "a.remoteH\001"
"EARTBEAT\020\003*K\n\026ReplicationStorageType\022\r\n\t" +
"TRANSIENT\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA" +
"_GRID\020\003*>\n\027ReplicationStrategyType\022\021\n\rWR" +
"ITE_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002B\017\n\013akka." +
"remoteH\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -5604,16 +4968,8 @@ public final class RemoteProtocol {
new java.lang.String[] { "System", "Hostname", "Port", }, new java.lang.String[] { "System", "Hostname", "Port", },
akka.remote.RemoteProtocol.AddressProtocol.class, akka.remote.RemoteProtocol.AddressProtocol.class,
akka.remote.RemoteProtocol.AddressProtocol.Builder.class); akka.remote.RemoteProtocol.AddressProtocol.Builder.class);
internal_static_ExceptionProtocol_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_ExceptionProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ExceptionProtocol_descriptor,
new java.lang.String[] { "Classname", "Message", },
akka.remote.RemoteProtocol.ExceptionProtocol.class,
akka.remote.RemoteProtocol.ExceptionProtocol.Builder.class);
internal_static_DurableMailboxMessageProtocol_descriptor = internal_static_DurableMailboxMessageProtocol_descriptor =
getDescriptor().getMessageTypes().get(8); getDescriptor().getMessageTypes().get(7);
internal_static_DurableMailboxMessageProtocol_fieldAccessorTable = new internal_static_DurableMailboxMessageProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_DurableMailboxMessageProtocol_descriptor, internal_static_DurableMailboxMessageProtocol_descriptor,

View file

@ -44,23 +44,6 @@ enum CommandType {
HEARTBEAT = 3; HEARTBEAT = 3;
} }
/**
* Defines the type of the ReplicationStorage
*/
enum ReplicationStorageType {
TRANSIENT = 1;
TRANSACTION_LOG = 2;
DATA_GRID = 3;
}
/**
* Defines the type of the ReplicationStrategy
*/
enum ReplicationStrategyType {
WRITE_THROUGH = 1;
WRITE_BEHIND = 2;
}
/** /**
* Defines a remote ActorRef that "remembers" and uses its original Actor instance * Defines a remote ActorRef that "remembers" and uses its original Actor instance
* on the original node. * on the original node.
@ -95,14 +78,6 @@ message AddressProtocol {
required uint32 port = 3; required uint32 port = 3;
} }
/**
* Defines an exception.
*/
message ExceptionProtocol {
required string classname = 1;
required string message = 2;
}
/** /**
* Defines the durable mailbox message. * Defines the durable mailbox message.
*/ */

View file

@ -1,6 +1,5 @@
package akka.spring.foo; package akka.spring.foo;
import static akka.actor.Actors.*;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;

View file

@ -5,7 +5,6 @@
package akka.transactor; package akka.transactor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Actors;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import scala.concurrent.stm.Ref; import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM; import scala.concurrent.stm.japi.STM;