Merge pull request #15720 from akka/wip-15705-stream-dependency-akka-2.3.5-patriknw

!str #15705 Update to Akka 2.3.5, and new akka-persistence API
This commit is contained in:
Björn Antonsson 2014-08-22 12:16:26 +02:00
commit 630169fa2d
4 changed files with 157 additions and 91 deletions

@@ -21,42 +21,43 @@ import akka.stream.scaladsl.Flow
object PersistentFlow {
/**
* Starts a new [[akka.persistence.Persistent]] message flow from the given processor,
* identified by `processorId`. Elements are pulled from the processor's
* journal (using a [[akka.persistence.View]]) in accordance with the demand coming from
* the downstream transformation steps.
* Starts a new event flow from the given [[akka.persistence.PersistentActor]],
* identified by `persistenceId`. Events are pulled from the persistent actor's
* journal (using a [[akka.persistence.PersistentView]]) in accordance with the
* demand coming from the downstream transformation steps.
*
* Elements pulled from the processor's journal are buffered in memory so that
* Elements pulled from the persistent actor's journal are buffered in memory so that
* fine-grained demands (requests) from downstream can be served efficiently.
*/
def fromProcessor(processorId: String): Flow[Persistent] =
fromProcessor(processorId, PersistentPublisherSettings())
def fromPersistentActor(persistenceId: String): Flow[Any] =
fromPersistentActor(persistenceId, PersistentPublisherSettings())
/**
* Starts a new [[akka.persistence.Persistent]] message flow from the given processor,
* identified by `processorId`. Elements are pulled from the processor's
* journal (using a [[akka.persistence.View]]) in accordance with the demand coming from
* the downstream transformation steps.
* Starts a new event flow from the given [[akka.persistence.PersistentActor]],
* identified by `persistenceId`. Events are pulled from the persistent actor's
* journal (using a [[akka.persistence.PersistentView]]) in accordance with the
* demand coming from the downstream transformation steps.
*
* Elements pulled from the processor's journal are buffered in memory so that
* Elements pulled from the persistent actor's journal are buffered in memory so that
* fine-grained demands (requests) from downstream can be served efficiently.
*
* Reads from the journal are done in (coarse-grained) batches of configurable
* size (which correspond to the configurable maximum buffer size).
*
* @see [[akka.persistence.PersistentPublisherSettings]]
*/
def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] =
FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil)
def fromPersistentActor(persistenceId: String, publisherSettings: PersistentPublisherSettings): Flow[Any] =
FlowImpl(PersistentPublisherNode(persistenceId, publisherSettings), Nil)
}
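For orientation, here is a minimal usage sketch of the renamed API, mirroring the example file added later in this pull request; the object name, the actor-system name, and the printed label are illustrative, and "p1" is simply the persistenceId used in that example. It assumes PersistentFlow and PersistentPublisherSettings live in akka.persistence.stream, as the new example file suggests.

import akka.actor.ActorSystem
import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings }
import akka.stream._
import akka.stream.scaladsl._

object FromPersistentActorSketch extends App {
  implicit val system = ActorSystem("sketch")
  implicit val materializer = FlowMaterializer(MaterializerSettings())

  // Replay the journal of the persistent actor identified by "p1" and print
  // every event as downstream demand arrives, using the default settings.
  PersistentFlow
    .fromPersistentActor("p1", PersistentPublisherSettings())
    .foreach(event ⇒ println(s"replayed: $event"))
}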
/**
* Configuration object for a [[akka.persistence.Persistent]] stream publisher.
* Configuration object for a persistent stream publisher.
*
* @param fromSequenceNr Sequence number where the published stream shall start (inclusive).
* Default is `1L`.
* @param maxBufferSize Maximum number of persistent messages to be buffered in memory (per publisher).
* @param maxBufferSize Maximum number of persistent events to be buffered in memory (per publisher).
* Default is `100`.
* @param idle Optional duration to wait if no more persistent messages can be pulled from the journal
* @param idle Optional duration to wait if no more persistent events can be pulled from the journal
* before attempting the next pull. Default is `None` which causes the publisher to take
* the value defined by the `akka.persistence.view.auto-update-interval` configuration
* key. If defined, the `idle` value is taken directly.
@@ -66,36 +67,36 @@ case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize:
}
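A short configuration sketch using the three parameters documented above; the concrete values (sequence number 5, buffer of 200, a 100 ms idle interval) are illustrative only, and `100.millis` needs the usual duration import. The spec further down uses a similar `idle = Some(100.millis)` override.

import scala.concurrent.duration._

// Start the published stream at sequence number 5 (inclusive), buffer at most
// 200 events per publisher, and poll the journal every 100 ms instead of
// falling back to akka.persistence.view.auto-update-interval.
val settings = PersistentPublisherSettings(
  fromSequenceNr = 5L,
  maxBufferSize = 200,
  idle = Some(100.millis))

// e.g. PersistentFlow.fromPersistentActor("p1", settings).toPublisher()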
private object PersistentPublisher {
def props(processorId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props =
Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings).withDispatcher(settings.dispatcher)
def props(persistenceId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props =
Props(classOf[PersistentPublisherImpl], persistenceId, publisherSettings, settings).withDispatcher(settings.dispatcher)
}
private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends PublisherNode[Persistent] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Persistent] =
ActorPublisher[Persistent](materializer.actorOf(PersistentPublisher.props(processorId, publisherSettings, materializer.settings),
private case class PersistentPublisherNode(persistenceId: String, publisherSettings: PersistentPublisherSettings) extends PublisherNode[Any] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Any] =
ActorPublisher[Any](materializer.actorOf(PersistentPublisher.props(persistenceId, publisherSettings, materializer.settings),
name = s"$flowName-0-persistentPublisher"))
}
private class PersistentPublisherImpl(processorId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings)
private class PersistentPublisherImpl(persistenceId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings)
extends Actor
with ActorLogging
with SubscriberManagement[Persistent]
with SubscriberManagement[Any]
with SoftShutdown {
import ActorBasedFlowMaterializer._
import PersistentPublisherBuffer._
type S = ActorSubscription[Persistent]
type S = ActorSubscription[Any]
private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self).
private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], persistenceId, publisherSettings, self).
withDispatcher(context.props.dispatcher), "publisherBuffer")
private var pub: ActorPublisher[Persistent] = _
private var pub: ActorPublisher[Any] = _
private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
final def receive = {
case ExposedPublisher(pub) ⇒
this.pub = pub.asInstanceOf[ActorPublisher[Persistent]]
this.pub = pub.asInstanceOf[ActorPublisher[Any]]
context.become(waitingForSubscribers)
}
@@ -130,7 +131,7 @@ private class PersistentPublisherImpl(processorId: String, publisherSettings: Pe
override def maxBufferSize =
materializerSettings.maxFanOutBufferSize
override def createSubscription(subscriber: Subscriber[Persistent]): ActorSubscription[Persistent] =
override def createSubscription(subscriber: Subscriber[Any]): ActorSubscription[Any] =
new ActorSubscription(self, subscriber)
override def cancelUpstream(): Unit = {
@@ -151,31 +152,29 @@ private class PersistentPublisherImpl(processorId: String, publisherSettings: Pe
private object PersistentPublisherBuffer {
case class Request(num: Int)
case class Response(messages: Vector[Persistent])
case class Response(events: Vector[Any])
case object Fill
case object Filled
}
/**
* A view that buffers up to `publisherSettings.maxBufferSize` persistent messages in memory.
* A view that buffers up to `publisherSettings.maxBufferSize` persistent events in memory.
* Downstream demands (requests) are served if the buffer is non-empty either while filling
* the buffer or after having filled the buffer. When the buffer becomes empty new persistent
* messages are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`).
* events are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`).
*/
private class PersistentPublisherBuffer(override val processorId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends View {
private class PersistentPublisherBuffer(override val persistenceId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends PersistentView {
import PersistentPublisherBuffer._
import context.dispatcher
private var replayed = 0
private var requested = 0
private var buffer: Vector[Persistent] = Vector.empty
private var buffer: Vector[Any] = Vector.empty
override def viewId: String = persistenceId + "-stream-view"
private val filling: Receive = {
case p: Persistent ⇒
buffer :+= p
replayed += 1
if (requested > 0) respond(requested)
case Filled ⇒
if (buffer.nonEmpty && requested > 0) respond(requested)
if (buffer.nonEmpty) pause()
@@ -184,6 +183,10 @@ private class PersistentPublisherBuffer(override val processorId: String, publis
case Request(num) ⇒
requested += num
if (buffer.nonEmpty) respond(requested)
case persistentEvent ⇒
buffer :+= persistentEvent
replayed += 1
if (requested > 0) respond(requested)
}
private val pausing: Receive = {

@@ -39,9 +39,9 @@ trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec
def namePrefix: String = system.name
/**
* Creates a processor with current name as constructor argument.
* Creates a persistent actor with current name as constructor argument.
*/
def namedProcessor[T <: NamedProcessor: ClassTag] =
def namedPersistentActor[T <: NamedPersistentActor: ClassTag] =
system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name))
override protected def beforeEach() {
@@ -78,6 +78,6 @@ trait Cleanup { this: AkkaSpec ⇒
}
}
abstract class NamedProcessor(name: String) extends Processor {
override def processorId: String = name
abstract class NamedPersistentActor(name: String) extends PersistentActor {
override def persistenceId: String = name
}
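For illustration, a hypothetical concrete subclass and how a spec would obtain it through the helper above; `MyTestActor` and its trivial handlers are assumptions modelled on the `TestPersistentActor` further down in this diff.

class MyTestActor(name: String) extends NamedPersistentActor(name) {
  override def receiveCommand = {
    case cmd ⇒ persist(cmd) { _ ⇒ () } // persist every command, no in-memory state
  }
  override def receiveRecover = {
    case _ ⇒ () // nothing to rebuild in this sketch
  }
}

// inside a test class that mixes in PersistenceSpec:
// val actor = namedPersistentActor[MyTestActor]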

@@ -0,0 +1,59 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.stream
import org.reactivestreams.Publisher
import akka.actor._
import akka.persistence.PersistentActor
import akka.persistence.stream.PersistentFlow
import akka.stream._
import akka.stream.scaladsl._
// FIXME: move this file to akka-sample-persistence-scala once going back to project dependencies
/**
* This example demonstrates how akka-persistence Views can be used as reactive-stream Publishers. A
* View-based Publisher is created with PersistentFlow.fromPersistentActor(persistenceId: String).toPublisher().
* This Publisher produces events as they are written by its corresponding akka-persistence
* PersistentActor. The PersistentFlow object is an extension to the akka-stream DSL.
*/
object PersistentPublisherExample extends App {
implicit val system = ActorSystem("example")
class ExamplePersistentActor(pid: String) extends PersistentActor {
override def persistenceId = pid
override def receiveCommand = {
case cmd: String ⇒ persist(cmd) { event ⇒
// update state...
}
}
override def receiveRecover = {
case event: String ⇒ // update state...
}
}
val p1 = system.actorOf(Props(classOf[ExamplePersistentActor], "p1"))
val p2 = system.actorOf(Props(classOf[ExamplePersistentActor], "p2"))
implicit val materializer = FlowMaterializer(MaterializerSettings())
// 1 view-backed publisher and 2 subscribers:
val publisher1: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher()
Flow(publisher1).foreach(event ⇒ println(s"subscriber-1: $event"))
Flow(publisher1).foreach(event ⇒ println(s"subscriber-2: $event"))
// 2 view-backed publishers (merged) and 1 subscriber:
// This is an example of how event streams from multiple persistent actors can be merged into a single stream.
val publisher2: Publisher[Any] = PersistentFlow.fromPersistentActor("p1").toPublisher()
val merged: Publisher[Any] = PersistentFlow.fromPersistentActor("p2").merge(publisher2).toPublisher()
Flow(merged).foreach(event ⇒ println(s"subscriber-3: $event"))
while (true) {
p1 ! s"a-${System.currentTimeMillis()}"
p2 ! s"b-${System.currentTimeMillis()}"
Thread.sleep(500)
}
}

@@ -17,9 +17,13 @@ import akka.testkit.TestProbe
// ------------------------------------------------------------------------------------------------
object PersistentPublisherSpec {
class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
def receive = {
case Persistent(payload, sequenceNr) ⇒ probe ! s"${payload}-${sequenceNr}"
class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
override def receiveCommand = {
case cmd ⇒ persist(cmd) { event ⇒ probe ! s"${event}-${lastSequenceNr}" }
}
override def receiveRecover = {
case RecoveryCompleted ⇒ // ignore
case event ⇒ probe ! s"${event}-${lastSequenceNr}"
}
}
}
@@ -32,78 +36,78 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis))
implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
var processor1: ActorRef = _
var processor2: ActorRef = _
var persistentActor1: ActorRef = _
var persistentActor2: ActorRef = _
var processor1Probe: TestProbe = _
var processor2Probe: TestProbe = _
var persistentActor1Probe: TestProbe = _
var persistentActor2Probe: TestProbe = _
def processorId(num: Int): String =
def persistenceId(num: Int): String =
name + num
override protected def beforeEach(): Unit = {
super.beforeEach()
processor1Probe = TestProbe()
processor2Probe = TestProbe()
persistentActor1Probe = TestProbe()
persistentActor2Probe = TestProbe()
processor1 = system.actorOf(Props(classOf[TestProcessor], processorId(1), processor1Probe.ref))
processor2 = system.actorOf(Props(classOf[TestProcessor], processorId(2), processor2Probe.ref))
persistentActor1 = system.actorOf(Props(classOf[TestPersistentActor], persistenceId(1), persistentActor1Probe.ref))
persistentActor2 = system.actorOf(Props(classOf[TestPersistentActor], persistenceId(2), persistentActor2Probe.ref))
1 to numMessages foreach { i ⇒
processor1 ! Persistent("a")
processor2 ! Persistent("b")
persistentActor1 ! ("a" + i)
persistentActor2 ! ("b" + i)
processor1Probe.expectMsg(s"a-${i}")
processor2Probe.expectMsg(s"b-${i}")
persistentActor1Probe.expectMsg(s"a$i-$i")
persistentActor2Probe.expectMsg(s"b$i-$i")
}
}
override protected def afterEach(): Unit = {
system.stop(processor1)
system.stop(processor2)
system.stop(persistentActor1)
system.stop(persistentActor2)
super.afterEach()
}
"A view publisher" must {
"pull existing messages from a processor's journal" in {
"pull existing events from a persistent actor's journal" in {
val streamProbe = TestProbe()
PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).foreach {
case event ⇒ streamProbe.ref ! event
}
1 to numMessages foreach { i ⇒
streamProbe.expectMsg(s"a-${i}")
streamProbe.expectMsg(s"a$i")
}
}
"pull existing messages and new from a processor's journal" in {
"pull existing events and new from a persistent actor's journal" in {
val streamProbe = TestProbe()
PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).foreach {
case event ⇒ streamProbe.ref ! event
}
1 to numMessages foreach { i ⇒
streamProbe.expectMsg(s"a-${i}")
streamProbe.expectMsg(s"a$i")
}
processor1 ! Persistent("a")
processor1 ! Persistent("a")
persistentActor1 ! s"a${numMessages + 1}"
persistentActor1 ! s"a${numMessages + 2}"
streamProbe.expectMsg(s"a-${numMessages + 1}")
streamProbe.expectMsg(s"a-${numMessages + 2}")
streamProbe.expectMsg(s"a${numMessages + 1}")
streamProbe.expectMsg(s"a${numMessages + 2}")
}
"pull existing messages from a processor's journal starting form a specified sequence number" in {
"pull existing events from a persistent actor's journal starting form a specified sequence number" in {
val streamProbe = TestProbe()
val fromSequenceNr = 5L
PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach {
case event ⇒ streamProbe.ref ! event
}
fromSequenceNr to numMessages foreach { i ⇒
streamProbe.expectMsg(s"a-${i}")
streamProbe.expectMsg(s"a$i")
}
}
}
@@ -113,27 +117,27 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
val streamProbe1 = TestProbe()
val streamProbe2 = TestProbe()
val publisher = PersistentFlow.fromProcessor(processorId(1), publisherSettings).toPublisher()
val publisher = PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings).toPublisher()
Flow(publisher).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
case event ⇒ streamProbe1.ref ! event
}
// let subscriber consume all existing messages
// let subscriber consume all existing events
1 to numMessages foreach { i ⇒
streamProbe1.expectMsg(s"a-${i}")
streamProbe1.expectMsg(s"a$i")
}
// subscribe another subscriber
Flow(publisher).foreach {
case Persistent(payload, sequenceNr) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
case event ⇒ streamProbe2.ref ! event
}
// produce new messages and let both subscribers handle them
// produce new events and let both subscribers handle them
1 to 2 foreach { i ⇒
processor1 ! Persistent("a")
streamProbe1.expectMsg(s"a-${numMessages + i}")
streamProbe2.expectMsg(s"a-${numMessages + i}")
persistentActor1 ! s"a${numMessages + i}"
streamProbe1.expectMsg(s"a${numMessages + i}")
streamProbe2.expectMsg(s"a${numMessages + i}")
}
}
}
@@ -146,17 +150,17 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
val fromSequenceNr1 = 7L
val fromSequenceNr2 = 3L
val publisher1 = PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toPublisher()
val publisher2 = PersistentFlow.fromProcessor(processorId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toPublisher()
val publisher1 = PersistentFlow.fromPersistentActor(persistenceId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toPublisher()
val publisher2 = PersistentFlow.fromPersistentActor(persistenceId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toPublisher()
Flow(publisher1).merge(publisher2).foreach {
case Persistent(payload: String, sequenceNr) if (payload.startsWith("a")) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
case Persistent(payload: String, sequenceNr) if (payload.startsWith("b")) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
case event: String if (event.startsWith("a")) ⇒ streamProbe1.ref ! event
case event: String if (event.startsWith("b")) ⇒ streamProbe2.ref ! event
}
1 to numMessages foreach { i ⇒
if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a-${i}")
if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b-${i}")
if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a$i")
if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b$i")
}
}
}