* Implements actorRef source variant with backpressure #17610 * Small improvements to documentation and source #17610 * Small improvements to test #17610 * Small improvements to implementation and tests #17610 * Adds API for akka-typed #17610 * Adds ack sender and java api for typed #17610
This commit is contained in:
parent
a9f4f2dd96
commit
f37f41574d
14 changed files with 472 additions and 6 deletions
|
|
@ -0,0 +1,24 @@
|
||||||
|
# actorRefWithAck
|
||||||
|
|
||||||
|
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
|
||||||
|
|
||||||
|
@ref[Source operators](../index.md#source-operators)
|
||||||
|
|
||||||
|
@@@ div { .group-scala }
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@@signature [Source.scala](/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala) { #actorRefWithAck }
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`], sending messages to it will emit them on the stream. The actor responds with the provided ack message
|
||||||
|
once the element could be emitted alowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef`
|
||||||
|
|
||||||
|
**completes** when the `ActorRef` is sent `akka.actor.Status.Success`
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
# actorRefWithAck
|
||||||
|
|
||||||
|
Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
|
||||||
|
|
||||||
|
@ref[Source operators](../index.md#source-operators)
|
||||||
|
|
||||||
|
@@@ div { .group-scala }
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #actorRefWithAck }
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor responds with the provided ack message
|
||||||
|
once the element could be emitted alowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef`
|
||||||
|
|
||||||
|
**completes** when the `ActorRef` is sent `akka.actor.Status.Success`
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRefWithAck }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actor-ref-with-ack }
|
||||||
|
|
@ -8,6 +8,8 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|
||||||
| |Operator|Description|
|
| |Operator|Description|
|
||||||
|--|--|--|
|
|--|--|--|
|
||||||
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|
||||||
|
|Source|<a name="actorrefwithack"></a>@ref[actorRefWithAck](Source/actorRefWithAck.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|
||||||
|
|ActorSource|<a name="actorrefwithack"></a>@ref[actorRefWithAck](ActorSource/actorRefWithAck.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|
||||||
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|
||||||
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|
||||||
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|
||||||
|
|
@ -310,6 +312,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
||||||
* [lazilyAsync](Source/lazilyAsync.md)
|
* [lazilyAsync](Source/lazilyAsync.md)
|
||||||
* [asSubscriber](Source/asSubscriber.md)
|
* [asSubscriber](Source/asSubscriber.md)
|
||||||
* [actorRef](Source/actorRef.md)
|
* [actorRef](Source/actorRef.md)
|
||||||
|
* [actorRefWithAck](Source/actorRefWithAck.md)
|
||||||
* [zipN](Source/zipN.md)
|
* [zipN](Source/zipN.md)
|
||||||
* [zipWithN](Source/zipWithN.md)
|
* [zipWithN](Source/zipWithN.md)
|
||||||
* [queue](Source/queue.md)
|
* [queue](Source/queue.md)
|
||||||
|
|
@ -433,6 +436,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
||||||
* [withBackoff](RestartFlow/withBackoff.md)
|
* [withBackoff](RestartFlow/withBackoff.md)
|
||||||
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
|
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
|
||||||
* [withBackoff](RestartSink/withBackoff.md)
|
* [withBackoff](RestartSink/withBackoff.md)
|
||||||
|
* [actorRefWithAck](ActorSource/actorRefWithAck.md)
|
||||||
* [ask](ActorFlow/ask.md)
|
* [ask](ActorFlow/ask.md)
|
||||||
* [actorRef](ActorSink/actorRef.md)
|
* [actorRef](ActorSink/actorRef.md)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,9 @@ package jdocs.stream.operators;
|
||||||
// #range-imports
|
// #range-imports
|
||||||
import akka.NotUsed;
|
import akka.NotUsed;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.actor.testkit.typed.javadsl.ManualTime;
|
||||||
|
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||||
import akka.stream.ActorMaterializer;
|
import akka.stream.ActorMaterializer;
|
||||||
import akka.stream.CompletionStrategy;
|
|
||||||
import akka.stream.Materializer;
|
import akka.stream.Materializer;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
// #range-imports
|
// #range-imports
|
||||||
|
|
@ -18,7 +19,9 @@ import akka.stream.javadsl.Source;
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import akka.actor.Status.Success;
|
import akka.actor.Status.Success;
|
||||||
import akka.stream.OverflowStrategy;
|
import akka.stream.OverflowStrategy;
|
||||||
|
import akka.stream.CompletionStrategy;
|
||||||
import akka.stream.javadsl.Sink;
|
import akka.stream.javadsl.Sink;
|
||||||
|
import akka.testkit.TestProbe;
|
||||||
// #actor-ref-imports
|
// #actor-ref-imports
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
@ -27,6 +30,8 @@ import java.util.Arrays;
|
||||||
|
|
||||||
public class SourceDocExamples {
|
public class SourceDocExamples {
|
||||||
|
|
||||||
|
public static final TestKitJunitResource testKit = new TestKitJunitResource(ManualTime.config());
|
||||||
|
|
||||||
public static void fromExample() {
|
public static void fromExample() {
|
||||||
// #source-from-example
|
// #source-from-example
|
||||||
final ActorSystem system = ActorSystem.create("SourceFromExample");
|
final ActorSystem system = ActorSystem.create("SourceFromExample");
|
||||||
|
|
@ -85,4 +90,24 @@ public class SourceDocExamples {
|
||||||
actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender());
|
actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender());
|
||||||
// #actor-ref
|
// #actor-ref
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void actorRefWithAck() {
|
||||||
|
final TestProbe probe = null;
|
||||||
|
|
||||||
|
// #actor-ref-with-ack
|
||||||
|
final ActorSystem system = ActorSystem.create();
|
||||||
|
final Materializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
||||||
|
Source<Object, ActorRef> source = Source.actorRefWithAck("ack");
|
||||||
|
|
||||||
|
ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(materializer);
|
||||||
|
probe.send(actorRef, "hello");
|
||||||
|
probe.expectMsg("ack");
|
||||||
|
probe.send(actorRef, "hello");
|
||||||
|
probe.expectMsg("ack");
|
||||||
|
|
||||||
|
// The stream completes successfully with the following message
|
||||||
|
actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender());
|
||||||
|
// #actor-ref-with-ack
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package docs.stream.operators
|
||||||
|
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.testkit.TestProbe
|
||||||
|
|
||||||
object SourceOperators {
|
object SourceOperators {
|
||||||
|
|
||||||
|
|
@ -35,6 +36,7 @@ object SourceOperators {
|
||||||
import akka.actor.Status.Success
|
import akka.actor.Status.Success
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.stream.OverflowStrategy
|
import akka.stream.OverflowStrategy
|
||||||
|
import akka.stream.CompletionStrategy
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
|
|
||||||
implicit val system: ActorSystem = ActorSystem()
|
implicit val system: ActorSystem = ActorSystem()
|
||||||
|
|
@ -48,7 +50,32 @@ object SourceOperators {
|
||||||
actorRef ! "hello"
|
actorRef ! "hello"
|
||||||
|
|
||||||
// The stream completes successfully with the following message
|
// The stream completes successfully with the following message
|
||||||
actorRef ! Success("completes stream")
|
actorRef ! Success(CompletionStrategy.immediately)
|
||||||
//#actorRef
|
//#actorRef
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def actorRefWithAck(): Unit = {
|
||||||
|
//#actorRefWithAck
|
||||||
|
|
||||||
|
import akka.actor.Status.Success
|
||||||
|
import akka.actor.ActorRef
|
||||||
|
import akka.stream.CompletionStrategy
|
||||||
|
import akka.stream.scaladsl._
|
||||||
|
|
||||||
|
implicit val system: ActorSystem = ActorSystem()
|
||||||
|
implicit val materializer: ActorMaterializer = ActorMaterializer()
|
||||||
|
val probe = TestProbe()
|
||||||
|
|
||||||
|
val source: Source[Any, ActorRef] = Source.actorRefWithAck[Any]("ack")
|
||||||
|
val actorRef: ActorRef = source.to(Sink.foreach(println)).run()
|
||||||
|
|
||||||
|
probe.send(actorRef, "hello")
|
||||||
|
probe.expectMsg("ack")
|
||||||
|
probe.send(actorRef, "hello")
|
||||||
|
probe.expectMsg("ack")
|
||||||
|
|
||||||
|
// The stream completes successfully with the following message
|
||||||
|
actorRef ! Success(CompletionStrategy.immediately)
|
||||||
|
//#actorRefWithAck
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -142,6 +142,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
|
||||||
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl
|
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl
|
||||||
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl
|
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl
|
||||||
Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl
|
Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ => true, _ => true), // Internal in scaladsl
|
||||||
|
Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRefWithAck", _ => true, _ => true), // Internal in scaladsl
|
||||||
Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ => true),
|
Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ => true),
|
||||||
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ => true),
|
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ => true),
|
||||||
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ => true),
|
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ => true),
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,124 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
|
import akka.actor.Status
|
||||||
|
import akka.stream.testkit.Utils.TE
|
||||||
|
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
|
||||||
|
import akka.stream.testkit.scaladsl.TestSink
|
||||||
|
import akka.stream.{ ActorMaterializer, Attributes }
|
||||||
|
import akka.stream.testkit.StreamSpec
|
||||||
|
import akka.testkit.TestProbe
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
private object ActorRefBackpressureSourceSpec {
|
||||||
|
case object AckMsg
|
||||||
|
}
|
||||||
|
|
||||||
|
class ActorRefBackpressureSourceSpec extends StreamSpec {
|
||||||
|
import ActorRefBackpressureSourceSpec._
|
||||||
|
|
||||||
|
private implicit val materializer: ActorMaterializer = ActorMaterializer()
|
||||||
|
|
||||||
|
"An Source.actorRefWithAck" must {
|
||||||
|
|
||||||
|
"emit received messages to the stream and ack" in assertAllStagesStopped {
|
||||||
|
val probe = TestProbe()
|
||||||
|
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run()
|
||||||
|
|
||||||
|
val sub = s.expectSubscription()
|
||||||
|
sub.request(10)
|
||||||
|
probe.send(ref, 1)
|
||||||
|
s.expectNext(1)
|
||||||
|
probe.expectMsg(AckMsg)
|
||||||
|
|
||||||
|
probe.send(ref, 2)
|
||||||
|
s.expectNext(2)
|
||||||
|
probe.expectMsg(AckMsg)
|
||||||
|
|
||||||
|
s.expectNoMessage(50.millis)
|
||||||
|
|
||||||
|
ref ! Status.Success("ok")
|
||||||
|
s.expectComplete()
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail when consumer does not await ack" in assertAllStagesStopped {
|
||||||
|
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run()
|
||||||
|
|
||||||
|
val sub = s.expectSubscription()
|
||||||
|
for (n <- 1 to 20) ref ! n
|
||||||
|
sub.request(1)
|
||||||
|
|
||||||
|
@scala.annotation.tailrec
|
||||||
|
def verifyNext(n: Int): Unit = {
|
||||||
|
if (n > 10)
|
||||||
|
s.expectComplete()
|
||||||
|
else
|
||||||
|
s.expectNextOrError() match {
|
||||||
|
case Right(`n`) => verifyNext(n + 1)
|
||||||
|
case Right(x) => fail(s"expected $n, got $x")
|
||||||
|
case Left(t) => t.getMessage shouldBe "Received new element before ack was signaled back"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
verifyNext(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
"complete after receiving Status.Success" in assertAllStagesStopped {
|
||||||
|
val probe = TestProbe()
|
||||||
|
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run()
|
||||||
|
|
||||||
|
val sub = s.expectSubscription()
|
||||||
|
sub.request(10)
|
||||||
|
probe.send(ref, 1)
|
||||||
|
s.expectNext(1)
|
||||||
|
probe.expectMsg(AckMsg)
|
||||||
|
|
||||||
|
ref ! Status.Success("ok")
|
||||||
|
|
||||||
|
s.expectComplete()
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail after receiving Status.Failure" in assertAllStagesStopped {
|
||||||
|
val probe = TestProbe()
|
||||||
|
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run()
|
||||||
|
|
||||||
|
val sub = s.expectSubscription()
|
||||||
|
sub.request(10)
|
||||||
|
probe.send(ref, 1)
|
||||||
|
s.expectNext(1)
|
||||||
|
probe.expectMsg(AckMsg)
|
||||||
|
|
||||||
|
ref ! Status.Failure(TE("test"))
|
||||||
|
|
||||||
|
s.expectError(TE("test"))
|
||||||
|
}
|
||||||
|
|
||||||
|
"not buffer elements after receiving Status.Success" in assertAllStagesStopped {
|
||||||
|
val probe = TestProbe()
|
||||||
|
val (ref, s) = Source.actorRefWithAck[Int](AckMsg).toMat(TestSink.probe[Int])(Keep.both).run()
|
||||||
|
|
||||||
|
val sub = s.expectSubscription()
|
||||||
|
sub.request(10)
|
||||||
|
probe.send(ref, 1)
|
||||||
|
s.expectNext(1)
|
||||||
|
probe.expectMsg(AckMsg)
|
||||||
|
|
||||||
|
probe.send(ref, 2)
|
||||||
|
s.expectNext(2)
|
||||||
|
probe.expectMsg(AckMsg)
|
||||||
|
|
||||||
|
ref ! Status.Success("ok")
|
||||||
|
|
||||||
|
probe.send(ref, 100)
|
||||||
|
probe.send(ref, 100)
|
||||||
|
probe.send(ref, 100)
|
||||||
|
probe.send(ref, 100)
|
||||||
|
probe.expectNoMessage(200.millis)
|
||||||
|
|
||||||
|
s.expectComplete()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -7,8 +7,8 @@ package akka.stream.typed.javadsl
|
||||||
import java.util.function.Predicate
|
import java.util.function.Predicate
|
||||||
|
|
||||||
import akka.actor.typed._
|
import akka.actor.typed._
|
||||||
import akka.stream.OverflowStrategy
|
|
||||||
import akka.stream.javadsl._
|
import akka.stream.javadsl._
|
||||||
|
import akka.stream.{ CompletionStrategy, OverflowStrategy }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Collection of Sources aimed at integrating with typed Actors.
|
* Collection of Sources aimed at integrating with typed Actors.
|
||||||
|
|
@ -60,4 +60,31 @@ object ActorSource {
|
||||||
overflowStrategy)
|
overflowStrategy)
|
||||||
.asJava
|
.asJava
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
|
||||||
|
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
|
||||||
|
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
|
||||||
|
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
|
||||||
|
*
|
||||||
|
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
|
||||||
|
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
|
||||||
|
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
|
||||||
|
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||||
|
*
|
||||||
|
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||||
|
* i.e. you can watch it to get notified when that happens.
|
||||||
|
*/
|
||||||
|
def actorRefWithAck[T, Ack](
|
||||||
|
ackTo: ActorRef[Ack],
|
||||||
|
ackMessage: Ack,
|
||||||
|
completionMatcher: PartialFunction[T, CompletionStrategy],
|
||||||
|
failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] =
|
||||||
|
akka.stream.typed.scaladsl.ActorSource
|
||||||
|
.actorRefWithAck[T, Ack](
|
||||||
|
ackTo,
|
||||||
|
ackMessage,
|
||||||
|
completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]],
|
||||||
|
failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]])
|
||||||
|
.asJava
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,8 @@
|
||||||
package akka.stream.typed.scaladsl
|
package akka.stream.typed.scaladsl
|
||||||
|
|
||||||
import akka.actor.typed._
|
import akka.actor.typed._
|
||||||
import akka.stream.{ CompletionStrategy, OverflowStrategy }
|
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
|
import akka.stream.{ CompletionStrategy, OverflowStrategy }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Collection of Sources aimed at integrating with typed Actors.
|
* Collection of Sources aimed at integrating with typed Actors.
|
||||||
|
|
@ -59,4 +59,31 @@ object ActorSource {
|
||||||
bufferSize,
|
bufferSize,
|
||||||
overflowStrategy)
|
overflowStrategy)
|
||||||
.mapMaterializedValue(actorRefAdapter)
|
.mapMaterializedValue(actorRefAdapter)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
|
||||||
|
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
|
||||||
|
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
|
||||||
|
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
|
||||||
|
*
|
||||||
|
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
|
||||||
|
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
|
||||||
|
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
|
||||||
|
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||||
|
*
|
||||||
|
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||||
|
* i.e. you can watch it to get notified when that happens.
|
||||||
|
*/
|
||||||
|
def actorRefWithAck[T, Ack](
|
||||||
|
ackTo: ActorRef[Ack],
|
||||||
|
ackMessage: Ack,
|
||||||
|
completionMatcher: PartialFunction[T, CompletionStrategy],
|
||||||
|
failureMatcher: PartialFunction[T, Throwable]): Source[T, ActorRef[T]] =
|
||||||
|
Source
|
||||||
|
.actorRefWithAck[T](
|
||||||
|
Some(ackTo.toUntyped),
|
||||||
|
ackMessage,
|
||||||
|
completionMatcher.asInstanceOf[PartialFunction[Any, CompletionStrategy]],
|
||||||
|
failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]])
|
||||||
|
.mapMaterializedValue(actorRefAdapter)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.stream.typed.scaladsl
|
||||||
|
|
||||||
import akka.actor.typed.ActorRef
|
import akka.actor.typed.ActorRef
|
||||||
import akka.actor.typed.scaladsl.Behaviors
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
import akka.stream.OverflowStrategy
|
import akka.stream.{ CompletionStrategy, OverflowStrategy }
|
||||||
import akka.stream.scaladsl.Keep
|
import akka.stream.scaladsl.Keep
|
||||||
import akka.stream.scaladsl.Sink
|
import akka.stream.scaladsl.Sink
|
||||||
import akka.stream.scaladsl.Source
|
import akka.stream.scaladsl.Source
|
||||||
|
|
@ -107,6 +107,25 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||||
|
|
||||||
out.failed.futureValue.getCause.getMessage shouldBe "boom!"
|
out.failed.futureValue.getCause.getMessage shouldBe "boom!"
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
"send message and ack" in {
|
||||||
|
val p = TestProbe[String]()
|
||||||
|
|
||||||
|
val (in, out) = ActorSource
|
||||||
|
.actorRefWithAck[String, String](
|
||||||
|
p.ref,
|
||||||
|
"ack", { case "complete" => CompletionStrategy.draining },
|
||||||
|
PartialFunction.empty)
|
||||||
|
.toMat(Sink.seq)(Keep.both)
|
||||||
|
.run()
|
||||||
|
|
||||||
|
in ! "one"
|
||||||
|
p.expectMessage("ack")
|
||||||
|
in ! "two"
|
||||||
|
p.expectMessage("ack")
|
||||||
|
in ! "complete"
|
||||||
|
|
||||||
|
out.futureValue should contain theSameElementsAs Seq("one", "two")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,95 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.impl
|
||||||
|
|
||||||
|
import akka.actor.ActorRef
|
||||||
|
import akka.annotation.InternalApi
|
||||||
|
import akka.stream._
|
||||||
|
import akka.stream.impl.Stages.DefaultAttributes
|
||||||
|
import akka.stream.stage._
|
||||||
|
import akka.util.OptionVal
|
||||||
|
|
||||||
|
private object ActorRefBackpressureSource {
|
||||||
|
private sealed trait ActorRefStage { def ref: ActorRef }
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] final class ActorRefBackpressureSource[T](
|
||||||
|
ackTo: Option[ActorRef],
|
||||||
|
ackMessage: Any,
|
||||||
|
completionMatcher: PartialFunction[Any, CompletionStrategy],
|
||||||
|
failureMatcher: PartialFunction[Any, Throwable])
|
||||||
|
extends GraphStageWithMaterializedValue[SourceShape[T], ActorRef] {
|
||||||
|
import ActorRefBackpressureSource._
|
||||||
|
|
||||||
|
val out: Outlet[T] = Outlet[T]("actorRefSource.out")
|
||||||
|
|
||||||
|
override val shape: SourceShape[T] = SourceShape.of(out)
|
||||||
|
override def initialAttributes: Attributes = DefaultAttributes.actorRefWithAckSource
|
||||||
|
|
||||||
|
def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) =
|
||||||
|
throw new IllegalStateException("Not supported")
|
||||||
|
|
||||||
|
private[akka] override def createLogicAndMaterializedValue(
|
||||||
|
inheritedAttributes: Attributes,
|
||||||
|
eagerMaterializer: Materializer): (GraphStageLogic, ActorRef) = {
|
||||||
|
val stage: GraphStageLogic with StageLogging with ActorRefStage = new GraphStageLogic(shape) with StageLogging
|
||||||
|
with ActorRefStage {
|
||||||
|
override protected def logSource: Class[_] = classOf[ActorRefSource[_]]
|
||||||
|
|
||||||
|
private var isCompleting: Boolean = false
|
||||||
|
private var element: OptionVal[(ActorRef, T)] = OptionVal.none
|
||||||
|
|
||||||
|
override protected def stageActorName: String =
|
||||||
|
inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName)
|
||||||
|
|
||||||
|
val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = false) {
|
||||||
|
case (_, m) if failureMatcher.isDefinedAt(m) ⇒
|
||||||
|
failStage(failureMatcher(m))
|
||||||
|
case (_, m) if completionMatcher.isDefinedAt(m) ⇒
|
||||||
|
completionMatcher(m) match {
|
||||||
|
case CompletionStrategy.Draining =>
|
||||||
|
isCompleting = true
|
||||||
|
tryPush()
|
||||||
|
case CompletionStrategy.Immediately =>
|
||||||
|
completeStage()
|
||||||
|
}
|
||||||
|
case e: (ActorRef, T) @unchecked ⇒
|
||||||
|
if (element.isDefined) {
|
||||||
|
failStage(new IllegalStateException("Received new element before ack was signaled back"))
|
||||||
|
} else {
|
||||||
|
ackTo match {
|
||||||
|
case Some(at) => element = OptionVal.Some((at, e._2))
|
||||||
|
case None => element = OptionVal.Some(e)
|
||||||
|
}
|
||||||
|
tryPush()
|
||||||
|
}
|
||||||
|
}.ref
|
||||||
|
|
||||||
|
private def tryPush(): Unit = {
|
||||||
|
if (isAvailable(out) && element.isDefined) {
|
||||||
|
val (s, e) = element.get
|
||||||
|
push(out, e)
|
||||||
|
element = OptionVal.none
|
||||||
|
s ! ackMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isCompleting && element.isEmpty) {
|
||||||
|
completeStage()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
setHandler(out, new OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
tryPush()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
(stage, stage.ref)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -109,6 +109,7 @@ import akka.stream._
|
||||||
val subscriberSource = name("subscriberSource")
|
val subscriberSource = name("subscriberSource")
|
||||||
val actorPublisherSource = name("actorPublisherSource")
|
val actorPublisherSource = name("actorPublisherSource")
|
||||||
val actorRefSource = name("actorRefSource")
|
val actorRefSource = name("actorRefSource")
|
||||||
|
val actorRefWithAckSource = name("actorRefWithAckSource")
|
||||||
val queueSource = name("queueSource")
|
val queueSource = name("queueSource")
|
||||||
val inputStreamSource = name("inputStreamSource") and IODispatcher
|
val inputStreamSource = name("inputStreamSource") and IODispatcher
|
||||||
val outputStreamSource = name("outputStreamSource") and IODispatcher
|
val outputStreamSource = name("outputStreamSource") and IODispatcher
|
||||||
|
|
|
||||||
|
|
@ -348,6 +348,28 @@ object Source {
|
||||||
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
|
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
|
||||||
new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy))
|
new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
|
||||||
|
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
|
||||||
|
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
|
||||||
|
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
|
||||||
|
*
|
||||||
|
* The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]].
|
||||||
|
* If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immidiately,
|
||||||
|
* otherwise if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else)
|
||||||
|
* already buffered element will be signaled before siganling completion.
|
||||||
|
*
|
||||||
|
* The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
|
||||||
|
* actor reference. In case the Actor is still draining its internal buffer (after having received
|
||||||
|
* a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]],
|
||||||
|
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||||
|
*
|
||||||
|
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||||
|
* i.e. you can watch it to get notified when that happens.
|
||||||
|
*/
|
||||||
|
def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] =
|
||||||
|
new Source(scaladsl.Source.actorRefWithAck(ackMessage))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A graph with the shape of a source logically is a source, this method makes
|
* A graph with the shape of a source logically is a source, this method makes
|
||||||
* it so also in type.
|
* it so also in type.
|
||||||
|
|
|
||||||
|
|
@ -569,6 +569,43 @@ object Source {
|
||||||
case akka.actor.Status.Success => CompletionStrategy.Draining
|
case akka.actor.Status.Success => CompletionStrategy.Draining
|
||||||
}, { case akka.actor.Status.Failure(cause) => cause }, bufferSize, overflowStrategy)
|
}, { case akka.actor.Status.Failure(cause) => cause }, bufferSize, overflowStrategy)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] def actorRefWithAck[T](
|
||||||
|
ackTo: Option[ActorRef],
|
||||||
|
ackMessage: Any,
|
||||||
|
completionMatcher: PartialFunction[Any, CompletionStrategy],
|
||||||
|
failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = {
|
||||||
|
Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
|
||||||
|
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
|
||||||
|
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
|
||||||
|
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
|
||||||
|
*
|
||||||
|
* The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]].
|
||||||
|
* If the content is [[akka.stream.CompletionStrategy.immediately]] the completion will be signaled immidiately,
|
||||||
|
* otherwise if the content is [[akka.stream.CompletionStrategy.draining]] (or anything else)
|
||||||
|
* already buffered element will be signaled before siganling completion.
|
||||||
|
*
|
||||||
|
* The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
|
||||||
|
* actor reference. In case the Actor is still draining its internal buffer (after having received
|
||||||
|
* a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]],
|
||||||
|
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||||
|
*
|
||||||
|
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||||
|
* i.e. you can watch it to get notified when that happens.
|
||||||
|
*/
|
||||||
|
def actorRefWithAck[T](ackMessage: Any): Source[T, ActorRef] =
|
||||||
|
actorRefWithAck(None, ackMessage, {
|
||||||
|
case akka.actor.Status.Success(s: CompletionStrategy) => s
|
||||||
|
case akka.actor.Status.Success(_) => CompletionStrategy.Draining
|
||||||
|
case akka.actor.Status.Success => CompletionStrategy.Draining
|
||||||
|
}, { case akka.actor.Status.Failure(cause) => cause })
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.
|
* Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue