=per #15441 Remove persistent stream in docs and sample

We must first release akka-stream with dependency to akka 2.3.4
and it should be maintained in the akka-release-dev branch

(cherry picked from commit a97a067701cfc527b235707882e72326277415f3)

Conflicts:
	akka-samples/akka-sample-persistence-scala/build.sbt
	akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/StreamExample.scala
This commit is contained in:
Patrik Nordwall 2014-06-27 14:48:17 +02:00
parent 8eec3f92d3
commit bc2aeaf0d3
6 changed files with 2 additions and 168 deletions

View file

@ -60,10 +60,6 @@ Architecture
persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's
replicated message stream. replicated message stream.
* *Streams*: Messages written by a persistent actor can be published in compliance with the `Reactive Streams`_ specification.
Only those messages that are explicitly requested from downstream persistent actors are actually pulled from a persistent actor's
journal.
* *AbstractPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in * *AbstractPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in
case of sender and receiver JVM crashes. case of sender and receiver JVM crashes.
@ -80,7 +76,6 @@ Architecture
development of event sourced applications (see section :ref:`event-sourcing-java-lambda`) development of event sourced applications (see section :ref:`event-sourcing-java-lambda`)
.. _Community plugins: https://gist.github.com/krasserm/8612920#file-akka-persistence-plugins-md .. _Community plugins: https://gist.github.com/krasserm/8612920#file-akka-persistence-plugins-md
.. _Reactive Streams: http://www.reactive-streams.org/
.. _event-sourcing-java-lambda: .. _event-sourcing-java-lambda:
@ -335,13 +330,6 @@ The identifier must be defined with the ``viewId`` method.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java-lambda` of a view and its The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java-lambda` of a view and its
persistent actor shall be shared (which is what applications usually do not want). persistent actor shall be shared (which is what applications usually do not want).
.. _streams-java-lambda:
Streams
=======
Java API coming soon. See also Scala :ref:`streams` documentation.
.. _snapshots-java-lambda: .. _snapshots-java-lambda:
Snapshots Snapshots

View file

@ -65,10 +65,6 @@ Architecture
persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's
replicated message stream. replicated message stream.
* *Streams*: Messages written by a persistent actor can be published in compliance with the `Reactive Streams`_ specification.
Only those messages that are explicitly requested from downstream persistent actors are actually pulled from a persistent actor's
journal.
* *UntypedPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in * *UntypedPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in
case of sender and receiver JVM crashes. case of sender and receiver JVM crashes.
@ -82,7 +78,6 @@ Architecture
storage plugin writes to the local filesystem. storage plugin writes to the local filesystem.
.. _Community plugins: http://akka.io/community/ .. _Community plugins: http://akka.io/community/
.. _Reactive Streams: http://www.reactive-streams.org/
.. _event-sourcing-java: .. _event-sourcing-java:
@ -341,13 +336,6 @@ The identifier must be defined with the ``viewId`` method.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its
persistent actor shall be shared (which is what applications usually do not want). persistent actor shall be shared (which is what applications usually do not want).
.. _streams-java:
Streams
=======
Java API coming soon. See also Scala :ref:`streams` documentation.
.. _snapshots-java: .. _snapshots-java:
Snapshots Snapshots

View file

@ -475,46 +475,4 @@ trait PersistenceDocSpec {
//#view-update //#view-update
} }
new AnyRef {
// ------------------------------------------------------------------------------------------------
// FIXME: uncomment once going back to project dependencies (in akka-stream-experimental)
// ------------------------------------------------------------------------------------------------
/*
//#producer-creation
import org.reactivestreams.api.Producer
import akka.persistence.Persistent
import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings }
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
val materializer = FlowMaterializer(MaterializerSettings())
val flow: Flow[Persistent] = PersistentFlow.fromPersistence("some-persistence-id")
val producer: Producer[Persistent] = flow.toProducer(materializer)
//#producer-creation
//#producer-buffer-size
PersistentFlow.fromPersistence("some-persistence-id", PersistentPublisherSettings(maxBufferSize = 200))
//#producer-buffer-size
//#producer-examples
// 1 producer and 2 consumers:
val producer1: Producer[Persistent] =
PersistentFlow.fromPersistence("processor-1").toProducer(materializer)
Flow(producer1).foreach(p => println(s"consumer-1: ${p.payload}")).consume(materializer)
Flow(producer1).foreach(p => println(s"consumer-2: ${p.payload}")).consume(materializer)
// 2 producers (merged) and 1 consumer:
val producer2: Producer[Persistent] =
PersistentFlow.fromPersistence("processor-2").toProducer(materializer)
val producer3: Producer[Persistent] =
PersistentFlow.fromPersistence("processor-3").toProducer(materializer)
Flow(producer2).merge(producer3).foreach { p =>
println(s"consumer-3: ${p.payload}")
}.consume(materializer)
//#producer-examples
*/
}
} }

View file

@ -57,10 +57,6 @@ Architecture
persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's
replicated message stream. replicated message stream.
* *Streams*: Messages written by a persistent actor can be published in compliance with the `Reactive Streams`_ specification.
Only those messages that are explicitly requested from downstream persistent actors are actually pulled from a persistent actor's
journal.
* *AtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in * *AtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in
case of sender and receiver JVM crashes. case of sender and receiver JVM crashes.
@ -74,8 +70,6 @@ Architecture
storage plugin writes to the local filesystem. storage plugin writes to the local filesystem.
.. _Community plugins: http://akka.io/community/ .. _Community plugins: http://akka.io/community/
.. _Reactive Streams: http://www.reactive-streams.org/
.. _event-sourcing: .. _event-sourcing:
@ -336,44 +330,6 @@ The identifier must be defined with the ``viewId`` method.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its
persistent actor shall be shared (which is what applications usually do not want). persistent actor shall be shared (which is what applications usually do not want).
.. _streams:
Streams
=======
**TODO: rename *producer* to *publisher*.**
A `Reactive Streams`_ ``Producer`` can be created from a persistent actor's message stream via the ``PersistentFlow``
extension of the Akka Streams Scala DSL:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-creation
The created ``flow`` object is of type ``Flow[Persistent]`` and can be composed with other flows using ``Flow``
combinators (= methods defined on ``Flow``). Calling the ``toProducer`` method on ``flow`` creates a producer
of type ``Producer[Persistent]``.
A persistent message producer only reads from a persistent actor's journal when explicitly requested by downstream
consumers. In order to avoid frequent, fine grained read access to a persistent actor's journal, the producer tries
to buffer persistent messages in memory from which it serves downstream requests. The maximum buffer size per
producer is configurable with a ``PersistentPublisherSettings`` configuration object.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-buffer-size
Other ``ProducerSettings`` parameters are:
* ``fromSequenceNr``: specifies from which sequence number the persistent message stream shall start (defaults
to ``1L``). Please note that specifying ``fromSequenceNr`` is much more efficient than using the ``drop(Int)``
combinator, especially for larger sequence numbers.
* ``idle``: an optional parameter that specifies how long a producer shall wait after a journal read attempt didn't return
any new persistent messages. If not defined, the producer uses the ``akka.persistence.view.auto-update-interval``
configuration parameter, otherwise, it uses the defined ``idle`` parameter.
Here are two examples how persistent message producers can be connected to downstream consumers using the Akka
Streams Scala DSL and its ``PersistentFlow`` extension.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-examples
.. _snapshots: .. _snapshots:
Snapshots Snapshots

View file

@ -1,11 +1,10 @@
name := "akka-sample-persistence-scala" name := "akka-sample-persistence-scala"
version := "2.4-SNAPSHOT" version := "2.3-SNAPSHOT"
scalaVersion := "2.10.4" scalaVersion := "2.10.4"
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence-experimental" % "2.4-SNAPSHOT", "com.typesafe.akka" %% "akka-persistence-experimental" % "2.4-SNAPSHOT"
"com.typesafe.akka" %% "akka-stream-experimental" % "0.3"
) )

View file

@ -1,55 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence
/* FIXME include when akka-stream is in sync
import org.reactivestreams.api._
import akka.actor._
import akka.persistence.{ Persistent, Processor }
import akka.persistence.stream.PersistentFlow
import akka.stream._
import akka.stream.scaladsl._
/**
* This example demonstrates how akka-persistence Views can be used as reactive-stream Producers. A
* View-based Producer is created with PersistentFlow.fromProcessor(processorId: String).toProducer().
* This Producer produces Persistent messages as they are written by its corresponding akka-persistence
* Processor. The PersistentFlow object is an extension to the akka-stream DSL.
*/
object StreamExample extends App {
implicit val system = ActorSystem("example")
class ExampleProcessor(pid: String) extends Processor {
override def processorId = pid
def receive = {
case Persistent(payload, _) =>
}
}
val p1 = system.actorOf(Props(classOf[ExampleProcessor], "p1"))
val p2 = system.actorOf(Props(classOf[ExampleProcessor], "p2"))
val materializer = FlowMaterializer(MaterializerSettings())
// 1 view-backed producer and 2 consumers:
val producer1: Producer[Persistent] = PersistentFlow.fromProcessor("p1").toProducer(materializer)
Flow(producer1).foreach { p => println(s"consumer-1: ${p.payload}") }.consume(materializer)
Flow(producer1).foreach { p => println(s"consumer-2: ${p.payload}") }.consume(materializer)
// 2 view-backed producers (merged) and 1 consumer:
// This is an example how message/event streams from multiple processors can be merged into a single stream.
val producer2: Producer[Persistent] = PersistentFlow.fromProcessor("p1").toProducer(materializer)
val merged: Producer[Persistent] = PersistentFlow.fromProcessor("p2").merge(producer2).toProducer(materializer)
Flow(merged).foreach { p => println(s"consumer-3: ${p.payload}") }.consume(materializer)
while (true) {
p1 ! Persistent("a-" + System.currentTimeMillis())
p2 ! Persistent("b-" + System.currentTimeMillis())
Thread.sleep(500)
}
}
*/