+per,+str #15035 Reactive-stream producers for persistent messages written by akka-persistence processors

(cherry picked from commit 5dc62400e4f2f9d7afe5efe93fc8bef5b9d226c0)

Conflicts:
	akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala
	project/AkkaBuild.scala
This commit is contained in:
Martin Krasser 2014-05-01 09:05:15 +02:00 committed by Patrik Nordwall
parent e22711107b
commit 062d304b73
6 changed files with 605 additions and 0 deletions

View file

@ -61,6 +61,10 @@ Architecture
processor. A view itself does not journal new messages; instead, it updates internal state only from a processor's
replicated message stream.
* *Streams*: Messages written by a processor can be published in compliance with the `Reactive Streams`_ specification.
Only those messages that are explicitly requested by downstream consumers are actually pulled from a processor's
journal.
* *Channel*: Channels are used by processors and views to communicate with other actors. They prevent redundant
delivery of replayed messages to these actors and provide at-least-once message delivery semantics, even in
case of sender and receiver JVM crashes.
@ -78,6 +82,7 @@ Architecture
development of event sourced applications (see section :ref:`event-sourcing-java`)
.. _Community plugins: http://akka.io/community/
.. _Reactive Streams: http://www.reactive-streams.org/
.. _processors-java:
@ -251,6 +256,13 @@ name in its actor hierarchy and hence influences only part of the view id. To fu
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its
processor shall be shared (which applications usually do not want).
.. _streams-java:
Streams
=======
Java API coming soon. See also the Scala :ref:`streams` documentation.
.. _channels-java:
Channels

View file

@ -49,6 +49,10 @@ Architecture
processor. A view itself does not journal new messages; instead, it updates internal state only from a processor's
replicated message stream.
* *Streams*: Messages written by a processor can be published in compliance with the `Reactive Streams`_ specification.
Only those messages that are explicitly requested by downstream consumers are actually pulled from a processor's
journal.
* *Channel*: Channels are used by processors and views to communicate with other actors. They prevent redundant
delivery of replayed messages to these actors and provide at-least-once message delivery semantics, even in
case of sender and receiver JVM crashes.
@ -66,6 +70,7 @@ Architecture
development of event sourced applications (see section :ref:`event-sourcing`)
.. _Community plugins: http://akka.io/community/
.. _Reactive Streams: http://www.reactive-streams.org/
.. _processors:
@ -241,6 +246,44 @@ name in its actor hierarchy and hence influences only part of the view id. To fu
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its
processor shall be shared (which applications usually do not want).
.. _streams:
Streams
=======
**TODO: rename *producer* to *publisher*.**
A `Reactive Streams`_ ``Producer`` can be created from a processor's message stream via the ``PersistentFlow``
extension of the Akka Streams Scala DSL:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-creation
The created ``flow`` object is of type ``Flow[Persistent]`` and can be composed with other flows using ``Flow``
combinators (methods defined on ``Flow``). Calling the ``toProducer`` method on ``flow`` creates a producer
of type ``Producer[Persistent]``, as sketched below.
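For reference, a minimal sketch of the full chain (assuming an implicit ``ActorSystem`` and a ``materializer``
created via ``FlowMaterializer(MaterializerSettings())``; the processor id is illustrative)::

    val flow: Flow[Persistent] = PersistentFlow.fromProcessor("some-processor-id")
    val producer: Producer[Persistent] = flow.toProducer(materializer)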
A persistent message producer only reads from a processor's journal when explicitly requested by downstream
consumers. To avoid frequent, fine-grained read access to a processor's journal, the producer buffers persistent
messages in memory and serves downstream requests from that buffer. The maximum buffer size per producer is
configurable with a ``PersistentPublisherSettings`` configuration object.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-buffer-size
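For example, a publisher that may buffer up to 200 messages can be configured as follows (a sketch; the default
``maxBufferSize`` is 100 and the processor id is illustrative)::

    val settings = PersistentPublisherSettings(maxBufferSize = 200)
    val flow = PersistentFlow.fromProcessor("some-processor-id", settings)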
Other ``PersistentPublisherSettings`` parameters are listed below; a combined example follows the list.
* ``fromSequenceNr``: specifies from which sequence number the persistent message stream shall start (defaults
to ``1L``). Please note that specifying ``fromSequenceNr`` is much more efficient than using the ``drop(Int)``
combinator, especially for larger sequence numbers.
* ``idle``: an optional parameter that specifies how long a producer shall wait after a journal read attempt that
returned no new persistent messages. If undefined, the producer falls back to the
``akka.persistence.view.auto-update-interval`` configuration parameter; otherwise, it uses the given ``idle`` value.
Here are two examples of how persistent message producers can be connected to downstream consumers using the Akka
Streams Scala DSL and its ``PersistentFlow`` extension.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#producer-examples
.. _channels:
Channels

View file

@ -0,0 +1,55 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence
/* FIXME include when akka-stream is in sync
import org.reactivestreams.api._
import akka.actor._
import akka.persistence.{ Persistent, Processor }
import akka.persistence.stream.PersistentFlow
import akka.stream._
import akka.stream.scaladsl._
/**
* This example demonstrates how akka-persistence Views can be used as reactive-stream Producers. A
* View-based Producer is created with PersistentFlow.fromProcessor(processorId: String).toProducer(materializer).
* This Producer produces Persistent messages as they are written by its corresponding akka-persistence
* Processor. The PersistentFlow object is an extension to the akka-stream DSL.
*/
object StreamExample extends App {
implicit val system = ActorSystem("example")
class ExampleProcessor(pid: String) extends Processor {
override def processorId = pid
def receive = {
case Persistent(payload, _) =>
}
}
val p1 = system.actorOf(Props(classOf[ExampleProcessor], "p1"))
val p2 = system.actorOf(Props(classOf[ExampleProcessor], "p2"))
val materializer = FlowMaterializer(MaterializerSettings())
// 1 view-backed producer and 2 consumers:
val producer1: Producer[Persistent] = PersistentFlow.fromProcessor("p1").toProducer(materializer)
Flow(producer1).foreach { p => println(s"consumer-1: ${p.payload}") }.consume(materializer)
Flow(producer1).foreach { p => println(s"consumer-2: ${p.payload}") }.consume(materializer)
// 2 view-backed producers (merged) and 1 consumer:
// This is an example of how message/event streams from multiple processors can be merged into a single stream.
val producer2: Producer[Persistent] = PersistentFlow.fromProcessor("p1").toProducer(materializer)
val merged: Producer[Persistent] = PersistentFlow.fromProcessor("p2").merge(producer2).toProducer(materializer)
Flow(merged).foreach { p => println(s"consumer-3: ${p.payload}") }.consume(materializer)
while (true) {
p1 ! Persistent("a-" + System.currentTimeMillis())
p2 ! Persistent("b-" + System.currentTimeMillis())
Thread.sleep(500)
}
}
*/

View file

@ -0,0 +1,249 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.stream
import scala.util.control.NonFatal
import scala.concurrent.duration._
import org.reactivestreams.api.Producer
import org.reactivestreams.spi.Subscriber
import akka.actor._
import akka.persistence._
import akka.stream._
import akka.stream.impl._
import akka.stream.impl.Ast.ProducerNode
import akka.stream.scaladsl.Flow
// ------------------------------------------------------------------------------------------------
// FIXME: move this file to akka-persistence-experimental once going back to project dependencies
// NOTE: "producer" has been changed to "publisher" wherever possible, covering the upcoming
// changes in reactive-streams.
// ------------------------------------------------------------------------------------------------
object PersistentFlow {
/**
* Starts a new [[Persistent]] message flow from the given processor,
* identified by `processorId`. Elements are pulled from the processor's
* journal (using a [[View]]) in accordance with the demand coming from
* the downstream transformation steps.
*
* Elements pulled from the processor's journal are buffered in memory so that
* fine-grained demands (requests) from downstream can be served efficiently.
*/
def fromProcessor(processorId: String): Flow[Persistent] =
fromProcessor(processorId, PersistentPublisherSettings())
/**
* Starts a new [[Persistent]] message flow from the given processor,
* identified by `processorId`. Elements are pulled from the processor's
* journal (using a [[View]]) in accordance with the demand coming from
* the downstream transformation steps.
*
* Elements pulled from the processor's journal are buffered in memory so that
* fine-grained demands (requests) from downstream can be served efficiently.
* Reads from the journal are done in (coarse-grained) batches of configurable
* size (which correspond to the configurable maximum buffer size).
*
* @see [[PersistentPublisherSettings]]
*/
def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] =
FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil)
}
/**
* Configuration object for a [[Persistent]] stream publisher.
*
* @param fromSequenceNr Sequence number where the published stream shall start (inclusive).
* Default is `1L`.
* @param maxBufferSize Maximum number of persistent messages to be buffered in memory (per publisher).
* Default is `100`.
* @param idle Optional duration to wait if no more persistent messages can be pulled from the journal
* before attempting the next pull. Default is `None` which causes the publisher to take
* the value defined by the `akka.persistence.view.auto-update-interval` configuration
* key. If defined, the `idle` value is taken directly.
*/
case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize: Int = 100, idle: Option[FiniteDuration] = None) {
require(fromSequenceNr > 0L, "fromSequenceNr must be > 0")
}
private object PersistentPublisher {
def props(processorId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props =
Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings)
}
private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends ProducerNode[Persistent] {
def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[Persistent] =
new ActorProducer(context.actorOf(PersistentPublisher.props(processorId, publisherSettings, settings)))
}
private class PersistentPublisherImpl(processorId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings)
extends Actor
with ActorLogging
with SubscriberManagement[Persistent]
with SoftShutdown {
import ActorBasedFlowMaterializer._
import PersistentPublisherBuffer._
type S = ActorSubscription[Persistent]
private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self), "publisherBuffer")
private var pub: ActorPublisher[Persistent] = _
private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
final def receive = {
case ExposedPublisher(pub) ⇒
this.pub = pub.asInstanceOf[ActorPublisher[Persistent]]
context.become(waitingForSubscribers)
}
final def waitingForSubscribers: Receive = {
case SubscribePending ⇒
pub.takePendingSubscribers() foreach registerSubscriber
context.become(active)
}
final def active: Receive = {
case SubscribePending ⇒
pub.takePendingSubscribers() foreach registerSubscriber
case RequestMore(sub, elements) ⇒
moreRequested(sub.asInstanceOf[S], elements)
case Cancel(sub) ⇒
unregisterSubscription(sub.asInstanceOf[S])
case Response(ps) ⇒
try {
ps.foreach(pushToDownstream)
} catch {
case Stop ⇒ { completeDownstream(); shutdownReason = None }
case NonFatal(e) ⇒ { abortDownstream(e); shutdownReason = Some(e) }
}
}
override def requestFromUpstream(elements: Int): Unit =
buffer ! Request(elements)
override def initialBufferSize =
materializerSettings.initialFanOutBufferSize
override def maxBufferSize =
materializerSettings.maxFanOutBufferSize
override def createSubscription(subscriber: Subscriber[Persistent]): ActorSubscription[Persistent] =
new ActorSubscription(self, subscriber)
override def cancelUpstream(): Unit = {
pub.shutdown(shutdownReason)
context.stop(buffer)
softShutdown()
}
override def shutdown(completed: Boolean): Unit = {
pub.shutdown(shutdownReason)
context.stop(buffer)
softShutdown()
}
override def postStop(): Unit = {
pub.shutdown(shutdownReason)
}
}
private object PersistentPublisherBuffer {
case class Request(num: Int)
case class Response(messages: Vector[Persistent])
case object Fill
case object Filled
}
/**
* A view that buffers up to `publisherSettings.maxBufferSize` persistent messages in memory.
* Downstream demands (requests) are served if the buffer is non-empty either while filling
* the buffer or after having filled the buffer. When the buffer becomes empty new persistent
* messages are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`).
*/
private class PersistentPublisherBuffer(override val processorId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends View {
import PersistentPublisherBuffer._
import context.dispatcher
private var replayed = 0
private var requested = 0
private var buffer: Vector[Persistent] = Vector.empty
// Replay in progress: buffer incoming messages and serve downstream requests as they arrive.
private val filling: Receive = {
case p: Persistent ⇒
buffer :+= p
replayed += 1
if (requested > 0) respond(requested)
case Filled ⇒
if (buffer.nonEmpty && requested > 0) respond(requested)
if (buffer.nonEmpty) pause()
else if (replayed > 0) fill()
else schedule()
case Request(num) ⇒
requested += num
if (buffer.nonEmpty) respond(requested)
}
// Buffer filled: serve downstream requests from memory and refill once the buffer drains.
private val pausing: Receive = {
case Request(num) ⇒
requested += num
respond(requested)
if (buffer.isEmpty) fill()
}
// Journal returned no new messages: wait for the scheduled Fill tick before trying again.
private val scheduled: Receive = {
case Fill ⇒
fill()
case Request(num) ⇒
requested += num
}
def receive = filling
override def onReplaySuccess(receive: Receive, await: Boolean): Unit = {
super.onReplaySuccess(receive, await)
self ! Filled
}
override def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = {
super.onReplayFailure(receive, await, cause)
self ! Filled
}
override def lastSequenceNr: Long =
math.max(publisherSettings.fromSequenceNr - 1L, super.lastSequenceNr)
override def autoUpdateInterval: FiniteDuration =
publisherSettings.idle.getOrElse(super.autoUpdateInterval)
override def autoUpdateReplayMax: Long =
publisherSettings.maxBufferSize
override def autoUpdate: Boolean =
false
private def fill(): Unit = {
replayed = 0
context.become(filling)
self ! Update(await = false, autoUpdateReplayMax)
}
private def pause(): Unit = {
context.become(pausing)
}
private def schedule(): Unit = {
context.become(scheduled)
context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill)
}
private def respond(num: Int): Unit = {
val (res, buf) = buffer.splitAt(num)
publisher ! Response(res)
buffer = buf
requested -= res.size
}
}

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.stream
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import scala.reflect.ClassTag
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfterEach
import akka.actor.Props
import akka.persistence._
import akka.stream.testkit.AkkaSpec
// ---------------------------------------------------------------------------
// FIXME: remove this file once going back to project dependencies
// ---------------------------------------------------------------------------
trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec ⇒
private var _name: String = _
lazy val extension = Persistence(system)
val counter = new AtomicInteger(0)
/**
* Unique name per test.
*/
def name = _name
/**
* Prefix for generating a unique name per test.
*/
def namePrefix: String = system.name
/**
* Creates a processor with current name as constructor argument.
*/
def namedProcessor[T <: NamedProcessor: ClassTag] =
system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name))
override protected def beforeEach() {
_name = s"${namePrefix}-${counter.incrementAndGet()}"
}
}
object PersistenceSpec {
def config(plugin: String, test: String, serialization: String = "on") = ConfigFactory.parseString(
s"""
akka.actor.serialize-creators = ${serialization}
akka.actor.serialize-messages = ${serialization}
akka.persistence.publish-confirmations = on
akka.persistence.publish-plugin-commands = on
akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
akka.persistence.journal.leveldb.dir = "target/journal-${test}"
akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}/"
akka.test.single-expect-default = 10s
""")
}
trait Cleanup { this: AkkaSpec ⇒
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() {
storageLocations.foreach(FileUtils.deleteDirectory)
}
override protected def afterTermination() {
storageLocations.foreach(FileUtils.deleteDirectory)
}
}
abstract class NamedProcessor(name: String) extends Processor {
override def processorId: String = name
}

View file

@ -0,0 +1,163 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.stream
import scala.concurrent.duration._
import akka.actor._
import akka.persistence._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.testkit.TestProbe
// ------------------------------------------------------------------------------------------------
// FIXME: move this file to akka-persistence-experimental once going back to project dependencies
// ------------------------------------------------------------------------------------------------
object PersistentPublisherSpec {
class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
def receive = {
case Persistent(payload, sequenceNr) ⇒ probe ! s"${payload}-${sequenceNr}"
}
}
}
class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) with PersistenceSpec {
import PersistentPublisherSpec._
val numMessages = 10
val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis))
val materializer = FlowMaterializer(MaterializerSettings())
var processor1: ActorRef = _
var processor2: ActorRef = _
var processor1Probe: TestProbe = _
var processor2Probe: TestProbe = _
def processorId(num: Int): String =
name + num
override protected def beforeEach(): Unit = {
super.beforeEach()
processor1Probe = TestProbe()
processor2Probe = TestProbe()
processor1 = system.actorOf(Props(classOf[TestProcessor], processorId(1), processor1Probe.ref))
processor2 = system.actorOf(Props(classOf[TestProcessor], processorId(2), processor2Probe.ref))
1 to numMessages foreach { i ⇒
processor1 ! Persistent("a")
processor2 ! Persistent("b")
processor1Probe.expectMsg(s"a-${i}")
processor2Probe.expectMsg(s"b-${i}")
}
}
override protected def afterEach(): Unit = {
system.stop(processor1)
system.stop(processor2)
super.afterEach()
}
"A view producer" must {
"pull existing messages from a processor's journal" in {
val streamProbe = TestProbe()
PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
}.consume(materializer)
1 to numMessages foreach { i ⇒
streamProbe.expectMsg(s"a-${i}")
}
}
"pull existing messages and new from a processor's journal" in {
val streamProbe = TestProbe()
PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
}.consume(materializer)
1 to numMessages foreach { i ⇒
streamProbe.expectMsg(s"a-${i}")
}
processor1 ! Persistent("a")
processor1 ! Persistent("a")
streamProbe.expectMsg(s"a-${numMessages + 1}")
streamProbe.expectMsg(s"a-${numMessages + 2}")
}
"pull existing messages from a processor's journal starting form a specified sequence number" in {
val streamProbe = TestProbe()
val fromSequenceNr = 5L
PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
}.consume(materializer)
fromSequenceNr to numMessages foreach { i ⇒
streamProbe.expectMsg(s"a-${i}")
}
}
}
"A view producer" can {
"have several consumers" in {
val streamProbe1 = TestProbe()
val streamProbe2 = TestProbe()
val producer = PersistentFlow.fromProcessor(processorId(1), publisherSettings).toProducer(materializer)
Flow(producer).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
}.consume(materializer)
// let consumer consume all existing messages
1 to numMessages foreach { i ⇒
streamProbe1.expectMsg(s"a-${i}")
}
// subscribe another consumer
Flow(producer).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
}.consume(materializer)
// produce new messages and let both consumers handle them
1 to 2 foreach { i ⇒
processor1 ! Persistent("a")
streamProbe1.expectMsg(s"a-${numMessages + i}")
streamProbe2.expectMsg(s"a-${numMessages + i}")
}
}
}
"A consumer" can {
"consume from several view producers" in {
val streamProbe1 = TestProbe()
val streamProbe2 = TestProbe()
val fromSequenceNr1 = 7L
val fromSequenceNr2 = 3L
val producer1 = PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toProducer(materializer)
val producer2 = PersistentFlow.fromProcessor(processorId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toProducer(materializer)
Flow(producer1).merge(producer2).foreach {
case Persistent(payload: String, sequenceNr) if payload.startsWith("a") ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
case Persistent(payload: String, sequenceNr) if payload.startsWith("b") ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
}.consume(materializer)
1 to numMessages foreach { i ⇒
if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a-${i}")
if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b-${i}")
}
}
}
}