!str #16992 Rework Source and Sink name parameter

* Remove name parameter (no overloads); naming is now performed with `.withAttributes` or the new
  convenience method `.named`. These add the `OperationAttributes.Name` attribute and also change the
  names of the shape's Inlet and Outlet.

* Remove the Source/Sink parameter list from zero-parameter methods,
  which allows `Sink.head` instead of `Sink.head()`. A before/after sketch follows.
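To make the change concrete, a minimal sketch against the post-change API; the ActorSystem/materializer boilerplate is the standard setup used in the specs below, and names like "demo" and "numbers" are illustrative:

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

implicit val system = ActorSystem("demo")
implicit val mat = ActorFlowMaterializer()

// Zero-parameter factories lose their empty parameter list:
val head: Sink[Int, Future[Int]] = Sink.head[Int] // was: Sink.head[Int]()

// Naming goes through the attributes mechanism: `.named` adds the
// OperationAttributes.Name attribute and renames the shape's Inlet and
// Outlet, as shorthand for `.withAttributes(OperationAttributes.name(...))`
val numbers: Source[Int, Unit] = Source(1 to 10).named("numbers")

val first: Future[Int] = numbers.runWith(head)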
Author: Patrik Nordwall, 2015-03-05 12:21:17 +01:00
Commit: 3dc4e6d077 (parent: 53e3dcad06)
75 changed files with 336 additions and 364 deletions


@ -152,14 +152,14 @@ class FlowDocSpec extends AkkaSpec {
//#flow-mat-combine
// An empty source that can be shut down explicitly from the outside
val source: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int]()
val source: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int]
// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler
// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]()
val sink: Sink[Int, Future[Int]] = Sink.head[Int]
// By default, the materialized value of the leftmost stage is preserved
val r1: RunnableFlow[Promise[Unit]] = source.via(flow).to(sink)
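For reference, a materialized value other than the leftmost can be selected with `toMat` and a `Keep` function, both of which appear elsewhere in this diff; a sketch reusing `source`, `flow`, and `sink` from above:

// keep only the sink's Future[Int]
val r2: RunnableFlow[Future[Int]] = source.via(flow).toMat(sink)(Keep.right)
// keep both ends
val r3: RunnableFlow[(Promise[Unit], Future[Int])] = source.via(flow).toMat(sink)(Keep.both)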


@ -75,7 +75,7 @@ class FlowErrorDocSpec extends AkkaSpec {
else acc + elem
}
}
val result = source.grouped(1000).runWith(Sink.head())
val result = source.grouped(1000).runWith(Sink.head)
// the negative element causes the scan stage to be restarted,
// i.e. start from 0 again
// result here will be a Future completed with Success(Vector(0, 1, 0, 5, 12))


@ -22,7 +22,7 @@ class GraphCyclesSpec extends AkkaSpec {
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[Int](2))
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore()
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
merge <~ bcast
}
//#deadlocked
@ -39,7 +39,7 @@ class GraphCyclesSpec extends AkkaSpec {
val merge = b.add(MergePreferred[Int](1))
val bcast = b.add(Broadcast[Int](2))
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore()
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
merge.preferred <~ bcast
}
//#unfair
@ -55,7 +55,7 @@ class GraphCyclesSpec extends AkkaSpec {
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[Int](2))
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore()
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
merge <~ Flow[Int].buffer(10, OverflowStrategy.dropHead) <~ bcast
}
//#dropping
@ -73,7 +73,7 @@ class GraphCyclesSpec extends AkkaSpec {
val bcast = b.add(Broadcast[Int](2))
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore()
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
zip.in1 <~ bcast
}
//#zipping-dead
@ -92,7 +92,7 @@ class GraphCyclesSpec extends AkkaSpec {
val start = Source.single(0)
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore()
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
zip.in1 <~ concat <~ start
concat <~ bcast
}


@ -43,7 +43,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
val impl = new Fixture {
override def tweets: Publisher[Tweet] =
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher())
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher)
override def storage = SubscriberProbe[Author]
@ -95,7 +95,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
//#source-publisher
val authorPublisher: Publisher[Author] =
Source(tweets).via(authors).runWith(Sink.publisher())
Source(tweets).via(authors).runWith(Sink.publisher)
authorPublisher.subscribe(storage)
//#source-publisher


@ -60,7 +60,7 @@ class StreamBuffersRateSpec extends AkkaSpec {
"explcit buffers" in {
trait Job
def inboundJobsConnector(): Source[Job, Unit] = Source.empty()
def inboundJobsConnector(): Source[Job, Unit] = Source.empty
//#explicit-buffers-backpressure
// Getting a stream of jobs from an imaginary external system as a Source
val jobs: Source[Job, Unit] = inboundJobsConnector()
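In isolation, the explicit-buffer idiom this spec goes on to document looks like the sketch below; the capacity and the drop-oldest policy are illustrative, not prescribed:

// decouple the producer with a fixed-size buffer that sheds the oldest
// element on overflow
val bufferedJobs: Source[Job, Unit] = jobs.buffer(1000, OverflowStrategy.dropHead)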


@ -67,7 +67,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
zip.out
}
val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head())
val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head)
//#source-from-partial-flow-graph
Await.result(firstPair, 300.millis) should equal(1 -> 2)
}
@ -94,7 +94,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
// format: OFF
val (_, matSink: Future[(Int, String)]) =
//#flow-from-partial-flow-graph
pairUpWithToString.runWith(Source(List(1)), Sink.head())
pairUpWithToString.runWith(Source(List(1)), Sink.head)
//#flow-from-partial-flow-graph
// format: ON


@ -41,7 +41,7 @@ class RecipeByteStrings extends RecipeSpec {
val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit))
//#bytestring-chunker
val chunksFuture = chunksStream.grouped(10).runWith(Sink.head())
val chunksFuture = chunksStream.grouped(10).runWith(Sink.head)
val chunks = Await.result(chunksFuture, 3.seconds)
@ -70,11 +70,11 @@ class RecipeByteStrings extends RecipeSpec {
val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10)))
Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head()), 3.seconds)
Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
an[IllegalStateException] must be thrownBy {
Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head()), 3.seconds)
Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
}
}
@ -86,7 +86,7 @@ class RecipeByteStrings extends RecipeSpec {
val compacted: Source[ByteString, Unit] = data.map(_.compact)
//#compacting-bytestrings
Await.result(compacted.grouped(10).runWith(Sink.head()), 3.seconds).forall(_.isCompact) should be(true)
Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true)
}
}


@ -44,7 +44,7 @@ class RecipeDigest extends RecipeSpec {
val digest: Source[ByteString, Unit] = data.transform(() => digestCalculator("SHA-256"))
//#calculating-digest
Await.result(digest.runWith(Sink.head()), 3.seconds) should be(
Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,


@ -19,7 +19,7 @@ class RecipeFlattenSeq extends RecipeSpec {
val flattened: Source[Message, Unit] = myData.mapConcat(identity)
//#flattening-seqs
Await.result(flattened.grouped(8).runWith(Sink.head()), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))
Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))
}


@ -43,8 +43,8 @@ class RecipeMultiGroupBy extends RecipeSpec {
//#multi-groupby
val result = multiGroups.map {
case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head())
}.mapAsync(identity).grouped(10).runWith(Sink.head())
case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head)
}.mapAsync(identity).grouped(10).runWith(Sink.head)
Await.result(result, 3.seconds).toSet should be(Set(
"1[1: a, 1: b, all: c, all: d, 1: e]",


@ -24,7 +24,7 @@ class RecipeParseLines extends RecipeSpec {
val linesStream = rawData.transform(() => parseLines("\r\n", 100))
Await.result(linesStream.grouped(10).runWith(Sink.head()), 3.seconds) should be(List(
Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
"Hello World\r!",
"Hello Akka!",
"Hello Streams!",


@ -35,7 +35,7 @@ class RecipeReduceByKey extends RecipeSpec {
.mapAsync(identity)
//#word-count
Await.result(counts.grouped(10).runWith(Sink.head()), 3.seconds).toSet should be(Set(
Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),
@ -72,7 +72,7 @@ class RecipeReduceByKey extends RecipeSpec {
//#reduce-by-key-general
Await.result(wordCounts.grouped(10).runWith(Sink.head()), 3.seconds).toSet should be(Set(
Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),


@ -16,7 +16,7 @@ class RecipeToStrict extends RecipeSpec {
//#draining-to-seq
val strict: Future[immutable.Seq[Message]] =
myData.grouped(MaxAllowedSeqSize).runWith(Sink.head())
myData.grouped(MaxAllowedSeqSize).runWith(Sink.head)
//#draining-to-seq
Await.result(strict, 3.seconds) should be(List("1", "2", "3"))


@ -37,7 +37,7 @@ class RecipeWorkerPool extends RecipeSpec {
val processedJobs: Source[Result, Unit] = myJobs.via(balancer(worker, 3))
//#worker-pool
Await.result(processedJobs.grouped(10).runWith(Sink.head()), 3.seconds).toSet should be(Set(
Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
"1 done", "2 done", "3 done", "4 done", "5 done"))
}


@ -61,7 +61,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
}
// TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393
dataBytes.section(name("toStrict"))(_.timerTransform(transformer)).runWith(Sink.head())
dataBytes.section(name("toStrict"))(_.timerTransform(transformer)).runWith(Sink.head)
}
/**


@ -133,11 +133,11 @@ private[http] object StreamUtils {
case Nil ⇒ Nil
case Seq(one) ⇒ Vector(input.via(one))
case multiple ⇒
val (fanoutSub, fanoutPub) = Source.subscriber[ByteString]().toMat(Sink.fanoutPublisher(16, 16))(Keep.both).run()
val (fanoutSub, fanoutPub) = Source.subscriber[ByteString].toMat(Sink.fanoutPublisher(16, 16))(Keep.both).run()
val sources = transformers.map { flow ⇒
// Doubly wrap to ensure that subscription to the running publisher happens before the final sources
// are exposed, so there is no race
Source(Source(fanoutPub).via(flow).runWith(Sink.publisher()))
Source(Source(fanoutPub).via(flow).runWith(Sink.publisher))
}
// The fanout publisher must be wired to the original source after all fanout subscribers have been subscribed
input.runWith(Sink(fanoutSub))
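The pattern above in isolation: materializing a (Subscriber, Publisher) pair backed by a fanout buffer lets one upstream feed several downstream flows, as long as the real input is attached only after all fanout subscribers exist. A sketch with illustrative buffer sizes, assuming an implicit materializer:

val (sub, pub) = Source.subscriber[Int].toMat(Sink.fanoutPublisher(16, 16))(Keep.both).run()
val doubled = Source(pub).map(_ * 2).runWith(Sink.publisher)
val squared = Source(pub).map(x ⇒ x * x).runWith(Sink.publisher)
Source(1 to 100).runWith(Sink(sub)) // wire the input last, avoiding the race noted above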


@ -122,7 +122,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)),
Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext()
uri shouldEqual Uri(s"http://$hostname:$port/chunked")
Await.result(chunkStream.grouped(4).runWith(Sink.head()), 100.millis) shouldEqual chunks
Await.result(chunkStream.grouped(4).runWith(Sink.head), 100.millis) shouldEqual chunks
val serverOutSub = serverOut.expectSubscription()
serverOutSub.expectRequest()
@ -132,7 +132,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
clientInSub.request(1)
val HttpResponse(StatusCodes.PartialContent, List(Age(42), Server(_), Date(_)),
Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext()
Await.result(chunkStream2.grouped(1000).runWith(Sink.head()), 100.millis) shouldEqual chunks
Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks
clientOutSub.sendComplete()
serverInSub.request(1) // work-around for #16552
@ -205,8 +205,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
def acceptConnection(): (SubscriberProbe[HttpRequest], PublisherProbe[HttpResponse]) = {
connSourceSub.request(1)
val incomingConnection = connSource.expectNext()
val sink = Sink.publisher[HttpRequest]()
val source = Source.subscriber[HttpResponse]()
val sink = Sink.publisher[HttpRequest]
val source = Source.subscriber[HttpResponse]
val handler = Flow(sink, source)(Keep.both) { implicit b ⇒
(snk, src) ⇒


@ -25,7 +25,7 @@ object TestClient extends App {
println(s"Fetching HTTP server version of host `$host` ...")
val connection = Http().outgoingConnection(host)
val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head())
val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head)
result.map(_.header[headers.Server]) onComplete {
case Success(res) ⇒ println(s"$host is running ${res mkString ", "}")


@ -233,7 +233,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val parser = newParser
val result = multiParse(newParser)(Seq(prep(start + manyChunks)))
val HttpEntity.Chunked(_, chunks) = result.head.right.get.req.entity
val strictChunks = chunks.grouped(100000).runWith(Sink.head()).awaitResult(awaitAtMost)
val strictChunks = chunks.grouped(100000).runWith(Sink.head).awaitResult(awaitAtMost)
strictChunks.size shouldEqual numChunks
}
}
@ -462,7 +462,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
.flatten(FlattenStrategy.concat)
.map(strictEqualify)
.grouped(100000).runWith(Sink.head())
.grouped(100000).runWith(Sink.head)
.awaitResult(awaitAtMost)
protected def parserSettings: ParserSettings = ParserSettings(system)
@ -475,7 +475,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
private def compactEntityChunks(data: Source[ChunkStreamPart, Unit]): Future[Seq[ChunkStreamPart]] =
data.grouped(100000).runWith(Sink.head())
data.grouped(100000).runWith(Sink.head)
.fast.recover { case _: NoSuchElementException ⇒ Nil }
def prep(response: String) = response.stripMarginWithNewline("\r\n")


@ -279,7 +279,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
.flatten(FlattenStrategy.concat)
.map(strictEqualify)
.grouped(100000).runWith(Sink.head())
.grouped(100000).runWith(Sink.head)
Await.result(future, 500.millis)
}
@ -298,7 +298,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
private def compactEntityChunks(data: Source[ChunkStreamPart, Unit]): Future[Source[ChunkStreamPart, Unit]] =
data.grouped(100000).runWith(Sink.head())
data.grouped(100000).runWith(Sink.head)
.fast.map(source(_: _*))
.fast.recover { case _: NoSuchElementException ⇒ source() }


@ -254,8 +254,8 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
val renderer = newRenderer
val byteStringSource = Await.result(Source.single(RequestRenderingContext(request, serverAddress)).
section(name("renderer"))(_.transform(() renderer)).
runWith(Sink.head()), 1.second)
val future = byteStringSource.grouped(1000).runWith(Sink.head()).map(_.reduceLeft(_ ++ _).utf8String)
runWith(Sink.head), 1.second)
val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis)
}
}


@ -413,8 +413,8 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
val renderer = newRenderer
val byteStringSource = Await.result(Source.single(ctx).
section(name("renderer"))(_.transform(() renderer)).
runWith(Sink.head()), 1.second)
val future = byteStringSource.grouped(1000).runWith(Sink.head()).map(_.reduceLeft(_ ++ _).utf8String)
runWith(Sink.head), 1.second)
val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis) -> renderer.isComplete
}


@ -105,7 +105,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
def collectBytesTo(bytes: ByteString*): Matcher[HttpEntity] =
equal(bytes.toVector).matcher[Seq[ByteString]].compose { entity ⇒
val future = entity.dataBytes.grouped(1000).runWith(Sink.head())
val future = entity.dataBytes.grouped(1000).runWith(Sink.head)
Await.result(future, 250.millis)
}


@ -96,6 +96,6 @@ trait RouteTestResultComponent {
failTest("Request was neither completed nor rejected within " + timeout)
private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] =
data.grouped(100000).runWith(Sink.head()).awaitResult(timeout)
data.grouped(100000).runWith(Sink.head).awaitResult(timeout)
}
}


@ -111,7 +111,7 @@ abstract class CoderSpec extends WordSpec with CodecSpecSupport with Inspectors
val resultBs =
Source.single(compressed)
.via(Coder.withMaxBytesPerChunk(limit).decoderFlow)
.grouped(4200).runWith(Sink.head())
.grouped(4200).runWith(Sink.head)
.awaitResult(1.second)
forAll(resultBs) { bs ⇒


@ -99,7 +99,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside {
wrs { complete("Some random and not super short entity.") }
} ~> check {
header[`Content-Range`] should be(None)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head()), 1.second)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second)
parts.size shouldEqual 2
inside(parts(0)) {
case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒
@ -124,7 +124,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside {
wrs { complete(HttpEntity.Default(MediaTypes.`text/plain`, content.length, entityData())) }
} ~> check {
header[`Content-Range`] should be(None)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head()), 1.second)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second)
parts.size shouldEqual 2
}
}


@ -213,7 +213,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll wi
def haveParts[T <: Multipart](parts: Multipart.BodyPart*): Matcher[Future[T]] =
equal(parts).matcher[Seq[Multipart.BodyPart]] compose { x ⇒
Await.result(x
.fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.head()))
.fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.head))
.fast.recover { case _: NoSuchElementException ⇒ Nil }, 1.second)
}
}


@ -28,7 +28,7 @@ trait Decoder {
def decoderFlow: Flow[ByteString, ByteString, Unit]
def decode(input: ByteString)(implicit mat: FlowMaterializer): Future[ByteString] =
Source.single(input).via(decoderFlow).runWith(Sink.head())
Source.single(input).via(decoderFlow).runWith(Sink.head)
}
object Decoder {
val MaxBytesPerChunkDefault: Int = 65536


@ -74,7 +74,7 @@ trait MultipartUnmarshallers {
// note that iter.next() will throw exception if stream fails
iter.foreach {
case BodyPartStart(headers, createEntity) ⇒
val entity = createEntity(Source.empty()) match {
val entity = createEntity(Source.empty) match {
case x: HttpEntity.Strict ⇒ x
case x ⇒ throw new IllegalStateException("Unexpected entity type from strict BodyPartParser: " + x)
}


@ -29,7 +29,7 @@ abstract class AkkaIdentityProcessorVerification[T](env: TestEnvironment, publis
StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!"))
def processorFromFlow(flow: Flow[T, T, _])(implicit mat: ActorFlowMaterializer): Processor[T, T] = {
val (sub: Subscriber[T], pub: Publisher[T]) = flow.runWith(Source.subscriber[T](), Sink.publisher[T]())
val (sub: Subscriber[T], pub: Publisher[T]) = flow.runWith(Source.subscriber[T], Sink.publisher[T])
new Processor[T, T] {
override def onSubscribe(s: Subscription): Unit = sub.onSubscribe(s)
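A usage sketch of the helper above, for a verification parameterized on Int (materializer assumed): any identity-typed Flow becomes a Reactive Streams Processor by materializing both ends at once.

val processor: Processor[Int, Int] = processorFromFlow(Flow[Int].map(_ + 1))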


@ -10,7 +10,7 @@ import org.reactivestreams.Publisher
class ConcatTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher())
Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher)
}
// FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4


@ -13,7 +13,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val s1 = Source(iterable(elements / 2))
val s2 = Source(iterable((elements + 1) / 2))
Source(List(s1, s2)).flatten(FlattenStrategy.concat).runWith(Sink.publisher())
Source(List(s1, s2)).flatten(FlattenStrategy.concat).runWith(Sink.publisher)
}
// FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4


@ -9,7 +9,7 @@ import org.reactivestreams.Subscriber
class FoldSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
Flow[Int].to(Sink.fold(0)(_ + _)).runWith(Source.subscriber())
Flow[Int].to(Sink.fold(0)(_ + _)).runWith(Source.subscriber)
override def createElement(element: Int): Int = element
}


@ -9,7 +9,7 @@ import org.reactivestreams.Subscriber
class ForeachSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
Flow[Int].to(Sink.foreach { _ }).runWith(Source.subscriber())
Flow[Int].to(Sink.foreach { _ }).runWith(Source.subscriber)
override def createElement(element: Int): Int = element
}


@ -13,7 +13,7 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val p = Promise[Int]()
val pub = Source(p.future).runWith(Sink.publisher())
val pub = Source(p.future).runWith(Sink.publisher)
p.success(0)
pub
}


@ -17,9 +17,9 @@ class GroupByTest extends AkkaPublisherVerification[Int] {
if (elements == 0) EmptyPublisher[Int]
else {
val futureGroupSource =
Source(iterable(elements)).groupBy(elem ⇒ "all").map { case (_, group) ⇒ group }.runWith(Sink.head())
Source(iterable(elements)).groupBy(elem ⇒ "all").map { case (_, group) ⇒ group }.runWith(Sink.head)
val groupSource = Await.result(futureGroupSource, 3.seconds)
groupSource.runWith(Sink.publisher())
groupSource.runWith(Sink.publisher)
}


@ -11,7 +11,7 @@ import org.reactivestreams._
class IterablePublisherTest extends AkkaPublisherVerification[Int] {
override def createPublisher(elements: Long): Publisher[Int] = {
Source(iterable(elements)).runWith(Sink.publisher())
Source(iterable(elements)).runWith(Sink.publisher)
}
// FIXME #16983


@ -11,7 +11,7 @@ import akka.stream.scaladsl.Sink
class LazyEmptySourceTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
Source.lazyEmpty[Int].runWith(Sink.publisher())
Source.lazyEmpty[Int].runWith(Sink.publisher)
override def maxElementsFromPublisher(): Long = 0
}


@ -13,9 +13,9 @@ import org.reactivestreams.Publisher
class PrefixAndTailTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) ⇒ tail }.runWith(Sink.head())
val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) ⇒ tail }.runWith(Sink.head)
val tailSource = Await.result(futureTailSource, 3.seconds)
tailSource.runWith(Sink.publisher())
tailSource.runWith(Sink.publisher)
}
}


@ -11,7 +11,7 @@ import org.reactivestreams.Publisher
class SingleElementSourceTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
Source.single(1).runWith(Sink.publisher())
Source.single(1).runWith(Sink.publisher)
override def maxElementsFromPublisher(): Long = 1
}


@ -16,9 +16,9 @@ class SplitWhenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
if (elements == 0) EmptyPublisher[Int]
else {
val futureSource = Source(iterable(elements)).splitWhen(elem ⇒ false).runWith(Sink.head())
val futureSource = Source(iterable(elements)).splitWhen(elem ⇒ false).runWith(Sink.head)
val source = Await.result(futureSource, 3.seconds)
source.runWith(Sink.publisher())
source.runWith(Sink.publisher)
}
}


@ -19,7 +19,7 @@ class SyncIterablePublisherTest extends AkkaPublisherVerification[Int] {
else
0 until elements.toInt
Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher())
Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher)
}
}


@ -19,7 +19,7 @@ trait ScriptedTest extends Matchers {
class ScriptException(msg: String) extends RuntimeException(msg)
def toPublisher[In, Out]: (Source[Out, _], ActorFlowMaterializer) ⇒ Publisher[Out] =
(f, m) ⇒ f.runWith(Sink.publisher())(m)
(f, m) ⇒ f.runWith(Sink.publisher)(m)
object Script {
def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = {


@ -44,7 +44,7 @@ abstract class TwoStreamsSetup extends AkkaSpec {
def completedPublisher[T]: Publisher[T] = StreamTestKit.emptyPublisher[T]
def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher())
def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher)
def soonToFailPublisher[T]: Publisher[T] = StreamTestKit.lazyErrorPublisher[T](TestException)


@ -276,10 +276,10 @@ public class SourceTest extends StreamTest {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input = Arrays.asList("A", "B", "C");
Source.from(input).runWith(Sink.<String>onComplete(new Procedure<BoxedUnit>() {
Source.from(input).runWith(Sink.<String>onComplete(new Procedure<Try<BoxedUnit>>() {
@Override
public void apply(BoxedUnit param) throws Exception {
probe.getRef().tell(param, ActorRef.noSender());
public void apply(Try<BoxedUnit> param) throws Exception {
probe.getRef().tell(param.get(), ActorRef.noSender());
}
}), materializer);
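For reference, a Scala-side sketch of the same change: `Sink.onComplete` now hands its callback a `Try`, so stream failure becomes observable in the callback (an implicit materializer is assumed):

import scala.util.{ Failure, Success }

Source(1 to 3).runWith(Sink.onComplete {
  case Success(_)  ⇒ println("stream completed")
  case Failure(ex) ⇒ println(s"stream failed: $ex")
})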


@ -108,7 +108,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest {
val c1 = StreamTestKit.SubscriberProbe[String]()
val c2 = flowOut.subscribe(c1)
val p = Source(0 to 100).runWith(Sink.publisher())
val p = Source(0 to 100).runWith(Sink.publisher)
p.subscribe(flowIn)
val s = c1.expectSubscription()


@ -24,14 +24,14 @@ class FlowBufferSpec extends AkkaSpec {
"pass elements through normally in backpressured mode" in {
val future: Future[Seq[Int]] = Source(1 to 1000).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
runWith(Sink.head())
runWith(Sink.head)
Await.result(future, 3.seconds) should be(1 to 1000)
}
"pass elements through normally in backpressured mode with buffer size one" in {
val futureSink = Sink.head[Seq[Int]]
val future = Source(1 to 1000).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
runWith(Sink.head())
runWith(Sink.head)
Await.result(future, 3.seconds) should be(1 to 1000)
}
@ -44,7 +44,7 @@ class FlowBufferSpec extends AkkaSpec {
.buffer(5, overflowStrategy = OverflowStrategy.backpressure)
.buffer(128, overflowStrategy = OverflowStrategy.backpressure)
.grouped(1001)
.runWith(Sink.head())
.runWith(Sink.head)
Await.result(future, 3.seconds) should be(1 to 1000)
}


@ -26,7 +26,7 @@ class FlowForeachSpec extends AkkaSpec {
}
"complete the future for an empty stream" in {
Source.empty.runForeach(testActor ! _) onSuccess {
Source.empty[String].runForeach(testActor ! _) onSuccess {
case _ ⇒ testActor ! "done"
}
expectMsg("done")
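The added type parameter is a direct consequence of dropping the empty parameter list: `Source.empty()` let the compiler infer the element type from the application, while the bare `Source.empty` needs it spelled out whenever the surrounding expression does not pin it down:

val noElements: Source[String, Unit] = Source.empty[String] // was: Source.empty()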


@ -20,7 +20,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"A Flow based on a Future" must {
"produce one element from already successful Future" in {
val p = Source(Future.successful(1)).runWith(Sink.publisher())
val p = Source(Future.successful(1)).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -32,7 +32,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce error from already failed Future" in {
val ex = new RuntimeException("test") with NoStackTrace
val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher())
val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscriptionAndError(ex)
@ -40,7 +40,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce one element when Future is completed" in {
val promise = Promise[Int]()
val p = Source(promise.future).runWith(Sink.publisher())
val p = Source(promise.future).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -54,7 +54,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce one element when Future is completed but not before request" in {
val promise = Promise[Int]()
val p = Source(promise.future).runWith(Sink.publisher())
val p = Source(promise.future).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -67,7 +67,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce elements with multiple subscribers" in {
val promise = Promise[Int]()
val p = Source(promise.future).runWith(Sink.publisher())
val p = Source(promise.future).runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -85,7 +85,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce elements to later subscriber" in {
val promise = Promise[Int]()
val p = Source(promise.future).runWith(Sink.publisher())
val p = Source(promise.future).runWith(Sink.publisher)
val keepAlive = StreamTestKit.SubscriberProbe[Int]()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
@ -106,7 +106,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"allow cancel before receiving element" in {
val promise = Promise[Int]()
val p = Source(promise.future).runWith(Sink.publisher())
val p = Source(promise.future).runWith(Sink.publisher)
val keepAlive = StreamTestKit.SubscriberProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(keepAlive)


@ -34,8 +34,8 @@ class FlowGroupBySpec extends AkkaSpec {
}
class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) {
val source = Source(1 to elementCount).runWith(Sink.publisher())
val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher())
val source = Source(1 to elementCount).runWith(Sink.publisher)
val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher)
val masterSubscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
groupStream.subscribe(masterSubscriber)
@ -56,7 +56,7 @@ class FlowGroupBySpec extends AkkaSpec {
"groupBy" must {
"work in the happy case" in new SubstreamsSupport(groupCount = 2) {
val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher()))
val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher))
masterSubscriber.expectNoMsg(100.millis)
s1.expectNoMsg(100.millis)
@ -64,7 +64,7 @@ class FlowGroupBySpec extends AkkaSpec {
s1.expectNext(1)
s1.expectNoMsg(100.millis)
val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher()))
val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher))
s2.expectNoMsg(100.millis)
s2.request(2)
@ -91,9 +91,9 @@ class FlowGroupBySpec extends AkkaSpec {
}
"accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) {
StreamPuppet(getSubFlow(1).runWith(Sink.publisher())).cancel()
StreamPuppet(getSubFlow(1).runWith(Sink.publisher)).cancel()
val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher()))
val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher))
substream.request(2)
substream.expectNext(2)
substream.expectNext(4)
@ -109,7 +109,7 @@ class FlowGroupBySpec extends AkkaSpec {
"accept cancellation of master stream when not consumed anything" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher())
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
@ -120,7 +120,7 @@ class FlowGroupBySpec extends AkkaSpec {
}
"accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) {
val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher()))
val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher))
substream.request(1)
substream.expectNext(1)
@ -138,7 +138,7 @@ class FlowGroupBySpec extends AkkaSpec {
}
"work with empty input stream" in {
val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher())
val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
@ -147,7 +147,7 @@ class FlowGroupBySpec extends AkkaSpec {
"abort on onError from upstream" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher())
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
@ -164,7 +164,7 @@ class FlowGroupBySpec extends AkkaSpec {
"abort on onError from upstream when substreams are running" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher())
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
@ -176,7 +176,7 @@ class FlowGroupBySpec extends AkkaSpec {
upstreamSubscription.sendNext(1)
val (_, substream) = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher()))
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher))
substreamPuppet.request(1)
substreamPuppet.expectNext(1)
@ -194,7 +194,7 @@ class FlowGroupBySpec extends AkkaSpec {
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2)
.runWith(Sink.publisher())
.runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, Unit])]()
publisher.subscribe(subscriber)
@ -206,7 +206,7 @@ class FlowGroupBySpec extends AkkaSpec {
upstreamSubscription.sendNext(1)
val (_, substream) = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher()))
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher))
substreamPuppet.request(1)
substreamPuppet.expectNext(1)
@ -223,7 +223,7 @@ class FlowGroupBySpec extends AkkaSpec {
val exc = TE("test")
val publisher = Source(publisherProbeProbe).section(OperationAttributes.supervisionStrategy(resumingDecider))(
_.groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2))
.runWith(Sink.publisher())
.runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, Unit])]()
publisher.subscribe(subscriber)
@ -235,7 +235,7 @@ class FlowGroupBySpec extends AkkaSpec {
upstreamSubscription.sendNext(1)
val (_, substream1) = subscriber.expectNext()
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher()))
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher))
substreamPuppet1.request(10)
substreamPuppet1.expectNext(1)
@ -243,7 +243,7 @@ class FlowGroupBySpec extends AkkaSpec {
upstreamSubscription.sendNext(4)
val (_, substream2) = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher()))
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(4) // note that 2 was dropped


@ -39,7 +39,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
testName must {
"produce elements" in {
val p = createSource(1 to 3).runWith(Sink.publisher())
val p = createSource(1 to 3).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -53,7 +53,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
}
"complete empty" in {
val p = createSource(immutable.Iterable.empty[Int]).runWith(Sink.publisher())
val p = createSource(immutable.Iterable.empty[Int]).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscriptionAndComplete()
@ -108,7 +108,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
}
"produce elements with one transformation step" in {
val p = createSource(1 to 3).map(_ * 2).runWith(Sink.publisher())
val p = createSource(1 to 3).map(_ * 2).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -120,7 +120,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
}
"produce elements with two transformation steps" in {
val p = createSource(1 to 4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher())
val p = createSource(1 to 4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -131,7 +131,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
}
"not produce after cancel" in {
val p = createSource(1 to 3).runWith(Sink.publisher())
val p = createSource(1 to 3).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -147,7 +147,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
override def iterator: Iterator[Int] =
(1 to 3).iterator.map(x ⇒ if (x == 2) throw new IllegalStateException("not two") else x)
}
val p = createSource(iterable).runWith(Sink.publisher())
val p = createSource(iterable).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -164,7 +164,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
val iterable = new immutable.Iterable[Int] {
override def iterator: Iterator[Int] = throw new IllegalStateException("no good iterator")
}
val p = createSource(iterable).runWith(Sink.publisher())
val p = createSource(iterable).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscriptionAndError().getMessage should be("no good iterator")
@ -178,7 +178,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
override def next(): Int = -1
}
}
val p = createSource(iterable).runWith(Sink.publisher())
val p = createSource(iterable).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscriptionAndError().getMessage should be("no next")


@ -28,7 +28,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest {
val probe = StreamTestKit.SubscriberProbe[Int]()
Source(List(1)).
map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).
runWith(Sink.publisher()).subscribe(probe)
runWith(Sink.publisher).subscribe(probe)
val subscription = probe.expectSubscription()
for (_ ← 1 to 10000) {


@ -68,7 +68,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
val toPublisher: (Source[Any, _], ActorFlowMaterializer) ⇒ Publisher[Any] =
(f, m) ⇒ f.runWith(Sink.publisher())(m)
(f, m) ⇒ f.runWith(Sink.publisher)(m)
def toFanoutPublisher[In, Out](initialBufferSize: Int, maximumBufferSize: Int): (Source[Out, _], ActorFlowMaterializer) ⇒ Publisher[Out] =
(f, m) ⇒ f.runWith(Sink.fanoutPublisher(initialBufferSize, maximumBufferSize))(m)
@ -157,7 +157,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val c1 = StreamTestKit.SubscriberProbe[String]()
flowOut.subscribe(c1)
val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher())
val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher)
source.subscribe(flowIn)
val sub1 = c1.expectSubscription
@ -178,7 +178,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
sub1.request(3)
c1.expectNoMsg(200.millis)
val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher())
val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher)
source.subscribe(flowIn)
c1.expectNext("1")
@ -197,7 +197,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
sub1.request(3)
c1.expectNoMsg(200.millis)
val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher())
val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher)
source.subscribe(flowIn)
c1.expectNext("elem-1")
@ -210,7 +210,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val flow: Flow[String, String, _] = Flow[String]
val c1 = StreamTestKit.SubscriberProbe[String]()
val sink: Sink[String, _] = flow.to(Sink(c1))
val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher())
val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher)
Source(publisher).to(sink).run()
val sub1 = c1.expectSubscription
@ -224,7 +224,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"perform transformation operation" in {
val flow = Flow[Int].map(i ⇒ { testActor ! i.toString; i.toString })
val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher())
val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher)
Source(publisher).via(flow).to(Sink.ignore).run()
expectMsg("1")
@ -236,7 +236,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val flow = Flow[Int].map(_.toString)
val c1 = StreamTestKit.SubscriberProbe[String]()
val sink: Sink[Int, _] = flow.to(Sink(c1))
val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher())
val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher)
Source(publisher).to(sink).run()
val sub1 = c1.expectSubscription
@ -282,7 +282,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"be covariant" in {
val f1: Source[Fruit, _] = Source[Fruit](apples)
val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher())
val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher)
val f2: Source[Source[Fruit, _], _] = Source[Fruit](apples).splitWhen(_ ⇒ true)
val f3: Source[(Boolean, Source[Fruit, _]), _] = Source[Fruit](apples).groupBy(_ ⇒ true)
val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source[Fruit](apples).prefixAndTail(1)


@ -35,7 +35,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) {
val source = Source(1 to elementCount)
val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher())
val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher)
val masterSubscriber = StreamTestKit.SubscriberProbe[Source[Int, _]]()
groupStream.subscribe(masterSubscriber)
@ -56,7 +56,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
"splitWhen" must {
"work in the happy case" in new SubstreamsSupport(elementCount = 4) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher()))
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
masterSubscriber.expectNoMsg(100.millis)
s1.request(2)
@ -65,7 +65,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
s1.request(1)
s1.expectComplete()
val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher()))
val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
s2.request(1)
s2.expectNext(3)
@ -80,9 +80,9 @@ class FlowSplitWhenSpec extends AkkaSpec {
}
"support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher()))
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
s1.cancel()
val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher()))
val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
s2.request(4)
s2.expectNext(5)
@ -97,7 +97,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
}
"support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher()))
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
masterSubscription.cancel()
s1.request(4)
s1.expectNext(1)
@ -113,7 +113,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0)
.runWith(Sink.publisher())
.runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber)
@ -125,7 +125,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
upstreamSubscription.sendNext(1)
val substream = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher()))
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher))
substreamPuppet.request(10)
substreamPuppet.expectNext(1)
@ -145,7 +145,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
val exc = TE("test")
val publisher = Source(publisherProbeProbe).section(OperationAttributes.supervisionStrategy(resumingDecider))(
_.splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0))
.runWith(Sink.publisher())
.runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber)
@ -157,7 +157,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
upstreamSubscription.sendNext(1)
val substream1 = subscriber.expectNext()
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher()))
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher))
substreamPuppet1.request(10)
substreamPuppet1.expectNext(1)
@ -175,7 +175,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
upstreamSubscription.sendNext(6)
substreamPuppet1.expectComplete()
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher()))
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(6)


@ -22,7 +22,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"A Flow with transform operations" must {
"produce one-to-one transformation as expected" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher())
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
transform(() ⇒ new PushStage[Int, Int] {
var tot = 0
@ -31,7 +31,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
ctx.push(tot)
}
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -45,7 +45,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"produce one-to-several transformation as expected" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher())
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
transform(() ⇒ new StatefulStage[Int, Int] {
var tot = 0
@ -65,7 +65,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -102,7 +102,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
ctx.pull()
} else ctx.push(elem)
}
}).runWith(Sink.publisher())
}).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(subscriber)
@ -128,7 +128,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"produce dropping transformation as expected" in {
val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher())
val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher)
val p2 = Source(p).
transform(() ⇒ new PushStage[Int, Int] {
var tot = 0
@ -140,7 +140,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
ctx.push(tot)
}
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -154,7 +154,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"produce multi-step transformation as expected" in {
val p = Source(List("a", "bc", "def")).runWith(Sink.publisher())
val p = Source(List("a", "bc", "def")).runWith(Sink.publisher)
val p2 = Source(p).
transform(() ⇒ new PushStage[String, Int] {
var concat = ""
@ -193,7 +193,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"support emit onUpstreamFinish" in {
val p = Source(List("a")).runWith(Sink.publisher())
val p = Source(List("a")).runWith(Sink.publisher)
val p2 = Source(p).
transform(() ⇒ new StatefulStage[String, String] {
var s = ""
@ -206,7 +206,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
override def onUpstreamFinish(ctx: Context[String]) =
terminationEmit(Iterator.single(s + "B"), ctx)
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c)
val s = c.expectSubscription()
@ -228,7 +228,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
ctx.push(element)
}
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val proc = p.expectSubscription
val c = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c)
@ -242,7 +242,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"report error when exception is thrown" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher())
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
transform(() ⇒ new StatefulStage[Int, Int] {
override def initial = new State {
@ -255,7 +255,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
}
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -269,7 +269,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"support emit of final elements when onUpstreamFailure" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher())
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
map(elem ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem).
transform(() ⇒ new StatefulStage[Int, Int] {
@ -282,7 +282,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
}).
filter(elem ⇒ elem != 1). // it's undefined if element 1 got through before the error or not
runWith(Sink.publisher())
runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -296,7 +296,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"support cancel as expected" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher())
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
transform(() ⇒ new StatefulStage[Int, Int] {
override def initial = new State {
@ -304,7 +304,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
emit(Iterator(elem, elem), ctx)
}
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -318,7 +318,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"support producing elements from empty inputs" in {
val p = Source(List.empty[Int]).runWith(Sink.publisher())
val p = Source(List.empty[Int]).runWith(Sink.publisher)
val p2 = Source(p).
transform(() ⇒ new StatefulStage[Int, Int] {
override def initial = new State {
@ -327,7 +327,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
override def onUpstreamFinish(ctx: Context[Int]) =
terminationEmit(Iterator(1, 2, 3), ctx)
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()


@ -24,7 +24,7 @@ class FlowSupervisionSpec extends AkkaSpec {
val failingMap = Flow[Int].map(n ⇒ if (n == 3) throw exc else n)
def run(f: Flow[Int, Int, Unit]): immutable.Seq[Int] =
Await.result(Source((1 to 5).toSeq ++ (1 to 5)).via(f).grouped(1000).runWith(Sink.head()), 3.seconds)
Await.result(Source((1 to 5).toSeq ++ (1 to 5)).via(f).grouped(1000).runWith(Sink.head), 3.seconds)
"Stream supervision" must {
@ -46,7 +46,7 @@ class FlowSupervisionSpec extends AkkaSpec {
"complete stream with NPE failure when null is emitted" in {
intercept[NullPointerException] {
Await.result(Source(List("a", "b")).map(_ ⇒ null).grouped(1000).runWith(Sink.head()), 3.seconds)
Await.result(Source(List("a", "b")).map(_ ⇒ null).grouped(1000).runWith(Sink.head), 3.seconds)
}.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
}
@ -54,7 +54,7 @@ class FlowSupervisionSpec extends AkkaSpec {
val nullMap = Flow[String].map(elem ⇒ if (elem == "b") null else elem)
.withAttributes(supervisionStrategy(Supervision.resumingDecider))
val result = Await.result(Source(List("a", "b", "c")).via(nullMap)
.grouped(1000).runWith(Sink.head()), 3.seconds)
.grouped(1000).runWith(Sink.head), 3.seconds)
result should be(List("a", "c"))
}


@ -30,7 +30,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
}
override def isComplete: Boolean = !isTimerActive("tick")
}).
runWith(Sink.publisher())
runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -74,7 +74,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
def onNext(element: Int) = Nil
override def onTimer(timerKey: Any) =
throw exception
}).runWith(Sink.publisher())
}).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)


@ -143,7 +143,7 @@ class GraphFlowSpec extends AkkaSpec {
"work with a Sink when having KeyedSource inside" in {
val probe = StreamTestKit.SubscriberProbe[Int]()
val source = Source.apply(Source.subscriber[Int]()) { implicit b ⇒
val source = Source.apply(Source.subscriber[Int]) { implicit b ⇒
subSource
subSource.outlet
}
@ -317,7 +317,7 @@ class GraphFlowSpec extends AkkaSpec {
val subscriber = m1
val publisher = m3
source1.runWith(Sink.publisher()).subscribe(subscriber)
source1.runWith(Sink.publisher).subscribe(subscriber)
publisher.subscribe(probe)
validateProbe(probe, stdRequests, stdResult)
@ -347,7 +347,7 @@ class GraphFlowSpec extends AkkaSpec {
val subscriber = m1
val publisher = m2
source1.runWith(Sink.publisher()).subscribe(subscriber)
source1.runWith(Sink.publisher).subscribe(subscriber)
publisher.subscribe(probe)
validateProbe(probe, 4, (0 to 3).toSet)


@ -40,7 +40,7 @@ class GraphJunctionAttributesSpec extends AkkaSpec {
zip.out
}
val future = source.grouped(10).runWith(Sink.head())
val future = source.grouped(10).runWith(Sink.head)
// FIXME #16435 drop(2) needed because first two SlowTicks get only one FastTick
Await.result(future, 2.seconds).map(_._2.size).filter(_ == 1).drop(2) should be(Nil)


@ -152,7 +152,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual
}
"be able to run plain flow" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher())
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val s = SubscriberProbe[Int]
val flow = Flow[Int].map(_ * 2)
FlowGraph.closed() { implicit builder ⇒


@ -26,7 +26,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
"yield the first value" in {
val p = StreamTestKit.PublisherProbe[Int]()
val f: Future[Int] = Source(p).map(identity).runWith(Sink.head())
val f: Future[Int] = Source(p).map(identity).runWith(Sink.head)
val proc = p.expectSubscription
proc.expectRequest()
proc.sendNext(42)
@ -50,7 +50,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
"yield the first error" in {
val p = StreamTestKit.PublisherProbe[Int]()
val f = Source(p).runWith(Sink.head())
val f = Source(p).runWith(Sink.head)
val proc = p.expectSubscription
proc.expectRequest()
val ex = new RuntimeException("ex")
@ -61,7 +61,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
"yield NoSuchElementExcption for empty stream" in {
val p = StreamTestKit.PublisherProbe[Int]()
val f = Source(p).runWith(Sink.head())
val f = Source(p).runWith(Sink.head)
val proc = p.expectSubscription
proc.expectRequest()
proc.sendComplete()
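Taken together, a sketch of the Sink.head contract exercised above (implicit materializer assumed):

val first: Future[Int] = Source(List(1, 2, 3)).runWith(Sink.head)
// completes with 1; on a failed stream the future fails with the stream's
// error, and on an empty stream with a NoSuchElementException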


@ -12,7 +12,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
implicit val mat = ActorFlowMaterializer()
val source = Source(List(1, 2, 3))
val sink = Flow[Int].grouped(10).toMat(Sink.head())(Keep.right)
val sink = Flow[Int].grouped(10).toMat(Sink.head)(Keep.right)
"Reverse Arrows in the Graph DSL" must {


@ -20,7 +20,7 @@ class SourceSpec extends AkkaSpec {
"Singleton Source" must {
"produce element" in {
val p = Source.single(1).runWith(Sink.publisher())
val p = Source.single(1).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -30,7 +30,7 @@ class SourceSpec extends AkkaSpec {
}
"produce elements to later subscriber" in {
val p = Source.single(1).runWith(Sink.publisher())
val p = Source.single(1).runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -50,7 +50,7 @@ class SourceSpec extends AkkaSpec {
"Empty Source" must {
"complete immediately" in {
val p = Source.empty.runWith(Sink.publisher())
val p = Source.empty.runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscriptionAndComplete()
@ -64,7 +64,7 @@ class SourceSpec extends AkkaSpec {
"Failed Source" must {
"emit error immediately" in {
val ex = new RuntimeException with NoStackTrace
val p = Source.failed(ex).runWith(Sink.publisher())
val p = Source.failed(ex).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscriptionAndError(ex)
@ -77,7 +77,7 @@ class SourceSpec extends AkkaSpec {
"Lazy Empty Source" must {
"complete materialized future when stream cancels" in {
val neverSource = Source.lazyEmpty()
val neverSource = Source.lazyEmpty
val pubSink = Sink.publisher
val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run()
@ -94,7 +94,7 @@ class SourceSpec extends AkkaSpec {
}
"allow external triggering of completion" in {
val neverSource = Source.lazyEmpty[Int]()
val neverSource = Source.lazyEmpty[Int]
val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 }
val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run()
@ -107,7 +107,7 @@ class SourceSpec extends AkkaSpec {
}
"allow external triggering of onError" in {
val neverSource = Source.lazyEmpty()
val neverSource = Source.lazyEmpty
val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 }
val (neverPromise, counterFuture) = neverSource.toMat(counterSink)(Keep.both).run()
@ -160,7 +160,7 @@ class SourceSpec extends AkkaSpec {
"Repeat Source" must {
"repeat as long as it takes" in {
import FlowGraph.Implicits._
val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head()), 1.second)
val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head), 1.second)
result.size should ===(10000)
result.toSet should ===(Set(42))
}


@ -40,7 +40,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
"timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in {
val publisherProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher())
val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
@ -56,14 +56,14 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
val (_, s1) = subscriber.expectNext()
// should not break normal usage
val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]()
s1.runWith(Sink.publisher()).subscribe(s1SubscriberProbe)
s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe)
s1SubscriberProbe.expectSubscription().request(100)
s1SubscriberProbe.expectNext(1)
val (_, s2) = subscriber.expectNext()
// should not break normal usage
val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]()
s2.runWith(Sink.publisher()).subscribe(s2SubscriberProbe)
s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe)
s2SubscriberProbe.expectSubscription().request(100)
s2SubscriberProbe.expectNext(2)
@ -72,13 +72,13 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
// sleep long enough for it to be cleaned up
Thread.sleep(1000)
val f = s3.runWith(Sink.head()).recover { case _: SubscriptionTimeoutException ⇒ "expected" }
val f = s3.runWith(Sink.head).recover { case _: SubscriptionTimeoutException ⇒ "expected" }
Await.result(f, 300.millis) should equal("expected")
}
"timeout and stop groupBy parent actor if none of the substreams are actually consumed" in {
val publisherProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher())
val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
@ -103,7 +103,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
"not timeout and cancel substream publishers when they have been subscribed to" in {
val publisherProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher())
val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
@ -118,7 +118,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
val (_, s1) = subscriber.expectNext()
// should not break normal usage
val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]()
s1.runWith(Sink.publisher()).subscribe(s1SubscriberProbe)
s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe)
val s1Sub = s1SubscriberProbe.expectSubscription()
s1Sub.request(1)
s1SubscriberProbe.expectNext(1)
@ -126,7 +126,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
val (_, s2) = subscriber.expectNext()
// should not break normal usage
val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]()
s2.runWith(Sink.publisher()).subscribe(s2SubscriberProbe)
s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe)
val s2Sub = s2SubscriberProbe.expectSubscription()
// sleep long enough for timeout to trigger if not cancelled


@ -50,7 +50,7 @@ class TickSourceSpec extends AkkaSpec {
}
"reject multiple subscribers, but keep the first" in {
val p = Source(1.second, 1.second, "tick").runWith(Sink.publisher())
val p = Source(1.second, 1.second, "tick").runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[String]()
val c2 = StreamTestKit.SubscriberProbe[String]()
p.subscribe(c1)


@ -25,7 +25,7 @@ private[akka] class ConcatAllImpl(materializer: ActorFlowMaterializer)
val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒
val Extract.Source(source) = primaryInputs.dequeueInputElement()
val publisher = source.runWith(Sink.publisher())(materializer)
val publisher = source.runWith(Sink.publisher)(materializer)
// FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now)
val inputs = createAndSubscribeSubstreamInput(publisher)
nextPhase(streamSubstream(inputs))


@ -14,7 +14,10 @@ import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{ Future, Promise }
abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module {
/**
* INTERNAL API
*/
private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module {
def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[In] @uncheckedVariance, Mat)
@ -31,15 +34,24 @@ abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module {
}
override def subModules: Set[Module] = Set.empty
def amendShape(attr: OperationAttributes): SinkShape[In] = {
attr.nameOption match {
case None ⇒ shape
case s: Some[String] if s == attributes.nameOption ⇒ shape
case Some(name) ⇒ shape.copy(inlet = new Inlet(name + ".in"))
}
}
}
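amendShape is what makes naming rename the port as well as the module; a sketch of the observable effect, with illustrative names:

val plain = Sink.head[Int]        // inlet named "HeadSink.in" by default
val named = plain.named("first")  // inlet becomes "first.in"
// renaming with the same name is a no-op, via the
// `s == attributes.nameOption` case above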
/**
* INTERNAL API
* Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow.
* The stream will not have any subscribers attached at this point, which means that after prefetching
* elements to fill the internal buffers it will assert back-pressure until
* a subscriber connects and creates demand for elements to be emitted.
*/
class PublisherSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) {
private[akka] class PublisherSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) {
override def toString: String = "PublisherSink"
@ -54,10 +66,13 @@ class PublisherSink[In](val attributes: OperationAttributes, shape: SinkShape[In
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new PublisherSink[In](attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new PublisherSink[In](attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new PublisherSink[In](attr, amendShape(attr))
}
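A sketch of the behaviour described in the scaladoc; `someSubscriber` is a placeholder for any Reactive Streams Subscriber[Int]:

val pub: Publisher[Int] = Source(1 to 100).runWith(Sink.publisher)
// with no subscriber attached the stream prefetches into its internal
// buffers and then backpressures; nothing is dropped
pub.subscribe(someSubscriber)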
final class FanoutPublisherSink[In](
/**
* INTERNAL API
*/
private[akka] final class FanoutPublisherSink[In](
initialBufferSize: Int,
maximumBufferSize: Int,
val attributes: OperationAttributes,
@ -75,12 +90,14 @@ final class FanoutPublisherSink[In](
new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module =
new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attr, shape)
new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attr, amendShape(attr))
}
object HeadSink {
/** INTERNAL API */
private[akka] class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] {
/**
* INTERNAL API
*/
private[akka] object HeadSink {
class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] {
private val sub = new AtomicReference[Subscription]
override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s)
@ -105,13 +122,14 @@ object HeadSink {
}
/**
* INTERNAL API
* Holds a [[scala.concurrent.Future]] that will be fulfilled with the first
* thing that is signaled to this stream, which can be either an element (after
* which the upstream subscription is canceled), an error condition (putting
* the Future into the corresponding failed state) or the end-of-stream
* (failing the Future with a NoSuchElementException).
*/
class HeadSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) {
private[akka] class HeadSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val p = Promise[In]()
@ -120,39 +138,42 @@ class HeadSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) ex
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[In]] = new HeadSink[In](attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new HeadSink[In](attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new HeadSink[In](attr, amendShape(attr))
override def toString: String = "HeadSink"
}
/**
* INTERNAL API
* Attaches a subscriber to this stream which will just discard all received
* elements.
*/
final class BlackholeSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) {
private[akka] final class BlackholeSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) =
(new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize), ())
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new BlackholeSink(attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new BlackholeSink(attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new BlackholeSink(attr, amendShape(attr))
}
/**
* INTERNAL API
* Attaches a subscriber to this stream.
*/
final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Unit](shape) {
private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = (subscriber, ())
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] = new SubscriberSink[In](subscriber, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new SubscriberSink[In](subscriber, attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr))
}
/**
* INTERNAL API
* A sink that immediately cancels its upstream upon materialization.
*/
final class CancelSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) {
private[akka] final class CancelSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) {
/**
* This method is only used for Sinks that return true from [[#isActive]], which then must
@ -169,14 +190,15 @@ final class CancelSink(val attributes: OperationAttributes, shape: SinkShape[Any
}
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new CancelSink(attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new CancelSink(attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new CancelSink(attr, amendShape(attr))
}
/**
* INTERNAL API
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
*/
final class PropsSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
private[akka] final class PropsSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val subscriberRef = materializer.actorOf(props, name = s"$flowName-props")
@ -184,5 +206,5 @@ final class PropsSink[In](props: Props, val attributes: OperationAttributes, sha
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new PropsSink[In](props, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new PropsSink[In](props, attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new PropsSink[In](props, attr, amendShape(attr))
}


@ -16,7 +16,10 @@ import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success }
abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Module {
/**
* INTERNAL API
*/
private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Module {
def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Publisher[Out] @uncheckedVariance, Mat)
@ -33,13 +36,23 @@ abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Mod
}
override def subModules: Set[Module] = Set.empty
def amendShape(attr: OperationAttributes): SourceShape[Out] = {
attr.nameOption match {
case None ⇒ shape
case s: Some[String] if s == attributes.nameOption ⇒ shape
case Some(name) ⇒ shape.copy(outlet = new Outlet(name + ".out"))
}
}
}
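The source-side counterpart renames the outlet; sketch:

val src = Source.single(1).named("one")
// the outlet is now named "one.out"; without a name the default shape
// name (e.g. "PublisherSource.out") is kept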
/**
* INTERNAL API
* Holds a `Subscriber` representing the input side of the flow.
* The `Subscriber` can later be connected to an upstream `Publisher`.
*/
final class SubscriberSource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) {
private[akka] final class SubscriberSource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) {
/**
* This method is only used for Sources that return true from [[#isActive]], which then must
@ -61,29 +74,31 @@ final class SubscriberSource[Out](val attributes: OperationAttributes, shape: So
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = new SubscriberSource[Out](attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new SubscriberSource[Out](attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new SubscriberSource[Out](attr, amendShape(attr))
}
/**
* INTERNAL API
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
final class PublisherSource[Out](p: Publisher[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) {
private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = (p, ())
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new PublisherSource[Out](p, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, amendShape(attr))
}
/**
* INTERNAL API
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) {
private[akka] final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) =
future.value match {
case Some(Success(element)) ⇒
@ -96,10 +111,13 @@ final class FutureSource[Out](future: Future[Out], val attributes: OperationAttr
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new FutureSource(future, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new FutureSource(future, attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new FutureSource(future, attr, amendShape(attr))
}
final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Unit]](shape) {
/**
* INTERNAL API
*/
private[akka] final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Unit]](shape) {
import ReactiveStreamsCompliance._
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
@ -123,17 +141,18 @@ final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: Sou
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Unit]] = new LazyEmptySource[Out](attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new LazyEmptySource(attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new LazyEmptySource(attr, amendShape(attr))
}
/**
* INTERNAL API
* Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that have requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) {
private[akka] final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val cancelled = new AtomicBoolean(false)
@ -150,14 +169,15 @@ final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDurati
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Cancellable] = new TickSource[Out](initialDelay, interval, tick, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new TickSource(initialDelay, interval, tick, attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new TickSource(initialDelay, interval, tick, attr, amendShape(attr))
}
/**
* INTERNAL API
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]].
*/
final class PropsSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {
private[akka] final class PropsSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val publisherRef = materializer.actorOf(props, name = s"$flowName-0-props")
@ -165,5 +185,5 @@ final class PropsSource[Out](props: Props, val attributes: OperationAttributes,
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new PropsSource[Out](props, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new PropsSource(props, attr, shape)
override def withAttributes(attr: OperationAttributes): Module = new PropsSource(props, attr, amendShape(attr))
}


@ -92,7 +92,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it.
*
* The returned tuple contains the materialized values of the `KeyedSource` and `KeyedSink`,
* e.g. the `Subscriber` of a `Source.subscriber()` and `Publisher` of a `Sink.publisher()`.
* e.g. the `Subscriber` of a `Source.subscriber` and `Publisher` of a `Sink.publisher`.
*
* @tparam T materialized type of given KeyedSource
* @tparam U materialized type of given KeyedSink
@ -375,6 +375,12 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
val javaToScala = (flow: javadsl.Flow[Out, O, M]) ⇒ flow.asScala
scalaToJava andThen section.apply andThen javaToScala
})
def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr.asScala))
def named(name: String): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.named(name))
}
/**


@ -12,6 +12,7 @@ import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.concurrent.Future
import akka.stream.impl.StreamLayout
import scala.util.Try
/** Java API */
object Sink {
@ -51,7 +52,7 @@ object Sink {
/**
* A `Sink` that immediately cancels its upstream after materialization.
*/
def cancelled[T]: Sink[T, Unit] =
def cancelled[T](): Sink[T, Unit] =
new Sink(scaladsl.Sink.cancelled)
/**
@ -65,7 +66,7 @@ object Sink {
* that can handle one [[org.reactivestreams.Subscriber]].
*/
def publisher[In](): Sink[In, Publisher[In]] =
new Sink(scaladsl.Sink.publisher())
new Sink(scaladsl.Sink.publisher)
/**
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
@ -88,13 +89,13 @@ object Sink {
* completion, apply the provided function with [[scala.util.Success]]
* or [[scala.util.Failure]].
*/
def onComplete[In](onComplete: japi.Procedure[Unit]): Sink[In, Unit] =
new Sink(scaladsl.Sink.onComplete[In](x ⇒ onComplete.apply(x)))
def onComplete[In](callback: japi.Procedure[Try[Unit]]): Sink[In, Unit] =
new Sink(scaladsl.Sink.onComplete[In](x ⇒ callback.apply(x)))
/**
* A `Sink` that materializes into a `Future` of the first value received.
*/
def head[In]: Sink[In, Future[In]] =
def head[In](): Sink[In, Future[In]] =
new Sink(scaladsl.Sink.head[In])
}
@ -124,4 +125,10 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[
*/
def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Sink[In, Mat2] =
new Sink(delegate.mapMaterialized(f.apply _))
def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] =
new Sink(delegate.withAttributes(attr.asScala))
def named(name: String): javadsl.Sink[In, Mat] =
new Sink(delegate.named(name))
}


@ -35,7 +35,7 @@ object Source {
* for every connected `Sink`.
*/
def empty[O](): Source[O, Unit] =
new Source(scaladsl.Source.empty())
new Source(scaladsl.Source.empty)
/**
* Create a `Source` with no elements, which does not complete its downstream,
@ -47,7 +47,7 @@ object Source {
* to its downstream.
*/
def lazyEmpty[T](): Source[T, Promise[Unit]] =
new Source[T, Promise[Unit]](scaladsl.Source.lazyEmpty())
new Source[T, Promise[Unit]](scaladsl.Source.lazyEmpty)
/**
* Helper to create [[Source]] from `Publisher`.
@ -69,7 +69,7 @@ object Source {
* data.add(1);
* data.add(2);
* data.add(3);
* Source.from(data.iterator());
* Source.from(() -> data.iterator());
* }}}
*
* Start a new `Source` from the given Iterator. The produced stream of elements
@ -150,7 +150,7 @@ object Source {
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/
def subscriber[T](): Source[T, Subscriber[T]] =
new Source(scaladsl.Source.subscriber())
new Source(scaladsl.Source.subscriber)
/**
* Concatenates two sources so that the first element
@ -210,7 +210,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/**
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`.
* of the `Sink`, e.g. the `Publisher` of a `Sink.publisher`.
*/
def runWith[M](sink: Sink[Out, M], materializer: FlowMaterializer): M =
delegate.runWith(sink.asScala)(materializer)
@ -471,4 +471,11 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
val javaToScala = (source: javadsl.Flow[Out, O, M]) ⇒ source.asScala
scalaToJava andThen section.apply andThen javaToScala
})
def withAttributes(attr: OperationAttributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr.asScala))
def named(name: String): javadsl.Source[Out, Mat] =
new Source(delegate.named(name))
}


@ -530,6 +530,8 @@ trait FlowOps[+Out, +Mat] {
def withAttributes(attr: OperationAttributes): Repr[Out, Mat]
def named(name: String): Repr[Out, Mat] = withAttributes(OperationAttributes.name(name))
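So for any FlowOps, named is plain sugar over withAttributes; sketch:

val doubler  = Flow[Int].map(_ * 2).named("doubler")
// identical to:
val doubler2 = Flow[Int].map(_ * 2).withAttributes(OperationAttributes.name("doubler"))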
/** INTERNAL API */
private[scaladsl] def andThen[U](op: StageModule): Repr[U, Mat]


@ -22,16 +22,28 @@ final case class OperationAttributes private (attributes: List[OperationAttribut
else if (other.attributes.isEmpty) this
else OperationAttributes(attributes ::: other.attributes)
/**
* INTERNAL API
*/
private[akka] def nameLifted: Option[String] =
attributes.collect {
case Name(name) ⇒ name
}.reduceOption(_ + "-" + _) // FIXME don't do a double-traversal, use a fold instead
/**
* INTERNAL API
*/
private[akka] def name: String = nameLifted match {
case Some(name) ⇒ name
case _ ⇒ "unknown-operation"
}
/**
* INTERNAL API
*/
private[akka] def nameOption: Option[String] =
attributes.collectFirst { case Name(name) ⇒ name }
private[akka] def transform(node: StageModule): StageModule =
if ((this eq OperationAttributes.none) || (this eq node.attributes)) node
else node.withAttributes(attributes = this and node.attributes)
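A sketch of how these internal helpers behave once attribute lists are combined with `and`:

val combined = OperationAttributes.name("outer") and OperationAttributes.name("inner")
combined.nameLifted          // Some("outer-inner"): all Name attributes, joined with '-'
combined.nameOption          // Some("outer"): only the first Name attribute
OperationAttributes.name("") // none: empty and null names are ignored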
@ -53,8 +65,11 @@ object OperationAttributes {
/**
* Specifies the name of the operation.
* If the name is null or empty the name is ignored, i.e. [[#none]] is returned.
*/
def name(name: String): OperationAttributes = OperationAttributes(Name(name))
def name(name: String): OperationAttributes =
if (name == null || name.isEmpty) none
else OperationAttributes(Name(name))
/**
* Specifies the initial and maximum size of the input buffer.


@ -36,11 +36,13 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
def withAttributes(attr: OperationAttributes): Sink[In, Mat] =
new Sink(module.withAttributes(attr).wrap())
def named(name: String): Sink[In, Mat] = withAttributes(OperationAttributes.name(name))
}
object Sink extends SinkApply {
import OperationAttributes.{ none, name ⇒ named }
import OperationAttributes.none
private def shape[T](name: String): SinkShape[T] = SinkShape(new Inlet(name + ".in"))
@ -53,58 +55,32 @@ object Sink extends SinkApply {
/**
* Helper to create [[Sink]] from `Subscriber`.
*/
def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] = new Sink(new SubscriberSink(subscriber, none, shape("SubscriberSink")))
/**
* Helper to create [[Sink]] from `Subscriber`.
*/
def apply[T](subscriber: Subscriber[T], name: String): Sink[T, Unit] = new Sink(new SubscriberSink(subscriber, named(name), shape(name)))
def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] =
new Sink(new SubscriberSink(subscriber, none, shape("SubscriberSink")))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*/
def apply[T](props: Props): Sink[T, ActorRef] = new Sink(new PropsSink(props, none, shape("PropsSink")))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*/
def apply[T](props: Props, name: String): Sink[T, ActorRef] = new Sink(new PropsSink(props, named(name), shape(name)))
def apply[T](props: Props): Sink[T, ActorRef] =
new Sink(new PropsSink(props, none, shape("PropsSink")))
/**
* A `Sink` that immediately cancels its upstream after materialization.
*/
def cancelled[T](): Sink[T, Unit] = new Sink[Any, Unit](new CancelSink(none, shape("CancelledSink")))
/**
* A `Sink` that immediately cancels its upstream after materialization.
*/
def cancelled[T](name: String): Sink[T, Unit] = new Sink[Any, Unit](new CancelSink(named(name), shape(name)))
def cancelled[T]: Sink[T, Unit] = new Sink[Any, Unit](new CancelSink(none, shape("CancelledSink")))
/**
* A `Sink` that materializes into a `Future` of the first value received.
*/
def head[T](): Sink[T, Future[T]] = new Sink(new HeadSink[T](none, shape("HeadSink")))
/**
* A `Sink` that materializes into a `Future` of the first value received.
*/
def head[T](name: String): Sink[T, Future[T]] = new Sink(new HeadSink[T](named(name), shape(name)))
def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](none, shape("HeadSink")))
/**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]]
* that can handle one [[org.reactivestreams.Subscriber]].
*/
def publisher[T](): Sink[T, Publisher[T]] = new Sink(new PublisherSink[T](none, shape("PublisherSink")))
/**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]]
* that can handle one [[org.reactivestreams.Subscriber]].
*/
def publisher[T](name: String): Sink[T, Publisher[T]] = new Sink(new PublisherSink[T](named(name), shape(name)))
def publisher[T]: Sink[T, Publisher[T]] = new Sink(new PublisherSink[T](none, shape("PublisherSink")))
/**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]]
@ -113,22 +89,11 @@ object Sink extends SinkApply {
def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): Sink[T, Publisher[T]] =
new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, none, shape("FanoutPublisherSink")))
/**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]]
* that can handle more than one [[org.reactivestreams.Subscriber]].
*/
def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int, name: String): Sink[T, Publisher[T]] =
new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, named(name), shape(name)))
/**
* A `Sink` that will consume the stream and discard the elements.
*/
def ignore(): Sink[Any, Unit] = new Sink(new BlackholeSink(none, shape("BlackholeSink")))
/**
* A `Sink` that will consume the stream and discard the elements.
*/
def ignore(name: String): Sink[Any, Unit] = new Sink(new BlackholeSink(named(name), shape(name)))
def ignore: Sink[Any, Unit] =
new Sink(new BlackholeSink(none, shape("BlackholeSink")))
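With the empty parameter lists gone these sinks read as plain values; a sketch of the new call sites (implicit materializer assumed):

Source(1 to 3).runWith(Sink.head)       // Future[Int]    (was Sink.head())
Source(1 to 3).runWith(Sink.ignore)     // Unit           (was Sink.ignore())
Source(1 to 3).runWith(Sink.publisher)  // Publisher[Int] (was Sink.publisher())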
/**
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
@ -159,8 +124,7 @@ object Sink extends SinkApply {
(stage, promise.future)
}
Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).withAttributes(name("foreach"))
Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).named("ForeachSink")
}
/**
@ -197,8 +161,7 @@ object Sink extends SinkApply {
(stage, promise.future)
}
Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).withAttributes(name("fold"))
Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).named("FoldSink")
}
/**
@ -222,6 +185,6 @@ object Sink extends SinkApply {
}
}
Flow[T].transform(newOnCompleteStage).to(Sink.ignore).withAttributes(name("onComplete"))
Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("OnCompleteSink")
}
}


@ -156,7 +156,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
object Source extends SourceApply {
import OperationAttributes.{ none, name ⇒ named }
import OperationAttributes.none
private[stream] def apply[Out, Mat](module: SourceModule[Out, Mat]): Source[Out, Mat] =
new Source(module)
@ -174,17 +174,6 @@ object Source extends SourceApply {
def apply[T](publisher: Publisher[T]): Source[T, Unit] =
new Source(new PublisherSource(publisher, none, shape("PublisherSource")))
/**
* Helper to create [[Source]] from `Publisher`.
*
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
def apply[T](publisher: Publisher[T], name: String): Source[T, Unit] =
new Source(new PublisherSource(publisher, named(name), shape(name)))
/**
* Helper to create [[Source]] from `Iterator`.
* Example usage: `Source(() => Iterator.from(0))`
@ -201,22 +190,6 @@ object Source extends SourceApply {
})
}
/**
* Helper to create [[Source]] from `Iterator`.
* Example usage: `Source(() => Iterator.from(0))`
*
* Start a new `Source` from the given function that produces an Iterator.
* The produced stream of elements will continue until the iterator runs empty
* or fails during evaluation of the `next()` method.
* Elements are pulled out of the iterator in accordance with the demand coming
* from the downstream transformation steps.
*/
def apply[T](f: () ⇒ Iterator[T], name: String): Source[T, Unit] = {
apply(new immutable.Iterable[T] {
override def iterator: Iterator[T] = f()
})
}
/**
* A graph with the shape of a source logically is a source; this method makes
* it so also in type.
@ -232,8 +205,7 @@ object Source extends SourceApply {
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = { // FIXME add naming of outlet
def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = {
Source.empty.transform(() ⇒ {
new PushPullStage[Nothing, T] {
var iterator: Iterator[T] = null
@ -263,26 +235,17 @@ object Source extends SourceApply {
}
}
}).withAttributes(OperationAttributes.name("iterable"))
}).named("IterableSource")
}
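Each materialization asks the iterable for a fresh iterator, so every run starts from the beginning; sketch:

val nums = Source(List(1, 2, 3))
nums.runWith(Sink.head) // completes with 1
nums.runWith(Sink.head) // completes with 1 again: a fresh iterator per run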
/**
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): Source[T, Unit] =
new Source(new FutureSource(future, none, shape("FutureSource")))
/**
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with a failure if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T], name: String): Source[T, Unit] =
new Source(new FutureSource(future, named(name), shape(name)))
def apply[T](future: Future[T]): Source[T, Unit] =
new Source(new FutureSource(future, none, shape("FutureSource")))
/**
* Elements are emitted periodically with the specified interval.
@ -294,46 +257,32 @@ object Source extends SourceApply {
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
new Source(new TickSource(initialDelay, interval, tick, none, shape("TickSource")))
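A sketch of using the materialized Cancellable, assuming `scala.concurrent.duration._` is imported and an implicit materializer is in scope:

val ticks: Source[String, Cancellable] = Source(0.seconds, 1.second, "tick")
val timer = ticks.to(Sink.ignore).run() // leftmost materialized value: the Cancellable
timer.cancel()                          // stops the timer and completes the stream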
/**
* Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that have requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T, name: String): Source[T, Cancellable] =
new Source(new TickSource(initialDelay, interval, tick, named(name), shape(name)))
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*/
def apply[T](props: Props): Source[T, ActorRef] = new Source(new PropsSource(props, none, shape("PropsSource")))
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*/
def apply[T](props: Props, name: String): Source[T, ActorRef] = new Source(new PropsSource(props, named(name), shape(name)))
def apply[T](props: Props): Source[T, ActorRef] =
new Source(new PropsSource(props, none, shape("PropsSource")))
/**
* Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/
def single[T](element: T): Source[T, Unit] = apply(SynchronousIterablePublisher(List(element), "single")) // FIXME optimize
def single[T](element: T): Source[T, Unit] =
apply(SynchronousIterablePublisher(List(element), "SingleSource")) // FIXME optimize
/**
* Create a `Source` that will continually emit the given element.
*/
def repeat[T](element: T): Source[T, Unit] = apply(() ⇒ Iterator.continually(element)) // FIXME optimize
def repeat[T](element: T): Source[T, Unit] =
apply(() ⇒ Iterator.continually(element)) // FIXME optimize
/**
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
*/
def empty[T](): Source[T, Unit] = _empty
private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher, "EmptySource")
def empty[T]: Source[T, Unit] = _empty
private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher)
/**
* Create a `Source` with no elements, which does not complete its downstream,
@ -344,28 +293,13 @@ object Source extends SourceApply {
* be used to externally trigger completion, which the source then signals
* to its downstream.
*/
def lazyEmpty[T](): Source[T, Promise[Unit]] = new Source(new LazyEmptySource[T](none, shape("LazyEmptySource")))
/**
* Create a `Source` with no elements, which does not complete its downstream,
* until externally triggered to do so.
*
* It materializes a [[scala.concurrent.Promise]] which will be completed
* when the downstream stage of this source cancels. This promise can also
* be used to externally trigger completion, which the source then signals
* to its downstream.
*/
def lazyEmpty[T](name: String): Source[T, Promise[Unit]] = new Source(new LazyEmptySource[T](named(name), shape(name)))
def lazyEmpty[T]: Source[T, Promise[Unit]] =
new Source(new LazyEmptySource[T](none, shape("LazyEmptySource")))
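A sketch of driving lazyEmpty from the outside, mirroring the SourceSpec above:

val (promise, count) = Source.lazyEmpty[Int]
  .toMat(Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 })(Keep.both).run()
promise.success(()) // completes the stream; count completes with 0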
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T, Unit] = apply(ErrorPublisher(cause, "failed"), "FailedSource")
/**
* Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`.
*/
def failed[T](cause: Throwable, name: String): Source[T, Unit] = apply(ErrorPublisher(cause, "failed"), name)
def failed[T](cause: Throwable): Source[T, Unit] = apply(ErrorPublisher(cause, "FailedSource"))
/**
* Concatenates two sources so that the first element
@ -385,11 +319,7 @@ object Source extends SourceApply {
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/
def subscriber[T](): Source[T, Subscriber[T]] = new Source(new SubscriberSource[T](none, shape("SubscriberSource")))
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/
def subscriber[T](name: String): Source[T, Subscriber[T]] = new Source(new SubscriberSource[T](named(name), shape(name)))
def subscriber[T]: Source[T, Subscriber[T]] =
new Source(new SubscriberSource[T](none, shape("SubscriberSource")))
}
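Finally, a sketch of wiring the two materialized endpoints together, as in GraphFlowSpec above; `somePublisher` and `someSubscriber` are placeholders:

val (sub, pub) = Source.subscriber[Int].toMat(Sink.publisher)(Keep.both).run()
somePublisher.subscribe(sub)  // sub: Subscriber[Int], attach to any upstream
pub.subscribe(someSubscriber) // pub: Publisher[Int], feeds any downstream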