diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceMultiDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceMultiDocSpec.scala
index e66ee0291b..e89573de59 100644
--- a/akka-docs/rst/scala/code/docs/persistence/PersistenceMultiDocSpec.scala
+++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceMultiDocSpec.scala
@@ -21,21 +21,21 @@ object PersistenceMultiDocSpec {
   }
   //#default-plugins
 
-  val OverrideConfig = """
+  val OverrideConfig = s"""
     //#override-config
     # Configuration entry for the custom journal plugin, see `journalPluginId`.
     akka.persistence.chronicle.journal {
       # Standard persistence extension property: provider FQCN.
       class = "akka.persistence.chronicle.ChronicleSyncJournal"
       # Custom setting specific for the journal `ChronicleSyncJournal`.
-      folder = ${user.dir}/store/journal
+      folder = $${user.dir}/store/journal
     }
     # Configuration entry for the custom snapshot store plugin, see `snapshotPluginId`.
     akka.persistence.chronicle.snapshot-store {
       # Standard persistence extension property: provider FQCN.
       class = "akka.persistence.chronicle.ChronicleSnapshotStore"
       # Custom setting specific for the snapshot store `ChronicleSnapshotStore`.
-      folder = ${user.dir}/store/snapshot
+      folder = $${user.dir}/store/snapshot
    }
    //#override-config
  """
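The hunk above fixes a subtle interpolation detail: once the config block becomes an s-interpolated string, a bare ${user.dir} would be evaluated by the Scala compiler instead of being left for HOCON to resolve at load time. A minimal sketch of the escaping rule (my example, not part of the patch):

// In an s-interpolated string, `$$` produces a literal `$`, so the HOCON
// substitution survives into the rendered config text.
val plain        = """folder = ${user.dir}/store"""   // raw string: `${...}` kept as-is
val interpolated = s"""folder = $${user.dir}/store""" // renders: folder = ${user.dir}/store
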
diff --git a/akka-http-core/src/test/scala/akka/http/javadsl/HttpExtensionApiSpec.scala b/akka-http-core/src/test/scala/akka/http/javadsl/HttpExtensionApiSpec.scala
index 748371d1a7..dce742fae2 100644
--- a/akka-http-core/src/test/scala/akka/http/javadsl/HttpExtensionApiSpec.scala
+++ b/akka-http-core/src/test/scala/akka/http/javadsl/HttpExtensionApiSpec.scala
@@ -13,7 +13,7 @@ import akka.http.javadsl.model.ws._
 import akka.http.javadsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
 import akka.japi.Pair
 import akka.actor.ActorSystem
-import akka.event.{ NoLogging, LoggingAdapterTest }
+import akka.event.NoLogging
 import akka.http.javadsl.model._
 import akka.http.scaladsl.TestUtils
 import akka.japi.Function
diff --git a/akka-http-testkit/build.sbt b/akka-http-testkit/build.sbt
index fa3003fbd6..f7bfd7a569 100644
--- a/akka-http-testkit/build.sbt
+++ b/akka-http-testkit/build.sbt
@@ -2,7 +2,6 @@ import akka._
 import com.typesafe.tools.mima.plugin.MimaKeys
 
 AkkaBuild.defaultSettings
-AkkaBuild.experimentalSettings
 Formatting.formatSettings
 OSGi.httpTestkit
 Dependencies.httpTestkit
diff --git a/akka-http-tests/build.sbt b/akka-http-tests/build.sbt
index 3efb382ca1..360d3e92d9 100644
--- a/akka-http-tests/build.sbt
+++ b/akka-http-tests/build.sbt
@@ -2,7 +2,6 @@ import akka._
 
 AkkaBuild.defaultSettings
 AkkaBuild.dontPublishSettings
-AkkaBuild.experimentalSettings
 Formatting.formatSettings
 Dependencies.httpTests
diff --git a/akka-persistence-shared/build.sbt b/akka-persistence-shared/build.sbt
new file mode 100644
index 0000000000..00f7448099
--- /dev/null
+++ b/akka-persistence-shared/build.sbt
@@ -0,0 +1,8 @@
+import akka._
+
+AkkaBuild.defaultSettings
+AkkaBuild.dontPublishSettings
+Formatting.formatSettings
+Dependencies.persistenceShared
+
+fork in Test := true
diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/PersistencePluginProxySpec.scala b/akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/PersistencePluginProxySpec.scala
similarity index 100%
rename from akka-persistence/src/test/scala/akka/persistence/journal/leveldb/PersistencePluginProxySpec.scala
rename to akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/PersistencePluginProxySpec.scala
diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala b/akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala
similarity index 100%
rename from akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala
rename to akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala
diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence-shared/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
similarity index 100%
rename from akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
rename to akka-persistence-shared/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
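For context on the specs being moved: they exercise the shared LevelDB journal, which is started as an actor on one node and then injected into every ActorSystem that should write through it. A rough usage sketch, assuming the standard Akka 2.4 API (not taken from this patch):

import akka.actor.{ ActorSystem, Props }
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }

val system = ActorSystem("shared-journal")
// one SharedLeveldbStore actor owns the LevelDB files...
val store = system.actorOf(Props[SharedLeveldbStore], "store")
// ...and each participating ActorSystem routes its journal through it
SharedLeveldbJournal.setStore(store, system)
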
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java
index db15b8a1a4..3c8ca9ee12 100644
--- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java
@@ -100,23 +100,18 @@ public class FlowTest extends StreamTest {
   }
 
   @Test
-  public void mustBeAbleToUseStatefullMaponcat() throws Exception {
+  public void mustBeAbleToUseStatefulMaponcat() throws Exception {
     final JavaTestKit probe = new JavaTestKit(system);
     final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
     final Source<Integer, NotUsed> ints = Source.from(input);
     final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).statefulMapConcat(
-        new Creator<Function<Integer, Iterable<Integer>>>() {
-          public Function<Integer, Iterable<Integer>> create() {
-            int[] state = new int[1];
-            state[0] = 0;
-            return new Function<Integer, Iterable<Integer>>() {
-              public List<Integer> apply(Integer elem) {
-                List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
-                state[0] = elem;
-                return list;
-              }
-            };
-          }
+        () -> {
+          int[] state = new int[] {0};
+          return (elem) -> {
+            List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
+            state[0] = elem;
+            return list;
+          };
         });
 
     ints.via(flow)
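The refactored test keeps its counter in a one-element array because Java lambdas may only close over effectively-final variables. The equivalent Scala-DSL sketch below (my example, not from the patch) shows the semantics more directly: the outer thunk runs once per materialization, so every run starts from fresh state.

import akka.stream.scaladsl.Source

Source(1 to 5).statefulMapConcat { () =>
  var last = 0 // fresh for every materialization
  elem => {
    val out = List.fill(last)(elem) // repeat the element `last` times
    last = elem
    out
  }
}
// emits 2, 3, 3, 4, 4, 4, 5, 5, 5, 5 — which the SourceTest below
// folds into the string "2334445555"
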
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
index 06a553f180..0005cec746 100644
--- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
@@ -501,6 +501,68 @@ public class SourceTest extends StreamTest {
     probe.expectMsgEquals(2);
   }
 
+  @Test
+  public void mustBeAbleToUseStatefulMaponcat() throws Exception {
+    final JavaTestKit probe = new JavaTestKit(system);
+    final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
+    final Source<Integer, NotUsed> ints = Source.from(input).statefulMapConcat(
+        () -> {
+          int[] state = new int[] {0};
+          return (elem) -> {
+            List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
+            state[0] = elem;
+            return list;
+          };
+        });
+
+    ints
+      .runFold("", (acc, elem) -> acc + elem, materializer)
+      .thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender()));
+
+    probe.expectMsgEquals("2334445555");
+  }
+
+  @Test
+  public void mustBeAbleToUseIntersperse() throws Exception {
+    final JavaTestKit probe = new JavaTestKit(system);
+    final Source<String, NotUsed> source = Source.from(Arrays.asList("0", "1", "2", "3"))
+        .intersperse("[", ",", "]");
+
+    final CompletionStage<Done> future =
+        source.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
+
+    probe.expectMsgEquals("[");
+    probe.expectMsgEquals("0");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("1");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("2");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("3");
+    probe.expectMsgEquals("]");
+    future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void mustBeAbleToUseIntersperseAndConcat() throws Exception {
+    final JavaTestKit probe = new JavaTestKit(system);
+    final Source<String, NotUsed> source = Source.from(Arrays.asList("0", "1", "2", "3"))
+        .intersperse(",");
+
+    final CompletionStage<Done> future =
+        Source.single(">> ").concat(source).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
+
+    probe.expectMsgEquals(">> ");
+    probe.expectMsgEquals("0");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("1");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("2");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("3");
+    future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
+  }
+
   @Test
   public void mustBeAbleToUseDropWhile() throws Exception {
     final JavaTestKit probe = new JavaTestKit(system);
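The two new tests above cover both overloads of intersperse. As a Scala-DSL sketch (my example): the three-argument form brackets the stream with start and end markers, while the one-argument form only injects separators between elements.

import akka.stream.scaladsl.Source

Source(List("0", "1", "2", "3")).intersperse("[", ",", "]") // emits [ 0 , 1 , 2 , 3 ]
Source(List("0", "1", "2", "3")).intersperse(",")           // emits 0 , 1 , 2 , 3
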
diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
index 4e4bf7393a..8252f3838c 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
@@ -362,12 +362,12 @@ object StreamLayout {
   }
 
   final case class CompositeModule(
-      override val subModules: Set[Module],
-      override val shape: Shape,
-      override val downstreams: Map[OutPort, InPort],
-      override val upstreams: Map[InPort, OutPort],
-      override val materializedValueComputation: MaterializedValueNode,
-      override val attributes: Attributes) extends Module {
+    override val subModules: Set[Module],
+    override val shape: Shape,
+    override val downstreams: Map[OutPort, InPort],
+    override val upstreams: Map[InPort, OutPort],
+    override val materializedValueComputation: MaterializedValueNode,
+    override val attributes: Attributes) extends Module {
 
     override def replaceShape(s: Shape): Module = {
       shape.requireSamePortsAs(s)
@@ -392,13 +392,13 @@ object StreamLayout {
   }
 
   final case class FusedModule(
-      override val subModules: Set[Module],
-      override val shape: Shape,
-      override val downstreams: Map[OutPort, InPort],
-      override val upstreams: Map[InPort, OutPort],
-      override val materializedValueComputation: MaterializedValueNode,
-      override val attributes: Attributes,
-      info: Fusing.StructuralInfo) extends Module {
+    override val subModules: Set[Module],
+    override val shape: Shape,
+    override val downstreams: Map[OutPort, InPort],
+    override val upstreams: Map[InPort, OutPort],
+    override val materializedValueComputation: MaterializedValueNode,
+    override val attributes: Attributes,
+    info: Fusing.StructuralInfo) extends Module {
 
     override def isFused: Boolean = true
 
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 1505ecd57d..f3bbcee4ed 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -289,34 +289,34 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
     new Flow(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) })
 
   /**
-  * Transform each input element into an `Iterable` of output elements that is
-  * then flattened into the output stream. The transformation is meant to be stateful,
-  * which is enabled by creating the transformation function anew for every materialization —
-  * the returned function will typically close over mutable objects to store state between
-  * invocations. For the stateless variant see [[#mapConcat]].
-  *
-  * Make sure that the `Iterable` is immutable or at least not modified after
-  * being used as an output sequence. Otherwise the stream may fail with
-  * `ConcurrentModificationException` or other more subtle errors may occur.
-  *
-  * The returned `Iterable` MUST NOT contain `null` values,
-  * as they are illegal as stream elements - according to the Reactive Streams specification.
-  *
-  * '''Emits when''' the mapping function returns an element or there are still remaining elements
-  * from the previously calculated collection
-  *
-  * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
-  * previously calculated collection
-  *
-  * '''Completes when''' upstream completes and all remaining elements has been emitted
-  *
-  * '''Cancels when''' downstream cancels
-  */
-  def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):javadsl.Flow[In, T, Mat] =
-    new Flow(delegate.statefulMapConcat{ () ⇒ {
+   * Transform each input element into an `Iterable` of output elements that is
+   * then flattened into the output stream. The transformation is meant to be stateful,
+   * which is enabled by creating the transformation function anew for every materialization —
+   * the returned function will typically close over mutable objects to store state between
+   * invocations. For the stateless variant see [[#mapConcat]].
+   *
+   * Make sure that the `Iterable` is immutable or at least not modified after
+   * being used as an output sequence. Otherwise the stream may fail with
+   * `ConcurrentModificationException` or other more subtle errors may occur.
+   *
+   * The returned `Iterable` MUST NOT contain `null` values,
+   * as they are illegal as stream elements - according to the Reactive Streams specification.
+   *
+   * '''Emits when''' the mapping function returns an element or there are still remaining elements
+   * from the previously calculated collection
+   *
+   * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
+   * previously calculated collection
+   *
+   * '''Completes when''' upstream completes and all remaining elements has been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Flow[In, T, Mat] =
+    new Flow(delegate.statefulMapConcat { () ⇒
       val fun = f.create()
       elem ⇒ Util.immutableSeq(fun(elem))
-    }})
+    })
 
 /**
  * Transform this stream by applying the given function to each of the elements
@@ -762,23 +762,23 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
     new Flow(delegate.recover(pf))
 
   /**
-  * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
-  * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
-  * Source may be materialized.
-  *
-  * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
-  * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
-  *
-  * '''Emits when''' element is available from the upstream or upstream is failed and element is available
-  * from alternative Source
-  *
-  * '''Backpressures when''' downstream backpressures
-  *
-  * '''Completes when''' upstream completes or upstream failed with exception pf can handle
-  *
-  * '''Cancels when''' downstream cancels
-  *
-  */
+   * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
+   * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
+   * Source may be materialized.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed and element is available
+   * from alternative Source
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   */
   def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] =
     new Flow(delegate.recoverWith(pf))
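A short Scala-DSL illustration of the recoverWith contract documented above (my example, not from the patch): the failure is fed to the partial function and the alternative Source takes over, while elements overtaken by the out-of-band error signal are lost.

import akka.stream.scaladsl.Source

Source(1 to 5)
  .map(n => if (n == 3) throw new RuntimeException("boom") else n)
  .recoverWith { case _: RuntimeException => Source(List(-1)) }
// emits 1, 2, -1 — the elements after the failure point are not replayed
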
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 96fc23335a..178157febd 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -735,23 +735,23 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
     new Source(delegate.recover(pf))
 
   /**
-  * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
-  * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
-  * Source may be materialized.
-  *
-  * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
-  * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
-  *
-  * '''Emits when''' element is available from the upstream or upstream is failed and element is available
-  * from alternative Source
-  *
-  * '''Backpressures when''' downstream backpressures
-  *
-  * '''Completes when''' upstream completes or upstream failed with exception pf can handle
-  *
-  * '''Cancels when''' downstream cancels
-  *
-  */
+   * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
+   * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
+   * Source may be materialized.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed and element is available
+   * from alternative Source
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   */
   def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] =
     new Source(delegate.recoverWith(pf))
 
@@ -777,38 +777,37 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
    * '''Cancels when''' downstream cancels
    */
   def mapConcat[T](f: function.Function[Out, _ <: java.lang.Iterable[T]]): javadsl.Source[T, Mat] =
-    new Source(delegate.statefulMapConcat(() ⇒ elem ⇒ Util.immutableSeq(f.apply(elem))))
-
+    new Source(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem))))
 
   /**
-  * Transform each input element into an `Iterable` of output elements that is
-  * then flattened into the output stream. The transformation is meant to be stateful,
-  * which is enabled by creating the transformation function anew for every materialization —
-  * the returned function will typically close over mutable objects to store state between
-  * invocations. For the stateless variant see [[#mapConcat]].
-  *
-  * Make sure that the `Iterable` is immutable or at least not modified after
-  * being used as an output sequence. Otherwise the stream may fail with
-  * `ConcurrentModificationException` or other more subtle errors may occur.
-  *
-  * The returned `Iterable` MUST NOT contain `null` values,
-  * as they are illegal as stream elements - according to the Reactive Streams specification.
-  *
-  * '''Emits when''' the mapping function returns an element or there are still remaining elements
-  * from the previously calculated collection
-  *
-  * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
-  * previously calculated collection
-  *
-  * '''Completes when''' upstream completes and all remaining elements has been emitted
-  *
-  * '''Cancels when''' downstream cancels
-  */
+   * Transform each input element into an `Iterable` of output elements that is
+   * then flattened into the output stream. The transformation is meant to be stateful,
+   * which is enabled by creating the transformation function anew for every materialization —
+   * the returned function will typically close over mutable objects to store state between
+   * invocations. For the stateless variant see [[#mapConcat]].
+   *
+   * Make sure that the `Iterable` is immutable or at least not modified after
+   * being used as an output sequence. Otherwise the stream may fail with
+   * `ConcurrentModificationException` or other more subtle errors may occur.
+   *
+   * The returned `Iterable` MUST NOT contain `null` values,
+   * as they are illegal as stream elements - according to the Reactive Streams specification.
+   *
+   * '''Emits when''' the mapping function returns an element or there are still remaining elements
+   * from the previously calculated collection
+   *
+   * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
+   * previously calculated collection
+   *
+   * '''Completes when''' upstream completes and all remaining elements has been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   */
   def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
-    new Source(delegate.statefulMapConcat{ () ⇒ {
+    new Source(delegate.statefulMapConcat { () ⇒
       val fun = f.create()
       elem ⇒ Util.immutableSeq(fun(elem))
-    }})
+    })
 
 /**
  * Transform this stream by applying the given function to each of the elements
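The mapConcat change above is the substantive one in this file: the javadsl method previously detoured through statefulMapConcat with a throwaway thunk, and now delegates straight to the scaladsl mapConcat, keeping the stateless operator stateless. For contrast with the stateful variant (sketch, not from the patch):

import akka.stream.scaladsl.Source

// one pure function, safely shared across materializations — no state to recreate
Source(1 to 3).mapConcat(n => List.fill(n)(n)) // emits 1, 2, 2, 3, 3, 3
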
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index 3d9236b10b..26ea46a27a 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -140,38 +140,37 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
    * '''Cancels when''' downstream cancels
    */
   def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubFlow[In, T, Mat] =
-    new SubFlow(delegate.statefulMapConcat { () ⇒ elem ⇒ Util.immutableSeq(f(elem)) })
+    new SubFlow(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) })
 
   /**
-  * Transform each input element into an `Iterable` of output elements that is
-  * then flattened into the output stream. The transformation is meant to be stateful,
-  * which is enabled by creating the transformation function anew for every materialization —
-  * the returned function will typically close over mutable objects to store state between
-  * invocations. For the stateless variant see [[#mapConcat]].
-  *
-  * Make sure that the `Iterable` is immutable or at least not modified after
-  * being used as an output sequence. Otherwise the stream may fail with
-  * `ConcurrentModificationException` or other more subtle errors may occur.
-  *
-  * The returned `Iterable` MUST NOT contain `null` values,
-  * as they are illegal as stream elements - according to the Reactive Streams specification.
-  *
-  * '''Emits when''' the mapping function returns an element or there are still remaining elements
-  * from the previously calculated collection
-  *
-  * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
-  * previously calculated collection
-  *
-  * '''Completes when''' upstream completes and all remaining elements has been emitted
-  *
-  * '''Cancels when''' downstream cancels
-  */
+   * Transform each input element into an `Iterable` of output elements that is
+   * then flattened into the output stream. The transformation is meant to be stateful,
+   * which is enabled by creating the transformation function anew for every materialization —
+   * the returned function will typically close over mutable objects to store state between
+   * invocations. For the stateless variant see [[#mapConcat]].
+   *
+   * Make sure that the `Iterable` is immutable or at least not modified after
+   * being used as an output sequence. Otherwise the stream may fail with
+   * `ConcurrentModificationException` or other more subtle errors may occur.
+   *
+   * The returned `Iterable` MUST NOT contain `null` values,
+   * as they are illegal as stream elements - according to the Reactive Streams specification.
+   *
+   * '''Emits when''' the mapping function returns an element or there are still remaining elements
+   * from the previously calculated collection
+   *
+   * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
+   * previously calculated collection
+   *
+   * '''Completes when''' upstream completes and all remaining elements has been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   */
   def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
-    new SubFlow(delegate.statefulMapConcat{ () ⇒ {
+    new SubFlow(delegate.statefulMapConcat { () ⇒
       val fun = f.create()
       elem ⇒ Util.immutableSeq(fun(elem))
-    }})
-
+    })
 
 /**
  * Transform this stream by applying the given function to each of the elements
@@ -615,23 +614,23 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
     new SubFlow(delegate.recover(pf))
 
   /**
-  * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
-  * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
-  * Source may be materialized.
-  *
-  * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
-  * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
-  *
-  * '''Emits when''' element is available from the upstream or upstream is failed and element is available
-  * from alternative Source
-  *
-  * '''Backpressures when''' downstream backpressures
-  *
-  * '''Completes when''' upstream completes or upstream failed with exception pf can handle
-  *
-  * '''Cancels when''' downstream cancels
-  *
-  */
+   * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
+   * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
+   * Source may be materialized.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed and element is available
+   * from alternative Source
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   */
   def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] =
     new SubFlow(delegate.recoverWith(pf))
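SubFlow is the type produced by substream operators such as groupBy and splitWhen, which is why the same mapConcat and statefulMapConcat fixes recur here. A hypothetical sketch of operating on substreams and re-joining them:

import akka.stream.scaladsl.Source

Source(1 to 6)
  .splitWhen(_ % 3 == 1)      // substreams (1, 2, 3) and (4, 5, 6)
  .mapConcat(n => List(n, n)) // applied per substream, via the SubFlow API
  .concatSubstreams
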
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index ae682fcb45..96b8dce5b2 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -138,37 +138,37 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
    * '''Cancels when''' downstream cancels
    */
   def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubSource[T, Mat] =
-    new SubSource(delegate.statefulMapConcat { () ⇒ elem ⇒ Util.immutableSeq(f(elem)) })
+    new SubSource(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) })
 
   /**
-  * Transform each input element into an `Iterable` of output elements that is
-  * then flattened into the output stream. The transformation is meant to be stateful,
-  * which is enabled by creating the transformation function anew for every materialization —
-  * the returned function will typically close over mutable objects to store state between
-  * invocations. For the stateless variant see [[#mapConcat]].
-  *
-  * Make sure that the `Iterable` is immutable or at least not modified after
-  * being used as an output sequence. Otherwise the stream may fail with
-  * `ConcurrentModificationException` or other more subtle errors may occur.
-  *
-  * The returned `Iterable` MUST NOT contain `null` values,
-  * as they are illegal as stream elements - according to the Reactive Streams specification.
-  *
-  * '''Emits when''' the mapping function returns an element or there are still remaining elements
-  * from the previously calculated collection
-  *
-  * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
-  * previously calculated collection
-  *
-  * '''Completes when''' upstream completes and all remaining elements has been emitted
-  *
-  * '''Cancels when''' downstream cancels
-  */
-  def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):SubSource[T, Mat] =
-    new SubSource(delegate.statefulMapConcat{ () ⇒ {
+   * Transform each input element into an `Iterable` of output elements that is
+   * then flattened into the output stream. The transformation is meant to be stateful,
+   * which is enabled by creating the transformation function anew for every materialization —
+   * the returned function will typically close over mutable objects to store state between
+   * invocations. For the stateless variant see [[#mapConcat]].
+   *
+   * Make sure that the `Iterable` is immutable or at least not modified after
+   * being used as an output sequence. Otherwise the stream may fail with
+   * `ConcurrentModificationException` or other more subtle errors may occur.
+   *
+   * The returned `Iterable` MUST NOT contain `null` values,
+   * as they are illegal as stream elements - according to the Reactive Streams specification.
+   *
+   * '''Emits when''' the mapping function returns an element or there are still remaining elements
+   * from the previously calculated collection
+   *
+   * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
+   * previously calculated collection
+   *
+   * '''Completes when''' upstream completes and all remaining elements has been emitted
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubSource[T, Mat] =
+    new SubSource(delegate.statefulMapConcat { () ⇒
       val fun = f.create()
       elem ⇒ Util.immutableSeq(fun(elem))
-    }})
+    })
 
 /**
  * Transform this stream by applying the given function to each of the elements
@@ -612,23 +612,23 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
     new SubSource(delegate.recover(pf))
 
   /**
-  * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
-  * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
-  * Source may be materialized.
-  *
-  * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
-  * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
-  *
-  * '''Emits when''' element is available from the upstream or upstream is failed and element is available
-  * from alternative Source
-  *
-  * '''Backpressures when''' downstream backpressures
-  *
-  * '''Completes when''' upstream completes or upstream failed with exception pf can handle
-  *
-  * '''Cancels when''' downstream cancels
-  *
-  */
+   * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
+   * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
+   * Source may be materialized.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed and element is available
+   * from alternative Source
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   */
   def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] =
     new SubSource(delegate.recoverWith(pf))
 
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
index 01b61eb14e..b94777b096 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
@@ -471,7 +471,7 @@ final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) ext
 
 object Partition {
 
-  case class PartitionOutOfBoundsException(msg:String) extends RuntimeException(msg) with NoStackTrace
+  case class PartitionOutOfBoundsException(msg: String) extends IndexOutOfBoundsException(msg) with NoStackTrace
 
   /**
    * Create a new `Partition` stage with the specified input type.
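Changing the parent class from RuntimeException to IndexOutOfBoundsException lets callers catch the failure under the JDK type that actually describes it: the partitioner returned a port index outside 0 until outputPorts. A sketch of what user code can now rely on (my example, not from the patch):

import akka.stream.scaladsl.Partition

// hypothetical 2-way partitioner that misroutes negative numbers to port 2,
// which does not exist
val partition = Partition[Int](2, n => if (n >= 0) n % 2 else 2)
// a stream using this stage fails with PartitionOutOfBoundsException, which a
// generic downstream handler such as
//   .recover { case _: IndexOutOfBoundsException => fallback }
// now matches as well
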
diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala
index 84ecfb4f69..df1358a253 100644
--- a/project/AkkaBuild.scala
+++ b/project/AkkaBuild.scala
@@ -55,7 +55,8 @@ object AkkaBuild extends Build {
     ),
     aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
       cluster, clusterMetrics, clusterTools, clusterSharding, distributedData,
-      slf4j, agent, persistence, persistenceQuery, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed, protobuf,
+      slf4j, agent, persistence, persistenceQuery, persistenceTck, persistenceShared,
+      kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed, protobuf,
       stream, streamTestkit, streamTests, streamTestsTck, parsing,
       httpCore, http, httpSprayJson, httpXml, httpJackson, httpTests, httpTestkit
     )
@@ -68,7 +69,8 @@ object AkkaBuild extends Build {
     // samples don't work with dbuild right now
     aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
       cluster, clusterMetrics, clusterTools, clusterSharding, distributedData,
-      slf4j, persistence, persistenceQuery, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed, protobuf,
+      slf4j, persistence, persistenceQuery, persistenceTck, persistenceShared,
+      kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed, protobuf,
       stream, streamTestkit, streamTests, streamTestsTck, parsing,
       httpCore, http, httpSprayJson, httpXml, httpJackson, httpTests, httpTestkit
     )
@@ -181,7 +183,7 @@ object AkkaBuild extends Build {
   lazy val persistence = Project(
     id = "akka-persistence",
     base = file("akka-persistence"),
-    dependencies = Seq(actor, remote % "test->test", testkit % "test->test", protobuf)
+    dependencies = Seq(actor, testkit % "test->test", protobuf)
   )
 
   lazy val persistenceQuery = Project(
@@ -200,6 +202,12 @@ object AkkaBuild extends Build {
     dependencies = Seq(persistence % "compile;provided->provided;test->test", testkit % "compile;test->test")
   )
 
+  lazy val persistenceShared = Project(
+    id = "akka-persistence-shared",
+    base = file("akka-persistence-shared"),
+    dependencies = Seq(persistence % "test->test", testkit % "test->test", remote % "test", protobuf)
+  )
+
   lazy val httpCore = Project(
     id = "akka-http-core",
     base = file("akka-http-core"),
@@ -213,13 +221,13 @@ object AkkaBuild extends Build {
   )
 
   lazy val httpTestkit = Project(
-    id = "akka-http-testkit-experimental",
+    id = "akka-http-testkit",
     base = file("akka-http-testkit"),
     dependencies = Seq(http, streamTestkit)
   )
 
   lazy val httpTests = Project(
-    id = "akka-http-tests-experimental",
+    id = "akka-http-tests",
    base = file("akka-http-tests"),
     dependencies = Seq(httpTestkit % "test", httpSprayJson, httpXml, httpJackson)
   )
@@ -1009,7 +1009,7 @@ object GraphDSL extends GraphApply {
   }
 
   private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_])
-      extends PortOps[Out] {
+    extends PortOps[Out] {
 
     override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
     override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
@@ -1019,7 +1019,6 @@ object GraphDSL extends GraphApply {
 
     private def settingAttrNotSupported =
       new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
 
-
     override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet
 
     override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =
diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala (continued)
@@ -272,8 +280,8 @@ object AkkaBuild extends Build {
 
   lazy val streamTestkit = Project(
     id = "akka-stream-testkit",
-    base = file("akka-stream-testkit"), // TODO that persistence dependency
-    dependencies = Seq(stream, persistence % "compile;provided->provided;test->test", testkit % "compile;test->test")
+    base = file("akka-stream-testkit"),
+    dependencies = Seq(stream, testkit % "compile;test->test")
   )
 
   lazy val streamTests = Project(
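A note on the sbt arrows used throughout these project definitions: in `a % "test->test"` the left side is the depending project's configuration and the right side is the configuration being consumed, so the new module reuses test helpers without leaking them into compile scope. An annotated restatement (my comments) of the persistenceShared definition above:

lazy val persistenceShared = Project(
  id = "akka-persistence-shared",
  base = file("akka-persistence-shared"),
  dependencies = Seq(
    persistence % "test->test", // our tests may use akka-persistence's test classes
    testkit % "test->test",     // likewise for the testkit's test utilities
    remote % "test",            // shorthand for "test->compile"
    protobuf))
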
"akka-stream-testkit", - base = file("akka-stream-testkit"), // TODO that persistence dependency - dependencies = Seq(stream, persistence % "compile;provided->provided;test->test", testkit % "compile;test->test") + base = file("akka-stream-testkit"), + dependencies = Seq(stream, testkit % "compile;test->test") ) lazy val streamTests = Project( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ae88f434e3..9555a7001f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -140,12 +140,14 @@ object Dependencies { val agent = l ++= Seq(scalaStm.value, Test.scalatest.value, Test.junit) - val persistence = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.scalaXml) + val persistence = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.commonsCodec, Test.scalaXml) val persistenceQuery = l ++= Seq(Test.scalatest.value, Test.junit, Test.commonsIo) val persistenceTck = l ++= Seq(Test.scalatest.value.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile"))) + val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative) + val kernel = l ++= Seq(Test.scalatest.value, Test.junit) val camel = l ++= Seq(camelCore, Test.scalatest.value, Test.junit, Test.mockito, Test.logback, Test.commonsIo, Test.junitIntf)