Merge branch 'wip-fixups-RK'

This commit is contained in:
Roland Kuhn 2016-02-16 10:19:38 +01:00
commit 2b878be1c1
18 changed files with 288 additions and 218 deletions

View file

@ -21,21 +21,21 @@ object PersistenceMultiDocSpec {
}
//#default-plugins
val OverrideConfig = """
val OverrideConfig = s"""
//#override-config
# Configuration entry for the custom journal plugin, see `journalPluginId`.
akka.persistence.chronicle.journal {
# Standard persistence extension property: provider FQCN.
class = "akka.persistence.chronicle.ChronicleSyncJournal"
# Custom setting specific for the journal `ChronicleSyncJournal`.
folder = ${user.dir}/store/journal
folder = $${user.dir}/store/journal
}
# Configuration entry for the custom snapshot store plugin, see `snapshotPluginId`.
akka.persistence.chronicle.snapshot-store {
# Standard persistence extension property: provider FQCN.
class = "akka.persistence.chronicle.ChronicleSnapshotStore"
# Custom setting specific for the snapshot store `ChronicleSnapshotStore`.
folder = ${user.dir}/store/snapshot
folder = $${user.dir}/store/snapshot
}
//#override-config
"""

View file

@ -13,7 +13,7 @@ import akka.http.javadsl.model.ws._
import akka.http.javadsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
import akka.japi.Pair
import akka.actor.ActorSystem
import akka.event.{ NoLogging, LoggingAdapterTest }
import akka.event.NoLogging
import akka.http.javadsl.model._
import akka.http.scaladsl.TestUtils
import akka.japi.Function

View file

@ -2,7 +2,6 @@ import akka._
import com.typesafe.tools.mima.plugin.MimaKeys
AkkaBuild.defaultSettings
AkkaBuild.experimentalSettings
Formatting.formatSettings
OSGi.httpTestkit
Dependencies.httpTestkit

View file

@ -2,7 +2,6 @@ import akka._
AkkaBuild.defaultSettings
AkkaBuild.dontPublishSettings
AkkaBuild.experimentalSettings
Formatting.formatSettings
Dependencies.httpTests

View file

@ -0,0 +1,8 @@
import akka._
AkkaBuild.defaultSettings
AkkaBuild.dontPublishSettings
Formatting.formatSettings
Dependencies.persistenceShared
fork in Test := true

View file

@ -100,23 +100,18 @@ public class FlowTest extends StreamTest {
}
@Test
public void mustBeAbleToUseStatefullMaponcat() throws Exception {
public void mustBeAbleToUseStatefulMaponcat() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
final Source<Integer, NotUsed> ints = Source.from(input);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).statefulMapConcat(
new Creator<Function<Integer, Iterable<Integer>>>() {
public Function<Integer, Iterable<Integer>> create() {
int[] state = new int[1];
state[0] = 0;
return new Function<Integer, Iterable<Integer>>() {
public List<Integer> apply(Integer elem) {
() -> {
int[] state = new int[] {0};
return (elem) -> {
List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
state[0] = elem;
return list;
}
};
}
});
ints.via(flow)

View file

@ -501,6 +501,68 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals(2);
}
@Test
public void mustBeAbleToUseStatefulMaponcat() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
final Source<Integer, NotUsed> ints = Source.from(input).statefulMapConcat(
() -> {
int[] state = new int[] {0};
return (elem) -> {
List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
state[0] = elem;
return list;
};
});
ints
.runFold("", (acc, elem) -> acc + elem, materializer)
.thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender()));
probe.expectMsgEquals("2334445555");
}
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<String, NotUsed> source = Source.from(Arrays.asList("0", "1", "2", "3"))
.intersperse("[", ",", "]");
final CompletionStage<Done> future =
source.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
probe.expectMsgEquals("[");
probe.expectMsgEquals("0");
probe.expectMsgEquals(",");
probe.expectMsgEquals("1");
probe.expectMsgEquals(",");
probe.expectMsgEquals("2");
probe.expectMsgEquals(",");
probe.expectMsgEquals("3");
probe.expectMsgEquals("]");
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
}
@Test
public void mustBeAbleToUseIntersperseAndConcat() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<String, NotUsed> source = Source.from(Arrays.asList("0", "1", "2", "3"))
.intersperse(",");
final CompletionStage<Done> future =
Source.single(">> ").concat(source).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
probe.expectMsgEquals(">> ");
probe.expectMsgEquals("0");
probe.expectMsgEquals(",");
probe.expectMsgEquals("1");
probe.expectMsgEquals(",");
probe.expectMsgEquals("2");
probe.expectMsgEquals(",");
probe.expectMsgEquals("3");
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
}
@Test
public void mustBeAbleToUseDropWhile() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);

View file

@ -313,10 +313,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.statefulMapConcat{ () {
new Flow(delegate.statefulMapConcat { ()
val fun = f.create()
elem Util.immutableSeq(fun(elem))
}})
})
/**
* Transform this stream by applying the given function to each of the elements

View file

@ -777,8 +777,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: function.Function[Out, _ <: java.lang.Iterable[T]]): javadsl.Source[T, Mat] =
new Source(delegate.statefulMapConcat(() elem Util.immutableSeq(f.apply(elem))))
new Source(delegate.mapConcat(elem Util.immutableSeq(f.apply(elem))))
/**
* Transform each input element into an `Iterable` of output elements that is
@ -805,10 +804,10 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
new Source(delegate.statefulMapConcat{ () {
new Source(delegate.statefulMapConcat { ()
val fun = f.create()
elem Util.immutableSeq(fun(elem))
}})
})
/**
* Transform this stream by applying the given function to each of the elements

View file

@ -140,7 +140,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.statefulMapConcat { () elem Util.immutableSeq(f(elem)) })
new SubFlow(delegate.mapConcat { elem Util.immutableSeq(f(elem)) })
/**
* Transform each input element into an `Iterable` of output elements that is
@ -167,11 +167,10 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.statefulMapConcat{ () {
new SubFlow(delegate.statefulMapConcat { ()
val fun = f.create()
elem Util.immutableSeq(fun(elem))
}})
})
/**
* Transform this stream by applying the given function to each of the elements

View file

@ -138,7 +138,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubSource[T, Mat] =
new SubSource(delegate.statefulMapConcat { () elem Util.immutableSeq(f(elem)) })
new SubSource(delegate.mapConcat { elem Util.immutableSeq(f(elem)) })
/**
* Transform each input element into an `Iterable` of output elements that is
@ -165,10 +165,10 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubSource[T, Mat] =
new SubSource(delegate.statefulMapConcat{ () {
new SubSource(delegate.statefulMapConcat { ()
val fun = f.create()
elem Util.immutableSeq(fun(elem))
}})
})
/**
* Transform this stream by applying the given function to each of the elements

View file

@ -471,7 +471,7 @@ final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) ext
object Partition {
case class PartitionOutOfBoundsException(msg:String) extends RuntimeException(msg) with NoStackTrace
case class PartitionOutOfBoundsException(msg: String) extends IndexOutOfBoundsException(msg) with NoStackTrace
/**
* Create a new `Partition` stage with the specified input type.
@ -1019,7 +1019,6 @@ object GraphDSL extends GraphApply {
private def settingAttrNotSupported =
new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =

View file

@ -55,7 +55,8 @@ object AkkaBuild extends Build {
),
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
cluster, clusterMetrics, clusterTools, clusterSharding, distributedData,
slf4j, agent, persistence, persistenceQuery, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed, protobuf,
slf4j, agent, persistence, persistenceQuery, persistenceTck, persistenceShared,
kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed, protobuf,
stream, streamTestkit, streamTests, streamTestsTck, parsing,
httpCore, http, httpSprayJson, httpXml, httpJackson, httpTests, httpTestkit
)
@ -68,7 +69,8 @@ object AkkaBuild extends Build {
// samples don't work with dbuild right now
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
cluster, clusterMetrics, clusterTools, clusterSharding, distributedData,
slf4j, persistence, persistenceQuery, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed, protobuf,
slf4j, persistence, persistenceQuery, persistenceTck, persistenceShared,
kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed, protobuf,
stream, streamTestkit, streamTests, streamTestsTck, parsing,
httpCore, http, httpSprayJson, httpXml, httpJackson, httpTests, httpTestkit
)
@ -181,7 +183,7 @@ object AkkaBuild extends Build {
lazy val persistence = Project(
id = "akka-persistence",
base = file("akka-persistence"),
dependencies = Seq(actor, remote % "test->test", testkit % "test->test", protobuf)
dependencies = Seq(actor, testkit % "test->test", protobuf)
)
lazy val persistenceQuery = Project(
@ -200,6 +202,12 @@ object AkkaBuild extends Build {
dependencies = Seq(persistence % "compile;provided->provided;test->test", testkit % "compile;test->test")
)
lazy val persistenceShared = Project(
id = "akka-persistence-shared",
base = file("akka-persistence-shared"),
dependencies = Seq(persistence % "test->test", testkit % "test->test", remote % "test", protobuf)
)
lazy val httpCore = Project(
id = "akka-http-core",
base = file("akka-http-core"),
@ -213,13 +221,13 @@ object AkkaBuild extends Build {
)
lazy val httpTestkit = Project(
id = "akka-http-testkit-experimental",
id = "akka-http-testkit",
base = file("akka-http-testkit"),
dependencies = Seq(http, streamTestkit)
)
lazy val httpTests = Project(
id = "akka-http-tests-experimental",
id = "akka-http-tests",
base = file("akka-http-tests"),
dependencies = Seq(httpTestkit % "test", httpSprayJson, httpXml, httpJackson)
)
@ -272,8 +280,8 @@ object AkkaBuild extends Build {
lazy val streamTestkit = Project(
id = "akka-stream-testkit",
base = file("akka-stream-testkit"), // TODO that persistence dependency
dependencies = Seq(stream, persistence % "compile;provided->provided;test->test", testkit % "compile;test->test")
base = file("akka-stream-testkit"),
dependencies = Seq(stream, testkit % "compile;test->test")
)
lazy val streamTests = Project(

View file

@ -140,12 +140,14 @@ object Dependencies {
val agent = l ++= Seq(scalaStm.value, Test.scalatest.value, Test.junit)
val persistence = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.scalaXml)
val persistence = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.commonsCodec, Test.scalaXml)
val persistenceQuery = l ++= Seq(Test.scalatest.value, Test.junit, Test.commonsIo)
val persistenceTck = l ++= Seq(Test.scalatest.value.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile")))
val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative)
val kernel = l ++= Seq(Test.scalatest.value, Test.junit)
val camel = l ++= Seq(camelCore, Test.scalatest.value, Test.junit, Test.mockito, Test.logback, Test.commonsIo, Test.junitIntf)