From 65269274b14645650235c81d60c3ea56194c4589 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 6 Nov 2014 18:13:06 +0100 Subject: [PATCH] !str #16235 Rename Sink.future to Sink.head --- .../scala/akka/http/model/HttpEntity.scala | 2 +- .../scala/akka/http/ClientServerSpec.scala | 4 ++-- .../src/test/scala/akka/http/TestClient.scala | 2 +- .../engine/parsing/RequestParserSpec.scala | 4 ++-- .../engine/parsing/ResponseParserSpec.scala | 4 ++-- .../rendering/RequestRendererSpec.scala | 2 +- .../rendering/ResponseRendererSpec.scala | 2 +- .../akka/http/model/HttpEntitySpec.scala | 2 +- .../testkit/RouteTestResultComponent.scala | 2 +- .../unmarshalling/UnmarshallingSpec.scala | 2 +- .../akka/stream/javadsl/FlexiMergeTest.java | 6 +++--- .../akka/stream/javadsl/FlexiRouteTest.java | 8 ++++---- .../java/akka/stream/javadsl/FlowTest.java | 16 +++++++-------- .../java/akka/stream/javadsl/SinkTest.java | 2 +- .../scala/akka/stream/io/SslTlsFlowSpec.scala | 6 +++--- .../akka/stream/scaladsl/FlowBufferSpec.scala | 8 ++++---- .../stream/scaladsl/FlowCompileSpec.scala | 8 ++++---- .../scaladsl/FlowGraphCompileSpec.scala | 2 +- .../scaladsl/FlowPrefixAndTailSpec.scala | 20 +++++++++---------- .../akka/stream/scaladsl/FutureSinkSpec.scala | 12 +++++------ .../stream/scaladsl/GraphBalanceSpec.scala | 10 +++++----- .../stream/scaladsl/GraphBroadcastSpec.scala | 10 +++++----- .../scaladsl/GraphOpsIntegrationSpec.scala | 12 +++++------ .../scaladsl/GraphPreferredMergeSpec.scala | 4 ++-- .../SubstreamSubscriptionTimeoutSpec.scala | 2 +- .../main/scala/akka/stream/javadsl/Sink.scala | 4 ++-- .../akka/stream/scaladsl/ActorFlowSink.scala | 8 ++++---- .../scala/akka/stream/scaladsl/Sink.scala | 2 +- 28 files changed, 83 insertions(+), 83 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index b199624d59..e65337dea4 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -59,7 +59,7 @@ sealed trait HttpEntity extends japi.HttpEntity { throw new java.util.concurrent.TimeoutException( s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data") } - dataBytes.timerTransform("toStrict", transformer).runWith(Sink.future) + dataBytes.timerTransform("toStrict", transformer).runWith(Sink.head) } /** diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 4f91ad896e..b46e4dd826 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -108,7 +108,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { private val HttpRequest(POST, uri, List(`User-Agent`(_), Host(_, _), Accept(Vector(MediaRanges.`*/*`))), Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() uri shouldEqual Uri(s"http://$hostname:$port/chunked") - Await.result(chunkStream.grouped(4).runWith(Sink.future), 100.millis) shouldEqual chunks + Await.result(chunkStream.grouped(4).runWith(Sink.head), 100.millis) shouldEqual chunks val serverOutSub = serverOut.expectSubscription() serverOutSub.sendNext(HttpResponse(206, List(RawHeader("Age", "42")), chunkedEntity)) @@ -117,7 +117,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { clientInSub.request(1) val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")), Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext() - Await.result(chunkStream2.grouped(1000).runWith(Sink.future), 100.millis) shouldEqual chunks + Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks } } diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index 4df0849114..f79a37baad 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -41,7 +41,7 @@ object TestClient extends App { Source(List(HttpRequest() -> 'NoContext)) .to(Sink(connection.requestSubscriber)) .run() - Source(connection.responsePublisher).map(_._1).runWith(Sink.future) + Source(connection.responsePublisher).map(_._1).runWith(Sink.head) } result onComplete { diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index 08c7e79d94..e00d99ec7e 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -437,7 +437,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } .flatten(FlattenStrategy.concat) .map(strictEqualify) - .grouped(1000).runWith(Sink.future) + .grouped(1000).runWith(Sink.head) Await.result(future, 250.millis) } @@ -451,7 +451,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Seq[ChunkStreamPart]] = - data.grouped(1000).runWith(Sink.future) + data.grouped(1000).runWith(Sink.head) .fast.recover { case _: NoSuchElementException ⇒ Nil } def prep(response: String) = response.stripMarginWithNewline("\r\n") diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index 05d66318e8..76da3481ec 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -272,7 +272,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } .flatten(FlattenStrategy.concat) .map(strictEqualify) - .grouped(1000).runWith(Sink.future) + .grouped(1000).runWith(Sink.head) Await.result(future, 500.millis) } @@ -290,7 +290,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Source[ChunkStreamPart]] = - data.grouped(1000).runWith(Sink.future) + data.grouped(1000).runWith(Sink.head) .fast.map(source(_: _*)) .fast.recover { case _: NoSuchElementException ⇒ source() } diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala index 96a54ba492..c28df29d16 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala @@ -253,7 +253,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒ val renderer = newRenderer val byteStringSource :: Nil = renderer.onNext(RequestRenderingContext(request, serverAddress)) - val future = byteStringSource.grouped(1000).runWith(Sink.future).map(_.reduceLeft(_ ++ _).utf8String) + val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) } } diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala index 8be9406582..e95afc8448 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala @@ -403,7 +403,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx ⇒ val renderer = newRenderer val byteStringSource :: Nil = renderer.onNext(ctx) - val future = byteStringSource.grouped(1000).runWith(Sink.future).map(_.reduceLeft(_ ++ _).utf8String) + val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) -> renderer.isComplete } diff --git a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala index 9e39d2efee..28a37e0aa1 100644 --- a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala @@ -107,7 +107,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { def collectBytesTo(bytes: ByteString*): Matcher[HttpEntity] = equal(bytes.toVector).matcher[Seq[ByteString]].compose { entity ⇒ - val future = entity.dataBytes.grouped(1000).runWith(Sink.future) + val future = entity.dataBytes.grouped(1000).runWith(Sink.head) Await.result(future, 250.millis) } diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala index 93ec823ba6..26a0d520de 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala @@ -95,6 +95,6 @@ trait RouteTestResultComponent { failTest("Request was neither completed nor rejected within " + timeout) private def awaitAllElements[T](data: Source[T]): immutable.Seq[T] = - Await.result(data.grouped(Int.MaxValue).runWith(Sink.future), timeout) + Await.result(data.grouped(Int.MaxValue).runWith(Sink.head), timeout) } } \ No newline at end of file diff --git a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala index e2547d4a05..aaf8207e35 100644 --- a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala @@ -213,7 +213,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll wi def haveParts[T <: Multipart](parts: Multipart.BodyPart*): Matcher[Future[T]] = equal(parts).matcher[Seq[Multipart.BodyPart]] compose { x ⇒ Await.result(x - .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.future)) + .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.head)) .fast.recover { case _: NoSuchElementException ⇒ Nil }, 1.second) } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java index cbe9eeac8c..df95425064 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java @@ -45,7 +45,7 @@ public class FlexiMergeTest { .addEdge(merge.out(), out1).build().run(materializer); final Publisher pub = m.get(out1); - final Future> all = Source.from(pub).grouped(100).runWith(Sink.>future(), materializer); + final Future> all = Source.from(pub).grouped(100).runWith(Sink.>head(), materializer); final List result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS)); assertEquals( new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), @@ -60,7 +60,7 @@ public class FlexiMergeTest { .addEdge(merge.out(), out1).build().run(materializer); final Publisher pub = m.get(out1); - final Future> all = Source.from(pub).grouped(100).runWith(Sink.>future(), materializer); + final Future> all = Source.from(pub).grouped(100).runWith(Sink.>head(), materializer); final List result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS)); assertEquals(Arrays.asList("a", "e", "b", "f", "c", "d"), result); } @@ -78,7 +78,7 @@ public class FlexiMergeTest { final Publisher> pub = m.get(out); final Future>> all = Source.from(pub).grouped(100). - runWith(Sink.>>future(), materializer); + runWith(Sink.>>head(), materializer); final List> result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS)); assertEquals( Arrays.asList(new Pair(1, "a"), new Pair(2, "b"), new Pair(3, "c")), diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java index 21a43d1cee..fc3265ddba 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java @@ -32,8 +32,8 @@ public class FlexiRouteTest { final Source in = Source.from(Arrays.asList("a", "b", "c", "d", "e")); - final KeyedSink, Future>> out1 = Sink.>future(); - final KeyedSink, Future>> out2 = Sink.>future(); + final KeyedSink, Future>> out1 = Sink.>head(); + final KeyedSink, Future>> out2 = Sink.>head(); @Test public void mustBuildSimpleFairRoute() throws Exception { @@ -71,8 +71,8 @@ public class FlexiRouteTest { Source> input = Source.from(Arrays.>asList(new Pair(1, "A"), new Pair( 2, "B"), new Pair(3, "C"), new Pair(4, "D"))); - final KeyedSink, Future>> outA = Sink.>future(); - final KeyedSink, Future>> outB = Sink.>future(); + final KeyedSink, Future>> outA = Sink.>head(); + final KeyedSink, Future>> outB = Sink.>head(); MaterializedMap m = FlowGraph.builder().addEdge(input, unzip.in()) .addEdge(unzip.outputA, Flow.of(Integer.class).grouped(100), outA) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 4e6823fb62..dd01fcee0a 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -327,7 +327,7 @@ public class FlowTest { // collecting final Publisher pub = m.get(publisher); - final Future> all = Source.from(pub).grouped(100).runWith(Sink.> future(), materializer); + final Future> all = Source.from(pub).grouped(100).runWith(Sink.>head(), materializer); final List result = Await.result(all, Duration.apply(200, TimeUnit.MILLISECONDS)); assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); @@ -466,7 +466,7 @@ public class FlowTest { public String apply(String arg0) throws Exception { throw new RuntimeException("simulated err"); } - }).runWith(Sink. future(), materializer).onComplete(new OnSuccess>() { + }).runWith(Sink.head(), materializer).onComplete(new OnSuccess>() { @Override public void onSuccess(Try e) throws Throwable { if (e == null) { @@ -484,7 +484,7 @@ public class FlowTest { public void mustBeAbleToUseToFuture() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList("A", "B", "C"); - Future future = Source.from(input).runWith(Sink. future(), materializer); + Future future = Source.from(input).runWith(Sink.head(), materializer); String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals("A", result); } @@ -494,12 +494,12 @@ public class FlowTest { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList(1, 2, 3, 4, 5, 6); Future, Source>> future = Source.from(input).prefixAndTail(3) - .runWith(Sink., Source>> future(), materializer); + .runWith(Sink., Source>>head(), materializer); Pair, Source> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals(Arrays.asList(1, 2, 3), result.first()); - Future> tailFuture = result.second().grouped(4).runWith(Sink.> future(), materializer); + Future> tailFuture = result.second().grouped(4).runWith(Sink.>head(), materializer); List tailResult = Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals(Arrays.asList(4, 5, 6), tailResult); } @@ -514,7 +514,7 @@ public class FlowTest { Future> future = Source.from(mainInputs) .flatten(akka.stream.javadsl.FlattenStrategy. concat()).grouped(6) - .runWith(Sink.> future(), materializer); + .runWith(Sink.>head(), materializer); List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); @@ -526,7 +526,7 @@ public class FlowTest { final JavaTestKit probe = new JavaTestKit(system); final List input = Arrays.asList("A", "B", "C"); Future> future = Source.from(input).buffer(2, OverflowStrategy.backpressure()).grouped(4) - .runWith(Sink.> future(), materializer); + .runWith(Sink.>head(), materializer); List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals(input, result); @@ -571,7 +571,7 @@ public class FlowTest { public Pair apply(String in) throws Exception { return new Pair(in, in); } - }).runWith(Sink. future(), materializer); + }).runWith(Sink.head(), materializer); String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals("A", result); } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 60657e8514..4a2666c4e9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -37,7 +37,7 @@ public class SinkTest { @Test public void mustBeAbleToUseFuture() throws Exception { - final KeyedSink> futSink = Sink.future(); + final KeyedSink> futSink = Sink.head(); final List list = new ArrayList(); list.add(1); final Future future = Source.from(list).runWith(futSink, materializer); diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala index 7f582c2ef6..10e579fec5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala @@ -116,7 +116,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type = } def replyFirstLineInUpperCase(scipher: SslTlsCipher): Unit = { - val ssessionf = Source(scipher.sessionInbound).runWith(Sink.future) + val ssessionf = Source(scipher.sessionInbound).runWith(Sink.head) val ssession = Await.result(ssessionf, duration) val sdata = ssession.data Source(sdata).map(bs ⇒ ByteString(bs.decodeString("utf-8").split('\n').head.toUpperCase + '\n')). @@ -128,11 +128,11 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type = } def sendLineAndReceiveResponse(ccipher: SslTlsCipher, message: String): String = { - val csessionf = Source(ccipher.sessionInbound).runWith(Sink.future) + val csessionf = Source(ccipher.sessionInbound).runWith(Sink.head) Source(List(ByteString(message + '\n'))).runWith(Sink(ccipher.plainTextOutbound)) val csession = Await.result(csessionf, duration) val cdata = csession.data - Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.future), duration) + Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.head), duration) } def sendLineAndReceiveResponse(connection: JavaSslConnection, message: String): String = { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index ee7b327dbc..4c3031aba9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -25,14 +25,14 @@ class FlowBufferSpec extends AkkaSpec { "pass elements through normally in backpressured mode" in { val future: Future[Seq[Int]] = Source((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). - runWith(Sink.future) + runWith(Sink.head) Await.result(future, 3.seconds) should be(1 to 1000) } "pass elements through normally in backpressured mode with buffer size one" in { - val futureSink = Sink.future[Seq[Int]] + val futureSink = Sink.head[Seq[Int]] val future = Source((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). - runWith(Sink.future) + runWith(Sink.head) Await.result(future, 3.seconds) should be(1 to 1000) } @@ -45,7 +45,7 @@ class FlowBufferSpec extends AkkaSpec { .buffer(5, overflowStrategy = OverflowStrategy.backpressure) .buffer(128, overflowStrategy = OverflowStrategy.backpressure) .grouped(1001) - .runWith(Sink.future) + .runWith(Sink.head) Await.result(future, 3.seconds) should be(1 to 1000) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala index dd090209fd..7e17389e98 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala @@ -50,7 +50,7 @@ class FlowCompileSpec extends AkkaSpec { val closedSink: Sink[String] = Flow[String].map(_.hashCode).to(Sink.publisher[Int]) val appended: Sink[Int] = open.to(closedSink) "appended.run()" shouldNot compile - "appended.connect(Sink.future[Int])" shouldNot compile + "appended.connect(Sink.head[Int])" shouldNot compile intSeq.to(appended).run } "be appended to Source" in { @@ -70,7 +70,7 @@ class FlowCompileSpec extends AkkaSpec { intSeq.to(openSource) } "not accept Sink" in { - "openSource.connect(Sink.future[String])" shouldNot compile + "openSource.connect(Sink.head[String])" shouldNot compile } "not run()" in { "openSource.run()" shouldNot compile @@ -92,7 +92,7 @@ class FlowCompileSpec extends AkkaSpec { } "RunnableFlow" should { - Sink.future[String] + Sink.head[String] val closed: RunnableFlow = Source(Seq(1, 2, 3)).map(_.toString).to(Sink.publisher[String]) "run" in { @@ -103,7 +103,7 @@ class FlowCompileSpec extends AkkaSpec { } "not accept Sink" in { - "closed.connect(Sink.future[String])" shouldNot compile + "closed.connect(Sink.head[String])" shouldNot compile } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index bfbba0b880..4d82228f49 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -34,7 +34,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val in1 = Source(List("a", "b", "c")) val in2 = Source(List("d", "e", "f")) val out1 = Sink.publisher[String] - val out2 = Sink.future[String] + val out2 = Sink.head[String] "FlowGraph" should { "build simple merge" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index a3cd2fa889..12981b4444 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -25,10 +25,10 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val testException = new Exception("test") with NoStackTrace - def newFutureSink = Sink.future[(immutable.Seq[Int], Source[Int])] + def newHeadSink = Sink.head[(immutable.Seq[Int], Source[Int])] "work on empty input" in { - val futureSink = newFutureSink + val futureSink = newHeadSink val fut = Source.empty.prefixAndTail(10).runWith(futureSink) val (prefix, tailFlow) = Await.result(fut, 3.seconds) prefix should be(Nil) @@ -38,7 +38,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { } "work on short input" in { - val futureSink = newFutureSink + val futureSink = newHeadSink val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureSink) val (prefix, tailFlow) = Await.result(fut, 3.seconds) prefix should be(List(1, 2, 3)) @@ -48,40 +48,40 @@ class FlowPrefixAndTailSpec extends AkkaSpec { } "work on longer inputs" in { - val futureSink = newFutureSink + val futureSink = newHeadSink val fut = Source((1 to 10).iterator).prefixAndTail(5).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(1 to 5) - val futureSink2 = Sink.future[immutable.Seq[Int]] + val futureSink2 = Sink.head[immutable.Seq[Int]] val fut2 = tail.grouped(6).runWith(futureSink2) Await.result(fut2, 3.seconds) should be(6 to 10) } "handle zero take count" in { - val futureSink = newFutureSink + val futureSink = newHeadSink val fut = Source((1 to 10).iterator).prefixAndTail(0).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(Nil) - val futureSink2 = Sink.future[immutable.Seq[Int]] + val futureSink2 = Sink.head[immutable.Seq[Int]] val fut2 = tail.grouped(11).runWith(futureSink2) Await.result(fut2, 3.seconds) should be(1 to 10) } "handle negative take count" in { - val futureSink = newFutureSink + val futureSink = newHeadSink val fut = Source((1 to 10).iterator).prefixAndTail(-1).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(Nil) - val futureSink2 = Sink.future[immutable.Seq[Int]] + val futureSink2 = Sink.head[immutable.Seq[Int]] val fut2 = tail.grouped(11).runWith(futureSink2) Await.result(fut2, 3.seconds) should be(1 to 10) } "work if size of take is equal to stream size" in { - val futureSink = newFutureSink + val futureSink = newHeadSink val fut = Source((1 to 10).iterator).prefixAndTail(10).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(1 to 10) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureSinkSpec.scala index 46c8fd22be..116d41fdc3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureSinkSpec.scala @@ -13,7 +13,7 @@ import akka.stream.MaterializerSettings import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import akka.stream.testkit.ScriptedTest -class FutureSinkSpec extends AkkaSpec with ScriptedTest { +class HeadSinkSpec extends AkkaSpec with ScriptedTest { val settings = MaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 16) @@ -21,11 +21,11 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest { implicit val materializer = FlowMaterializer(settings) - "A Flow with Sink.future" must { + "A Flow with Sink.head" must { "yield the first value" in { val p = StreamTestKit.PublisherProbe[Int]() - val f: Future[Int] = Source(p).map(identity).runWith(Sink.future) + val f: Future[Int] = Source(p).map(identity).runWith(Sink.head) val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) @@ -35,7 +35,7 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest { "yield the first value when actively constructing" in { val p = StreamTestKit.PublisherProbe[Int]() - val f = Sink.future[Int] + val f = Sink.head[Int] val s = Source.subscriber[Int] val m = s.to(f).run() p.subscribe(m.get(s)) @@ -48,7 +48,7 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest { "yield the first error" in { val p = StreamTestKit.PublisherProbe[Int]() - val f = Source(p).runWith(Sink.future) + val f = Source(p).runWith(Sink.head) val proc = p.expectSubscription proc.expectRequest() val ex = new RuntimeException("ex") @@ -59,7 +59,7 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest { "yield NoSuchElementExcption for empty stream" in { val p = StreamTestKit.PublisherProbe[Int]() - val f = Source(p).runWith(Sink.future) + val f = Source(p).runWith(Sink.head) val proc = p.expectSubscription proc.expectRequest() proc.sendComplete() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 3e224574e6..ba856d1a96 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -115,11 +115,11 @@ class GraphBalanceSpec extends AkkaSpec { } "work with 5-way balance" in { - val f1 = Sink.future[Seq[Int]] - val f2 = Sink.future[Seq[Int]] - val f3 = Sink.future[Seq[Int]] - val f4 = Sink.future[Seq[Int]] - val f5 = Sink.future[Seq[Int]] + val f1 = Sink.head[Seq[Int]] + val f2 = Sink.head[Seq[Int]] + val f3 = Sink.head[Seq[Int]] + val f4 = Sink.head[Seq[Int]] + val f5 = Sink.head[Seq[Int]] val g = FlowGraph { implicit b ⇒ val balance = Balance[Int]("balance", waitForAllDownstreams = true) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala index 8cfb990dbb..8ce1b8ad9e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala @@ -48,11 +48,11 @@ class GraphBroadcastSpec extends AkkaSpec { } "work with n-way broadcast" in { - val f1 = Sink.future[Seq[Int]] - val f2 = Sink.future[Seq[Int]] - val f3 = Sink.future[Seq[Int]] - val f4 = Sink.future[Seq[Int]] - val f5 = Sink.future[Seq[Int]] + val f1 = Sink.head[Seq[Int]] + val f2 = Sink.head[Seq[Int]] + val f3 = Sink.head[Seq[Int]] + val f4 = Sink.head[Seq[Int]] + val f5 = Sink.head[Seq[Int]] val g = FlowGraph { implicit b ⇒ val bcast = Broadcast[Int]("broadcast") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index 9e425928e5..fecf7f59fa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -64,7 +64,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec { "FlowGraphs" must { "support broadcast - merge layouts" in { - val resultFuture = Sink.future[Seq[Int]] + val resultFuture = Sink.head[Seq[Int]] val g = FlowGraph { implicit b ⇒ val bcast = Broadcast[Int]("broadcast") @@ -83,7 +83,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec { val elements = 0 to 10 val in = Source(elements) val f = Flow[Int] - val out = Sink.future[Seq[Int]] + val out = Sink.head[Seq[Int]] val g = FlowGraph { implicit b ⇒ val balance = Balance[Int] @@ -101,9 +101,9 @@ class GraphOpsIntegrationSpec extends AkkaSpec { "support wikipedia Topological_sorting 2" in { // see https://en.wikipedia.org/wiki/Topological_sorting#mediaviewer/File:Directed_acyclic_graph.png - val resultFuture2 = Sink.future[Seq[Int]] - val resultFuture9 = Sink.future[Seq[Int]] - val resultFuture10 = Sink.future[Seq[Int]] + val resultFuture2 = Sink.head[Seq[Int]] + val resultFuture9 = Sink.head[Seq[Int]] + val resultFuture10 = Sink.head[Seq[Int]] val g = FlowGraph { implicit b ⇒ val b3 = Broadcast[Int]("b3") @@ -149,7 +149,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec { } "allow adding of flows to sources and sinks to flows" in { - val resultFuture = Sink.future[Seq[Int]] + val resultFuture = Sink.head[Seq[Int]] val g = FlowGraph { implicit b ⇒ val bcast = Broadcast[Int]("broadcast") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala index b16978af63..2c83a60996 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala @@ -25,7 +25,7 @@ class GraphPreferredMergeSpec extends TwoStreamsSetup { val preferred = Source(Stream.fill(numElements)(1)) val aux1, aux2, aux3 = Source(Stream.fill(numElements)(2)) - val sink = Sink.future[Seq[Int]] + val sink = Sink.head[Seq[Int]] val g = FlowGraph { implicit b ⇒ val merge = MergePreferred[Int] @@ -45,7 +45,7 @@ class GraphPreferredMergeSpec extends TwoStreamsSetup { val g = FlowGraph { implicit b ⇒ val merge = MergePreferred[Int] - s1 ~> merge.preferred ~> Sink.future[Int] + s1 ~> merge.preferred ~> Sink.head[Int] s2 ~> merge.preferred s3 ~> merge } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index 9375b5464f..a4034d13f9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -73,7 +73,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { // sleep long enough for it to be cleaned up Thread.sleep(1000) - val f = s3.runWith(Sink.future).recover { case _: SubscriptionTimeoutException ⇒ "expected" } + val f = s3.runWith(Sink.head).recover { case _: SubscriptionTimeoutException ⇒ "expected" } Await.result(f, 300.millis) should equal("expected") } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 80990741cd..bd68d2907b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -106,8 +106,8 @@ object Sink { /** * A `Sink` that materializes into a `Future` of the first value received. */ - def future[In]: KeyedSink[In, Future[In]] = - new KeyedSink(scaladsl.Sink.future[In]) + def head[In]: KeyedSink[In, Future[In]] = + new KeyedSink(scaladsl.Sink.head[In]) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala index 81fc4213fb..ea795d847e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala @@ -95,8 +95,8 @@ final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSi } } -object FutureSink { - def apply[T](): FutureSink[T] = new FutureSink[T] +object HeadSink { + def apply[T](): HeadSink[T] = new HeadSink[T] } /** @@ -106,7 +106,7 @@ object FutureSink { * the Future into the corresponding failed state) or the end-of-stream * (failing the Future with a NoSuchElementException). */ -class FutureSink[In] extends KeyedActorFlowSink[In] { +class HeadSink[In] extends KeyedActorFlowSink[In] { type MaterializedType = Future[In] @@ -130,7 +130,7 @@ class FutureSink[In] extends KeyedActorFlowSink[In] { (sub, p.future) } - override def toString: String = "FutureSink" + override def toString: String = "HeadSink" } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 5c161e1498..f9774426ab 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -64,7 +64,7 @@ object Sink { /** * A `Sink` that materializes into a `Future` of the first value received. */ - def future[T]: FutureSink[T] = FutureSink[T] + def head[T]: HeadSink[T] = HeadSink[T] /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].