diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index 03d1f219a8..5fac956e12 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -257,7 +257,7 @@ public class BidiFlowTest extends StreamTest { } }); final Pair, Future>, Future>> result = - left.join(bidiMat, Keep., Future> both()).join(right, Keep., Future>, Future>> both()).run(materializer); + left.joinMat(bidiMat, Keep., Future> both()).joinMat(right, Keep., Future>, Future>> both()).run(materializer); final Future l = result.first().first(); final Future m = result.first().second(); final Future> r = result.second(); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java index 5ec4a5b6c9..cc226ebe77 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java @@ -31,14 +31,14 @@ public class TcpTest extends StreamTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("TcpTest", AkkaSpec.testConf()); - + final Sink> echoHandler = Sink.foreach(new Procedure() { public void apply(IncomingConnection conn) { conn.handleWith(Flow.empty(), materializer); } }); - + final List testInput = new ArrayList(); { for (char c = 'a'; c <= 'z'; c++) { @@ -51,34 +51,34 @@ public class TcpTest extends StreamTest { final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); final Source> binding = Tcp.get(system) .bind(serverAddress.getHostName(), serverAddress.getPort()); // TODO getHostString in Java7 - + final Future future = binding.to(echoHandler).run(materializer); final ServerBinding b = Await.result(future, FiniteDuration.create(5, TimeUnit.SECONDS)); assertEquals(b.localAddress().getPort(), serverAddress.getPort()); - + final Future resultFuture = Source .from(testInput) // TODO getHostString in Java7 - .via(Tcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort())) + .via(Tcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort())) .runFold(ByteString.empty(), new Function2() { public ByteString apply(ByteString acc, ByteString elem) { return acc.concat(elem); } }, materializer); - + final byte[] result = Await.result(resultFuture, FiniteDuration.create(5, TimeUnit.SECONDS)).toArray(); for (int i = 0; i < testInput.size(); i ++) { - assertEquals(testInput.get(i).head(), result[i]); + assertEquals(testInput.get(i).head(), result[i]); } } - + @Test public void mustReportServerBindFailure() throws Exception { final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); final Source> binding = Tcp.get(system) .bind(serverAddress.getHostName(), serverAddress.getPort()); // TODO getHostString in Java7 - + final Future future = binding.to(echoHandler).run(materializer); final ServerBinding b = Await.result(future, FiniteDuration.create(5, TimeUnit.SECONDS)); assertEquals(b.localAddress().getPort(), serverAddress.getPort()); @@ -90,7 +90,7 @@ public class TcpTest extends StreamTest { // expected } } - + @Test public void mustReportClientConnectFailure() throws Exception { final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress( @@ -99,7 +99,7 @@ public class TcpTest extends StreamTest { Await.result( Source.from(testInput) // TODO getHostString in Java7 - .via(Tcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort()), + .viaMat(Tcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort()), Keep.> right()) .to(Sink. ignore()) .run(materializer), diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index 2d27175a57..c6f2170264 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -24,7 +24,8 @@ class DslConsistencySpec extends WordSpec with Matchers { val ignore = Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++ - Set("create", "apply", "ops", "appendJava", "andThen", "withAttributes") ++ + Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++ + Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava") val allowMissing: Map[Class[_], Set[String]] = Map( @@ -58,7 +59,6 @@ class DslConsistencySpec extends WordSpec with Matchers { case (element, classes) ⇒ s"provide same $element transforming operators" in { - pending val allOps = (for { c ← classes @@ -74,7 +74,6 @@ class DslConsistencySpec extends WordSpec with Matchers { } s"provide same $element materializing operators" in { - pending val materializingOps = (for { c ← classes diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 8ef7531f37..eb25d765c7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -79,7 +79,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Transform this [[Flow]] by appending the given processing steps. */ - def via[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.viaMat(flow)(combinerToScala(combine))) /** @@ -91,7 +91,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. */ - def to[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = + def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = new Sink(delegate.toMat(sink)(combinerToScala(combine))) /** @@ -103,7 +103,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] */ - def join[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = + def joinMat[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = new RunnableFlowAdapter(delegate.joinMat(flow)(combinerToScala(combine))) /** @@ -142,7 +142,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * The `combine` function is used to compose the materialized values of this flow and that * [[BidiFlow]] into the materialized value of the resulting [[Flow]]. */ - def join[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = + def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = new Flow(delegate.joinMat(bidi)(combinerToScala(combine))) /** @@ -680,11 +680,27 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph new Flow(delegate.flatten(strategy)) /** - * Returns a new `Flow` that concatenates a secondary `Source` to this flow so that, - * the first element emitted by the given ("second") source is emitted after the last element of this Flow. + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. Note that the Source is materialized + * together with this Flow and just kept from producing elements by asserting + * back-pressure until its time comes. + * + * The resulting Flow’s materialized value is a Pair containing both materialized + * values (of this Flow and that Source). */ - def concat[M](second: Graph[SourceShape[Out @uncheckedVariance], M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = - new Flow(delegate.concat(second).mapMaterializedValue(p ⇒ Pair(p._1, p._2))) + def concat[M](source: Graph[SourceShape[Out @uncheckedVariance], M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = + new Flow(delegate.concat(source).mapMaterializedValue(p ⇒ Pair(p._1, p._2))) + + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. Note that the Source is materialized + * together with this Flow and just kept from producing elements by asserting + * back-pressure until its time comes. + */ + def concatMat[M, M2](source: Graph[SourceShape[Out @uncheckedVariance], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = + new Flow(delegate.concatMat(source)(combinerToScala(combine))) override def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 9c600c47fa..d30a020790 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -254,7 +254,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Transform this [[Source]] by appending the given processing stages. */ - def via[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.viaMat(flow)(combinerToScala(combine))) /** @@ -266,7 +266,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. */ - def to[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = + def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = new RunnableFlowAdapter(delegate.toMat(sink)(combinerToScala(combine))) /** @@ -292,9 +292,17 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * emitted by that source is emitted after the last element of this * source. */ - def concat[Out2 >: Out, M2](second: Graph[SourceShape[Out2], M2]): Source[Out2, (Mat, M2)] = + def concat[Out2 >: Out, M2](second: Graph[SourceShape[Out2], M2]): javadsl.Source[Out2, (Mat, M2)] = Source.concat(this, second) + /** + * Concatenates a second source so that the first element + * emitted by that source is emitted after the last element of this + * source. + */ + def concatMat[M, M2](second: Graph[SourceShape[Out @uncheckedVariance], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = + new Source(delegate.concatMat(second)(combinerToScala(combine))) + /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked * for each received element.