diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 997476ca94..2e53859817 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -60,10 +60,6 @@ Architecture persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream. -* *Streams*: Messages written by a persistent actor can be published in compliance with the `Reactive Streams`_ specification. - Only those messages that are explicitly requested from downstream persistent actors are actually pulled from a persistent actor's - journal. - * *AbstractPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. @@ -80,7 +76,6 @@ Architecture development of event sourced applications (see section :ref:`event-sourcing-java-lambda`) .. _Community plugins: https://gist.github.com/krasserm/8612920#file-akka-persistence-plugins-md -.. _Reactive Streams: http://www.reactive-streams.org/ .. _event-sourcing-java-lambda: @@ -335,13 +330,6 @@ The identifier must be defined with the ``viewId`` method. The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java-lambda` of a view and its persistent actor shall be shared (which is what applications usually do not want). -.. _streams-java-lambda: - -Streams -======= - -Java API coming soon. See also Scala :ref:`streams` documentation. - .. _snapshots-java-lambda: Snapshots diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index ba61bc7f18..e33c79d958 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -65,10 +65,6 @@ Architecture persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream. -* *Streams*: Messages written by a persistent actor can be published in compliance with the `Reactive Streams`_ specification. - Only those messages that are explicitly requested from downstream persistent actors are actually pulled from a persistent actor's - journal. - * *UntypedPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. @@ -82,7 +78,6 @@ Architecture storage plugin writes to the local filesystem. .. _Community plugins: http://akka.io/community/ -.. _Reactive Streams: http://www.reactive-streams.org/ .. _event-sourcing-java: @@ -341,13 +336,6 @@ The identifier must be defined with the ``viewId`` method. The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its persistent actor shall be shared (which is what applications usually do not want). -.. _streams-java: - -Streams -======= - -Java API coming soon. See also Scala :ref:`streams` documentation. - .. _snapshots-java: Snapshots diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 0e5bf8735d..942906681c 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -475,46 +475,4 @@ trait PersistenceDocSpec { //#view-update } - new AnyRef { - // ------------------------------------------------------------------------------------------------ - // FIXME: uncomment once going back to project dependencies (in akka-stream-experimental) - // ------------------------------------------------------------------------------------------------ - /* - //#producer-creation - import org.reactivestreams.api.Producer - - import akka.persistence.Persistent - import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings } - import akka.stream.{ FlowMaterializer, MaterializerSettings } - import akka.stream.scaladsl.Flow - - val materializer = FlowMaterializer(MaterializerSettings()) - - val flow: Flow[Persistent] = PersistentFlow.fromPersistence("some-persistence-id") - val producer: Producer[Persistent] = flow.toProducer(materializer) - //#producer-creation - - //#producer-buffer-size - PersistentFlow.fromPersistence("some-persistence-id", PersistentPublisherSettings(maxBufferSize = 200)) - //#producer-buffer-size - - //#producer-examples - // 1 producer and 2 consumers: - val producer1: Producer[Persistent] = - PersistentFlow.fromPersistence("processor-1").toProducer(materializer) - Flow(producer1).foreach(p => println(s"consumer-1: ${p.payload}")).consume(materializer) - Flow(producer1).foreach(p => println(s"consumer-2: ${p.payload}")).consume(materializer) - - // 2 producers (merged) and 1 consumer: - val producer2: Producer[Persistent] = - PersistentFlow.fromPersistence("processor-2").toProducer(materializer) - val producer3: Producer[Persistent] = - PersistentFlow.fromPersistence("processor-3").toProducer(materializer) - Flow(producer2).merge(producer3).foreach { p => - println(s"consumer-3: ${p.payload}") - }.consume(materializer) - //#producer-examples - */ - } - } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index a10e262a3f..c9fc0834d0 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -57,10 +57,6 @@ Architecture persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream. -* *Streams*: Messages written by a persistent actor can be published in compliance with the `Reactive Streams`_ specification. - Only those messages that are explicitly requested from downstream persistent actors are actually pulled from a persistent actor's - journal. - * *AtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. @@ -74,8 +70,6 @@ Architecture storage plugin writes to the local filesystem. .. _Community plugins: http://akka.io/community/ -.. _Reactive Streams: http://www.reactive-streams.org/ - .. _event-sourcing: @@ -336,44 +330,6 @@ The identifier must be defined with the ``viewId`` method. The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its persistent actor shall be shared (which is what applications usually do not want). -.. _streams: - -Streams -======= - -**TODO: rename *producer* to *publisher*.** - -A `Reactive Streams`_ ``Producer`` can be created from a persistent actor's message stream via the ``PersistentFlow`` -extension of the Akka Streams Scala DSL: - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-creation - -The created ``flow`` object is of type ``Flow[Persistent]`` and can be composed with other flows using ``Flow`` -combinators (= methods defined on ``Flow``). Calling the ``toProducer`` method on ``flow`` creates a producer -of type ``Producer[Persistent]``. - -A persistent message producer only reads from a persistent actor's journal when explicitly requested by downstream -consumers. In order to avoid frequent, fine grained read access to a persistent actor's journal, the producer tries -to buffer persistent messages in memory from which it serves downstream requests. The maximum buffer size per -producer is configurable with a ``PersistentPublisherSettings`` configuration object. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-buffer-size - -Other ``ProducerSettings`` parameters are: - -* ``fromSequenceNr``: specifies from which sequence number the persistent message stream shall start (defaults - to ``1L``). Please note that specifying ``fromSequenceNr`` is much more efficient than using the ``drop(Int)`` - combinator, especially for larger sequence numbers. - -* ``idle``: an optional parameter that specifies how long a producer shall wait after a journal read attempt didn't return - any new persistent messages. If not defined, the producer uses the ``akka.persistence.view.auto-update-interval`` - configuration parameter, otherwise, it uses the defined ``idle`` parameter. - -Here are two examples how persistent message producers can be connected to downstream consumers using the Akka -Streams Scala DSL and its ``PersistentFlow`` extension. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-examples - .. _snapshots: Snapshots diff --git a/akka-samples/akka-sample-persistence-scala/build.sbt b/akka-samples/akka-sample-persistence-scala/build.sbt index ebc68c6f94..ba6ee2091f 100644 --- a/akka-samples/akka-sample-persistence-scala/build.sbt +++ b/akka-samples/akka-sample-persistence-scala/build.sbt @@ -1,11 +1,10 @@ name := "akka-sample-persistence-scala" -version := "2.4-SNAPSHOT" +version := "2.3-SNAPSHOT" scalaVersion := "2.10.4" libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-persistence-experimental" % "2.4-SNAPSHOT", - "com.typesafe.akka" %% "akka-stream-experimental" % "0.3" + "com.typesafe.akka" %% "akka-persistence-experimental" % "2.4-SNAPSHOT" ) diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/StreamExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/StreamExample.scala deleted file mode 100644 index 651b1a975e..0000000000 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/StreamExample.scala +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package sample.persistence - -/* FIXME include when akka-stream is in sync - -import org.reactivestreams.api._ - -import akka.actor._ -import akka.persistence.{ Persistent, Processor } -import akka.persistence.stream.PersistentFlow -import akka.stream._ -import akka.stream.scaladsl._ - -/** - * This example demonstrates how akka-persistence Views can be used as reactive-stream Producers. A - * View-based Producer is created with PersistentFlow.fromProcessor(processorId: String).toProducer(). - * This Producer produces Persistent messages as they are written by its corresponding akka-persistence - * Processor. The PersistentFlow object is an extension to the akka-stream DSL. - */ -object StreamExample extends App { - implicit val system = ActorSystem("example") - - class ExampleProcessor(pid: String) extends Processor { - override def processorId = pid - def receive = { - case Persistent(payload, _) => - } - } - - val p1 = system.actorOf(Props(classOf[ExampleProcessor], "p1")) - val p2 = system.actorOf(Props(classOf[ExampleProcessor], "p2")) - - val materializer = FlowMaterializer(MaterializerSettings()) - - // 1 view-backed producer and 2 consumers: - val producer1: Producer[Persistent] = PersistentFlow.fromProcessor("p1").toProducer(materializer) - Flow(producer1).foreach { p => println(s"consumer-1: ${p.payload}") }.consume(materializer) - Flow(producer1).foreach { p => println(s"consumer-2: ${p.payload}") }.consume(materializer) - - // 2 view-backed producers (merged) and 1 consumer: - // This is an example how message/event streams from multiple processors can be merged into a single stream. - val producer2: Producer[Persistent] = PersistentFlow.fromProcessor("p1").toProducer(materializer) - val merged: Producer[Persistent] = PersistentFlow.fromProcessor("p2").merge(producer2).toProducer(materializer) - Flow(merged).foreach { p => println(s"consumer-3: ${p.payload}") }.consume(materializer) - - while (true) { - p1 ! Persistent("a-" + System.currentTimeMillis()) - p2 ! Persistent("b-" + System.currentTimeMillis()) - Thread.sleep(500) - } -} - -*/