= str - #15621 - Moving FlowMaterializer argument to the end of all Flow and Duct methods which uses it.

This commit is contained in:
Viktor Klang 2014-07-17 18:11:12 +02:00
parent 2ccf028a94
commit bb8ab0a925
33 changed files with 106 additions and 109 deletions

View file

@ -78,7 +78,7 @@ class HttpServerExampleSpec
case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer)
println("Accepted new connection from " + remoteAddress)
Flow(requestProducer).map(requestHandler).produceTo(materializer, responseConsumer)
Flow(requestProducer).map(requestHandler).produceTo(responseConsumer, materializer)
}.consume(materializer)
}
//#full-server-example

View file

@ -10,7 +10,6 @@ import akka.util.ByteString
import akka.http.model.headers.{ GenericHttpCredentials, BasicHttpCredentials }
import org.scalatest.MustMatchers
class ModelSpec extends AkkaSpec {
"construct request" in {
//#construct-request

View file

@ -47,7 +47,7 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin
.transform(responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform(errorLogger(log, "Outgoing request stream error"))
.produceTo(materializer, tcpConn.outputStream)
.produceTo(tcpConn.outputStream, materializer)
val responsePublisher =
Flow(tcpConn.inputStream)

View file

@ -167,7 +167,7 @@ object HttpEntity {
*/
def apply(contentType: ContentType, chunks: Publisher[ByteString], materializer: FlowMaterializer): Chunked =
Chunked(contentType, Flow(chunks).collect[ChunkStreamPart] {
case b: ByteString if b.nonEmpty => Chunk(b)
case b: ByteString if b.nonEmpty Chunk(b)
}.toPublisher(materializer))
}

View file

@ -59,7 +59,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
.transform(responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform(errorLogger(log, "Outgoing response stream error"))
.produceTo(materializer, tcpConn.outputStream)
.produceTo(tcpConn.outputStream, materializer)
Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber)
}

View file

@ -48,7 +48,7 @@ public abstract class JavaTestServer {
return JavaApiTestCases.handleRequest(request);
}
})
.produceTo(materializer, conn.getResponseSubscriber());
.produceTo(conn.getResponseSubscriber(), materializer);
}
}).consume(materializer);
}

View file

@ -37,7 +37,7 @@ object TestClient extends App {
} yield response.header[headers.Server]
def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = {
Flow(List(HttpRequest() -> 'NoContext)).produceTo(materializer, connection.processor)
Flow(List(HttpRequest() -> 'NoContext)).produceTo(connection.processor, materializer)
Flow(connection.processor).map(_._1).toFuture(materializer)
}

View file

@ -39,7 +39,7 @@ object TestServer extends App {
Flow(connectionStream).foreach {
case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber)
println("Accepted new connection from " + remoteAddress)
Flow(requestPublisher).map(requestHandler).produceTo(materializer, responseSubscriber)
Flow(requestPublisher).map(requestHandler).produceTo(responseSubscriber, materializer)
}.consume(materializer)
}

View file

@ -49,9 +49,9 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops
}
override def consume(materializer: FlowMaterializer): Unit =
produceTo(materializer, new BlackholeSubscriber(materializer.settings.maximumInputBufferSize))
produceTo(new BlackholeSubscriber(materializer.settings.maximumInputBufferSize), materializer)
override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] Unit): Unit =
override def onComplete(callback: Try[Unit] Unit, materializer: FlowMaterializer): Unit =
transform(new Transformer[O, Unit] {
override def onNext(in: O) = Nil
override def onError(e: Throwable) = {
@ -66,7 +66,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops
override def toPublisher(materializer: FlowMaterializer): Publisher[O] = materializer.toPublisher(publisherNode, ops)
override def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[_ >: O]): Unit =
override def produceTo(subscriber: Subscriber[_ >: O], materializer: FlowMaterializer): Unit =
toPublisher(materializer).subscribe(subscriber.asInstanceOf[Subscriber[O]])
}
@ -86,13 +86,13 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[
override def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: Out, U]): Duct[In, U] =
copy(ops = duct.ops ++: ops)
override def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[Out]): Subscriber[In] =
override def produceTo(subscriber: Subscriber[Out], materializer: FlowMaterializer): Subscriber[In] =
materializer.ductProduceTo(subscriber, ops)
override def consume(materializer: FlowMaterializer): Subscriber[In] =
produceTo(materializer, new BlackholeSubscriber(materializer.settings.maximumInputBufferSize))
produceTo(new BlackholeSubscriber(materializer.settings.maximumInputBufferSize), materializer)
override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] Unit): Subscriber[In] =
override def onComplete(callback: Try[Unit] Unit, materializer: FlowMaterializer): Subscriber[In] =
transform(new Transformer[Out, Unit] {
override def onNext(in: Out) = Nil
override def onError(e: Throwable) = {

View file

@ -287,7 +287,7 @@ abstract class Duct[In, Out] {
* The given FlowMaterializer decides how the flows logical structure is
* broken down into individual processing steps.
*/
def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[Out]): Subscriber[In]
def produceTo(subscriber: Subscriber[Out], materializer: FlowMaterializer): Subscriber[In]
/**
* Attaches a subscriber to this stream which will just discard all received
@ -309,7 +309,7 @@ abstract class Duct[In, Out] {
*
* *This operation materializes the flow and initiates its execution.*
*/
def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Subscriber[In]
def onComplete(callback: OnCompleteCallback, materializer: FlowMaterializer): Subscriber[In]
/**
* Materialize this `Duct` into a `Subscriber` representing the input side of the `Duct`
@ -414,17 +414,18 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In,
override def append[U](duct: Duct[_ >: T, U]): Duct[In, U] =
new DuctAdapter(delegate.appendJava(duct))
override def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[T]): Subscriber[In] =
delegate.produceTo(materializer, subscriber)
override def produceTo(subscriber: Subscriber[T], materializer: FlowMaterializer): Subscriber[In] =
delegate.produceTo(subscriber, materializer)
override def consume(materializer: FlowMaterializer): Subscriber[In] =
delegate.consume(materializer)
override def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Subscriber[In] =
delegate.onComplete(materializer) {
override def onComplete(callback: OnCompleteCallback, materializer: FlowMaterializer): Subscriber[In] =
delegate.onComplete({
case Success(_) callback.onComplete(null)
case Failure(e) callback.onComplete(e)
}
}, materializer)
override def build(materializer: FlowMaterializer): Pair[Subscriber[In], Publisher[T]] = {
val (in, out) = delegate.build(materializer)

View file

@ -366,7 +366,7 @@ abstract class Flow[T] {
*
* *This operation materializes the flow and initiates its execution.*
*/
def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Unit
def onComplete(callback: OnCompleteCallback, materializer: FlowMaterializer): Unit
/**
* Materialize this flow and return the downstream-most
@ -388,7 +388,7 @@ abstract class Flow[T] {
* The given FlowMaterializer decides how the flows logical structure is
* broken down into individual processing steps.
*/
def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[_ >: T]): Unit
def produceTo(subscriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit
}
@ -487,16 +487,16 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] {
override def consume(materializer: FlowMaterializer): Unit =
delegate.consume(materializer)
override def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Unit =
delegate.onComplete(materializer) {
override def onComplete(callback: OnCompleteCallback, materializer: FlowMaterializer): Unit =
delegate.onComplete({
case Success(_) callback.onComplete(null)
case Failure(e) callback.onComplete(e)
}
}, materializer)
override def toPublisher(materializer: FlowMaterializer): Publisher[T] =
delegate.toPublisher(materializer)
override def produceTo(materializer: FlowMaterializer, subsriber: Subscriber[_ >: T]): Unit =
delegate.produceTo(materializer, subsriber)
override def produceTo(subsriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit =
delegate.produceTo(subsriber, materializer)
}

View file

@ -281,7 +281,7 @@ trait Duct[In, +Out] {
* The given FlowMaterializer decides how the flows logical structure is
* broken down into individual processing steps.
*/
def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[Out] @uncheckedVariance): Subscriber[In]
def produceTo(subscriber: Subscriber[Out] @uncheckedVariance, materializer: FlowMaterializer): Subscriber[In]
/**
* Attaches a subscriber to this stream which will just discard all received
@ -303,7 +303,7 @@ trait Duct[In, +Out] {
*
* *This operation materializes the flow and initiates its execution.*
*/
def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] Unit): Subscriber[In]
def onComplete(callback: Try[Unit] Unit, materializer: FlowMaterializer): Subscriber[In]
/**
* Materialize this `Duct` into a `Subscriber` representing the input side of the `Duct`

View file

@ -368,7 +368,7 @@ trait Flow[+T] {
*
* *This operation materializes the flow and initiates its execution.*
*/
def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] Unit): Unit
def onComplete(callback: Try[Unit] Unit, materializer: FlowMaterializer): Unit
/**
* Materialize this flow and return the downstream-most
@ -390,7 +390,7 @@ trait Flow[+T] {
* The given FlowMaterializer decides how the flows logical structure is
* broken down into individual processing steps.
*/
def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[_ >: T]): Unit
def produceTo(subscriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit
}

View file

@ -96,7 +96,7 @@ public class DuctTest {
}
}).consume(materializer);
Subscriber<String> inSubscriber = Duct.create(String.class).produceTo(materializer, subscriber);
Subscriber<String> inSubscriber = Duct.create(String.class).produceTo(subscriber, materializer);
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
@ -160,7 +160,7 @@ public class DuctTest {
}
}).consume(materializer);
Flow.create(Arrays.asList(1, 2, 3)).produceTo(materializer, ductInSubscriber);
Flow.create(Arrays.asList(1, 2, 3)).produceTo(ductInSubscriber, materializer);
probe.expectMsgEquals("elem-12");
probe.expectMsgEquals("elem-14");
@ -175,7 +175,7 @@ public class DuctTest {
public String apply(Integer elem) {
return elem.toString();
}
}).onComplete(materializer, new OnCompleteCallback() {
}).onComplete(new OnCompleteCallback() {
@Override
public void onComplete(Throwable e) {
if (e == null)
@ -183,7 +183,7 @@ public class DuctTest {
else
probe.getRef().tell(e, ActorRef.noSender());
}
});
}, materializer);
Publisher<Integer> publisher = Flow.create(Arrays.asList(1, 2, 3)).toPublisher(materializer);
publisher.subscribe(inSubscriber);
@ -204,7 +204,7 @@ public class DuctTest {
}).consume(materializer);
final java.lang.Iterable<String> input = Arrays.asList("a", "b", "c");
Flow.create(input).produceTo(materializer, c);
Flow.create(input).produceTo(c, materializer);
probe.expectMsgEquals("A");
probe.expectMsgEquals("B");
probe.expectMsgEquals("C");

View file

@ -355,15 +355,12 @@ public class FlowTest {
public void mustBeAbleToUseOnCompleteSuccess() {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<String> input = Arrays.asList("A", "B", "C");
Flow.create(input).onComplete(materializer, new OnCompleteCallback() {
Flow.create(input).onComplete(new OnCompleteCallback() {
@Override
public void onComplete(Throwable e) {
if (e == null)
probe.getRef().tell("done", ActorRef.noSender());
else
probe.getRef().tell(e, ActorRef.noSender());
probe.getRef().tell( (e == null) ? "done" : e, ActorRef.noSender());
}
});
}, materializer);
probe.expectMsgEquals("done");
}
@ -376,7 +373,7 @@ public class FlowTest {
public String apply(String arg0) throws Exception {
throw new RuntimeException("simulated err");
}
}).onComplete(materializer, new OnCompleteCallback() {
}).onComplete(new OnCompleteCallback() {
@Override
public void onComplete(Throwable e) {
if (e == null)
@ -384,7 +381,7 @@ public class FlowTest {
else
probe.getRef().tell(e.getMessage(), ActorRef.noSender());
}
});
}, materializer);
probe.expectMsgEquals("simulated err");
}

View file

@ -78,7 +78,7 @@ class DuctSpec extends AkkaSpec {
"subscribe Subscriber" in {
val duct: Duct[String, String] = Duct[String]
val c1 = StreamTestKit.SubscriberProbe[String]()
val c2: Subscriber[String] = duct.produceTo(materializer, c1)
val c2: Subscriber[String] = duct.produceTo(c1, materializer)
val source: Publisher[String] = Flow(List("1", "2", "3")).toPublisher(materializer)
source.subscribe(c2)
@ -117,7 +117,7 @@ class DuctSpec extends AkkaSpec {
"perform transformation operation and subscribe Subscriber" in {
val duct = Duct[Int].map(_.toString)
val c1 = StreamTestKit.SubscriberProbe[String]()
val c2: Subscriber[Int] = duct.produceTo(materializer, c1)
val c2: Subscriber[Int] = duct.produceTo(c1, materializer)
val sub1 = c1.expectSubscription
sub1.request(3)
@ -135,7 +135,7 @@ class DuctSpec extends AkkaSpec {
"perform multiple transformation operations and subscribe Subscriber" in {
val duct = Duct[Int].map(_.toString).map("elem-" + _)
val c1 = StreamTestKit.SubscriberProbe[String]()
val c2 = duct.produceTo(materializer, c1)
val c2 = duct.produceTo(c1, materializer)
val sub1 = c1.expectSubscription
sub1.request(3)
@ -152,10 +152,10 @@ class DuctSpec extends AkkaSpec {
"call onComplete callback when done" in {
val duct = Duct[Int].map(i { testActor ! i.toString; i.toString })
val c = duct.onComplete(materializer) {
val c = duct.onComplete({
case Success(_) testActor ! "DONE"
case Failure(e) testActor ! e
}
}, materializer)
val source = Flow(List(1, 2, 3)).toPublisher(materializer)
source.subscribe(c)
@ -169,7 +169,7 @@ class DuctSpec extends AkkaSpec {
"be appendable to a Flow" in {
val c = StreamTestKit.SubscriberProbe[String]()
val duct = Duct[Int].map(_ + 10).map(_.toString)
Flow(List(1, 2, 3)).map(_ * 2).append(duct).map((s: String) "elem-" + s).produceTo(materializer, c)
Flow(List(1, 2, 3)).map(_ * 2).append(duct).map((s: String) "elem-" + s).produceTo(c, materializer)
val sub = c.expectSubscription
sub.request(3)
@ -186,9 +186,9 @@ class DuctSpec extends AkkaSpec {
.map { i (i * 2).toString }
.append(duct1)
.map { i "elem-" + (i + 10) }
.produceTo(materializer, c)
.produceTo(c, materializer)
Flow(List(1, 2, 3)).produceTo(materializer, ductInSubscriber)
Flow(List(1, 2, 3)).produceTo(ductInSubscriber, materializer)
val sub = c.expectSubscription
sub.request(3)

View file

@ -47,7 +47,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).produceTo(materializer, subscriber)
Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -67,7 +67,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).produceTo(materializer, subscriber)
Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -95,7 +95,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).produceTo(materializer, subscriber)
Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -126,7 +126,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).produceTo(materializer, subscriber)
Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -157,7 +157,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).buffer(1, overflowStrategy = strategy).produceTo(materializer, subscriber)
Flow(publisher).buffer(1, overflowStrategy = strategy).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -44,7 +44,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on master stream cancel the current open substream and signal error" in {
val publisher = StreamTestKit.PublisherProbe[Publisher[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).flatten(FlattenStrategy.concat).produceTo(m, subscriber)
Flow(publisher).flatten(FlattenStrategy.concat).produceTo(subscriber, m)
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -63,7 +63,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on open substream, cancel the master stream and signal error " in {
val publisher = StreamTestKit.PublisherProbe[Publisher[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).flatten(FlattenStrategy.concat).produceTo(m, subscriber)
Flow(publisher).flatten(FlattenStrategy.concat).produceTo(subscriber, m)
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -82,7 +82,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on cancellation cancel the current open substream and the master stream" in {
val publisher = StreamTestKit.PublisherProbe[Publisher[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).flatten(FlattenStrategy.concat).produceTo(m, subscriber)
Flow(publisher).flatten(FlattenStrategy.concat).produceTo(subscriber, m)
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()

View file

@ -95,7 +95,7 @@ class FlowConcatSpec extends TwoStreamsSetup {
val promise = Promise[Int]()
val flow = Flow(List(1, 2, 3)).concat(Flow(promise.future).toPublisher(materializer))
val subscriber = StreamTestKit.SubscriberProbe[Int]()
flow.produceTo(materializer, subscriber)
flow.produceTo(subscriber, materializer)
val subscription = subscriber.expectSubscription()
subscription.request(4)
subscriber.expectNext(1)

View file

@ -24,7 +24,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).produceTo(materializer, subscriber)
Flow(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -42,7 +42,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).produceTo(materializer, subscriber)
Flow(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -67,12 +67,12 @@ class FlowConflateSpec extends AkkaSpec {
}
"backpressure subscriber when upstream is slower" in {
val oublisher = StreamTestKit.PublisherProbe[Int]()
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(oublisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).produceTo(materializer, subscriber)
Flow(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(oublisher)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
sub.request(1)

View file

@ -20,7 +20,7 @@ class FlowDropWithinSpec extends AkkaSpec {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]()
Flow(p).dropWithin(1.second).produceTo(materializer, c)
Flow(p).dropWithin(1.second).produceTo(c, materializer)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(100)

View file

@ -25,7 +25,7 @@ class FlowExpandSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
// Simply repeat the last element as an extrapolation step
Flow(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).produceTo(materializer, subscriber)
Flow(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -45,7 +45,7 @@ class FlowExpandSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
// Simply repeat the last element as an extrapolation step
Flow(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).produceTo(materializer, subscriber)
Flow(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -78,7 +78,7 @@ class FlowExpandSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).produceTo(materializer, subscriber)
Flow(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).produceTo(subscriber, materializer)
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -23,7 +23,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(p).groupedWithin(1000, 1.second).produceTo(materializer, c)
Flow(p).groupedWithin(1000, 1.second).produceTo(c, materializer)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(100)
@ -48,7 +48,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
"deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(1 to 3).groupedWithin(1000, 10.second).produceTo(materializer, c)
Flow(1 to 3).groupedWithin(1000, 10.second).produceTo(c, materializer)
val cSub = c.expectSubscription
cSub.request(100)
c.expectNext((1 to 3).toList)
@ -60,7 +60,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(p).groupedWithin(1000, 1.second).produceTo(materializer, c)
Flow(p).groupedWithin(1000, 1.second).produceTo(c, materializer)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(1)
@ -80,7 +80,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
"drop empty groups" in {
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(p).groupedWithin(1000, 500.millis).produceTo(materializer, c)
Flow(p).groupedWithin(1000, 500.millis).produceTo(c, materializer)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(2)
@ -102,7 +102,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(p).groupedWithin(3, 2.second).produceTo(materializer, c)
Flow(p).groupedWithin(3, 2.second).produceTo(c, materializer)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(4)

View file

@ -25,7 +25,7 @@ class FlowMapFutureSpec extends AkkaSpec {
"produce future elements" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Flow(1 to 3).mapFuture(n Future(n)).produceTo(materializer, c)
val p = Flow(1 to 3).mapFuture(n Future(n)).produceTo(c, materializer)
val sub = c.expectSubscription()
sub.request(2)
c.expectNext(1)
@ -42,7 +42,7 @@ class FlowMapFutureSpec extends AkkaSpec {
val p = Flow(1 to 50).mapFuture(n Future {
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
n
}).produceTo(materializer, c)
}).produceTo(c, materializer)
val sub = c.expectSubscription()
sub.request(1000)
for (n 1 to 50) c.expectNext(n)
@ -56,7 +56,7 @@ class FlowMapFutureSpec extends AkkaSpec {
val p = Flow(1 to 20).mapFuture(n Future {
probe.ref ! n
n
}).produceTo(materializer, c)
}).produceTo(c, materializer)
val sub = c.expectSubscription()
// nothing before requested
probe.expectNoMsg(500.millis)
@ -84,7 +84,7 @@ class FlowMapFutureSpec extends AkkaSpec {
Await.ready(latch, 10.seconds)
n
}
}).produceTo(materializer, c)
}).produceTo(c, materializer)
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be("err1")
@ -103,7 +103,7 @@ class FlowMapFutureSpec extends AkkaSpec {
n
}
}).
produceTo(materializer, c)
produceTo(c, materializer)
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be("err2")

View file

@ -31,7 +31,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"invoke callback on normal completion" in {
val onCompleteProbe = TestProbe()
val p = StreamTestKit.PublisherProbe[Int]()
Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ }
Flow(p).onComplete({ onCompleteProbe.ref ! _ }, materializer)
val proc = p.expectSubscription
proc.expectRequest()
proc.sendNext(42)
@ -43,7 +43,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"yield the first error" in {
val onCompleteProbe = TestProbe()
val p = StreamTestKit.PublisherProbe[Int]()
Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ }
Flow(p).onComplete({ onCompleteProbe.ref ! _ }, materializer)
val proc = p.expectSubscription
proc.expectRequest()
val ex = new RuntimeException("ex") with NoStackTrace
@ -55,7 +55,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"invoke callback for an empty stream" in {
val onCompleteProbe = TestProbe()
val p = StreamTestKit.PublisherProbe[Int]()
Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ }
Flow(p).onComplete({ onCompleteProbe.ref ! _ }, materializer)
val proc = p.expectSubscription
proc.expectRequest()
proc.sendComplete()
@ -71,7 +71,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
x
}.foreach {
x onCompleteProbe.ref ! ("foreach-" + x)
}.onComplete(materializer) { onCompleteProbe.ref ! _ }
}.onComplete({ onCompleteProbe.ref ! _ }, materializer)
val proc = p.expectSubscription
proc.expectRequest()
proc.sendNext(42)

View file

@ -49,7 +49,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(10).toFuture(m), 3.seconds)
takes should be(1 to 10)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(tail).produceTo(m, subscriber)
Flow(tail).produceTo(subscriber, m)
subscriber.expectCompletedOrSubscriptionFollowedByComplete()
}
@ -57,7 +57,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(Seq[Int], Publisher[Int])]()
Flow(publisher).prefixAndTail(3).produceTo(m, subscriber)
Flow(publisher).prefixAndTail(3).produceTo(subscriber, m)
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -75,7 +75,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(Seq[Int], Publisher[Int])]()
Flow(publisher).prefixAndTail(1).produceTo(m, subscriber)
Flow(publisher).prefixAndTail(1).produceTo(subscriber, m)
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -90,7 +90,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
subscriber.expectComplete()
val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(tail).produceTo(m, substreamSubscriber)
Flow(tail).produceTo(substreamSubscriber, m)
substreamSubscriber.expectSubscription()
upstream.sendError(testException)
@ -102,7 +102,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(Seq[Int], Publisher[Int])]()
Flow(publisher).prefixAndTail(3).produceTo(m, subscriber)
Flow(publisher).prefixAndTail(3).produceTo(subscriber, m)
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -120,7 +120,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(Seq[Int], Publisher[Int])]()
Flow(publisher).prefixAndTail(1).produceTo(m, subscriber)
Flow(publisher).prefixAndTail(1).produceTo(subscriber, m)
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -135,7 +135,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
subscriber.expectComplete()
val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(tail).produceTo(m, substreamSubscriber)
Flow(tail).produceTo(substreamSubscriber, m)
substreamSubscriber.expectSubscription().cancel()
upstream.expectCancellation()

View file

@ -15,7 +15,7 @@ class FlowProduceToSubscriberSpec extends AkkaSpec {
"produce elements to the subscriber" in {
val c = StreamTestKit.SubscriberProbe[Int]()
Flow(List(1, 2, 3)).produceTo(materializer, c)
Flow(List(1, 2, 3)).produceTo(c, materializer)
val s = c.expectSubscription()
s.request(3)
c.expectNext(1)

View file

@ -20,7 +20,7 @@ class FlowTakeWithinSpec extends AkkaSpec {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]()
Flow(p).takeWithin(1.second).produceTo(materializer, c)
Flow(p).takeWithin(1.second).produceTo(c, materializer)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(100)
@ -40,7 +40,7 @@ class FlowTakeWithinSpec extends AkkaSpec {
"deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[Int]()
Flow(1 to 3).takeWithin(1.second).produceTo(materializer, c)
Flow(1 to 3).takeWithin(1.second).produceTo(c, materializer)
val cSub = c.expectSubscription
c.expectNoMsg(200.millis)
cSub.request(100)

View file

@ -359,7 +359,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
case _ Nil
}
}
}).produceTo(materializer, subscriber)
}).produceTo(subscriber, materializer)
val subscription = subscriber.expectSubscription()
subscription.request(10)

View file

@ -19,7 +19,7 @@ class TickPublisherSpec extends AkkaSpec {
"produce ticks" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
Flow(1.second, () "tick-" + tickGen.next()).produceTo(materializer, c)
Flow(1.second, () "tick-" + tickGen.next()).produceTo(c, materializer)
val sub = c.expectSubscription()
sub.request(3)
c.expectNext("tick-1")
@ -34,7 +34,7 @@ class TickPublisherSpec extends AkkaSpec {
"drop ticks when not requested" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
Flow(1.second, () "tick-" + tickGen.next()).produceTo(materializer, c)
Flow(1.second, () "tick-" + tickGen.next()).produceTo(c, materializer)
val sub = c.expectSubscription()
sub.request(2)
c.expectNext("tick-1")
@ -75,7 +75,7 @@ class TickPublisherSpec extends AkkaSpec {
"signal onError when tick closure throws" in {
val c = StreamTestKit.SubscriberProbe[String]()
Flow(1.second, () throw new RuntimeException("tick err") with NoStackTrace).produceTo(materializer, c)
Flow(1.second, () throw new RuntimeException("tick err") with NoStackTrace).produceTo(c, materializer)
val sub = c.expectSubscription()
sub.request(3)
c.expectError.getMessage should be("tick err")
@ -84,7 +84,7 @@ class TickPublisherSpec extends AkkaSpec {
"be usable with zip for a simple form of rate limiting" in {
val c = StreamTestKit.SubscriberProbe[Int]()
val rate = Flow(1.second, () "tick").toPublisher(materializer)
Flow(1 to 100).zip(rate).map { case (n, _) n }.produceTo(materializer, c)
Flow(1 to 100).zip(rate).map { case (n, _) n }.produceTo(c, materializer)
val sub = c.expectSubscription()
sub.request(1000)
c.expectNext(1)

View file

@ -232,7 +232,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
val rcv = system.actorOf(receiverProps(probe.ref))
Flow(ActorPublisher[Int](snd)).collect {
case n if n % 2 == 0 "elem-" + n
}.produceTo(materializer, ActorSubscriber(rcv))
}.produceTo(ActorSubscriber(rcv), materializer)
(1 to 3) foreach { snd ! _ }
probe.expectMsg("elem-2")

View file

@ -106,7 +106,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"receive requested elements" in {
val ref = system.actorOf(manualSubscriberProps(testActor))
Flow(List(1, 2, 3)).produceTo(materializer, ActorSubscriber(ref))
Flow(List(1, 2, 3)).produceTo(ActorSubscriber(ref), materializer)
expectNoMsg(200.millis)
ref ! "ready" // requesting 2
expectMsg(OnNext(1))
@ -120,14 +120,14 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"signal error" in {
val ref = system.actorOf(manualSubscriberProps(testActor))
val e = new RuntimeException("simulated") with NoStackTrace
Flow(() throw e).produceTo(materializer, ActorSubscriber(ref))
Flow(() throw e).produceTo(ActorSubscriber(ref), materializer)
ref ! "ready"
expectMsg(OnError(e))
}
"remember requested after restart" in {
val ref = system.actorOf(manualSubscriberProps(testActor))
Flow(1 to 7).produceTo(materializer, ActorSubscriber(ref))
Flow(1 to 7).produceTo(ActorSubscriber(ref), materializer)
ref ! "ready"
expectMsg(OnNext(1))
expectMsg(OnNext(2))
@ -145,7 +145,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"not deliver more after cancel" in {
val ref = system.actorOf(manualSubscriberProps(testActor))
Flow(1 to 5).produceTo(materializer, ActorSubscriber(ref))
Flow(1 to 5).produceTo(ActorSubscriber(ref), materializer)
ref ! "ready"
expectMsg(OnNext(1))
expectMsg(OnNext(2))
@ -155,14 +155,14 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"work with OneByOneRequestStrategy" in {
val ref = system.actorOf(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy))
Flow(1 to 17).produceTo(materializer, ActorSubscriber(ref))
Flow(1 to 17).produceTo(ActorSubscriber(ref), materializer)
for (n 1 to 17) expectMsg(OnNext(n))
expectMsg(OnComplete)
}
"work with WatermarkRequestStrategy" in {
val ref = system.actorOf(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10)))
Flow(1 to 17).produceTo(materializer, ActorSubscriber(ref))
Flow(1 to 17).produceTo(ActorSubscriber(ref), materializer)
for (n 1 to 17) expectMsg(OnNext(n))
expectMsg(OnComplete)
}
@ -170,7 +170,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"suport custom max in flight request strategy with child workers" in {
val ref = system.actorOf(streamerProps)
val N = 117
Flow(1 to N).map(Msg(_, testActor)).produceTo(materializer, ActorSubscriber(ref))
Flow(1 to N).map(Msg(_, testActor)).produceTo(ActorSubscriber(ref), materializer)
receiveN(N).toSet should be((1 to N).map(Done(_)).toSet)
}

View file

@ -83,7 +83,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest {
val duct: Duct[Int, Long] = Duct[Int].map(_.toLong).timedIntervalBetween(in in % 2 == 1, d probe.ref ! d)
val c1 = StreamTestKit.SubscriberProbe[Long]()
val c2: Subscriber[Int] = duct.produceTo(materializer, c1)
val c2: Subscriber[Int] = duct.produceTo(c1, materializer)
val p = Flow(List(1, 2, 3)).toPublisher(materializer)
p.subscribe(c2)