Typed stream adapters, #23604

* Change more general factories to private
* Typed Streams docs
* Remove BoxedUnit from Java Api
* Use JavaPartialFunction in Java examples
* Doc wording improvements, formatting fixes, no verification diagrams
This commit is contained in:
Martynas Mickevičius 2018-01-19 19:22:40 +07:00 committed by Patrik Nordwall
parent 171bb6c231
commit cbe0215c41
18 changed files with 419 additions and 53 deletions

View file

@ -11,6 +11,7 @@
* [fault-tolerance](fault-tolerance.md) * [fault-tolerance](fault-tolerance.md)
* [actor-discovery](actor-discovery.md) * [actor-discovery](actor-discovery.md)
* [stash](stash.md) * [stash](stash.md)
* [stream](stream.md)
* [cluster](cluster.md) * [cluster](cluster.md)
* [cluster-singleton](cluster-singleton.md) * [cluster-singleton](cluster-singleton.md)
* [cluster-sharding](cluster-sharding.md) * [cluster-sharding](cluster-sharding.md)

View file

@ -0,0 +1,53 @@
# Streams
@@@ warning
This module is currently marked as @ref:[may change](../common/may-change.md) in the sense
of being the subject of active research. This means that API or semantics can
change without warning or deprecation period and it is not recommended to use
this module in production just yet—you have been warned.
@@@
@ref:[Akka Streams](../stream/index.md) make it easy to model type-safe message processing pipelines. With typed actors it is possible to connect streams to actors without loosing the type information.
To use the typed stream source and sink factories add the following dependency:
@@dependency [sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-stream-typed_2.12
version=$akka.version$
}
This dependency contains typed alternatives to the @ref:[already existing `ActorRef` sources and sinks](../stream/stream-integrations.md) together with a factory methods for @scala[@scaladoc[`ActorMaterializer`](akka.stream.typed.ActorMaterializer)]@java[@javadoc[`ActorMaterializer`](akka.stream.typed.ActorMaterializer)] which take a typed `ActorSystem`.
The materializer created from these factory methods and sources together with sinks contained in this module can be mixed and matched with the original Akka Streams building blocks from the original module.
## Actor Source
A stream that is driven by messages sent to a particular actor can be started with @scala[@scaladoc[`ActorSource.actorRef`](akka.stream.typed.scaladsl.ActorSource#actorRef)]@java[@javadoc[`ActorSource.actorRef`](akka.stream.typed.javadsl.ActorSource#actorRef)]. This source materializes to a typed `ActorRef` which only accepts messages that are of the same type as the stream.
Scala
: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-source-ref }
Java
: @@snip [ActorSourceExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java) { #actor-source-ref }
## Actor Sink
There are two sinks availabe that accept typed `ActorRef`s. To send all of the messages from a stream to an actor without considering backpressure, use @scala[@scaladoc[`ActorSink.actorRef`](akka.stream.typed.scaladsl.ActorSink#actorRef)]@java[@javadoc[`ActorSink.actorRef`](akka.stream.typed.javadsl.ActorSink#actorRef)].
Scala
: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref }
Java
: @@snip [ActorSinkExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java) { #actor-sink-ref }
For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use @scala[@scaladoc[`ActorSink.actorRefWithAck`](akka.stream.typed.scaladsl.ActorSink#actorRefWithAck)]@java[@javadoc[`ActorSink.actorRefWithAck`](akka.stream.typed.javadsl.ActorSink#actorRefWithAck)] to be able to signal demand when the actor is ready to receive more elements.
Scala
: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref-with-ack }
Java
: @@snip [ActorSinkWithAckExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-ack }

View file

@ -117,6 +117,9 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
Ignore(_ == akka.stream.scaladsl.Flow.getClass, _ == "apply", _ == 24, _ true), Ignore(_ == akka.stream.scaladsl.Flow.getClass, _ == "apply", _ == 24, _ true),
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "apply", _ == 24, _ true), Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "apply", _ == 24, _ true),
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ true, _ true), Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ true, _ true),
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ true, _ true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ true, _ true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ true, _ true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ true), Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ true),
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ true),
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ true),

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/
package akka.stream.typed package akka.stream.typed
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem

View file

@ -1,11 +1,11 @@
/* /**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/> * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/ */
package akka.stream.typed.javadsl package akka.stream.typed.javadsl
import akka.actor.typed._ import akka.actor.typed._
import akka.NotUsed import akka.NotUsed
import akka.stream.scaladsl._ import akka.stream.javadsl._
import akka.stream.typed import akka.stream.typed
/** /**
@ -29,7 +29,7 @@ object ActorSink {
* limiting stage in front of this `Sink`. * limiting stage in front of this `Sink`.
*/ */
def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: akka.japi.function.Function[Throwable, T]): Sink[T, NotUsed] = def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: akka.japi.function.Function[Throwable, T]): Sink[T, NotUsed] =
typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply) typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply).asJava
/** /**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
@ -52,6 +52,6 @@ object ActorSink {
onCompleteMessage: M, onCompleteMessage: M,
onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] = onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] =
typed.scaladsl.ActorSink.actorRefWithAck( typed.scaladsl.ActorSink.actorRefWithAck(
ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply) ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply).asJava
} }

View file

@ -1,9 +1,11 @@
/* /**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/> * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/ */
package akka.stream.typed.javadsl package akka.stream.typed.javadsl
import java.util.function.Predicate
import akka.actor.typed._ import akka.actor.typed._
import akka.stream.OverflowStrategy import akka.stream.OverflowStrategy
import akka.stream.javadsl._ import akka.stream.javadsl._
@ -46,11 +48,11 @@ object ActorSource {
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def actorRef[T]( def actorRef[T](
completionMatcher: PartialFunction[T, Unit], completionMatcher: Predicate[T],
failureMatcher: PartialFunction[T, Throwable], failureMatcher: PartialFunction[T, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = { bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {
akka.stream.typed.scaladsl.ActorSource.actorRef( akka.stream.typed.scaladsl.ActorSource.actorRef(
completionMatcher, failureMatcher, { case m if completionMatcher.test(m) }: PartialFunction[T, Unit],
bufferSize, overflowStrategy).asJava failureMatcher, bufferSize, overflowStrategy).asJava
} }
} }

View file

@ -1,5 +1,5 @@
/* /**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/> * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/ */
package akka.stream.typed.scaladsl package akka.stream.typed.scaladsl

View file

@ -1,5 +1,5 @@
/* /**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/> * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/ */
package akka.stream.typed.scaladsl package akka.stream.typed.scaladsl

View file

@ -1,14 +1,16 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/
package akka.stream.typed.javadsl; package akka.stream.typed.javadsl;
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem; import akka.actor.typed.ActorSystem;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer; import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy; import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import scala.PartialFunction$;
import scala.runtime.AbstractPartialFunction;
import scala.runtime.BoxedUnit;
public class ActorSourceSinkCompileTest { public class ActorSourceSinkCompileTest {
@ -47,38 +49,36 @@ public class ActorSourceSinkCompileTest {
} }
{ {
final AbstractPartialFunction<String, BoxedUnit> completionMatcher = new AbstractPartialFunction<String, BoxedUnit>() {
@Override
public boolean isDefinedAt(String s) {
return s == "complete";
}
};
ActorSource ActorSource
.actorRef( .actorRef(
completionMatcher, (m) -> m == "complete",
PartialFunction$.MODULE$.empty(), // FIXME make the API nicer new JavaPartialFunction<String, Throwable>() {
@Override
public Throwable apply(String x, boolean isCheck) throws Exception {
throw noMatch();
}
},
10, 10,
OverflowStrategy.dropBuffer()) OverflowStrategy.dropBuffer())
.to(Sink.seq()); .to(Sink.seq());
} }
{ {
final AbstractPartialFunction<Protocol, Throwable> failureMatcher = new AbstractPartialFunction<Protocol, Throwable>() { final JavaPartialFunction<Protocol, Throwable> failureMatcher = new JavaPartialFunction<Protocol, Throwable>() {
@Override @Override
public boolean isDefinedAt(Protocol p) { public Throwable apply(Protocol p, boolean isCheck) throws Exception {
return p instanceof Failure; if (p instanceof Failure) {
} return ((Failure)p).ex;
}
@Override else {
public Throwable apply(Protocol p) { throw noMatch();
return ((Failure)p).ex; }
} }
}; };
ActorSource ActorSource
.actorRef( .actorRef(
PartialFunction$.MODULE$.empty(), // FIXME make the API nicer (m) -> false,
failureMatcher, 10, failureMatcher, 10,
OverflowStrategy.dropBuffer()) OverflowStrategy.dropBuffer())
.to(Sink.seq()); .to(Sink.seq());

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/
package docs.akka.stream.typed;
// #actor-sink-ref
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;
// #actor-sink-ref
public class ActorSinkExample {
// #actor-sink-ref
interface Protocol {}
class Message implements Protocol {
private final String msg;
public Message(String msg) {
this.msg = msg;
}
}
class Complete implements Protocol {}
class Fail implements Protocol {
private final Throwable ex;
public Fail(Throwable ex) {
this.ex = ex;
}
}
// #actor-sink-ref
final ActorMaterializer mat = null;
{
// #actor-sink-ref
final ActorRef<Protocol> actor = null;
final Sink<Protocol, NotUsed> sink = ActorSink.actorRef(
actor,
new Complete(),
Fail::new
);
Source.<Protocol>single(new Message("msg1")).runWith(sink, mat);
// #actor-sink-ref
}
}

View file

@ -0,0 +1,66 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/
package docs.akka.stream.typed;
// #actor-sink-ref-with-ack
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;
// #actor-sink-ref-with-ack
public class ActorSinkWithAckExample {
// #actor-sink-ref-with-ack
class Ack {}
interface Protocol {}
class Init implements Protocol {
private final ActorRef<Ack> ack;
public Init(ActorRef<Ack> ack) {
this.ack = ack;
}
}
class Message implements Protocol {
private final ActorRef<Ack> ackTo;
private final String msg;
public Message(ActorRef<Ack> ackTo, String msg) {
this.ackTo = ackTo;
this.msg = msg;
}
}
class Complete implements Protocol {}
class Fail implements Protocol {
private final Throwable ex;
public Fail(Throwable ex) {
this.ex = ex;
}
}
// #actor-sink-ref-with-ack
final ActorMaterializer mat = null;
{
// #actor-sink-ref-with-ack
final ActorRef<Protocol> actor = null;
final Sink<String, NotUsed> sink = ActorSink.actorRefWithAck(
actor,
Message::new,
Init::new,
new Ack(),
new Complete(),
Fail::new
);
Source.single("msg1").runWith(sink, mat);
// #actor-sink-ref-with-ack
}
}

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/
package docs.akka.stream.typed;
// #actor-source-ref
import akka.actor.typed.ActorRef;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSource;
// #actor-source-ref
public class ActorSourceExample {
// #actor-source-ref
interface Protocol {}
class Message implements Protocol {
private final String msg;
public Message(String msg) {
this.msg = msg;
}
}
class Complete implements Protocol {}
class Fail implements Protocol {
private final Exception ex;
public Fail(Exception ex) {
this.ex = ex;
}
}
// #actor-source-ref
final ActorMaterializer mat = null;
{
// #actor-source-ref
final JavaPartialFunction<Protocol, Throwable> failureMatcher =
new JavaPartialFunction<Protocol, Throwable>() {
public Throwable apply(Protocol p, boolean isCheck) {
if (p instanceof Fail) {
return ((Fail)p).ex;
} else {
throw noMatch();
}
}
};
final Source<Protocol, ActorRef<Protocol>> source = ActorSource.actorRef(
(m) -> m instanceof Complete,
failureMatcher,
8,
OverflowStrategy.fail()
);
final ActorRef<Protocol> ref = source.collect(new JavaPartialFunction<Protocol, String>() {
public String apply(Protocol p, boolean isCheck) {
if (p instanceof Message) {
return ((Message)p).msg;
} else {
throw noMatch();
}
}
}).to(Sink.foreach(System.out::println)).run(mat);
ref.tell(new Message("msg1"));
// ref.tell("msg2"); Does not compile
// #actor-source-ref
}
}

View file

@ -1,9 +1,9 @@
/* /**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/> * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/ */
package akka.stream.typed.scaladsl package akka.stream.typed.scaladsl
import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.Behaviors
import akka.stream.OverflowStrategy import akka.stream.OverflowStrategy
import akka.actor.typed.{ ActorRef, ActorSystem } import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.testkit.TestKit import akka.testkit.TestKit
@ -59,20 +59,20 @@ class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSin
"obey protocol" in { "obey protocol" in {
val p = TestProbe[AckProto]() val p = TestProbe[AckProto]()
val autoPilot = Actor.immutable[AckProto] { val autoPilot = Behaviors.immutable[AckProto] {
(ctx, msg) (ctx, msg)
msg match { msg match {
case m @ Init(sender) case m @ Init(sender)
p.ref ! m p.ref ! m
sender ! "ACK" sender ! "ACK"
Actor.same Behaviors.same
case m @ Msg(sender, _) case m @ Msg(sender, _)
p.ref ! m p.ref ! m
sender ! "ACK" sender ! "ACK"
Actor.same Behaviors.same
case m case m
p.ref ! m p.ref ! m
Actor.same Behaviors.same
} }
} }

View file

@ -0,0 +1,98 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com/>
*/
package docs.akka.stream.typed
import akka.NotUsed
import akka.stream.ActorMaterializer
object ActorSourceSinkExample {
implicit val mat: ActorMaterializer = ???
{
// #actor-source-ref
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource
trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol
val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](
completionMatcher = {
case Complete
},
failureMatcher = {
case Fail(ex) ex
},
bufferSize = 8,
overflowStrategy = OverflowStrategy.fail
)
val ref = source.collect {
case Message(msg) msg
}.to(Sink.foreach(println)).run()
ref ! Message("msg1")
// ref ! "msg2" Does not compile
// #actor-source-ref
}
{
// #actor-sink-ref
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink
trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol
val actor: ActorRef[Protocol] = ???
val sink: Sink[Protocol, NotUsed] = ActorSink.actorRef[Protocol](
ref = actor,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply
)
Source.single(Message("msg1")).runWith(sink)
// #actor-sink-ref
}
{
// #actor-sink-ref-with-ack
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink
trait Ack
object Ack extends Ack
trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol
val actor: ActorRef[Protocol] = ???
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck(
ref = actor,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply,
messageAdapter = Message.apply,
onInitMessage = Init.apply,
ackMessage = Ack
)
Source.single("msg1").runWith(sink)
// #actor-sink-ref-with-ack
}
}

View file

@ -3,3 +3,12 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Restart
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this")
# #23604 Typed stream adapters
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSinkActor.props")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSink.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSinkActor.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.props")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.this")

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import akka.actor.{ ActorRef, Props, Status } import akka.actor.{ ActorRef, Props, Status }
import akka.annotation.InternalApi
import akka.stream.actor.ActorSubscriber import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl._ import akka.stream.impl._
@ -369,6 +370,8 @@ object Sink {
} }
/** /**
* INTERNAL API
*
* Sends the elements of the stream to the given `ActorRef`. * Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be canceled. * If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage` * When the stream is completed successfully the given `onCompleteMessage`
@ -383,7 +386,7 @@ object Sink {
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this `Sink`. * limiting stage in front of this `Sink`.
*/ */
def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable Any): Sink[T, NotUsed] = @InternalApi private[akka] def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable Any): Sink[T, NotUsed] =
fromGraph(new ActorRefSink(ref, onCompleteMessage, onFailureMessage, fromGraph(new ActorRefSink(ref, onCompleteMessage, onFailureMessage,
DefaultAttributes.actorRefSink, shape("ActorRefSink"))) DefaultAttributes.actorRefSink, shape("ActorRefSink")))
@ -402,12 +405,13 @@ object Sink {
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this `Sink`. * limiting stage in front of this `Sink`.
*/ */
@deprecated("Use `actorRef` that takes onFailureMessage instead. It allows controling the message that will be sent to the actor on failure.", since = "2.5.10")
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] =
fromGraph(new ActorRefSink(ref, onCompleteMessage, t Status.Failure(t), fromGraph(new ActorRefSink(ref, onCompleteMessage, t Status.Failure(t),
DefaultAttributes.actorRefSink, shape("ActorRefSink"))) DefaultAttributes.actorRefSink, shape("ActorRefSink")))
/** /**
* INTERNAL API
*
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is created by calling `onInitMessage` with an `ActorRef` of the actor that * First element is created by calling `onInitMessage` with an `ActorRef` of the actor that
* expects acknowledgements. Then stream is waiting for acknowledgement message * expects acknowledgements. Then stream is waiting for acknowledgement message
@ -425,9 +429,9 @@ object Sink {
* When the stream is completed with failure - result of `onFailureMessage(throwable)` * When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor. * function will be sent to the destination actor.
*/ */
def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef T Any, @InternalApi private[akka] def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef T Any,
onInitMessage: ActorRef Any, ackMessage: Any, onCompleteMessage: Any, onInitMessage: ActorRef Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: (Throwable) Any): Sink[T, NotUsed] = onFailureMessage: (Throwable) Any): Sink[T, NotUsed] =
Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, messageAdapter, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)) Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, messageAdapter, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))
/** /**
@ -443,9 +447,7 @@ object Sink {
* When the stream is completed with failure - result of `onFailureMessage(throwable)` * When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor. * function will be sent to the destination actor.
* *
* @deprecated Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.
*/ */
@deprecated("Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.", since = "2.5.10")
def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: (Throwable) Any = Status.Failure): Sink[T, NotUsed] = onFailureMessage: (Throwable) Any = Status.Failure): Sink[T, NotUsed] =
actorRefWithAck(ref, _ identity, _ onInitMessage, ackMessage, onCompleteMessage, onFailureMessage) actorRefWithAck(ref, _ identity, _ onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import akka.actor.{ ActorRef, Cancellable, Props } import akka.actor.{ ActorRef, Cancellable, Props }
import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisher import akka.stream.actor.ActorPublisher
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages
@ -431,6 +432,8 @@ object Source {
} }
/** /**
* INTERNAL API
*
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream, * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. * otherwise they will be buffered until request for demand is received.
@ -462,7 +465,7 @@ object Source {
* @param bufferSize The size of the buffer in element count * @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def actorRef[T]( @InternalApi private[akka] def actorRef[T](
completionMatcher: PartialFunction[Any, Unit], completionMatcher: PartialFunction[Any, Unit],
failureMatcher: PartialFunction[Any, Throwable], failureMatcher: PartialFunction[Any, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
@ -500,12 +503,10 @@ object Source {
* *
* See also [[akka.stream.scaladsl.Source.queue]]. * See also [[akka.stream.scaladsl.Source.queue]].
* *
* @deprecated Use `actorRef` that takes matchers instead. It allows controlling the completion and failure messages that are sent to the actor.
* *
* @param bufferSize The size of the buffer in element count * @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
@deprecated("Use `actorRef` that takes matchers instead. It allows controlling the messages that are used for completion and failure.", since = "2.5.10")
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
actorRef( actorRef(
{ case akka.actor.Status.Success(_) }, { case akka.actor.Status.Success(_) },

View file

@ -435,6 +435,7 @@ lazy val streamTyped = akkaModule("akka-stream-typed")
.settings(AkkaBuild.mayChangeSettings) .settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.stream.typed")) .settings(AutomaticModuleName.settings("akka.stream.typed"))
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
.enablePlugins(ScaladocNoVerificationOfDiagrams)
lazy val typedTestkit = akkaModule("akka-testkit-typed") lazy val typedTestkit = akkaModule("akka-testkit-typed")
.dependsOn(actorTyped, testkit % "compile->compile;test->test") .dependsOn(actorTyped, testkit % "compile->compile;test->test")