diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 171f7f3bb5..42bfba5c38 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -152,14 +152,14 @@ class FlowDocSpec extends AkkaSpec { //#flow-mat-combine // An empty source that can be shut down explicitly from the outside - val source: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int]() + val source: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int] // A flow that internally throttles elements to 1/second, and returns a Cancellable // which can be used to shut down the stream val flow: Flow[Int, Int, Cancellable] = throttler // A sink that returns the first element of a stream in the returned Future - val sink: Sink[Int, Future[Int]] = Sink.head[Int]() + val sink: Sink[Int, Future[Int]] = Sink.head[Int] // By default, the materialized value of the leftmost stage is preserved val r1: RunnableFlow[Promise[Unit]] = source.via(flow).to(sink) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala index 235c915eed..dde59b9d21 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala @@ -75,7 +75,7 @@ class FlowErrorDocSpec extends AkkaSpec { else acc + elem } } - val result = source.grouped(1000).runWith(Sink.head()) + val result = source.grouped(1000).runWith(Sink.head) // the negative element cause the scan stage to be restarted, // i.e. start from 0 again // result here will be a Future completed with Success(Vector(0, 1, 0, 5, 12)) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/GraphCyclesSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/GraphCyclesSpec.scala index aa082aa221..0dd2fa037e 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/GraphCyclesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/GraphCyclesSpec.scala @@ -22,7 +22,7 @@ class GraphCyclesSpec extends AkkaSpec { val merge = b.add(Merge[Int](2)) val bcast = b.add(Broadcast[Int](2)) - source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore() + source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore merge <~ bcast } //#deadlocked @@ -39,7 +39,7 @@ class GraphCyclesSpec extends AkkaSpec { val merge = b.add(MergePreferred[Int](1)) val bcast = b.add(Broadcast[Int](2)) - source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore() + source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore merge.preferred <~ bcast } //#unfair @@ -55,7 +55,7 @@ class GraphCyclesSpec extends AkkaSpec { val merge = b.add(Merge[Int](2)) val bcast = b.add(Broadcast[Int](2)) - source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore() + source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore merge <~ Flow[Int].buffer(10, OverflowStrategy.dropHead) <~ bcast } //#dropping @@ -73,7 +73,7 @@ class GraphCyclesSpec extends AkkaSpec { val bcast = b.add(Broadcast[Int](2)) source ~> zip.in0 - zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore() + zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore zip.in1 <~ bcast } //#zipping-dead @@ -92,7 +92,7 @@ class GraphCyclesSpec extends AkkaSpec { val start = Source.single(0) source ~> zip.in0 - zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore() + zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore zip.in1 <~ concat <~ start concat <~ bcast } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala index 746ce763d6..7b5d82cbbc 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala @@ -43,7 +43,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { val impl = new Fixture { override def tweets: Publisher[Tweet] = - TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher()) + TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher) override def storage = SubscriberProbe[Author] @@ -95,7 +95,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { //#source-publisher val authorPublisher: Publisher[Author] = - Source(tweets).via(authors).runWith(Sink.publisher()) + Source(tweets).via(authors).runWith(Sink.publisher) authorPublisher.subscribe(storage) //#source-publisher diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala index 3f98cc5b22..8221c2e3db 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala @@ -60,7 +60,7 @@ class StreamBuffersRateSpec extends AkkaSpec { "explcit buffers" in { trait Job - def inboundJobsConnector(): Source[Job, Unit] = Source.empty() + def inboundJobsConnector(): Source[Job, Unit] = Source.empty //#explicit-buffers-backpressure // Getting a stream of jobs from an imaginary external system as a Source val jobs: Source[Job, Unit] = inboundJobsConnector() diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala index 32799495fe..719de76065 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -67,7 +67,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { zip.out } - val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head()) + val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head) //#source-from-partial-flow-graph Await.result(firstPair, 300.millis) should equal(1 -> 2) } @@ -94,7 +94,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { // format: OFF val (_, matSink: Future[(Int, String)]) = //#flow-from-partial-flow-graph - pairUpWithToString.runWith(Source(List(1)), Sink.head()) + pairUpWithToString.runWith(Source(List(1)), Sink.head) //#flow-from-partial-flow-graph // format: ON diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala index 7443ab2bf2..ddf17f2539 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala @@ -41,7 +41,7 @@ class RecipeByteStrings extends RecipeSpec { val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit)) //#bytestring-chunker - val chunksFuture = chunksStream.grouped(10).runWith(Sink.head()) + val chunksFuture = chunksStream.grouped(10).runWith(Sink.head) val chunks = Await.result(chunksFuture, 3.seconds) @@ -70,11 +70,11 @@ class RecipeByteStrings extends RecipeSpec { val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9))) val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10))) - Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head()), 3.seconds) + Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds) .fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9)) an[IllegalStateException] must be thrownBy { - Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head()), 3.seconds) + Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds) } } @@ -86,7 +86,7 @@ class RecipeByteStrings extends RecipeSpec { val compacted: Source[ByteString, Unit] = data.map(_.compact) //#compacting-bytestrings - Await.result(compacted.grouped(10).runWith(Sink.head()), 3.seconds).forall(_.isCompact) should be(true) + Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true) } } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDigest.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDigest.scala index f3bf0306f6..3bfe762166 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDigest.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDigest.scala @@ -44,7 +44,7 @@ class RecipeDigest extends RecipeSpec { val digest: Source[ByteString, Unit] = data.transform(() => digestCalculator("SHA-256")) //#calculating-digest - Await.result(digest.runWith(Sink.head()), 3.seconds) should be( + Await.result(digest.runWith(Sink.head), 3.seconds) should be( ByteString( 0x24, 0x8d, 0x6a, 0x61, 0xd2, 0x06, 0x38, 0xb8, diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala index e5e468cfec..f46c7780ea 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala @@ -19,7 +19,7 @@ class RecipeFlattenSeq extends RecipeSpec { val flattened: Source[Message, Unit] = myData.mapConcat(identity) //#flattening-seqs - Await.result(flattened.grouped(8).runWith(Sink.head()), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7")) + Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7")) } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala index 6b25c494f4..a9c2f52044 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala @@ -43,8 +43,8 @@ class RecipeMultiGroupBy extends RecipeSpec { //#multi-groupby val result = multiGroups.map { - case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head()) - }.mapAsync(identity).grouped(10).runWith(Sink.head()) + case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head) + }.mapAsync(identity).grouped(10).runWith(Sink.head) Await.result(result, 3.seconds).toSet should be(Set( "1[1: a, 1: b, all: c, all: d, 1: e]", diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala index 0074a8a779..e95ac181bd 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala @@ -24,7 +24,7 @@ class RecipeParseLines extends RecipeSpec { val linesStream = rawData.transform(() => parseLines("\r\n", 100)) - Await.result(linesStream.grouped(10).runWith(Sink.head()), 3.seconds) should be(List( + Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List( "Hello World\r!", "Hello Akka!", "Hello Streams!", diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala index 09ad9c71dd..fb6f8912b0 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala @@ -35,7 +35,7 @@ class RecipeReduceByKey extends RecipeSpec { .mapAsync(identity) //#word-count - Await.result(counts.grouped(10).runWith(Sink.head()), 3.seconds).toSet should be(Set( + Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( ("hello", 2), ("world", 1), ("and", 1), @@ -72,7 +72,7 @@ class RecipeReduceByKey extends RecipeSpec { //#reduce-by-key-general - Await.result(wordCounts.grouped(10).runWith(Sink.head()), 3.seconds).toSet should be(Set( + Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( ("hello", 2), ("world", 1), ("and", 1), diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala index 7fd028b1e5..a73c581fc3 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala @@ -16,7 +16,7 @@ class RecipeToStrict extends RecipeSpec { //#draining-to-seq val strict: Future[immutable.Seq[Message]] = - myData.grouped(MaxAllowedSeqSize).runWith(Sink.head()) + myData.grouped(MaxAllowedSeqSize).runWith(Sink.head) //#draining-to-seq Await.result(strict, 3.seconds) should be(List("1", "2", "3")) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala index aeb8b8fb47..b547efa5ac 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala @@ -37,7 +37,7 @@ class RecipeWorkerPool extends RecipeSpec { val processedJobs: Source[Result, Unit] = myJobs.via(balancer(worker, 3)) //#worker-pool - Await.result(processedJobs.grouped(10).runWith(Sink.head()), 3.seconds).toSet should be(Set( + Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( "1 done", "2 done", "3 done", "4 done", "5 done")) } diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index b6fde9517a..5beb23ea79 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -61,7 +61,7 @@ sealed trait HttpEntity extends japi.HttpEntity { } // TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393 - dataBytes.section(name("toStrict"))(_.timerTransform(transformer)).runWith(Sink.head()) + dataBytes.section(name("toStrict"))(_.timerTransform(transformer)).runWith(Sink.head) } /** diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index abd8f3735f..1530b461b1 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -133,11 +133,11 @@ private[http] object StreamUtils { case Nil ⇒ Nil case Seq(one) ⇒ Vector(input.via(one)) case multiple ⇒ - val (fanoutSub, fanoutPub) = Source.subscriber[ByteString]().toMat(Sink.fanoutPublisher(16, 16))(Keep.both).run() + val (fanoutSub, fanoutPub) = Source.subscriber[ByteString].toMat(Sink.fanoutPublisher(16, 16))(Keep.both).run() val sources = transformers.map { flow ⇒ // Doubly wrap to ensure that subscription to the running publisher happens before the final sources // are exposed, so there is no race - Source(Source(fanoutPub).via(flow).runWith(Sink.publisher())) + Source(Source(fanoutPub).via(flow).runWith(Sink.publisher)) } // The fanout publisher must be wired to the original source after all fanout subscribers have been subscribed input.runWith(Sink(fanoutSub)) diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index f3291920df..01b13ddcd7 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -122,7 +122,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)), Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() uri shouldEqual Uri(s"http://$hostname:$port/chunked") - Await.result(chunkStream.grouped(4).runWith(Sink.head()), 100.millis) shouldEqual chunks + Await.result(chunkStream.grouped(4).runWith(Sink.head), 100.millis) shouldEqual chunks val serverOutSub = serverOut.expectSubscription() serverOutSub.expectRequest() @@ -132,7 +132,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { clientInSub.request(1) val HttpResponse(StatusCodes.PartialContent, List(Age(42), Server(_), Date(_)), Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext() - Await.result(chunkStream2.grouped(1000).runWith(Sink.head()), 100.millis) shouldEqual chunks + Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks clientOutSub.sendComplete() serverInSub.request(1) // work-around for #16552 @@ -205,8 +205,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { def acceptConnection(): (SubscriberProbe[HttpRequest], PublisherProbe[HttpResponse]) = { connSourceSub.request(1) val incomingConnection = connSource.expectNext() - val sink = Sink.publisher[HttpRequest]() - val source = Source.subscriber[HttpResponse]() + val sink = Sink.publisher[HttpRequest] + val source = Source.subscriber[HttpResponse] val handler = Flow(sink, source)(Keep.both) { implicit b ⇒ (snk, src) ⇒ diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index 3edf2a2fbc..0d481817d3 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -25,7 +25,7 @@ object TestClient extends App { println(s"Fetching HTTP server version of host `$host` ...") val connection = Http().outgoingConnection(host) - val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head()) + val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head) result.map(_.header[headers.Server]) onComplete { case Success(res) ⇒ println(s"$host is running ${res mkString ", "}") diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index d86226151e..dcb29884f9 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -233,7 +233,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val parser = newParser val result = multiParse(newParser)(Seq(prep(start + manyChunks))) val HttpEntity.Chunked(_, chunks) = result.head.right.get.req.entity - val strictChunks = chunks.grouped(100000).runWith(Sink.head()).awaitResult(awaitAtMost) + val strictChunks = chunks.grouped(100000).runWith(Sink.head).awaitResult(awaitAtMost) strictChunks.size shouldEqual numChunks } } @@ -462,7 +462,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } .flatten(FlattenStrategy.concat) .map(strictEqualify) - .grouped(100000).runWith(Sink.head()) + .grouped(100000).runWith(Sink.head) .awaitResult(awaitAtMost) protected def parserSettings: ParserSettings = ParserSettings(system) @@ -475,7 +475,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } private def compactEntityChunks(data: Source[ChunkStreamPart, Unit]): Future[Seq[ChunkStreamPart]] = - data.grouped(100000).runWith(Sink.head()) + data.grouped(100000).runWith(Sink.head) .fast.recover { case _: NoSuchElementException ⇒ Nil } def prep(response: String) = response.stripMarginWithNewline("\r\n") diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index 3df0e46fda..bc661930e1 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -279,7 +279,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } .flatten(FlattenStrategy.concat) .map(strictEqualify) - .grouped(100000).runWith(Sink.head()) + .grouped(100000).runWith(Sink.head) Await.result(future, 500.millis) } @@ -298,7 +298,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } private def compactEntityChunks(data: Source[ChunkStreamPart, Unit]): Future[Source[ChunkStreamPart, Unit]] = - data.grouped(100000).runWith(Sink.head()) + data.grouped(100000).runWith(Sink.head) .fast.map(source(_: _*)) .fast.recover { case _: NoSuchElementException ⇒ source() } diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala index 6edbea7211..d2966e2da8 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala @@ -254,8 +254,8 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll val renderer = newRenderer val byteStringSource = Await.result(Source.single(RequestRenderingContext(request, serverAddress)). section(name("renderer"))(_.transform(() ⇒ renderer)). - runWith(Sink.head()), 1.second) - val future = byteStringSource.grouped(1000).runWith(Sink.head()).map(_.reduceLeft(_ ++ _).utf8String) + runWith(Sink.head), 1.second) + val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) } } diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala index 475023a6ac..4041050693 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala @@ -413,8 +413,8 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll val renderer = newRenderer val byteStringSource = Await.result(Source.single(ctx). section(name("renderer"))(_.transform(() ⇒ renderer)). - runWith(Sink.head()), 1.second) - val future = byteStringSource.grouped(1000).runWith(Sink.head()).map(_.reduceLeft(_ ++ _).utf8String) + runWith(Sink.head), 1.second) + val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) -> renderer.isComplete } diff --git a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala index d7a80af06b..e18761681d 100644 --- a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala @@ -105,7 +105,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { def collectBytesTo(bytes: ByteString*): Matcher[HttpEntity] = equal(bytes.toVector).matcher[Seq[ByteString]].compose { entity ⇒ - val future = entity.dataBytes.grouped(1000).runWith(Sink.head()) + val future = entity.dataBytes.grouped(1000).runWith(Sink.head) Await.result(future, 250.millis) } diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala index 7392a5f188..155f5f8fd7 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala @@ -96,6 +96,6 @@ trait RouteTestResultComponent { failTest("Request was neither completed nor rejected within " + timeout) private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] = - data.grouped(100000).runWith(Sink.head()).awaitResult(timeout) + data.grouped(100000).runWith(Sink.head).awaitResult(timeout) } } \ No newline at end of file diff --git a/akka-http-tests/src/test/scala/akka/http/coding/CoderSpec.scala b/akka-http-tests/src/test/scala/akka/http/coding/CoderSpec.scala index ef8ed87133..0a071ec188 100644 --- a/akka-http-tests/src/test/scala/akka/http/coding/CoderSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/coding/CoderSpec.scala @@ -111,7 +111,7 @@ abstract class CoderSpec extends WordSpec with CodecSpecSupport with Inspectors val resultBs = Source.single(compressed) .via(Coder.withMaxBytesPerChunk(limit).decoderFlow) - .grouped(4200).runWith(Sink.head()) + .grouped(4200).runWith(Sink.head) .awaitResult(1.second) forAll(resultBs) { bs ⇒ diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala index c118aa78c0..2a10f0a06b 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala @@ -99,7 +99,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { wrs { complete("Some random and not super short entity.") } } ~> check { header[`Content-Range`] should be(None) - val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head()), 1.second) + val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second) parts.size shouldEqual 2 inside(parts(0)) { case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ @@ -124,7 +124,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { wrs { complete(HttpEntity.Default(MediaTypes.`text/plain`, content.length, entityData())) } } ~> check { header[`Content-Range`] should be(None) - val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head()), 1.second) + val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second) parts.size shouldEqual 2 } } diff --git a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala index 7b66cd39bf..337763f283 100644 --- a/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/unmarshalling/UnmarshallingSpec.scala @@ -213,7 +213,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll wi def haveParts[T <: Multipart](parts: Multipart.BodyPart*): Matcher[Future[T]] = equal(parts).matcher[Seq[Multipart.BodyPart]] compose { x ⇒ Await.result(x - .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.head())) + .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.head)) .fast.recover { case _: NoSuchElementException ⇒ Nil }, 1.second) } } diff --git a/akka-http/src/main/scala/akka/http/coding/Decoder.scala b/akka-http/src/main/scala/akka/http/coding/Decoder.scala index 6d7440f7d8..9046803136 100644 --- a/akka-http/src/main/scala/akka/http/coding/Decoder.scala +++ b/akka-http/src/main/scala/akka/http/coding/Decoder.scala @@ -28,7 +28,7 @@ trait Decoder { def decoderFlow: Flow[ByteString, ByteString, Unit] def decode(input: ByteString)(implicit mat: FlowMaterializer): Future[ByteString] = - Source.single(input).via(decoderFlow).runWith(Sink.head()) + Source.single(input).via(decoderFlow).runWith(Sink.head) } object Decoder { val MaxBytesPerChunkDefault: Int = 65536 diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index 52336d3ec3..225936b961 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -74,7 +74,7 @@ trait MultipartUnmarshallers { // note that iter.next() will throw exception if stream fails iter.foreach { case BodyPartStart(headers, createEntity) ⇒ - val entity = createEntity(Source.empty()) match { + val entity = createEntity(Source.empty) match { case x: HttpEntity.Strict ⇒ x case x ⇒ throw new IllegalStateException("Unexpected entity type from strict BodyPartParser: " + x) } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala index d14b85c342..36188fde76 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala @@ -29,7 +29,7 @@ abstract class AkkaIdentityProcessorVerification[T](env: TestEnvironment, publis StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!")) def processorFromFlow(flow: Flow[T, T, _])(implicit mat: ActorFlowMaterializer): Processor[T, T] = { - val (sub: Subscriber[T], pub: Publisher[T]) = flow.runWith(Source.subscriber[T](), Sink.publisher[T]()) + val (sub: Subscriber[T], pub: Publisher[T]) = flow.runWith(Source.subscriber[T], Sink.publisher[T]) new Processor[T, T] { override def onSubscribe(s: Subscription): Unit = sub.onSubscribe(s) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala index d828c1d741..505a0a1c0b 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala @@ -10,7 +10,7 @@ import org.reactivestreams.Publisher class ConcatTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { - Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher()) + Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher) } // FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4 diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala index 2f21743546..6660dfa3ac 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala @@ -13,7 +13,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val s1 = Source(iterable(elements / 2)) val s2 = Source(iterable((elements + 1) / 2)) - Source(List(s1, s2)).flatten(FlattenStrategy.concat).runWith(Sink.publisher()) + Source(List(s1, s2)).flatten(FlattenStrategy.concat).runWith(Sink.publisher) } // FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4 diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FoldSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FoldSinkSubscriberTest.scala index 3edba0b6b3..bd8c303767 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FoldSinkSubscriberTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FoldSinkSubscriberTest.scala @@ -9,7 +9,7 @@ import org.reactivestreams.Subscriber class FoldSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { override def createSubscriber(): Subscriber[Int] = - Flow[Int].to(Sink.fold(0)(_ + _)).runWith(Source.subscriber()) + Flow[Int].to(Sink.fold(0)(_ + _)).runWith(Source.subscriber) override def createElement(element: Int): Int = element } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ForeachSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ForeachSinkSubscriberTest.scala index 5e211076be..4f999bcd21 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/ForeachSinkSubscriberTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ForeachSinkSubscriberTest.scala @@ -9,7 +9,7 @@ import org.reactivestreams.Subscriber class ForeachSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { override def createSubscriber(): Subscriber[Int] = - Flow[Int].to(Sink.foreach { _ ⇒ }).runWith(Source.subscriber()) + Flow[Int].to(Sink.foreach { _ ⇒ }).runWith(Source.subscriber) override def createElement(element: Int): Int = element } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala index db829244af..3e2b73d87d 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala @@ -13,7 +13,7 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val p = Promise[Int]() - val pub = Source(p.future).runWith(Sink.publisher()) + val pub = Source(p.future).runWith(Sink.publisher) p.success(0) pub } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala index afdb231c14..8127e45849 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala @@ -17,9 +17,9 @@ class GroupByTest extends AkkaPublisherVerification[Int] { if (elements == 0) EmptyPublisher[Int] else { val futureGroupSource = - Source(iterable(elements)).groupBy(elem ⇒ "all").map { case (_, group) ⇒ group }.runWith(Sink.head()) + Source(iterable(elements)).groupBy(elem ⇒ "all").map { case (_, group) ⇒ group }.runWith(Sink.head) val groupSource = Await.result(futureGroupSource, 3.seconds) - groupSource.runWith(Sink.publisher()) + groupSource.runWith(Sink.publisher) } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala index 6cab762130..db9cc276d9 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala @@ -11,7 +11,7 @@ import org.reactivestreams._ class IterablePublisherTest extends AkkaPublisherVerification[Int] { override def createPublisher(elements: Long): Publisher[Int] = { - Source(iterable(elements)).runWith(Sink.publisher()) + Source(iterable(elements)).runWith(Sink.publisher) } // FIXME #16983 diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/LazyEmptySourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/LazyEmptySourceTest.scala index 25fd2dfcbb..91bb1f8ad0 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/LazyEmptySourceTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/LazyEmptySourceTest.scala @@ -11,7 +11,7 @@ import akka.stream.scaladsl.Sink class LazyEmptySourceTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = - Source.lazyEmpty[Int].runWith(Sink.publisher()) + Source.lazyEmpty[Int].runWith(Sink.publisher) override def maxElementsFromPublisher(): Long = 0 } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala index e4c2c540e3..6173c2ce61 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala @@ -13,9 +13,9 @@ import org.reactivestreams.Publisher class PrefixAndTailTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { - val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) ⇒ tail }.runWith(Sink.head()) + val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) ⇒ tail }.runWith(Sink.head) val tailSource = Await.result(futureTailSource, 3.seconds) - tailSource.runWith(Sink.publisher()) + tailSource.runWith(Sink.publisher) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala index 1795732779..e63a0e8063 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala @@ -11,7 +11,7 @@ import org.reactivestreams.Publisher class SingleElementSourceTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = - Source.single(1).runWith(Sink.publisher()) + Source.single(1).runWith(Sink.publisher) override def maxElementsFromPublisher(): Long = 1 } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala index c6cb077aec..e52303a4ed 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala @@ -16,9 +16,9 @@ class SplitWhenTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = if (elements == 0) EmptyPublisher[Int] else { - val futureSource = Source(iterable(elements)).splitWhen(elem ⇒ false).runWith(Sink.head()) + val futureSource = Source(iterable(elements)).splitWhen(elem ⇒ false).runWith(Sink.head) val source = Await.result(futureSource, 3.seconds) - source.runWith(Sink.publisher()) + source.runWith(Sink.publisher) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala index df52b6d07b..8d4de79b07 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala @@ -19,7 +19,7 @@ class SyncIterablePublisherTest extends AkkaPublisherVerification[Int] { else 0 until elements.toInt - Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher()) + Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher) } } diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala index d702212557..c22ca84405 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -19,7 +19,7 @@ trait ScriptedTest extends Matchers { class ScriptException(msg: String) extends RuntimeException(msg) def toPublisher[In, Out]: (Source[Out, _], ActorFlowMaterializer) ⇒ Publisher[Out] = - (f, m) ⇒ f.runWith(Sink.publisher())(m) + (f, m) ⇒ f.runWith(Sink.publisher)(m) object Script { def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = { diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala index 0b08fb137e..9fad368a6a 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala @@ -44,7 +44,7 @@ abstract class TwoStreamsSetup extends AkkaSpec { def completedPublisher[T]: Publisher[T] = StreamTestKit.emptyPublisher[T] - def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher()) + def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher) def soonToFailPublisher[T]: Publisher[T] = StreamTestKit.lazyErrorPublisher[T](TestException) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index d514e551a3..491598f51d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -276,10 +276,10 @@ public class SourceTest extends StreamTest { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList("A", "B", "C"); - Source.from(input).runWith(Sink.onComplete(new Procedure() { + Source.from(input).runWith(Sink.onComplete(new Procedure>() { @Override - public void apply(BoxedUnit param) throws Exception { - probe.getRef().tell(param, ActorRef.noSender()); + public void apply(Try param) throws Exception { + probe.getRef().tell(param.get(), ActorRef.noSender()); } }), materializer); diff --git a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index e6e2806257..5b0b29e4f6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -108,7 +108,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { val c1 = StreamTestKit.SubscriberProbe[String]() val c2 = flowOut.subscribe(c1) - val p = Source(0 to 100).runWith(Sink.publisher()) + val p = Source(0 to 100).runWith(Sink.publisher) p.subscribe(flowIn) val s = c1.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index 59fcb9124b..46423199d6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -24,14 +24,14 @@ class FlowBufferSpec extends AkkaSpec { "pass elements through normally in backpressured mode" in { val future: Future[Seq[Int]] = Source(1 to 1000).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). - runWith(Sink.head()) + runWith(Sink.head) Await.result(future, 3.seconds) should be(1 to 1000) } "pass elements through normally in backpressured mode with buffer size one" in { val futureSink = Sink.head[Seq[Int]] val future = Source(1 to 1000).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). - runWith(Sink.head()) + runWith(Sink.head) Await.result(future, 3.seconds) should be(1 to 1000) } @@ -44,7 +44,7 @@ class FlowBufferSpec extends AkkaSpec { .buffer(5, overflowStrategy = OverflowStrategy.backpressure) .buffer(128, overflowStrategy = OverflowStrategy.backpressure) .grouped(1001) - .runWith(Sink.head()) + .runWith(Sink.head) Await.result(future, 3.seconds) should be(1 to 1000) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala index 6a0bd87c53..6291ce0796 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala @@ -26,7 +26,7 @@ class FlowForeachSpec extends AkkaSpec { } "complete the future for an empty stream" in { - Source.empty.runForeach(testActor ! _) onSuccess { + Source.empty[String].runForeach(testActor ! _) onSuccess { case _ ⇒ testActor ! "done" } expectMsg("done") @@ -46,4 +46,4 @@ class FlowForeachSpec extends AkkaSpec { } -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 920f6bda83..9572eec255 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -20,7 +20,7 @@ class FlowFromFutureSpec extends AkkaSpec { "A Flow based on a Future" must { "produce one element from already successful Future" in { - val p = Source(Future.successful(1)).runWith(Sink.publisher()) + val p = Source(Future.successful(1)).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -32,7 +32,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce error from already failed Future" in { val ex = new RuntimeException("test") with NoStackTrace - val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher()) + val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError(ex) @@ -40,7 +40,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce one element when Future is completed" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher()) + val p = Source(promise.future).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -54,7 +54,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce one element when Future is completed but not before request" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher()) + val p = Source(promise.future).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -67,7 +67,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce elements with multiple subscribers" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher()) + val p = Source(promise.future).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -85,7 +85,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce elements to later subscriber" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher()) + val p = Source(promise.future).runWith(Sink.publisher) val keepAlive = StreamTestKit.SubscriberProbe[Int]() val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -106,7 +106,7 @@ class FlowFromFutureSpec extends AkkaSpec { "allow cancel before receiving element" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher()) + val p = Source(promise.future).runWith(Sink.publisher) val keepAlive = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(keepAlive) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 3542d66439..289b0745e0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -34,8 +34,8 @@ class FlowGroupBySpec extends AkkaSpec { } class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { - val source = Source(1 to elementCount).runWith(Sink.publisher()) - val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher()) + val source = Source(1 to elementCount).runWith(Sink.publisher) + val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher) val masterSubscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() groupStream.subscribe(masterSubscriber) @@ -56,7 +56,7 @@ class FlowGroupBySpec extends AkkaSpec { "groupBy" must { "work in the happy case" in new SubstreamsSupport(groupCount = 2) { - val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher())) + val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) masterSubscriber.expectNoMsg(100.millis) s1.expectNoMsg(100.millis) @@ -64,7 +64,7 @@ class FlowGroupBySpec extends AkkaSpec { s1.expectNext(1) s1.expectNoMsg(100.millis) - val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher())) + val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher)) s2.expectNoMsg(100.millis) s2.request(2) @@ -91,9 +91,9 @@ class FlowGroupBySpec extends AkkaSpec { } "accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) { - StreamPuppet(getSubFlow(1).runWith(Sink.publisher())).cancel() + StreamPuppet(getSubFlow(1).runWith(Sink.publisher)).cancel() - val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher())) + val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher)) substream.request(2) substream.expectNext(2) substream.expectNext(4) @@ -109,7 +109,7 @@ class FlowGroupBySpec extends AkkaSpec { "accept cancellation of master stream when not consumed anything" in { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher()) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -120,7 +120,7 @@ class FlowGroupBySpec extends AkkaSpec { } "accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) { - val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher())) + val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) substream.request(1) substream.expectNext(1) @@ -138,7 +138,7 @@ class FlowGroupBySpec extends AkkaSpec { } "work with empty input stream" in { - val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher()) + val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -147,7 +147,7 @@ class FlowGroupBySpec extends AkkaSpec { "abort on onError from upstream" in { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher()) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -164,7 +164,7 @@ class FlowGroupBySpec extends AkkaSpec { "abort on onError from upstream when substreams are running" in { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher()) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -176,7 +176,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream) = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher())) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) substreamPuppet.request(1) substreamPuppet.expectNext(1) @@ -194,7 +194,7 @@ class FlowGroupBySpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2) - .runWith(Sink.publisher()) + .runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, Unit])]() publisher.subscribe(subscriber) @@ -206,7 +206,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream) = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher())) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) substreamPuppet.request(1) substreamPuppet.expectNext(1) @@ -223,7 +223,7 @@ class FlowGroupBySpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe).section(OperationAttributes.supervisionStrategy(resumingDecider))( _.groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2)) - .runWith(Sink.publisher()) + .runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, Unit])]() publisher.subscribe(subscriber) @@ -235,7 +235,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream1) = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher())) + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) substreamPuppet1.request(10) substreamPuppet1.expectNext(1) @@ -243,7 +243,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(4) val (_, substream2) = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher())) + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) substreamPuppet2.request(10) substreamPuppet2.expectNext(4) // note that 2 was dropped diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index cefb911598..c09938a0aa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -39,7 +39,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { testName must { "produce elements" in { - val p = createSource(1 to 3).runWith(Sink.publisher()) + val p = createSource(1 to 3).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -53,7 +53,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "complete empty" in { - val p = createSource(immutable.Iterable.empty[Int]).runWith(Sink.publisher()) + val p = createSource(immutable.Iterable.empty[Int]).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectSubscriptionAndComplete() @@ -108,7 +108,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = createSource(1 to 3).map(_ * 2).runWith(Sink.publisher()) + val p = createSource(1 to 3).map(_ * 2).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -120,7 +120,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = createSource(1 to 4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher()) + val p = createSource(1 to 4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -131,7 +131,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "not produce after cancel" in { - val p = createSource(1 to 3).runWith(Sink.publisher()) + val p = createSource(1 to 3).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -147,7 +147,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { override def iterator: Iterator[Int] = (1 to 3).iterator.map(x ⇒ if (x == 2) throw new IllegalStateException("not two") else x) } - val p = createSource(iterable).runWith(Sink.publisher()) + val p = createSource(iterable).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -164,7 +164,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { val iterable = new immutable.Iterable[Int] { override def iterator: Iterator[Int] = throw new IllegalStateException("no good iterator") } - val p = createSource(iterable).runWith(Sink.publisher()) + val p = createSource(iterable).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError().getMessage should be("no good iterator") @@ -178,7 +178,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { override def next(): Int = -1 } } - val p = createSource(iterable).runWith(Sink.publisher()) + val p = createSource(iterable).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError().getMessage should be("no next") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala index 8a6159ec65..58e96321e7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala @@ -28,7 +28,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest { val probe = StreamTestKit.SubscriberProbe[Int]() Source(List(1)). map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1). - runWith(Sink.publisher()).subscribe(probe) + runWith(Sink.publisher).subscribe(probe) val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 8efb707900..de24a5c230 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -68,7 +68,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } val toPublisher: (Source[Any, _], ActorFlowMaterializer) ⇒ Publisher[Any] = - (f, m) ⇒ f.runWith(Sink.publisher())(m) + (f, m) ⇒ f.runWith(Sink.publisher)(m) def toFanoutPublisher[In, Out](initialBufferSize: Int, maximumBufferSize: Int): (Source[Out, _], ActorFlowMaterializer) ⇒ Publisher[Out] = (f, m) ⇒ f.runWith(Sink.fanoutPublisher(initialBufferSize, maximumBufferSize))(m) @@ -157,7 +157,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val c1 = StreamTestKit.SubscriberProbe[String]() flowOut.subscribe(c1) - val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher()) + val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher) source.subscribe(flowIn) val sub1 = c1.expectSubscription @@ -178,7 +178,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece sub1.request(3) c1.expectNoMsg(200.millis) - val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) source.subscribe(flowIn) c1.expectNext("1") @@ -197,7 +197,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece sub1.request(3) c1.expectNoMsg(200.millis) - val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) source.subscribe(flowIn) c1.expectNext("elem-1") @@ -210,7 +210,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow: Flow[String, String, _] = Flow[String] val c1 = StreamTestKit.SubscriberProbe[String]() val sink: Sink[String, _] = flow.to(Sink(c1)) - val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher()) + val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher) Source(publisher).to(sink).run() val sub1 = c1.expectSubscription @@ -224,7 +224,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "perform transformation operation" in { val flow = Flow[Int].map(i ⇒ { testActor ! i.toString; i.toString }) - val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher) Source(publisher).via(flow).to(Sink.ignore).run() expectMsg("1") @@ -236,7 +236,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow = Flow[Int].map(_.toString) val c1 = StreamTestKit.SubscriberProbe[String]() val sink: Sink[Int, _] = flow.to(Sink(c1)) - val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) Source(publisher).to(sink).run() val sub1 = c1.expectSubscription @@ -282,7 +282,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "be covariant" in { val f1: Source[Fruit, _] = Source[Fruit](apples) - val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher()) + val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher) val f2: Source[Source[Fruit, _], _] = Source[Fruit](apples).splitWhen(_ ⇒ true) val f3: Source[(Boolean, Source[Fruit, _]), _] = Source[Fruit](apples).groupBy(_ ⇒ true) val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source[Fruit](apples).prefixAndTail(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index ac6126090d..71fc49f99d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -35,7 +35,7 @@ class FlowSplitWhenSpec extends AkkaSpec { class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { val source = Source(1 to elementCount) - val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher()) + val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher) val masterSubscriber = StreamTestKit.SubscriberProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -56,7 +56,7 @@ class FlowSplitWhenSpec extends AkkaSpec { "splitWhen" must { "work in the happy case" in new SubstreamsSupport(elementCount = 4) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher())) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) masterSubscriber.expectNoMsg(100.millis) s1.request(2) @@ -65,7 +65,7 @@ class FlowSplitWhenSpec extends AkkaSpec { s1.request(1) s1.expectComplete() - val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher())) + val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) s2.request(1) s2.expectNext(3) @@ -80,9 +80,9 @@ class FlowSplitWhenSpec extends AkkaSpec { } "support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher())) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) s1.cancel() - val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher())) + val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) s2.request(4) s2.expectNext(5) @@ -97,7 +97,7 @@ class FlowSplitWhenSpec extends AkkaSpec { } "support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher())) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) masterSubscription.cancel() s1.request(4) s1.expectNext(1) @@ -113,7 +113,7 @@ class FlowSplitWhenSpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) - .runWith(Sink.publisher()) + .runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -125,7 +125,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher())) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) substreamPuppet.request(10) substreamPuppet.expectNext(1) @@ -145,7 +145,7 @@ class FlowSplitWhenSpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe).section(OperationAttributes.supervisionStrategy(resumingDecider))( _.splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0)) - .runWith(Sink.publisher()) + .runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -157,7 +157,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream1 = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher())) + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) substreamPuppet1.request(10) substreamPuppet1.expectNext(1) @@ -175,7 +175,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(6) substreamPuppet1.expectComplete() val substream2 = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher())) + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) substreamPuppet2.request(10) substreamPuppet2.expectNext(6) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala index 93b049dfe0..e7e5c19770 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala @@ -22,7 +22,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new PushStage[Int, Int] { var tot = 0 @@ -31,7 +31,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(tot) } }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -45,7 +45,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce one-to-several transformation as expected" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { var tot = 0 @@ -65,7 +65,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -102,7 +102,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.pull() } else ctx.push(elem) } - }).runWith(Sink.publisher()) + }).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p.subscribe(subscriber) @@ -128,7 +128,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce dropping transformation as expected" in { - val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher()) + val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new PushStage[Int, Int] { var tot = 0 @@ -140,7 +140,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(tot) } }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -154,7 +154,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce multi-step transformation as expected" in { - val p = Source(List("a", "bc", "def")).runWith(Sink.publisher()) + val p = Source(List("a", "bc", "def")).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new PushStage[String, Int] { var concat = "" @@ -193,7 +193,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support emit onUpstreamFinish" in { - val p = Source(List("a")).runWith(Sink.publisher()) + val p = Source(List("a")).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[String, String] { var s = "" @@ -206,7 +206,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug override def onUpstreamFinish(ctx: Context[String]) = terminationEmit(Iterator.single(s + "B"), ctx) }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -228,7 +228,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(element) } }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -242,7 +242,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "report error when exception is thrown" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -255,7 +255,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } } }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -269,7 +269,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support emit of final elements when onUpstreamFailure" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). map(elem ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem). transform(() ⇒ new StatefulStage[Int, Int] { @@ -282,7 +282,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } }). filter(elem ⇒ elem != 1). // it's undefined if element 1 got through before the error or not - runWith(Sink.publisher()) + runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -296,7 +296,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support cancel as expected" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -304,7 +304,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug emit(Iterator(elem, elem), ctx) } }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -318,7 +318,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support producing elements from empty inputs" in { - val p = Source(List.empty[Int]).runWith(Sink.publisher()) + val p = Source(List.empty[Int]).runWith(Sink.publisher) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -327,7 +327,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug override def onUpstreamFinish(ctx: Context[Int]) = terminationEmit(Iterator(1, 2, 3), ctx) }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala index 570536c21f..524bcceeb5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala @@ -24,7 +24,7 @@ class FlowSupervisionSpec extends AkkaSpec { val failingMap = Flow[Int].map(n ⇒ if (n == 3) throw exc else n) def run(f: Flow[Int, Int, Unit]): immutable.Seq[Int] = - Await.result(Source((1 to 5).toSeq ++ (1 to 5)).via(f).grouped(1000).runWith(Sink.head()), 3.seconds) + Await.result(Source((1 to 5).toSeq ++ (1 to 5)).via(f).grouped(1000).runWith(Sink.head), 3.seconds) "Stream supervision" must { @@ -46,7 +46,7 @@ class FlowSupervisionSpec extends AkkaSpec { "complete stream with NPE failure when null is emitted" in { intercept[NullPointerException] { - Await.result(Source(List("a", "b")).map(_ ⇒ null).grouped(1000).runWith(Sink.head()), 3.seconds) + Await.result(Source(List("a", "b")).map(_ ⇒ null).grouped(1000).runWith(Sink.head), 3.seconds) }.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) } @@ -54,7 +54,7 @@ class FlowSupervisionSpec extends AkkaSpec { val nullMap = Flow[String].map(elem ⇒ if (elem == "b") null else elem) .withAttributes(supervisionStrategy(Supervision.resumingDecider)) val result = Await.result(Source(List("a", "b", "c")).via(nullMap) - .grouped(1000).runWith(Sink.head()), 3.seconds) + .grouped(1000).runWith(Sink.head), 3.seconds) result should be(List("a", "c")) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala index 8b9fe16393..e9bfdc65ac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala @@ -30,7 +30,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { } override def isComplete: Boolean = !isTimerActive("tick") }). - runWith(Sink.publisher()) + runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -74,7 +74,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { def onNext(element: Int) = Nil override def onTimer(timerKey: Any) = throw exception - }).runWith(Sink.publisher()) + }).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala index 76b8b8b07b..7effae8692 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala @@ -143,7 +143,7 @@ class GraphFlowSpec extends AkkaSpec { "work with a Sink when having KeyedSource inside" in { val probe = StreamTestKit.SubscriberProbe[Int]() - val source = Source.apply(Source.subscriber[Int]()) { implicit b ⇒ + val source = Source.apply(Source.subscriber[Int]) { implicit b ⇒ subSource ⇒ subSource.outlet } @@ -317,7 +317,7 @@ class GraphFlowSpec extends AkkaSpec { val subscriber = m1 val publisher = m3 - source1.runWith(Sink.publisher()).subscribe(subscriber) + source1.runWith(Sink.publisher).subscribe(subscriber) publisher.subscribe(probe) validateProbe(probe, stdRequests, stdResult) @@ -347,7 +347,7 @@ class GraphFlowSpec extends AkkaSpec { val subscriber = m1 val publisher = m2 - source1.runWith(Sink.publisher()).subscribe(subscriber) + source1.runWith(Sink.publisher).subscribe(subscriber) publisher.subscribe(probe) validateProbe(probe, 4, (0 to 3).toSet) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala index 5ccffae185..4ad47ab7a3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala @@ -40,7 +40,7 @@ class GraphJunctionAttributesSpec extends AkkaSpec { zip.out } - val future = source.grouped(10).runWith(Sink.head()) + val future = source.grouped(10).runWith(Sink.head) // FIXME #16435 drop(2) needed because first two SlowTicks get only one FastTick Await.result(future, 2.seconds).map(_._2.size).filter(_ == 1).drop(2) should be(Nil) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index b4ad0649d2..4e5e730bd7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -152,7 +152,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual } "be able to run plain flow" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher()) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val s = SubscriberProbe[Int] val flow = Flow[Int].map(_ * 2) FlowGraph.closed() { implicit builder ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala index 1427b2859e..69fb778008 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala @@ -26,7 +26,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { "yield the first value" in { val p = StreamTestKit.PublisherProbe[Int]() - val f: Future[Int] = Source(p).map(identity).runWith(Sink.head()) + val f: Future[Int] = Source(p).map(identity).runWith(Sink.head) val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) @@ -50,7 +50,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { "yield the first error" in { val p = StreamTestKit.PublisherProbe[Int]() - val f = Source(p).runWith(Sink.head()) + val f = Source(p).runWith(Sink.head) val proc = p.expectSubscription proc.expectRequest() val ex = new RuntimeException("ex") @@ -61,7 +61,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { "yield NoSuchElementExcption for empty stream" in { val p = StreamTestKit.PublisherProbe[Int]() - val f = Source(p).runWith(Sink.head()) + val f = Source(p).runWith(Sink.head) val proc = p.expectSubscription proc.expectRequest() proc.sendComplete() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala index 3e7580a403..aab0ae68e2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala @@ -12,7 +12,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { implicit val mat = ActorFlowMaterializer() val source = Source(List(1, 2, 3)) - val sink = Flow[Int].grouped(10).toMat(Sink.head())(Keep.right) + val sink = Flow[Int].grouped(10).toMat(Sink.head)(Keep.right) "Reverse Arrows in the Graph DSL" must { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 6502aede56..a26bb30c52 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -20,7 +20,7 @@ class SourceSpec extends AkkaSpec { "Singleton Source" must { "produce element" in { - val p = Source.single(1).runWith(Sink.publisher()) + val p = Source.single(1).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -30,7 +30,7 @@ class SourceSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Source.single(1).runWith(Sink.publisher()) + val p = Source.single(1).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -50,7 +50,7 @@ class SourceSpec extends AkkaSpec { "Empty Source" must { "complete immediately" in { - val p = Source.empty.runWith(Sink.publisher()) + val p = Source.empty.runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectSubscriptionAndComplete() @@ -64,7 +64,7 @@ class SourceSpec extends AkkaSpec { "Failed Source" must { "emit error immediately" in { val ex = new RuntimeException with NoStackTrace - val p = Source.failed(ex).runWith(Sink.publisher()) + val p = Source.failed(ex).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError(ex) @@ -77,7 +77,7 @@ class SourceSpec extends AkkaSpec { "Lazy Empty Source" must { "complete materialized future when stream cancels" in { - val neverSource = Source.lazyEmpty() + val neverSource = Source.lazyEmpty val pubSink = Sink.publisher val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run() @@ -94,7 +94,7 @@ class SourceSpec extends AkkaSpec { } "allow external triggering of completion" in { - val neverSource = Source.lazyEmpty[Int]() + val neverSource = Source.lazyEmpty[Int] val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run() @@ -107,7 +107,7 @@ class SourceSpec extends AkkaSpec { } "allow external triggering of onError" in { - val neverSource = Source.lazyEmpty() + val neverSource = Source.lazyEmpty val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run() @@ -160,7 +160,7 @@ class SourceSpec extends AkkaSpec { "Repeat Source" must { "repeat as long as it takes" in { import FlowGraph.Implicits._ - val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head()), 1.second) + val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head), 1.second) result.size should ===(10000) result.toSet should ===(Set(42)) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index 10ecb0dc15..74181f942b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -40,7 +40,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in { val publisherProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher()) + val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -56,14 +56,14 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s1) = subscriber.expectNext() // should not break normal usage val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() - s1.runWith(Sink.publisher()).subscribe(s1SubscriberProbe) + s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe) s1SubscriberProbe.expectSubscription().request(100) s1SubscriberProbe.expectNext(1) val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() - s2.runWith(Sink.publisher()).subscribe(s2SubscriberProbe) + s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe) s2SubscriberProbe.expectSubscription().request(100) s2SubscriberProbe.expectNext(2) @@ -72,13 +72,13 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { // sleep long enough for it to be cleaned up Thread.sleep(1000) - val f = s3.runWith(Sink.head()).recover { case _: SubscriptionTimeoutException ⇒ "expected" } + val f = s3.runWith(Sink.head).recover { case _: SubscriptionTimeoutException ⇒ "expected" } Await.result(f, 300.millis) should equal("expected") } "timeout and stop groupBy parent actor if none of the substreams are actually consumed" in { val publisherProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher()) + val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -103,7 +103,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "not timeout and cancel substream publishers when they have been subscribed to" in { val publisherProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher()) + val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -118,7 +118,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s1) = subscriber.expectNext() // should not break normal usage val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() - s1.runWith(Sink.publisher()).subscribe(s1SubscriberProbe) + s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe) val s1Sub = s1SubscriberProbe.expectSubscription() s1Sub.request(1) s1SubscriberProbe.expectNext(1) @@ -126,7 +126,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() - s2.runWith(Sink.publisher()).subscribe(s2SubscriberProbe) + s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe) val s2Sub = s2SubscriberProbe.expectSubscription() // sleep long enough for tiemout to trigger if not cancelled diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index faa2123abe..1d6a719a48 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -50,7 +50,7 @@ class TickSourceSpec extends AkkaSpec { } "reject multiple subscribers, but keep the first" in { - val p = Source(1.second, 1.second, "tick").runWith(Sink.publisher()) + val p = Source(1.second, 1.second, "tick").runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[String]() val c2 = StreamTestKit.SubscriberProbe[String]() p.subscribe(c1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala index 62f9643b81..b31480ae88 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala @@ -25,7 +25,7 @@ private[akka] class ConcatAllImpl(materializer: ActorFlowMaterializer) val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ val Extract.Source(source) = primaryInputs.dequeueInputElement() - val publisher = source.runWith(Sink.publisher())(materializer) + val publisher = source.runWith(Sink.publisher)(materializer) // FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now) val inputs = createAndSubscribeSubstreamInput(publisher) nextPhase(streamSubstream(inputs)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 4e7eb40b7b..402e1a53e0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -14,7 +14,10 @@ import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ Future, Promise } -abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module { +/** + * INTERNAL API + */ +private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module { def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[In] @uncheckedVariance, Mat) @@ -31,15 +34,24 @@ abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module { } override def subModules: Set[Module] = Set.empty + + def amendShape(attr: OperationAttributes): SinkShape[In] = { + attr.nameOption match { + case None ⇒ shape + case s: Some[String] if s == attributes.nameOption ⇒ shape + case Some(name) ⇒ shape.copy(inlet = new Inlet(name + ".in")) + } + } } /** + * INTERNAL API * Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow. * The stream will not have any subscribers attached at this point, which means that after prefetching * elements to fill the internal buffers it will assert back-pressure until * a subscriber connects and creates demand for elements to be emitted. */ -class PublisherSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { +private[akka] class PublisherSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { override def toString: String = "PublisherSink" @@ -54,10 +66,13 @@ class PublisherSink[In](val attributes: OperationAttributes, shape: SinkShape[In } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new PublisherSink[In](attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new PublisherSink[In](attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new PublisherSink[In](attr, amendShape(attr)) } -final class FanoutPublisherSink[In]( +/** + * INTERNAL API + */ +private[akka] final class FanoutPublisherSink[In]( initialBufferSize: Int, maximumBufferSize: Int, val attributes: OperationAttributes, @@ -75,12 +90,14 @@ final class FanoutPublisherSink[In]( new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attributes, shape) override def withAttributes(attr: OperationAttributes): Module = - new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attr, shape) + new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attr, amendShape(attr)) } -object HeadSink { - /** INTERNAL API */ - private[akka] class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] { +/** + * INTERNAL API + */ +private[akka] object HeadSink { + class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] { private val sub = new AtomicReference[Subscription] override def onSubscribe(s: Subscription): Unit = { ReactiveStreamsCompliance.requireNonNullSubscription(s) @@ -105,13 +122,14 @@ object HeadSink { } /** + * INTERNAL API * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first * thing that is signaled to this stream, which can be either an element (after * which the upstream subscription is canceled), an error condition (putting * the Future into the corresponding failed state) or the end-of-stream * (failing the Future with a NoSuchElementException). */ -class HeadSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) { +private[akka] class HeadSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { val p = Promise[In]() @@ -120,39 +138,42 @@ class HeadSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) ex } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[In]] = new HeadSink[In](attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new HeadSink[In](attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new HeadSink[In](attr, amendShape(attr)) override def toString: String = "HeadSink" } /** + * INTERNAL API * Attaches a subscriber to this stream which will just discard all received * elements. */ -final class BlackholeSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) { +private[akka] final class BlackholeSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = (new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize), ()) override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new BlackholeSink(attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new BlackholeSink(attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new BlackholeSink(attr, amendShape(attr)) } /** + * INTERNAL API * Attaches a subscriber to this stream. */ -final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Unit](shape) { +private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Unit](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = (subscriber, ()) override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] = new SubscriberSink[In](subscriber, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new SubscriberSink[In](subscriber, attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr)) } /** + * INTERNAL API * A sink that immediately cancels its upstream upon materialization. */ -final class CancelSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) { +private[akka] final class CancelSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) { /** * This method is only used for Sinks that return true from [[#isActive]], which then must @@ -169,14 +190,15 @@ final class CancelSink(val attributes: OperationAttributes, shape: SinkShape[Any } override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new CancelSink(attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new CancelSink(attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new CancelSink(attr, amendShape(attr)) } /** + * INTERNAL API * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]]. */ -final class PropsSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { +private[akka] final class PropsSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { val subscriberRef = materializer.actorOf(props, name = s"$flowName-props") @@ -184,5 +206,5 @@ final class PropsSink[In](props: Props, val attributes: OperationAttributes, sha } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new PropsSink[In](props, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new PropsSink[In](props, attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new PropsSink[In](props, attr, amendShape(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 440c8e6ec7..d60b437efe 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -16,7 +16,10 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success } -abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Module { +/** + * INTERNAL API + */ +private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Module { def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Publisher[Out] @uncheckedVariance, Mat) @@ -33,13 +36,23 @@ abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Mod } override def subModules: Set[Module] = Set.empty + + def amendShape(attr: OperationAttributes): SourceShape[Out] = { + attr.nameOption match { + case None ⇒ shape + case s: Some[String] if s == attributes.nameOption ⇒ shape + case Some(name) ⇒ shape.copy(outlet = new Outlet(name + ".out")) + } + } + } /** + * INTERNAL API * Holds a `Subscriber` representing the input side of the flow. * The `Subscriber` can later be connected to an upstream `Publisher`. */ -final class SubscriberSource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) { +private[akka] final class SubscriberSource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) { /** * This method is only used for Sources that return true from [[#isActive]], which then must @@ -61,29 +74,31 @@ final class SubscriberSource[Out](val attributes: OperationAttributes, shape: So } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = new SubscriberSource[Out](attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new SubscriberSource[Out](attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new SubscriberSource[Out](attr, amendShape(attr)) } /** + * INTERNAL API * Construct a transformation starting with given publisher. The transformation steps * are executed by a series of [[org.reactivestreams.Processor]] instances * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ -final class PublisherSource[Out](p: Publisher[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { +private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = (p, ()) override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new PublisherSource[Out](p, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, amendShape(attr)) } /** + * INTERNAL API * Start a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which * may happen before or after materializing the `Flow`. * The stream terminates with an error if the `Future` is completed with a failure. */ -final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { +private[akka] final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = future.value match { case Some(Success(element)) ⇒ @@ -96,10 +111,13 @@ final class FutureSource[Out](future: Future[Out], val attributes: OperationAttr } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new FutureSource(future, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new FutureSource(future, attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new FutureSource(future, attr, amendShape(attr)) } -final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Unit]](shape) { +/** + * INTERNAL API + */ +private[akka] final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Unit]](shape) { import ReactiveStreamsCompliance._ override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { @@ -123,17 +141,18 @@ final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: Sou } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Unit]] = new LazyEmptySource[Out](attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new LazyEmptySource(attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new LazyEmptySource(attr, amendShape(attr)) } /** + * INTERNAL API * Elements are emitted periodically with the specified interval. * The tick element will be delivered to downstream consumers that has requested any elements. * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ -final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) { +private[akka] final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { val cancelled = new AtomicBoolean(false) @@ -150,14 +169,15 @@ final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDurati } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Cancellable] = new TickSource[Out](initialDelay, interval, tick, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new TickSource(initialDelay, interval, tick, attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new TickSource(initialDelay, interval, tick, attr, amendShape(attr)) } /** + * INTERNAL API * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]]. */ -final class PropsSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { +private[akka] final class PropsSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { val publisherRef = materializer.actorOf(props, name = s"$flowName-0-props") @@ -165,5 +185,5 @@ final class PropsSource[Out](props: Props, val attributes: OperationAttributes, } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new PropsSource[Out](props, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new PropsSource(props, attr, shape) + override def withAttributes(attr: OperationAttributes): Module = new PropsSource(props, attr, amendShape(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 722015daa6..ee4e4dd331 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -92,7 +92,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. * * The returned tuple contains the materialized values of the `KeyedSource` and `KeyedSink`, - * e.g. the `Subscriber` of a `Source.subscriber()` and `Publisher` of a `Sink.publisher()`. + * e.g. the `Subscriber` of a `Source.subscriber` and `Publisher` of a `Sink.publisher`. * * @tparam T materialized type of given KeyedSource * @tparam U materialized type of given KeyedSink @@ -375,6 +375,12 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph val javaToScala = (flow: javadsl.Flow[Out, O, M]) ⇒ flow.asScala scalaToJava andThen section.apply andThen javaToScala }) + + def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.withAttributes(attr.asScala)) + + def named(name: String): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.named(name)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 844522c08b..2cb7f48df9 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -12,6 +12,7 @@ import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import scala.concurrent.Future import akka.stream.impl.StreamLayout +import scala.util.Try /** Java API */ object Sink { @@ -51,7 +52,7 @@ object Sink { /** * A `Sink` that immediately cancels its upstream after materialization. */ - def cancelled[T]: Sink[T, Unit] = + def cancelled[T](): Sink[T, Unit] = new Sink(scaladsl.Sink.cancelled) /** @@ -65,7 +66,7 @@ object Sink { * that can handle one [[org.reactivestreams.Subscriber]]. */ def publisher[In](): Sink[In, Publisher[In]] = - new Sink(scaladsl.Sink.publisher()) + new Sink(scaladsl.Sink.publisher) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized @@ -88,13 +89,13 @@ object Sink { * completion, apply the provided function with [[scala.util.Success]] * or [[scala.util.Failure]]. */ - def onComplete[In](onComplete: japi.Procedure[Unit]): Sink[In, Unit] = - new Sink(scaladsl.Sink.onComplete[In](x ⇒ onComplete.apply(x))) + def onComplete[In](callback: japi.Procedure[Try[Unit]]): Sink[In, Unit] = + new Sink(scaladsl.Sink.onComplete[In](x ⇒ callback.apply(x))) /** * A `Sink` that materializes into a `Future` of the first value received. */ - def head[In]: Sink[In, Future[In]] = + def head[In](): Sink[In, Future[In]] = new Sink(scaladsl.Sink.head[In]) } @@ -124,4 +125,10 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ */ def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Sink[In, Mat2] = new Sink(delegate.mapMaterialized(f.apply _)) + + def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] = + new Sink(delegate.withAttributes(attr.asScala)) + + def named(name: String): javadsl.Sink[In, Mat] = + new Sink(delegate.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1ec33e30ef..b269315d4a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -35,7 +35,7 @@ object Source { * for every connected `Sink`. */ def empty[O](): Source[O, Unit] = - new Source(scaladsl.Source.empty()) + new Source(scaladsl.Source.empty) /** * Create a `Source` with no elements, which does not complete its downstream, @@ -47,7 +47,7 @@ object Source { * to its downstream. */ def lazyEmpty[T](): Source[T, Promise[Unit]] = - new Source[T, Promise[Unit]](scaladsl.Source.lazyEmpty()) + new Source[T, Promise[Unit]](scaladsl.Source.lazyEmpty) /** * Helper to create [[Source]] from `Publisher`. @@ -69,7 +69,7 @@ object Source { * data.add(1); * data.add(2); * data.add(3); - * Source.from(data.iterator()); + * Source.from(() -> data.iterator()); * }}} * * Start a new `Source` from the given Iterator. The produced stream of elements @@ -150,7 +150,7 @@ object Source { * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */ def subscriber[T](): Source[T, Subscriber[T]] = - new Source(scaladsl.Source.subscriber()) + new Source(scaladsl.Source.subscriber) /** * Concatenates two sources so that the first element @@ -210,7 +210,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value - * of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. + * of the `Sink`, e.g. the `Publisher` of a `Sink.publisher`. */ def runWith[M](sink: Sink[Out, M], materializer: FlowMaterializer): M = delegate.runWith(sink.asScala)(materializer) @@ -471,4 +471,11 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour val javaToScala = (source: javadsl.Flow[Out, O, M]) ⇒ source.asScala scalaToJava andThen section.apply andThen javaToScala }) + + def withAttributes(attr: OperationAttributes): javadsl.Source[Out, Mat] = + new Source(delegate.withAttributes(attr.asScala)) + + def named(name: String): javadsl.Source[Out, Mat] = + new Source(delegate.named(name)) + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7b5d1147e2..ba13b24c55 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -530,6 +530,8 @@ trait FlowOps[+Out, +Mat] { def withAttributes(attr: OperationAttributes): Repr[Out, Mat] + def named(name: String): Repr[Out, Mat] = withAttributes(OperationAttributes.name(name)) + /** INTERNAL API */ private[scaladsl] def andThen[U](op: StageModule): Repr[U, Mat] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala index 2435eda06a..6bbfac70de 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala @@ -22,16 +22,28 @@ final case class OperationAttributes private (attributes: List[OperationAttribut else if (other.attributes.isEmpty) this else OperationAttributes(attributes ::: other.attributes) + /** + * INTERNAL API + */ private[akka] def nameLifted: Option[String] = attributes.collect { case Name(name) ⇒ name }.reduceOption(_ + "-" + _) // FIXME don't do a double-traversal, use a fold instead + /** + * INTERNAL API + */ private[akka] def name: String = nameLifted match { case Some(name) ⇒ name case _ ⇒ "unknown-operation" } + /** + * INTERNAL API + */ + private[akka] def nameOption: Option[String] = + attributes.collectFirst { case Name(name) ⇒ name } + private[akka] def transform(node: StageModule): StageModule = if ((this eq OperationAttributes.none) || (this eq node.attributes)) node else node.withAttributes(attributes = this and node.attributes) @@ -53,8 +65,11 @@ object OperationAttributes { /** * Specifies the name of the operation. + * If the name is null or empty the name is ignored, i.e. [[#none]] is returned. */ - def name(name: String): OperationAttributes = OperationAttributes(Name(name)) + def name(name: String): OperationAttributes = + if (name == null || name.isEmpty) none + else OperationAttributes(Name(name)) /** * Specifies the initial and maximum size of the input buffer. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 8b24909a11..6fc8663cb9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -36,11 +36,13 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) def withAttributes(attr: OperationAttributes): Sink[In, Mat] = new Sink(module.withAttributes(attr).wrap()) + + def named(name: String): Sink[In, Mat] = withAttributes(OperationAttributes.name(name)) } object Sink extends SinkApply { - import OperationAttributes.{ none, name ⇒ named } + import OperationAttributes.none private def shape[T](name: String): SinkShape[T] = SinkShape(new Inlet(name + ".in")) @@ -53,58 +55,32 @@ object Sink extends SinkApply { /** * Helper to create [[Sink]] from `Subscriber`. */ - def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] = new Sink(new SubscriberSink(subscriber, none, shape("SubscriberSink"))) - - /** - * Helper to create [[Sink]] from `Subscriber`. - */ - def apply[T](subscriber: Subscriber[T], name: String): Sink[T, Unit] = new Sink(new SubscriberSink(subscriber, named(name), shape(name))) + def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] = + new Sink(new SubscriberSink(subscriber, none, shape("SubscriberSink"))) /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should * be [[akka.stream.actor.ActorSubscriber]]. */ - def apply[T](props: Props): Sink[T, ActorRef] = new Sink(new PropsSink(props, none, shape("PropsSink"))) - - /** - * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should - * be [[akka.stream.actor.ActorSubscriber]]. - */ - def apply[T](props: Props, name: String): Sink[T, ActorRef] = new Sink(new PropsSink(props, named(name), shape(name))) + def apply[T](props: Props): Sink[T, ActorRef] = + new Sink(new PropsSink(props, none, shape("PropsSink"))) /** * A `Sink` that immediately cancels its upstream after materialization. */ - def cancelled[T](): Sink[T, Unit] = new Sink[Any, Unit](new CancelSink(none, shape("CancelledSink"))) - - /** - * A `Sink` that immediately cancels its upstream after materialization. - */ - def cancelled[T](name: String): Sink[T, Unit] = new Sink[Any, Unit](new CancelSink(named(name), shape(name))) + def cancelled[T]: Sink[T, Unit] = new Sink[Any, Unit](new CancelSink(none, shape("CancelledSink"))) /** * A `Sink` that materializes into a `Future` of the first value received. */ - def head[T](): Sink[T, Future[T]] = new Sink(new HeadSink[T](none, shape("HeadSink"))) - - /** - * A `Sink` that materializes into a `Future` of the first value received. - */ - def head[T](name: String): Sink[T, Future[T]] = new Sink(new HeadSink[T](named(name), shape(name))) + def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](none, shape("HeadSink"))) /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. * that can handle one [[org.reactivestreams.Subscriber]]. */ - def publisher[T](): Sink[T, Publisher[T]] = new Sink(new PublisherSink[T](none, shape("PublisherSink"))) - - /** - * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. - * that can handle one [[org.reactivestreams.Subscriber]]. - */ - def publisher[T](name: String): Sink[T, Publisher[T]] = new Sink(new PublisherSink[T](named(name), shape(name))) + def publisher[T]: Sink[T, Publisher[T]] = new Sink(new PublisherSink[T](none, shape("PublisherSink"))) /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]] @@ -113,22 +89,11 @@ object Sink extends SinkApply { def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): Sink[T, Publisher[T]] = new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, none, shape("FanoutPublisherSink"))) - /** - * A `Sink` that materializes into a [[org.reactivestreams.Publisher]] - * that can handle more than one [[org.reactivestreams.Subscriber]]. - */ - def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int, name: String): Sink[T, Publisher[T]] = - new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, named(name), shape(name))) - /** * A `Sink` that will consume the stream and discard the elements. */ - def ignore(): Sink[Any, Unit] = new Sink(new BlackholeSink(none, shape("BlackholeSink"))) - - /** - * A `Sink` that will consume the stream and discard the elements. - */ - def ignore(name: String): Sink[Any, Unit] = new Sink(new BlackholeSink(named(name), shape(name))) + def ignore: Sink[Any, Unit] = + new Sink(new BlackholeSink(none, shape("BlackholeSink"))) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized @@ -159,8 +124,7 @@ object Sink extends SinkApply { (stage, promise.future) } - Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).withAttributes(name("foreach")) - + Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).named("ForeachSink") } /** @@ -197,8 +161,7 @@ object Sink extends SinkApply { (stage, promise.future) } - Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).withAttributes(name("fold")) - + Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).named("FoldSink") } /** @@ -222,6 +185,6 @@ object Sink extends SinkApply { } } - Flow[T].transform(newOnCompleteStage).to(Sink.ignore).withAttributes(name("onComplete")) + Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("OnCompleteSink") } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index d0e30000df..14ed935920 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -156,7 +156,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) object Source extends SourceApply { - import OperationAttributes.{ none, name ⇒ named } + import OperationAttributes.none private[stream] def apply[Out, Mat](module: SourceModule[Out, Mat]): Source[Out, Mat] = new Source(module) @@ -174,17 +174,6 @@ object Source extends SourceApply { def apply[T](publisher: Publisher[T]): Source[T, Unit] = new Source(new PublisherSource(publisher, none, shape("PublisherSource"))) - /** - * Helper to create [[Source]] from `Publisher`. - * - * Construct a transformation starting with given publisher. The transformation steps - * are executed by a series of [[org.reactivestreams.Processor]] instances - * that mediate the flow of elements downstream and the propagation of - * back-pressure upstream. - */ - def apply[T](publisher: Publisher[T], name: String): Source[T, Unit] = - new Source(new PublisherSource(publisher, named(name), shape(name))) - /** * Helper to create [[Source]] from `Iterator`. * Example usage: `Source(() => Iterator.from(0))` @@ -201,22 +190,6 @@ object Source extends SourceApply { }) } - /** - * Helper to create [[Source]] from `Iterator`. - * Example usage: `Source(() => Iterator.from(0))` - * - * Start a new `Source` from the given function that produces anIterator. - * The produced stream of elements will continue until the iterator runs empty - * or fails during evaluation of the `next()` method. - * Elements are pulled out of the iterator in accordance with the demand coming - * from the downstream transformation steps. - */ - def apply[T](f: () ⇒ Iterator[T], name: String): Source[T, Unit] = { - apply(new immutable.Iterable[T] { - override def iterator: Iterator[T] = f() - }) - } - /** * A graph with the shape of a source logically is a source, this method makes * it so also in type. @@ -232,8 +205,7 @@ object Source extends SourceApply { * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. */ - def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = { // FIXME add naming of outlet - + def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = { Source.empty.transform(() ⇒ { new PushPullStage[Nothing, T] { var iterator: Iterator[T] = null @@ -263,26 +235,17 @@ object Source extends SourceApply { } } - }).withAttributes(OperationAttributes.name("iterable")) + }).named("IterableSource") } - /** - * Start a new `Source` from the given `Future`. The stream will consist of - * one element when the `Future` is completed with a successful value, which - * may happen before or after materializing the `Flow`. - * The stream terminates with an error if the `Future` is completed with a failure. - */ - def apply[T](future: Future[T]): Source[T, Unit] = - new Source(new FutureSource(future, none, shape("FutureSource"))) - /** * Start a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `Future` is completed with a failure. */ - def apply[T](future: Future[T], name: String): Source[T, Unit] = - new Source(new FutureSource(future, named(name), shape(name))) + def apply[T](future: Future[T]): Source[T, Unit] = + new Source(new FutureSource(future, none, shape("FutureSource"))) /** * Elements are emitted periodically with the specified interval. @@ -294,46 +257,32 @@ object Source extends SourceApply { def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = new Source(new TickSource(initialDelay, interval, tick, none, shape("TickSource"))) - /** - * Elements are emitted periodically with the specified interval. - * The tick element will be delivered to downstream consumers that has requested any elements. - * If a consumer has not requested any elements at the point in time when the tick - * element is produced it will not receive that tick element later. It will - * receive new tick elements as soon as it has requested more elements. - */ - def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T, name: String): Source[T, Cancellable] = - new Source(new TickSource(initialDelay, interval, tick, named(name), shape(name))) - /** * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should * be [[akka.stream.actor.ActorPublisher]]. */ - def apply[T](props: Props): Source[T, ActorRef] = new Source(new PropsSource(props, none, shape("PropsSource"))) - - /** - * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should - * be [[akka.stream.actor.ActorPublisher]]. - */ - def apply[T](props: Props, name: String): Source[T, ActorRef] = new Source(new PropsSource(props, named(name), shape(name))) + def apply[T](props: Props): Source[T, ActorRef] = + new Source(new PropsSource(props, none, shape("PropsSource"))) /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ - def single[T](element: T): Source[T, Unit] = apply(SynchronousIterablePublisher(List(element), "single")) // FIXME optimize + def single[T](element: T): Source[T, Unit] = + apply(SynchronousIterablePublisher(List(element), "SingleSource")) // FIXME optimize /** * Create a `Source` that will continually emit the given element. */ - def repeat[T](element: T): Source[T, Unit] = apply(() ⇒ Iterator.continually(element)) // FIXME optimize + def repeat[T](element: T): Source[T, Unit] = + apply(() ⇒ Iterator.continually(element)) // FIXME optimize /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */ - def empty[T](): Source[T, Unit] = _empty - private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher, "EmptySource") + def empty[T]: Source[T, Unit] = _empty + private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher) /** * Create a `Source` with no elements, which does not complete its downstream, @@ -344,28 +293,13 @@ object Source extends SourceApply { * be used to externally trigger completion, which the source then signalls * to its downstream. */ - def lazyEmpty[T](): Source[T, Promise[Unit]] = new Source(new LazyEmptySource[T](none, shape("LazyEmptySource"))) - - /** - * Create a `Source` with no elements, which does not complete its downstream, - * until externally triggered to do so. - * - * It materializes a [[scala.concurrent.Promise]] which will be completed - * when the downstream stage of this source cancels. This promise can also - * be used to externally trigger completion, which the source then signalls - * to its downstream. - */ - def lazyEmpty[T](name: String): Source[T, Promise[Unit]] = new Source(new LazyEmptySource[T](named(name), shape(name))) + def lazyEmpty[T]: Source[T, Promise[Unit]] = + new Source(new LazyEmptySource[T](none, shape("LazyEmptySource"))) /** * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. */ - def failed[T](cause: Throwable): Source[T, Unit] = apply(ErrorPublisher(cause, "failed"), "FailedSource") - - /** - * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`. - */ - def failed[T](cause: Throwable, name: String): Source[T, Unit] = apply(ErrorPublisher(cause, "failed"), name) + def failed[T](cause: Throwable): Source[T, Unit] = apply(ErrorPublisher(cause, "FailedSource")) /** * Concatenates two sources so that the first element @@ -385,11 +319,7 @@ object Source extends SourceApply { /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */ - def subscriber[T](): Source[T, Subscriber[T]] = new Source(new SubscriberSource[T](none, shape("SubscriberSource"))) - - /** - * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] - */ - def subscriber[T](name: String): Source[T, Subscriber[T]] = new Source(new SubscriberSource[T](named(name), shape(name))) + def subscriber[T]: Source[T, Subscriber[T]] = + new Source(new SubscriberSource[T](none, shape("SubscriberSource"))) }