Merge pull request #24458 from akka/wip-24289-typed-streams-patriknw
Typed stream adapters, #23604
This commit is contained in:
commit
36372bb2a5
24 changed files with 1023 additions and 39 deletions
|
|
@ -11,6 +11,7 @@
|
|||
* [fault-tolerance](fault-tolerance.md)
|
||||
* [actor-discovery](actor-discovery.md)
|
||||
* [stash](stash.md)
|
||||
* [stream](stream.md)
|
||||
* [cluster](cluster.md)
|
||||
* [cluster-singleton](cluster-singleton.md)
|
||||
* [cluster-sharding](cluster-sharding.md)
|
||||
|
|
|
|||
53
akka-docs/src/main/paradox/typed/stream.md
Normal file
53
akka-docs/src/main/paradox/typed/stream.md
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
# Streams
|
||||
|
||||
@@@ warning
|
||||
|
||||
This module is currently marked as @ref:[may change](../common/may-change.md) in the sense
|
||||
of being the subject of active research. This means that API or semantics can
|
||||
change without warning or deprecation period and it is not recommended to use
|
||||
this module in production just yet—you have been warned.
|
||||
|
||||
@@@
|
||||
|
||||
@ref:[Akka Streams](../stream/index.md) make it easy to model type-safe message processing pipelines. With typed actors it is possible to connect streams to actors without loosing the type information.
|
||||
|
||||
To use the typed stream source and sink factories add the following dependency:
|
||||
|
||||
@@dependency [sbt,Maven,Gradle] {
|
||||
group=com.typesafe.akka
|
||||
artifact=akka-stream-typed_2.12
|
||||
version=$akka.version$
|
||||
}
|
||||
|
||||
This dependency contains typed alternatives to the @ref:[already existing `ActorRef` sources and sinks](../stream/stream-integrations.md) together with a factory methods for @scala[@scaladoc[`ActorMaterializer`](akka.stream.typed.ActorMaterializer)]@java[@javadoc[`ActorMaterializer`](akka.stream.typed.ActorMaterializer)] which take a typed `ActorSystem`.
|
||||
|
||||
The materializer created from these factory methods and sources together with sinks contained in this module can be mixed and matched with the original Akka Streams building blocks from the original module.
|
||||
|
||||
## Actor Source
|
||||
|
||||
A stream that is driven by messages sent to a particular actor can be started with @scala[@scaladoc[`ActorSource.actorRef`](akka.stream.typed.scaladsl.ActorSource#actorRef)]@java[@javadoc[`ActorSource.actorRef`](akka.stream.typed.javadsl.ActorSource#actorRef)]. This source materializes to a typed `ActorRef` which only accepts messages that are of the same type as the stream.
|
||||
|
||||
Scala
|
||||
: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-source-ref }
|
||||
|
||||
Java
|
||||
: @@snip [ActorSourceExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java) { #actor-source-ref }
|
||||
|
||||
|
||||
## Actor Sink
|
||||
|
||||
There are two sinks availabe that accept typed `ActorRef`s. To send all of the messages from a stream to an actor without considering backpressure, use @scala[@scaladoc[`ActorSink.actorRef`](akka.stream.typed.scaladsl.ActorSink#actorRef)]@java[@javadoc[`ActorSink.actorRef`](akka.stream.typed.javadsl.ActorSink#actorRef)].
|
||||
|
||||
Scala
|
||||
: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref }
|
||||
|
||||
Java
|
||||
: @@snip [ActorSinkExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java) { #actor-sink-ref }
|
||||
|
||||
For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use @scala[@scaladoc[`ActorSink.actorRefWithAck`](akka.stream.typed.scaladsl.ActorSink#actorRefWithAck)]@java[@javadoc[`ActorSink.actorRefWithAck`](akka.stream.typed.javadsl.ActorSink#actorRefWithAck)] to be able to signal demand when the actor is ready to receive more elements.
|
||||
|
||||
Scala
|
||||
: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref-with-ack }
|
||||
|
||||
Java
|
||||
: @@snip [ActorSinkWithAckExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-ack }
|
||||
|
|
@ -117,6 +117,9 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
|
|||
Ignore(_ == akka.stream.scaladsl.Flow.getClass, _ == "apply", _ == 24, _ ⇒ true),
|
||||
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "apply", _ == 24, _ ⇒ true),
|
||||
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ ⇒ true, _ ⇒ true),
|
||||
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ ⇒ true, _ ⇒ true), // Internal in scaladsl
|
||||
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ ⇒ true, _ ⇒ true), // Internal in scaladsl
|
||||
Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ ⇒ true, _ ⇒ true), // Internal in scaladsl
|
||||
Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ ⇒ true),
|
||||
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ ⇒ true),
|
||||
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ ⇒ true),
|
||||
|
|
|
|||
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
package akka.stream.typed
|
||||
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
|
||||
object ActorMaterializer {
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
|
||||
/**
|
||||
* Scala API: Creates an ActorMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]]
|
||||
* will be used to create one actor that in turn creates actors for the transformation steps.
|
||||
*
|
||||
* The materializer's [[akka.stream.ActorMaterializerSettings]] will be obtained from the
|
||||
* configuration of the `context`'s underlying [[akka.actor.typed.ActorSystem]].
|
||||
*
|
||||
* The `namePrefix` is used as the first part of the names of the actors running
|
||||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def apply[T](materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None)(implicit actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
|
||||
akka.stream.ActorMaterializer(materializerSettings, namePrefix)(actorSystem.toUntyped)
|
||||
|
||||
/**
|
||||
* Java API: Creates an ActorMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]]
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
* to another actor if the factory is an ActorContext.
|
||||
*
|
||||
* Defaults the actor name prefix used to name actors running the processing steps to `"flow"`.
|
||||
* The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def create[T](actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
|
||||
apply()(actorSystem)
|
||||
|
||||
/**
|
||||
* Java API: Creates an ActorMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]]
|
||||
* will be used to create one actor that in turn creates actors for the transformation steps.
|
||||
*/
|
||||
def create[T](settings: ActorMaterializerSettings, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
|
||||
apply(Option(settings), None)(actorSystem)
|
||||
|
||||
/**
|
||||
* Java API: Creates an ActorMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]]
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
* to another actor if the factory is an ActorContext.
|
||||
*
|
||||
* The `namePrefix` is used as the first part of the names of the actors running
|
||||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def create[T](settings: ActorMaterializerSettings, namePrefix: String, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
|
||||
apply(Option(settings), Option(namePrefix))(actorSystem)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
package akka.stream.typed.javadsl
|
||||
|
||||
import akka.actor.typed._
|
||||
import akka.NotUsed
|
||||
import akka.stream.javadsl._
|
||||
import akka.stream.typed
|
||||
|
||||
/**
|
||||
* Collection of Sinks aimed at integrating with typed Actors.
|
||||
*/
|
||||
object ActorSink {
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef`.
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
* When the stream is completed successfully the given `onCompleteMessage`
|
||||
* will be sent to the destination actor.
|
||||
* When the stream is completed with failure a the throwable that was signaled
|
||||
* to the stream is adapted to the Actors protocol using `onFailureMessage` and
|
||||
* then then sent to the destination actor.
|
||||
*
|
||||
* It will request at most `maxInputBufferSize` number of elements from
|
||||
* upstream, but there is no back-pressure signal from the destination actor,
|
||||
* i.e. if the actor is not consuming the messages fast enough the mailbox
|
||||
* of the actor will grow. For potentially slow consumer actors it is recommended
|
||||
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
|
||||
* limiting stage in front of this `Sink`.
|
||||
*/
|
||||
def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: akka.japi.function.Function[Throwable, T]): Sink[T, NotUsed] =
|
||||
typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply).asJava
|
||||
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
|
||||
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
|
||||
* `ackMessage` from the given actor which means that it is ready to process
|
||||
* elements. It also requires `ackMessage` message after each stream element
|
||||
* to make backpressure work.
|
||||
*
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
* When the stream is completed successfully the given `onCompleteMessage`
|
||||
* will be sent to the destination actor.
|
||||
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
|
||||
* function will be sent to the destination actor.
|
||||
*/
|
||||
def actorRefWithAck[T, M, A](
|
||||
ref: ActorRef[M],
|
||||
messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M],
|
||||
onInitMessage: akka.japi.function.Function[ActorRef[A], M],
|
||||
ackMessage: A,
|
||||
onCompleteMessage: M,
|
||||
onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] =
|
||||
typed.scaladsl.ActorSink.actorRefWithAck(
|
||||
ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply).asJava
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package akka.stream.typed.javadsl
|
||||
|
||||
import java.util.function.Predicate
|
||||
|
||||
import akka.actor.typed._
|
||||
import akka.stream.OverflowStrategy
|
||||
import akka.stream.javadsl._
|
||||
|
||||
/**
|
||||
* Collection of Sources aimed at integrating with typed Actors.
|
||||
*/
|
||||
object ActorSource {
|
||||
|
||||
/**
|
||||
* Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]].
|
||||
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
|
||||
* otherwise they will be buffered until request for demand is received.
|
||||
*
|
||||
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
|
||||
* there is no space available in the buffer.
|
||||
*
|
||||
* The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an
|
||||
* IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
|
||||
*
|
||||
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
|
||||
* from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after
|
||||
* this Source; as such, it is never safe to assume the downstream will always generate demand.
|
||||
*
|
||||
* The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]
|
||||
* (whose content will be ignored) in which case already buffered elements will be signaled before signaling
|
||||
* completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately.
|
||||
*
|
||||
* The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
|
||||
* actor reference. In case the Actor is still draining its internal buffer (after having received
|
||||
* a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]],
|
||||
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||
*
|
||||
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||
* i.e. you can watch it to get notified when that happens.
|
||||
*
|
||||
* See also [[akka.stream.javadsl.Source.queue]].
|
||||
*
|
||||
* @param bufferSize The size of the buffer in element count
|
||||
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
|
||||
*/
|
||||
def actorRef[T](
|
||||
completionMatcher: Predicate[T],
|
||||
failureMatcher: PartialFunction[T, Throwable],
|
||||
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {
|
||||
akka.stream.typed.scaladsl.ActorSource.actorRef(
|
||||
{ case m if completionMatcher.test(m) ⇒ }: PartialFunction[T, Unit],
|
||||
failureMatcher, bufferSize, overflowStrategy).asJava
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
package akka.stream.typed.scaladsl
|
||||
|
||||
import akka.actor.typed._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.NotUsed
|
||||
|
||||
/**
|
||||
* Collection of Sinks aimed at integrating with typed Actors.
|
||||
*/
|
||||
object ActorSink {
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef`.
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
* When the stream is completed successfully the given `onCompleteMessage`
|
||||
* will be sent to the destination actor.
|
||||
* When the stream is completed with failure a the throwable that was signaled
|
||||
* to the stream is adapted to the Actors protocol using `onFailureMessage` and
|
||||
* then then sent to the destination actor.
|
||||
*
|
||||
* It will request at most `maxInputBufferSize` number of elements from
|
||||
* upstream, but there is no back-pressure signal from the destination actor,
|
||||
* i.e. if the actor is not consuming the messages fast enough the mailbox
|
||||
* of the actor will grow. For potentially slow consumer actors it is recommended
|
||||
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
|
||||
* limiting stage in front of this `Sink`.
|
||||
*/
|
||||
def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: Throwable ⇒ T): Sink[T, NotUsed] =
|
||||
Sink.actorRef(ref.toUntyped, onCompleteMessage, onFailureMessage)
|
||||
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
|
||||
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
|
||||
* `ackMessage` from the given actor which means that it is ready to process
|
||||
* elements. It also requires `ackMessage` message after each stream element
|
||||
* to make backpressure work.
|
||||
*
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
* When the stream is completed successfully the given `onCompleteMessage`
|
||||
* will be sent to the destination actor.
|
||||
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
|
||||
* function will be sent to the destination actor.
|
||||
*/
|
||||
def actorRefWithAck[T, M, A](
|
||||
ref: ActorRef[M],
|
||||
messageAdapter: (ActorRef[A], T) ⇒ M,
|
||||
onInitMessage: ActorRef[A] ⇒ M,
|
||||
ackMessage: A,
|
||||
onCompleteMessage: M,
|
||||
onFailureMessage: Throwable ⇒ M): Sink[T, NotUsed] =
|
||||
Sink.actorRefWithAck(
|
||||
ref.toUntyped,
|
||||
messageAdapter.curried.compose(actorRefAdapter),
|
||||
onInitMessage.compose(actorRefAdapter),
|
||||
ackMessage, onCompleteMessage, onFailureMessage)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package akka.stream.typed.scaladsl
|
||||
|
||||
import akka.actor.typed._
|
||||
import akka.stream.OverflowStrategy
|
||||
import akka.stream.scaladsl._
|
||||
|
||||
/**
|
||||
* Collection of Sources aimed at integrating with typed Actors.
|
||||
*/
|
||||
object ActorSource {
|
||||
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
|
||||
/**
|
||||
* Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]].
|
||||
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
|
||||
* otherwise they will be buffered until request for demand is received.
|
||||
*
|
||||
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
|
||||
* there is no space available in the buffer.
|
||||
*
|
||||
* The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an
|
||||
* IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
|
||||
*
|
||||
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
|
||||
* from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after
|
||||
* this Source; as such, it is never safe to assume the downstream will always generate demand.
|
||||
*
|
||||
* The stream can be completed successfully by sending the actor reference a message that is matched by
|
||||
* `completionMatcher` in which case already buffered elements will be signaled before signaling
|
||||
* completion.
|
||||
*
|
||||
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
|
||||
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
|
||||
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
|
||||
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||
*
|
||||
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||
* i.e. you can watch it to get notified when that happens.
|
||||
*
|
||||
* See also [[akka.stream.scaladsl.Source.queue]].
|
||||
*
|
||||
* @param bufferSize The size of the buffer in element count
|
||||
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
|
||||
*/
|
||||
def actorRef[T](
|
||||
completionMatcher: PartialFunction[T, Unit],
|
||||
failureMatcher: PartialFunction[T, Throwable],
|
||||
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] =
|
||||
Source.actorRef[T](
|
||||
completionMatcher.asInstanceOf[PartialFunction[Any, Unit]],
|
||||
failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]],
|
||||
bufferSize, overflowStrategy).mapMaterializedValue(actorRefAdapter)
|
||||
}
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package akka.stream.typed.javadsl;
|
||||
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.ActorSystem;
|
||||
import akka.japi.JavaPartialFunction;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.OverflowStrategy;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
|
||||
public class ActorSourceSinkCompileTest {
|
||||
|
||||
interface Protocol {}
|
||||
class Init implements Protocol {}
|
||||
class Msg implements Protocol {}
|
||||
class Complete implements Protocol {}
|
||||
class Failure implements Protocol {
|
||||
public Exception ex;
|
||||
}
|
||||
|
||||
{
|
||||
final ActorSystem<String> system = null;
|
||||
final ActorMaterializer mat = akka.stream.typed.ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
{
|
||||
final ActorRef<String> ref = null;
|
||||
|
||||
Source.<String>queue(10, OverflowStrategy.dropBuffer())
|
||||
.map(s -> s + "!")
|
||||
.to(ActorSink.actorRef(ref, "DONE", ex -> "FAILED: " + ex.getMessage()));
|
||||
}
|
||||
|
||||
{
|
||||
final ActorRef<Protocol> ref = null;
|
||||
|
||||
Source.<String>queue(10, OverflowStrategy.dropBuffer())
|
||||
.to(ActorSink.actorRefWithAck(
|
||||
ref,
|
||||
(sender, msg) -> new Init(),
|
||||
(sender) -> new Msg(),
|
||||
"ACK",
|
||||
new Complete(),
|
||||
(f) -> new Failure()));
|
||||
}
|
||||
|
||||
{
|
||||
ActorSource
|
||||
.actorRef(
|
||||
(m) -> m == "complete",
|
||||
new JavaPartialFunction<String, Throwable>() {
|
||||
@Override
|
||||
public Throwable apply(String x, boolean isCheck) throws Exception {
|
||||
throw noMatch();
|
||||
}
|
||||
},
|
||||
10,
|
||||
OverflowStrategy.dropBuffer())
|
||||
.to(Sink.seq());
|
||||
}
|
||||
|
||||
{
|
||||
final JavaPartialFunction<Protocol, Throwable> failureMatcher = new JavaPartialFunction<Protocol, Throwable>() {
|
||||
@Override
|
||||
public Throwable apply(Protocol p, boolean isCheck) throws Exception {
|
||||
if (p instanceof Failure) {
|
||||
return ((Failure)p).ex;
|
||||
}
|
||||
else {
|
||||
throw noMatch();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ActorSource
|
||||
.actorRef(
|
||||
(m) -> false,
|
||||
failureMatcher, 10,
|
||||
OverflowStrategy.dropBuffer())
|
||||
.to(Sink.seq());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package docs.akka.stream.typed;
|
||||
|
||||
// #actor-sink-ref
|
||||
import akka.NotUsed;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.typed.javadsl.ActorSink;
|
||||
// #actor-sink-ref
|
||||
|
||||
public class ActorSinkExample {
|
||||
|
||||
// #actor-sink-ref
|
||||
|
||||
interface Protocol {}
|
||||
class Message implements Protocol {
|
||||
private final String msg;
|
||||
public Message(String msg) {
|
||||
this.msg = msg;
|
||||
}
|
||||
}
|
||||
class Complete implements Protocol {}
|
||||
class Fail implements Protocol {
|
||||
private final Throwable ex;
|
||||
public Fail(Throwable ex) {
|
||||
this.ex = ex;
|
||||
}
|
||||
}
|
||||
// #actor-sink-ref
|
||||
|
||||
final ActorMaterializer mat = null;
|
||||
|
||||
{
|
||||
// #actor-sink-ref
|
||||
|
||||
final ActorRef<Protocol> actor = null;
|
||||
|
||||
final Sink<Protocol, NotUsed> sink = ActorSink.actorRef(
|
||||
actor,
|
||||
new Complete(),
|
||||
Fail::new
|
||||
);
|
||||
|
||||
Source.<Protocol>single(new Message("msg1")).runWith(sink, mat);
|
||||
// #actor-sink-ref
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package docs.akka.stream.typed;
|
||||
|
||||
// #actor-sink-ref-with-ack
|
||||
import akka.NotUsed;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.typed.javadsl.ActorSink;
|
||||
// #actor-sink-ref-with-ack
|
||||
|
||||
public class ActorSinkWithAckExample {
|
||||
|
||||
// #actor-sink-ref-with-ack
|
||||
|
||||
class Ack {}
|
||||
|
||||
interface Protocol {}
|
||||
class Init implements Protocol {
|
||||
private final ActorRef<Ack> ack;
|
||||
public Init(ActorRef<Ack> ack) {
|
||||
this.ack = ack;
|
||||
}
|
||||
}
|
||||
class Message implements Protocol {
|
||||
private final ActorRef<Ack> ackTo;
|
||||
private final String msg;
|
||||
public Message(ActorRef<Ack> ackTo, String msg) {
|
||||
this.ackTo = ackTo;
|
||||
this.msg = msg;
|
||||
}
|
||||
}
|
||||
class Complete implements Protocol {}
|
||||
class Fail implements Protocol {
|
||||
private final Throwable ex;
|
||||
public Fail(Throwable ex) {
|
||||
this.ex = ex;
|
||||
}
|
||||
}
|
||||
// #actor-sink-ref-with-ack
|
||||
|
||||
final ActorMaterializer mat = null;
|
||||
|
||||
{
|
||||
// #actor-sink-ref-with-ack
|
||||
|
||||
final ActorRef<Protocol> actor = null;
|
||||
|
||||
final Sink<String, NotUsed> sink = ActorSink.actorRefWithAck(
|
||||
actor,
|
||||
Message::new,
|
||||
Init::new,
|
||||
new Ack(),
|
||||
new Complete(),
|
||||
Fail::new
|
||||
);
|
||||
|
||||
Source.single("msg1").runWith(sink, mat);
|
||||
// #actor-sink-ref-with-ack
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package docs.akka.stream.typed;
|
||||
|
||||
// #actor-source-ref
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.japi.JavaPartialFunction;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.OverflowStrategy;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.typed.javadsl.ActorSource;
|
||||
// #actor-source-ref
|
||||
|
||||
public class ActorSourceExample {
|
||||
|
||||
// #actor-source-ref
|
||||
|
||||
interface Protocol {}
|
||||
class Message implements Protocol {
|
||||
private final String msg;
|
||||
public Message(String msg) {
|
||||
this.msg = msg;
|
||||
}
|
||||
}
|
||||
class Complete implements Protocol {}
|
||||
class Fail implements Protocol {
|
||||
private final Exception ex;
|
||||
public Fail(Exception ex) {
|
||||
this.ex = ex;
|
||||
}
|
||||
}
|
||||
// #actor-source-ref
|
||||
|
||||
final ActorMaterializer mat = null;
|
||||
|
||||
{
|
||||
// #actor-source-ref
|
||||
|
||||
final JavaPartialFunction<Protocol, Throwable> failureMatcher =
|
||||
new JavaPartialFunction<Protocol, Throwable>() {
|
||||
public Throwable apply(Protocol p, boolean isCheck) {
|
||||
if (p instanceof Fail) {
|
||||
return ((Fail)p).ex;
|
||||
} else {
|
||||
throw noMatch();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
final Source<Protocol, ActorRef<Protocol>> source = ActorSource.actorRef(
|
||||
(m) -> m instanceof Complete,
|
||||
failureMatcher,
|
||||
8,
|
||||
OverflowStrategy.fail()
|
||||
);
|
||||
|
||||
final ActorRef<Protocol> ref = source.collect(new JavaPartialFunction<Protocol, String>() {
|
||||
public String apply(Protocol p, boolean isCheck) {
|
||||
if (p instanceof Message) {
|
||||
return ((Message)p).msg;
|
||||
} else {
|
||||
throw noMatch();
|
||||
}
|
||||
}
|
||||
}).to(Sink.foreach(System.out::println)).run(mat);
|
||||
|
||||
ref.tell(new Message("msg1"));
|
||||
// ref.tell("msg2"); Does not compile
|
||||
// #actor-source-ref
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,123 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
package akka.stream.typed.scaladsl
|
||||
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.stream.OverflowStrategy
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem }
|
||||
import akka.testkit.TestKit
|
||||
import akka.testkit.typed.scaladsl._
|
||||
import akka.stream.scaladsl.{ Keep, Sink, Source }
|
||||
import akka.stream.typed.ActorMaterializer
|
||||
import akka.testkit.typed.TestKitSettings
|
||||
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
|
||||
object ActorSourceSinkSpec {
|
||||
|
||||
sealed trait AckProto
|
||||
case class Init(sender: ActorRef[String]) extends AckProto
|
||||
case class Msg(sender: ActorRef[String], msg: String) extends AckProto
|
||||
case object Complete extends AckProto
|
||||
case object Failed extends AckProto
|
||||
}
|
||||
|
||||
class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSinkSpec")) with WordSpecLike with BeforeAndAfterAll with Matchers with ScalaFutures {
|
||||
import ActorSourceSinkSpec._
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
|
||||
// FIXME use Typed Teskit
|
||||
// The materializer creates a top-level actor when materializing a stream.
|
||||
// Currently that is not supported, because a Typed Teskit uses a typed actor system
|
||||
// with a custom guardian. Because of custom guardian, an exception is being thrown
|
||||
// when trying to create a top level actor during materialization.
|
||||
implicit val sys = ActorSystem.wrap(system)
|
||||
implicit val testkitSettings = TestKitSettings(sys)
|
||||
implicit val mat = ActorMaterializer()
|
||||
|
||||
override protected def afterAll(): Unit =
|
||||
sys.terminate()
|
||||
|
||||
"ActorSink" should {
|
||||
|
||||
"accept messages" in {
|
||||
val p = TestProbe[String]()
|
||||
|
||||
val in =
|
||||
Source.queue[String](10, OverflowStrategy.dropBuffer)
|
||||
.map(_ + "!")
|
||||
.to(ActorSink.actorRef(p.ref, "DONE", ex ⇒ "FAILED: " + ex.getMessage))
|
||||
.run()
|
||||
|
||||
val msg = "Zug zug"
|
||||
|
||||
in.offer(msg)
|
||||
p.expectMsg(msg + "!")
|
||||
}
|
||||
|
||||
"obey protocol" in {
|
||||
val p = TestProbe[AckProto]()
|
||||
|
||||
val autoPilot = Behaviors.immutable[AckProto] {
|
||||
(ctx, msg) ⇒
|
||||
msg match {
|
||||
case m @ Init(sender) ⇒
|
||||
p.ref ! m
|
||||
sender ! "ACK"
|
||||
Behaviors.same
|
||||
case m @ Msg(sender, _) ⇒
|
||||
p.ref ! m
|
||||
sender ! "ACK"
|
||||
Behaviors.same
|
||||
case m ⇒
|
||||
p.ref ! m
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
val pilotRef: ActorRef[AckProto] = system.actorOf(PropsAdapter(autoPilot))
|
||||
|
||||
val in =
|
||||
Source.queue[String](10, OverflowStrategy.dropBuffer)
|
||||
.to(ActorSink.actorRefWithAck(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ ⇒ Failed))
|
||||
.run()
|
||||
|
||||
p.expectMsgType[Init]
|
||||
|
||||
in.offer("Dabu!")
|
||||
p.expectMsgType[Msg].msg shouldBe "Dabu!"
|
||||
|
||||
in.offer("Lok'tar!")
|
||||
p.expectMsgType[Msg].msg shouldBe "Lok'tar!"
|
||||
|
||||
in.offer("Swobu!")
|
||||
p.expectMsgType[Msg].msg shouldBe "Swobu!"
|
||||
}
|
||||
}
|
||||
|
||||
"ActorSource" should {
|
||||
"send messages and complete" in {
|
||||
val (in, out) = ActorSource.actorRef[String]({ case "complete" ⇒ }, PartialFunction.empty, 10, OverflowStrategy.dropBuffer)
|
||||
.toMat(Sink.seq)(Keep.both)
|
||||
.run()
|
||||
|
||||
in ! "one"
|
||||
in ! "two"
|
||||
in ! "complete"
|
||||
|
||||
out.futureValue should contain theSameElementsAs Seq("one", "two")
|
||||
}
|
||||
|
||||
"fail the stream" in {
|
||||
val (in, out) = ActorSource.actorRef[String](PartialFunction.empty, { case msg ⇒ new Error(msg) }, 10, OverflowStrategy.dropBuffer)
|
||||
.toMat(Sink.seq)(Keep.both)
|
||||
.run()
|
||||
|
||||
in ! "boom!"
|
||||
|
||||
out.failed.futureValue.getCause.getMessage shouldBe "boom!"
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,98 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package docs.akka.stream.typed
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.ActorMaterializer
|
||||
|
||||
object ActorSourceSinkExample {
|
||||
|
||||
implicit val mat: ActorMaterializer = ???
|
||||
|
||||
{
|
||||
// #actor-source-ref
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.stream.OverflowStrategy
|
||||
import akka.stream.scaladsl.{ Sink, Source }
|
||||
import akka.stream.typed.scaladsl.ActorSource
|
||||
|
||||
trait Protocol
|
||||
case class Message(msg: String) extends Protocol
|
||||
case object Complete extends Protocol
|
||||
case class Fail(ex: Exception) extends Protocol
|
||||
|
||||
val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](
|
||||
completionMatcher = {
|
||||
case Complete ⇒
|
||||
},
|
||||
failureMatcher = {
|
||||
case Fail(ex) ⇒ ex
|
||||
},
|
||||
bufferSize = 8,
|
||||
overflowStrategy = OverflowStrategy.fail
|
||||
)
|
||||
|
||||
val ref = source.collect {
|
||||
case Message(msg) ⇒ msg
|
||||
}.to(Sink.foreach(println)).run()
|
||||
|
||||
ref ! Message("msg1")
|
||||
// ref ! "msg2" Does not compile
|
||||
// #actor-source-ref
|
||||
}
|
||||
|
||||
{
|
||||
// #actor-sink-ref
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.stream.scaladsl.{ Sink, Source }
|
||||
import akka.stream.typed.scaladsl.ActorSink
|
||||
|
||||
trait Protocol
|
||||
case class Message(msg: String) extends Protocol
|
||||
case object Complete extends Protocol
|
||||
case class Fail(ex: Throwable) extends Protocol
|
||||
|
||||
val actor: ActorRef[Protocol] = ???
|
||||
|
||||
val sink: Sink[Protocol, NotUsed] = ActorSink.actorRef[Protocol](
|
||||
ref = actor,
|
||||
onCompleteMessage = Complete,
|
||||
onFailureMessage = Fail.apply
|
||||
)
|
||||
|
||||
Source.single(Message("msg1")).runWith(sink)
|
||||
// #actor-sink-ref
|
||||
}
|
||||
|
||||
{
|
||||
// #actor-sink-ref-with-ack
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.stream.scaladsl.{ Sink, Source }
|
||||
import akka.stream.typed.scaladsl.ActorSink
|
||||
|
||||
trait Ack
|
||||
object Ack extends Ack
|
||||
|
||||
trait Protocol
|
||||
case class Init(ackTo: ActorRef[Ack]) extends Protocol
|
||||
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
|
||||
case object Complete extends Protocol
|
||||
case class Fail(ex: Throwable) extends Protocol
|
||||
|
||||
val actor: ActorRef[Protocol] = ???
|
||||
|
||||
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck(
|
||||
ref = actor,
|
||||
onCompleteMessage = Complete,
|
||||
onFailureMessage = Fail.apply,
|
||||
messageAdapter = Message.apply,
|
||||
onInitMessage = Init.apply,
|
||||
ackMessage = Ack
|
||||
)
|
||||
|
||||
Source.single("msg1").runWith(sink)
|
||||
// #actor-sink-ref-with-ack
|
||||
}
|
||||
}
|
||||
|
|
@ -3,3 +3,12 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Restart
|
|||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this")
|
||||
|
||||
# #23604 Typed stream adapters
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSinkActor.props")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSink.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSinkActor.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.props")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.this")
|
||||
|
|
|
|||
|
|
@ -15,10 +15,13 @@ import akka.stream.stage._
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any,
|
||||
ackMessage: Any,
|
||||
onCompleteMessage: Any,
|
||||
onFailureMessage: (Throwable) ⇒ Any)
|
||||
@InternalApi private[akka] class ActorRefBackpressureSinkStage[In](
|
||||
ref: ActorRef,
|
||||
messageAdapter: ActorRef ⇒ In ⇒ Any,
|
||||
onInitMessage: ActorRef ⇒ Any,
|
||||
ackMessage: Any,
|
||||
onCompleteMessage: Any,
|
||||
onFailureMessage: (Throwable) ⇒ Any)
|
||||
extends GraphStage[SinkShape[In]] {
|
||||
val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in")
|
||||
override def initialAttributes = DefaultAttributes.actorRefWithAck
|
||||
|
|
@ -55,12 +58,12 @@ import akka.stream.stage._
|
|||
override def preStart() = {
|
||||
setKeepGoing(true)
|
||||
getStageActor(receive).watch(ref)
|
||||
ref ! onInitMessage
|
||||
ref ! onInitMessage(self)
|
||||
pull(in)
|
||||
}
|
||||
|
||||
private def dequeueAndSend(): Unit = {
|
||||
ref ! buffer.poll()
|
||||
ref ! messageAdapter(self)(buffer.poll())
|
||||
if (buffer.isEmpty && completeReceived) finish()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,14 +16,15 @@ import akka.annotation.InternalApi
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object ActorRefSinkActor {
|
||||
def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any): Props =
|
||||
Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage))
|
||||
def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Props =
|
||||
Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage, onFailureMessage))
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber {
|
||||
@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any)
|
||||
extends ActorSubscriber {
|
||||
import ActorSubscriberMessage._
|
||||
|
||||
override val requestStrategy = WatermarkRequestStrategy(highWatermark)
|
||||
|
|
@ -34,7 +35,7 @@ import akka.annotation.InternalApi
|
|||
case OnNext(elem) ⇒
|
||||
ref.tell(elem, ActorRef.noSender)
|
||||
case OnError(cause) ⇒
|
||||
ref.tell(Status.Failure(cause), ActorRef.noSender)
|
||||
ref.tell(onFailureMessage(cause), ActorRef.noSender)
|
||||
context.stop(self)
|
||||
case OnComplete ⇒
|
||||
ref.tell(onCompleteMessage, ActorRef.noSender)
|
||||
|
|
|
|||
|
|
@ -15,17 +15,20 @@ import akka.stream.ActorMaterializerSettings
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object ActorRefSourceActor {
|
||||
def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = {
|
||||
def props(completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable],
|
||||
bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = {
|
||||
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
|
||||
val maxFixedBufferSize = settings.maxFixedBufferSize
|
||||
Props(new ActorRefSourceActor(bufferSize, overflowStrategy, maxFixedBufferSize))
|
||||
Props(new ActorRefSourceActor(completionMatcher, failureMatcher, bufferSize, overflowStrategy, maxFixedBufferSize))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int)
|
||||
@InternalApi private[akka] class ActorRefSourceActor(
|
||||
completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable],
|
||||
bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int)
|
||||
extends akka.stream.actor.ActorPublisher[Any] with ActorLogging {
|
||||
import akka.stream.actor.ActorPublisherMessage._
|
||||
|
||||
|
|
@ -35,15 +38,21 @@ import akka.stream.ActorMaterializerSettings
|
|||
def receive = ({
|
||||
case Cancel ⇒
|
||||
context.stop(self)
|
||||
}: Receive)
|
||||
.orElse(requestElem)
|
||||
.orElse(receiveFailure)
|
||||
.orElse(receiveComplete)
|
||||
.orElse(receiveElem)
|
||||
|
||||
case _: Status.Success ⇒
|
||||
if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully
|
||||
else context.become(drainBufferThenComplete)
|
||||
def receiveComplete: Receive = completionMatcher.andThen { _ ⇒
|
||||
if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully
|
||||
else context.become(drainBufferThenComplete)
|
||||
}
|
||||
|
||||
case Status.Failure(cause) if isActive ⇒
|
||||
def receiveFailure: Receive = failureMatcher.andThen { cause ⇒
|
||||
if (isActive)
|
||||
onErrorThenStop(cause)
|
||||
|
||||
}: Receive).orElse(requestElem).orElse(receiveElem)
|
||||
}
|
||||
|
||||
def requestElem: Receive = {
|
||||
case _: Request ⇒
|
||||
|
|
|
|||
|
|
@ -99,19 +99,24 @@ import akka.util.OptionVal
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] final class ActorRefSource[Out](
|
||||
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out])
|
||||
completionMatcher: PartialFunction[Any, Unit],
|
||||
failureMatcher: PartialFunction[Any, Throwable],
|
||||
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out])
|
||||
extends SourceModule[Out, ActorRef](shape) {
|
||||
|
||||
override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)"
|
||||
|
||||
override def create(context: MaterializationContext) = {
|
||||
val mat = ActorMaterializerHelper.downcast(context.materializer)
|
||||
val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings))
|
||||
val ref = mat.actorOf(context, ActorRefSourceActor.props(
|
||||
completionMatcher,
|
||||
failureMatcher,
|
||||
bufferSize, overflowStrategy, mat.settings))
|
||||
(akka.stream.actor.ActorPublisher[Out](ref), ref)
|
||||
}
|
||||
|
||||
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
|
||||
new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape)
|
||||
new ActorRefSource[Out](completionMatcher, failureMatcher, bufferSize, overflowStrategy, attributes, shape)
|
||||
override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] =
|
||||
new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr))
|
||||
new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, attr, amendShape(attr))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -168,7 +168,7 @@ import scala.collection.generic.CanBuildFrom
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any,
|
||||
@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any,
|
||||
val attributes: Attributes,
|
||||
shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
|
||||
|
||||
|
|
@ -177,14 +177,14 @@ import scala.collection.generic.CanBuildFrom
|
|||
val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max
|
||||
val subscriberRef = actorMaterializer.actorOf(
|
||||
context,
|
||||
ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage))
|
||||
ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage, onFailureMessage))
|
||||
(akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed)
|
||||
}
|
||||
|
||||
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
|
||||
new ActorRefSink[In](ref, onCompleteMessage, attributes, shape)
|
||||
new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attributes, shape)
|
||||
override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] =
|
||||
new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr))
|
||||
new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attr, amendShape(attr))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -210,7 +210,7 @@ object Sink {
|
|||
*/
|
||||
def actorRefWithAck[In](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
|
||||
onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] =
|
||||
new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply))
|
||||
new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _))
|
||||
|
||||
/**
|
||||
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.stream.scaladsl
|
|||
import akka.{ Done, NotUsed }
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.actor.{ ActorRef, Props, Status }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.stream.actor.ActorSubscriber
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl._
|
||||
|
|
@ -368,6 +369,27 @@ object Sink {
|
|||
Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink")
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Sends the elements of the stream to the given `ActorRef`.
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
* When the stream is completed successfully the given `onCompleteMessage`
|
||||
* will be sent to the destination actor.
|
||||
* When the stream is completed with failure the `onFailureMessage` will be invoked
|
||||
* and its result will be sent to the destination actor.
|
||||
*
|
||||
* It will request at most `maxInputBufferSize` number of elements from
|
||||
* upstream, but there is no back-pressure signal from the destination actor,
|
||||
* i.e. if the actor is not consuming the messages fast enough the mailbox
|
||||
* of the actor will grow. For potentially slow consumer actors it is recommended
|
||||
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
|
||||
* limiting stage in front of this `Sink`.
|
||||
*/
|
||||
@InternalApi private[akka] def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Sink[T, NotUsed] =
|
||||
fromGraph(new ActorRefSink(ref, onCompleteMessage, onFailureMessage,
|
||||
DefaultAttributes.actorRefSink, shape("ActorRefSink")))
|
||||
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef`.
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
|
|
@ -384,7 +406,33 @@ object Sink {
|
|||
* limiting stage in front of this `Sink`.
|
||||
*/
|
||||
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] =
|
||||
fromGraph(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink")))
|
||||
fromGraph(new ActorRefSink(ref, onCompleteMessage, t ⇒ Status.Failure(t),
|
||||
DefaultAttributes.actorRefSink, shape("ActorRefSink")))
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
|
||||
* First element is created by calling `onInitMessage` with an `ActorRef` of the actor that
|
||||
* expects acknowledgements. Then stream is waiting for acknowledgement message
|
||||
* `ackMessage` from the given actor which means that it is ready to process
|
||||
* elements. It also requires `ackMessage` message after each stream element
|
||||
* to make backpressure work.
|
||||
*
|
||||
* Every message that is sent to the actor is first transformed using `messageAdapter`.
|
||||
* This can be used to capture the ActorRef of the actor that expects acknowledgments as
|
||||
* well as transforming messages from the stream to the ones that actor under `ref` handles.
|
||||
*
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
* When the stream is completed successfully the given `onCompleteMessage`
|
||||
* will be sent to the destination actor.
|
||||
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
|
||||
* function will be sent to the destination actor.
|
||||
*/
|
||||
@InternalApi private[akka] def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef ⇒ T ⇒ Any,
|
||||
onInitMessage: ActorRef ⇒ Any, ackMessage: Any, onCompleteMessage: Any,
|
||||
onFailureMessage: (Throwable) ⇒ Any): Sink[T, NotUsed] =
|
||||
Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, messageAdapter, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))
|
||||
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
|
||||
|
|
@ -398,10 +446,11 @@ object Sink {
|
|||
* will be sent to the destination actor.
|
||||
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
|
||||
* function will be sent to the destination actor.
|
||||
*
|
||||
*/
|
||||
def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
|
||||
onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, NotUsed] =
|
||||
Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))
|
||||
actorRefWithAck(ref, _ ⇒ identity, _ ⇒ onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
|
||||
|
||||
/**
|
||||
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.stream.scaladsl
|
|||
import java.util.concurrent.CompletionStage
|
||||
|
||||
import akka.actor.{ ActorRef, Cancellable, Props }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.stream.actor.ActorPublisher
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.fusing.GraphStages
|
||||
|
|
@ -430,6 +431,49 @@ object Source {
|
|||
fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
|
||||
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
|
||||
* otherwise they will be buffered until request for demand is received.
|
||||
*
|
||||
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
|
||||
* there is no space available in the buffer.
|
||||
*
|
||||
* The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an
|
||||
* IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
|
||||
*
|
||||
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
|
||||
* from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after
|
||||
* this Source; as such, it is never safe to assume the downstream will always generate demand.
|
||||
*
|
||||
* The stream can be completed successfully by sending the actor reference a message that is matched by
|
||||
* `completionMatcher` in which case already buffered elements will be signaled before signaling
|
||||
* completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately.
|
||||
*
|
||||
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
|
||||
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
|
||||
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
|
||||
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||
*
|
||||
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||
* i.e. you can watch it to get notified when that happens.
|
||||
*
|
||||
* See also [[akka.stream.scaladsl.Source.queue]].
|
||||
*
|
||||
* @param bufferSize The size of the buffer in element count
|
||||
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
|
||||
*/
|
||||
@InternalApi private[akka] def actorRef[T](
|
||||
completionMatcher: PartialFunction[Any, Unit],
|
||||
failureMatcher: PartialFunction[Any, Throwable],
|
||||
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
|
||||
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
|
||||
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
|
||||
fromGraph(new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource")))
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
|
||||
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
|
||||
|
|
@ -459,14 +503,15 @@ object Source {
|
|||
*
|
||||
* See also [[akka.stream.scaladsl.Source.queue]].
|
||||
*
|
||||
*
|
||||
* @param bufferSize The size of the buffer in element count
|
||||
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
|
||||
*/
|
||||
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
|
||||
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
|
||||
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
|
||||
fromGraph(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource")))
|
||||
}
|
||||
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
|
||||
actorRef(
|
||||
{ case akka.actor.Status.Success(_) ⇒ },
|
||||
{ case akka.actor.Status.Failure(cause) ⇒ cause },
|
||||
bufferSize, overflowStrategy)
|
||||
|
||||
/**
|
||||
* Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.
|
||||
|
|
|
|||
19
build.sbt
19
build.sbt
|
|
@ -37,7 +37,10 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq(
|
|||
slf4j,
|
||||
stream, streamTestkit, streamTests, streamTestsTck,
|
||||
testkit,
|
||||
actorTyped, actorTypedTests, typedTestkit, persistenceTyped, clusterTyped, clusterShardingTyped
|
||||
actorTyped, actorTypedTests, typedTestkit,
|
||||
persistenceTyped,
|
||||
clusterTyped, clusterShardingTyped,
|
||||
streamTyped
|
||||
)
|
||||
|
||||
lazy val root = Project(
|
||||
|
|
@ -385,7 +388,6 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed")
|
|||
.dependsOn(
|
||||
actorTyped,
|
||||
persistence,
|
||||
testkit % "test->test",
|
||||
typedTestkit % "test->test",
|
||||
actorTypedTests % "test->test"
|
||||
)
|
||||
|
|
@ -401,7 +403,6 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed")
|
|||
distributedData,
|
||||
persistence % "provided->test",
|
||||
persistenceTyped % "provided->test",
|
||||
testkit % "test->test",
|
||||
typedTestkit % "test->test",
|
||||
actorTypedTests % "test->test"
|
||||
)
|
||||
|
|
@ -414,7 +415,6 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
|
|||
clusterTyped,
|
||||
persistenceTyped,
|
||||
clusterSharding,
|
||||
testkit % "test->test",
|
||||
typedTestkit % "test->test",
|
||||
actorTypedTests % "test->test",
|
||||
persistenceTyped % "test->test"
|
||||
|
|
@ -425,6 +425,17 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
|
|||
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" ))
|
||||
.disablePlugins(MimaPlugin)
|
||||
|
||||
lazy val streamTyped = akkaModule("akka-stream-typed")
|
||||
.dependsOn(
|
||||
actorTyped,
|
||||
stream,
|
||||
typedTestkit % "test->test",
|
||||
actorTypedTests % "test->test"
|
||||
)
|
||||
.settings(AkkaBuild.mayChangeSettings)
|
||||
.settings(AutomaticModuleName.settings("akka.stream.typed"))
|
||||
.disablePlugins(MimaPlugin)
|
||||
.enablePlugins(ScaladocNoVerificationOfDiagrams)
|
||||
|
||||
lazy val typedTestkit = akkaModule("akka-testkit-typed")
|
||||
.dependsOn(actorTyped, testkit % "compile->compile;test->test")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue