From 5c467750ec801a45cff1c34222c7324108581bfa Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 22 Aug 2014 09:52:32 +0200 Subject: [PATCH] !str #15705 Update to Akka 2.3.5, and new akka-persistent api * 2.3.5 dependency * PersistentPublisher is using PersistentView, and is producing the raw events instead of the Persistent wrappers --- .../stream/PersistentPublisher.scala | 79 +++++++------- .../persistence/stream/PersistenceSpec.scala | 8 +- .../stream/PersistentPublisherExample.scala | 59 ++++++++++ .../stream/PersistentPublisherSpec.scala | 102 +++++++++--------- 4 files changed, 157 insertions(+), 91 deletions(-) create mode 100644 akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala index 94d3ed88de..261a1d3db3 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala @@ -21,42 +21,43 @@ import akka.stream.scaladsl.Flow object PersistentFlow { /** - * Starts a new [[akka.persistence.Persistent]] message flow from the given processor, - * identified by `processorId`. Elements are pulled from the processor's - * journal (using a [[akka.persistence.View]]) in accordance with the demand coming from - * the downstream transformation steps. + * Starts a new event flow from the given [[akka.persistence.PersistentActor]], + * identified by `persistenceId`. Events are pulled from the peristent actor's + * journal (using a [[akka.persistence.PersistentView]]) in accordance with the + * demand coming from the downstream transformation steps. * - * Elements pulled from the processor's journal are buffered in memory so that + * Elements pulled from the peristent actor's journal are buffered in memory so that * fine-grained demands (requests) from downstream can be served efficiently. */ - def fromProcessor(processorId: String): Flow[Persistent] = - fromProcessor(processorId, PersistentPublisherSettings()) + def fromPersistentActor(persistenceId: String): Flow[Any] = + fromPersistentActor(persistenceId, PersistentPublisherSettings()) /** - * Starts a new [[akka.persistence.Persistent]] message flow from the given processor, - * identified by `processorId`. Elements are pulled from the processor's - * journal (using a [[akka.persistence.View]]) in accordance with the demand coming from - * the downstream transformation steps. + * Starts a new event flow from the given [[akka.persistence.PersistentActor]], + * identified by `persistenceId`. Events are pulled from the peristent actor's + * journal (using a [[akka.persistence.PersistentView]]) in accordance with the + * demand coming from the downstream transformation steps. * - * Elements pulled from the processor's journal are buffered in memory so that + * Elements pulled from the peristent actor's journal are buffered in memory so that * fine-grained demands (requests) from downstream can be served efficiently. + * * Reads from the journal are done in (coarse-grained) batches of configurable * size (which correspond to the configurable maximum buffer size). * * @see [[akka.persistence.PersistentPublisherSettings]] */ - def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] = - FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil) + def fromPersistentActor(persistenceId: String, publisherSettings: PersistentPublisherSettings): Flow[Any] = + FlowImpl(PersistentPublisherNode(persistenceId, publisherSettings), Nil) } /** - * Configuration object for a [[akka.persistence.Persistent]] stream publisher. + * Configuration object for a persistent stream publisher. * * @param fromSequenceNr Sequence number where the published stream shall start (inclusive). * Default is `1L`. - * @param maxBufferSize Maximum number of persistent messages to be buffered in memory (per publisher). + * @param maxBufferSize Maximum number of persistent events to be buffered in memory (per publisher). * Default is `100`. - * @param idle Optional duration to wait if no more persistent messages can be pulled from the journal + * @param idle Optional duration to wait if no more persistent events can be pulled from the journal * before attempting the next pull. Default is `None` which causes the publisher to take * the value defined by the `akka.persistence.view.auto-update-interval` configuration * key. If defined, the `idle` value is taken directly. @@ -66,36 +67,36 @@ case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize: } private object PersistentPublisher { - def props(processorId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props = - Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings).withDispatcher(settings.dispatcher) + def props(persistenceId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props = + Props(classOf[PersistentPublisherImpl], persistenceId, publisherSettings, settings).withDispatcher(settings.dispatcher) } -private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends PublisherNode[Persistent] { - def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Persistent] = - ActorPublisher[Persistent](materializer.actorOf(PersistentPublisher.props(processorId, publisherSettings, materializer.settings), +private case class PersistentPublisherNode(persistenceId: String, publisherSettings: PersistentPublisherSettings) extends PublisherNode[Any] { + def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Any] = + ActorPublisher[Any](materializer.actorOf(PersistentPublisher.props(persistenceId, publisherSettings, materializer.settings), name = s"$flowName-0-persistentPublisher")) } -private class PersistentPublisherImpl(processorId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings) +private class PersistentPublisherImpl(persistenceId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings) extends Actor with ActorLogging - with SubscriberManagement[Persistent] + with SubscriberManagement[Any] with SoftShutdown { import ActorBasedFlowMaterializer._ import PersistentPublisherBuffer._ - type S = ActorSubscription[Persistent] + type S = ActorSubscription[Any] - private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self). + private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], persistenceId, publisherSettings, self). withDispatcher(context.props.dispatcher), "publisherBuffer") - private var pub: ActorPublisher[Persistent] = _ + private var pub: ActorPublisher[Any] = _ private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason final def receive = { case ExposedPublisher(pub) ⇒ - this.pub = pub.asInstanceOf[ActorPublisher[Persistent]] + this.pub = pub.asInstanceOf[ActorPublisher[Any]] context.become(waitingForSubscribers) } @@ -130,7 +131,7 @@ private class PersistentPublisherImpl(processorId: String, publisherSettings: Pe override def maxBufferSize = materializerSettings.maxFanOutBufferSize - override def createSubscription(subscriber: Subscriber[Persistent]): ActorSubscription[Persistent] = + override def createSubscription(subscriber: Subscriber[Any]): ActorSubscription[Any] = new ActorSubscription(self, subscriber) override def cancelUpstream(): Unit = { @@ -151,31 +152,29 @@ private class PersistentPublisherImpl(processorId: String, publisherSettings: Pe private object PersistentPublisherBuffer { case class Request(num: Int) - case class Response(messages: Vector[Persistent]) + case class Response(events: Vector[Any]) case object Fill case object Filled } /** - * A view that buffers up to `publisherSettings.maxBufferSize` persistent messages in memory. + * A view that buffers up to `publisherSettings.maxBufferSize` persistent events in memory. * Downstream demands (requests) are served if the buffer is non-empty either while filling * the buffer or after having filled the buffer. When the buffer becomes empty new persistent - * messages are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`). + * events are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`). */ -private class PersistentPublisherBuffer(override val processorId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends View { +private class PersistentPublisherBuffer(override val persistenceId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends PersistentView { import PersistentPublisherBuffer._ import context.dispatcher private var replayed = 0 private var requested = 0 - private var buffer: Vector[Persistent] = Vector.empty + private var buffer: Vector[Any] = Vector.empty + + override def viewId: String = persistenceId + "-stream-view" private val filling: Receive = { - case p: Persistent ⇒ - buffer :+= p - replayed += 1 - if (requested > 0) respond(requested) case Filled ⇒ if (buffer.nonEmpty && requested > 0) respond(requested) if (buffer.nonEmpty) pause() @@ -184,6 +183,10 @@ private class PersistentPublisherBuffer(override val processorId: String, publis case Request(num) ⇒ requested += num if (buffer.nonEmpty) respond(requested) + case persistentEvent ⇒ + buffer :+= persistentEvent + replayed += 1 + if (requested > 0) respond(requested) } private val pausing: Receive = { diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala index 8096afa8b6..547cefafbf 100644 --- a/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala +++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala @@ -39,9 +39,9 @@ trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec def namePrefix: String = system.name /** - * Creates a processor with current name as constructor argument. + * Creates a persistent actor with current name as constructor argument. */ - def namedProcessor[T <: NamedProcessor: ClassTag] = + def namedPersistentActor[T <: NamedPersistentActor: ClassTag] = system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name)) override protected def beforeEach() { @@ -78,6 +78,6 @@ trait Cleanup { this: AkkaSpec ⇒ } } -abstract class NamedProcessor(name: String) extends Processor { - override def processorId: String = name +abstract class NamedPersistentActor(name: String) extends PersistentActor { + override def persistenceId: String = name } diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala new file mode 100644 index 0000000000..1ef8d78061 --- /dev/null +++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherExample.scala @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.persistence.stream + +import org.reactivestreams.Publisher + +import akka.actor._ +import akka.persistence.PersistentActor +import akka.persistence.stream.PersistentFlow +import akka.stream._ +import akka.stream.scaladsl._ + +// FIXME: move this file to akka-sample-persistence-scala once going back to project dependencies + +/** + * This example demonstrates how akka-persistence Views can be used as reactive-stream Publishers. A + * View-based Publisher is created with PersistentFlow.fromPersistentActor(persistenceId: String).toPublisher(). + * This Publisher produces events as they are written by its corresponding akka-persistence + * PersistentActor. The PersistentFlow object is an extension to the akka-stream DSL. + */ +object PersistentPublisherExample extends App { + implicit val system = ActorSystem("example") + + class ExamplePersistentActor(pid: String) extends PersistentActor { + override def persistenceId = pid + override def receiveCommand = { + case cmd: String ⇒ persist(cmd) { event ⇒ + // update state... + } + } + override def receiveRecover = { + case event: String ⇒ // update state... + } + } + + val p1 = system.actorOf(Props(classOf[ExamplePersistentActor], "p1")) + val p2 = system.actorOf(Props(classOf[ExamplePersistentActor], "p2")) + + implicit val materializer = FlowMaterializer(MaterializerSettings()) + + // 1 view-backed publisher and 2 subscribers: + val publisher1: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher() + Flow(publisher1).foreach(event ⇒ println(s"subscriber-1: $event")) + Flow(publisher1).foreach(event ⇒ println(s"subscriber-2: $event")) + + // 2 view-backed publishers (merged) and 1 subscriber: + // This is an example how message/event streams from multiple processors can be merged into a single stream. + val publisher2: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher() + val merged: Publisher[Any] = PersistentFlow.fromPersistentActor("p2").merge(publisher2).toPublisher() + Flow(merged).foreach(event ⇒ println(s"subscriber-3: $event")) + + while (true) { + p1 ! s"a-${System.currentTimeMillis()}" + p2 ! s"b-${System.currentTimeMillis()}" + Thread.sleep(500) + } +} + diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala index b6e1819c44..25aefbc85c 100644 --- a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala @@ -17,9 +17,13 @@ import akka.testkit.TestProbe // ------------------------------------------------------------------------------------------------ object PersistentPublisherSpec { - class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { - def receive = { - case Persistent(payload, sequenceNr) ⇒ probe ! s"${payload}-${sequenceNr}" + class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + override def receiveCommand = { + case cmd ⇒ persist(cmd) { event ⇒ probe ! s"${event}-${lastSequenceNr}" } + } + override def receiveRecover = { + case RecoveryCompleted ⇒ // ignore + case event ⇒ probe ! s"${event}-${lastSequenceNr}" } } } @@ -32,78 +36,78 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis)) implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) - var processor1: ActorRef = _ - var processor2: ActorRef = _ + var persistentActor1: ActorRef = _ + var persistentActor2: ActorRef = _ - var processor1Probe: TestProbe = _ - var processor2Probe: TestProbe = _ + var persistentActor1Probe: TestProbe = _ + var persistentActor2Probe: TestProbe = _ - def processorId(num: Int): String = + def persistenceId(num: Int): String = name + num override protected def beforeEach(): Unit = { super.beforeEach() - processor1Probe = TestProbe() - processor2Probe = TestProbe() + persistentActor1Probe = TestProbe() + persistentActor2Probe = TestProbe() - processor1 = system.actorOf(Props(classOf[TestProcessor], processorId(1), processor1Probe.ref)) - processor2 = system.actorOf(Props(classOf[TestProcessor], processorId(2), processor2Probe.ref)) + persistentActor1 = system.actorOf(Props(classOf[TestPersistentActor], persistenceId(1), persistentActor1Probe.ref)) + persistentActor2 = system.actorOf(Props(classOf[TestPersistentActor], persistenceId(2), persistentActor2Probe.ref)) 1 to numMessages foreach { i ⇒ - processor1 ! Persistent("a") - processor2 ! Persistent("b") + persistentActor1 ! ("a" + i) + persistentActor2 ! ("b" + i) - processor1Probe.expectMsg(s"a-${i}") - processor2Probe.expectMsg(s"b-${i}") + persistentActor1Probe.expectMsg(s"a$i-$i") + persistentActor2Probe.expectMsg(s"b$i-$i") } } override protected def afterEach(): Unit = { - system.stop(processor1) - system.stop(processor1) + system.stop(persistentActor1) + system.stop(persistentActor1) super.afterEach() } "A view publisher" must { - "pull existing messages from a processor's journal" in { + "pull existing events from a persistent actor's journal" in { val streamProbe = TestProbe() - PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach { - case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" + PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).foreach { + case event ⇒ streamProbe.ref ! event } 1 to numMessages foreach { i ⇒ - streamProbe.expectMsg(s"a-${i}") + streamProbe.expectMsg(s"a$i") } } - "pull existing messages and new from a processor's journal" in { + "pull existing events and new from a persistent actor's journal" in { val streamProbe = TestProbe() - PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach { - case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" + PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).foreach { + case event ⇒ streamProbe.ref ! event } 1 to numMessages foreach { i ⇒ - streamProbe.expectMsg(s"a-${i}") + streamProbe.expectMsg(s"a$i") } - processor1 ! Persistent("a") - processor1 ! Persistent("a") + persistentActor1 ! s"a${numMessages + 1}" + persistentActor1 ! s"a${numMessages + 2}" - streamProbe.expectMsg(s"a-${numMessages + 1}") - streamProbe.expectMsg(s"a-${numMessages + 2}") + streamProbe.expectMsg(s"a${numMessages + 1}") + streamProbe.expectMsg(s"a${numMessages + 2}") } - "pull existing messages from a processor's journal starting form a specified sequence number" in { + "pull existing events from a persistent actor's journal starting form a specified sequence number" in { val streamProbe = TestProbe() val fromSequenceNr = 5L - PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach { - case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" + PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach { + case event ⇒ streamProbe.ref ! event } fromSequenceNr to numMessages foreach { i ⇒ - streamProbe.expectMsg(s"a-${i}") + streamProbe.expectMsg(s"a$i") } } } @@ -113,27 +117,27 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", val streamProbe1 = TestProbe() val streamProbe2 = TestProbe() - val publisher = PersistentFlow.fromProcessor(processorId(1), publisherSettings).toPublisher() + val publisher = PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).toPublisher() Flow(publisher).foreach { - case Persistent(payload, sequenceNr) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}" + case event ⇒ streamProbe1.ref ! event } - // let subscriber consume all existing messages + // let subscriber consume all existing events 1 to numMessages foreach { i ⇒ - streamProbe1.expectMsg(s"a-${i}") + streamProbe1.expectMsg(s"a$i") } // subscribe another subscriber Flow(publisher).foreach { - case Persistent(payload, sequenceNr) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}" + case event ⇒ streamProbe2.ref ! event } - // produce new messages and let both subscribers handle them + // produce new events and let both subscribers handle them 1 to 2 foreach { i ⇒ - processor1 ! Persistent("a") - streamProbe1.expectMsg(s"a-${numMessages + i}") - streamProbe2.expectMsg(s"a-${numMessages + i}") + persistentActor1 ! s"a${numMessages + i}" + streamProbe1.expectMsg(s"a${numMessages + i}") + streamProbe2.expectMsg(s"a${numMessages + i}") } } } @@ -146,17 +150,17 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", val fromSequenceNr1 = 7L val fromSequenceNr2 = 3L - val publisher1 = PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toPublisher() - val publisher2 = PersistentFlow.fromProcessor(processorId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toPublisher() + val publisher1 = PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toPublisher() + val publisher2 = PersistentFlow.fromPersistentActor(persistenceId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toPublisher() Flow(publisher1).merge(publisher2).foreach { - case Persistent(payload: String, sequenceNr) if (payload.startsWith("a")) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}" - case Persistent(payload: String, sequenceNr) if (payload.startsWith("b")) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}" + case event: String if (event.startsWith("a")) ⇒ streamProbe1.ref ! event + case event: String if (event.startsWith("b")) ⇒ streamProbe2.ref ! event } 1 to numMessages foreach { i ⇒ - if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a-${i}") - if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b-${i}") + if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a$i") + if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b$i") } } }