!peq #19324 update persistence-query to stream 2.0.1

This commit is contained in:
Patrik Nordwall 2016-01-01 16:22:58 +01:00
parent b218b069bc
commit c015f22824
6 changed files with 23 additions and 21 deletions

View file

@ -335,7 +335,7 @@ public class PersistenceQueryDocTest {
.eventsByPersistenceId("user-1337", 0L, Long.MAX_VALUE)
.map(envelope -> envelope.event())
.grouped(20) // batch inserts into groups of 20
.runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database
.runWith(Sink.fromSubscriber(dbBatchWriter), mat); // write batches to read-side database
//#projection-into-different-store-rs
}

View file

@ -8,7 +8,6 @@ import akka.actor._
import akka.persistence.{ Recovery, PersistentActor }
import akka.persistence.query._
import akka.stream.{ FlowShape, ActorMaterializer }
import akka.stream.scaladsl.FlowGraph
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.javadsl
import akka.testkit.AkkaSpec
@ -153,7 +152,7 @@ object PersistenceQueryDocSpec {
.map(envelope => envelope.event)
.map(convertToReadSideTypes) // convert to datatype
.grouped(20) // batch inserts into groups of 20
.runWith(Sink(dbBatchWriter)) // write batches to read-side database
.runWith(Sink.fromSubscriber(dbBatchWriter)) // write batches to read-side database
//#projection-into-different-store-rs
}

View file

@ -14,7 +14,7 @@ import scala.runtime.BoxedUnit
*/
class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.AllPersistenceIdsQuery {
override def allPersistenceIds(): Source[String, Unit] =
Source(() Iterator.from(0)).map(_.toString)
Source.fromIterator(() Iterator.from(0)).map(_.toString)
}
object DummyReadJournal {

View file

@ -45,10 +45,12 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config)
expectMsg("c1-done")
val src = queries.currentPersistenceIds()
src.runWith(TestSink.probe[String])
.request(5)
.expectNextUnordered("a", "b", "c")
.expectComplete()
val probe = src.runWith(TestSink.probe[String])
probe.within(10.seconds) {
probe.request(5)
.expectNextUnordered("a", "b", "c")
.expectComplete()
}
}
"find new persistenceIds" in {
@ -58,20 +60,22 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config)
val src = queries.allPersistenceIds()
val probe = src.runWith(TestSink.probe[String])
.request(5)
.expectNextUnorderedN(List("a", "b", "c", "d"))
probe.within(10.seconds) {
probe.request(5)
.expectNextUnorderedN(List("a", "b", "c", "d"))
system.actorOf(TestActor.props("e")) ! "e1"
probe.expectNext("e")
system.actorOf(TestActor.props("e")) ! "e1"
probe.expectNext("e")
val more = (1 to 100).map("f" + _)
more.foreach { p
system.actorOf(TestActor.props(p)) ! p
val more = (1 to 100).map("f" + _)
more.foreach { p
system.actorOf(TestActor.props(p)) ! p
}
probe.request(100)
probe.expectNextUnorderedN(more)
}
probe.request(100)
probe.expectNextUnorderedN(more)
}
}

View file

@ -167,7 +167,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback"
config.getStringList("journal.auto-start-journals").forEach(new Consumer[String] {
override def accept(id: String): Unit = {
log.info(s"Auto-starting journal plugin `$id`")

View file

@ -21,7 +21,7 @@ object Dependencies {
// Compile
// FIXME: change to project dependency once akka-stream merged to master
val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "1.0"
val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "2.0.1"
val camelCore = "org.apache.camel" % "camel-core" % "2.13.4" exclude("org.slf4j", "slf4j-api") // ApacheV2
@ -60,7 +60,7 @@ object Dependencies {
val junitIntf = "com.novocode" % "junit-interface" % "0.11" % "test" // MIT
val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.4" % "test"
// FIXME: change to project dependency once akka-stream merged to master
val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit-experimental" % "1.0" % "test"
val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit-experimental" % "2.0.1" % "test"
// metrics, measurements, perf testing
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2