From 3fbb6e6f8760a0833cf6f49a00fc69c7e2aa7895 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Mon, 27 Oct 2014 10:05:53 +0200 Subject: [PATCH] =str #16041 Move PersistentSource to the latest Stream DSL * implemented as an extension of KeyedActorFlowSource --- .../stream/PersistentPublisherExample.scala | 58 -------- ...rSpec.scala => PersistentSourceSpec.scala} | 72 ++++++---- ...Publisher.scala => PersistentSource.scala} | 132 +++++++++--------- 3 files changed, 107 insertions(+), 155 deletions(-) delete mode 100644 akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala rename akka-stream-tests/src/test/scala/akka/persistence/stream/{PersistentPublisherSpec.scala => PersistentSourceSpec.scala} (66%) rename akka-stream/src/main/scala/akka/persistence/stream/{PersistentPublisher.scala => PersistentSource.scala} (56%) diff --git a/akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala b/akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala deleted file mode 100644 index dba4cc873f..0000000000 --- a/akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.persistence.stream - -import org.reactivestreams.Publisher - -import akka.actor._ -import akka.persistence.PersistentActor -import akka.stream._ -import akka.stream.scaladsl._ - -// FIXME: move this file to akka-sample-persistence-scala once going back to project dependencies - -/** - * This example demonstrates how akka-persistence Views can be used as reactive-stream Publishers. A - * View-based Publisher is created with PersistentFlow.fromPersistentActor(persistenceId: String).toPublisher(). - * This Publisher produces events as they are written by its corresponding akka-persistence - * PersistentActor. The PersistentFlow object is an extension to the akka-stream DSL. - */ -object PersistentPublisherExample extends App { - implicit val system = ActorSystem("example") - - class ExamplePersistentActor(pid: String) extends PersistentActor { - override def persistenceId = pid - override def receiveCommand = { - case cmd: String ⇒ persist(cmd) { event ⇒ - // update state... - } - } - override def receiveRecover = { - case event: String ⇒ // update state... - } - } - - val p1 = system.actorOf(Props(classOf[ExamplePersistentActor], "p1")) - val p2 = system.actorOf(Props(classOf[ExamplePersistentActor], "p2")) - - implicit val materializer = FlowMaterializer() - - // 1 view-backed publisher and 2 subscribers: - val publisher1: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher() - Flow(publisher1).foreach(event ⇒ println(s"subscriber-1: $event")) - Flow(publisher1).foreach(event ⇒ println(s"subscriber-2: $event")) - - // 2 view-backed publishers (merged) and 1 subscriber: - // This is an example how message/event streams from multiple processors can be merged into a single stream. - val publisher2: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher() - val merged: Publisher[Any] = PersistentFlow.fromPersistentActor("p2").merge(publisher2).toPublisher() - Flow(merged).foreach(event ⇒ println(s"subscriber-3: $event")) - - while (true) { - p1 ! s"a-${System.currentTimeMillis()}" - p2 ! s"b-${System.currentTimeMillis()}" - Thread.sleep(500) - } -} - diff --git a/akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentSourceSpec.scala similarity index 66% rename from akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala rename to akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentSourceSpec.scala index 16224935dc..c51c8b8952 100644 --- a/akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/persistence/stream/PersistentSourceSpec.scala @@ -3,20 +3,25 @@ */ package akka.persistence.stream -import scala.concurrent.duration._ - -import akka.actor._ -import akka.persistence._ -import akka.stream._ -import akka.stream.scaladsl._ -import akka.stream.testkit._ +import akka.actor.ActorRef +import akka.actor.Props +import akka.persistence.RecoveryCompleted +import akka.stream.scaladsl2.FlowGraph +import akka.stream.scaladsl2.FlowGraphImplicits +import akka.stream.scaladsl2.FlowMaterializer +import akka.stream.scaladsl2.Merge +import akka.stream.scaladsl2.Sink +import akka.stream.scaladsl2.Source +import akka.stream.testkit.AkkaSpec import akka.testkit.TestProbe +import scala.concurrent.duration._ + // ------------------------------------------------------------------------------------------------ -// FIXME: move this file to akka-persistence-experimental once going back to project dependencies +// FIXME: #15964 move this file to akka-persistence-experimental once going back to project dependencies // ------------------------------------------------------------------------------------------------ -object PersistentPublisherSpec { +object PersistentSourceSpec { class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { override def receiveCommand = { case cmd ⇒ persist(cmd) { event ⇒ probe ! s"${event}-${lastSequenceNr}" } @@ -28,12 +33,12 @@ object PersistentPublisherSpec { } } -class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "ViewPublisherSpec", serialization = "off")) with PersistenceSpec { - import PersistentPublisherSpec._ +class PersistentSourceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "PersistentSourceSpec", serialization = "off")) with PersistenceSpec { + import PersistentSourceSpec._ val numMessages = 10 - val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis)) + val sourceSettings = PersistentSourceSettings(idle = Some(100.millis)) implicit val materializer = FlowMaterializer() var persistentActor1: ActorRef = _ @@ -69,11 +74,12 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", super.afterEach() } - "A view publisher" must { + "A PersistentSource" must { + "pull existing events from a persistent actor's journal" in { val streamProbe = TestProbe() - PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).foreach { + PersistentSource[String](persistenceId(1), sourceSettings).foreach { case event ⇒ streamProbe.ref ! event } @@ -81,10 +87,11 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", streamProbe.expectMsg(s"a$i") } } + "pull existing events and new from a persistent actor's journal" in { val streamProbe = TestProbe() - PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).foreach { + PersistentSource[String](persistenceId(1), sourceSettings).foreach { case event ⇒ streamProbe.ref ! event } @@ -98,11 +105,12 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", streamProbe.expectMsg(s"a${numMessages + 1}") streamProbe.expectMsg(s"a${numMessages + 2}") } + "pull existing events from a persistent actor's journal starting form a specified sequence number" in { val streamProbe = TestProbe() val fromSequenceNr = 5L - PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach { + PersistentSource[String](persistenceId(1), sourceSettings.copy(fromSequenceNr = fromSequenceNr)).foreach { case event ⇒ streamProbe.ref ! event } @@ -110,16 +118,14 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", streamProbe.expectMsg(s"a$i") } } - } - "A view publisher" can { - "have several subscribers" in { + "work with FanoutPublisher" in { val streamProbe1 = TestProbe() val streamProbe2 = TestProbe() - val publisher = PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).toPublisher() + val publisher = PersistentSource[String](persistenceId(1), sourceSettings).runWith(Sink.fanoutPublisher(4, 16)) - Flow(publisher).foreach { + Source[String](publisher).foreach { case event ⇒ streamProbe1.ref ! event } @@ -129,7 +135,7 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", } // subscribe another subscriber - Flow(publisher).foreach { + Source[String](publisher).foreach { case event ⇒ streamProbe2.ref ! event } @@ -140,28 +146,34 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", streamProbe2.expectMsg(s"a${numMessages + i}") } } - } - "A subscriber" can { - "consume from several view publishers" in { + "work in FlowGraph" in { val streamProbe1 = TestProbe() val streamProbe2 = TestProbe() val fromSequenceNr1 = 7L val fromSequenceNr2 = 3L - val publisher1 = PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toPublisher() - val publisher2 = PersistentFlow.fromPersistentActor(persistenceId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toPublisher() + val source1 = PersistentSource[String](persistenceId(1), sourceSettings.copy(fromSequenceNr = fromSequenceNr1)) + val source2 = PersistentSource[String](persistenceId(2), sourceSettings.copy(fromSequenceNr = fromSequenceNr2)) - Flow(publisher1).merge(publisher2).foreach { - case event: String if (event.startsWith("a")) ⇒ streamProbe1.ref ! event - case event: String if (event.startsWith("b")) ⇒ streamProbe2.ref ! event + val sink = Sink.foreach[String] { + case event: String if event.startsWith("a") ⇒ streamProbe1.ref ! event + case event: String if event.startsWith("b") ⇒ streamProbe2.ref ! event } + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val merge = Merge[String] + source1 ~> merge ~> sink + source2 ~> merge + }.run() + 1 to numMessages foreach { i ⇒ if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a$i") if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b$i") } } } + } diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala similarity index 56% rename from akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala rename to akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala index ef6dff25f7..80eec192d1 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala @@ -3,100 +3,98 @@ */ package akka.persistence.stream -import scala.util.control.NonFatal -import scala.concurrent.duration._ - -import org.reactivestreams.{ Publisher, Subscriber } - import akka.actor._ import akka.persistence._ -import akka.stream._ -import akka.stream.impl._ -import akka.stream.impl.Ast.PublisherNode -import akka.stream.scaladsl.Flow +import akka.stream.MaterializerSettings +import akka.stream.impl.ActorPublisher +import akka.stream.impl.ActorSubscription +import akka.stream.impl.Cancel +import akka.stream.impl.ExposedPublisher +import akka.stream.impl.RequestMore +import akka.stream.impl.SoftShutdown +import akka.stream.impl.Stop +import akka.stream.impl.SubscribePending +import akka.stream.impl.SubscriberManagement +import akka.stream.impl2.ActorBasedFlowMaterializer +import akka.stream.scaladsl2.KeyedActorFlowSource +import org.reactivestreams.Subscriber + +import scala.concurrent.duration._ +import scala.util.control.NonFatal // ------------------------------------------------------------------------------------------------ -// FIXME: move this file to akka-persistence-experimental once going back to project dependencies +// FIXME: #15964 move this file to akka-persistence-experimental once going back to project dependencies // ------------------------------------------------------------------------------------------------ -object PersistentFlow { - /** - * Starts a new event flow from the given [[akka.persistence.PersistentActor]], - * identified by `persistenceId`. Events are pulled from the peristent actor's - * journal (using a [[akka.persistence.PersistentView]]) in accordance with the - * demand coming from the downstream transformation steps. - * - * Elements pulled from the peristent actor's journal are buffered in memory so that - * fine-grained demands (requests) from downstream can be served efficiently. - */ - def fromPersistentActor(persistenceId: String): Flow[Any] = - fromPersistentActor(persistenceId, PersistentPublisherSettings()) +/** + * Constructs a `Source` from the given [[akka.persistence.PersistentActor]], + * identified by `persistenceId`. Events are pulled from the persistent actor's + * journal (using a [[akka.persistence.PersistentView]]) in accordance with the + * demand coming from the downstream transformation steps. + * + * Elements pulled from the persistent actor's journal are buffered in memory so that + * fine-grained demands (requests) from downstream can be served efficiently. + * + * Reads from the journal are done in (coarse-grained) batches of configurable + * size (which correspond to the configurable maximum buffer size). + * + * @see [[akka.persistence.stream.PersistentSourceSettings]] + */ +final case class PersistentSource[Out](persistenceId: String, sourceSettings: PersistentSourceSettings = PersistentSourceSettings()) extends KeyedActorFlowSource[Out] { + override type MaterializedType = ActorRef - /** - * Starts a new event flow from the given [[akka.persistence.PersistentActor]], - * identified by `persistenceId`. Events are pulled from the peristent actor's - * journal (using a [[akka.persistence.PersistentView]]) in accordance with the - * demand coming from the downstream transformation steps. - * - * Elements pulled from the peristent actor's journal are buffered in memory so that - * fine-grained demands (requests) from downstream can be served efficiently. - * - * Reads from the journal are done in (coarse-grained) batches of configurable - * size (which correspond to the configurable maximum buffer size). - * - * @see [[akka.persistence.PersistentPublisherSettings]] - */ - def fromPersistentActor(persistenceId: String, publisherSettings: PersistentPublisherSettings): Flow[Any] = - FlowImpl(PersistentPublisherNode(persistenceId, publisherSettings), Nil) + override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { + val (publisher, publisherRef) = create(materializer, flowName) + publisher.subscribe(flowSubscriber) + publisherRef + } + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = { + val publisherRef = materializer.actorOf(PersistentSourceImpl.props(persistenceId, sourceSettings, materializer.settings), name = s"$flowName-0-persistent-source") + (ActorPublisher[Out](publisherRef), publisherRef) + } } /** - * Configuration object for a persistent stream publisher. + * Configuration object for a `PersistentSource`. * * @param fromSequenceNr Sequence number where the published stream shall start (inclusive). * Default is `1L`. - * @param maxBufferSize Maximum number of persistent events to be buffered in memory (per publisher). + * @param maxBufferSize Maximum number of persistent events to be buffered in memory (per Source). * Default is `100`. * @param idle Optional duration to wait if no more persistent events can be pulled from the journal * before attempting the next pull. Default is `None` which causes the publisher to take * the value defined by the `akka.persistence.view.auto-update-interval` configuration * key. If defined, the `idle` value is taken directly. */ -case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize: Int = 100, idle: Option[FiniteDuration] = None) { +case class PersistentSourceSettings(fromSequenceNr: Long = 1L, maxBufferSize: Int = 100, idle: Option[FiniteDuration] = None) { require(fromSequenceNr > 0L, "fromSequenceNr must be > 0") } -private object PersistentPublisher { - def props(persistenceId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props = - Props(classOf[PersistentPublisherImpl], persistenceId, publisherSettings, settings).withDispatcher(settings.dispatcher) +private object PersistentSourceImpl { + def props(persistenceId: String, sourceSettings: PersistentSourceSettings, settings: MaterializerSettings): Props = + Props(classOf[PersistentSourceImpl], persistenceId, sourceSettings, settings).withDispatcher(settings.dispatcher) } -private case class PersistentPublisherNode(persistenceId: String, publisherSettings: PersistentPublisherSettings) extends PublisherNode[Any] { - def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Any] = - ActorPublisher[Any](materializer.actorOf(PersistentPublisher.props(persistenceId, publisherSettings, materializer.settings), - name = s"$flowName-0-persistentPublisher")) -} - -private class PersistentPublisherImpl(persistenceId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings) +private class PersistentSourceImpl(persistenceId: String, sourceSettings: PersistentSourceSettings, materializerSettings: MaterializerSettings) extends Actor with ActorLogging with SubscriberManagement[Any] with SoftShutdown { - import ActorBasedFlowMaterializer._ - import PersistentPublisherBuffer._ + import PersistentSourceBuffer._ type S = ActorSubscription[Any] - private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], persistenceId, publisherSettings, self). - withDispatcher(context.props.dispatcher), "publisherBuffer") + private val buffer = context.actorOf(Props(classOf[PersistentSourceBuffer], persistenceId, sourceSettings, self). + withDispatcher(context.props.dispatcher), "persistent-source-buffer") private var pub: ActorPublisher[Any] = _ private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason final def receive = { - case ExposedPublisher(pub) ⇒ - this.pub = pub.asInstanceOf[ActorPublisher[Any]] + case ExposedPublisher(publisher) ⇒ + pub = publisher.asInstanceOf[ActorPublisher[Any]] context.become(waitingForSubscribers) } @@ -117,8 +115,8 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings: try { ps.foreach(pushToDownstream) } catch { - case Stop ⇒ { completeDownstream(); shutdownReason = None } - case NonFatal(e) ⇒ { abortDownstream(e); shutdownReason = Some(e) } + case Stop ⇒ completeDownstream(); shutdownReason = None + case NonFatal(e) ⇒ abortDownstream(e); shutdownReason = Some(e) } } @@ -150,7 +148,7 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings: } } -private object PersistentPublisherBuffer { +private object PersistentSourceBuffer { case class Request(n: Long) case class Response(events: Vector[Any]) @@ -159,13 +157,13 @@ private object PersistentPublisherBuffer { } /** - * A view that buffers up to `publisherSettings.maxBufferSize` persistent events in memory. + * A view that buffers up to `sourceSettings.maxBufferSize` persistent events in memory. * Downstream demands (requests) are served if the buffer is non-empty either while filling * the buffer or after having filled the buffer. When the buffer becomes empty new persistent - * events are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`). + * events are loaded from the journal (in batches up to `sourceSettings.maxBufferSize`). */ -private class PersistentPublisherBuffer(override val persistenceId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends PersistentView { - import PersistentPublisherBuffer._ +private class PersistentSourceBuffer(override val persistenceId: String, sourceSettings: PersistentSourceSettings, publisher: ActorRef) extends PersistentView { + import PersistentSourceBuffer._ import context.dispatcher private var replayed = 0L @@ -216,13 +214,13 @@ private class PersistentPublisherBuffer(override val persistenceId: String, publ } override def lastSequenceNr: Long = - math.max(publisherSettings.fromSequenceNr - 1L, super.lastSequenceNr) + math.max(sourceSettings.fromSequenceNr - 1L, super.lastSequenceNr) override def autoUpdateInterval: FiniteDuration = - publisherSettings.idle.getOrElse(super.autoUpdateInterval) + sourceSettings.idle.getOrElse(super.autoUpdateInterval) override def autoUpdateReplayMax: Long = - publisherSettings.maxBufferSize + sourceSettings.maxBufferSize override def autoUpdate: Boolean = false