diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala index 3801d9b7a4..c5a9b98436 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala @@ -81,7 +81,7 @@ class SendQueueBenchmark { val N = 100000 val burstSize = 1000 - val source = Source.actorRef(1024, OverflowStrategy.dropBuffer) + val source = Source.actorRef(PartialFunction.empty, PartialFunction.empty, 1024, OverflowStrategy.dropBuffer) val (ref, killSwitch) = source .viaMat(KillSwitches.single)(Keep.both) diff --git a/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithAck.md b/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithBackpressure.md similarity index 90% rename from akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithAck.md rename to akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithBackpressure.md index afa84deb05..98fff63d1c 100644 --- a/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithAck.md +++ b/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithBackpressure.md @@ -1,4 +1,4 @@ -# actorRefWithAck +# actorRefWithBackpressure Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. @@ -7,7 +7,7 @@ Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it @@@ div { .group-scala } ## Signature -@@signature [Source.scala](/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala) { #actorRefWithAck } +@@signature [Source.scala](/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala) { #actorRefWithBackpressure } @@@ ## Description diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithAck.md b/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithBackpressure.md similarity index 69% rename from akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithAck.md rename to akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithBackpressure.md index 6271283e29..6f9d26da68 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithAck.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithBackpressure.md @@ -1,4 +1,4 @@ -# Sink.actorRefWithAck +# Sink.actorRefWithBackpressure Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. @@ -14,18 +14,18 @@ to provide back pressure onto the sink. Actor to be interacted with: Scala -: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck-actor } +: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithBackpressure-actor } Java -: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck-actor } +: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure-actor } -Using the `actorRefWithAck` operator with the above actor: +Using the `actorRefWithBackpressure` operator with the above actor: Scala -: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck } +: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithBackpressure } Java -: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck } +: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure } ## Reactive Streams semantics diff --git a/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithAck.md b/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md similarity index 81% rename from akka-docs/src/main/paradox/stream/operators/Source/actorRefWithAck.md rename to akka-docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md index afbe0ebaad..3e398f8f9a 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithAck.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md @@ -1,4 +1,4 @@ -# actorRefWithAck +# actorRefWithBackpressure Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. @@ -7,7 +7,7 @@ Materialize an `ActorRef`; sending messages to it will emit them on the stream. @@@ div { .group-scala } ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #actorRefWithAck } +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #actorRefWithBackpressure } @@@ ## Description @@ -27,7 +27,7 @@ once the element could be emitted allowing for backpressure from the source. Sen Scala -: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRefWithAck } +: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRefWithBackpressure } Java -: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actor-ref-with-ack } +: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actorRefWithBackpressure } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 0fb2d292e6..d4e74ab6d4 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -8,8 +8,8 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] | |Operator|Description| |--|--|--| |Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.| -|Source|@ref[actorRefWithAck](Source/actorRefWithAck.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| -|ActorSource|@ref[actorRefWithAck](ActorSource/actorRefWithAck.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| +|Source|@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| +|ActorSource|@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| @@ -46,7 +46,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav | |Operator|Description| |--|--|--| |Sink|@ref[actorRef](Sink/actorRef.md)|Send the elements from the stream to an `ActorRef`.| -|Sink|@ref[actorRefWithAck](Sink/actorRefWithAck.md)|Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.| +|Sink|@ref[actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)|Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.| |Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| |Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| @@ -335,7 +335,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [lazilyAsync](Source/lazilyAsync.md) * [asSubscriber](Source/asSubscriber.md) * [actorRef](Source/actorRef.md) -* [actorRefWithAck](Source/actorRefWithAck.md) +* [actorRefWithBackpressure](Source/actorRefWithBackpressure.md) * [zipN](Source/zipN.md) * [zipWithN](Source/zipWithN.md) * [queue](Source/queue.md) @@ -444,7 +444,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [reduce](Sink/reduce.md) * [onComplete](Sink/onComplete.md) * [actorRef](Sink/actorRef.md) -* [actorRefWithAck](Sink/actorRefWithAck.md) +* [actorRefWithBackpressure](Sink/actorRefWithBackpressure.md) * [queue](Sink/queue.md) * [lazyInitAsync](Sink/lazyInitAsync.md) * [fromInputStream](StreamConverters/fromInputStream.md) @@ -464,7 +464,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [withBackoff](RestartFlow/withBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) * [withBackoff](RestartSink/withBackoff.md) -* [actorRefWithAck](ActorSource/actorRefWithAck.md) +* [actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md) * [ask](ActorFlow/ask.md) * [actorRef](ActorSink/actorRef.md) * [Partition](Partition.md) diff --git a/akka-docs/src/main/paradox/stream/stream-integrations.md b/akka-docs/src/main/paradox/stream/stream-integrations.md index a1c51370e5..c976ee972c 100644 --- a/akka-docs/src/main/paradox/stream/stream-integrations.md +++ b/akka-docs/src/main/paradox/stream/stream-integrations.md @@ -13,7 +13,7 @@ To use Akka Streams, add the module to your project: ## Integrating with Actors For piping the elements of a stream as messages to an ordinary actor you can use -`ask` in a `mapAsync` or use `Sink.actorRefWithAck`. +`ask` in a `mapAsync` or use `Sink.actorRefWithBackpressure`. Messages can be sent to a stream with `Source.queue` or via the `ActorRef` that is materialized by `Source.actorRef`. @@ -69,10 +69,10 @@ If you are intending to ask multiple actors by using @ref:[Actor routers](../rou you should use `mapAsyncUnordered` and perform the ask manually in there, as the ordering of the replies is not important, since multiple actors are being asked concurrently to begin with, and no single actor is the one to be watched by the operator. -### Sink.actorRefWithAck +### Sink.actorRefWithBackpressure @@@ note - See also: @ref[Sink.actorRefWithAck operator reference docs](operators/Sink/actorRefWithAck.md) + See also: @ref[Sink.actorRefWithBackpressure operator reference docs](operators/Sink/actorRefWithBackpressure.md) @@@ The sink sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. @@ -85,18 +85,18 @@ given `onCompleteMessage` will be sent to the destination actor. When the stream failure a `akka.actor.Status.Failure` message will be sent to the destination actor. Scala -: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck } +: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithBackpressure } Java -: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck } +: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure } The receiving actor would then need to be implemented similar to the following: Scala -: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck-actor } +: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithBackpressure-actor } Java -: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck-actor } +: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure-actor } Note that replying to the sender of the elements (the "stream") is required as lack of those ack signals would be interpreted as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements. @@ -111,7 +111,7 @@ Using `Sink.actorRef` or ordinary `tell` from a `map` or `foreach` operator mean no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero *mailbox-push-timeout-time* or use a rate limiting operator in front. It's often better to -use `Sink.actorRefWithAck` or `ask` in `mapAsync`, though. +use `Sink.actorRefWithBackpressure` or `ask` in `mapAsync`, though. @@@ diff --git a/akka-docs/src/main/paradox/typed/stream.md b/akka-docs/src/main/paradox/typed/stream.md index 0f04481cfc..365b2f1d84 100644 --- a/akka-docs/src/main/paradox/typed/stream.md +++ b/akka-docs/src/main/paradox/typed/stream.md @@ -40,10 +40,10 @@ Scala Java : @@snip [ActorSinkExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java) { #actor-sink-ref } -For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use @scala[@scaladoc[`ActorSink.actorRefWithAck`](akka.stream.typed.scaladsl.ActorSink#actorRefWithAck)]@java[@javadoc[`ActorSink.actorRefWithAck`](akka.stream.typed.javadsl.ActorSink#actorRefWithAck)] to be able to signal demand when the actor is ready to receive more elements. +For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use @scala[@scaladoc[`ActorSink.actorRefWithBackpressure`](akka.stream.typed.scaladsl.ActorSink#actorRefWithBackpressure)]@java[@javadoc[`ActorSink.actorRefWithBackpressure`](akka.stream.typed.javadsl.ActorSink#actorRefWithBackpressure)] to be able to signal demand when the actor is ready to receive more elements. Scala -: @@snip [ActorSourceSinkExample.scala](/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref-with-ack } +: @@snip [ActorSourceSinkExample.scala](/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref-with-backpressure } Java -: @@snip [ActorSinkWithAckExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-ack } +: @@snip [ActorSinkWithAckExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-backpressure } diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index 56e23e2887..03c83e2b0d 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -320,7 +320,7 @@ public class IntegrationDocTest extends AbstractJavaTest { } // #ask-actor - // #actorRefWithAck-actor + // #actorRefWithBackpressure-actor enum Ack { INSTANCE; } @@ -381,7 +381,7 @@ public class IntegrationDocTest extends AbstractJavaTest { .build(); } } - // #actorRefWithAck-actor + // #actorRefWithBackpressure-actor @SuppressWarnings("unchecked") @Test @@ -399,8 +399,8 @@ public class IntegrationDocTest extends AbstractJavaTest { } @Test - public void actorRefWithAckExample() throws Exception { - // #actorRefWithAck + public void actorRefWithBackpressure() throws Exception { + // #actorRefWithBackpressure Source words = Source.from(Arrays.asList("hello", "hi")); final TestKit probe = new TestKit(system); @@ -408,7 +408,7 @@ public class IntegrationDocTest extends AbstractJavaTest { ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef())); Sink sink = - Sink.actorRefWithAck( + Sink.actorRefWithBackpressure( receiver, new StreamInitialized(), Ack.INSTANCE, @@ -421,7 +421,7 @@ public class IntegrationDocTest extends AbstractJavaTest { probe.expectMsg("hello"); probe.expectMsg("hi"); probe.expectMsg("Stream completed"); - // #actorRefWithAck + // #actorRefWithBackpressure } @Test diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java index 7f21c4122e..82be3e0958 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -23,6 +23,7 @@ import akka.testkit.TestProbe; // #actor-ref-imports import java.util.Arrays; +import java.util.Optional; // #imports @@ -86,13 +87,20 @@ public class SourceDocExamples { // #actor-ref } - static void actorRefWithAck() { + static void actorRefWithBackpressure() { final TestProbe probe = null; - // #actor-ref-with-ack + // #actorRefWithBackpressure final ActorSystem system = ActorSystem.create(); - Source source = Source.actorRefWithAck("ack"); + Source source = + Source.actorRefWithBackpressure( + "ack", + o -> { + if (o == "complete") return Optional.of(CompletionStrategy.draining()); + else return Optional.empty(); + }, + o -> Optional.empty()); ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system); probe.send(actorRef, "hello"); @@ -101,7 +109,7 @@ public class SourceDocExamples { probe.expectMsg("ack"); // The stream completes successfully with the following message - actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender()); - // #actor-ref-with-ack + actorRef.tell("complete", ActorRef.noSender()); + // #actorRefWithBackpressure } } diff --git a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index 49a746c1af..fd664d4e77 100644 --- a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -187,8 +187,8 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { probe.expectMsg("akkateam@somewhere.com") } - "actorRefWithAck" in { - //#actorRefWithAck + "actorRefWithBackpressure" in { + //#actorRefWithBackpressure val words: Source[String, NotUsed] = Source(List("hello", "hi")) @@ -202,7 +202,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val probe = TestProbe() val receiver = system.actorOf(Props(new AckingReceiver(probe.ref, ackWith = AckMessage))) - val sink = Sink.actorRefWithAck( + val sink = Sink.actorRefWithBackpressure( receiver, onInitMessage = InitMessage, ackMessage = AckMessage, @@ -215,10 +215,10 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { probe.expectMsg("hello") probe.expectMsg("hi") probe.expectMsg("Stream completed!") - //#actorRefWithAck + //#actorRefWithBackpressure } - //#actorRefWithAck-actor + //#actorRefWithBackpressure-actor object AckingReceiver { case object Ack @@ -248,7 +248,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { log.error(ex, "Stream failed!") } } - //#actorRefWithAck-actor + //#actorRefWithBackpressure-actor "lookup email with mapAsync and supervision" in { val addressSystem = new AddressSystem2 diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala index 29d1269f7c..15036c486e 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala @@ -50,8 +50,8 @@ object SourceOperators { //#actorRef } - def actorRefWithAck(): Unit = { - //#actorRefWithAck + def actorRefWithBackpressure(): Unit = { + //#actorRefWithBackpressure import akka.actor.Status.Success import akka.actor.ActorRef @@ -61,7 +61,9 @@ object SourceOperators { implicit val system: ActorSystem = ActorSystem() val probe = TestProbe() - val source: Source[Any, ActorRef] = Source.actorRefWithAck[Any]("ack") + val source: Source[Any, ActorRef] = Source.actorRefWithBackpressure[Any]("ack", { + case _: Success => CompletionStrategy.immediately + }, PartialFunction.empty) val actorRef: ActorRef = source.to(Sink.foreach(println)).run() probe.send(actorRef, "hello") @@ -70,7 +72,7 @@ object SourceOperators { probe.expectMsg("ack") // The stream completes successfully with the following message - actorRef ! Success(CompletionStrategy.immediately) - //#actorRefWithAck + actorRef ! Success(()) + //#actorRefWithBackpressure } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index b116e62290..ea83b362cd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -142,8 +142,10 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ => true, _ => true), Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl + Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithBackpressure", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl + Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRefWithBackpressure", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ => true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ => true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ => true), diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index 35fdc93460..769fe5d45e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -4,10 +4,7 @@ package akka.stream.scaladsl -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.Status +import akka.actor.{ Actor, ActorRef, Props } import akka.stream.Attributes.inputBuffer import akka.stream.Materializer import akka.stream.testkit._ @@ -21,6 +18,7 @@ import scala.concurrent.duration._ object ActorRefBackpressureSinkSpec { val initMessage = "start" val completeMessage = "done" + val failMessage = "failed" val ackMessage = "ack" class Fw(ref: ActorRef) extends Actor { @@ -62,7 +60,8 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { "send the elements to the ActorRef" in assertAllStagesStopped { val fw = createActor(classOf[Fw]) - Source(List(1, 2, 3)).runWith(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)) + Source(List(1, 2, 3)) + .runWith(Sink.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage)) expectMsg("start") expectMsg(1) expectMsg(2) @@ -72,7 +71,10 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { "send the elements to the ActorRef2" in assertAllStagesStopped { val fw = createActor(classOf[Fw]) - val probe = TestSource.probe[Int].to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)).run() + val probe = TestSource + .probe[Int] + .to(Sink.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage)) + .run() probe.sendNext(1) expectMsg("start") expectMsg(1) @@ -87,7 +89,11 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { "cancel stream when actor terminates" in assertAllStagesStopped { val fw = createActor(classOf[Fw]) val publisher = - TestSource.probe[Int].to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)).run().sendNext(1) + TestSource + .probe[Int] + .to(Sink.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage)) + .run() + .sendNext(1) expectMsg(initMessage) expectMsg(1) system.stop(fw) @@ -96,7 +102,10 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { "send message only when backpressure received" in assertAllStagesStopped { val fw = createActor(classOf[Fw2]) - val publisher = TestSource.probe[Int].to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)).run() + val publisher = TestSource + .probe[Int] + .to(Sink.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage)) + .run() expectMsg(initMessage) publisher.sendNext(1) @@ -120,7 +129,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { val streamElementCount = bufferSize + 4 val fw = createActor(classOf[Fw2]) val sink = Sink - .actorRefWithAck(fw, initMessage, ackMessage, completeMessage) + .actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage) .withAttributes(inputBuffer(bufferSize, bufferSize)) val bufferFullProbe = Promise[akka.Done.type] Source(1 to streamElementCount) @@ -142,7 +151,10 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { val publisher = TestSource .probe[Int] - .to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage).withAttributes(inputBuffer(1, 1))) + .to( + Sink + .actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage) + .withAttributes(inputBuffer(1, 1))) .run() expectMsg(initMessage) @@ -166,7 +178,9 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { val fw = createActor(classOf[Fw]) an[IllegalArgumentException] shouldBe thrownBy { val badSink = - Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage).withAttributes(inputBuffer(0, 0)) + Sink + .actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage) + .withAttributes(inputBuffer(0, 0)) Source.single(()).runWith(badSink) } } @@ -176,14 +190,19 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { val probe = TestProbe() val sink = Sink - .actorRefWithAck[String](probe.ref, initMessage, ackMessage, completeMessage) + .actorRefWithBackpressure[String]( + probe.ref, + initMessage, + ackMessage, + completeMessage, + (_: Throwable) => failMessage) .withAttributes(inputBuffer(1, 1)) Source.maybe[String].to(sink).run()(mat) probe.expectMsg(initMessage) mat.shutdown() - probe.expectMsgType[Status.Failure] + probe.expectMsg(failMessage) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala index d0b1ff9175..a32fa6fa6c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala @@ -8,6 +8,7 @@ import akka.actor.Status import akka.stream.testkit.Utils.TE import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.scaladsl.TestSink +import akka.stream.CompletionStrategy import akka.stream.testkit.StreamSpec import akka.testkit.TestProbe @@ -20,11 +21,16 @@ private object ActorRefBackpressureSourceSpec { class ActorRefBackpressureSourceSpec extends StreamSpec { import ActorRefBackpressureSourceSpec._ - "An Source.actorRefWithAck" must { + "An Source.actorRefWithBackpressure" must { "emit received messages to the stream and ack" in assertAllStagesStopped { val probe = TestProbe() - val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, s) = Source + .actorRefWithBackpressure[Int]( + AckMsg, { case "ok" => CompletionStrategy.draining }: PartialFunction[Any, CompletionStrategy], + PartialFunction.empty) + .toMat(TestSink.probe[Int])(Keep.both) + .run() val sub = s.expectSubscription() sub.request(10) @@ -38,12 +44,15 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { s.expectNoMessage(50.millis) - ref ! Status.Success("ok") + ref ! "ok" s.expectComplete() } "fail when consumer does not await ack" in assertAllStagesStopped { - val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, s) = Source + .actorRefWithBackpressure[Int](AckMsg, PartialFunction.empty, PartialFunction.empty) + .toMat(TestSink.probe[Int])(Keep.both) + .run() val sub = s.expectSubscription() for (n <- 1 to 20) ref ! n @@ -65,7 +74,12 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { "complete after receiving Status.Success" in assertAllStagesStopped { val probe = TestProbe() - val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, s) = Source + .actorRefWithBackpressure[Int]( + AckMsg, { case "ok" => CompletionStrategy.draining }: PartialFunction[Any, CompletionStrategy], + PartialFunction.empty) + .toMat(TestSink.probe[Int])(Keep.both) + .run() val sub = s.expectSubscription() sub.request(10) @@ -73,14 +87,19 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { s.expectNext(1) probe.expectMsg(AckMsg) - ref ! Status.Success("ok") + ref ! "ok" s.expectComplete() } "fail after receiving Status.Failure" in assertAllStagesStopped { val probe = TestProbe() - val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, s) = Source + .actorRefWithBackpressure[Int]( + AckMsg, + PartialFunction.empty, { case Status.Failure(f) => f }: PartialFunction[Any, Throwable]) + .toMat(TestSink.probe[Int])(Keep.both) + .run() val sub = s.expectSubscription() sub.request(10) @@ -95,7 +114,12 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { "not buffer elements after receiving Status.Success" in assertAllStagesStopped { val probe = TestProbe() - val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, s) = Source + .actorRefWithBackpressure[Int]( + AckMsg, { case "ok" => CompletionStrategy.draining }: PartialFunction[Any, CompletionStrategy], + PartialFunction.empty) + .toMat(TestSink.probe[Int])(Keep.both) + .run() val sub = s.expectSubscription() sub.request(10) @@ -107,7 +131,7 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { s.expectNext(2) probe.expectMsg(AckMsg) - ref ! Status.Success("ok") + ref ! "ok" probe.send(ref, 100) probe.send(ref, 100) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala index 5b80639698..de6dd809c6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala @@ -28,7 +28,7 @@ class ActorRefSinkSpec extends StreamSpec { "A ActorRefSink" must { "send the elements to the ActorRef" in assertAllStagesStopped { - Source(List(1, 2, 3)).runWith(Sink.actorRef(testActor, onCompleteMessage = "done")) + Source(List(1, 2, 3)).runWith(Sink.actorRef(testActor, onCompleteMessage = "done", _ => "failure")) expectMsg(1) expectMsg(2) expectMsg(3) @@ -38,7 +38,12 @@ class ActorRefSinkSpec extends StreamSpec { "cancel stream when actor terminates" in assertAllStagesStopped { val fw = system.actorOf(Props(classOf[Fw], testActor).withDispatcher("akka.test.stream-dispatcher")) val publisher = - TestSource.probe[Int].to(Sink.actorRef(fw, onCompleteMessage = "done")).run().sendNext(1).sendNext(2) + TestSource + .probe[Int] + .to(Sink.actorRef(fw, onCompleteMessage = "done", _ => "failure")) + .run() + .sendNext(1) + .sendNext(2) expectMsg(1) expectMsg(2) system.stop(fw) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index 7a226516c2..58b84b294f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -5,10 +5,8 @@ package akka.stream.scaladsl import akka.Done -import akka.actor.ActorRef -import akka.actor.PoisonPill -import akka.actor.Status -import akka.stream._ +import akka.actor.{ ActorRef, PoisonPill, Status } +import akka.stream.{ OverflowStrategy, _ } import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ @@ -23,7 +21,10 @@ class ActorRefSourceSpec extends StreamSpec { "emit received messages to the stream" in { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val ref = Source + .actorRef(PartialFunction.empty, PartialFunction.empty, 10, OverflowStrategy.fail) + .to(Sink.fromSubscriber(s)) + .run() val sub = s.expectSubscription() sub.request(2) ref ! 1 @@ -36,7 +37,10 @@ class ActorRefSourceSpec extends StreamSpec { "buffer when needed" in { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() + val ref = Source + .actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.dropHead) + .to(Sink.fromSubscriber(s)) + .run() val sub = s.expectSubscription() for (n <- 1 to 20) ref ! n sub.request(10) @@ -50,7 +54,10 @@ class ActorRefSourceSpec extends StreamSpec { } "drop new when full and with dropNew strategy" in { - val (ref, sub) = Source.actorRef(100, OverflowStrategy.dropNew).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, sub) = Source + .actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.dropNew) + .toMat(TestSink.probe[Int])(Keep.both) + .run() for (n <- 1 to 20) ref ! n sub.request(10) @@ -65,7 +72,10 @@ class ActorRefSourceSpec extends StreamSpec { "terminate when the stream is cancelled" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(0, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val ref = Source + .actorRef(PartialFunction.empty, PartialFunction.empty, 0, OverflowStrategy.fail) + .to(Sink.fromSubscriber(s)) + .run() watch(ref) val sub = s.expectSubscription() sub.cancel() @@ -74,7 +84,10 @@ class ActorRefSourceSpec extends StreamSpec { "not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() + val ref = Source + .actorRef(PartialFunction.empty, PartialFunction.empty, 0, OverflowStrategy.dropHead) + .to(Sink.fromSubscriber(s)) + .run() watch(ref) val sub = s.expectSubscription() sub.request(100) @@ -84,12 +97,15 @@ class ActorRefSourceSpec extends StreamSpec { "signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val ref = Source + .actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.fail) + .to(Sink.fromSubscriber(s)) + .run() val sub = s.expectSubscription() ref ! 1 ref ! 2 ref ! 3 - ref ! Status.Success("ok") + ref ! "ok" sub.request(10) s.expectNext(1, 2, 3) s.expectComplete() @@ -97,22 +113,28 @@ class ActorRefSourceSpec extends StreamSpec { "signal buffered elements and complete the stream after receiving a Status.Success companion" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val ref = Source + .actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.fail) + .to(Sink.fromSubscriber(s)) + .run() val sub = s.expectSubscription() ref ! 1 ref ! 2 ref ! 3 - ref ! Status.Success + ref ! "ok" sub.request(10) s.expectNext(1, 2, 3) s.expectComplete() } "signal buffered elements and complete the stream after receiving a Status.Success with CompletionStrategy.Draining" in assertAllStagesStopped { - val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, s) = Source + .actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 100, OverflowStrategy.fail) + .toMat(TestSink.probe[Int])(Keep.both) + .run() for (n <- 1 to 20) ref ! n - ref ! Status.Success(CompletionStrategy.Draining) + ref ! "ok" s.request(10) for (n <- 1 to 10) s.expectNext(n) @@ -123,10 +145,13 @@ class ActorRefSourceSpec extends StreamSpec { } "not signal buffered elements but complete immediately the stream after receiving a Status.Success with CompletionStrategy.Immediately" in assertAllStagesStopped { - val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, s) = Source + .actorRef({ case "ok" => CompletionStrategy.immediately }, PartialFunction.empty, 100, OverflowStrategy.fail) + .toMat(TestSink.probe[Int])(Keep.both) + .run() for (n <- 1 to 20) ref ! n - ref ! Status.Success(CompletionStrategy.Immediately) + ref ! "ok" s.request(10) @@ -144,7 +169,10 @@ class ActorRefSourceSpec extends StreamSpec { } "not signal buffered elements but complete immediately the stream after receiving a PoisonPill (backwards compatibility)" in assertAllStagesStopped { - val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() + val (ref, s) = Source + .actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.fail) + .toMat(TestSink.probe[Int])(Keep.both) + .run() for (n <- 1 to 20) ref ! n ref ! PoisonPill @@ -166,12 +194,15 @@ class ActorRefSourceSpec extends StreamSpec { "not buffer elements after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run() + val ref = Source + .actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.dropBuffer) + .to(Sink.fromSubscriber(s)) + .run() val sub = s.expectSubscription() ref ! 1 ref ! 2 ref ! 3 - ref ! Status.Success("ok") + ref ! "ok" ref ! 100 ref ! 100 ref ! 100 @@ -182,15 +213,21 @@ class ActorRefSourceSpec extends StreamSpec { "complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped { val (ref, done) = { - Source.actorRef(3, OverflowStrategy.dropBuffer).toMat(Sink.ignore)(Keep.both).run() + Source + .actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.dropBuffer) + .toMat(Sink.ignore)(Keep.both) + .run() } - ref ! Status.Success("ok") + ref ! "ok" done.futureValue should be(Done) } "fail the stream when receiving Status.Failure" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val ref = Source + .actorRef(PartialFunction.empty, { case Status.Failure(exc) => exc }, 10, OverflowStrategy.fail) + .to(Sink.fromSubscriber(s)) + .run() s.expectSubscription() val exc = TE("testfailure") ref ! Status.Failure(exc) @@ -201,7 +238,7 @@ class ActorRefSourceSpec extends StreamSpec { val s = TestSubscriber.manualProbe[Int]() val name = "SomeCustomName" val ref = Source - .actorRef(10, OverflowStrategy.fail) + .actorRef(PartialFunction.empty, PartialFunction.empty, 10, OverflowStrategy.fail) .withAttributes(Attributes.name(name)) .to(Sink.fromSubscriber(s)) .run() @@ -212,7 +249,8 @@ class ActorRefSourceSpec extends StreamSpec { "be possible to run immediately, reproducer of #26714" in { (1 to 100).foreach { _ => val mat = Materializer(system) - val source: Source[String, ActorRef] = Source.actorRef[String](10000, OverflowStrategy.fail) + val source: Source[String, ActorRef] = + Source.actorRef[String](PartialFunction.empty, PartialFunction.empty, 10000, OverflowStrategy.fail) val (_: ActorRef, _: Publisher[String]) = source.toMat(Sink.asPublisher(false))(Keep.both).run()(mat) mat.shutdown() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala index c15fb143d1..156bb719dd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala @@ -315,7 +315,11 @@ class FlowThrottleSpec extends StreamSpec(""" val expectedMinRate = new AtomicInteger val expectedMaxRate = new AtomicInteger val (ref, done) = Source - .actorRef[Int](bufferSize = 100000, OverflowStrategy.fail) + .actorRef[Int]( + { case "done" => CompletionStrategy.draining }: PartialFunction[Any, CompletionStrategy], + PartialFunction.empty, + bufferSize = 100000, + OverflowStrategy.fail) .throttle(300, 1000.millis) .toMat(Sink.foreach { elem => val now = System.nanoTime() @@ -366,7 +370,7 @@ class FlowThrottleSpec extends StreamSpec(""" } } } - ref ! akka.actor.Status.Success("done") + ref ! "done" Await.result(done, 20.seconds) should ===(Done) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index 9215446890..d3e92c202f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -5,31 +5,18 @@ package akka.stream.scaladsl import akka.NotUsed -import akka.actor.Actor -import akka.actor.ActorIdentity -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.ActorSystemImpl -import akka.actor.Identify -import akka.actor.Props -import akka.actor.Status.Failure +import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props } import akka.pattern._ import akka.stream._ -import akka.stream.impl.streamref.SinkRefImpl -import akka.stream.impl.streamref.SourceRefImpl +import akka.stream.impl.streamref.{ SinkRefImpl, SourceRefImpl } import akka.stream.testkit.TestPublisher import akka.stream.testkit.scaladsl._ -import akka.testkit.AkkaSpec -import akka.testkit.ImplicitSender -import akka.testkit.TestKit -import akka.testkit.TestProbe +import akka.testkit.{ AkkaSpec, ImplicitSender, TestKit, TestProbe } import akka.util.ByteString import com.typesafe.config._ import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.Future +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -96,7 +83,7 @@ object StreamRefsSpec { * For them it's a Sink; for us it's a Source. */ val sink = - StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "")).run() + StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "", f => ": " + f.getMessage)).run() sender() ! sink case "receive-ignore" => @@ -108,7 +95,7 @@ object StreamRefsSpec { val sink = StreamRefs .sinkRef[String]() .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) - .to(Sink.actorRef(probe, "")) + .to(Sink.actorRef(probe, "", f => ": " + f.getMessage)) .run() sender() ! sink @@ -209,7 +196,7 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend remoteActor ! "give" val sourceRef = expectMsgType[SourceRef[String]] - sourceRef.runWith(Sink.actorRef(p.ref, "")) + sourceRef.runWith(Sink.actorRef(p.ref, "", _ => "")) p.expectMsg("hello") p.expectMsg("world") @@ -220,12 +207,12 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend remoteActor ! "give-fail" val sourceRef = expectMsgType[SourceRef[String]] - sourceRef.runWith(Sink.actorRef(p.ref, "")) + sourceRef.runWith(Sink.actorRef(p.ref, "", t => ": " + t.getMessage)) - val f = p.expectMsgType[Failure] - f.cause.getMessage should include("Remote stream (") + val f = p.expectMsgType[String] + f should include("Remote stream (") // actor name here, for easier identification - f.cause.getMessage should include("failed, reason: Booooom!") + f should include("failed, reason: Booooom!") } "complete properly when remote source is empty" in { @@ -234,7 +221,7 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend remoteActor ! "give-complete-asap" val sourceRef = expectMsgType[SourceRef[String]] - sourceRef.runWith(Sink.actorRef(p.ref, "")) + sourceRef.runWith(Sink.actorRef(p.ref, "", _ => "")) p.expectMsg("") } @@ -332,10 +319,10 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend val remoteFailureMessage = "Booom!" Source.failed(new Exception(remoteFailureMessage)).to(remoteSink).run() - val f = p.expectMsgType[akka.actor.Status.Failure] - f.cause.getMessage should include(s"Remote stream (") + val f = p.expectMsgType[String] + f should include(s"Remote stream (") // actor name ere, for easier identification - f.cause.getMessage should include(s"failed, reason: $remoteFailureMessage") + f should include(s"failed, reason: $remoteFailureMessage") } "receive hundreds of elements via remoting" in { @@ -359,8 +346,8 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend val probe = TestSource.probe[String](system).to(remoteSink).run() - val failure = p.expectMsgType[Failure] - failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Sink reference") + val failure = p.expectMsgType[String] + failure should include("Remote side did not subscribe (materialize) handed out Sink reference") // the local "remote sink" should cancel, since it should notice the origin target actor is dead probe.expectCancellation() diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala index a39ceedac3..42425d5a6a 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala @@ -49,6 +49,40 @@ object ActorSink { * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. */ + def actorRefWithBackpressure[T, M, A]( + ref: ActorRef[M], + messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M], + onInitMessage: akka.japi.function.Function[ActorRef[A], M], + ackMessage: A, + onCompleteMessage: M, + onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] = + typed.scaladsl.ActorSink + .actorRefWithBackpressure( + ref, + messageAdapter.apply, + onInitMessage.apply, + ackMessage, + onCompleteMessage, + onFailureMessage.apply) + .asJava + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + * + * @deprecated Use actorRefWithBackpressure instead + */ + @Deprecated + @deprecated("Use actorRefWithBackpressure instead", "2.6.0") def actorRefWithAck[T, M, A]( ref: ActorRef[M], messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M], @@ -57,7 +91,7 @@ object ActorSink { onCompleteMessage: M, onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] = typed.scaladsl.ActorSink - .actorRefWithAck( + .actorRefWithBackpressure( ref, messageAdapter.apply, onInitMessage.apply, diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala index 95b4cd856f..1880623f8c 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala @@ -82,13 +82,56 @@ object ActorSource { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. */ + def actorRefWithBackpressure[T, Ack]( + ackTo: ActorRef[Ack], + ackMessage: Ack, + completionMatcher: akka.japi.function.Function[T, java.util.Optional[CompletionStrategy]], + failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] = + akka.stream.typed.scaladsl.ActorSource + .actorRefWithBackpressure[T, Ack]( + ackTo, + ackMessage, + new JavaPartialFunction[T, CompletionStrategy] { + override def apply(x: T, isCheck: Boolean): CompletionStrategy = { + val result = completionMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }, + new JavaPartialFunction[T, Throwable] { + override def apply(x: T, isCheck: Boolean): Throwable = { + val result = failureMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }) + .asJava + + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * @deprecated Use actorRefWithBackpressure instead + */ + @Deprecated + @deprecated("Use actorRefWithBackpressure instead", "2.6.0") def actorRefWithAck[T, Ack]( ackTo: ActorRef[Ack], ackMessage: Ack, completionMatcher: akka.japi.function.Function[T, java.util.Optional[CompletionStrategy]], failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] = akka.stream.typed.scaladsl.ActorSource - .actorRefWithAck[T, Ack]( + .actorRefWithBackpressure[T, Ack]( ackTo, ackMessage, new JavaPartialFunction[T, CompletionStrategy] { diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala index 457404c652..53669718eb 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala @@ -46,6 +46,35 @@ object ActorSink { * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. */ + def actorRefWithBackpressure[T, M, A]( + ref: ActorRef[M], + messageAdapter: (ActorRef[A], T) => M, + onInitMessage: ActorRef[A] => M, + ackMessage: A, + onCompleteMessage: M, + onFailureMessage: Throwable => M): Sink[T, NotUsed] = + Sink.actorRefWithAck( + ref.toClassic, + messageAdapter.curried.compose(actorRefAdapter), + onInitMessage.compose(actorRefAdapter), + ackMessage, + onCompleteMessage, + onFailureMessage) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + @deprecated("Use actorRefWithBackpressure instead", "2.6.0") def actorRefWithAck[T, M, A]( ref: ActorRef[M], messageAdapter: (ActorRef[A], T) => M, diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala index d9a667993a..a3cfb2279f 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala @@ -61,7 +61,7 @@ object ActorSource { .mapMaterializedValue(actorRefAdapter) /** - * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. @@ -74,6 +74,34 @@ object ActorSource { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. */ + def actorRefWithBackpressure[T, Ack]( + ackTo: ActorRef[Ack], + ackMessage: Ack, + completionMatcher: PartialFunction[T, CompletionStrategy], + failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] = + Source + .actorRefWithAck[T]( + Some(ackTo.toClassic), + ackMessage, + completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]], + failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]]) + .mapMaterializedValue(actorRefAdapter) + + /** + * Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + */ + @deprecated("Use actorRefWithBackpressure instead", "2.6.0") def actorRefWithAck[T, Ack]( ackTo: ActorRef[Ack], ackMessage: Ack, diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java index 260770c677..dc41d50e92 100644 --- a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java +++ b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java @@ -45,7 +45,7 @@ public class ActorSourceSinkCompileTest { Source.queue(10, OverflowStrategy.dropBuffer()) .to( - ActorSink.actorRefWithAck( + ActorSink.actorRefWithBackpressure( ref, (sender, msg) -> new Init(), (sender) -> new Msg(), diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java index b928f76af8..2e6d40397f 100644 --- a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java +++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java @@ -4,18 +4,18 @@ package docs.akka.stream.typed; -// #actor-sink-ref-with-ack +// #actor-sink-ref-with-backpressure import akka.NotUsed; import akka.actor.typed.ActorRef; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.stream.typed.javadsl.ActorSink; -// #actor-sink-ref-with-ack +// #actor-sink-ref-with-backpressure public class ActorSinkWithAckExample { - // #actor-sink-ref-with-ack + // #actor-sink-ref-with-backpressure class Ack {} @@ -48,20 +48,20 @@ public class ActorSinkWithAckExample { this.ex = ex; } } - // #actor-sink-ref-with-ack + // #actor-sink-ref-with-backpressure final ActorMaterializer mat = null; { - // #actor-sink-ref-with-ack + // #actor-sink-ref-with-backpressure final ActorRef actor = null; final Sink sink = - ActorSink.actorRefWithAck( + ActorSink.actorRefWithBackpressure( actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new); Source.single("msg1").runWith(sink, mat); - // #actor-sink-ref-with-ack + // #actor-sink-ref-with-backpressure } } diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala index 75ff37eeaf..736cc46039 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala @@ -65,7 +65,7 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with WordSpecLike { val in = Source .queue[String](10, OverflowStrategy.dropBuffer) - .to(ActorSink.actorRefWithAck(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ => Failed)) + .to(ActorSink.actorRefWithBackpressure(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ => Failed)) .run() p.expectMessageType[Init] @@ -110,7 +110,7 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with WordSpecLike { val p = TestProbe[String]() val (in, out) = ActorSource - .actorRefWithAck[String, String]( + .actorRefWithBackpressure[String, String]( p.ref, "ack", { case "complete" => CompletionStrategy.draining }, PartialFunction.empty) diff --git a/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala index c7c150556e..d7015a1499 100644 --- a/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala +++ b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala @@ -69,7 +69,7 @@ object ActorSourceSinkExample { def targetActor(): ActorRef[Protocol] = ??? - // #actor-sink-ref-with-ack + // #actor-sink-ref-with-backpressure import akka.actor.typed.ActorRef import akka.stream.scaladsl.{ Sink, Source } import akka.stream.typed.scaladsl.ActorSink @@ -85,7 +85,7 @@ object ActorSourceSinkExample { val actor: ActorRef[Protocol] = targetActor() - val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck( + val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure( ref = actor, onCompleteMessage = Complete, onFailureMessage = Fail.apply, @@ -94,6 +94,6 @@ object ActorSourceSinkExample { ackMessage = Ack) Source.single("msg1").runWith(sink) - // #actor-sink-ref-with-ack + // #actor-sink-ref-with-backpressure } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index 837188255b..84da6173e7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -25,7 +25,7 @@ import akka.stream.stage._ onFailureMessage: (Throwable) => Any) extends GraphStage[SinkShape[In]] { val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") - override def initialAttributes = DefaultAttributes.actorRefWithAck + override def initialAttributes = DefaultAttributes.actorRefWithBackpressureSink override val shape: SinkShape[In] = SinkShape(in) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala index 2020f66173..398e928442 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala @@ -29,7 +29,7 @@ private object ActorRefBackpressureSource { val out: Outlet[T] = Outlet[T]("actorRefSource.out") override val shape: SourceShape[T] = SourceShape.of(out) - override def initialAttributes: Attributes = DefaultAttributes.actorRefWithAckSource + override def initialAttributes: Attributes = DefaultAttributes.actorRefWithBackpressureSource def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) = throw new IllegalStateException("Not supported") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index d9213d3c62..cf995e6202 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -109,7 +109,7 @@ import akka.stream._ val subscriberSource = name("subscriberSource") val actorPublisherSource = name("actorPublisherSource") val actorRefSource = name("actorRefSource") - val actorRefWithAckSource = name("actorRefWithAckSource") + val actorRefWithBackpressureSource = name("actorRefWithBackpressureSource") val queueSource = name("queueSource") val inputStreamSource = name("inputStreamSource") and IODispatcher val outputStreamSource = name("outputStreamSource") and IODispatcher @@ -132,7 +132,7 @@ import akka.stream._ val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") val actorRefSink = name("actorRefSink") - val actorRefWithAck = name("actorRefWithAckSink") + val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink") val actorSubscriberSink = name("actorSubscriberSink") val queueSink = name("queueSink") val lazySink = name("lazySink") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 1b03eecc8c..25e01e1142 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -8,19 +8,13 @@ import java.util.Optional import java.util.concurrent.CompletionStage import java.util.function.BiFunction -import akka.Done -import akka.NotUsed -import akka.actor.ActorRef -import akka.actor.ClassicActorSystemProvider +import akka.actor.{ ActorRef, ClassicActorSystemProvider, Status } import akka.dispatch.ExecutionContexts -import akka.japi +import akka._ import akka.japi.function -import akka.stream._ import akka.stream.impl.LinearTraversalBuilder -import akka.stream.javadsl -import akka.stream.scaladsl -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber +import akka.stream.{ javadsl, scaladsl, _ } +import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable @@ -245,7 +239,7 @@ object Sink { * */ def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, NotUsed] = - new Sink(scaladsl.Sink.actorRef[In](ref, onCompleteMessage)) + new Sink(scaladsl.Sink.actorRef[In](ref, onCompleteMessage, (t: Throwable) => Status.Failure(t))) /** * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. @@ -260,6 +254,33 @@ object Sink { * When the stream is completed with failure - result of `onFailureMessage(throwable)` * message will be sent to the destination actor. */ + def actorRefWithBackpressure[In]( + ref: ActorRef, + onInitMessage: Any, + ackMessage: Any, + onCompleteMessage: Any, + onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] = + new Sink( + scaladsl.Sink + .actorRefWithBackpressure[In](ref, onInitMessage, ackMessage, onCompleteMessage, t => onFailureMessage(t))) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * message will be sent to the destination actor. + * + * @deprecated Use actorRefWithBackpressure instead + */ + @Deprecated + @deprecated("Use actorRefWithBackpressure instead", "2.6.0") def actorRefWithAck[In]( ref: ActorRef, onInitMessage: Any, @@ -267,7 +288,8 @@ object Sink { onCompleteMessage: Any, onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] = new Sink( - scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _)) + scaladsl.Sink + .actorRefWithBackpressure[In](ref, onInitMessage, ackMessage, onCompleteMessage, t => onFailureMessage(t))) /** * A graph with the shape of a sink logically is a sink, this method makes diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 656d234f5c..5cf82b6fce 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -6,33 +6,27 @@ package akka.stream.javadsl import java.util import java.util.Optional +import java.util.concurrent.{ CompletableFuture, CompletionStage } +import java.util.function.{ BiFunction, Supplier } -import akka.actor.{ ActorRef, Cancellable } +import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } import akka.event.LoggingAdapter -import akka.japi.{ function, Pair, Util } +import akka.japi.{ function, JavaPartialFunction, Pair, Util } import akka.stream._ import akka.stream.impl.LinearTraversalBuilder -import akka.util.{ ConstantFun, Timeout } import akka.util.JavaDurationConverters._ +import akka.util.ccompat.JavaConverters._ +import akka.util.{ unused, _ } import akka.{ Done, NotUsed } +import com.github.ghik.silencer.silent import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance -import akka.util.ccompat.JavaConverters._ - import scala.collection.immutable +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } -import scala.compat.java8.OptionConverters._ -import java.util.concurrent.CompletionStage -import java.util.concurrent.CompletableFuture -import java.util.function.{ BiFunction, Supplier } - -import akka.actor.ClassicActorSystemProvider -import akka.util.unused -import com.github.ghik.silencer.silent - -import scala.compat.java8.FutureConverters._ import scala.reflect.ClassTag /** Java API */ @@ -287,6 +281,64 @@ object Source { def asSubscriber[T](): Source[T, Subscriber[T]] = new Source(scaladsl.Source.asSubscriber) + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand + * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after + * this Source; as such, it is never safe to assume the downstream will always generate demand. + * + * The stream can be completed successfully by sending the actor reference a message that is matched by + * `completionMatcher` in which case already buffered elements will be signaled before signaling + * completion. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * Note that terminating the actor without first completing it, either with a success or a + * failure, will prevent the actor triggering downstream completion and the stream will continue + * to run even though the source actor is dead. Therefore you should **not** attempt to + * manually terminate the actor such as with a [[akka.actor.PoisonPill]]. + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * See also [[akka.stream.scaladsl.Source.queue]]. + * + * @param completionMatcher catches the completion message to end the stream + * @param failureMatcher catches the failure message to fail the stream + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T]( + completionMatcher: akka.japi.function.Function[Any, java.util.Optional[CompletionStrategy]], + failureMatcher: akka.japi.function.Function[Any, java.util.Optional[Throwable]], + bufferSize: Int, + overflowStrategy: OverflowStrategy): Source[T, ActorRef] = + new Source(scaladsl.Source.actorRef(new JavaPartialFunction[Any, CompletionStrategy] { + override def apply(x: Any, isCheck: Boolean): CompletionStrategy = { + val result = completionMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }, new JavaPartialFunction[Any, Throwable] { + override def apply(x: Any, isCheck: Boolean): Throwable = { + val result = failureMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }, bufferSize, overflowStrategy)) + /** * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, @@ -330,8 +382,82 @@ object Source { * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @Deprecated + @deprecated("Use variant accepting completion and failure matchers", "2.6.0") def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = - new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy)) + new Source(scaladsl.Source.actorRef({ + case akka.actor.Status.Success(s: CompletionStrategy) => s + case akka.actor.Status.Success(_) => CompletionStrategy.Draining + case akka.actor.Status.Success => CompletionStrategy.Draining + }, { case akka.actor.Status.Failure(cause) => cause }, bufferSize, overflowStrategy)) + + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + */ + def actorRefWithBackpressure[T]( + ackMessage: Any, + completionMatcher: akka.japi.function.Function[Any, java.util.Optional[CompletionStrategy]], + failureMatcher: akka.japi.function.Function[Any, java.util.Optional[Throwable]]): Source[T, ActorRef] = + new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage, new JavaPartialFunction[Any, CompletionStrategy] { + override def apply(x: Any, isCheck: Boolean): CompletionStrategy = { + val result = completionMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }, new JavaPartialFunction[Any, Throwable] { + override def apply(x: Any, isCheck: Boolean): Throwable = { + val result = failureMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + })) + + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * @deprecated Use actorRefWithBackpressure instead + */ + @Deprecated + @deprecated("Use actorRefWithBackpressure instead", "2.6.0") + def actorRefWithAck[T]( + ackMessage: Any, + completionMatcher: akka.japi.function.Function[Any, java.util.Optional[CompletionStrategy]], + failureMatcher: akka.japi.function.Function[Any, java.util.Optional[Throwable]]): Source[T, ActorRef] = + new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage, new JavaPartialFunction[Any, CompletionStrategy] { + override def apply(x: Any, isCheck: Boolean): CompletionStrategy = { + val result = completionMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + }, new JavaPartialFunction[Any, Throwable] { + override def apply(x: Any, isCheck: Boolean): Throwable = { + val result = failureMatcher(x) + if (!result.isPresent) throw JavaPartialFunction.noMatch() + else result.get() + } + })) /** * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. @@ -352,8 +478,14 @@ object Source { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. */ + @Deprecated + @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers", "2.6.0") def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] = - new Source(scaladsl.Source.actorRefWithAck(ackMessage)) + new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage, { + case akka.actor.Status.Success(s: CompletionStrategy) => s + case akka.actor.Status.Success(_) => CompletionStrategy.Draining + case akka.actor.Status.Success => CompletionStrategy.Draining + }, { case akka.actor.Status.Failure(cause) => cause })) /** * A graph with the shape of a source logically is a source, this method makes diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 130ecf1f2f..4fc075265b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -453,10 +453,7 @@ object Sink { * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * limiting operator in front of this `Sink`. */ - @InternalApi private[akka] def actorRef[T]( - ref: ActorRef, - onCompleteMessage: Any, - onFailureMessage: Throwable => Any): Sink[T, NotUsed] = + def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable => Any): Sink[T, NotUsed] = fromGraph(new ActorRefSinkStage[T](ref, onCompleteMessage, onFailureMessage)) /** @@ -474,6 +471,7 @@ object Sink { * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * limiting operator in front of this `Sink`. */ + @deprecated("Use variant accepting both on complete and on failure message", "2.6.0") def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = fromGraph(new ActorRefSinkStage[T](ref, onCompleteMessage, t => Status.Failure(t))) @@ -525,8 +523,29 @@ object Sink { * will be sent to the destination actor. * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. - * */ + def actorRefWithBackpressure[T]( + ref: ActorRef, + onInitMessage: Any, + ackMessage: Any, + onCompleteMessage: Any, + onFailureMessage: Throwable => Any): Sink[T, NotUsed] = + actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers instead", "2.6.0") def actorRefWithAck[T]( ref: ActorRef, onInitMessage: Any, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index d6454254be..5ec4952810 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -482,8 +482,6 @@ object Source { fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource"))) /** - * INTERNAL API - * * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, * otherwise they will be buffered until request for demand is received. @@ -517,10 +515,12 @@ object Source { * * See also [[akka.stream.scaladsl.Source.queue]]. * + * @param completionMatcher catches the completion message to end the stream + * @param failureMatcher catches the failure message to fail the stream * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ - @InternalApi private[akka] def actorRef[T]( + def actorRef[T]( completionMatcher: PartialFunction[Any, CompletionStrategy], failureMatcher: PartialFunction[Any, Throwable], bufferSize: Int, @@ -568,6 +568,7 @@ object Source { * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @deprecated("Use variant accepting completion and failure matchers instead", "2.6.0") def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = actorRef({ case akka.actor.Status.Success(s: CompletionStrategy) => s @@ -586,6 +587,27 @@ object Source { Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher)) } + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. + * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + */ + def actorRefWithBackpressure[T]( + ackMessage: Any, + completionMatcher: PartialFunction[Any, CompletionStrategy], + failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = { + Source.fromGraph(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher)) + } + /** * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, @@ -605,6 +627,7 @@ object Source { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. */ + @deprecated("Use actorRefWithBackpressure accepting completion and failure matchers instead", "2.6.0") def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] = actorRefWithAck(None, ackMessage, { case akka.actor.Status.Success(s: CompletionStrategy) => s diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index 1c8968c17d..0a2962c325 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -72,7 +72,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "alsoToGraph", "orElseGraph", "divertToGraph", - "zipWithGraph" + "zipWithGraph", + "actorRefWithAck" // deprecated ) // FIXME document these methods as well @@ -104,13 +105,16 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "fromGraph", "actorSubscriber", "foldAsync", - "newOnCompleteStage" + "newOnCompleteStage", + "actorRefWithAck" // deprecated ), "ActorSink" -> Seq( - "actorRefWithAck" + "actorRefWithBackpressure", + "actorRefWithAck" // deprecated ), "ActorSource" -> Seq( - "actorRef" + "actorRef", + "actorRefWithAck" // deprecated ) ) @@ -119,7 +123,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++ Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++ - Set("++", "onPush", "onPull") + Set("++", "onPush", "onPull", "actorRefWithAck") def isPending(element: String, opName: String) = pendingTestCases.get(element).exists(_.contains(opName))