Deprecates status message based api #27503 (#27519)

* Deprecates status message based api #27503
* Deprecates actorRefWithAck for actorRefWithBackpressure
This commit is contained in:
Nicolas Vollmar 2019-09-10 11:59:19 +02:00 committed by Patrik Nordwall
parent aee0152da2
commit 751918e84c
34 changed files with 618 additions and 195 deletions

View file

@ -81,7 +81,7 @@ class SendQueueBenchmark {
val N = 100000 val N = 100000
val burstSize = 1000 val burstSize = 1000
val source = Source.actorRef(1024, OverflowStrategy.dropBuffer) val source = Source.actorRef(PartialFunction.empty, PartialFunction.empty, 1024, OverflowStrategy.dropBuffer)
val (ref, killSwitch) = source val (ref, killSwitch) = source
.viaMat(KillSwitches.single)(Keep.both) .viaMat(KillSwitches.single)(Keep.both)

View file

@ -1,4 +1,4 @@
# actorRefWithAck # actorRefWithBackpressure
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
@ -7,7 +7,7 @@ Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it
@@@ div { .group-scala } @@@ div { .group-scala }
## Signature ## Signature
@@signature [Source.scala](/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala) { #actorRefWithAck } @@signature [Source.scala](/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala) { #actorRefWithBackpressure }
@@@ @@@
## Description ## Description

View file

@ -1,4 +1,4 @@
# Sink.actorRefWithAck # Sink.actorRefWithBackpressure
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
@ -14,18 +14,18 @@ to provide back pressure onto the sink.
Actor to be interacted with: Actor to be interacted with:
Scala Scala
: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck-actor } : @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithBackpressure-actor }
Java Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck-actor } : @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure-actor }
Using the `actorRefWithAck` operator with the above actor: Using the `actorRefWithBackpressure` operator with the above actor:
Scala Scala
: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck } : @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithBackpressure }
Java Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck } : @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure }
## Reactive Streams semantics ## Reactive Streams semantics

View file

@ -1,4 +1,4 @@
# actorRefWithAck # actorRefWithBackpressure
Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
@ -7,7 +7,7 @@ Materialize an `ActorRef`; sending messages to it will emit them on the stream.
@@@ div { .group-scala } @@@ div { .group-scala }
## Signature ## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #actorRefWithAck } @@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #actorRefWithBackpressure }
@@@ @@@
## Description ## Description
@ -27,7 +27,7 @@ once the element could be emitted allowing for backpressure from the source. Sen
Scala Scala
: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRefWithAck } : @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRefWithBackpressure }
Java Java
: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actor-ref-with-ack } : @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actorRefWithBackpressure }

View file

@ -8,8 +8,8 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
| |Operator|Description| | |Operator|Description|
|--|--|--| |--|--|--|
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.| |Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|Source|<a name="actorrefwithack"></a>@ref[actorRefWithAck](Source/actorRefWithAck.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |Source|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|ActorSource|<a name="actorrefwithack"></a>@ref[actorRefWithAck](ActorSource/actorRefWithAck.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |ActorSource|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.| |Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| |Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
@ -46,7 +46,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
| |Operator|Description| | |Operator|Description|
|--|--|--| |--|--|--|
|Sink|<a name="actorref"></a>@ref[actorRef](Sink/actorRef.md)|Send the elements from the stream to an `ActorRef`.| |Sink|<a name="actorref"></a>@ref[actorRef](Sink/actorRef.md)|Send the elements from the stream to an `ActorRef`.|
|Sink|<a name="actorrefwithack"></a>@ref[actorRefWithAck](Sink/actorRefWithAck.md)|Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.| |Sink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)|Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.|
|Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| |Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| |Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
@ -335,7 +335,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [lazilyAsync](Source/lazilyAsync.md) * [lazilyAsync](Source/lazilyAsync.md)
* [asSubscriber](Source/asSubscriber.md) * [asSubscriber](Source/asSubscriber.md)
* [actorRef](Source/actorRef.md) * [actorRef](Source/actorRef.md)
* [actorRefWithAck](Source/actorRefWithAck.md) * [actorRefWithBackpressure](Source/actorRefWithBackpressure.md)
* [zipN](Source/zipN.md) * [zipN](Source/zipN.md)
* [zipWithN](Source/zipWithN.md) * [zipWithN](Source/zipWithN.md)
* [queue](Source/queue.md) * [queue](Source/queue.md)
@ -444,7 +444,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [reduce](Sink/reduce.md) * [reduce](Sink/reduce.md)
* [onComplete](Sink/onComplete.md) * [onComplete](Sink/onComplete.md)
* [actorRef](Sink/actorRef.md) * [actorRef](Sink/actorRef.md)
* [actorRefWithAck](Sink/actorRefWithAck.md) * [actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)
* [queue](Sink/queue.md) * [queue](Sink/queue.md)
* [lazyInitAsync](Sink/lazyInitAsync.md) * [lazyInitAsync](Sink/lazyInitAsync.md)
* [fromInputStream](StreamConverters/fromInputStream.md) * [fromInputStream](StreamConverters/fromInputStream.md)
@ -464,7 +464,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [withBackoff](RestartFlow/withBackoff.md) * [withBackoff](RestartFlow/withBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
* [withBackoff](RestartSink/withBackoff.md) * [withBackoff](RestartSink/withBackoff.md)
* [actorRefWithAck](ActorSource/actorRefWithAck.md) * [actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)
* [ask](ActorFlow/ask.md) * [ask](ActorFlow/ask.md)
* [actorRef](ActorSink/actorRef.md) * [actorRef](ActorSink/actorRef.md)
* [Partition](Partition.md) * [Partition](Partition.md)

View file

@ -13,7 +13,7 @@ To use Akka Streams, add the module to your project:
## Integrating with Actors ## Integrating with Actors
For piping the elements of a stream as messages to an ordinary actor you can use For piping the elements of a stream as messages to an ordinary actor you can use
`ask` in a `mapAsync` or use `Sink.actorRefWithAck`. `ask` in a `mapAsync` or use `Sink.actorRefWithBackpressure`.
Messages can be sent to a stream with `Source.queue` or via the `ActorRef` that is Messages can be sent to a stream with `Source.queue` or via the `ActorRef` that is
materialized by `Source.actorRef`. materialized by `Source.actorRef`.
@ -69,10 +69,10 @@ If you are intending to ask multiple actors by using @ref:[Actor routers](../rou
you should use `mapAsyncUnordered` and perform the ask manually in there, as the ordering of the replies is not important, you should use `mapAsyncUnordered` and perform the ask manually in there, as the ordering of the replies is not important,
since multiple actors are being asked concurrently to begin with, and no single actor is the one to be watched by the operator. since multiple actors are being asked concurrently to begin with, and no single actor is the one to be watched by the operator.
### Sink.actorRefWithAck ### Sink.actorRefWithBackpressure
@@@ note @@@ note
See also: @ref[Sink.actorRefWithAck operator reference docs](operators/Sink/actorRefWithAck.md) See also: @ref[Sink.actorRefWithBackpressure operator reference docs](operators/Sink/actorRefWithBackpressure.md)
@@@ @@@
The sink sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. The sink sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
@ -85,18 +85,18 @@ given `onCompleteMessage` will be sent to the destination actor. When the stream
failure a `akka.actor.Status.Failure` message will be sent to the destination actor. failure a `akka.actor.Status.Failure` message will be sent to the destination actor.
Scala Scala
: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck } : @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithBackpressure }
Java Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck } : @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure }
The receiving actor would then need to be implemented similar to the following: The receiving actor would then need to be implemented similar to the following:
Scala Scala
: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck-actor } : @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithBackpressure-actor }
Java Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck-actor } : @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure-actor }
Note that replying to the sender of the elements (the "stream") is required as lack of those ack signals would be interpreted Note that replying to the sender of the elements (the "stream") is required as lack of those ack signals would be interpreted
as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements. as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements.
@ -111,7 +111,7 @@ Using `Sink.actorRef` or ordinary `tell` from a `map` or `foreach` operator mean
no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages
fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero
*mailbox-push-timeout-time* or use a rate limiting operator in front. It's often better to *mailbox-push-timeout-time* or use a rate limiting operator in front. It's often better to
use `Sink.actorRefWithAck` or `ask` in `mapAsync`, though. use `Sink.actorRefWithBackpressure` or `ask` in `mapAsync`, though.
@@@ @@@

View file

@ -40,10 +40,10 @@ Scala
Java Java
: @@snip [ActorSinkExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java) { #actor-sink-ref } : @@snip [ActorSinkExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java) { #actor-sink-ref }
For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use @scala[@scaladoc[`ActorSink.actorRefWithAck`](akka.stream.typed.scaladsl.ActorSink#actorRefWithAck)]@java[@javadoc[`ActorSink.actorRefWithAck`](akka.stream.typed.javadsl.ActorSink#actorRefWithAck)] to be able to signal demand when the actor is ready to receive more elements. For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use @scala[@scaladoc[`ActorSink.actorRefWithBackpressure`](akka.stream.typed.scaladsl.ActorSink#actorRefWithBackpressure)]@java[@javadoc[`ActorSink.actorRefWithBackpressure`](akka.stream.typed.javadsl.ActorSink#actorRefWithBackpressure)] to be able to signal demand when the actor is ready to receive more elements.
Scala Scala
: @@snip [ActorSourceSinkExample.scala](/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref-with-ack } : @@snip [ActorSourceSinkExample.scala](/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref-with-backpressure }
Java Java
: @@snip [ActorSinkWithAckExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-ack } : @@snip [ActorSinkWithAckExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-backpressure }

View file

@ -320,7 +320,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
} }
// #ask-actor // #ask-actor
// #actorRefWithAck-actor // #actorRefWithBackpressure-actor
enum Ack { enum Ack {
INSTANCE; INSTANCE;
} }
@ -381,7 +381,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
.build(); .build();
} }
} }
// #actorRefWithAck-actor // #actorRefWithBackpressure-actor
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
@ -399,8 +399,8 @@ public class IntegrationDocTest extends AbstractJavaTest {
} }
@Test @Test
public void actorRefWithAckExample() throws Exception { public void actorRefWithBackpressure() throws Exception {
// #actorRefWithAck // #actorRefWithBackpressure
Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi")); Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi"));
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);
@ -408,7 +408,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef())); ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef()));
Sink<String, NotUsed> sink = Sink<String, NotUsed> sink =
Sink.<String>actorRefWithAck( Sink.<String>actorRefWithBackpressure(
receiver, receiver,
new StreamInitialized(), new StreamInitialized(),
Ack.INSTANCE, Ack.INSTANCE,
@ -421,7 +421,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
probe.expectMsg("hello"); probe.expectMsg("hello");
probe.expectMsg("hi"); probe.expectMsg("hi");
probe.expectMsg("Stream completed"); probe.expectMsg("Stream completed");
// #actorRefWithAck // #actorRefWithBackpressure
} }
@Test @Test

View file

@ -23,6 +23,7 @@ import akka.testkit.TestProbe;
// #actor-ref-imports // #actor-ref-imports
import java.util.Arrays; import java.util.Arrays;
import java.util.Optional;
// #imports // #imports
@ -86,13 +87,20 @@ public class SourceDocExamples {
// #actor-ref // #actor-ref
} }
static void actorRefWithAck() { static void actorRefWithBackpressure() {
final TestProbe probe = null; final TestProbe probe = null;
// #actor-ref-with-ack // #actorRefWithBackpressure
final ActorSystem system = ActorSystem.create(); final ActorSystem system = ActorSystem.create();
Source<Object, ActorRef> source = Source.actorRefWithAck("ack"); Source<Object, ActorRef> source =
Source.actorRefWithBackpressure(
"ack",
o -> {
if (o == "complete") return Optional.of(CompletionStrategy.draining());
else return Optional.empty();
},
o -> Optional.empty());
ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system); ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);
probe.send(actorRef, "hello"); probe.send(actorRef, "hello");
@ -101,7 +109,7 @@ public class SourceDocExamples {
probe.expectMsg("ack"); probe.expectMsg("ack");
// The stream completes successfully with the following message // The stream completes successfully with the following message
actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender()); actorRef.tell("complete", ActorRef.noSender());
// #actor-ref-with-ack // #actorRefWithBackpressure
} }
} }

View file

@ -187,8 +187,8 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
probe.expectMsg("akkateam@somewhere.com") probe.expectMsg("akkateam@somewhere.com")
} }
"actorRefWithAck" in { "actorRefWithBackpressure" in {
//#actorRefWithAck //#actorRefWithBackpressure
val words: Source[String, NotUsed] = val words: Source[String, NotUsed] =
Source(List("hello", "hi")) Source(List("hello", "hi"))
@ -202,7 +202,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val probe = TestProbe() val probe = TestProbe()
val receiver = system.actorOf(Props(new AckingReceiver(probe.ref, ackWith = AckMessage))) val receiver = system.actorOf(Props(new AckingReceiver(probe.ref, ackWith = AckMessage)))
val sink = Sink.actorRefWithAck( val sink = Sink.actorRefWithBackpressure(
receiver, receiver,
onInitMessage = InitMessage, onInitMessage = InitMessage,
ackMessage = AckMessage, ackMessage = AckMessage,
@ -215,10 +215,10 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
probe.expectMsg("hello") probe.expectMsg("hello")
probe.expectMsg("hi") probe.expectMsg("hi")
probe.expectMsg("Stream completed!") probe.expectMsg("Stream completed!")
//#actorRefWithAck //#actorRefWithBackpressure
} }
//#actorRefWithAck-actor //#actorRefWithBackpressure-actor
object AckingReceiver { object AckingReceiver {
case object Ack case object Ack
@ -248,7 +248,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
log.error(ex, "Stream failed!") log.error(ex, "Stream failed!")
} }
} }
//#actorRefWithAck-actor //#actorRefWithBackpressure-actor
"lookup email with mapAsync and supervision" in { "lookup email with mapAsync and supervision" in {
val addressSystem = new AddressSystem2 val addressSystem = new AddressSystem2

View file

@ -50,8 +50,8 @@ object SourceOperators {
//#actorRef //#actorRef
} }
def actorRefWithAck(): Unit = { def actorRefWithBackpressure(): Unit = {
//#actorRefWithAck //#actorRefWithBackpressure
import akka.actor.Status.Success import akka.actor.Status.Success
import akka.actor.ActorRef import akka.actor.ActorRef
@ -61,7 +61,9 @@ object SourceOperators {
implicit val system: ActorSystem = ActorSystem() implicit val system: ActorSystem = ActorSystem()
val probe = TestProbe() val probe = TestProbe()
val source: Source[Any, ActorRef] = Source.actorRefWithAck[Any]("ack") val source: Source[Any, ActorRef] = Source.actorRefWithBackpressure[Any]("ack", {
case _: Success => CompletionStrategy.immediately
}, PartialFunction.empty)
val actorRef: ActorRef = source.to(Sink.foreach(println)).run() val actorRef: ActorRef = source.to(Sink.foreach(println)).run()
probe.send(actorRef, "hello") probe.send(actorRef, "hello")
@ -70,7 +72,7 @@ object SourceOperators {
probe.expectMsg("ack") probe.expectMsg("ack")
// The stream completes successfully with the following message // The stream completes successfully with the following message
actorRef ! Success(CompletionStrategy.immediately) actorRef ! Success(())
//#actorRefWithAck //#actorRefWithBackpressure
} }
} }

View file

@ -142,8 +142,10 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ => true, _ => true), Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ => true, _ => true),
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithBackpressure", _ => true, _ => true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRefWithBackpressure", _ => true, _ => true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ => true), Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ => true),
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ => true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ => true),
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ => true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ => true),

View file

@ -4,10 +4,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.actor.Actor import akka.actor.{ Actor, ActorRef, Props }
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Status
import akka.stream.Attributes.inputBuffer import akka.stream.Attributes.inputBuffer
import akka.stream.Materializer import akka.stream.Materializer
import akka.stream.testkit._ import akka.stream.testkit._
@ -21,6 +18,7 @@ import scala.concurrent.duration._
object ActorRefBackpressureSinkSpec { object ActorRefBackpressureSinkSpec {
val initMessage = "start" val initMessage = "start"
val completeMessage = "done" val completeMessage = "done"
val failMessage = "failed"
val ackMessage = "ack" val ackMessage = "ack"
class Fw(ref: ActorRef) extends Actor { class Fw(ref: ActorRef) extends Actor {
@ -62,7 +60,8 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
"send the elements to the ActorRef" in assertAllStagesStopped { "send the elements to the ActorRef" in assertAllStagesStopped {
val fw = createActor(classOf[Fw]) val fw = createActor(classOf[Fw])
Source(List(1, 2, 3)).runWith(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)) Source(List(1, 2, 3))
.runWith(Sink.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage))
expectMsg("start") expectMsg("start")
expectMsg(1) expectMsg(1)
expectMsg(2) expectMsg(2)
@ -72,7 +71,10 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
"send the elements to the ActorRef2" in assertAllStagesStopped { "send the elements to the ActorRef2" in assertAllStagesStopped {
val fw = createActor(classOf[Fw]) val fw = createActor(classOf[Fw])
val probe = TestSource.probe[Int].to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)).run() val probe = TestSource
.probe[Int]
.to(Sink.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage))
.run()
probe.sendNext(1) probe.sendNext(1)
expectMsg("start") expectMsg("start")
expectMsg(1) expectMsg(1)
@ -87,7 +89,11 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
"cancel stream when actor terminates" in assertAllStagesStopped { "cancel stream when actor terminates" in assertAllStagesStopped {
val fw = createActor(classOf[Fw]) val fw = createActor(classOf[Fw])
val publisher = val publisher =
TestSource.probe[Int].to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)).run().sendNext(1) TestSource
.probe[Int]
.to(Sink.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage))
.run()
.sendNext(1)
expectMsg(initMessage) expectMsg(initMessage)
expectMsg(1) expectMsg(1)
system.stop(fw) system.stop(fw)
@ -96,7 +102,10 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
"send message only when backpressure received" in assertAllStagesStopped { "send message only when backpressure received" in assertAllStagesStopped {
val fw = createActor(classOf[Fw2]) val fw = createActor(classOf[Fw2])
val publisher = TestSource.probe[Int].to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)).run() val publisher = TestSource
.probe[Int]
.to(Sink.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage))
.run()
expectMsg(initMessage) expectMsg(initMessage)
publisher.sendNext(1) publisher.sendNext(1)
@ -120,7 +129,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
val streamElementCount = bufferSize + 4 val streamElementCount = bufferSize + 4
val fw = createActor(classOf[Fw2]) val fw = createActor(classOf[Fw2])
val sink = Sink val sink = Sink
.actorRefWithAck(fw, initMessage, ackMessage, completeMessage) .actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage)
.withAttributes(inputBuffer(bufferSize, bufferSize)) .withAttributes(inputBuffer(bufferSize, bufferSize))
val bufferFullProbe = Promise[akka.Done.type] val bufferFullProbe = Promise[akka.Done.type]
Source(1 to streamElementCount) Source(1 to streamElementCount)
@ -142,7 +151,10 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
val publisher = val publisher =
TestSource TestSource
.probe[Int] .probe[Int]
.to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage).withAttributes(inputBuffer(1, 1))) .to(
Sink
.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage)
.withAttributes(inputBuffer(1, 1)))
.run() .run()
expectMsg(initMessage) expectMsg(initMessage)
@ -166,7 +178,9 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
val fw = createActor(classOf[Fw]) val fw = createActor(classOf[Fw])
an[IllegalArgumentException] shouldBe thrownBy { an[IllegalArgumentException] shouldBe thrownBy {
val badSink = val badSink =
Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage).withAttributes(inputBuffer(0, 0)) Sink
.actorRefWithBackpressure(fw, initMessage, ackMessage, completeMessage, _ => failMessage)
.withAttributes(inputBuffer(0, 0))
Source.single(()).runWith(badSink) Source.single(()).runWith(badSink)
} }
} }
@ -176,14 +190,19 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
val probe = TestProbe() val probe = TestProbe()
val sink = Sink val sink = Sink
.actorRefWithAck[String](probe.ref, initMessage, ackMessage, completeMessage) .actorRefWithBackpressure[String](
probe.ref,
initMessage,
ackMessage,
completeMessage,
(_: Throwable) => failMessage)
.withAttributes(inputBuffer(1, 1)) .withAttributes(inputBuffer(1, 1))
Source.maybe[String].to(sink).run()(mat) Source.maybe[String].to(sink).run()(mat)
probe.expectMsg(initMessage) probe.expectMsg(initMessage)
mat.shutdown() mat.shutdown()
probe.expectMsgType[Status.Failure] probe.expectMsg(failMessage)
} }
} }

View file

@ -8,6 +8,7 @@ import akka.actor.Status
import akka.stream.testkit.Utils.TE import akka.stream.testkit.Utils.TE
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.CompletionStrategy
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.testkit.TestProbe import akka.testkit.TestProbe
@ -20,11 +21,16 @@ private object ActorRefBackpressureSourceSpec {
class ActorRefBackpressureSourceSpec extends StreamSpec { class ActorRefBackpressureSourceSpec extends StreamSpec {
import ActorRefBackpressureSourceSpec._ import ActorRefBackpressureSourceSpec._
"An Source.actorRefWithAck" must { "An Source.actorRefWithBackpressure" must {
"emit received messages to the stream and ack" in assertAllStagesStopped { "emit received messages to the stream and ack" in assertAllStagesStopped {
val probe = TestProbe() val probe = TestProbe()
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, s) = Source
.actorRefWithBackpressure[Int](
AckMsg, { case "ok" => CompletionStrategy.draining }: PartialFunction[Any, CompletionStrategy],
PartialFunction.empty)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.request(10) sub.request(10)
@ -38,12 +44,15 @@ class ActorRefBackpressureSourceSpec extends StreamSpec {
s.expectNoMessage(50.millis) s.expectNoMessage(50.millis)
ref ! Status.Success("ok") ref ! "ok"
s.expectComplete() s.expectComplete()
} }
"fail when consumer does not await ack" in assertAllStagesStopped { "fail when consumer does not await ack" in assertAllStagesStopped {
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, s) = Source
.actorRefWithBackpressure[Int](AckMsg, PartialFunction.empty, PartialFunction.empty)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
for (n <- 1 to 20) ref ! n for (n <- 1 to 20) ref ! n
@ -65,7 +74,12 @@ class ActorRefBackpressureSourceSpec extends StreamSpec {
"complete after receiving Status.Success" in assertAllStagesStopped { "complete after receiving Status.Success" in assertAllStagesStopped {
val probe = TestProbe() val probe = TestProbe()
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, s) = Source
.actorRefWithBackpressure[Int](
AckMsg, { case "ok" => CompletionStrategy.draining }: PartialFunction[Any, CompletionStrategy],
PartialFunction.empty)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.request(10) sub.request(10)
@ -73,14 +87,19 @@ class ActorRefBackpressureSourceSpec extends StreamSpec {
s.expectNext(1) s.expectNext(1)
probe.expectMsg(AckMsg) probe.expectMsg(AckMsg)
ref ! Status.Success("ok") ref ! "ok"
s.expectComplete() s.expectComplete()
} }
"fail after receiving Status.Failure" in assertAllStagesStopped { "fail after receiving Status.Failure" in assertAllStagesStopped {
val probe = TestProbe() val probe = TestProbe()
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, s) = Source
.actorRefWithBackpressure[Int](
AckMsg,
PartialFunction.empty, { case Status.Failure(f) => f }: PartialFunction[Any, Throwable])
.toMat(TestSink.probe[Int])(Keep.both)
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.request(10) sub.request(10)
@ -95,7 +114,12 @@ class ActorRefBackpressureSourceSpec extends StreamSpec {
"not buffer elements after receiving Status.Success" in assertAllStagesStopped { "not buffer elements after receiving Status.Success" in assertAllStagesStopped {
val probe = TestProbe() val probe = TestProbe()
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, s) = Source
.actorRefWithBackpressure[Int](
AckMsg, { case "ok" => CompletionStrategy.draining }: PartialFunction[Any, CompletionStrategy],
PartialFunction.empty)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.request(10) sub.request(10)
@ -107,7 +131,7 @@ class ActorRefBackpressureSourceSpec extends StreamSpec {
s.expectNext(2) s.expectNext(2)
probe.expectMsg(AckMsg) probe.expectMsg(AckMsg)
ref ! Status.Success("ok") ref ! "ok"
probe.send(ref, 100) probe.send(ref, 100)
probe.send(ref, 100) probe.send(ref, 100)

View file

@ -28,7 +28,7 @@ class ActorRefSinkSpec extends StreamSpec {
"A ActorRefSink" must { "A ActorRefSink" must {
"send the elements to the ActorRef" in assertAllStagesStopped { "send the elements to the ActorRef" in assertAllStagesStopped {
Source(List(1, 2, 3)).runWith(Sink.actorRef(testActor, onCompleteMessage = "done")) Source(List(1, 2, 3)).runWith(Sink.actorRef(testActor, onCompleteMessage = "done", _ => "failure"))
expectMsg(1) expectMsg(1)
expectMsg(2) expectMsg(2)
expectMsg(3) expectMsg(3)
@ -38,7 +38,12 @@ class ActorRefSinkSpec extends StreamSpec {
"cancel stream when actor terminates" in assertAllStagesStopped { "cancel stream when actor terminates" in assertAllStagesStopped {
val fw = system.actorOf(Props(classOf[Fw], testActor).withDispatcher("akka.test.stream-dispatcher")) val fw = system.actorOf(Props(classOf[Fw], testActor).withDispatcher("akka.test.stream-dispatcher"))
val publisher = val publisher =
TestSource.probe[Int].to(Sink.actorRef(fw, onCompleteMessage = "done")).run().sendNext(1).sendNext(2) TestSource
.probe[Int]
.to(Sink.actorRef(fw, onCompleteMessage = "done", _ => "failure"))
.run()
.sendNext(1)
.sendNext(2)
expectMsg(1) expectMsg(1)
expectMsg(2) expectMsg(2)
system.stop(fw) system.stop(fw)

View file

@ -5,10 +5,8 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.Done import akka.Done
import akka.actor.ActorRef import akka.actor.{ ActorRef, PoisonPill, Status }
import akka.actor.PoisonPill import akka.stream.{ OverflowStrategy, _ }
import akka.actor.Status
import akka.stream._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.StreamTestKit._
@ -23,7 +21,10 @@ class ActorRefSourceSpec extends StreamSpec {
"emit received messages to the stream" in { "emit received messages to the stream" in {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() val ref = Source
.actorRef(PartialFunction.empty, PartialFunction.empty, 10, OverflowStrategy.fail)
.to(Sink.fromSubscriber(s))
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.request(2) sub.request(2)
ref ! 1 ref ! 1
@ -36,7 +37,10 @@ class ActorRefSourceSpec extends StreamSpec {
"buffer when needed" in { "buffer when needed" in {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() val ref = Source
.actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.dropHead)
.to(Sink.fromSubscriber(s))
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
for (n <- 1 to 20) ref ! n for (n <- 1 to 20) ref ! n
sub.request(10) sub.request(10)
@ -50,7 +54,10 @@ class ActorRefSourceSpec extends StreamSpec {
} }
"drop new when full and with dropNew strategy" in { "drop new when full and with dropNew strategy" in {
val (ref, sub) = Source.actorRef(100, OverflowStrategy.dropNew).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, sub) = Source
.actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.dropNew)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
for (n <- 1 to 20) ref ! n for (n <- 1 to 20) ref ! n
sub.request(10) sub.request(10)
@ -65,7 +72,10 @@ class ActorRefSourceSpec extends StreamSpec {
"terminate when the stream is cancelled" in assertAllStagesStopped { "terminate when the stream is cancelled" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(0, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() val ref = Source
.actorRef(PartialFunction.empty, PartialFunction.empty, 0, OverflowStrategy.fail)
.to(Sink.fromSubscriber(s))
.run()
watch(ref) watch(ref)
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.cancel() sub.cancel()
@ -74,7 +84,10 @@ class ActorRefSourceSpec extends StreamSpec {
"not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped { "not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() val ref = Source
.actorRef(PartialFunction.empty, PartialFunction.empty, 0, OverflowStrategy.dropHead)
.to(Sink.fromSubscriber(s))
.run()
watch(ref) watch(ref)
val sub = s.expectSubscription() val sub = s.expectSubscription()
sub.request(100) sub.request(100)
@ -84,12 +97,15 @@ class ActorRefSourceSpec extends StreamSpec {
"signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped { "signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() val ref = Source
.actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.fail)
.to(Sink.fromSubscriber(s))
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
ref ! 1 ref ! 1
ref ! 2 ref ! 2
ref ! 3 ref ! 3
ref ! Status.Success("ok") ref ! "ok"
sub.request(10) sub.request(10)
s.expectNext(1, 2, 3) s.expectNext(1, 2, 3)
s.expectComplete() s.expectComplete()
@ -97,22 +113,28 @@ class ActorRefSourceSpec extends StreamSpec {
"signal buffered elements and complete the stream after receiving a Status.Success companion" in assertAllStagesStopped { "signal buffered elements and complete the stream after receiving a Status.Success companion" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() val ref = Source
.actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.fail)
.to(Sink.fromSubscriber(s))
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
ref ! 1 ref ! 1
ref ! 2 ref ! 2
ref ! 3 ref ! 3
ref ! Status.Success ref ! "ok"
sub.request(10) sub.request(10)
s.expectNext(1, 2, 3) s.expectNext(1, 2, 3)
s.expectComplete() s.expectComplete()
} }
"signal buffered elements and complete the stream after receiving a Status.Success with CompletionStrategy.Draining" in assertAllStagesStopped { "signal buffered elements and complete the stream after receiving a Status.Success with CompletionStrategy.Draining" in assertAllStagesStopped {
val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, s) = Source
.actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 100, OverflowStrategy.fail)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
for (n <- 1 to 20) ref ! n for (n <- 1 to 20) ref ! n
ref ! Status.Success(CompletionStrategy.Draining) ref ! "ok"
s.request(10) s.request(10)
for (n <- 1 to 10) s.expectNext(n) for (n <- 1 to 10) s.expectNext(n)
@ -123,10 +145,13 @@ class ActorRefSourceSpec extends StreamSpec {
} }
"not signal buffered elements but complete immediately the stream after receiving a Status.Success with CompletionStrategy.Immediately" in assertAllStagesStopped { "not signal buffered elements but complete immediately the stream after receiving a Status.Success with CompletionStrategy.Immediately" in assertAllStagesStopped {
val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, s) = Source
.actorRef({ case "ok" => CompletionStrategy.immediately }, PartialFunction.empty, 100, OverflowStrategy.fail)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
for (n <- 1 to 20) ref ! n for (n <- 1 to 20) ref ! n
ref ! Status.Success(CompletionStrategy.Immediately) ref ! "ok"
s.request(10) s.request(10)
@ -144,7 +169,10 @@ class ActorRefSourceSpec extends StreamSpec {
} }
"not signal buffered elements but complete immediately the stream after receiving a PoisonPill (backwards compatibility)" in assertAllStagesStopped { "not signal buffered elements but complete immediately the stream after receiving a PoisonPill (backwards compatibility)" in assertAllStagesStopped {
val (ref, s) = Source.actorRef(100, OverflowStrategy.fail).toMat(TestSink.probe[Int])(Keep.both).run() val (ref, s) = Source
.actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.fail)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
for (n <- 1 to 20) ref ! n for (n <- 1 to 20) ref ! n
ref ! PoisonPill ref ! PoisonPill
@ -166,12 +194,15 @@ class ActorRefSourceSpec extends StreamSpec {
"not buffer elements after receiving Status.Success" in assertAllStagesStopped { "not buffer elements after receiving Status.Success" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run() val ref = Source
.actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.dropBuffer)
.to(Sink.fromSubscriber(s))
.run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
ref ! 1 ref ! 1
ref ! 2 ref ! 2
ref ! 3 ref ! 3
ref ! Status.Success("ok") ref ! "ok"
ref ! 100 ref ! 100
ref ! 100 ref ! 100
ref ! 100 ref ! 100
@ -182,15 +213,21 @@ class ActorRefSourceSpec extends StreamSpec {
"complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped { "complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped {
val (ref, done) = { val (ref, done) = {
Source.actorRef(3, OverflowStrategy.dropBuffer).toMat(Sink.ignore)(Keep.both).run() Source
.actorRef({ case "ok" => CompletionStrategy.draining }, PartialFunction.empty, 3, OverflowStrategy.dropBuffer)
.toMat(Sink.ignore)(Keep.both)
.run()
} }
ref ! Status.Success("ok") ref ! "ok"
done.futureValue should be(Done) done.futureValue should be(Done)
} }
"fail the stream when receiving Status.Failure" in assertAllStagesStopped { "fail the stream when receiving Status.Failure" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() val ref = Source
.actorRef(PartialFunction.empty, { case Status.Failure(exc) => exc }, 10, OverflowStrategy.fail)
.to(Sink.fromSubscriber(s))
.run()
s.expectSubscription() s.expectSubscription()
val exc = TE("testfailure") val exc = TE("testfailure")
ref ! Status.Failure(exc) ref ! Status.Failure(exc)
@ -201,7 +238,7 @@ class ActorRefSourceSpec extends StreamSpec {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val name = "SomeCustomName" val name = "SomeCustomName"
val ref = Source val ref = Source
.actorRef(10, OverflowStrategy.fail) .actorRef(PartialFunction.empty, PartialFunction.empty, 10, OverflowStrategy.fail)
.withAttributes(Attributes.name(name)) .withAttributes(Attributes.name(name))
.to(Sink.fromSubscriber(s)) .to(Sink.fromSubscriber(s))
.run() .run()
@ -212,7 +249,8 @@ class ActorRefSourceSpec extends StreamSpec {
"be possible to run immediately, reproducer of #26714" in { "be possible to run immediately, reproducer of #26714" in {
(1 to 100).foreach { _ => (1 to 100).foreach { _ =>
val mat = Materializer(system) val mat = Materializer(system)
val source: Source[String, ActorRef] = Source.actorRef[String](10000, OverflowStrategy.fail) val source: Source[String, ActorRef] =
Source.actorRef[String](PartialFunction.empty, PartialFunction.empty, 10000, OverflowStrategy.fail)
val (_: ActorRef, _: Publisher[String]) = val (_: ActorRef, _: Publisher[String]) =
source.toMat(Sink.asPublisher(false))(Keep.both).run()(mat) source.toMat(Sink.asPublisher(false))(Keep.both).run()(mat)
mat.shutdown() mat.shutdown()

View file

@ -315,7 +315,11 @@ class FlowThrottleSpec extends StreamSpec("""
val expectedMinRate = new AtomicInteger val expectedMinRate = new AtomicInteger
val expectedMaxRate = new AtomicInteger val expectedMaxRate = new AtomicInteger
val (ref, done) = Source val (ref, done) = Source
.actorRef[Int](bufferSize = 100000, OverflowStrategy.fail) .actorRef[Int](
{ case "done" => CompletionStrategy.draining }: PartialFunction[Any, CompletionStrategy],
PartialFunction.empty,
bufferSize = 100000,
OverflowStrategy.fail)
.throttle(300, 1000.millis) .throttle(300, 1000.millis)
.toMat(Sink.foreach { elem => .toMat(Sink.foreach { elem =>
val now = System.nanoTime() val now = System.nanoTime()
@ -366,7 +370,7 @@ class FlowThrottleSpec extends StreamSpec("""
} }
} }
} }
ref ! akka.actor.Status.Success("done") ref ! "done"
Await.result(done, 20.seconds) should ===(Done) Await.result(done, 20.seconds) should ===(Done)
} }

View file

@ -5,31 +5,18 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.NotUsed import akka.NotUsed
import akka.actor.Actor import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props }
import akka.actor.ActorIdentity
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ActorSystemImpl
import akka.actor.Identify
import akka.actor.Props
import akka.actor.Status.Failure
import akka.pattern._ import akka.pattern._
import akka.stream._ import akka.stream._
import akka.stream.impl.streamref.SinkRefImpl import akka.stream.impl.streamref.{ SinkRefImpl, SourceRefImpl }
import akka.stream.impl.streamref.SourceRefImpl
import akka.stream.testkit.TestPublisher import akka.stream.testkit.TestPublisher
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import akka.testkit.AkkaSpec import akka.testkit.{ AkkaSpec, ImplicitSender, TestKit, TestProbe }
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.testkit.TestProbe
import akka.util.ByteString import akka.util.ByteString
import com.typesafe.config._ import com.typesafe.config._
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.{ Await, Future }
import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
@ -96,7 +83,7 @@ object StreamRefsSpec {
* For them it's a Sink; for us it's a Source. * For them it's a Sink; for us it's a Source.
*/ */
val sink = val sink =
StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "<COMPLETE>")).run() StreamRefs.sinkRef[String]().to(Sink.actorRef(probe, "<COMPLETE>", f => "<FAILED>: " + f.getMessage)).run()
sender() ! sink sender() ! sink
case "receive-ignore" => case "receive-ignore" =>
@ -108,7 +95,7 @@ object StreamRefsSpec {
val sink = StreamRefs val sink = StreamRefs
.sinkRef[String]() .sinkRef[String]()
.withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
.to(Sink.actorRef(probe, "<COMPLETE>")) .to(Sink.actorRef(probe, "<COMPLETE>", f => "<FAILED>: " + f.getMessage))
.run() .run()
sender() ! sink sender() ! sink
@ -209,7 +196,7 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend
remoteActor ! "give" remoteActor ! "give"
val sourceRef = expectMsgType[SourceRef[String]] val sourceRef = expectMsgType[SourceRef[String]]
sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>")) sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>", _ => "<FAILED>"))
p.expectMsg("hello") p.expectMsg("hello")
p.expectMsg("world") p.expectMsg("world")
@ -220,12 +207,12 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend
remoteActor ! "give-fail" remoteActor ! "give-fail"
val sourceRef = expectMsgType[SourceRef[String]] val sourceRef = expectMsgType[SourceRef[String]]
sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>")) sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>", t => "<FAILED>: " + t.getMessage))
val f = p.expectMsgType[Failure] val f = p.expectMsgType[String]
f.cause.getMessage should include("Remote stream (") f should include("Remote stream (")
// actor name here, for easier identification // actor name here, for easier identification
f.cause.getMessage should include("failed, reason: Booooom!") f should include("failed, reason: Booooom!")
} }
"complete properly when remote source is empty" in { "complete properly when remote source is empty" in {
@ -234,7 +221,7 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend
remoteActor ! "give-complete-asap" remoteActor ! "give-complete-asap"
val sourceRef = expectMsgType[SourceRef[String]] val sourceRef = expectMsgType[SourceRef[String]]
sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>")) sourceRef.runWith(Sink.actorRef(p.ref, "<COMPLETE>", _ => "<FAILED>"))
p.expectMsg("<COMPLETE>") p.expectMsg("<COMPLETE>")
} }
@ -332,10 +319,10 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend
val remoteFailureMessage = "Booom!" val remoteFailureMessage = "Booom!"
Source.failed(new Exception(remoteFailureMessage)).to(remoteSink).run() Source.failed(new Exception(remoteFailureMessage)).to(remoteSink).run()
val f = p.expectMsgType[akka.actor.Status.Failure] val f = p.expectMsgType[String]
f.cause.getMessage should include(s"Remote stream (") f should include(s"Remote stream (")
// actor name ere, for easier identification // actor name ere, for easier identification
f.cause.getMessage should include(s"failed, reason: $remoteFailureMessage") f should include(s"failed, reason: $remoteFailureMessage")
} }
"receive hundreds of elements via remoting" in { "receive hundreds of elements via remoting" in {
@ -359,8 +346,8 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSend
val probe = TestSource.probe[String](system).to(remoteSink).run() val probe = TestSource.probe[String](system).to(remoteSink).run()
val failure = p.expectMsgType[Failure] val failure = p.expectMsgType[String]
failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Sink reference") failure should include("Remote side did not subscribe (materialize) handed out Sink reference")
// the local "remote sink" should cancel, since it should notice the origin target actor is dead // the local "remote sink" should cancel, since it should notice the origin target actor is dead
probe.expectCancellation() probe.expectCancellation()

View file

@ -49,6 +49,40 @@ object ActorSink {
* When the stream is completed with failure - result of `onFailureMessage(throwable)` * When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor. * function will be sent to the destination actor.
*/ */
def actorRefWithBackpressure[T, M, A](
ref: ActorRef[M],
messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M],
onInitMessage: akka.japi.function.Function[ActorRef[A], M],
ackMessage: A,
onCompleteMessage: M,
onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] =
typed.scaladsl.ActorSink
.actorRefWithBackpressure(
ref,
messageAdapter.apply,
onInitMessage.apply,
ackMessage,
onCompleteMessage,
onFailureMessage.apply)
.asJava
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*
* @deprecated Use actorRefWithBackpressure instead
*/
@Deprecated
@deprecated("Use actorRefWithBackpressure instead", "2.6.0")
def actorRefWithAck[T, M, A]( def actorRefWithAck[T, M, A](
ref: ActorRef[M], ref: ActorRef[M],
messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M], messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M],
@ -57,7 +91,7 @@ object ActorSink {
onCompleteMessage: M, onCompleteMessage: M,
onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] = onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] =
typed.scaladsl.ActorSink typed.scaladsl.ActorSink
.actorRefWithAck( .actorRefWithBackpressure(
ref, ref,
messageAdapter.apply, messageAdapter.apply,
onInitMessage.apply, onInitMessage.apply,

View file

@ -82,13 +82,56 @@ object ActorSource {
* The actor will be stopped when the stream is completed, failed or canceled from downstream, * The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens. * i.e. you can watch it to get notified when that happens.
*/ */
def actorRefWithBackpressure[T, Ack](
ackTo: ActorRef[Ack],
ackMessage: Ack,
completionMatcher: akka.japi.function.Function[T, java.util.Optional[CompletionStrategy]],
failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] =
akka.stream.typed.scaladsl.ActorSource
.actorRefWithBackpressure[T, Ack](
ackTo,
ackMessage,
new JavaPartialFunction[T, CompletionStrategy] {
override def apply(x: T, isCheck: Boolean): CompletionStrategy = {
val result = completionMatcher(x)
if (!result.isPresent) throw JavaPartialFunction.noMatch()
else result.get()
}
},
new JavaPartialFunction[T, Throwable] {
override def apply(x: T, isCheck: Boolean): Throwable = {
val result = failureMatcher(x)
if (!result.isPresent) throw JavaPartialFunction.noMatch()
else result.get()
}
})
.asJava
/**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* @deprecated Use actorRefWithBackpressure instead
*/
@Deprecated
@deprecated("Use actorRefWithBackpressure instead", "2.6.0")
def actorRefWithAck[T, Ack]( def actorRefWithAck[T, Ack](
ackTo: ActorRef[Ack], ackTo: ActorRef[Ack],
ackMessage: Ack, ackMessage: Ack,
completionMatcher: akka.japi.function.Function[T, java.util.Optional[CompletionStrategy]], completionMatcher: akka.japi.function.Function[T, java.util.Optional[CompletionStrategy]],
failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] = failureMatcher: akka.japi.function.Function[T, java.util.Optional[Throwable]]): Source[T, ActorRef[T]] =
akka.stream.typed.scaladsl.ActorSource akka.stream.typed.scaladsl.ActorSource
.actorRefWithAck[T, Ack]( .actorRefWithBackpressure[T, Ack](
ackTo, ackTo,
ackMessage, ackMessage,
new JavaPartialFunction[T, CompletionStrategy] { new JavaPartialFunction[T, CompletionStrategy] {

View file

@ -46,6 +46,35 @@ object ActorSink {
* When the stream is completed with failure - result of `onFailureMessage(throwable)` * When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor. * function will be sent to the destination actor.
*/ */
def actorRefWithBackpressure[T, M, A](
ref: ActorRef[M],
messageAdapter: (ActorRef[A], T) => M,
onInitMessage: ActorRef[A] => M,
ackMessage: A,
onCompleteMessage: M,
onFailureMessage: Throwable => M): Sink[T, NotUsed] =
Sink.actorRefWithAck(
ref.toClassic,
messageAdapter.curried.compose(actorRefAdapter),
onInitMessage.compose(actorRefAdapter),
ackMessage,
onCompleteMessage,
onFailureMessage)
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*/
@deprecated("Use actorRefWithBackpressure instead", "2.6.0")
def actorRefWithAck[T, M, A]( def actorRefWithAck[T, M, A](
ref: ActorRef[M], ref: ActorRef[M],
messageAdapter: (ActorRef[A], T) => M, messageAdapter: (ActorRef[A], T) => M,

View file

@ -61,7 +61,7 @@ object ActorSource {
.mapMaterializedValue(actorRefAdapter) .mapMaterializedValue(actorRefAdapter)
/** /**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream, * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back. * and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back. * The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
@ -74,6 +74,34 @@ object ActorSource {
* The actor will be stopped when the stream is completed, failed or canceled from downstream, * The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens. * i.e. you can watch it to get notified when that happens.
*/ */
def actorRefWithBackpressure[T, Ack](
ackTo: ActorRef[Ack],
ackMessage: Ack,
completionMatcher: PartialFunction[T, CompletionStrategy],
failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] =
Source
.actorRefWithAck[T](
Some(ackTo.toClassic),
ackMessage,
completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]],
failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]])
.mapMaterializedValue(actorRefAdapter)
/**
* Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*/
@deprecated("Use actorRefWithBackpressure instead", "2.6.0")
def actorRefWithAck[T, Ack]( def actorRefWithAck[T, Ack](
ackTo: ActorRef[Ack], ackTo: ActorRef[Ack],
ackMessage: Ack, ackMessage: Ack,

View file

@ -45,7 +45,7 @@ public class ActorSourceSinkCompileTest {
Source.<String>queue(10, OverflowStrategy.dropBuffer()) Source.<String>queue(10, OverflowStrategy.dropBuffer())
.to( .to(
ActorSink.actorRefWithAck( ActorSink.actorRefWithBackpressure(
ref, ref,
(sender, msg) -> new Init(), (sender, msg) -> new Init(),
(sender) -> new Msg(), (sender) -> new Msg(),

View file

@ -4,18 +4,18 @@
package docs.akka.stream.typed; package docs.akka.stream.typed;
// #actor-sink-ref-with-ack // #actor-sink-ref-with-backpressure
import akka.NotUsed; import akka.NotUsed;
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer; import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink; import akka.stream.typed.javadsl.ActorSink;
// #actor-sink-ref-with-ack // #actor-sink-ref-with-backpressure
public class ActorSinkWithAckExample { public class ActorSinkWithAckExample {
// #actor-sink-ref-with-ack // #actor-sink-ref-with-backpressure
class Ack {} class Ack {}
@ -48,20 +48,20 @@ public class ActorSinkWithAckExample {
this.ex = ex; this.ex = ex;
} }
} }
// #actor-sink-ref-with-ack // #actor-sink-ref-with-backpressure
final ActorMaterializer mat = null; final ActorMaterializer mat = null;
{ {
// #actor-sink-ref-with-ack // #actor-sink-ref-with-backpressure
final ActorRef<Protocol> actor = null; final ActorRef<Protocol> actor = null;
final Sink<String, NotUsed> sink = final Sink<String, NotUsed> sink =
ActorSink.actorRefWithAck( ActorSink.actorRefWithBackpressure(
actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new); actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new);
Source.single("msg1").runWith(sink, mat); Source.single("msg1").runWith(sink, mat);
// #actor-sink-ref-with-ack // #actor-sink-ref-with-backpressure
} }
} }

View file

@ -65,7 +65,7 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with WordSpecLike {
val in = val in =
Source Source
.queue[String](10, OverflowStrategy.dropBuffer) .queue[String](10, OverflowStrategy.dropBuffer)
.to(ActorSink.actorRefWithAck(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ => Failed)) .to(ActorSink.actorRefWithBackpressure(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ => Failed))
.run() .run()
p.expectMessageType[Init] p.expectMessageType[Init]
@ -110,7 +110,7 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with WordSpecLike {
val p = TestProbe[String]() val p = TestProbe[String]()
val (in, out) = ActorSource val (in, out) = ActorSource
.actorRefWithAck[String, String]( .actorRefWithBackpressure[String, String](
p.ref, p.ref,
"ack", { case "complete" => CompletionStrategy.draining }, "ack", { case "complete" => CompletionStrategy.draining },
PartialFunction.empty) PartialFunction.empty)

View file

@ -69,7 +69,7 @@ object ActorSourceSinkExample {
def targetActor(): ActorRef[Protocol] = ??? def targetActor(): ActorRef[Protocol] = ???
// #actor-sink-ref-with-ack // #actor-sink-ref-with-backpressure
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source } import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink import akka.stream.typed.scaladsl.ActorSink
@ -85,7 +85,7 @@ object ActorSourceSinkExample {
val actor: ActorRef[Protocol] = targetActor() val actor: ActorRef[Protocol] = targetActor()
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck( val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
ref = actor, ref = actor,
onCompleteMessage = Complete, onCompleteMessage = Complete,
onFailureMessage = Fail.apply, onFailureMessage = Fail.apply,
@ -94,6 +94,6 @@ object ActorSourceSinkExample {
ackMessage = Ack) ackMessage = Ack)
Source.single("msg1").runWith(sink) Source.single("msg1").runWith(sink)
// #actor-sink-ref-with-ack // #actor-sink-ref-with-backpressure
} }
} }

View file

@ -25,7 +25,7 @@ import akka.stream.stage._
onFailureMessage: (Throwable) => Any) onFailureMessage: (Throwable) => Any)
extends GraphStage[SinkShape[In]] { extends GraphStage[SinkShape[In]] {
val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in")
override def initialAttributes = DefaultAttributes.actorRefWithAck override def initialAttributes = DefaultAttributes.actorRefWithBackpressureSink
override val shape: SinkShape[In] = SinkShape(in) override val shape: SinkShape[In] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =

View file

@ -29,7 +29,7 @@ private object ActorRefBackpressureSource {
val out: Outlet[T] = Outlet[T]("actorRefSource.out") val out: Outlet[T] = Outlet[T]("actorRefSource.out")
override val shape: SourceShape[T] = SourceShape.of(out) override val shape: SourceShape[T] = SourceShape.of(out)
override def initialAttributes: Attributes = DefaultAttributes.actorRefWithAckSource override def initialAttributes: Attributes = DefaultAttributes.actorRefWithBackpressureSource
def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) = def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) =
throw new IllegalStateException("Not supported") throw new IllegalStateException("Not supported")

View file

@ -109,7 +109,7 @@ import akka.stream._
val subscriberSource = name("subscriberSource") val subscriberSource = name("subscriberSource")
val actorPublisherSource = name("actorPublisherSource") val actorPublisherSource = name("actorPublisherSource")
val actorRefSource = name("actorRefSource") val actorRefSource = name("actorRefSource")
val actorRefWithAckSource = name("actorRefWithAckSource") val actorRefWithBackpressureSource = name("actorRefWithBackpressureSource")
val queueSource = name("queueSource") val queueSource = name("queueSource")
val inputStreamSource = name("inputStreamSource") and IODispatcher val inputStreamSource = name("inputStreamSource") and IODispatcher
val outputStreamSource = name("outputStreamSource") and IODispatcher val outputStreamSource = name("outputStreamSource") and IODispatcher
@ -132,7 +132,7 @@ import akka.stream._
val fanoutPublisherSink = name("fanoutPublisherSink") val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink") val ignoreSink = name("ignoreSink")
val actorRefSink = name("actorRefSink") val actorRefSink = name("actorRefSink")
val actorRefWithAck = name("actorRefWithAckSink") val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink")
val actorSubscriberSink = name("actorSubscriberSink") val actorSubscriberSink = name("actorSubscriberSink")
val queueSink = name("queueSink") val queueSink = name("queueSink")
val lazySink = name("lazySink") val lazySink = name("lazySink")

View file

@ -8,19 +8,13 @@ import java.util.Optional
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import java.util.function.BiFunction import java.util.function.BiFunction
import akka.Done import akka.actor.{ ActorRef, ClassicActorSystemProvider, Status }
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ClassicActorSystemProvider
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import akka.japi import akka._
import akka.japi.function import akka.japi.function
import akka.stream._
import akka.stream.impl.LinearTraversalBuilder import akka.stream.impl.LinearTraversalBuilder
import akka.stream.javadsl import akka.stream.{ javadsl, scaladsl, _ }
import akka.stream.scaladsl import org.reactivestreams.{ Publisher, Subscriber }
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable import scala.collection.immutable
@ -245,7 +239,7 @@ object Sink {
* *
*/ */
def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, NotUsed] = def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, NotUsed] =
new Sink(scaladsl.Sink.actorRef[In](ref, onCompleteMessage)) new Sink(scaladsl.Sink.actorRef[In](ref, onCompleteMessage, (t: Throwable) => Status.Failure(t)))
/** /**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
@ -260,6 +254,33 @@ object Sink {
* When the stream is completed with failure - result of `onFailureMessage(throwable)` * When the stream is completed with failure - result of `onFailureMessage(throwable)`
* message will be sent to the destination actor. * message will be sent to the destination actor.
*/ */
def actorRefWithBackpressure[In](
ref: ActorRef,
onInitMessage: Any,
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] =
new Sink(
scaladsl.Sink
.actorRefWithBackpressure[In](ref, onInitMessage, ackMessage, onCompleteMessage, t => onFailureMessage(t)))
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* message will be sent to the destination actor.
*
* @deprecated Use actorRefWithBackpressure instead
*/
@Deprecated
@deprecated("Use actorRefWithBackpressure instead", "2.6.0")
def actorRefWithAck[In]( def actorRefWithAck[In](
ref: ActorRef, ref: ActorRef,
onInitMessage: Any, onInitMessage: Any,
@ -267,7 +288,8 @@ object Sink {
onCompleteMessage: Any, onCompleteMessage: Any,
onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] = onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] =
new Sink( new Sink(
scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _)) scaladsl.Sink
.actorRefWithBackpressure[In](ref, onInitMessage, ackMessage, onCompleteMessage, t => onFailureMessage(t)))
/** /**
* A graph with the shape of a sink logically is a sink, this method makes * A graph with the shape of a sink logically is a sink, this method makes

View file

@ -6,33 +6,27 @@ package akka.stream.javadsl
import java.util import java.util
import java.util.Optional import java.util.Optional
import java.util.concurrent.{ CompletableFuture, CompletionStage }
import java.util.function.{ BiFunction, Supplier }
import akka.actor.{ ActorRef, Cancellable } import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.japi.{ function, Pair, Util } import akka.japi.{ function, JavaPartialFunction, Pair, Util }
import akka.stream._ import akka.stream._
import akka.stream.impl.LinearTraversalBuilder import akka.stream.impl.LinearTraversalBuilder
import akka.util.{ ConstantFun, Timeout }
import akka.util.JavaDurationConverters._ import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
import akka.util.{ unused, _ }
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import com.github.ghik.silencer.silent
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import akka.util.ccompat.JavaConverters._
import scala.collection.immutable import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise } import scala.concurrent.{ Future, Promise }
import scala.compat.java8.OptionConverters._
import java.util.concurrent.CompletionStage
import java.util.concurrent.CompletableFuture
import java.util.function.{ BiFunction, Supplier }
import akka.actor.ClassicActorSystemProvider
import akka.util.unused
import com.github.ghik.silencer.silent
import scala.compat.java8.FutureConverters._
import scala.reflect.ClassTag import scala.reflect.ClassTag
/** Java API */ /** Java API */
@ -287,6 +281,64 @@ object Source {
def asSubscriber[T](): Source[T, Subscriber[T]] = def asSubscriber[T](): Source[T, Subscriber[T]] =
new Source(scaladsl.Source.asSubscriber) new Source(scaladsl.Source.asSubscriber)
/**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an
* IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
* from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after
* this Source; as such, it is never safe to assume the downstream will always generate demand.
*
* The stream can be completed successfully by sending the actor reference a message that is matched by
* `completionMatcher` in which case already buffered elements will be signaled before signaling
* completion.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* Note that terminating the actor without first completing it, either with a success or a
* failure, will prevent the actor triggering downstream completion and the stream will continue
* to run even though the source actor is dead. Therefore you should **not** attempt to
* manually terminate the actor such as with a [[akka.actor.PoisonPill]].
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* See also [[akka.stream.scaladsl.Source.queue]].
*
* @param completionMatcher catches the completion message to end the stream
* @param failureMatcher catches the failure message to fail the stream
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def actorRef[T](
completionMatcher: akka.japi.function.Function[Any, java.util.Optional[CompletionStrategy]],
failureMatcher: akka.japi.function.Function[Any, java.util.Optional[Throwable]],
bufferSize: Int,
overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
new Source(scaladsl.Source.actorRef(new JavaPartialFunction[Any, CompletionStrategy] {
override def apply(x: Any, isCheck: Boolean): CompletionStrategy = {
val result = completionMatcher(x)
if (!result.isPresent) throw JavaPartialFunction.noMatch()
else result.get()
}
}, new JavaPartialFunction[Any, Throwable] {
override def apply(x: Any, isCheck: Boolean): Throwable = {
val result = failureMatcher(x)
if (!result.isPresent) throw JavaPartialFunction.noMatch()
else result.get()
}
}, bufferSize, overflowStrategy))
/** /**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream, * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
@ -330,8 +382,82 @@ object Source {
* @param bufferSize The size of the buffer in element count * @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
@Deprecated
@deprecated("Use variant accepting completion and failure matchers", "2.6.0")
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy)) new Source(scaladsl.Source.actorRef({
case akka.actor.Status.Success(s: CompletionStrategy) => s
case akka.actor.Status.Success(_) => CompletionStrategy.Draining
case akka.actor.Status.Success => CompletionStrategy.Draining
}, { case akka.actor.Status.Failure(cause) => cause }, bufferSize, overflowStrategy))
/**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*/
def actorRefWithBackpressure[T](
ackMessage: Any,
completionMatcher: akka.japi.function.Function[Any, java.util.Optional[CompletionStrategy]],
failureMatcher: akka.japi.function.Function[Any, java.util.Optional[Throwable]]): Source[T, ActorRef] =
new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage, new JavaPartialFunction[Any, CompletionStrategy] {
override def apply(x: Any, isCheck: Boolean): CompletionStrategy = {
val result = completionMatcher(x)
if (!result.isPresent) throw JavaPartialFunction.noMatch()
else result.get()
}
}, new JavaPartialFunction[Any, Throwable] {
override def apply(x: Any, isCheck: Boolean): Throwable = {
val result = failureMatcher(x)
if (!result.isPresent) throw JavaPartialFunction.noMatch()
else result.get()
}
}))
/**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* @deprecated Use actorRefWithBackpressure instead
*/
@Deprecated
@deprecated("Use actorRefWithBackpressure instead", "2.6.0")
def actorRefWithAck[T](
ackMessage: Any,
completionMatcher: akka.japi.function.Function[Any, java.util.Optional[CompletionStrategy]],
failureMatcher: akka.japi.function.Function[Any, java.util.Optional[Throwable]]): Source[T, ActorRef] =
new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage, new JavaPartialFunction[Any, CompletionStrategy] {
override def apply(x: Any, isCheck: Boolean): CompletionStrategy = {
val result = completionMatcher(x)
if (!result.isPresent) throw JavaPartialFunction.noMatch()
else result.get()
}
}, new JavaPartialFunction[Any, Throwable] {
override def apply(x: Any, isCheck: Boolean): Throwable = {
val result = failureMatcher(x)
if (!result.isPresent) throw JavaPartialFunction.noMatch()
else result.get()
}
}))
/** /**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
@ -352,8 +478,14 @@ object Source {
* The actor will be stopped when the stream is completed, failed or canceled from downstream, * The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens. * i.e. you can watch it to get notified when that happens.
*/ */
@Deprecated
@deprecated("Use actorRefWithBackpressure accepting completion and failure matchers", "2.6.0")
def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] = def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] =
new Source(scaladsl.Source.actorRefWithAck(ackMessage)) new Source(scaladsl.Source.actorRefWithBackpressure(ackMessage, {
case akka.actor.Status.Success(s: CompletionStrategy) => s
case akka.actor.Status.Success(_) => CompletionStrategy.Draining
case akka.actor.Status.Success => CompletionStrategy.Draining
}, { case akka.actor.Status.Failure(cause) => cause }))
/** /**
* A graph with the shape of a source logically is a source, this method makes * A graph with the shape of a source logically is a source, this method makes

View file

@ -453,10 +453,7 @@ object Sink {
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting operator in front of this `Sink`. * limiting operator in front of this `Sink`.
*/ */
@InternalApi private[akka] def actorRef[T]( def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable => Any): Sink[T, NotUsed] =
ref: ActorRef,
onCompleteMessage: Any,
onFailureMessage: Throwable => Any): Sink[T, NotUsed] =
fromGraph(new ActorRefSinkStage[T](ref, onCompleteMessage, onFailureMessage)) fromGraph(new ActorRefSinkStage[T](ref, onCompleteMessage, onFailureMessage))
/** /**
@ -474,6 +471,7 @@ object Sink {
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting operator in front of this `Sink`. * limiting operator in front of this `Sink`.
*/ */
@deprecated("Use variant accepting both on complete and on failure message", "2.6.0")
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] =
fromGraph(new ActorRefSinkStage[T](ref, onCompleteMessage, t => Status.Failure(t))) fromGraph(new ActorRefSinkStage[T](ref, onCompleteMessage, t => Status.Failure(t)))
@ -525,8 +523,29 @@ object Sink {
* will be sent to the destination actor. * will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)` * When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor. * function will be sent to the destination actor.
*
*/ */
def actorRefWithBackpressure[T](
ref: ActorRef,
onInitMessage: Any,
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: Throwable => Any): Sink[T, NotUsed] =
actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*/
@deprecated("Use actorRefWithBackpressure accepting completion and failure matchers instead", "2.6.0")
def actorRefWithAck[T]( def actorRefWithAck[T](
ref: ActorRef, ref: ActorRef,
onInitMessage: Any, onInitMessage: Any,

View file

@ -482,8 +482,6 @@ object Source {
fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource"))) fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource")))
/** /**
* INTERNAL API
*
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream, * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. * otherwise they will be buffered until request for demand is received.
@ -517,10 +515,12 @@ object Source {
* *
* See also [[akka.stream.scaladsl.Source.queue]]. * See also [[akka.stream.scaladsl.Source.queue]].
* *
* @param completionMatcher catches the completion message to end the stream
* @param failureMatcher catches the failure message to fail the stream
* @param bufferSize The size of the buffer in element count * @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
@InternalApi private[akka] def actorRef[T]( def actorRef[T](
completionMatcher: PartialFunction[Any, CompletionStrategy], completionMatcher: PartialFunction[Any, CompletionStrategy],
failureMatcher: PartialFunction[Any, Throwable], failureMatcher: PartialFunction[Any, Throwable],
bufferSize: Int, bufferSize: Int,
@ -568,6 +568,7 @@ object Source {
* @param bufferSize The size of the buffer in element count * @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
@deprecated("Use variant accepting completion and failure matchers instead", "2.6.0")
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
actorRef({ actorRef({
case akka.actor.Status.Success(s: CompletionStrategy) => s case akka.actor.Status.Success(s: CompletionStrategy) => s
@ -586,6 +587,27 @@ object Source {
Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher)) Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher))
} }
/**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*/
def actorRefWithBackpressure[T](
ackMessage: Any,
completionMatcher: PartialFunction[Any, CompletionStrategy],
failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = {
Source.fromGraph(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher))
}
/** /**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream, * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
@ -605,6 +627,7 @@ object Source {
* The actor will be stopped when the stream is completed, failed or canceled from downstream, * The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens. * i.e. you can watch it to get notified when that happens.
*/ */
@deprecated("Use actorRefWithBackpressure accepting completion and failure matchers instead", "2.6.0")
def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] = def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] =
actorRefWithAck(None, ackMessage, { actorRefWithAck(None, ackMessage, {
case akka.actor.Status.Success(s: CompletionStrategy) => s case akka.actor.Status.Success(s: CompletionStrategy) => s

View file

@ -72,7 +72,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"alsoToGraph", "alsoToGraph",
"orElseGraph", "orElseGraph",
"divertToGraph", "divertToGraph",
"zipWithGraph" "zipWithGraph",
"actorRefWithAck" // deprecated
) )
// FIXME document these methods as well // FIXME document these methods as well
@ -104,13 +105,16 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"fromGraph", "fromGraph",
"actorSubscriber", "actorSubscriber",
"foldAsync", "foldAsync",
"newOnCompleteStage" "newOnCompleteStage",
"actorRefWithAck" // deprecated
), ),
"ActorSink" -> Seq( "ActorSink" -> Seq(
"actorRefWithAck" "actorRefWithBackpressure",
"actorRefWithAck" // deprecated
), ),
"ActorSource" -> Seq( "ActorSource" -> Seq(
"actorRef" "actorRef",
"actorRefWithAck" // deprecated
) )
) )
@ -119,7 +123,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++ Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++
Set("++", "onPush", "onPull") Set("++", "onPush", "onPull", "actorRefWithAck")
def isPending(element: String, opName: String) = def isPending(element: String, opName: String) =
pendingTestCases.get(element).exists(_.contains(opName)) pendingTestCases.get(element).exists(_.contains(opName))