=str #16041 Move PersistentSource to the latest Stream DSL

* implemented as an extension of KeyedActorFlowSource
This commit is contained in:
Martynas Mickevičius 2014-10-27 10:05:53 +02:00
parent a5ecb93d7a
commit 3fbb6e6f87
3 changed files with 107 additions and 155 deletions

View file

@ -1,58 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.stream
import org.reactivestreams.Publisher
import akka.actor._
import akka.persistence.PersistentActor
import akka.stream._
import akka.stream.scaladsl._
// FIXME: move this file to akka-sample-persistence-scala once going back to project dependencies
/**
* This example demonstrates how akka-persistence Views can be used as reactive-stream Publishers. A
* View-based Publisher is created with PersistentFlow.fromPersistentActor(persistenceId: String).toPublisher().
* This Publisher produces events as they are written by its corresponding akka-persistence
* PersistentActor. The PersistentFlow object is an extension to the akka-stream DSL.
*/
object PersistentPublisherExample extends App {
  implicit val system = ActorSystem("example")

  // Minimal persistent actor: persists every String command it receives.
  // (State update is intentionally left empty — this sample only demonstrates
  // how persisted events surface through a view-backed stream publisher.)
  class ExamplePersistentActor(pid: String) extends PersistentActor {
    override def persistenceId = pid

    override def receiveCommand = {
      case cmd: String ⇒ persist(cmd) { event ⇒
        // update state...
      }
    }

    override def receiveRecover = {
      case event: String ⇒ // update state...
    }
  }

  val p1 = system.actorOf(Props(classOf[ExamplePersistentActor], "p1"))
  val p2 = system.actorOf(Props(classOf[ExamplePersistentActor], "p2"))

  implicit val materializer = FlowMaterializer()

  // 1 view-backed publisher and 2 subscribers:
  val publisher1: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher()
  Flow(publisher1).foreach(event ⇒ println(s"subscriber-1: $event"))
  Flow(publisher1).foreach(event ⇒ println(s"subscriber-2: $event"))

  // 2 view-backed publishers (merged) and 1 subscriber:
  // This is an example of how message/event streams from multiple processors can be merged into a single stream.
  val publisher2: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher()
  val merged: Publisher[Any] = PersistentFlow.fromPersistentActor("p2").merge(publisher2).toPublisher()
  Flow(merged).foreach(event ⇒ println(s"subscriber-3: $event"))

  // Keep generating commands forever so the subscribers continually receive new events.
  // (Sample code only — a real application would not busy-loop with Thread.sleep.)
  while (true) {
    p1 ! s"a-${System.currentTimeMillis()}"
    p2 ! s"b-${System.currentTimeMillis()}"
    Thread.sleep(500)
  }
}

View file

@ -3,20 +3,25 @@
*/
package akka.persistence.stream
import scala.concurrent.duration._
import akka.actor._
import akka.persistence._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.actor.ActorRef
import akka.actor.Props
import akka.persistence.RecoveryCompleted
import akka.stream.scaladsl2.FlowGraph
import akka.stream.scaladsl2.FlowGraphImplicits
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.scaladsl2.Merge
import akka.stream.scaladsl2.Sink
import akka.stream.scaladsl2.Source
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestProbe
import scala.concurrent.duration._
// ------------------------------------------------------------------------------------------------
// FIXME: move this file to akka-persistence-experimental once going back to project dependencies
// FIXME: #15964 move this file to akka-persistence-experimental once going back to project dependencies
// ------------------------------------------------------------------------------------------------
object PersistentPublisherSpec {
object PersistentSourceSpec {
class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
override def receiveCommand = {
case cmd persist(cmd) { event probe ! s"${event}-${lastSequenceNr}" }
@ -28,12 +33,12 @@ object PersistentPublisherSpec {
}
}
class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "ViewPublisherSpec", serialization = "off")) with PersistenceSpec {
import PersistentPublisherSpec._
class PersistentSourceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "PersistentSourceSpec", serialization = "off")) with PersistenceSpec {
import PersistentSourceSpec._
val numMessages = 10
val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis))
val sourceSettings = PersistentSourceSettings(idle = Some(100.millis))
implicit val materializer = FlowMaterializer()
var persistentActor1: ActorRef = _
@ -69,11 +74,12 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
super.afterEach()
}
"A view publisher" must {
"A PersistentSource" must {
"pull existing events from a persistent actor's journal" in {
val streamProbe = TestProbe()
PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).foreach {
PersistentSource[String](persistenceId(1), sourceSettings).foreach {
case event streamProbe.ref ! event
}
@ -81,10 +87,11 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
streamProbe.expectMsg(s"a$i")
}
}
"pull existing events and new from a persistent actor's journal" in {
val streamProbe = TestProbe()
PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).foreach {
PersistentSource[String](persistenceId(1), sourceSettings).foreach {
case event streamProbe.ref ! event
}
@ -98,11 +105,12 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
streamProbe.expectMsg(s"a${numMessages + 1}")
streamProbe.expectMsg(s"a${numMessages + 2}")
}
"pull existing events from a persistent actor's journal starting form a specified sequence number" in {
val streamProbe = TestProbe()
val fromSequenceNr = 5L
PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach {
PersistentSource[String](persistenceId(1), sourceSettings.copy(fromSequenceNr = fromSequenceNr)).foreach {
case event streamProbe.ref ! event
}
@ -110,16 +118,14 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
streamProbe.expectMsg(s"a$i")
}
}
}
"A view publisher" can {
"have several subscribers" in {
"work with FanoutPublisher" in {
val streamProbe1 = TestProbe()
val streamProbe2 = TestProbe()
val publisher = PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).toPublisher()
val publisher = PersistentSource[String](persistenceId(1), sourceSettings).runWith(Sink.fanoutPublisher(4, 16))
Flow(publisher).foreach {
Source[String](publisher).foreach {
case event streamProbe1.ref ! event
}
@ -129,7 +135,7 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
}
// subscribe another subscriber
Flow(publisher).foreach {
Source[String](publisher).foreach {
case event streamProbe2.ref ! event
}
@ -140,28 +146,34 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
streamProbe2.expectMsg(s"a${numMessages + i}")
}
}
}
"A subscriber" can {
"consume from several view publishers" in {
"work in FlowGraph" in {
val streamProbe1 = TestProbe()
val streamProbe2 = TestProbe()
val fromSequenceNr1 = 7L
val fromSequenceNr2 = 3L
val publisher1 = PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toPublisher()
val publisher2 = PersistentFlow.fromPersistentActor(persistenceId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toPublisher()
val source1 = PersistentSource[String](persistenceId(1), sourceSettings.copy(fromSequenceNr = fromSequenceNr1))
val source2 = PersistentSource[String](persistenceId(2), sourceSettings.copy(fromSequenceNr = fromSequenceNr2))
Flow(publisher1).merge(publisher2).foreach {
case event: String if (event.startsWith("a")) streamProbe1.ref ! event
case event: String if (event.startsWith("b")) streamProbe2.ref ! event
val sink = Sink.foreach[String] {
case event: String if event.startsWith("a") streamProbe1.ref ! event
case event: String if event.startsWith("b") streamProbe2.ref ! event
}
FlowGraph { implicit b
import FlowGraphImplicits._
val merge = Merge[String]
source1 ~> merge ~> sink
source2 ~> merge
}.run()
1 to numMessages foreach { i
if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a$i")
if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b$i")
}
}
}
}

View file

@ -3,100 +3,98 @@
*/
package akka.persistence.stream
import scala.util.control.NonFatal
import scala.concurrent.duration._
import org.reactivestreams.{ Publisher, Subscriber }
import akka.actor._
import akka.persistence._
import akka.stream._
import akka.stream.impl._
import akka.stream.impl.Ast.PublisherNode
import akka.stream.scaladsl.Flow
import akka.stream.MaterializerSettings
import akka.stream.impl.ActorPublisher
import akka.stream.impl.ActorSubscription
import akka.stream.impl.Cancel
import akka.stream.impl.ExposedPublisher
import akka.stream.impl.RequestMore
import akka.stream.impl.SoftShutdown
import akka.stream.impl.Stop
import akka.stream.impl.SubscribePending
import akka.stream.impl.SubscriberManagement
import akka.stream.impl2.ActorBasedFlowMaterializer
import akka.stream.scaladsl2.KeyedActorFlowSource
import org.reactivestreams.Subscriber
import scala.concurrent.duration._
import scala.util.control.NonFatal
// ------------------------------------------------------------------------------------------------
// FIXME: move this file to akka-persistence-experimental once going back to project dependencies
// FIXME: #15964 move this file to akka-persistence-experimental once going back to project dependencies
// ------------------------------------------------------------------------------------------------
object PersistentFlow {
/**
* Starts a new event flow from the given [[akka.persistence.PersistentActor]],
* identified by `persistenceId`. Events are pulled from the persistent actor's
* journal (using a [[akka.persistence.PersistentView]]) in accordance with the
* demand coming from the downstream transformation steps.
*
* Elements pulled from the persistent actor's journal are buffered in memory so that
* fine-grained demands (requests) from downstream can be served efficiently.
*/
def fromPersistentActor(persistenceId: String): Flow[Any] =
fromPersistentActor(persistenceId, PersistentPublisherSettings())
/**
* Constructs a `Source` from the given [[akka.persistence.PersistentActor]],
* identified by `persistenceId`. Events are pulled from the persistent actor's
* journal (using a [[akka.persistence.PersistentView]]) in accordance with the
* demand coming from the downstream transformation steps.
*
* Elements pulled from the persistent actor's journal are buffered in memory so that
* fine-grained demands (requests) from downstream can be served efficiently.
*
* Reads from the journal are done in (coarse-grained) batches of configurable
* size (which correspond to the configurable maximum buffer size).
*
* @see [[akka.persistence.stream.PersistentSourceSettings]]
*/
final case class PersistentSource[Out](persistenceId: String, sourceSettings: PersistentSourceSettings = PersistentSourceSettings()) extends KeyedActorFlowSource[Out] {
override type MaterializedType = ActorRef
/**
* Starts a new event flow from the given [[akka.persistence.PersistentActor]],
* identified by `persistenceId`. Events are pulled from the persistent actor's
* journal (using a [[akka.persistence.PersistentView]]) in accordance with the
* demand coming from the downstream transformation steps.
*
* Elements pulled from the persistent actor's journal are buffered in memory so that
* fine-grained demands (requests) from downstream can be served efficiently.
*
* Reads from the journal are done in (coarse-grained) batches of configurable
* size (which correspond to the configurable maximum buffer size).
*
* @see [[akka.persistence.PersistentPublisherSettings]]
*/
def fromPersistentActor(persistenceId: String, publisherSettings: PersistentPublisherSettings): Flow[Any] =
FlowImpl(PersistentPublisherNode(persistenceId, publisherSettings), Nil)
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val (publisher, publisherRef) = create(materializer, flowName)
publisher.subscribe(flowSubscriber)
publisherRef
}
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = {
val publisherRef = materializer.actorOf(PersistentSourceImpl.props(persistenceId, sourceSettings, materializer.settings), name = s"$flowName-0-persistent-source")
(ActorPublisher[Out](publisherRef), publisherRef)
}
}
/**
* Configuration object for a persistent stream publisher.
* Configuration object for a `PersistentSource`.
*
* @param fromSequenceNr Sequence number where the published stream shall start (inclusive).
* Default is `1L`.
* @param maxBufferSize Maximum number of persistent events to be buffered in memory (per publisher).
* @param maxBufferSize Maximum number of persistent events to be buffered in memory (per Source).
* Default is `100`.
* @param idle Optional duration to wait if no more persistent events can be pulled from the journal
* before attempting the next pull. Default is `None` which causes the publisher to take
* the value defined by the `akka.persistence.view.auto-update-interval` configuration
* key. If defined, the `idle` value is taken directly.
*/
case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize: Int = 100, idle: Option[FiniteDuration] = None) {
/**
 * Configuration for a `PersistentSource`.
 *
 * @param fromSequenceNr sequence number at which the stream starts (inclusive); must be > 0.
 * @param maxBufferSize maximum number of persistent events buffered in memory.
 * @param idle optional delay between journal pulls when no more events are available;
 *             `None` falls back to the view's configured auto-update interval.
 */
case class PersistentSourceSettings(fromSequenceNr: Long = 1L, maxBufferSize: Int = 100, idle: Option[FiniteDuration] = None) {
  // Sequence numbers in akka-persistence are 1-based, so a start below 1 is invalid.
  require(fromSequenceNr > 0L, "fromSequenceNr must be > 0")
}
private object PersistentPublisher {
def props(persistenceId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props =
Props(classOf[PersistentPublisherImpl], persistenceId, publisherSettings, settings).withDispatcher(settings.dispatcher)
private object PersistentSourceImpl {
  // Props factory for the source's publisher actor; runs it on the
  // materializer's configured dispatcher.
  def props(persistenceId: String, sourceSettings: PersistentSourceSettings, settings: MaterializerSettings): Props =
    Props(classOf[PersistentSourceImpl], persistenceId, sourceSettings, settings).withDispatcher(settings.dispatcher)
}
private case class PersistentPublisherNode(persistenceId: String, publisherSettings: PersistentPublisherSettings) extends PublisherNode[Any] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Any] =
ActorPublisher[Any](materializer.actorOf(PersistentPublisher.props(persistenceId, publisherSettings, materializer.settings),
name = s"$flowName-0-persistentPublisher"))
}
private class PersistentPublisherImpl(persistenceId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings)
private class PersistentSourceImpl(persistenceId: String, sourceSettings: PersistentSourceSettings, materializerSettings: MaterializerSettings)
extends Actor
with ActorLogging
with SubscriberManagement[Any]
with SoftShutdown {
import ActorBasedFlowMaterializer._
import PersistentPublisherBuffer._
import PersistentSourceBuffer._
type S = ActorSubscription[Any]
private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], persistenceId, publisherSettings, self).
withDispatcher(context.props.dispatcher), "publisherBuffer")
private val buffer = context.actorOf(Props(classOf[PersistentSourceBuffer], persistenceId, sourceSettings, self).
withDispatcher(context.props.dispatcher), "persistent-source-buffer")
private var pub: ActorPublisher[Any] = _
private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
final def receive = {
case ExposedPublisher(pub)
this.pub = pub.asInstanceOf[ActorPublisher[Any]]
case ExposedPublisher(publisher)
pub = publisher.asInstanceOf[ActorPublisher[Any]]
context.become(waitingForSubscribers)
}
@ -117,8 +115,8 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings:
try {
ps.foreach(pushToDownstream)
} catch {
case Stop { completeDownstream(); shutdownReason = None }
case NonFatal(e) { abortDownstream(e); shutdownReason = Some(e) }
case Stop completeDownstream(); shutdownReason = None
case NonFatal(e) abortDownstream(e); shutdownReason = Some(e)
}
}
@ -150,7 +148,7 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings:
}
}
private object PersistentPublisherBuffer {
private object PersistentSourceBuffer {
case class Request(n: Long)
case class Response(events: Vector[Any])
@ -159,13 +157,13 @@ private object PersistentPublisherBuffer {
}
/**
* A view that buffers up to `publisherSettings.maxBufferSize` persistent events in memory.
* A view that buffers up to `sourceSettings.maxBufferSize` persistent events in memory.
* Downstream demands (requests) are served if the buffer is non-empty either while filling
* the buffer or after having filled the buffer. When the buffer becomes empty new persistent
* events are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`).
* events are loaded from the journal (in batches up to `sourceSettings.maxBufferSize`).
*/
private class PersistentPublisherBuffer(override val persistenceId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends PersistentView {
import PersistentPublisherBuffer._
private class PersistentSourceBuffer(override val persistenceId: String, sourceSettings: PersistentSourceSettings, publisher: ActorRef) extends PersistentView {
import PersistentSourceBuffer._
import context.dispatcher
private var replayed = 0L
@ -216,13 +214,13 @@ private class PersistentPublisherBuffer(override val persistenceId: String, publ
}
override def lastSequenceNr: Long =
math.max(publisherSettings.fromSequenceNr - 1L, super.lastSequenceNr)
math.max(sourceSettings.fromSequenceNr - 1L, super.lastSequenceNr)
override def autoUpdateInterval: FiniteDuration =
publisherSettings.idle.getOrElse(super.autoUpdateInterval)
sourceSettings.idle.getOrElse(super.autoUpdateInterval)
override def autoUpdateReplayMax: Long =
publisherSettings.maxBufferSize
sourceSettings.maxBufferSize
override def autoUpdate: Boolean =
false