diff --git a/akka-docs/src/main/paradox/typed/index.md b/akka-docs/src/main/paradox/typed/index.md
index 81e3aa90fe..6ff1753eb7 100644
--- a/akka-docs/src/main/paradox/typed/index.md
+++ b/akka-docs/src/main/paradox/typed/index.md
@@ -11,6 +11,7 @@
* [fault-tolerance](fault-tolerance.md)
* [actor-discovery](actor-discovery.md)
* [stash](stash.md)
+* [stream](stream.md)
* [cluster](cluster.md)
* [cluster-singleton](cluster-singleton.md)
* [cluster-sharding](cluster-sharding.md)
diff --git a/akka-docs/src/main/paradox/typed/stream.md b/akka-docs/src/main/paradox/typed/stream.md
new file mode 100644
index 0000000000..94eea947cc
--- /dev/null
+++ b/akka-docs/src/main/paradox/typed/stream.md
@@ -0,0 +1,53 @@
+# Streams
+
+@@@ warning
+
+This module is currently marked as @ref:[may change](../common/may-change.md) in the sense
+ of being the subject of active research. This means that API or semantics can
+ change without warning or deprecation period and it is not recommended to use
+ this module in production just yet—you have been warned.
+
+@@@
+
+@ref:[Akka Streams](../stream/index.md) make it easy to model type-safe message processing pipelines. With typed actors it is possible to connect streams to actors without loosing the type information.
+
+To use the typed stream source and sink factories add the following dependency:
+
+@@dependency [sbt,Maven,Gradle] {
+ group=com.typesafe.akka
+ artifact=akka-stream-typed_2.12
+ version=$akka.version$
+}
+
+This dependency contains typed alternatives to the @ref:[already existing `ActorRef` sources and sinks](../stream/stream-integrations.md) together with a factory methods for @scala[@scaladoc[`ActorMaterializer`](akka.stream.typed.ActorMaterializer)]@java[@javadoc[`ActorMaterializer`](akka.stream.typed.ActorMaterializer)] which take a typed `ActorSystem`.
+
+The materializer created from these factory methods and sources together with sinks contained in this module can be mixed and matched with the original Akka Streams building blocks from the original module.
+
+## Actor Source
+
+A stream that is driven by messages sent to a particular actor can be started with @scala[@scaladoc[`ActorSource.actorRef`](akka.stream.typed.scaladsl.ActorSource#actorRef)]@java[@javadoc[`ActorSource.actorRef`](akka.stream.typed.javadsl.ActorSource#actorRef)]. This source materializes to a typed `ActorRef` which only accepts messages that are of the same type as the stream.
+
+Scala
+: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-source-ref }
+
+Java
+: @@snip [ActorSourceExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java) { #actor-source-ref }
+
+
+## Actor Sink
+
+There are two sinks availabe that accept typed `ActorRef`s. To send all of the messages from a stream to an actor without considering backpressure, use @scala[@scaladoc[`ActorSink.actorRef`](akka.stream.typed.scaladsl.ActorSink#actorRef)]@java[@javadoc[`ActorSink.actorRef`](akka.stream.typed.javadsl.ActorSink#actorRef)].
+
+Scala
+: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref }
+
+Java
+: @@snip [ActorSinkExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java) { #actor-sink-ref }
+
+For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use @scala[@scaladoc[`ActorSink.actorRefWithAck`](akka.stream.typed.scaladsl.ActorSink#actorRefWithAck)]@java[@javadoc[`ActorSink.actorRefWithAck`](akka.stream.typed.javadsl.ActorSink#actorRefWithAck)] to be able to signal demand when the actor is ready to receive more elements.
+
+Scala
+: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref-with-ack }
+
+Java
+: @@snip [ActorSinkWithAckExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-ack }
diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala
index e4d350ee81..1cc5abaa66 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala
@@ -117,6 +117,9 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
Ignore(_ == akka.stream.scaladsl.Flow.getClass, _ == "apply", _ == 24, _ ⇒ true),
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "apply", _ == 24, _ ⇒ true),
Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ ⇒ true, _ ⇒ true),
+ Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ ⇒ true, _ ⇒ true), // Internal in scaladsl
+ Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ ⇒ true, _ ⇒ true), // Internal in scaladsl
+ Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ ⇒ true, _ ⇒ true), // Internal in scaladsl
Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ ⇒ true),
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ ⇒ true),
Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ ⇒ true),
diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala
index df3c6b32ab..3fe14605eb 100644
--- a/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala
+++ b/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala
@@ -1,3 +1,6 @@
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
package akka.stream.typed
import akka.actor.typed.ActorSystem
diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala
index da75a2bc0e..d3f78cdff7 100644
--- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala
+++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala
@@ -1,11 +1,11 @@
-/*
- * Copyright (C) 2017 Lightbend Inc.
- */
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
package akka.stream.typed.javadsl
import akka.actor.typed._
import akka.NotUsed
-import akka.stream.scaladsl._
+import akka.stream.javadsl._
import akka.stream.typed
/**
@@ -29,7 +29,7 @@ object ActorSink {
* limiting stage in front of this `Sink`.
*/
def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: akka.japi.function.Function[Throwable, T]): Sink[T, NotUsed] =
- typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply)
+ typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply).asJava
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
@@ -52,6 +52,6 @@ object ActorSink {
onCompleteMessage: M,
onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] =
typed.scaladsl.ActorSink.actorRefWithAck(
- ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply)
+ ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply).asJava
}
diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala
index 72ec384862..0ca4011dc8 100644
--- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala
+++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala
@@ -1,9 +1,11 @@
-/*
- * Copyright (C) 2017 Lightbend Inc.
+/**
+ * Copyright (C) 2018 Lightbend Inc.
*/
package akka.stream.typed.javadsl
+import java.util.function.Predicate
+
import akka.actor.typed._
import akka.stream.OverflowStrategy
import akka.stream.javadsl._
@@ -46,11 +48,11 @@ object ActorSource {
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def actorRef[T](
- completionMatcher: PartialFunction[T, Unit],
+ completionMatcher: Predicate[T],
failureMatcher: PartialFunction[T, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {
akka.stream.typed.scaladsl.ActorSource.actorRef(
- completionMatcher, failureMatcher,
- bufferSize, overflowStrategy).asJava
+ { case m if completionMatcher.test(m) ⇒ }: PartialFunction[T, Unit],
+ failureMatcher, bufferSize, overflowStrategy).asJava
}
}
diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala
index f3d5e405ac..8f978b4296 100644
--- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala
+++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala
@@ -1,5 +1,5 @@
-/*
- * Copyright (C) 2017 Lightbend Inc.
+/**
+ * Copyright (C) 2018 Lightbend Inc.
*/
package akka.stream.typed.scaladsl
diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala
index bc5539e329..8bec128fbf 100644
--- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala
+++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala
@@ -1,5 +1,5 @@
-/*
- * Copyright (C) 2017 Lightbend Inc.
+/**
+ * Copyright (C) 2018 Lightbend Inc.
*/
package akka.stream.typed.scaladsl
diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java
index bf3d097f0c..6a6aec7037 100644
--- a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java
+++ b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java
@@ -1,14 +1,16 @@
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
+
package akka.stream.typed.javadsl;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
+import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
-import scala.PartialFunction$;
-import scala.runtime.AbstractPartialFunction;
-import scala.runtime.BoxedUnit;
public class ActorSourceSinkCompileTest {
@@ -47,38 +49,36 @@ public class ActorSourceSinkCompileTest {
}
{
- final AbstractPartialFunction completionMatcher = new AbstractPartialFunction() {
- @Override
- public boolean isDefinedAt(String s) {
- return s == "complete";
- }
- };
-
ActorSource
.actorRef(
- completionMatcher,
- PartialFunction$.MODULE$.empty(), // FIXME make the API nicer
+ (m) -> m == "complete",
+ new JavaPartialFunction() {
+ @Override
+ public Throwable apply(String x, boolean isCheck) throws Exception {
+ throw noMatch();
+ }
+ },
10,
OverflowStrategy.dropBuffer())
.to(Sink.seq());
}
{
- final AbstractPartialFunction failureMatcher = new AbstractPartialFunction() {
+ final JavaPartialFunction failureMatcher = new JavaPartialFunction() {
@Override
- public boolean isDefinedAt(Protocol p) {
- return p instanceof Failure;
- }
-
- @Override
- public Throwable apply(Protocol p) {
- return ((Failure)p).ex;
+ public Throwable apply(Protocol p, boolean isCheck) throws Exception {
+ if (p instanceof Failure) {
+ return ((Failure)p).ex;
+ }
+ else {
+ throw noMatch();
+ }
}
};
ActorSource
.actorRef(
- PartialFunction$.MODULE$.empty(), // FIXME make the API nicer
+ (m) -> false,
failureMatcher, 10,
OverflowStrategy.dropBuffer())
.to(Sink.seq());
diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java
new file mode 100644
index 0000000000..94717a8f68
--- /dev/null
+++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java
@@ -0,0 +1,53 @@
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
+
+package docs.akka.stream.typed;
+
+// #actor-sink-ref
+import akka.NotUsed;
+import akka.actor.typed.ActorRef;
+import akka.stream.ActorMaterializer;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import akka.stream.typed.javadsl.ActorSink;
+// #actor-sink-ref
+
+public class ActorSinkExample {
+
+ // #actor-sink-ref
+
+ interface Protocol {}
+ class Message implements Protocol {
+ private final String msg;
+ public Message(String msg) {
+ this.msg = msg;
+ }
+ }
+ class Complete implements Protocol {}
+ class Fail implements Protocol {
+ private final Throwable ex;
+ public Fail(Throwable ex) {
+ this.ex = ex;
+ }
+ }
+ // #actor-sink-ref
+
+ final ActorMaterializer mat = null;
+
+ {
+ // #actor-sink-ref
+
+ final ActorRef actor = null;
+
+ final Sink sink = ActorSink.actorRef(
+ actor,
+ new Complete(),
+ Fail::new
+ );
+
+ Source.single(new Message("msg1")).runWith(sink, mat);
+ // #actor-sink-ref
+ }
+
+}
diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java
new file mode 100644
index 0000000000..1ccf886afd
--- /dev/null
+++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
+
+package docs.akka.stream.typed;
+
+// #actor-sink-ref-with-ack
+import akka.NotUsed;
+import akka.actor.typed.ActorRef;
+import akka.stream.ActorMaterializer;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import akka.stream.typed.javadsl.ActorSink;
+// #actor-sink-ref-with-ack
+
+public class ActorSinkWithAckExample {
+
+ // #actor-sink-ref-with-ack
+
+ class Ack {}
+
+ interface Protocol {}
+ class Init implements Protocol {
+ private final ActorRef ack;
+ public Init(ActorRef ack) {
+ this.ack = ack;
+ }
+ }
+ class Message implements Protocol {
+ private final ActorRef ackTo;
+ private final String msg;
+ public Message(ActorRef ackTo, String msg) {
+ this.ackTo = ackTo;
+ this.msg = msg;
+ }
+ }
+ class Complete implements Protocol {}
+ class Fail implements Protocol {
+ private final Throwable ex;
+ public Fail(Throwable ex) {
+ this.ex = ex;
+ }
+ }
+ // #actor-sink-ref-with-ack
+
+ final ActorMaterializer mat = null;
+
+ {
+ // #actor-sink-ref-with-ack
+
+ final ActorRef actor = null;
+
+ final Sink sink = ActorSink.actorRefWithAck(
+ actor,
+ Message::new,
+ Init::new,
+ new Ack(),
+ new Complete(),
+ Fail::new
+ );
+
+ Source.single("msg1").runWith(sink, mat);
+ // #actor-sink-ref-with-ack
+ }
+
+}
diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java
new file mode 100644
index 0000000000..3e65476232
--- /dev/null
+++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
+
+package docs.akka.stream.typed;
+
+// #actor-source-ref
+import akka.actor.typed.ActorRef;
+import akka.japi.JavaPartialFunction;
+import akka.stream.ActorMaterializer;
+import akka.stream.OverflowStrategy;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import akka.stream.typed.javadsl.ActorSource;
+// #actor-source-ref
+
+public class ActorSourceExample {
+
+ // #actor-source-ref
+
+ interface Protocol {}
+ class Message implements Protocol {
+ private final String msg;
+ public Message(String msg) {
+ this.msg = msg;
+ }
+ }
+ class Complete implements Protocol {}
+ class Fail implements Protocol {
+ private final Exception ex;
+ public Fail(Exception ex) {
+ this.ex = ex;
+ }
+ }
+ // #actor-source-ref
+
+ final ActorMaterializer mat = null;
+
+ {
+ // #actor-source-ref
+
+ final JavaPartialFunction failureMatcher =
+ new JavaPartialFunction() {
+ public Throwable apply(Protocol p, boolean isCheck) {
+ if (p instanceof Fail) {
+ return ((Fail)p).ex;
+ } else {
+ throw noMatch();
+ }
+ }
+ };
+
+ final Source> source = ActorSource.actorRef(
+ (m) -> m instanceof Complete,
+ failureMatcher,
+ 8,
+ OverflowStrategy.fail()
+ );
+
+ final ActorRef ref = source.collect(new JavaPartialFunction() {
+ public String apply(Protocol p, boolean isCheck) {
+ if (p instanceof Message) {
+ return ((Message)p).msg;
+ } else {
+ throw noMatch();
+ }
+ }
+ }).to(Sink.foreach(System.out::println)).run(mat);
+
+ ref.tell(new Message("msg1"));
+ // ref.tell("msg2"); Does not compile
+ // #actor-source-ref
+ }
+}
\ No newline at end of file
diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala
index 0141711b44..8d5ba79249 100644
--- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala
+++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala
@@ -1,9 +1,9 @@
-/*
- * Copyright (C) 2017 Lightbend Inc.
+/**
+ * Copyright (C) 2018 Lightbend Inc.
*/
package akka.stream.typed.scaladsl
-import akka.actor.typed.scaladsl.Actor
+import akka.actor.typed.scaladsl.Behaviors
import akka.stream.OverflowStrategy
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.testkit.TestKit
@@ -59,20 +59,20 @@ class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSin
"obey protocol" in {
val p = TestProbe[AckProto]()
- val autoPilot = Actor.immutable[AckProto] {
+ val autoPilot = Behaviors.immutable[AckProto] {
(ctx, msg) ⇒
msg match {
case m @ Init(sender) ⇒
p.ref ! m
sender ! "ACK"
- Actor.same
+ Behaviors.same
case m @ Msg(sender, _) ⇒
p.ref ! m
sender ! "ACK"
- Actor.same
+ Behaviors.same
case m ⇒
p.ref ! m
- Actor.same
+ Behaviors.same
}
}
diff --git a/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala
new file mode 100644
index 0000000000..ae34b2caa5
--- /dev/null
+++ b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
+
+package docs.akka.stream.typed
+
+import akka.NotUsed
+import akka.stream.ActorMaterializer
+
+object ActorSourceSinkExample {
+
+ implicit val mat: ActorMaterializer = ???
+
+ {
+ // #actor-source-ref
+ import akka.actor.typed.ActorRef
+ import akka.stream.OverflowStrategy
+ import akka.stream.scaladsl.{ Sink, Source }
+ import akka.stream.typed.scaladsl.ActorSource
+
+ trait Protocol
+ case class Message(msg: String) extends Protocol
+ case object Complete extends Protocol
+ case class Fail(ex: Exception) extends Protocol
+
+ val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](
+ completionMatcher = {
+ case Complete ⇒
+ },
+ failureMatcher = {
+ case Fail(ex) ⇒ ex
+ },
+ bufferSize = 8,
+ overflowStrategy = OverflowStrategy.fail
+ )
+
+ val ref = source.collect {
+ case Message(msg) ⇒ msg
+ }.to(Sink.foreach(println)).run()
+
+ ref ! Message("msg1")
+ // ref ! "msg2" Does not compile
+ // #actor-source-ref
+ }
+
+ {
+ // #actor-sink-ref
+ import akka.actor.typed.ActorRef
+ import akka.stream.scaladsl.{ Sink, Source }
+ import akka.stream.typed.scaladsl.ActorSink
+
+ trait Protocol
+ case class Message(msg: String) extends Protocol
+ case object Complete extends Protocol
+ case class Fail(ex: Throwable) extends Protocol
+
+ val actor: ActorRef[Protocol] = ???
+
+ val sink: Sink[Protocol, NotUsed] = ActorSink.actorRef[Protocol](
+ ref = actor,
+ onCompleteMessage = Complete,
+ onFailureMessage = Fail.apply
+ )
+
+ Source.single(Message("msg1")).runWith(sink)
+ // #actor-sink-ref
+ }
+
+ {
+ // #actor-sink-ref-with-ack
+ import akka.actor.typed.ActorRef
+ import akka.stream.scaladsl.{ Sink, Source }
+ import akka.stream.typed.scaladsl.ActorSink
+
+ trait Ack
+ object Ack extends Ack
+
+ trait Protocol
+ case class Init(ackTo: ActorRef[Ack]) extends Protocol
+ case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
+ case object Complete extends Protocol
+ case class Fail(ex: Throwable) extends Protocol
+
+ val actor: ActorRef[Protocol] = ???
+
+ val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck(
+ ref = actor,
+ onCompleteMessage = Complete,
+ onFailureMessage = Fail.apply,
+ messageAdapter = Message.apply,
+ onInitMessage = Init.apply,
+ ackMessage = Ack
+ )
+
+ Source.single("msg1").runWith(sink)
+ // #actor-sink-ref-with-ack
+ }
+}
diff --git a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes
index 6cc828a75b..af2a648a6b 100644
--- a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes
+++ b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes
@@ -3,3 +3,12 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Restart
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this")
+
+# #23604 Typed stream adapters
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSinkActor.props")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSink.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSinkActor.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.props")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.this")
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
index c3cc53c0f4..f00da21463 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
@@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts
import akka.actor.{ ActorRef, Props, Status }
+import akka.annotation.InternalApi
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl._
@@ -369,6 +370,8 @@ object Sink {
}
/**
+ * INTERNAL API
+ *
* Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
@@ -383,7 +386,7 @@ object Sink {
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this `Sink`.
*/
- def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Sink[T, NotUsed] =
+ @InternalApi private[akka] def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Sink[T, NotUsed] =
fromGraph(new ActorRefSink(ref, onCompleteMessage, onFailureMessage,
DefaultAttributes.actorRefSink, shape("ActorRefSink")))
@@ -402,12 +405,13 @@ object Sink {
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this `Sink`.
*/
- @deprecated("Use `actorRef` that takes onFailureMessage instead. It allows controling the message that will be sent to the actor on failure.", since = "2.5.10")
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] =
fromGraph(new ActorRefSink(ref, onCompleteMessage, t ⇒ Status.Failure(t),
DefaultAttributes.actorRefSink, shape("ActorRefSink")))
/**
+ * INTERNAL API
+ *
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is created by calling `onInitMessage` with an `ActorRef` of the actor that
* expects acknowledgements. Then stream is waiting for acknowledgement message
@@ -425,9 +429,9 @@ object Sink {
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*/
- def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef ⇒ T ⇒ Any,
- onInitMessage: ActorRef ⇒ Any, ackMessage: Any, onCompleteMessage: Any,
- onFailureMessage: (Throwable) ⇒ Any): Sink[T, NotUsed] =
+ @InternalApi private[akka] def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef ⇒ T ⇒ Any,
+ onInitMessage: ActorRef ⇒ Any, ackMessage: Any, onCompleteMessage: Any,
+ onFailureMessage: (Throwable) ⇒ Any): Sink[T, NotUsed] =
Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, messageAdapter, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))
/**
@@ -443,9 +447,7 @@ object Sink {
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*
- * @deprecated Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.
*/
- @deprecated("Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.", since = "2.5.10")
def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, NotUsed] =
actorRefWithAck(ref, _ ⇒ identity, _ ⇒ onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
index 650a13b09e..7e9dd39eec 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
@@ -6,6 +6,7 @@ package akka.stream.scaladsl
import java.util.concurrent.CompletionStage
import akka.actor.{ ActorRef, Cancellable, Props }
+import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisher
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages
@@ -431,6 +432,8 @@ object Source {
}
/**
+ * INTERNAL API
+ *
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
@@ -462,7 +465,7 @@ object Source {
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
- def actorRef[T](
+ @InternalApi private[akka] def actorRef[T](
completionMatcher: PartialFunction[Any, Unit],
failureMatcher: PartialFunction[Any, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
@@ -500,12 +503,10 @@ object Source {
*
* See also [[akka.stream.scaladsl.Source.queue]].
*
- * @deprecated Use `actorRef` that takes matchers instead. It allows controlling the completion and failure messages that are sent to the actor.
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
- @deprecated("Use `actorRef` that takes matchers instead. It allows controlling the messages that are used for completion and failure.", since = "2.5.10")
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
actorRef(
{ case akka.actor.Status.Success(_) ⇒ },
diff --git a/build.sbt b/build.sbt
index 2023878170..ec9cd5c2f0 100644
--- a/build.sbt
+++ b/build.sbt
@@ -435,6 +435,7 @@ lazy val streamTyped = akkaModule("akka-stream-typed")
.settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.stream.typed"))
.disablePlugins(MimaPlugin)
+ .enablePlugins(ScaladocNoVerificationOfDiagrams)
lazy val typedTestkit = akkaModule("akka-testkit-typed")
.dependsOn(actorTyped, testkit % "compile->compile;test->test")