diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 3a2c1bb627..ef49cbc18d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -262,6 +262,25 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im expectMsg(1 second, IndexedSeq(LogEntry(1, 1, "log"), LogEntry(1, 1, "count"), LogEntry(1, 2, "log"))) } + "allow transforming of state results" in { + import akka.actor.FSM._ + val fsmref = system.actorOf(Props(new Actor with FSM[Int, Int] { + startWith(0, 0) + when(0)(transform { + case Event("go", _) ⇒ stay + } using { + case x ⇒ goto(1) + }) + when(1) { + case _ ⇒ stay + } + })) + fsmref ! SubscribeTransitionCallBack(testActor) + fsmref ! "go" + expectMsg(CurrentState(fsmref, 0)) + expectMsg(Transition(fsmref, 0, 1)) + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 59468125eb..df47c801bb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -67,6 +67,18 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { } } + "resubmit single-shot timer" taggedAs TimingTest in { + within(2 seconds) { + within(500 millis, 1.5 second) { + fsm ! TestSingleTimerResubmit + expectMsg(Tick) + expectMsg(Tock) + expectMsg(Transition(fsm, TestSingleTimerResubmit, Initial)) + } + expectNoMsg + } + } + "correctly cancel a named timer" taggedAs TimingTest in { fsm ! TestCancelTimer within(500 millis) { @@ -106,8 +118,8 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { } "notify unhandled messages" taggedAs TimingTest in { - filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm.toString, occurrences = 1), - EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm.toString, occurrences = 1)) { + filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm.path.toString, occurrences = 1), + EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm.path.toString, occurrences = 1)) { fsm ! TestUnhandled within(1 second) { fsm ! Tick @@ -142,6 +154,7 @@ object FSMTimingSpec { case object TestStateTimeout extends State case object TestStateTimeoutOverride extends State case object TestSingleTimer extends State + case object TestSingleTimerResubmit extends State case object TestRepeatedTimer extends State case object TestUnhandled extends State case object TestCancelTimer extends State @@ -179,6 +192,13 @@ object FSMTimingSpec { tester ! Tick goto(Initial) } + onTransition { + case Initial -> TestSingleTimerResubmit ⇒ setTimer("blah", Tick, 500 millis, false) + } + when(TestSingleTimerResubmit) { + case Event(Tick, _) ⇒ tester ! Tick; setTimer("blah", Tock, 500 millis, false) + case Event(Tock, _) ⇒ tester ! Tock; goto(Initial) + } when(TestCancelTimer) { case Event(Tick, _) ⇒ setTimer("hallo", Tock, 1 milli, false) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 0a28a8597e..dc08df1c98 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -86,9 +86,5 @@ class ReceiveTimeoutSpec extends AkkaSpec { intercept[TimeoutException] { Await.ready(timeoutLatch, 1 second) } system.stop(timeoutActor) } - - "have ReceiveTimeout eq to Actors ReceiveTimeout" in { - akka.actor.Actors.receiveTimeout must be theSameInstanceAs (ReceiveTimeout) - } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index c386bb0a52..502712872a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -428,6 +428,31 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) } } + "be able to serialize and deserialize proxies" in { + import java.io._ + JavaSerializer.currentSystem.withValue(system.asInstanceOf[ExtendedActorSystem]) { + val t = newFooBar(Duration(2, "s")) + + t.optionPigdog() must be === Some("Pigdog") + + val baos = new ByteArrayOutputStream(8192 * 4) + val out = new ObjectOutputStream(baos) + + out.writeObject(t) + out.close() + + val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) + + val tNew = in.readObject().asInstanceOf[Foo] + + tNew must be === t + + tNew.optionPigdog() must be === Some("Pigdog") + + mustStop(t) + } + } + "be able to override lifecycle callbacks" in { val latch = new CountDownLatch(16) val ta = TypedActor(system) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala deleted file mode 100644 index 7ac129e254..0000000000 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala +++ /dev/null @@ -1,145 +0,0 @@ -package akka.dispatch - -import Future.flow -import akka.util.cps._ -import akka.util.Timeout -import akka.util.duration._ -import akka.testkit.AkkaSpec -import akka.testkit.DefaultTimeout - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { - - "A PromiseStream" must { - - "work" in { - val a, b, c = Promise[Int]() - val q = PromiseStream[Int]() - flow { q << (1, 2, 3) } - flow { - a << q() - b << q - c << q() - } - assert(Await.result(a, timeout.duration) === 1) - assert(Await.result(b, timeout.duration) === 2) - assert(Await.result(c, timeout.duration) === 3) - } - - "pend" in { - val a, b, c = Promise[Int]() - val q = PromiseStream[Int]() - flow { - a << q - b << q() - c << q - } - flow { q <<< List(1, 2, 3) } - assert(Await.result(a, timeout.duration) === 1) - assert(Await.result(b, timeout.duration) === 2) - assert(Await.result(c, timeout.duration) === 3) - } - - "pend again" in { - val a, b, c, d = Promise[Int]() - val q1, q2 = PromiseStream[Int]() - val oneTwo = Future(List(1, 2)) - flow { - a << q2 - b << q2 - q1 << 3 << 4 - } - flow { - q2 <<< oneTwo - c << q1 - d << q1 - } - assert(Await.result(a, timeout.duration) === 1) - assert(Await.result(b, timeout.duration) === 2) - assert(Await.result(c, timeout.duration) === 3) - assert(Await.result(d, timeout.duration) === 4) - } - - "enque" in { - val q = PromiseStream[Int]() - val a = q.dequeue() - val b = q.dequeue() - val c, d = Promise[Int]() - flow { - c << q - d << q - } - q ++= List(1, 2, 3, 4) - - assert(Await.result(a, timeout.duration) === 1) - assert(Await.result(b, timeout.duration) === 2) - assert(Await.result(c, timeout.duration) === 3) - assert(Await.result(d, timeout.duration) === 4) - } - - "map" in { - val qs = PromiseStream[String]() - val qi = qs.map(_.length) - val a, c = Promise[Int]() - val b = Promise[String]() - flow { - a << qi - b << qs - c << qi - } - flow { - qs << ("Hello", "World!", "Test") - } - assert(Await.result(a, timeout.duration) === 5) - assert(Await.result(b, timeout.duration) === "World!") - assert(Await.result(c, timeout.duration) === 4) - } - - "map futures" in { - val q = PromiseStream[String]() - flow { - q << (Future("a"), Future("b"), Future("c")) - } - val a, b, c = q.dequeue - Await.result(a, timeout.duration) must be("a") - Await.result(b, timeout.duration) must be("b") - Await.result(c, timeout.duration) must be("c") - } - - "not fail under concurrent stress" in { - implicit val timeout = Timeout(60 seconds) - val q = PromiseStream[Long](timeout.duration.toMillis) - - flow { - var n = 0L - repeatC(50000) { - n += 1 - q << n - } - } - - val future = Future sequence { - List.fill(10) { - flow { - var total = 0L - repeatC(10000) { - val n = q() - total += n - } - total - } - } - } map (_.sum) - - flow { - var n = 50000L - repeatC(50000) { - n += 1 - q << n - } - } - - assert(Await.result(future, timeout.duration) === (1L to 100000L).sum) - } - } -} diff --git a/akka-actor/src/main/java/akka/actor/Actors.java b/akka-actor/src/main/java/akka/actor/Actors.java deleted file mode 100644 index f0ee734c3b..0000000000 --- a/akka-actor/src/main/java/akka/actor/Actors.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.actor; - -/** - * JAVA API for - creating actors, - creating remote actors, - locating actors - */ -public class Actors { - /** - * The message that is sent when an Actor gets a receive timeout. - * - *
-   * if (message == receiveTimeout()) {
-   *   // Timed out
-   * }
-   * 
- * - * @return the single instance of ReceiveTimeout - */ - public final static ReceiveTimeout$ receiveTimeout() { - return ReceiveTimeout$.MODULE$; - } - - /** - * The message that when sent to an Actor kills it by throwing an exception. - * - *
-   * actor.tell(kill());
-   * 
- * - * @return the single instance of Kill - */ - public final static Kill$ kill() { - return Kill$.MODULE$; - } - - /** - * The message that when sent to an Actor shuts it down by calling 'stop'. - * - *
-   * actor.tell(poisonPill());
-   * 
- * - * @return the single instance of PoisonPill - */ - public final static PoisonPill$ poisonPill() { - return PoisonPill$.MODULE$; - } -} diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index fb33b2ab85..8c68ba3315 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -509,7 +509,14 @@ private[akka] class ActorCell( checkReceiveTimeout if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { - case NonFatal(e) ⇒ throw ActorInitializationException(self, "exception during creation", e) + case NonFatal(i: InstantiationException) ⇒ + throw ActorInitializationException(self, + """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either, + a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new UntypedActorFactory ... ) + or is missing an appropriate, reachable no-args constructor. + """, i) + case NonFatal(e) ⇒ + throw ActorInitializationException(self, "exception during creation", e) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4d32c3c0b1..b84057b749 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -436,11 +436,27 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable): Unit = { - log.error(cause, "Uncaught error from thread [{}]", thread.getName) cause match { - case NonFatal(_) | _: InterruptedException ⇒ - case _ if settings.JvmExitOnFatalError ⇒ System.exit(-1) - case _ ⇒ shutdown() + case NonFatal(_) | _: InterruptedException ⇒ log.error(cause, "Uncaught error from thread [{}]", thread.getName) + case _ ⇒ + if (settings.JvmExitOnFatalError) { + try { + log.error(cause, "Uncaught error from thread [{}] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled", thread.getName) + import System.err + err.print("Uncaught error from thread [") + err.print(thread.getName) + err.print("] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[") + err.print(name) + err.println("]") + cause.printStackTrace(System.err) + System.err.flush() + } finally { + System.exit(-1) + } + } else { + log.error(cause, "Uncaught fatal error from thread [{}] shutting down ActorSystem [{}]", thread.getName, name) + shutdown() + } } } } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index b277142e76..3d1f8930c4 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -172,7 +172,7 @@ object FSM { * timerActive_? ("tock") * */ -trait FSM[S, D] extends Listeners { +trait FSM[S, D] extends Listeners with ActorLogging { this: Actor ⇒ import FSM._ @@ -186,8 +186,6 @@ trait FSM[S, D] extends Listeners { val -> = FSM.-> val StateTimeout = FSM.StateTimeout - val log = Logging(context.system, this) - /** * **************************************** * DSL @@ -255,6 +253,13 @@ trait FSM[S, D] extends Listeners { */ protected final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason) + protected final class TransformHelper(func: StateFunction) { + def using(andThen: PartialFunction[State, State]): StateFunction = + func andThen (andThen orElse { case x ⇒ x }) + } + + protected final def transform(func: StateFunction): TransformHelper = new TransformHelper(func) + /** * Schedule named timer to deliver message after given delay, possibly repeating. * @param name identifier to be used with cancelTimer() @@ -327,7 +332,7 @@ trait FSM[S, D] extends Listeners { * Convenience wrapper for using a total function instead of a partial * function literal. To be used with onTransition. */ - implicit protected final def total2pf(transitionHandler: (S, S) ⇒ Unit) = + implicit protected final def total2pf(transitionHandler: (S, S) ⇒ Unit): TransitionHandler = new TransitionHandler { def isDefinedAt(in: (S, S)) = true def apply(in: (S, S)) { transitionHandler(in._1, in._2) } @@ -336,7 +341,7 @@ trait FSM[S, D] extends Listeners { /** * Set handler which is called upon termination of this FSM actor. */ - protected final def onTermination(terminationHandler: PartialFunction[StopEvent[S, D], Unit]): Unit = + protected final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit = terminateEvent = terminationHandler /** @@ -415,7 +420,7 @@ trait FSM[S, D] extends Listeners { /* * termination handling */ - private var terminateEvent: PartialFunction[StopEvent[S, D], Unit] = NullFunction + private var terminateEvent: PartialFunction[StopEvent, Unit] = NullFunction /* * transition handling @@ -443,10 +448,10 @@ trait FSM[S, D] extends Listeners { timeoutFuture = None } generation += 1 - processMsg(msg, t) if (!repeat) { timers -= name } + processMsg(msg, t) } case SubscribeTransitionCallBack(actorRef) ⇒ // TODO use DeathWatch to clean up list @@ -538,7 +543,7 @@ trait FSM[S, D] extends Listeners { case class Event(event: Any, stateData: D) - case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D) + case class StopEvent(reason: Reason, currentState: S, stateData: D) } /** diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index b78c0a5eb4..3751898c5c 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -43,14 +43,14 @@ object Props { * Scala API. */ def apply[T <: Actor: ClassManifest]: Props = - default.withCreator(implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[_ <: Actor]].newInstance) + default.withCreator(implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[_ <: Actor]]) /** * Returns a Props that has default values except for "creator" which will be a function that creates an instance * of the supplied class using the default constructor. */ def apply(actorClass: Class[_ <: Actor]): Props = - default.withCreator(actorClass.newInstance) + default.withCreator(actorClass) /** * Returns a Props that has default values except for "creator" which will be a function that creates an instance @@ -70,7 +70,6 @@ object Props { def apply(behavior: ActorContext ⇒ Actor.Receive): Props = apply(new Actor { def receive = behavior(context) }) - } /** diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index f775042566..4d85542d36 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -8,12 +8,14 @@ import akka.japi.{ Creator, Option ⇒ JOption } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } import akka.util.{ Timeout, NonFatal } import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } -import akka.serialization.{ Serialization, SerializationExtension } import akka.dispatch._ import java.util.concurrent.TimeoutException import java.util.concurrent.TimeUnit.MILLISECONDS import java.lang.IllegalStateException import akka.util.Duration +import akka.actor.TypedActor.TypedActorInvocationHandler +import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension } +import java.io.ObjectStreamException trait TypedActorFactory { @@ -124,7 +126,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi } } catch { case i: InvocationTargetException ⇒ throw i.getTargetException } - private def writeReplace(): AnyRef = parameters match { + @throws(classOf[ObjectStreamException]) private def writeReplace(): AnyRef = parameters match { case null ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null) case ps if ps.length == 0 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array()) case ps ⇒ @@ -148,7 +150,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi //TODO implement writeObject and readObject to serialize //TODO Possible optimization is to special encode the parameter-types to conserve space - private def readResolve(): AnyRef = { + @throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = { val system = akka.serialization.JavaSerializer.currentSystem.value if (system eq null) throw new IllegalStateException( "Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." + @@ -369,9 +371,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi def postRestart(reason: Throwable): Unit } - private[akka] class TypedActorInvocationHandler(val extension: TypedActorExtension, val actorVar: AtomVar[ActorRef], val timeout: Timeout) extends InvocationHandler { + private[akka] class TypedActorInvocationHandler(@transient val extension: TypedActorExtension, @transient val actorVar: AtomVar[ActorRef], @transient val timeout: Timeout) extends InvocationHandler with Serializable { def actor = actorVar.get - @throws(classOf[Throwable]) def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString @@ -392,6 +393,17 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case m ⇒ Await.result(ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef] } } + @throws(classOf[ObjectStreamException]) private def writeReplace(): AnyRef = SerializedTypedActorInvocationHandler(actor, timeout.duration) + } + + private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: Duration) { + @throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match { + case null ⇒ throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value") + case some ⇒ toTypedActorInvocationHandler(some) + } + + def toTypedActorInvocationHandler(system: ActorSystem): TypedActorInvocationHandler = + new TypedActorInvocationHandler(TypedActor(system), new AtomVar[ActorRef](actor), new Timeout(timeout)) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index eea0ecbeef..54ec2d08b4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -819,21 +819,6 @@ trait Promise[T] extends Future[T] { } fr } - - final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = Promise[Any]() - val f = stream.dequeue(this) - f.onComplete { _ ⇒ - try { - fr completeWith cont(f) - } catch { - case NonFatal(e) ⇒ - executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e)) - fr failure e - } - } - fr - } } //Companion object to FState, just to provide a cheap, immutable default entry diff --git a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala deleted file mode 100644 index 882219f84d..0000000000 --- a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dispatch - -import java.util.concurrent.atomic.AtomicReference -import scala.util.continuations._ -import scala.annotation.tailrec -import akka.util.Timeout - -object PromiseStream { - def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): PromiseStream[A] = new PromiseStream[A] - def apply[A](timeout: Long)(implicit dispatcher: MessageDispatcher): PromiseStream[A] = new PromiseStream[A]()(dispatcher, Timeout(timeout)) - - private sealed trait State - private case object Normal extends State - private case object Pending extends State - private case object Busy extends State -} - -trait PromiseStreamOut[A] { - self ⇒ - - def dequeue(): Future[A] - - def dequeue(promise: Promise[A]): Future[A] - - def apply(): A @cps[Future[Any]] - - def apply(promise: Promise[A]): A @cps[Future[Any]] - - final def map[B](f: (A) ⇒ B)(implicit timeout: Timeout): PromiseStreamOut[B] = new PromiseStreamOut[B] { - - def dequeue(): Future[B] = self.dequeue().map(f) - - def dequeue(promise: Promise[B]): Future[B] = self.dequeue().flatMap(a ⇒ promise.complete(Right(f(a)))) - - def apply(): B @cps[Future[Any]] = this.dequeue().apply() - - def apply(promise: Promise[B]): B @cps[Future[Any]] = this.dequeue(promise).apply() - - } - -} - -trait PromiseStreamIn[A] { - - def enqueue(elem: A): Unit - - final def enqueue(elem1: A, elem2: A, elems: A*): Unit = - this += elem1 += elem2 ++= elems - - final def enqueue(elem: Future[A]): Unit = - elem foreach (enqueue(_)) - - final def enqueue(elem1: Future[A], elem2: Future[A], elems: Future[A]*) { - this += elem1 += elem2 - elems foreach (enqueue(_)) - } - - final def +=(elem: A): this.type = { - enqueue(elem) - this - } - - final def +=(elem1: A, elem2: A, elems: A*): this.type = { - enqueue(elem1, elem2, elems: _*) - this - } - - final def +=(elem: Future[A]): this.type = { - enqueue(elem) - this - } - - final def +=(elem1: Future[A], elem2: Future[A], elems: Future[A]*): this.type = { - enqueue(elem1, elem2, elems: _*) - this - } - - final def ++=(elem: Traversable[A]): this.type = { - elem foreach enqueue - this - } - - final def ++=(elem: Future[Traversable[A]]): this.type = { - elem foreach (this ++= _) - this - } - - def <<(elem: A): PromiseStreamIn[A] @cps[Future[Any]] - - def <<(elem1: A, elem2: A, elems: A*): PromiseStreamIn[A] @cps[Future[Any]] - - def <<(elem: Future[A]): PromiseStreamIn[A] @cps[Future[Any]] - - def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStreamIn[A] @cps[Future[Any]] - - def <<<(elems: Traversable[A]): PromiseStreamIn[A] @cps[Future[Any]] - - def <<<(elems: Future[Traversable[A]]): PromiseStreamIn[A] @cps[Future[Any]] - -} - -class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: Timeout) extends PromiseStreamOut[A] with PromiseStreamIn[A] { - import PromiseStream.{ State, Normal, Pending, Busy } - - private val _elemOut: AtomicReference[List[A]] = new AtomicReference(Nil) - private val _elemIn: AtomicReference[List[A]] = new AtomicReference(Nil) - private val _pendOut: AtomicReference[List[Promise[A]]] = new AtomicReference(null) - private val _pendIn: AtomicReference[List[Promise[A]]] = new AtomicReference(null) - private val _state: AtomicReference[State] = new AtomicReference(Normal) - - @tailrec - final def apply(): A @cps[Future[Any]] = - if (_state.get eq Normal) { - val eo = _elemOut.get - if (eo eq null) apply() - else { - if (eo.nonEmpty) { - if (_elemOut.compareAndSet(eo, eo.tail)) shift { cont: (A ⇒ Future[Any]) ⇒ cont(eo.head) } - else apply() - } else apply(Promise[A]) - } - } else apply(Promise[A]) - - final def apply(promise: Promise[A]): A @cps[Future[Any]] = - shift { cont: (A ⇒ Future[Any]) ⇒ dequeue(promise) flatMap cont } - - @tailrec - final def enqueue(elem: A): Unit = _state.get match { - case Normal ⇒ - val ei = _elemIn.get - if (ei eq null) enqueue(elem) - else if (!_elemIn.compareAndSet(ei, elem :: ei)) enqueue(elem) - - case Pending ⇒ - val po = _pendOut.get - if (po eq null) enqueue(elem) - else { - if (po.isEmpty) { - if (_state.compareAndSet(Pending, Busy)) { - var nextState: State = Pending - try { - val pi = _pendIn.get - if (pi ne null) { - if (pi.isEmpty) { - if (_pendIn.compareAndSet(Nil, null)) { - if (_pendOut.compareAndSet(Nil, null)) { - _elemIn.set(Nil) - _elemOut.set(List(elem)) - nextState = Normal - } else { - _pendIn.set(Nil) - } - } - } else { - if (_pendOut.get eq Nil) _pendOut.set(_pendIn.getAndSet(Nil).reverse) - } - } - } finally { - _state.set(nextState) - } - if (nextState eq Pending) enqueue(elem) - } else enqueue(elem) - } else { - if (_pendOut.compareAndSet(po, po.tail)) { - po.head success elem - if (!po.head.isCompleted) enqueue(elem) - } else enqueue(elem) - } - } - - case Busy ⇒ - enqueue(elem) - } - - @tailrec - final def dequeue(): Future[A] = - if (_state.get eq Normal) { - val eo = _elemOut.get - if (eo eq null) dequeue() - else { - if (eo.nonEmpty) { - if (_elemOut.compareAndSet(eo, eo.tail)) Promise.successful(eo.head) - else dequeue() - } else dequeue(Promise[A]) - } - } else dequeue(Promise[A]) - - @tailrec - final def dequeue(promise: Promise[A]): Future[A] = _state.get match { - case Pending ⇒ - val pi = _pendIn.get - if ((pi ne null) && _pendIn.compareAndSet(pi, promise :: pi)) promise else dequeue(promise) - - case Normal ⇒ - val eo = _elemOut.get - if (eo eq null) dequeue(promise) - else { - if (eo.isEmpty) { - if (_state.compareAndSet(Normal, Busy)) { - var nextState: State = Normal - try { - val ei = _elemIn.get - if (ei ne null) { - if (ei.isEmpty) { - if (_elemIn.compareAndSet(Nil, null)) { - if (_elemOut.compareAndSet(Nil, null)) { - _pendIn.set(Nil) - _pendOut.set(List(promise)) - nextState = Pending - } else { - _elemIn.set(Nil) - } - } - } else { - if (_elemOut.get eq Nil) _elemOut.set(_elemIn.getAndSet(Nil).reverse) - } - } - } finally { - _state.set(nextState) - } - if (nextState eq Normal) dequeue(promise) - else promise - } else dequeue(promise) - } else { - if (_elemOut.compareAndSet(eo, eo.tail)) { - promise success eo.head - } else dequeue(promise) - } - } - - case Busy ⇒ - dequeue(promise) - } - - final def <<(elem: A): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this += elem) } - - final def <<(elem1: A, elem2: A, elems: A*): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this += (elem1, elem2, elems: _*)) } - - final def <<(elem: Future[A]): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ elem map (a ⇒ cont(this += a)) } - - final def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ - val seq = Future.sequence(elem1 +: elem2 +: elems) - seq map (a ⇒ cont(this ++= a)) - } - - final def <<<(elems: Traversable[A]): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this ++= elems) } - - final def <<<(elems: Future[Traversable[A]]): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ elems map (as ⇒ cont(this ++= as)) } - -} diff --git a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala index 2b4de717d3..fef69ec6db 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala @@ -35,7 +35,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w anotherAwaiting.verifyActivated() } - "ActivationTracker send activation message even if activation happened earlier" in { + "ActivationTracker send activation message even if activation happened earlier" taggedAs TimingTest in { publish(EndpointActivated(actor.ref)) Thread.sleep(50) awaiting.awaitActivation() @@ -43,7 +43,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w awaiting.verifyActivated() } - "ActivationTracker send activation message even if actor is already deactivated" in { + "ActivationTracker send activation message even if actor is already deactivated" taggedAs TimingTest in { publish(EndpointActivated(actor.ref)) publish(EndpointDeActivated(actor.ref)) Thread.sleep(50) @@ -52,7 +52,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w awaiting.verifyActivated() } - "ActivationTracker forwards de-activation message to all awaiting parties" in { + "ActivationTracker forwards de-activation message to all awaiting parties" taggedAs TimingTest in { given("Actor is activated") publish(EndpointActivated(actor.ref)) given("Actor is deactivated") @@ -67,7 +67,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w anotherAwaiting.verifyDeActivated() } - "ActivationTracker forwards de-activation message even if deactivation happened earlier" in { + "ActivationTracker forwards de-activation message even if deactivation happened earlier" taggedAs TimingTest in { given("Actor is activated") publish(EndpointActivated(actor.ref)) @@ -81,7 +81,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w awaiting.verifyDeActivated() } - "ActivationTracker forwards de-activation message even if someone awaits de-activation even before activation happens" in { + "ActivationTracker forwards de-activation message even if someone awaits de-activation even before activation happens" taggedAs TimingTest in { given("Someone is awaiting de-activation") val awaiting = new Awaiting(actor) awaiting.awaitDeActivation() @@ -96,14 +96,14 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w awaiting.verifyDeActivated() } - "ActivationTracker sends activation failure when failed to activate" in { + "ActivationTracker sends activation failure when failed to activate" taggedAs TimingTest in { awaiting.awaitActivation() publish(EndpointFailedToActivate(actor.ref, cause)) awaiting.verifyFailedToActivate() } - "ActivationTracker sends de-activation failure when failed to de-activate" in { + "ActivationTracker sends de-activation failure when failed to de-activate" taggedAs TimingTest in { publish(EndpointActivated(actor.ref)) awaiting.awaitDeActivation() publish(EndpointFailedToDeActivate(actor.ref, cause)) @@ -111,7 +111,7 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w awaiting.verifyFailedToDeActivate() } - "ActivationTracker sends activation message even if it failed to de-activate" in { + "ActivationTracker sends activation message even if it failed to de-activate" taggedAs TimingTest in { publish(EndpointActivated(actor.ref)) publish(EndpointFailedToDeActivate(actor.ref, cause)) awaiting.awaitActivation() diff --git a/akka-docs/intro/getting-started-first-java.rst b/akka-docs/intro/getting-started-first-java.rst index 9f8544f922..e6deb34217 100644 --- a/akka-docs/intro/getting-started-first-java.rst +++ b/akka-docs/intro/getting-started-first-java.rst @@ -385,6 +385,12 @@ prints the result and shuts down the ``ActorSystem``. .. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#result-listener +Please note that shutting down the actor system should be done by that part of +the application which can safely determine that everything has been said and +done. In this case, it is the Listener actor, but in other scenarios it might +be the main thread or some other external service. It is by no means required +to call ``system.shutdown()`` from within that system. + Bootstrap the calculation ------------------------- diff --git a/akka-docs/intro/getting-started-first-scala.rst b/akka-docs/intro/getting-started-first-scala.rst index 91bed34b78..0b6492314c 100644 --- a/akka-docs/intro/getting-started-first-scala.rst +++ b/akka-docs/intro/getting-started-first-scala.rst @@ -417,6 +417,12 @@ prints the result and shuts down the ``ActorSystem``. .. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala#result-listener +Please note that shutting down the actor system should be done by that part of +the application which can safely determine that everything has been said and +done. In this case, it is the Listener actor, but in other scenarios it might +be the main thread or some other external service. It is by no means required +to call ``system.shutdown()`` from within that system. + Bootstrap the calculation ========================= diff --git a/akka-docs/java/code/akka/docs/actor/FirstUntypedActor.java b/akka-docs/java/code/akka/docs/actor/FirstUntypedActor.java index 8fb12df4f7..6cfbe75b99 100644 --- a/akka-docs/java/code/akka/docs/actor/FirstUntypedActor.java +++ b/akka-docs/java/code/akka/docs/actor/FirstUntypedActor.java @@ -5,7 +5,7 @@ package akka.docs.actor; import akka.actor.ActorRef; import akka.actor.Props; -import static akka.actor.Actors.*; +import akka.actor.PoisonPill; import akka.actor.UntypedActor; //#context-actorOf @@ -16,6 +16,6 @@ public class FirstUntypedActor extends UntypedActor { public void onReceive(Object message) { myActor.forward(message, getContext()); - myActor.tell(poisonPill()); + myActor.tell(PoisonPill.getInstance()); } } diff --git a/akka-docs/java/code/akka/docs/actor/MyReceivedTimeoutUntypedActor.java b/akka-docs/java/code/akka/docs/actor/MyReceivedTimeoutUntypedActor.java index 9131941851..97742d9bd1 100644 --- a/akka-docs/java/code/akka/docs/actor/MyReceivedTimeoutUntypedActor.java +++ b/akka-docs/java/code/akka/docs/actor/MyReceivedTimeoutUntypedActor.java @@ -4,7 +4,6 @@ package akka.docs.actor; //#receive-timeout -import akka.actor.Actors; import akka.actor.ReceiveTimeout; import akka.actor.UntypedActor; import akka.util.Duration; @@ -18,7 +17,7 @@ public class MyReceivedTimeoutUntypedActor extends UntypedActor { public void onReceive(Object message) { if (message.equals("Hello")) { getSender().tell("Hello world"); - } else if (message == Actors.receiveTimeout()) { + } else if (message == ReceiveTimeout.getInstance()) { throw new RuntimeException("received timeout"); } else { unhandled(message); diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index 1237797b62..65ff37c10e 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -19,7 +19,8 @@ import akka.util.Timeout; //#import-future //#import-actors -import static akka.actor.Actors.*; +import akka.actor.PoisonPill; +import akka.actor.Kill; //#import-actors //#import-procedure @@ -158,7 +159,7 @@ public class UntypedActorDocTestBase { ActorSystem system = ActorSystem.create("MySystem"); ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)); //#poison-pill - myActor.tell(poisonPill()); + myActor.tell(PoisonPill.getInstance()); //#poison-pill system.shutdown(); } @@ -168,7 +169,7 @@ public class UntypedActorDocTestBase { ActorSystem system = ActorSystem.create("MySystem"); ActorRef victim = system.actorOf(new Props(MyUntypedActor.class)); //#kill - victim.tell(kill()); + victim.tell(Kill.getInstance()); //#kill system.shutdown(); } diff --git a/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java index db39a5d663..4ba8358174 100644 --- a/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java @@ -72,7 +72,7 @@ public class FaultHandlingDocSample { log.info("That's all, shutting down"); getContext().system().shutdown(); } - } else if (msg == Actors.receiveTimeout()) { + } else if (msg == ReceiveTimeout.getInstance()) { // No progress within 15 seconds, ServiceUnavailable log.error("Shutting down due to unavailable service"); getContext().system().shutdown(); diff --git a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java index 5c3bc7b8ad..f080dd52b9 100644 --- a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java @@ -4,27 +4,22 @@ package akka.docs.dispatcher; //#imports +import akka.actor.*; import akka.actor.ActorRef; import akka.actor.Props; -import akka.dispatch.MessageDispatcher; - +import akka.actor.UntypedActor; +import akka.actor.UntypedActorFactory; //#imports //#imports-prio -import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; -import akka.actor.Actors; import akka.event.Logging; import akka.event.LoggingAdapter; //#imports-prio //#imports-prio-mailbox -import akka.actor.ActorContext; import akka.dispatch.PriorityGenerator; import akka.dispatch.UnboundedPriorityMailbox; -import akka.dispatch.MailboxType; -import akka.dispatch.MessageQueue; import com.typesafe.config.Config; //#imports-prio-mailbox @@ -37,7 +32,6 @@ import static org.junit.Assert.*; import com.typesafe.config.ConfigFactory; -import akka.actor.ActorSystem; import akka.docs.actor.MyUntypedActor; import akka.docs.actor.UntypedActorDocTestBase.MyActor; import akka.testkit.AkkaSpec; @@ -93,7 +87,7 @@ public class DispatcherDocTestBase { getSelf().tell("pigdog2"); getSelf().tell("pigdog3"); getSelf().tell("highpriority"); - getSelf().tell(Actors.poisonPill()); + getSelf().tell(PoisonPill.getInstance()); } public void onReceive(Object message) { @@ -133,7 +127,7 @@ public class DispatcherDocTestBase { return 0; // 'highpriority messages should be treated first if possible else if (message.equals("lowpriority")) return 2; // 'lowpriority messages should be treated last if possible - else if (message.equals(Actors.poisonPill())) + else if (message.equals(PoisonPill.getInstance())) return 3; // PoisonPill when no other left else return 1; // By default they go between high and low prio diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index b3fc5e5f2a..9c24d123f0 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -5,6 +5,8 @@ Camel ####### +Additional Resources +==================== For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_. For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_ @@ -32,7 +34,9 @@ actor API, actors can now exchange messages with other systems over large number of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a few. At the moment, approximately 80 protocols and APIs are supported. -The akka-camel module is based on `Apache Camel`_, a powerful and leight-weight +Apache Camel +------------ +The akka-camel module is based on `Apache Camel`_, a powerful and light-weight integration framework for the JVM. For an introduction to Apache Camel you may want to read this `Apache Camel article`_. Camel comes with a large number of `components`_ that provide bindings to different protocols and @@ -43,6 +47,8 @@ APIs. The `camel-extra`_ project provides further components. .. _components: http://camel.apache.org/components.html .. _camel-extra: http://code.google.com/p/camel-extra/ +Consumer +-------- Usage of Camel's integration components in Akka is essentially a one-liner. Here's an example. @@ -60,16 +66,20 @@ component`_), only the actor's endpointUri method must be changed. .. includecode:: code/akka/docs/camel/Introduction.scala#Consumer +Producer +-------- Actors can also trigger message exchanges with external systems i.e. produce to Camel endpoints. .. includecode:: code/akka/docs/camel/Introduction.scala :include: imports,Producer -In the above example, any message sent to this actor will be added (produced) to -the example JMS queue. Producer actors may choose from the same set of Camel +In the above example, any message sent to this actor will be sent to +the JMS queue ``orders``. Producer actors may choose from the same set of Camel components as Consumer actors do. +CamelMessage +------------ The number of Camel components is constantly increasing. The akka-camel module can support these in a plug-and-play manner. Just add them to your application's classpath, define a component-specific endpoint URI and use it to exchange @@ -83,3 +93,66 @@ representations which are used by Consumer and Producer actors for pattern matching, transformation, serialization or storage. __ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java + + +Dependencies +============ + +SBT +--- +.. code-block:: scala + + "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" + +Maven +----- +.. code-block:: xml + + + com.typesafe.akka + akka-camel + 2.1-SNAPSHOT + + +.. _camel-consumer-actors: + + +Consumer Actors +================ + +For objects to receive messages, they must mixin the `Consumer`_ +trait. For example, the following actor class (Consumer1) implements the +endpointUri method, which is declared in the Consumer trait, in order to receive +messages from the ``file:data/input/actor`` Camel endpoint. + +.. _Consumer: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/Consumer.scala + +.. includecode:: code/akka/docs/camel/Consumers.scala#Consumer1 + +Whenever a file is put into the data/input/actor directory, its content is +picked up by the Camel `file component`_ and sent as message to the +actor. Messages consumed by actors from Camel endpoints are of type +`CamelMessage`_. These are immutable representations of Camel messages. + +.. _file component: http://camel.apache.org/file2.html +.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala + + +Here's another example that sets the endpointUri to +``jetty:http://localhost:8877/camel/default``. It causes Camel's `Jetty +component`_ to start an embedded `Jetty`_ server, accepting HTTP connections +from localhost on port 8877. + +.. _Jetty component: http://camel.apache.org/jetty.html +.. _Jetty: http://www.eclipse.org/jetty/ + +.. includecode:: code/akka/docs/camel/Consumers.scala#Consumer2 + +After starting the actor, clients can send messages to that actor by POSTing to +``http://localhost:8877/camel/default``. The actor sends a response by using the +self.reply method (Scala). For returning a message body and headers to the HTTP +client the response type should be `Message`_. For any other response type, a +new Message object is created by akka-camel with the actor response as message +body. + +.. _Message: http://github.com/akka/akka/blob/master/akka-camel/src/main/scala/akka/camel/CamelMessage.scala diff --git a/akka-docs/scala/code/akka/docs/actor/FSMDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/FSMDocSpec.scala index 684d0eea3e..158f8979a0 100644 --- a/akka-docs/scala/code/akka/docs/actor/FSMDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/FSMDocSpec.scala @@ -37,11 +37,15 @@ class FSMDocSpec extends AkkaSpec { //#simple-fsm class Buncher extends Actor with FSM[State, Data] { + //#fsm-body startWith(Idle, Uninitialized) + //#when-syntax when(Idle) { - case Event(SetTarget(ref), Uninitialized) ⇒ stay using Todo(ref, Vector.empty) + case Event(SetTarget(ref), Uninitialized) ⇒ + stay using Todo(ref, Vector.empty) } + //#when-syntax //#transition-elided onTransition { @@ -51,10 +55,13 @@ class FSMDocSpec extends AkkaSpec { } } //#transition-elided + //#when-syntax when(Active, stateTimeout = 1 second) { - case Event(Flush | FSM.StateTimeout, t: Todo) ⇒ goto(Idle) using t.copy(queue = Vector.empty) + case Event(Flush | StateTimeout, t: Todo) ⇒ + goto(Idle) using t.copy(queue = Vector.empty) } + //#when-syntax //#unhandled-elided whenUnhandled { @@ -67,10 +74,116 @@ class FSMDocSpec extends AkkaSpec { stay } //#unhandled-elided + //#fsm-body initialize } //#simple-fsm + object DemoCode { + trait StateType + case object SomeState extends StateType + case object Processing extends StateType + case object Error extends StateType + case object Idle extends StateType + case object Active extends StateType + + class Dummy extends Actor with FSM[StateType, Int] { + class X + val newData = 42 + object WillDo + object Tick + + //#modifier-syntax + when(SomeState) { + case Event(msg, _) ⇒ + goto(Processing) using (newData) forMax (5 seconds) replying (WillDo) + } + //#modifier-syntax + + //#transition-syntax + onTransition { + case Idle -> Active ⇒ setTimer("timeout", Tick, 1 second, true) + case Active -> _ ⇒ cancelTimer("timeout") + case x -> Idle ⇒ log.info("entering Idle from " + x) + } + //#transition-syntax + + //#alt-transition-syntax + onTransition(handler _) + + def handler(from: StateType, to: StateType) { + // handle it here ... + } + //#alt-transition-syntax + + //#stop-syntax + when(Error) { + case Event("stop", _) ⇒ + // do cleanup ... + stop() + } + //#stop-syntax + + //#transform-syntax + when(SomeState)(transform { + case Event(bytes: Array[Byte], read) ⇒ stay using (read + bytes.length) + case Event(bytes: List[Byte], read) ⇒ stay using (read + bytes.size) + } using { + case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 ⇒ + goto(Processing) + }) + //#transform-syntax + + //#alt-transform-syntax + val processingTrigger: PartialFunction[State, State] = { + case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 ⇒ + goto(Processing) + } + + when(SomeState)(transform { + case Event(bytes: Array[Byte], read) ⇒ stay using (read + bytes.length) + case Event(bytes: List[Byte], read) ⇒ stay using (read + bytes.size) + } using processingTrigger) + //#alt-transform-syntax + + //#termination-syntax + onTermination { + case StopEvent(FSM.Normal, state, data) ⇒ // ... + case StopEvent(FSM.Shutdown, state, data) ⇒ // ... + case StopEvent(FSM.Failure(cause), state, data) ⇒ // ... + } + //#termination-syntax + + //#unhandled-syntax + whenUnhandled { + case Event(x: X, data) ⇒ + log.info("Received unhandled event: " + x) + stay + case Event(msg, _) ⇒ + log.warning("Received unknown event: " + msg) + goto(Error) + } + //#unhandled-syntax + + } + + //#logging-fsm + import akka.actor.LoggingFSM + class MyFSM extends Actor with LoggingFSM[StateType, Data] { + //#body-elided + override def logDepth = 12 + onTermination { + case StopEvent(FSM.Failure(_), state, data) ⇒ + val lastEvents = getLog.mkString("\n\t") + log.warning("Failure in state " + state + " with data " + data + "\n" + + "Events leading up to this point:\n\t" + lastEvents) + } + // ... + //#body-elided + } + //#logging-fsm + + } //#fsm-code-elided "batch correctly" in { diff --git a/akka-docs/scala/code/akka/docs/camel/Consumers.scala b/akka-docs/scala/code/akka/docs/camel/Consumers.scala new file mode 100644 index 0000000000..90f181df3f --- /dev/null +++ b/akka-docs/scala/code/akka/docs/camel/Consumers.scala @@ -0,0 +1,30 @@ +package akka.docs.camel + +object Consumers { + { + //#Consumer1 + import akka.camel.{ CamelMessage, Consumer } + + class Consumer1 extends Consumer { + def endpointUri = "file:data/input/actor" + + def receive = { + case msg: CamelMessage ⇒ println("received %s" format msg.bodyAs[String]) + } + } + //#Consumer1 + } + { + //#Consumer2 + import akka.camel.{ CamelMessage, Consumer } + + class Consumer2 extends Consumer { + def endpointUri = "jetty:http://localhost:8877/camel/default" + + def receive = { + case msg: CamelMessage ⇒ sender ! ("Hello %s" format msg.bodyAs[String]) + } + } + //#Consumer2 + } +} \ No newline at end of file diff --git a/akka-docs/scala/code/akka/docs/camel/Introduction.scala b/akka-docs/scala/code/akka/docs/camel/Introduction.scala index 38546c9f41..4899843a27 100644 --- a/akka-docs/scala/code/akka/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/akka/docs/camel/Introduction.scala @@ -1,11 +1,11 @@ package akka.docs.camel -object wrapper { +object Introduction { { //#Consumer-mina import akka.camel.{ CamelMessage, Consumer } - class MyActor extends Consumer { + class MinaClient extends Consumer { def endpointUri = "mina:tcp://localhost:6200?textline=true" def receive = { @@ -18,14 +18,14 @@ object wrapper { import akka.actor.{ ActorSystem, Props } val sys = ActorSystem("camel") - val myActor = sys.actorOf(Props[MyActor]) + val mina = sys.actorOf(Props[MinaClient]) //#Consumer-mina } { //#Consumer import akka.camel.{ CamelMessage, Consumer } - class MyActor extends Consumer { + class JettyAdapter extends Consumer { def endpointUri = "jetty:http://localhost:8877/example" def receive = { @@ -39,10 +39,16 @@ object wrapper { //#Producer import akka.actor.Actor import akka.camel.{ Producer, Oneway } + import akka.actor.{ ActorSystem, Props } - class MyActor extends Actor with Producer with Oneway { - def endpointUri = "jms:queue:example" + class Orders extends Actor with Producer with Oneway { + def endpointUri = "jms:queue:Orders" } + + val sys = ActorSystem("camel") + val orders = sys.actorOf(Props[Orders]) + + orders ! //#Producer } } \ No newline at end of file diff --git a/akka-docs/scala/fsm.rst b/akka-docs/scala/fsm.rst index 0dcc12ed67..807cd7567c 100644 --- a/akka-docs/scala/fsm.rst +++ b/akka-docs/scala/fsm.rst @@ -118,19 +118,11 @@ The FSM Trait and Object The :class:`FSM` trait may only be mixed into an :class:`Actor`. Instead of extending :class:`Actor`, the self type approach was chosen in order to make it -obvious that an actor is actually created. Importing all members of the -:obj:`FSM` object is recommended if you want to directly access the symbols -like :obj:`StateTimeout`. This import is usually placed inside the state -machine definition: +obvious that an actor is actually created: -.. code-block:: scala - - class MyFSM extends Actor with FSM[State, Data] { - import FSM._ - - ... - - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: simple-fsm + :exclude: fsm-body The :class:`FSM` trait takes two type parameters: @@ -153,7 +145,7 @@ Defining States A state is defined by one or more invocations of the method - :func:`when([, stateTimeout = ])(stateFunction)`. + :func:`when([, stateTimeout = ])(stateFunction)`. The given name must be an object which is type-compatible with the first type parameter given to the :class:`FSM` trait. This object is used as a hash key, @@ -165,27 +157,18 @@ If the :meth:`stateTimeout` parameter is given, then all transitions into this state, including staying, receive this timeout by default. Initiating the transition with an explicit timeout may be used to override this default, see `Initiating Transitions`_ for more information. The state timeout of any state -may be changed during action processing with :func:`setStateTimeout(state, -duration)`. This enables runtime configuration e.g. via external message. +may be changed during action processing with +:func:`setStateTimeout(state, duration)`. This enables runtime configuration +e.g. via external message. -The :meth:`stateFunction` argument is a :class:`PartialFunction[Event, State]`, +The :meth:`stateFunction` argument is a :class:`PartialFunction[Event, State]`, which is conveniently given using the partial function literal syntax as demonstrated below: -.. code-block:: scala +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: when-syntax - when(Idle) { - case Event(Start(msg), _) => - goto(Timer) using (msg, sender) - } - - when(Timer, stateTimeout = 12 seconds) { - case Event(StateTimeout, (msg, sender)) => - sender ! msg - goto(Idle) - } - -The :class:`Event(msg: Any, data: D)` case class is parameterized with the data +The :class:`Event(msg: Any, data: D)` case class is parameterized with the data type held by the FSM for convenient pattern matching. Defining the Initial State @@ -193,7 +176,7 @@ Defining the Initial State Each FSM needs a starting point, which is declared using - :func:`startWith(state, data[, timeout])` + :func:`startWith(state, data[, timeout])` The optionally given timeout argument overrides any specification given for the desired initial state. If you want to cancel a default timeout, use @@ -206,16 +189,8 @@ If a state doesn't handle a received event a warning is logged. If you want to do something else in this case you can specify that with :func:`whenUnhandled(stateFunction)`: -.. code-block:: scala - - whenUnhandled { - case Event(x : X, data) => - log.info(this, "Received unhandled event: " + x) - stay - case Event(msg, _) => - log.warn(this, "Received unknown event: " + x) - goto(Error) - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: unhandled-syntax **IMPORTANT**: This handler is not stacked, meaning that each invocation of :func:`whenUnhandled` replaces the previously installed handler. @@ -230,7 +205,8 @@ The state definition can either be the current state, as described by the :func:`goto(state)`. The resulting object allows further qualification by way of the modifiers described in the following: -:meth:`forMax(duration)` +* :meth:`forMax(duration)` + This modifier sets a state timeout on the next state. This means that a timer is started which upon expiry sends a :obj:`StateTimeout` message to the FSM. This timer is canceled upon reception of any other message in the meantime; @@ -241,23 +217,21 @@ of the modifiers described in the following: specified for the target state. If you want to cancel the default timeout, use :obj:`Duration.Inf`. -:meth:`using(data)` +* :meth:`using(data)` + This modifier replaces the old state data with the new data given. If you follow the advice :ref:`above `, this is the only place where internal state data are ever modified. -:meth:`replying(msg)` +* :meth:`replying(msg)` + This modifier sends a reply to the currently processed message and otherwise does not modify the state transition. All modifier can be chained to achieve a nice and concise description: -.. code-block:: scala - - when(State) { - case Event(msg, _) => - goto(Processing) using (msg) forMax (5 seconds) replying (WillDo) - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: modifier-syntax The parentheses are not actually needed in all cases, but they visually distinguish between modifiers and their arguments and therefore make the code @@ -267,7 +241,7 @@ even more pleasant to read for foreigners. Please note that the ``return`` statement may not be used in :meth:`when` blocks or similar; this is a Scala restriction. Either refactor your code - using ``if () ... else ...`` or move it into a method definition. + using ``if () ... else ...`` or move it into a method definition. Monitoring Transitions ---------------------- @@ -293,13 +267,8 @@ The handler is a partial function which takes a pair of states as input; no resulting state is needed as it is not possible to modify the transition in progress. -.. code-block:: scala - - onTransition { - case Idle -> Active => setTimer("timeout") - case Active -> _ => cancelTimer("timeout") - case x -> Idle => log.info("entering Idle from "+x) - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: transition-syntax The convenience extractor :obj:`->` enables decomposition of the pair of states with a clear visual reminder of the transition's direction. As usual in pattern @@ -311,13 +280,8 @@ It is also possible to pass a function object accepting two states to :func:`onTransition`, in case your transition handling logic is implemented as a method: -.. code-block:: scala - - onTransition(handler _) - - private def handler(from: State, to: State) { - ... - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: alt-transition-syntax The handlers registered with this method are stacked, so you can intersperse :func:`onTransition` blocks with :func:`when` blocks as suits your design. It @@ -338,8 +302,8 @@ External Monitoring External actors may be registered to be notified of state transitions by sending a message :class:`SubscribeTransitionCallBack(actorRef)`. The named -actor will be sent a :class:`CurrentState(self, stateName)` message immediately -and will receive :class:`Transition(actorRef, oldState, newState)` messages +actor will be sent a :class:`CurrentState(self, stateName)` message immediately +and will receive :class:`Transition(actorRef, oldState, newState)` messages whenever a new state is reached. External monitors may be unregistered by sending :class:`UnsubscribeTransitionCallBack(actorRef)` to the FSM actor. @@ -347,13 +311,31 @@ Registering a not-running listener generates a warning and fails gracefully. Stopping a listener without unregistering will remove the listener from the subscription list upon the next transition. +Transforming State +------------------ + +The partial functions supplied as argument to the ``when()`` blocks can be +transformed using Scala’s full supplement of functional programming tools. In +order to retain type inference, there is a helper function which may be used in +case some common handling logic shall be applied to different clauses: + +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: transform-syntax + +It goes without saying that the arguments to this method may also be stored, to +be used several times, e.g. when applying the same transformation to several +``when()`` blocks: + +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: alt-transform-syntax + Timers ------ Besides state timeouts, FSM manages timers identified by :class:`String` names. You may set a timer using - :func:`setTimer(name, msg, interval, repeat)` + :func:`setTimer(name, msg, interval, repeat)` where :obj:`msg` is the message object which will be sent after the duration :obj:`interval` has elapsed. If :obj:`repeat` is :obj:`true`, then the timer is @@ -376,7 +358,7 @@ Termination from Inside The FSM is stopped by specifying the result state as - :func:`stop([reason[, data]])` + :func:`stop([reason[, data]])` The reason must be one of :obj:`Normal` (which is the default), :obj:`Shutdown` or :obj:`Failure(reason)`, and the second argument may be given to change the @@ -389,25 +371,15 @@ state data which is available during termination handling. the same way as a state transition (but note that the ``return`` statement may not be used within a :meth:`when` block). -.. code-block:: scala - - when(A) { - case Event(Stop, _) => - doCleanup() - stop() - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: stop-syntax You can use :func:`onTermination(handler)` to specify custom code that is executed when the FSM is stopped. The handler is a partial function which takes -a :class:`StopEvent(reason, stateName, stateData)` as argument: +a :class:`StopEvent(reason, stateName, stateData)` as argument: -.. code-block:: scala - - onTermination { - case StopEvent(Normal, s, d) => ... - case StopEvent(Shutdown, _, _) => ... - case StopEvent(Failure(cause), s, d) => ... - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: termination-syntax As for the :func:`whenUnhandled` case, this handler is not stacked, so each invocation of :func:`onTermination` replaces the previously installed handler. @@ -419,7 +391,7 @@ When an :class:`ActorRef` associated to a FSM is stopped using the :meth:`stop()` method, its :meth:`postStop` hook will be executed. The default implementation by the :class:`FSM` trait is to execute the :meth:`onTermination` handler if that is prepared to handle a -:obj:`StopEvent(Shutdown, ...)`. +:obj:`StopEvent(Shutdown, ...)`. .. warning:: @@ -438,11 +410,11 @@ Event Tracing ------------- The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an -event trace by :class:`LoggingFSM` instances:: +event trace by :class:`LoggingFSM` instances: - class MyFSM extends Actor with LoggingFSM[X, Z] { - ... - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: logging-fsm + :exclude: body-elided This FSM will log at DEBUG level: @@ -459,17 +431,10 @@ Rolling Event Log The :class:`LoggingFSM` trait adds one more feature to the FSM: a rolling event log which may be used during debugging (for tracing how the FSM entered a -certain failure state) or for other creative uses:: +certain failure state) or for other creative uses: - class MyFSM extends Actor with LoggingFSM[X, Z] { - override def logDepth = 12 - onTermination { - case StopEvent(Failure(_), state, data) => - log.warning(this, "Failure in state "+state+" with data "+data+"\n"+ - "Events leading up to this point:\n\t"+getLog.mkString("\n\t")) - } - ... - } +.. includecode:: code/akka/docs/actor/FSMDocSpec.scala + :include: logging-fsm The :meth:`logDepth` defaults to zero, which turns off the event log. diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index 0fcb423d1e..21074a44c0 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -80,147 +80,6 @@ public final class RemoteProtocol { // @@protoc_insertion_point(enum_scope:CommandType) } - public enum ReplicationStorageType - implements com.google.protobuf.ProtocolMessageEnum { - TRANSIENT(0, 1), - TRANSACTION_LOG(1, 2), - DATA_GRID(2, 3), - ; - - public static final int TRANSIENT_VALUE = 1; - public static final int TRANSACTION_LOG_VALUE = 2; - public static final int DATA_GRID_VALUE = 3; - - - public final int getNumber() { return value; } - - public static ReplicationStorageType valueOf(int value) { - switch (value) { - case 1: return TRANSIENT; - case 2: return TRANSACTION_LOG; - case 3: return DATA_GRID; - default: return null; - } - } - - public static com.google.protobuf.Internal.EnumLiteMap - internalGetValueMap() { - return internalValueMap; - } - private static com.google.protobuf.Internal.EnumLiteMap - internalValueMap = - new com.google.protobuf.Internal.EnumLiteMap() { - public ReplicationStorageType findValueByNumber(int number) { - return ReplicationStorageType.valueOf(number); - } - }; - - public final com.google.protobuf.Descriptors.EnumValueDescriptor - getValueDescriptor() { - return getDescriptor().getValues().get(index); - } - public final com.google.protobuf.Descriptors.EnumDescriptor - getDescriptorForType() { - return getDescriptor(); - } - public static final com.google.protobuf.Descriptors.EnumDescriptor - getDescriptor() { - return akka.remote.RemoteProtocol.getDescriptor().getEnumTypes().get(1); - } - - private static final ReplicationStorageType[] VALUES = { - TRANSIENT, TRANSACTION_LOG, DATA_GRID, - }; - - public static ReplicationStorageType valueOf( - com.google.protobuf.Descriptors.EnumValueDescriptor desc) { - if (desc.getType() != getDescriptor()) { - throw new java.lang.IllegalArgumentException( - "EnumValueDescriptor is not for this type."); - } - return VALUES[desc.getIndex()]; - } - - private final int index; - private final int value; - - private ReplicationStorageType(int index, int value) { - this.index = index; - this.value = value; - } - - // @@protoc_insertion_point(enum_scope:ReplicationStorageType) - } - - public enum ReplicationStrategyType - implements com.google.protobuf.ProtocolMessageEnum { - WRITE_THROUGH(0, 1), - WRITE_BEHIND(1, 2), - ; - - public static final int WRITE_THROUGH_VALUE = 1; - public static final int WRITE_BEHIND_VALUE = 2; - - - public final int getNumber() { return value; } - - public static ReplicationStrategyType valueOf(int value) { - switch (value) { - case 1: return WRITE_THROUGH; - case 2: return WRITE_BEHIND; - default: return null; - } - } - - public static com.google.protobuf.Internal.EnumLiteMap - internalGetValueMap() { - return internalValueMap; - } - private static com.google.protobuf.Internal.EnumLiteMap - internalValueMap = - new com.google.protobuf.Internal.EnumLiteMap() { - public ReplicationStrategyType findValueByNumber(int number) { - return ReplicationStrategyType.valueOf(number); - } - }; - - public final com.google.protobuf.Descriptors.EnumValueDescriptor - getValueDescriptor() { - return getDescriptor().getValues().get(index); - } - public final com.google.protobuf.Descriptors.EnumDescriptor - getDescriptorForType() { - return getDescriptor(); - } - public static final com.google.protobuf.Descriptors.EnumDescriptor - getDescriptor() { - return akka.remote.RemoteProtocol.getDescriptor().getEnumTypes().get(2); - } - - private static final ReplicationStrategyType[] VALUES = { - WRITE_THROUGH, WRITE_BEHIND, - }; - - public static ReplicationStrategyType valueOf( - com.google.protobuf.Descriptors.EnumValueDescriptor desc) { - if (desc.getType() != getDescriptor()) { - throw new java.lang.IllegalArgumentException( - "EnumValueDescriptor is not for this type."); - } - return VALUES[desc.getIndex()]; - } - - private final int index; - private final int value; - - private ReplicationStrategyType(int index, int value) { - this.index = index; - this.value = value; - } - - // @@protoc_insertion_point(enum_scope:ReplicationStrategyType) - } - public interface AkkaRemoteProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -4313,491 +4172,6 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:AddressProtocol) } - public interface ExceptionProtocolOrBuilder - extends com.google.protobuf.MessageOrBuilder { - - // required string classname = 1; - boolean hasClassname(); - String getClassname(); - - // required string message = 2; - boolean hasMessage(); - String getMessage(); - } - public static final class ExceptionProtocol extends - com.google.protobuf.GeneratedMessage - implements ExceptionProtocolOrBuilder { - // Use ExceptionProtocol.newBuilder() to construct. - private ExceptionProtocol(Builder builder) { - super(builder); - } - private ExceptionProtocol(boolean noInit) {} - - private static final ExceptionProtocol defaultInstance; - public static ExceptionProtocol getDefaultInstance() { - return defaultInstance; - } - - public ExceptionProtocol getDefaultInstanceForType() { - return defaultInstance; - } - - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_ExceptionProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_ExceptionProtocol_fieldAccessorTable; - } - - private int bitField0_; - // required string classname = 1; - public static final int CLASSNAME_FIELD_NUMBER = 1; - private java.lang.Object classname_; - public boolean hasClassname() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public String getClassname() { - java.lang.Object ref = classname_; - if (ref instanceof String) { - return (String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - String s = bs.toStringUtf8(); - if (com.google.protobuf.Internal.isValidUtf8(bs)) { - classname_ = s; - } - return s; - } - } - private com.google.protobuf.ByteString getClassnameBytes() { - java.lang.Object ref = classname_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8((String) ref); - classname_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - // required string message = 2; - public static final int MESSAGE_FIELD_NUMBER = 2; - private java.lang.Object message_; - public boolean hasMessage() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public String getMessage() { - java.lang.Object ref = message_; - if (ref instanceof String) { - return (String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - String s = bs.toStringUtf8(); - if (com.google.protobuf.Internal.isValidUtf8(bs)) { - message_ = s; - } - return s; - } - } - private com.google.protobuf.ByteString getMessageBytes() { - java.lang.Object ref = message_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8((String) ref); - message_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - private void initFields() { - classname_ = ""; - message_ = ""; - } - private byte memoizedIsInitialized = -1; - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized != -1) return isInitialized == 1; - - if (!hasClassname()) { - memoizedIsInitialized = 0; - return false; - } - if (!hasMessage()) { - memoizedIsInitialized = 0; - return false; - } - memoizedIsInitialized = 1; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - getSerializedSize(); - if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getClassnameBytes()); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, getMessageBytes()); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) return size; - - size = 0; - if (((bitField0_ & 0x00000001) == 0x00000001)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getClassnameBytes()); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, getMessageBytes()); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - private static final long serialVersionUID = 0L; - @java.lang.Override - protected java.lang.Object writeReplace() - throws java.io.ObjectStreamException { - return super.writeReplace(); - } - - public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom(java.io.InputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input, extensionRegistry)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.ExceptionProtocol parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - - public static Builder newBuilder() { return Builder.create(); } - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(akka.remote.RemoteProtocol.ExceptionProtocol prototype) { - return newBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { return newBuilder(this); } - - @java.lang.Override - protected Builder newBuilderForType( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder - implements akka.remote.RemoteProtocol.ExceptionProtocolOrBuilder { - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_ExceptionProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_ExceptionProtocol_fieldAccessorTable; - } - - // Construct using akka.remote.RemoteProtocol.ExceptionProtocol.newBuilder() - private Builder() { - maybeForceBuilderInitialization(); - } - - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { - super(parent); - maybeForceBuilderInitialization(); - } - private void maybeForceBuilderInitialization() { - if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { - } - } - private static Builder create() { - return new Builder(); - } - - public Builder clear() { - super.clear(); - classname_ = ""; - bitField0_ = (bitField0_ & ~0x00000001); - message_ = ""; - bitField0_ = (bitField0_ & ~0x00000002); - return this; - } - - public Builder clone() { - return create().mergeFrom(buildPartial()); - } - - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return akka.remote.RemoteProtocol.ExceptionProtocol.getDescriptor(); - } - - public akka.remote.RemoteProtocol.ExceptionProtocol getDefaultInstanceForType() { - return akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance(); - } - - public akka.remote.RemoteProtocol.ExceptionProtocol build() { - akka.remote.RemoteProtocol.ExceptionProtocol result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - private akka.remote.RemoteProtocol.ExceptionProtocol buildParsed() - throws com.google.protobuf.InvalidProtocolBufferException { - akka.remote.RemoteProtocol.ExceptionProtocol result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException( - result).asInvalidProtocolBufferException(); - } - return result; - } - - public akka.remote.RemoteProtocol.ExceptionProtocol buildPartial() { - akka.remote.RemoteProtocol.ExceptionProtocol result = new akka.remote.RemoteProtocol.ExceptionProtocol(this); - int from_bitField0_ = bitField0_; - int to_bitField0_ = 0; - if (((from_bitField0_ & 0x00000001) == 0x00000001)) { - to_bitField0_ |= 0x00000001; - } - result.classname_ = classname_; - if (((from_bitField0_ & 0x00000002) == 0x00000002)) { - to_bitField0_ |= 0x00000002; - } - result.message_ = message_; - result.bitField0_ = to_bitField0_; - onBuilt(); - return result; - } - - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof akka.remote.RemoteProtocol.ExceptionProtocol) { - return mergeFrom((akka.remote.RemoteProtocol.ExceptionProtocol)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(akka.remote.RemoteProtocol.ExceptionProtocol other) { - if (other == akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance()) return this; - if (other.hasClassname()) { - setClassname(other.getClassname()); - } - if (other.hasMessage()) { - setMessage(other.getMessage()); - } - this.mergeUnknownFields(other.getUnknownFields()); - return this; - } - - public final boolean isInitialized() { - if (!hasClassname()) { - - return false; - } - if (!hasMessage()) { - - return false; - } - return true; - } - - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - com.google.protobuf.UnknownFieldSet.Builder unknownFields = - com.google.protobuf.UnknownFieldSet.newBuilder( - this.getUnknownFields()); - while (true) { - int tag = input.readTag(); - switch (tag) { - case 0: - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - default: { - if (!parseUnknownField(input, unknownFields, - extensionRegistry, tag)) { - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - } - break; - } - case 10: { - bitField0_ |= 0x00000001; - classname_ = input.readBytes(); - break; - } - case 18: { - bitField0_ |= 0x00000002; - message_ = input.readBytes(); - break; - } - } - } - } - - private int bitField0_; - - // required string classname = 1; - private java.lang.Object classname_ = ""; - public boolean hasClassname() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public String getClassname() { - java.lang.Object ref = classname_; - if (!(ref instanceof String)) { - String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); - classname_ = s; - return s; - } else { - return (String) ref; - } - } - public Builder setClassname(String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - classname_ = value; - onChanged(); - return this; - } - public Builder clearClassname() { - bitField0_ = (bitField0_ & ~0x00000001); - classname_ = getDefaultInstance().getClassname(); - onChanged(); - return this; - } - void setClassname(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000001; - classname_ = value; - onChanged(); - } - - // required string message = 2; - private java.lang.Object message_ = ""; - public boolean hasMessage() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public String getMessage() { - java.lang.Object ref = message_; - if (!(ref instanceof String)) { - String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); - message_ = s; - return s; - } else { - return (String) ref; - } - } - public Builder setMessage(String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - message_ = value; - onChanged(); - return this; - } - public Builder clearMessage() { - bitField0_ = (bitField0_ & ~0x00000002); - message_ = getDefaultInstance().getMessage(); - onChanged(); - return this; - } - void setMessage(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000002; - message_ = value; - onChanged(); - } - - // @@protoc_insertion_point(builder_scope:ExceptionProtocol) - } - - static { - defaultInstance = new ExceptionProtocol(true); - defaultInstance.initFields(); - } - - // @@protoc_insertion_point(class_scope:ExceptionProtocol) - } - public interface DurableMailboxMessageProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -5496,11 +4870,6 @@ public final class RemoteProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_AddressProtocol_fieldAccessorTable; - private static com.google.protobuf.Descriptors.Descriptor - internal_static_ExceptionProtocol_descriptor; - private static - com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_ExceptionProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_DurableMailboxMessageProtocol_descriptor; private static @@ -5531,17 +4900,12 @@ public final class RemoteProtocol { "anifest\030\003 \001(\014\"3\n\025MetadataEntryProtocol\022\013" + "\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"A\n\017AddressPro" + "tocol\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022" + - "\014\n\004port\030\003 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tcl" + - "assname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"y\n\035Durabl" + - "eMailboxMessageProtocol\022$\n\trecipient\030\001 \002" + - "(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 \001(\0132\021.", - "ActorRefProtocol\022\017\n\007message\030\003 \002(\014*7\n\013Com" + - "mandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tH" + - "EARTBEAT\020\003*K\n\026ReplicationStorageType\022\r\n\t" + - "TRANSIENT\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA" + - "_GRID\020\003*>\n\027ReplicationStrategyType\022\021\n\rWR" + - "ITE_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002B\017\n\013akka." + - "remoteH\001" + "\014\n\004port\030\003 \002(\r\"y\n\035DurableMailboxMessagePr" + + "otocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRefProt" + + "ocol\022!\n\006sender\030\002 \001(\0132\021.ActorRefProtocol\022" + + "\017\n\007message\030\003 \002(\014*7\n\013CommandType\022\013\n\007CONNE", + "CT\020\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013akk" + + "a.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5604,16 +4968,8 @@ public final class RemoteProtocol { new java.lang.String[] { "System", "Hostname", "Port", }, akka.remote.RemoteProtocol.AddressProtocol.class, akka.remote.RemoteProtocol.AddressProtocol.Builder.class); - internal_static_ExceptionProtocol_descriptor = - getDescriptor().getMessageTypes().get(7); - internal_static_ExceptionProtocol_fieldAccessorTable = new - com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_ExceptionProtocol_descriptor, - new java.lang.String[] { "Classname", "Message", }, - akka.remote.RemoteProtocol.ExceptionProtocol.class, - akka.remote.RemoteProtocol.ExceptionProtocol.Builder.class); internal_static_DurableMailboxMessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(8); + getDescriptor().getMessageTypes().get(7); internal_static_DurableMailboxMessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_DurableMailboxMessageProtocol_descriptor, diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index da9414a110..730c3a5883 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -44,23 +44,6 @@ enum CommandType { HEARTBEAT = 3; } -/** - * Defines the type of the ReplicationStorage - */ -enum ReplicationStorageType { - TRANSIENT = 1; - TRANSACTION_LOG = 2; - DATA_GRID = 3; -} - -/** - * Defines the type of the ReplicationStrategy - */ -enum ReplicationStrategyType { - WRITE_THROUGH = 1; - WRITE_BEHIND = 2; -} - /** * Defines a remote ActorRef that "remembers" and uses its original Actor instance * on the original node. @@ -95,14 +78,6 @@ message AddressProtocol { required uint32 port = 3; } -/** - * Defines an exception. - */ -message ExceptionProtocol { - required string classname = 1; - required string message = 2; -} - /** * Defines the durable mailbox message. */ diff --git a/akka-spring/src/test/java/akka/spring/foo/PingActor.java b/akka-spring/src/test/java/akka/spring/foo/PingActor.java index 3a04ba77cf..fc1344ff73 100644 --- a/akka-spring/src/test/java/akka/spring/foo/PingActor.java +++ b/akka-spring/src/test/java/akka/spring/foo/PingActor.java @@ -1,6 +1,5 @@ package akka.spring.foo; -import static akka.actor.Actors.*; import akka.actor.ActorRef; import akka.actor.UntypedActor; diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java index 61905a775c..8760db5084 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java @@ -5,7 +5,6 @@ package akka.transactor; import akka.actor.ActorRef; -import akka.actor.Actors; import akka.actor.UntypedActor; import scala.concurrent.stm.Ref; import scala.concurrent.stm.japi.STM;