diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst
index b51a8c844b..88d946fac1 100644
--- a/akka-docs/rst/java/persistence.rst
+++ b/akka-docs/rst/java/persistence.rst
@@ -61,6 +61,10 @@ Architecture
   processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's
   replicated message stream.

+* *Streams*: Messages written by a processor can be published in compliance with the `Reactive Streams`_ specification.
+  Only those messages that are explicitly requested by downstream consumers are actually pulled from a processor's
+  journal.
+
 * *Channel*: Channels are used by processors and views to communicate with other actors. They prevent that replayed
   messages are redundantly delivered to these actors and provide at-least-once message delivery semantics, also in
   case of sender and receiver JVM crashes.
@@ -78,6 +82,7 @@ Architecture
   development of event sourced applications (see section :ref:`event-sourcing-java`)

 .. _Community plugins: http://akka.io/community/
+.. _Reactive Streams: http://www.reactive-streams.org/

 .. _processors-java:
@@ -251,6 +256,13 @@ name in its actor hierarchy and hence influences only part of the view id. To fu
 The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its
 processor shall be shared (which is what applications usually do not want).

+.. _streams-java:
+
+Streams
+=======
+
+Java API coming soon. See also the Scala :ref:`streams` documentation.
+
 .. _channels-java:

 Channels
diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst
index f3323dbb43..d0a1755528 100644
--- a/akka-docs/rst/scala/persistence.rst
+++ b/akka-docs/rst/scala/persistence.rst
@@ -49,6 +49,10 @@ Architecture
   processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's
   replicated message stream.

+* *Streams*: Messages written by a processor can be published in compliance with the `Reactive Streams`_ specification.
+  Only those messages that are explicitly requested by downstream consumers are actually pulled from a processor's
+  journal.
+
 * *Channel*: Channels are used by processors and views to communicate with other actors. They prevent that replayed
   messages are redundantly delivered to these actors and provide at-least-once message delivery semantics, also in
   case of sender and receiver JVM crashes.
@@ -66,6 +70,7 @@ Architecture
   development of event sourced applications (see section :ref:`event-sourcing`)

 .. _Community plugins: http://akka.io/community/
+.. _Reactive Streams: http://www.reactive-streams.org/

 .. _processors:
@@ -241,6 +246,44 @@ name in its actor hierarchy and hence influences only part of the view id. To fu
 The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its
 processor shall be shared (which is what applications usually do not want).

+.. _streams:
+
+Streams
+=======
+
+**TODO: rename *producer* to *publisher*.**
+
+A `Reactive Streams`_ ``Producer`` can be created from a processor's message stream via the ``PersistentFlow``
+extension of the Akka Streams Scala DSL:
+
+.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-creation
+
+The created ``flow`` object is of type ``Flow[Persistent]`` and can be composed with other flows using ``Flow``
+combinators (i.e. methods defined on ``Flow``). Calling the ``toProducer`` method on ``flow`` creates a
+producer of type ``Producer[Persistent]``.
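+
+For illustration, flow and producer creation compose as follows (a minimal sketch; the processor id is made up
+and ``materializer`` is assumed to be a ``FlowMaterializer``, as in the examples further below)::
+
+  val flow: Flow[Persistent] = PersistentFlow.fromProcessor("some-processor-id")
+  val producer: Producer[Persistent] = flow.toProducer(materializer)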
+
+A persistent message producer only reads from a processor's journal when explicitly requested by downstream
+consumers. In order to avoid frequent, fine-grained read access to a processor's journal, the producer buffers
+persistent messages in memory and serves downstream requests from that buffer. The maximum buffer size per
+producer is configurable with a ``PersistentPublisherSettings`` configuration object.
+
+.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-buffer-size
+
+Other ``PersistentPublisherSettings`` parameters are:
+
+* ``fromSequenceNr``: specifies from which sequence number the persistent message stream shall start (defaults
+  to ``1L``). Please note that specifying ``fromSequenceNr`` is much more efficient than using the ``drop(Int)``
+  combinator, especially for larger sequence numbers.
+
+* ``idle``: an optional parameter that specifies how long a producer shall wait before the next journal read
+  attempt if the previous attempt didn't return any new persistent messages. If not defined, the producer uses
+  the ``akka.persistence.view.auto-update-interval`` configuration parameter; otherwise, it uses the defined
+  ``idle`` value.
+
+Here are two examples of how persistent message producers can be connected to downstream consumers using the
+Akka Streams Scala DSL and its ``PersistentFlow`` extension.
+
+.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-examples
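+
+For illustration, a settings object combining the parameters above might look like this (a minimal sketch with
+arbitrary values)::
+
+  val settings = PersistentPublisherSettings(fromSequenceNr = 5L, maxBufferSize = 64, idle = Some(100.millis))
+  val flow = PersistentFlow.fromProcessor("some-processor-id", settings)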
+
 .. _channels:

 Channels
diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/StreamExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/StreamExample.scala
new file mode 100644
index 0000000000..e495a9ce3b
--- /dev/null
+++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/StreamExample.scala
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2009-2014 Typesafe Inc.
+ */
+package sample.persistence
+
+/* FIXME include when akka-stream is in sync
+
+import org.reactivestreams.api._
+
+import akka.actor._
+import akka.persistence.{ Persistent, Processor }
+import akka.persistence.stream.PersistentFlow
+import akka.stream._
+import akka.stream.scaladsl._
+
+/**
+ * This example demonstrates how akka-persistence Views can be used as reactive-stream Producers. A
+ * View-based Producer is created with PersistentFlow.fromProcessor(processorId: String).toProducer().
+ * This Producer produces Persistent messages as they are written by its corresponding akka-persistence
+ * Processor. The PersistentFlow object is an extension to the akka-stream DSL.
+ */
+object StreamExample extends App {
+  implicit val system = ActorSystem("example")
+
+  class ExampleProcessor(pid: String) extends Processor {
+    override def processorId = pid
+    def receive = {
+      case Persistent(payload, _) => // nothing to do; the message is already journaled
+    }
+  }
+
+  val p1 = system.actorOf(Props(classOf[ExampleProcessor], "p1"))
+  val p2 = system.actorOf(Props(classOf[ExampleProcessor], "p2"))
+
+  val materializer = FlowMaterializer(MaterializerSettings())
+
+  // 1 view-backed producer and 2 consumers:
+  val producer1: Producer[Persistent] = PersistentFlow.fromProcessor("p1").toProducer(materializer)
+  Flow(producer1).foreach { p => println(s"consumer-1: ${p.payload}") }.consume(materializer)
+  Flow(producer1).foreach { p => println(s"consumer-2: ${p.payload}") }.consume(materializer)
+
+  // 2 view-backed producers (merged) and 1 consumer:
+  // This is an example of how message/event streams from multiple processors can be merged into a single stream.
+  val producer2: Producer[Persistent] = PersistentFlow.fromProcessor("p1").toProducer(materializer)
+  val merged: Producer[Persistent] = PersistentFlow.fromProcessor("p2").merge(producer2).toProducer(materializer)
+  Flow(merged).foreach { p => println(s"consumer-3: ${p.payload}") }.consume(materializer)
+
+  while (true) {
+    p1 ! Persistent("a-" + System.currentTimeMillis())
+    p2 ! Persistent("b-" + System.currentTimeMillis())
+    Thread.sleep(500)
+  }
+}
+
+*/
\ No newline at end of file
diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala
new file mode 100644
index 0000000000..438363b633
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala
@@ -0,0 +1,249 @@
+/**
+ * Copyright (C) 2009-2014 Typesafe Inc.
+ */
+package akka.persistence.stream
+
+import scala.util.control.NonFatal
+import scala.concurrent.duration._
+
+import org.reactivestreams.api.Producer
+import org.reactivestreams.spi.Subscriber
+
+import akka.actor._
+import akka.persistence._
+import akka.stream._
+import akka.stream.impl._
+import akka.stream.impl.Ast.ProducerNode
+import akka.stream.scaladsl.Flow
+
+// ------------------------------------------------------------------------------------------------
+// FIXME: move this file to akka-persistence-experimental once going back to project dependencies
+// NOTE: "producer" has been changed to "publisher" wherever possible, covering the upcoming
+// changes in reactive-streams.
+// ------------------------------------------------------------------------------------------------
+
+object PersistentFlow {
+  /**
+   * Starts a new [[Persistent]] message flow from the given processor,
+   * identified by `processorId`. Elements are pulled from the processor's
+   * journal (using a [[View]]) in accordance with the demand coming from
+   * the downstream transformation steps.
+   *
+   * Elements pulled from the processor's journal are buffered in memory so that
+   * fine-grained demands (requests) from downstream can be served efficiently.
+   */
+  def fromProcessor(processorId: String): Flow[Persistent] =
+    fromProcessor(processorId, PersistentPublisherSettings())
+
+  /**
+   * Starts a new [[Persistent]] message flow from the given processor,
+   * identified by `processorId`. Elements are pulled from the processor's
+   * journal (using a [[View]]) in accordance with the demand coming from
+   * the downstream transformation steps.
+   *
+   * Elements pulled from the processor's journal are buffered in memory so that
+   * fine-grained demands (requests) from downstream can be served efficiently.
+   * Reads from the journal are done in (coarse-grained) batches of configurable
+   * size (corresponding to the configurable maximum buffer size).
+   *
+   * @see [[PersistentPublisherSettings]]
+   */
+  def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] =
+    FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil)
+}
+
+/**
+ * Configuration object for a [[Persistent]] stream publisher.
+ *
+ * @param fromSequenceNr Sequence number where the published stream shall start (inclusive).
+ *                       Default is `1L`.
+ * @param maxBufferSize Maximum number of persistent messages to be buffered in memory (per publisher).
+ *                      Default is `100`.
+ * @param idle Optional duration to wait before attempting the next pull if no more persistent messages
+ *             could be pulled from the journal. Default is `None`, which causes the publisher to use
+ *             the value defined by the `akka.persistence.view.auto-update-interval` configuration
+ *             key. If defined, the `idle` value is used directly.
+ */
+case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize: Int = 100, idle: Option[FiniteDuration] = None) {
+  require(fromSequenceNr > 0L, "fromSequenceNr must be > 0")
+}
+
+private object PersistentPublisher {
+  def props(processorId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props =
+    Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings)
+}
+
+private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends ProducerNode[Persistent] {
+  def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[Persistent] =
+    new ActorProducer(context.actorOf(PersistentPublisher.props(processorId, publisherSettings, settings)))
+}
+
+private class PersistentPublisherImpl(processorId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings)
+  extends Actor
+  with ActorLogging
+  with SubscriberManagement[Persistent]
+  with SoftShutdown {
+
+  import ActorBasedFlowMaterializer._
+  import PersistentPublisherBuffer._
+
+  type S = ActorSubscription[Persistent]
+
+  private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self), "publisherBuffer")
+
+  private var pub: ActorPublisher[Persistent] = _
+  private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
+
+  final def receive = {
+    case ExposedPublisher(pub) ⇒
+      this.pub = pub.asInstanceOf[ActorPublisher[Persistent]]
+      context.become(waitingForSubscribers)
+  }
+
+  final def waitingForSubscribers: Receive = {
+    case SubscribePending ⇒
+      pub.takePendingSubscribers() foreach registerSubscriber
+      context.become(active)
+  }
+
+  final def active: Receive = {
+    case SubscribePending ⇒
+      pub.takePendingSubscribers() foreach registerSubscriber
+    case RequestMore(sub, elements) ⇒
+      moreRequested(sub.asInstanceOf[S], elements)
+    case Cancel(sub) ⇒
+      unregisterSubscription(sub.asInstanceOf[S])
+    case Response(ps) ⇒
+      try {
+        ps.foreach(pushToDownstream)
+      } catch {
+        case Stop ⇒ { completeDownstream(); shutdownReason = None }
+        case NonFatal(e) ⇒ { abortDownstream(e); shutdownReason = Some(e) }
+      }
+  }
+
+  override def requestFromUpstream(elements: Int): Unit =
+    buffer ! Request(elements)
+
+  override def initialBufferSize =
+    materializerSettings.initialFanOutBufferSize
+
+  override def maxBufferSize =
+    materializerSettings.maxFanOutBufferSize
+
+  override def createSubscription(subscriber: Subscriber[Persistent]): ActorSubscription[Persistent] =
+    new ActorSubscription(self, subscriber)
+
+  override def cancelUpstream(): Unit = {
+    pub.shutdown(shutdownReason)
+    context.stop(buffer)
+    softShutdown()
+  }
+
+  override def shutdown(completed: Boolean): Unit = {
+    pub.shutdown(shutdownReason)
+    context.stop(buffer)
+    softShutdown()
+  }
+
+  override def postStop(): Unit = {
+    pub.shutdown(shutdownReason)
+  }
+}
+
+private object PersistentPublisherBuffer {
+  case class Request(num: Int)
+  case class Response(messages: Vector[Persistent])
+
+  case object Fill
+  case object Filled
+}
+
+/**
+ * A view that buffers up to `publisherSettings.maxBufferSize` persistent messages in memory.
+ * Downstream demands (requests) are served if the buffer is non-empty, either while the buffer
+ * is being filled or after it has been filled. When the buffer becomes empty, new persistent
+ * messages are loaded from the journal (in batches of up to `publisherSettings.maxBufferSize`).
+ */
+private class PersistentPublisherBuffer(override val processorId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends View {
+  import PersistentPublisherBuffer._
+  import context.dispatcher
+
+  private var replayed = 0
+  private var requested = 0
+  private var buffer: Vector[Persistent] = Vector.empty
+
+  private val filling: Receive = {
+    case p: Persistent ⇒
+      buffer :+= p
+      replayed += 1
+      if (requested > 0) respond(requested)
+    case Filled ⇒
+      if (buffer.nonEmpty && requested > 0) respond(requested)
+      if (buffer.nonEmpty) pause()
+      else if (replayed > 0) fill()
+      else schedule()
+    case Request(num) ⇒
+      requested += num
+      if (buffer.nonEmpty) respond(requested)
+  }
+
+  private val pausing: Receive = {
+    case Request(num) ⇒
+      requested += num
+      respond(requested)
+      if (buffer.isEmpty) fill()
+  }
+
+  private val scheduled: Receive = {
+    case Fill ⇒
+      fill()
+    case Request(num) ⇒
+      requested += num
+  }
+
+  def receive = filling
+
+  override def onReplaySuccess(receive: Receive, await: Boolean): Unit = {
+    super.onReplaySuccess(receive, await)
+    self ! Filled
+  }
+
+  override def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = {
+    super.onReplayFailure(receive, await, cause)
+    self ! Filled
+  }
+
+  override def lastSequenceNr: Long =
+    math.max(publisherSettings.fromSequenceNr - 1L, super.lastSequenceNr)
+
+  override def autoUpdateInterval: FiniteDuration =
+    publisherSettings.idle.getOrElse(super.autoUpdateInterval)
+
+  override def autoUpdateReplayMax: Long =
+    publisherSettings.maxBufferSize
+
+  override def autoUpdate: Boolean =
+    false
+
+  private def fill(): Unit = {
+    replayed = 0
+    context.become(filling)
+    self ! Update(await = false, autoUpdateReplayMax)
+  }
+
+  private def pause(): Unit = {
+    context.become(pausing)
+  }
+
+  private def schedule(): Unit = {
+    context.become(scheduled)
+    context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill)
+  }
+
+  private def respond(num: Int): Unit = {
+    val (res, buf) = buffer.splitAt(num)
+    publisher ! Response(res)
+    buffer = buf
+    requested -= res.size
+  }
+}
diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala
new file mode 100644
index 0000000000..8096afa8b6
--- /dev/null
+++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2009-2014 Typesafe Inc.
+ */
+
+package akka.persistence.stream
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.reflect.ClassTag
+
+import com.typesafe.config.ConfigFactory
+
+import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfterEach
+
+import akka.actor.Props
+import akka.persistence._
+import akka.stream.testkit.AkkaSpec
+
+// ---------------------------------------------------------------------------
+// FIXME: remove this file once going back to project dependencies
+// ---------------------------------------------------------------------------
+
+trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec ⇒
+  private var _name: String = _
+
+  lazy val extension = Persistence(system)
+  val counter = new AtomicInteger(0)
+
+  /**
+   * Unique name per test.
+   */
+  def name = _name
+
+  /**
+   * Prefix for generating a unique name per test.
+   */
+  def namePrefix: String = system.name
+
+  /**
+   * Creates a processor with the current name as constructor argument.
+   */
+  def namedProcessor[T <: NamedProcessor: ClassTag] =
+    system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name))
+
+  override protected def beforeEach() {
+    _name = s"${namePrefix}-${counter.incrementAndGet()}"
+  }
+}
+
+object PersistenceSpec {
+  def config(plugin: String, test: String, serialization: String = "on") = ConfigFactory.parseString(
+    s"""
+      akka.actor.serialize-creators = ${serialization}
+      akka.actor.serialize-messages = ${serialization}
+      akka.persistence.publish-confirmations = on
+      akka.persistence.publish-plugin-commands = on
+      akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
+      akka.persistence.journal.leveldb.dir = "target/journal-${test}"
+      akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}/"
+      akka.test.single-expect-default = 10s
+    """)
+}
+
+trait Cleanup { this: AkkaSpec ⇒
+  val storageLocations = List(
+    "akka.persistence.journal.leveldb.dir",
+    "akka.persistence.journal.leveldb-shared.store.dir",
+    "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
+
+  override protected def atStartup() {
+    storageLocations.foreach(FileUtils.deleteDirectory)
+  }
+
+  override protected def afterTermination() {
+    storageLocations.foreach(FileUtils.deleteDirectory)
+  }
+}
+
+abstract class NamedProcessor(name: String) extends Processor {
+  override def processorId: String = name
+}
diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala
new file mode 100644
index 0000000000..230ca428bf
--- /dev/null
+++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2009-2014 Typesafe Inc.
+ */ +package akka.persistence.stream + +import scala.concurrent.duration._ + +import akka.actor._ +import akka.persistence._ +import akka.stream._ +import akka.stream.scaladsl._ +import akka.stream.testkit._ +import akka.testkit.TestProbe + +// ------------------------------------------------------------------------------------------------ +// FIXME: move this file to akka-persistence-experimental once going back to project dependencies +// ------------------------------------------------------------------------------------------------ + +object PersistentPublisherSpec { + class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { + def receive = { + case Persistent(payload, sequenceNr) ⇒ probe ! s"${payload}-${sequenceNr}" + } + } +} + +class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) with PersistenceSpec { + import PersistentPublisherSpec._ + + val numMessages = 10 + + val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis)) + val materializer = FlowMaterializer(MaterializerSettings()) + + var processor1: ActorRef = _ + var processor2: ActorRef = _ + + var processor1Probe: TestProbe = _ + var processor2Probe: TestProbe = _ + + def processorId(num: Int): String = + name + num + + override protected def beforeEach(): Unit = { + super.beforeEach() + + processor1Probe = TestProbe() + processor2Probe = TestProbe() + + processor1 = system.actorOf(Props(classOf[TestProcessor], processorId(1), processor1Probe.ref)) + processor2 = system.actorOf(Props(classOf[TestProcessor], processorId(2), processor2Probe.ref)) + + 1 to numMessages foreach { i ⇒ + processor1 ! Persistent("a") + processor2 ! Persistent("b") + + processor1Probe.expectMsg(s"a-${i}") + processor2Probe.expectMsg(s"b-${i}") + } + } + + override protected def afterEach(): Unit = { + system.stop(processor1) + system.stop(processor1) + super.afterEach() + } + + "A view producer" must { + "pull existing messages from a processor's journal" in { + val streamProbe = TestProbe() + + PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach { + case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" + }.consume(materializer) + + 1 to numMessages foreach { i ⇒ + streamProbe.expectMsg(s"a-${i}") + } + } + "pull existing messages and new from a processor's journal" in { + val streamProbe = TestProbe() + + PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach { + case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" + }.consume(materializer) + + 1 to numMessages foreach { i ⇒ + streamProbe.expectMsg(s"a-${i}") + } + + processor1 ! Persistent("a") + processor1 ! Persistent("a") + + streamProbe.expectMsg(s"a-${numMessages + 1}") + streamProbe.expectMsg(s"a-${numMessages + 2}") + } + "pull existing messages from a processor's journal starting form a specified sequence number" in { + val streamProbe = TestProbe() + val fromSequenceNr = 5L + + PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach { + case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! 
+      }.consume(materializer)
+
+      fromSequenceNr to numMessages foreach { i ⇒
+        streamProbe.expectMsg(s"a-${i}")
+      }
+    }
+  }
+
+  "A view producer" can {
+    "have several consumers" in {
+      val streamProbe1 = TestProbe()
+      val streamProbe2 = TestProbe()
+
+      val producer = PersistentFlow.fromProcessor(processorId(1), publisherSettings).toProducer(materializer)
+
+      Flow(producer).foreach {
+        case Persistent(payload, sequenceNr) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
+      }.consume(materializer)
+
+      // let the first consumer consume all existing messages
+      1 to numMessages foreach { i ⇒
+        streamProbe1.expectMsg(s"a-${i}")
+      }
+
+      // subscribe another consumer
+      Flow(producer).foreach {
+        case Persistent(payload, sequenceNr) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
+      }.consume(materializer)
+
+      // produce new messages and let both consumers handle them
+      1 to 2 foreach { i ⇒
+        processor1 ! Persistent("a")
+        streamProbe1.expectMsg(s"a-${numMessages + i}")
+        streamProbe2.expectMsg(s"a-${numMessages + i}")
+      }
+    }
+  }
+
+  "A consumer" can {
+    "consume from several view producers" in {
+      val streamProbe1 = TestProbe()
+      val streamProbe2 = TestProbe()
+
+      val fromSequenceNr1 = 7L
+      val fromSequenceNr2 = 3L
+
+      val producer1 = PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toProducer(materializer)
+      val producer2 = PersistentFlow.fromProcessor(processorId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toProducer(materializer)
+
+      Flow(producer1).merge(producer2).foreach {
+        case Persistent(payload: String, sequenceNr) if payload.startsWith("a") ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
+        case Persistent(payload: String, sequenceNr) if payload.startsWith("b") ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
+      }.consume(materializer)
+
+      1 to numMessages foreach { i ⇒
+        if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a-${i}")
+        if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b-${i}")
+      }
+    }
+  }
+}
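
Taken together, the pieces above compose into a minimal end-to-end usage sketch (a sketch only: the system
name, processor id, and settings values are made up for illustration; every call mirrors StreamExample.scala
or the specs above)::

  import scala.concurrent.duration._

  import org.reactivestreams.api.Producer

  import akka.actor.{ ActorSystem, Props }
  import akka.persistence.{ Persistent, Processor }
  import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings }
  import akka.stream.{ FlowMaterializer, MaterializerSettings }
  import akka.stream.scaladsl.Flow

  object UsageSketch extends App {
    implicit val system = ActorSystem("usage-sketch")

    // A processor with an empty receive block; Persistent messages are
    // journaled before they reach receive, so nothing else is needed here.
    class SketchProcessor(pid: String) extends Processor {
      override def processorId = pid
      def receive = { case Persistent(payload, _) => }
    }

    val processor = system.actorOf(Props(classOf[SketchProcessor], "sketch-1"))
    val materializer = FlowMaterializer(MaterializerSettings())

    // Publish the journaled stream: start at sequence number 1, buffer at
    // most 64 messages, and poll the journal every 100 millis when drained.
    val settings = PersistentPublisherSettings(fromSequenceNr = 1L, maxBufferSize = 64, idle = Some(100.millis))

    val producer: Producer[Persistent] =
      PersistentFlow.fromProcessor("sketch-1", settings).toProducer(materializer)

    // Attach a consumer that prints each published payload.
    Flow(producer).foreach(p => println(s"got: ${p.payload}")).consume(materializer)

    processor ! Persistent("hello")
  }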