diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index aecb3a43da..3add0a05ce 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -335,7 +335,7 @@ public class PersistenceQueryDocTest { .eventsByPersistenceId("user-1337", 0L, Long.MAX_VALUE) .map(envelope -> envelope.event()) .grouped(20) // batch inserts into groups of 20 - .runWith(Sink.create(dbBatchWriter), mat); // write batches to read-side database + .runWith(Sink.fromSubscriber(dbBatchWriter), mat); // write batches to read-side database //#projection-into-different-store-rs } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index 49dc80de3f..dc8cfdf57a 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -8,7 +8,6 @@ import akka.actor._ import akka.persistence.{ Recovery, PersistentActor } import akka.persistence.query._ import akka.stream.{ FlowShape, ActorMaterializer } -import akka.stream.scaladsl.FlowGraph import akka.stream.scaladsl.{ Flow, Sink, Source } import akka.stream.javadsl import akka.testkit.AkkaSpec @@ -153,7 +152,7 @@ object PersistenceQueryDocSpec { .map(envelope => envelope.event) .map(convertToReadSideTypes) // convert to datatype .grouped(20) // batch inserts into groups of 20 - .runWith(Sink(dbBatchWriter)) // write batches to read-side database + .runWith(Sink.fromSubscriber(dbBatchWriter)) // write batches to read-side database //#projection-into-different-store-rs } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala index 480420a102..6879530545 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala @@ -14,7 +14,7 @@ import scala.runtime.BoxedUnit */ class DummyReadJournal extends scaladsl.ReadJournal with scaladsl.AllPersistenceIdsQuery { override def allPersistenceIds(): Source[String, Unit] = - Source(() ⇒ Iterator.from(0)).map(_.toString) + Source.fromIterator(() ⇒ Iterator.from(0)).map(_.toString) } object DummyReadJournal { diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala index f8db15ea33..15af8e2b79 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsSpec.scala @@ -45,10 +45,12 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) expectMsg("c1-done") val src = queries.currentPersistenceIds() - src.runWith(TestSink.probe[String]) - .request(5) - .expectNextUnordered("a", "b", "c") - .expectComplete() + val probe = src.runWith(TestSink.probe[String]) + probe.within(10.seconds) { + probe.request(5) + .expectNextUnordered("a", "b", "c") + .expectComplete() + } } "find new persistenceIds" in { @@ -58,20 +60,22 @@ class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) val src = queries.allPersistenceIds() val probe = src.runWith(TestSink.probe[String]) - .request(5) - .expectNextUnorderedN(List("a", "b", "c", "d")) + probe.within(10.seconds) { + probe.request(5) + .expectNextUnorderedN(List("a", "b", "c", "d")) - system.actorOf(TestActor.props("e")) ! "e1" - probe.expectNext("e") + system.actorOf(TestActor.props("e")) ! "e1" + probe.expectNext("e") - val more = (1 to 100).map("f" + _) - more.foreach { p ⇒ - system.actorOf(TestActor.props(p)) ! p + val more = (1 to 100).map("f" + _) + more.foreach { p ⇒ + system.actorOf(TestActor.props(p)) ! p + } + + probe.request(100) + probe.expectNextUnorderedN(more) } - probe.request(100) - probe.expectNextUnorderedN(more) - } } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index a638de919d..b88c5ef320 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -167,7 +167,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback" private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback" - config.getStringList("journal.auto-start-journals").forEach(new Consumer[String] { override def accept(id: String): Unit = { log.info(s"Auto-starting journal plugin `$id`") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 645e052439..a525579716 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -21,7 +21,7 @@ object Dependencies { // Compile // FIXME: change to project dependency once akka-stream merged to master - val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "1.0" + val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "2.0.1" val camelCore = "org.apache.camel" % "camel-core" % "2.13.4" exclude("org.slf4j", "slf4j-api") // ApacheV2 @@ -60,7 +60,7 @@ object Dependencies { val junitIntf = "com.novocode" % "junit-interface" % "0.11" % "test" // MIT val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.4" % "test" // FIXME: change to project dependency once akka-stream merged to master - val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit-experimental" % "1.0" % "test" + val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit-experimental" % "2.0.1" % "test" // metrics, measurements, perf testing val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2