clean up dependencies #19755 and other fixes

* remove the inter-project dependency chain streamTestkit -> persistence -> remote
* make PartitionOutOfBoundsException extend IndexOutOfBoundsException
* fix some weird formatting
* remove the -experimental suffix from the HTTP Testkit/Tests project ids
* split out the shared LevelDB journal tests into a new subproject (not to be
  published)
Roland Kuhn 2016-02-15 16:53:57 +01:00
parent 10d3af1478
commit e202ea8c40
18 changed files with 288 additions and 218 deletions


@@ -21,21 +21,21 @@ object PersistenceMultiDocSpec {
   }
   //#default-plugins

-  val OverrideConfig = """
+  val OverrideConfig = s"""
     //#override-config
     # Configuration entry for the custom journal plugin, see `journalPluginId`.
     akka.persistence.chronicle.journal {
       # Standard persistence extension property: provider FQCN.
       class = "akka.persistence.chronicle.ChronicleSyncJournal"
       # Custom setting specific for the journal `ChronicleSyncJournal`.
-      folder = ${user.dir}/store/journal
+      folder = $${user.dir}/store/journal
     }
     # Configuration entry for the custom snapshot store plugin, see `snapshotPluginId`.
     akka.persistence.chronicle.snapshot-store {
       # Standard persistence extension property: provider FQCN.
       class = "akka.persistence.chronicle.ChronicleSnapshotStore"
       # Custom setting specific for the snapshot store `ChronicleSnapshotStore`.
-      folder = ${user.dir}/store/snapshot
+      folder = $${user.dir}/store/snapshot
     }
     //#override-config
   """


@@ -13,7 +13,7 @@ import akka.http.javadsl.model.ws._
 import akka.http.javadsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
 import akka.japi.Pair
 import akka.actor.ActorSystem
-import akka.event.{ NoLogging, LoggingAdapterTest }
+import akka.event.NoLogging
 import akka.http.javadsl.model._
 import akka.http.scaladsl.TestUtils
 import akka.japi.Function


@@ -2,7 +2,6 @@ import akka._
 import com.typesafe.tools.mima.plugin.MimaKeys

 AkkaBuild.defaultSettings
-AkkaBuild.experimentalSettings
 Formatting.formatSettings
 OSGi.httpTestkit
 Dependencies.httpTestkit


@@ -2,7 +2,6 @@ import akka._

 AkkaBuild.defaultSettings
 AkkaBuild.dontPublishSettings
-AkkaBuild.experimentalSettings
 Formatting.formatSettings
 Dependencies.httpTests


@@ -0,0 +1,8 @@
+import akka._
+
+AkkaBuild.defaultSettings
+AkkaBuild.dontPublishSettings
+Formatting.formatSettings
+Dependencies.persistenceShared
+
+fork in Test := true


@@ -100,23 +100,18 @@ public class FlowTest extends StreamTest {
   }

   @Test
-  public void mustBeAbleToUseStatefullMaponcat() throws Exception {
+  public void mustBeAbleToUseStatefulMaponcat() throws Exception {
     final JavaTestKit probe = new JavaTestKit(system);
     final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
     final Source<Integer, NotUsed> ints = Source.from(input);
     final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).statefulMapConcat(
-        new Creator<Function<Integer, Iterable<Integer>>>() {
-          public Function<Integer, Iterable<Integer>> create() {
-            int[] state = new int[1];
-            state[0] = 0;
-            return new Function<Integer, Iterable<Integer>>() {
-              public List<Integer> apply(Integer elem) {
-                List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
-                state[0] = elem;
-                return list;
-              }
-            };
-          }
+        () -> {
+          int[] state = new int[] {0};
+          return (elem) -> {
+            List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
+            state[0] = elem;
+            return list;
+          };
         });
     ints.via(flow)
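The rewrite above only swaps the anonymous Creator/Function classes for Java 8 lambdas; behaviour is unchanged. For comparison, a sketch of the same stateful counting logic against the scaladsl API (illustrative, not part of this commit):

    import akka.NotUsed
    import akka.stream.scaladsl.{ Flow, Source }

    // The () => ... factory runs anew for every materialization, so each run
    // of the stream gets its own private `state`.
    val flow: Flow[Int, Int, NotUsed] =
      Flow[Int].statefulMapConcat { () =>
        var state = 0
        elem => {
          val out = List.fill(state)(elem) // like Collections.nCopies(state, elem)
          state = elem
          out
        }
      }

    // Source(1 to 5).via(flow) emits 2, 3, 3, 4, 4, 4, 5, 5, 5, 5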


@@ -501,6 +501,68 @@ public class SourceTest extends StreamTest {
     probe.expectMsgEquals(2);
   }

+  @Test
+  public void mustBeAbleToUseStatefulMaponcat() throws Exception {
+    final JavaTestKit probe = new JavaTestKit(system);
+    final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
+    final Source<Integer, NotUsed> ints = Source.from(input).statefulMapConcat(
+        () -> {
+          int[] state = new int[] {0};
+          return (elem) -> {
+            List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
+            state[0] = elem;
+            return list;
+          };
+        });
+    ints
+      .runFold("", (acc, elem) -> acc + elem, materializer)
+      .thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender()));
+    probe.expectMsgEquals("2334445555");
+  }
+
+  @Test
+  public void mustBeAbleToUseIntersperse() throws Exception {
+    final JavaTestKit probe = new JavaTestKit(system);
+    final Source<String, NotUsed> source = Source.from(Arrays.asList("0", "1", "2", "3"))
+        .intersperse("[", ",", "]");
+    final CompletionStage<Done> future =
+        source.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
+    probe.expectMsgEquals("[");
+    probe.expectMsgEquals("0");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("1");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("2");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("3");
+    probe.expectMsgEquals("]");
+    future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void mustBeAbleToUseIntersperseAndConcat() throws Exception {
+    final JavaTestKit probe = new JavaTestKit(system);
+    final Source<String, NotUsed> source = Source.from(Arrays.asList("0", "1", "2", "3"))
+        .intersperse(",");
+    final CompletionStage<Done> future =
+        Source.single(">> ").concat(source).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
+    probe.expectMsgEquals(">> ");
+    probe.expectMsgEquals("0");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("1");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("2");
+    probe.expectMsgEquals(",");
+    probe.expectMsgEquals("3");
+    future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
+  }
+
   @Test
   public void mustBeAbleToUseDropWhile() throws Exception {
     final JavaTestKit probe = new JavaTestKit(system);
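The new tests above cover both intersperse overloads: intersperse(start, inject, end) brackets the stream, while intersperse(inject) only separates elements, and the second test adds a prefix with concat instead of a start marker. The expected element sequences, sketched with the scaladsl API (illustrative):

    import akka.stream.scaladsl.Source

    val digits = Source(List("0", "1", "2", "3"))

    digits.intersperse("[", ",", "]")
    // emits: "[", "0", ",", "1", ",", "2", ",", "3", "]"

    digits.intersperse(",")
    // emits: "0", ",", "1", ",", "2", ",", "3"

    Source.single(">> ").concat(digits.intersperse(","))
    // emits: ">> ", "0", ",", "1", ",", "2", ",", "3"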


@@ -362,12 +362,12 @@ object StreamLayout {
   }

   final case class CompositeModule(
     override val subModules: Set[Module],
     override val shape: Shape,
     override val downstreams: Map[OutPort, InPort],
     override val upstreams: Map[InPort, OutPort],
     override val materializedValueComputation: MaterializedValueNode,
     override val attributes: Attributes) extends Module {

     override def replaceShape(s: Shape): Module = {
       shape.requireSamePortsAs(s)
@@ -392,13 +392,13 @@ object StreamLayout {
   }

   final case class FusedModule(
     override val subModules: Set[Module],
     override val shape: Shape,
     override val downstreams: Map[OutPort, InPort],
     override val upstreams: Map[InPort, OutPort],
     override val materializedValueComputation: MaterializedValueNode,
     override val attributes: Attributes,
     info: Fusing.StructuralInfo) extends Module {

     override def isFused: Boolean = true


@@ -289,34 +289,34 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
     new Flow(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) })

   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be stateful,
    * which is enabled by creating the transformation function anew for every materialization -
    * the returned function will typically close over mutable objects to store state between
    * invocations. For the stateless variant see [[#mapConcat]].
    *
    * Make sure that the `Iterable` is immutable or at least not modified after
    * being used as an output sequence. Otherwise the stream may fail with
    * `ConcurrentModificationException` or other more subtle errors may occur.
    *
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive Streams specification.
    *
    * '''Emits when''' the mapping function returns an element or there are still remaining elements
    * from the previously calculated collection
    *
    * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
    * previously calculated collection
    *
    * '''Completes when''' upstream completes and all remaining elements has been emitted
    *
    * '''Cancels when''' downstream cancels
    */
-  def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):javadsl.Flow[In, T, Mat] =
-    new Flow(delegate.statefulMapConcat{ () ⇒ {
+  def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Flow[In, T, Mat] =
+    new Flow(delegate.statefulMapConcat { () ⇒
       val fun = f.create()
       elem ⇒ Util.immutableSeq(fun(elem))
-    }})
+    })

   /**
    * Transform this stream by applying the given function to each of the elements
@@ -762,23 +762,23 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
     new Flow(delegate.recover(pf))

   /**
    * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
    * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
    * Source may be materialized.
    *
    * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
    * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
    *
    * '''Emits when''' element is available from the upstream or upstream is failed and element is available
    * from alternative Source
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes or upstream failed with exception pf can handle
    *
    * '''Cancels when''' downstream cancels
    *
    */
   def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] =
     new Flow(delegate.recoverWith(pf))
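As the scaladoc above notes, recoverWith switches to the alternative Source and stays in effect for later failures; because onError travels out-of-band, elements that were in flight when the failure struck are dropped. A short scaladsl sketch of the behaviour (illustrative):

    import akka.stream.scaladsl.Source

    Source(List(1, 2, 3))
      .map(n => if (n == 2) throw new ArithmeticException("boom") else n)
      .recoverWith { case _: ArithmeticException => Source(List(-1, -2)) }
    // may emit: 1, -1, -2. Nothing after the failing element is emitted from
    // upstream; the stream completes from the alternative Source instead.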


@@ -735,23 +735,23 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
     new Source(delegate.recover(pf))

   /**
    * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
    * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
    * Source may be materialized.
    *
    * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
    * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
    *
    * '''Emits when''' element is available from the upstream or upstream is failed and element is available
    * from alternative Source
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes or upstream failed with exception pf can handle
    *
    * '''Cancels when''' downstream cancels
    *
    */
   def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] =
     new Source(delegate.recoverWith(pf))
@@ -777,38 +777,37 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
    * '''Cancels when''' downstream cancels
    */
   def mapConcat[T](f: function.Function[Out, _ <: java.lang.Iterable[T]]): javadsl.Source[T, Mat] =
-    new Source(delegate.statefulMapConcat(() ⇒ elem ⇒ Util.immutableSeq(f.apply(elem))))
+    new Source(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem))))

   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be stateful,
    * which is enabled by creating the transformation function anew for every materialization -
    * the returned function will typically close over mutable objects to store state between
    * invocations. For the stateless variant see [[#mapConcat]].
    *
    * Make sure that the `Iterable` is immutable or at least not modified after
    * being used as an output sequence. Otherwise the stream may fail with
    * `ConcurrentModificationException` or other more subtle errors may occur.
    *
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive Streams specification.
    *
    * '''Emits when''' the mapping function returns an element or there are still remaining elements
    * from the previously calculated collection
    *
    * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
    * previously calculated collection
    *
    * '''Completes when''' upstream completes and all remaining elements has been emitted
    *
    * '''Cancels when''' downstream cancels
    */
   def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
-    new Source(delegate.statefulMapConcat{ () ⇒ {
+    new Source(delegate.statefulMapConcat { () ⇒
       val fun = f.create()
       elem ⇒ Util.immutableSeq(fun(elem))
-    }})
+    })

   /**
    * Transform this stream by applying the given function to each of the elements


@@ -140,38 +140,37 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
    * '''Cancels when''' downstream cancels
    */
   def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubFlow[In, T, Mat] =
-    new SubFlow(delegate.statefulMapConcat { () ⇒ elem ⇒ Util.immutableSeq(f(elem)) })
+    new SubFlow(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) })

   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be stateful,
    * which is enabled by creating the transformation function anew for every materialization -
    * the returned function will typically close over mutable objects to store state between
    * invocations. For the stateless variant see [[#mapConcat]].
    *
    * Make sure that the `Iterable` is immutable or at least not modified after
    * being used as an output sequence. Otherwise the stream may fail with
    * `ConcurrentModificationException` or other more subtle errors may occur.
    *
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive Streams specification.
    *
    * '''Emits when''' the mapping function returns an element or there are still remaining elements
    * from the previously calculated collection
    *
    * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
    * previously calculated collection
    *
    * '''Completes when''' upstream completes and all remaining elements has been emitted
    *
    * '''Cancels when''' downstream cancels
    */
   def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
-    new SubFlow(delegate.statefulMapConcat{ () ⇒ {
+    new SubFlow(delegate.statefulMapConcat { () ⇒
       val fun = f.create()
       elem ⇒ Util.immutableSeq(fun(elem))
-    }})
+    })

   /**
    * Transform this stream by applying the given function to each of the elements
@@ -615,23 +614,23 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
     new SubFlow(delegate.recover(pf))

   /**
    * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
    * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
    * Source may be materialized.
    *
    * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
    * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
    *
    * '''Emits when''' element is available from the upstream or upstream is failed and element is available
    * from alternative Source
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes or upstream failed with exception pf can handle
    *
    * '''Cancels when''' downstream cancels
    *
    */
   def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] =
     new SubFlow(delegate.recoverWith(pf))


@@ -138,37 +138,37 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
    * '''Cancels when''' downstream cancels
    */
   def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubSource[T, Mat] =
-    new SubSource(delegate.statefulMapConcat { () ⇒ elem ⇒ Util.immutableSeq(f(elem)) })
+    new SubSource(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) })

   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be stateful,
    * which is enabled by creating the transformation function anew for every materialization -
    * the returned function will typically close over mutable objects to store state between
    * invocations. For the stateless variant see [[#mapConcat]].
    *
    * Make sure that the `Iterable` is immutable or at least not modified after
    * being used as an output sequence. Otherwise the stream may fail with
    * `ConcurrentModificationException` or other more subtle errors may occur.
    *
    * The returned `Iterable` MUST NOT contain `null` values,
    * as they are illegal as stream elements - according to the Reactive Streams specification.
    *
    * '''Emits when''' the mapping function returns an element or there are still remaining elements
    * from the previously calculated collection
    *
    * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
    * previously calculated collection
    *
    * '''Completes when''' upstream completes and all remaining elements has been emitted
    *
    * '''Cancels when''' downstream cancels
    */
-  def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):SubSource[T, Mat] =
-    new SubSource(delegate.statefulMapConcat{ () ⇒ {
+  def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubSource[T, Mat] =
+    new SubSource(delegate.statefulMapConcat { () ⇒
       val fun = f.create()
       elem ⇒ Util.immutableSeq(fun(elem))
-    }})
+    })

   /**
    * Transform this stream by applying the given function to each of the elements
@@ -612,23 +612,23 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
     new SubSource(delegate.recover(pf))

   /**
    * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
    * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
    * Source may be materialized.
    *
    * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
    * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
    *
    * '''Emits when''' element is available from the upstream or upstream is failed and element is available
    * from alternative Source
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes or upstream failed with exception pf can handle
    *
    * '''Cancels when''' downstream cancels
    *
    */
   def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] =
     new SubSource(delegate.recoverWith(pf))


@@ -471,7 +471,7 @@ final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) ext

 object Partition {

-  case class PartitionOutOfBoundsException(msg:String) extends RuntimeException(msg) with NoStackTrace
+  case class PartitionOutOfBoundsException(msg: String) extends IndexOutOfBoundsException(msg) with NoStackTrace

   /**
    * Create a new `Partition` stage with the specified input type.
@@ -1009,7 +1009,7 @@ object GraphDSL extends GraphApply {
   }

   private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_])
     extends PortOps[Out] {

     override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
     override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
@@ -1019,7 +1019,6 @@ object GraphDSL extends GraphApply {
     private def settingAttrNotSupported =
       new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
-
     override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet

     override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =
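Since PartitionOutOfBoundsException now extends IndexOutOfBoundsException, callers can handle a misbehaving partitioner with the standard JDK type. A minimal sketch (the exception is normally thrown by the Partition stage when the partitioner returns an outlet index outside 0 until outputPorts; constructing it directly here just demonstrates the new supertype):

    import akka.stream.scaladsl.Partition

    object PartitionExceptionDemo extends App {
      val e = Partition.PartitionOutOfBoundsException(
        "partitioner returned 7, but the stage has only 3 output ports")
      try throw e
      catch {
        // matches because the exception now subclasses IndexOutOfBoundsException
        // rather than plain RuntimeException
        case ioobe: IndexOutOfBoundsException => println(ioobe.getMessage)
      }
    }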


@@ -55,7 +55,8 @@ object AkkaBuild extends Build {
     ),
     aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
       cluster, clusterMetrics, clusterTools, clusterSharding, distributedData,
-      slf4j, agent, persistence, persistenceQuery, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed, protobuf,
+      slf4j, agent, persistence, persistenceQuery, persistenceTck, persistenceShared,
+      kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed, protobuf,
       stream, streamTestkit, streamTests, streamTestsTck, parsing,
       httpCore, http, httpSprayJson, httpXml, httpJackson, httpTests, httpTestkit
     )
@@ -68,7 +69,8 @@ object AkkaBuild extends Build {
     // samples don't work with dbuild right now
     aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
       cluster, clusterMetrics, clusterTools, clusterSharding, distributedData,
-      slf4j, persistence, persistenceQuery, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed, protobuf,
+      slf4j, persistence, persistenceQuery, persistenceTck, persistenceShared,
+      kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed, protobuf,
       stream, streamTestkit, streamTests, streamTestsTck, parsing,
       httpCore, http, httpSprayJson, httpXml, httpJackson, httpTests, httpTestkit
     )
@@ -181,7 +183,7 @@ object AkkaBuild extends Build {
   lazy val persistence = Project(
     id = "akka-persistence",
     base = file("akka-persistence"),
-    dependencies = Seq(actor, remote % "test->test", testkit % "test->test", protobuf)
+    dependencies = Seq(actor, testkit % "test->test", protobuf)
   )

   lazy val persistenceQuery = Project(
@@ -200,6 +202,12 @@ object AkkaBuild extends Build {
     dependencies = Seq(persistence % "compile;provided->provided;test->test", testkit % "compile;test->test")
   )

+  lazy val persistenceShared = Project(
+    id = "akka-persistence-shared",
+    base = file("akka-persistence-shared"),
+    dependencies = Seq(persistence % "test->test", testkit % "test->test", remote % "test", protobuf)
+  )
+
   lazy val httpCore = Project(
     id = "akka-http-core",
     base = file("akka-http-core"),
@@ -213,13 +221,13 @@ object AkkaBuild extends Build {
   )

   lazy val httpTestkit = Project(
-    id = "akka-http-testkit-experimental",
+    id = "akka-http-testkit",
     base = file("akka-http-testkit"),
     dependencies = Seq(http, streamTestkit)
   )

   lazy val httpTests = Project(
-    id = "akka-http-tests-experimental",
+    id = "akka-http-tests",
     base = file("akka-http-tests"),
     dependencies = Seq(httpTestkit % "test", httpSprayJson, httpXml, httpJackson)
   )
@@ -272,8 +280,8 @@ object AkkaBuild extends Build {
   lazy val streamTestkit = Project(
     id = "akka-stream-testkit",
-    base = file("akka-stream-testkit"), // TODO that persistence dependency
-    dependencies = Seq(stream, persistence % "compile;provided->provided;test->test", testkit % "compile;test->test")
+    base = file("akka-stream-testkit"),
+    dependencies = Seq(stream, testkit % "compile;test->test")
   )

   lazy val streamTests = Project(
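For readers untangling the dependency changes above: strings such as "compile;provided->provided;test->test" are sbt configuration mappings. In "a->b", configuration a of the depending project is wired to configuration b of the dependee, and a bare "test" abbreviates "test->compile". A minimal sketch of the pattern in the same sbt 0.13 style (project names are illustrative):

    import sbt._
    import Keys._

    object MappingSketch extends Build {
      lazy val core = Project(id = "core", base = file("core"))

      lazy val tests = Project(
        id = "tests",
        base = file("tests"),
        // "compile;test->test": main code sees core's main code, and the Test
        // configuration additionally sees core's Test classes (shared fixtures)
        dependencies = Seq(core % "compile;test->test")
      )
    }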


@@ -140,12 +140,14 @@ object Dependencies {
   val agent = l ++= Seq(scalaStm.value, Test.scalatest.value, Test.junit)

-  val persistence = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.scalaXml)
+  val persistence = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.commonsCodec, Test.scalaXml)

   val persistenceQuery = l ++= Seq(Test.scalatest.value, Test.junit, Test.commonsIo)

   val persistenceTck = l ++= Seq(Test.scalatest.value.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile")))

+  val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative)
+
   val kernel = l ++= Seq(Test.scalatest.value, Test.junit)

   val camel = l ++= Seq(camelCore, Test.scalatest.value, Test.junit, Test.mockito, Test.logback, Test.commonsIo, Test.junitIntf)
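The Provided.* and Test.* references above come from helper objects in Dependencies.scala that attach a configuration to each ModuleID, with l standing for libraryDependencies. A sketch of the pattern (coordinates and versions are illustrative, not necessarily the exact ones in the file):

    import sbt._
    import Keys._

    object DependenciesSketch {
      val l = libraryDependencies

      object Provided {
        // on the compile classpath, but not packaged or exported downstream
        val levelDB       = "org.iq80.leveldb"          % "leveldb"        % "0.7" % "provided"
        val levelDBNative = "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8" % "provided"
      }

      object Test {
        val junit = "junit" % "junit" % "4.12" % "test"
      }

      // usage: val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative)
    }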