From 171bb6c231482eb60f8ccc1bb5aaf84f654dad30 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Thu, 7 Sep 2017 21:07:41 +0200 Subject: [PATCH 1/2] WIP initial typed streams adapters Adapt ref source and sink for typed * do not use the typed teskit temporarily --- .../akka/stream/typed/ActorMaterializer.scala | 57 ++++++++ .../akka/stream/typed/javadsl/ActorSink.scala | 57 ++++++++ .../stream/typed/javadsl/ActorSource.scala | 56 ++++++++ .../stream/typed/scaladsl/ActorSink.scala | 61 +++++++++ .../stream/typed/scaladsl/ActorSource.scala | 58 +++++++++ .../javadsl/ActorSourceSinkCompileTest.java | 87 +++++++++++++ .../typed/scaladsl/ActorSourceSinkSpec.scala | 123 ++++++++++++++++++ .../impl/ActorRefBackpressureSinkStage.scala | 15 ++- .../akka/stream/impl/ActorRefSinkActor.scala | 9 +- .../stream/impl/ActorRefSourceActor.scala | 27 ++-- .../main/scala/akka/stream/impl/Modules.scala | 13 +- .../main/scala/akka/stream/impl/Sinks.scala | 8 +- .../main/scala/akka/stream/javadsl/Sink.scala | 2 +- .../scala/akka/stream/scaladsl/Sink.scala | 51 +++++++- .../scala/akka/stream/scaladsl/Source.scala | 54 +++++++- build.sbt | 18 ++- 16 files changed, 657 insertions(+), 39 deletions(-) create mode 100644 akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala create mode 100644 akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala create mode 100644 akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala create mode 100644 akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala create mode 100644 akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala create mode 100644 akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java create mode 100644 akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala new file mode 100644 index 0000000000..df3c6b32ab --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala @@ -0,0 +1,57 @@ +package akka.stream.typed + +import akka.actor.typed.ActorSystem +import akka.stream.ActorMaterializerSettings + +object ActorMaterializer { + import akka.actor.typed.scaladsl.adapter._ + + /** + * Scala API: Creates an ActorMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]] + * will be used to create one actor that in turn creates actors for the transformation steps. + * + * The materializer's [[akka.stream.ActorMaterializerSettings]] will be obtained from the + * configuration of the `context`'s underlying [[akka.actor.typed.ActorSystem]]. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply[T](materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None)(implicit actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = + akka.stream.ActorMaterializer(materializerSettings, namePrefix)(actorSystem.toUntyped) + + /** + * Java API: Creates an ActorMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]] + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * Defaults the actor name prefix used to name actors running the processing steps to `"flow"`. + * The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create[T](actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = + apply()(actorSystem) + + /** + * Java API: Creates an ActorMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]] + * will be used to create one actor that in turn creates actors for the transformation steps. + */ + def create[T](settings: ActorMaterializerSettings, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = + apply(Option(settings), None)(actorSystem) + + /** + * Java API: Creates an ActorMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]] + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create[T](settings: ActorMaterializerSettings, namePrefix: String, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = + apply(Option(settings), Option(namePrefix))(actorSystem) + +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala new file mode 100644 index 0000000000..da75a2bc0e --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.typed.javadsl + +import akka.actor.typed._ +import akka.NotUsed +import akka.stream.scaladsl._ +import akka.stream.typed + +/** + * Collection of Sinks aimed at integrating with typed Actors. + */ +object ActorSink { + /** + * Sends the elements of the stream to the given `ActorRef`. + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure a the throwable that was signaled + * to the stream is adapted to the Actors protocol using `onFailureMessage` and + * then then sent to the destination actor. + * + * It will request at most `maxInputBufferSize` number of elements from + * upstream, but there is no back-pressure signal from the destination actor, + * i.e. if the actor is not consuming the messages fast enough the mailbox + * of the actor will grow. For potentially slow consumer actors it is recommended + * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + * limiting stage in front of this `Sink`. + */ + def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: akka.japi.function.Function[Throwable, T]): Sink[T, NotUsed] = + typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + def actorRefWithAck[T, M, A]( + ref: ActorRef[M], + messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M], + onInitMessage: akka.japi.function.Function[ActorRef[A], M], + ackMessage: A, + onCompleteMessage: M, + onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] = + typed.scaladsl.ActorSink.actorRefWithAck( + ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply) + +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala new file mode 100644 index 0000000000..72ec384862 --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.stream.typed.javadsl + +import akka.actor.typed._ +import akka.stream.OverflowStrategy +import akka.stream.javadsl._ + +/** + * Collection of Sources aimed at integrating with typed Actors. + */ +object ActorSource { + + /** + * Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand + * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after + * this Source; as such, it is never safe to assume the downstream will always generate demand. + * + * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]] + * (whose content will be ignored) in which case already buffered elements will be signaled before signaling + * completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. + * + * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the + * actor reference. In case the Actor is still draining its internal buffer (after having received + * a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]], + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * See also [[akka.stream.javadsl.Source.queue]]. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T]( + completionMatcher: PartialFunction[T, Unit], + failureMatcher: PartialFunction[T, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = { + akka.stream.typed.scaladsl.ActorSource.actorRef( + completionMatcher, failureMatcher, + bufferSize, overflowStrategy).asJava + } +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala new file mode 100644 index 0000000000..f3d5e405ac --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.typed.scaladsl + +import akka.actor.typed._ +import akka.stream.scaladsl._ +import akka.NotUsed + +/** + * Collection of Sinks aimed at integrating with typed Actors. + */ +object ActorSink { + import akka.actor.typed.scaladsl.adapter._ + + /** + * Sends the elements of the stream to the given `ActorRef`. + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure a the throwable that was signaled + * to the stream is adapted to the Actors protocol using `onFailureMessage` and + * then then sent to the destination actor. + * + * It will request at most `maxInputBufferSize` number of elements from + * upstream, but there is no back-pressure signal from the destination actor, + * i.e. if the actor is not consuming the messages fast enough the mailbox + * of the actor will grow. For potentially slow consumer actors it is recommended + * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + * limiting stage in front of this `Sink`. + */ + def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: Throwable ⇒ T): Sink[T, NotUsed] = + Sink.actorRef(ref.toUntyped, onCompleteMessage, onFailureMessage) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + def actorRefWithAck[T, M, A]( + ref: ActorRef[M], + messageAdapter: (ActorRef[A], T) ⇒ M, + onInitMessage: ActorRef[A] ⇒ M, + ackMessage: A, + onCompleteMessage: M, + onFailureMessage: Throwable ⇒ M): Sink[T, NotUsed] = + Sink.actorRefWithAck( + ref.toUntyped, + messageAdapter.curried.compose(actorRefAdapter), + onInitMessage.compose(actorRefAdapter), + ackMessage, onCompleteMessage, onFailureMessage) + +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala new file mode 100644 index 0000000000..bc5539e329 --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.stream.typed.scaladsl + +import akka.actor.typed._ +import akka.stream.OverflowStrategy +import akka.stream.scaladsl._ + +/** + * Collection of Sources aimed at integrating with typed Actors. + */ +object ActorSource { + + import akka.actor.typed.scaladsl.adapter._ + + /** + * Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand + * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after + * this Source; as such, it is never safe to assume the downstream will always generate demand. + * + * The stream can be completed successfully by sending the actor reference a message that is matched by + * `completionMatcher` in which case already buffered elements will be signaled before signaling + * completion. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * See also [[akka.stream.scaladsl.Source.queue]]. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T]( + completionMatcher: PartialFunction[T, Unit], + failureMatcher: PartialFunction[T, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = + Source.actorRef[T]( + completionMatcher.asInstanceOf[PartialFunction[Any, Unit]], + failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]], + bufferSize, overflowStrategy).mapMaterializedValue(actorRefAdapter) +} diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java new file mode 100644 index 0000000000..bf3d097f0c --- /dev/null +++ b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java @@ -0,0 +1,87 @@ +package akka.stream.typed.javadsl; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import scala.PartialFunction$; +import scala.runtime.AbstractPartialFunction; +import scala.runtime.BoxedUnit; + +public class ActorSourceSinkCompileTest { + + interface Protocol {} + class Init implements Protocol {} + class Msg implements Protocol {} + class Complete implements Protocol {} + class Failure implements Protocol { + public Exception ex; + } + + { + final ActorSystem system = null; + final ActorMaterializer mat = akka.stream.typed.ActorMaterializer.create(system); + } + + { + final ActorRef ref = null; + + Source.queue(10, OverflowStrategy.dropBuffer()) + .map(s -> s + "!") + .to(ActorSink.actorRef(ref, "DONE", ex -> "FAILED: " + ex.getMessage())); + } + + { + final ActorRef ref = null; + + Source.queue(10, OverflowStrategy.dropBuffer()) + .to(ActorSink.actorRefWithAck( + ref, + (sender, msg) -> new Init(), + (sender) -> new Msg(), + "ACK", + new Complete(), + (f) -> new Failure())); + } + + { + final AbstractPartialFunction completionMatcher = new AbstractPartialFunction() { + @Override + public boolean isDefinedAt(String s) { + return s == "complete"; + } + }; + + ActorSource + .actorRef( + completionMatcher, + PartialFunction$.MODULE$.empty(), // FIXME make the API nicer + 10, + OverflowStrategy.dropBuffer()) + .to(Sink.seq()); + } + + { + final AbstractPartialFunction failureMatcher = new AbstractPartialFunction() { + @Override + public boolean isDefinedAt(Protocol p) { + return p instanceof Failure; + } + + @Override + public Throwable apply(Protocol p) { + return ((Failure)p).ex; + } + }; + + ActorSource + .actorRef( + PartialFunction$.MODULE$.empty(), // FIXME make the API nicer + failureMatcher, 10, + OverflowStrategy.dropBuffer()) + .to(Sink.seq()); + } + +} diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala new file mode 100644 index 0000000000..0141711b44 --- /dev/null +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.typed.scaladsl + +import akka.actor.typed.scaladsl.Actor +import akka.stream.OverflowStrategy +import akka.actor.typed.{ ActorRef, ActorSystem } +import akka.testkit.TestKit +import akka.testkit.typed.scaladsl._ +import akka.stream.scaladsl.{ Keep, Sink, Source } +import akka.stream.typed.ActorMaterializer +import akka.testkit.typed.TestKitSettings +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import org.scalatest.concurrent.ScalaFutures + +object ActorSourceSinkSpec { + + sealed trait AckProto + case class Init(sender: ActorRef[String]) extends AckProto + case class Msg(sender: ActorRef[String], msg: String) extends AckProto + case object Complete extends AckProto + case object Failed extends AckProto +} + +class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSinkSpec")) with WordSpecLike with BeforeAndAfterAll with Matchers with ScalaFutures { + import ActorSourceSinkSpec._ + import akka.actor.typed.scaladsl.adapter._ + + // FIXME use Typed Teskit + // The materializer creates a top-level actor when materializing a stream. + // Currently that is not supported, because a Typed Teskit uses a typed actor system + // with a custom guardian. Because of custom guardian, an exception is being thrown + // when trying to create a top level actor during materialization. + implicit val sys = ActorSystem.wrap(system) + implicit val testkitSettings = TestKitSettings(sys) + implicit val mat = ActorMaterializer() + + override protected def afterAll(): Unit = + sys.terminate() + + "ActorSink" should { + + "accept messages" in { + val p = TestProbe[String]() + + val in = + Source.queue[String](10, OverflowStrategy.dropBuffer) + .map(_ + "!") + .to(ActorSink.actorRef(p.ref, "DONE", ex ⇒ "FAILED: " + ex.getMessage)) + .run() + + val msg = "Zug zug" + + in.offer(msg) + p.expectMsg(msg + "!") + } + + "obey protocol" in { + val p = TestProbe[AckProto]() + + val autoPilot = Actor.immutable[AckProto] { + (ctx, msg) ⇒ + msg match { + case m @ Init(sender) ⇒ + p.ref ! m + sender ! "ACK" + Actor.same + case m @ Msg(sender, _) ⇒ + p.ref ! m + sender ! "ACK" + Actor.same + case m ⇒ + p.ref ! m + Actor.same + } + } + + val pilotRef: ActorRef[AckProto] = system.actorOf(PropsAdapter(autoPilot)) + + val in = + Source.queue[String](10, OverflowStrategy.dropBuffer) + .to(ActorSink.actorRefWithAck(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ ⇒ Failed)) + .run() + + p.expectMsgType[Init] + + in.offer("Dabu!") + p.expectMsgType[Msg].msg shouldBe "Dabu!" + + in.offer("Lok'tar!") + p.expectMsgType[Msg].msg shouldBe "Lok'tar!" + + in.offer("Swobu!") + p.expectMsgType[Msg].msg shouldBe "Swobu!" + } + } + + "ActorSource" should { + "send messages and complete" in { + val (in, out) = ActorSource.actorRef[String]({ case "complete" ⇒ }, PartialFunction.empty, 10, OverflowStrategy.dropBuffer) + .toMat(Sink.seq)(Keep.both) + .run() + + in ! "one" + in ! "two" + in ! "complete" + + out.futureValue should contain theSameElementsAs Seq("one", "two") + } + + "fail the stream" in { + val (in, out) = ActorSource.actorRef[String](PartialFunction.empty, { case msg ⇒ new Error(msg) }, 10, OverflowStrategy.dropBuffer) + .toMat(Sink.seq)(Keep.both) + .run() + + in ! "boom!" + + out.failed.futureValue.getCause.getMessage shouldBe "boom!" + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index bd8fd86594..c2affc9e4e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -15,10 +15,13 @@ import akka.stream.stage._ /** * INTERNAL API */ -@InternalApi private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any, - ackMessage: Any, - onCompleteMessage: Any, - onFailureMessage: (Throwable) ⇒ Any) +@InternalApi private[akka] class ActorRefBackpressureSinkStage[In]( + ref: ActorRef, + messageAdapter: ActorRef ⇒ In ⇒ Any, + onInitMessage: ActorRef ⇒ Any, + ackMessage: Any, + onCompleteMessage: Any, + onFailureMessage: (Throwable) ⇒ Any) extends GraphStage[SinkShape[In]] { val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") override def initialAttributes = DefaultAttributes.actorRefWithAck @@ -55,12 +58,12 @@ import akka.stream.stage._ override def preStart() = { setKeepGoing(true) getStageActor(receive).watch(ref) - ref ! onInitMessage + ref ! onInitMessage(self) pull(in) } private def dequeueAndSend(): Unit = { - ref ! buffer.poll() + ref ! messageAdapter(self)(buffer.poll()) if (buffer.isEmpty && completeReceived) finish() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala index 6859d7dd5a..e487c3acaf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala @@ -16,14 +16,15 @@ import akka.annotation.InternalApi * INTERNAL API */ @InternalApi private[akka] object ActorRefSinkActor { - def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any): Props = - Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage)) + def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Props = + Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage, onFailureMessage)) } /** * INTERNAL API */ -@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber { +@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any) + extends ActorSubscriber { import ActorSubscriberMessage._ override val requestStrategy = WatermarkRequestStrategy(highWatermark) @@ -34,7 +35,7 @@ import akka.annotation.InternalApi case OnNext(elem) ⇒ ref.tell(elem, ActorRef.noSender) case OnError(cause) ⇒ - ref.tell(Status.Failure(cause), ActorRef.noSender) + ref.tell(onFailureMessage(cause), ActorRef.noSender) context.stop(self) case OnComplete ⇒ ref.tell(onCompleteMessage, ActorRef.noSender) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 356ba3647c..a0eb40ef20 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -15,17 +15,20 @@ import akka.stream.ActorMaterializerSettings * INTERNAL API */ @InternalApi private[akka] object ActorRefSourceActor { - def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = { + def props(completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = { require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") val maxFixedBufferSize = settings.maxFixedBufferSize - Props(new ActorRefSourceActor(bufferSize, overflowStrategy, maxFixedBufferSize)) + Props(new ActorRefSourceActor(completionMatcher, failureMatcher, bufferSize, overflowStrategy, maxFixedBufferSize)) } } /** * INTERNAL API */ -@InternalApi private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int) +@InternalApi private[akka] class ActorRefSourceActor( + completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int) extends akka.stream.actor.ActorPublisher[Any] with ActorLogging { import akka.stream.actor.ActorPublisherMessage._ @@ -35,15 +38,21 @@ import akka.stream.ActorMaterializerSettings def receive = ({ case Cancel ⇒ context.stop(self) + }: Receive) + .orElse(requestElem) + .orElse(receiveFailure) + .orElse(receiveComplete) + .orElse(receiveElem) - case _: Status.Success ⇒ - if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully - else context.become(drainBufferThenComplete) + def receiveComplete: Receive = completionMatcher.andThen { _ ⇒ + if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully + else context.become(drainBufferThenComplete) + } - case Status.Failure(cause) if isActive ⇒ + def receiveFailure: Receive = failureMatcher.andThen { cause ⇒ + if (isActive) onErrorThenStop(cause) - - }: Receive).orElse(requestElem).orElse(receiveElem) + } def requestElem: Receive = { case _: Request ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index a590ea0a61..c922582dea 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -99,19 +99,24 @@ import akka.util.OptionVal * INTERNAL API */ @InternalApi private[akka] final class ActorRefSource[Out]( - bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) + completionMatcher: PartialFunction[Any, Unit], + failureMatcher: PartialFunction[Any, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)" override def create(context: MaterializationContext) = { val mat = ActorMaterializerHelper.downcast(context.materializer) - val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings)) + val ref = mat.actorOf(context, ActorRefSourceActor.props( + completionMatcher, + failureMatcher, + bufferSize, overflowStrategy, mat.settings)) (akka.stream.actor.ActorPublisher[Out](ref), ref) } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = - new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape) + new ActorRefSource[Out](completionMatcher, failureMatcher, bufferSize, overflowStrategy, attributes, shape) override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] = - new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr)) + new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, attr, amendShape(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index b0a9cca2e9..badf68c3f0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -168,7 +168,7 @@ import scala.collection.generic.CanBuildFrom /** * INTERNAL API */ -@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, +@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { @@ -177,14 +177,14 @@ import scala.collection.generic.CanBuildFrom val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max val subscriberRef = actorMaterializer.actorOf( context, - ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage)) + ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage, onFailureMessage)) (akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed) } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = - new ActorRefSink[In](ref, onCompleteMessage, attributes, shape) + new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attributes, shape) override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] = - new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) + new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attr, amendShape(attr)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 20c482dcf6..a4d9c6cc0d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -210,7 +210,7 @@ object Sink { */ def actorRefWithAck[In](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] = - new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply)) + new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _)) /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index de29f07f95..c3cc53c0f4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -368,6 +368,25 @@ object Sink { Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink") } + /** + * Sends the elements of the stream to the given `ActorRef`. + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure the `onFailureMessage` will be invoked + * and its result will be sent to the destination actor. + * + * It will request at most `maxInputBufferSize` number of elements from + * upstream, but there is no back-pressure signal from the destination actor, + * i.e. if the actor is not consuming the messages fast enough the mailbox + * of the actor will grow. For potentially slow consumer actors it is recommended + * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + * limiting stage in front of this `Sink`. + */ + def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Sink[T, NotUsed] = + fromGraph(new ActorRefSink(ref, onCompleteMessage, onFailureMessage, + DefaultAttributes.actorRefSink, shape("ActorRefSink"))) + /** * Sends the elements of the stream to the given `ActorRef`. * If the target actor terminates the stream will be canceled. @@ -383,8 +402,33 @@ object Sink { * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * limiting stage in front of this `Sink`. */ + @deprecated("Use `actorRef` that takes onFailureMessage instead. It allows controling the message that will be sent to the actor on failure.", since = "2.5.10") def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = - fromGraph(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink"))) + fromGraph(new ActorRefSink(ref, onCompleteMessage, t ⇒ Status.Failure(t), + DefaultAttributes.actorRefSink, shape("ActorRefSink"))) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is created by calling `onInitMessage` with an `ActorRef` of the actor that + * expects acknowledgements. Then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * Every message that is sent to the actor is first transformed using `messageAdapter`. + * This can be used to capture the ActorRef of the actor that expects acknowledgments as + * well as transforming messages from the stream to the ones that actor under `ref` handles. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef ⇒ T ⇒ Any, + onInitMessage: ActorRef ⇒ Any, ackMessage: Any, onCompleteMessage: Any, + onFailureMessage: (Throwable) ⇒ Any): Sink[T, NotUsed] = + Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, messageAdapter, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)) /** * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. @@ -398,10 +442,13 @@ object Sink { * will be sent to the destination actor. * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. + * + * @deprecated Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`. */ + @deprecated("Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.", since = "2.5.10") def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, NotUsed] = - Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)) + actorRefWithAck(ref, _ ⇒ identity, _ ⇒ onInitMessage, ackMessage, onCompleteMessage, onFailureMessage) /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 1cf78b5383..650a13b09e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -430,6 +430,47 @@ object Source { fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource"))) } + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand + * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after + * this Source; as such, it is never safe to assume the downstream will always generate demand. + * + * The stream can be completed successfully by sending the actor reference a message that is matched by + * `completionMatcher` in which case already buffered elements will be signaled before signaling + * completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * See also [[akka.stream.scaladsl.Source.queue]]. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T]( + completionMatcher: PartialFunction[Any, Unit], + failureMatcher: PartialFunction[Any, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { + require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") + require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") + fromGraph(new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) + } + /** * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, @@ -459,14 +500,17 @@ object Source { * * See also [[akka.stream.scaladsl.Source.queue]]. * + * @deprecated Use `actorRef` that takes matchers instead. It allows controlling the completion and failure messages that are sent to the actor. + * * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { - require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") - require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") - fromGraph(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) - } + @deprecated("Use `actorRef` that takes matchers instead. It allows controlling the messages that are used for completion and failure.", since = "2.5.10") + def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = + actorRef( + { case akka.actor.Status.Success(_) ⇒ }, + { case akka.actor.Status.Failure(cause) ⇒ cause }, + bufferSize, overflowStrategy) /** * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. diff --git a/build.sbt b/build.sbt index daac3b402d..2023878170 100644 --- a/build.sbt +++ b/build.sbt @@ -37,7 +37,10 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq( slf4j, stream, streamTestkit, streamTests, streamTestsTck, testkit, - actorTyped, actorTypedTests, typedTestkit, persistenceTyped, clusterTyped, clusterShardingTyped + actorTyped, actorTypedTests, typedTestkit, + persistenceTyped, + clusterTyped, clusterShardingTyped, + streamTyped ) lazy val root = Project( @@ -385,7 +388,6 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed") .dependsOn( actorTyped, persistence, - testkit % "test->test", typedTestkit % "test->test", actorTypedTests % "test->test" ) @@ -401,7 +403,6 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed") distributedData, persistence % "provided->test", persistenceTyped % "provided->test", - testkit % "test->test", typedTestkit % "test->test", actorTypedTests % "test->test" ) @@ -414,7 +415,6 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") clusterTyped, persistenceTyped, clusterSharding, - testkit % "test->test", typedTestkit % "test->test", actorTypedTests % "test->test", persistenceTyped % "test->test" @@ -425,6 +425,16 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" )) .disablePlugins(MimaPlugin) +lazy val streamTyped = akkaModule("akka-stream-typed") + .dependsOn( + actorTyped, + stream, + typedTestkit % "test->test", + actorTypedTests % "test->test" + ) + .settings(AkkaBuild.mayChangeSettings) + .settings(AutomaticModuleName.settings("akka.stream.typed")) + .disablePlugins(MimaPlugin) lazy val typedTestkit = akkaModule("akka-testkit-typed") .dependsOn(actorTyped, testkit % "compile->compile;test->test") From cbe0215c41c33063ef2dc22296ae48ed2904faf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Fri, 19 Jan 2018 19:22:40 +0700 Subject: [PATCH 2/2] Typed stream adapters, #23604 * Change more general factories to private * Typed Streams docs * Remove BoxedUnit from Java Api * Use JavaPartialFunction in Java examples * Doc wording improvements, formatting fixes, no verification diagrams --- akka-docs/src/main/paradox/typed/index.md | 1 + akka-docs/src/main/paradox/typed/stream.md | 53 ++++++++++ .../stream/DslFactoriesConsistencySpec.scala | 3 + .../akka/stream/typed/ActorMaterializer.scala | 3 + .../akka/stream/typed/javadsl/ActorSink.scala | 12 +-- .../stream/typed/javadsl/ActorSource.scala | 12 ++- .../stream/typed/scaladsl/ActorSink.scala | 4 +- .../stream/typed/scaladsl/ActorSource.scala | 4 +- .../javadsl/ActorSourceSinkCompileTest.java | 42 ++++---- .../akka/stream/typed/ActorSinkExample.java | 53 ++++++++++ .../stream/typed/ActorSinkWithAckExample.java | 66 +++++++++++++ .../akka/stream/typed/ActorSourceExample.java | 74 ++++++++++++++ .../typed/scaladsl/ActorSourceSinkSpec.scala | 14 +-- .../stream/typed/ActorSourceSinkExample.scala | 98 +++++++++++++++++++ .../mima-filters/2.5.9.backwards.excludes | 9 ++ .../scala/akka/stream/scaladsl/Sink.scala | 16 +-- .../scala/akka/stream/scaladsl/Source.scala | 7 +- build.sbt | 1 + 18 files changed, 419 insertions(+), 53 deletions(-) create mode 100644 akka-docs/src/main/paradox/typed/stream.md create mode 100644 akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java create mode 100644 akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java create mode 100644 akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java create mode 100644 akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala diff --git a/akka-docs/src/main/paradox/typed/index.md b/akka-docs/src/main/paradox/typed/index.md index 81e3aa90fe..6ff1753eb7 100644 --- a/akka-docs/src/main/paradox/typed/index.md +++ b/akka-docs/src/main/paradox/typed/index.md @@ -11,6 +11,7 @@ * [fault-tolerance](fault-tolerance.md) * [actor-discovery](actor-discovery.md) * [stash](stash.md) +* [stream](stream.md) * [cluster](cluster.md) * [cluster-singleton](cluster-singleton.md) * [cluster-sharding](cluster-sharding.md) diff --git a/akka-docs/src/main/paradox/typed/stream.md b/akka-docs/src/main/paradox/typed/stream.md new file mode 100644 index 0000000000..94eea947cc --- /dev/null +++ b/akka-docs/src/main/paradox/typed/stream.md @@ -0,0 +1,53 @@ +# Streams + +@@@ warning + +This module is currently marked as @ref:[may change](../common/may-change.md) in the sense + of being the subject of active research. This means that API or semantics can + change without warning or deprecation period and it is not recommended to use + this module in production just yet—you have been warned. + +@@@ + +@ref:[Akka Streams](../stream/index.md) make it easy to model type-safe message processing pipelines. With typed actors it is possible to connect streams to actors without loosing the type information. + +To use the typed stream source and sink factories add the following dependency: + +@@dependency [sbt,Maven,Gradle] { + group=com.typesafe.akka + artifact=akka-stream-typed_2.12 + version=$akka.version$ +} + +This dependency contains typed alternatives to the @ref:[already existing `ActorRef` sources and sinks](../stream/stream-integrations.md) together with a factory methods for @scala[@scaladoc[`ActorMaterializer`](akka.stream.typed.ActorMaterializer)]@java[@javadoc[`ActorMaterializer`](akka.stream.typed.ActorMaterializer)] which take a typed `ActorSystem`. + +The materializer created from these factory methods and sources together with sinks contained in this module can be mixed and matched with the original Akka Streams building blocks from the original module. + +## Actor Source + +A stream that is driven by messages sent to a particular actor can be started with @scala[@scaladoc[`ActorSource.actorRef`](akka.stream.typed.scaladsl.ActorSource#actorRef)]@java[@javadoc[`ActorSource.actorRef`](akka.stream.typed.javadsl.ActorSource#actorRef)]. This source materializes to a typed `ActorRef` which only accepts messages that are of the same type as the stream. + +Scala +: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-source-ref } + +Java +: @@snip [ActorSourceExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java) { #actor-source-ref } + + +## Actor Sink + +There are two sinks availabe that accept typed `ActorRef`s. To send all of the messages from a stream to an actor without considering backpressure, use @scala[@scaladoc[`ActorSink.actorRef`](akka.stream.typed.scaladsl.ActorSink#actorRef)]@java[@javadoc[`ActorSink.actorRef`](akka.stream.typed.javadsl.ActorSink#actorRef)]. + +Scala +: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref } + +Java +: @@snip [ActorSinkExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java) { #actor-sink-ref } + +For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use @scala[@scaladoc[`ActorSink.actorRefWithAck`](akka.stream.typed.scaladsl.ActorSink#actorRefWithAck)]@java[@javadoc[`ActorSink.actorRefWithAck`](akka.stream.typed.javadsl.ActorSink#actorRefWithAck)] to be able to signal demand when the actor is ready to receive more elements. + +Scala +: @@snip [ActorSourceSinkExample.scala]($akka$/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-sink-ref-with-ack } + +Java +: @@snip [ActorSinkWithAckExample.java]($akka$/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-ack } diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index e4d350ee81..1cc5abaa66 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -117,6 +117,9 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { Ignore(_ == akka.stream.scaladsl.Flow.getClass, _ == "apply", _ == 24, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "apply", _ == 24, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ ⇒ true, _ ⇒ true), + Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRef", _ ⇒ true, _ ⇒ true), // Internal in scaladsl + Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "actorRefWithAck", _ ⇒ true, _ ⇒ true), // Internal in scaladsl + Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "actorRef", _ ⇒ true, _ ⇒ true), // Internal in scaladsl Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ ⇒ true), diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala index df3c6b32ab..3fe14605eb 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ package akka.stream.typed import akka.actor.typed.ActorSystem diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala index da75a2bc0e..d3f78cdff7 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala @@ -1,11 +1,11 @@ -/* - * Copyright (C) 2017 Lightbend Inc. - */ +/** + * Copyright (C) 2018 Lightbend Inc. + */ package akka.stream.typed.javadsl import akka.actor.typed._ import akka.NotUsed -import akka.stream.scaladsl._ +import akka.stream.javadsl._ import akka.stream.typed /** @@ -29,7 +29,7 @@ object ActorSink { * limiting stage in front of this `Sink`. */ def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: akka.japi.function.Function[Throwable, T]): Sink[T, NotUsed] = - typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply) + typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply).asJava /** * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. @@ -52,6 +52,6 @@ object ActorSink { onCompleteMessage: M, onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] = typed.scaladsl.ActorSink.actorRefWithAck( - ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply) + ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply).asJava } diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala index 72ec384862..0ca4011dc8 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala @@ -1,9 +1,11 @@ -/* - * Copyright (C) 2017 Lightbend Inc. +/** + * Copyright (C) 2018 Lightbend Inc. */ package akka.stream.typed.javadsl +import java.util.function.Predicate + import akka.actor.typed._ import akka.stream.OverflowStrategy import akka.stream.javadsl._ @@ -46,11 +48,11 @@ object ActorSource { * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def actorRef[T]( - completionMatcher: PartialFunction[T, Unit], + completionMatcher: Predicate[T], failureMatcher: PartialFunction[T, Throwable], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = { akka.stream.typed.scaladsl.ActorSource.actorRef( - completionMatcher, failureMatcher, - bufferSize, overflowStrategy).asJava + { case m if completionMatcher.test(m) ⇒ }: PartialFunction[T, Unit], + failureMatcher, bufferSize, overflowStrategy).asJava } } diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala index f3d5e405ac..8f978b4296 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala @@ -1,5 +1,5 @@ -/* - * Copyright (C) 2017 Lightbend Inc. +/** + * Copyright (C) 2018 Lightbend Inc. */ package akka.stream.typed.scaladsl diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala index bc5539e329..8bec128fbf 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala @@ -1,5 +1,5 @@ -/* - * Copyright (C) 2017 Lightbend Inc. +/** + * Copyright (C) 2018 Lightbend Inc. */ package akka.stream.typed.scaladsl diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java index bf3d097f0c..6a6aec7037 100644 --- a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java +++ b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java @@ -1,14 +1,16 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + package akka.stream.typed.javadsl; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; +import akka.japi.JavaPartialFunction; import akka.stream.ActorMaterializer; import akka.stream.OverflowStrategy; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import scala.PartialFunction$; -import scala.runtime.AbstractPartialFunction; -import scala.runtime.BoxedUnit; public class ActorSourceSinkCompileTest { @@ -47,38 +49,36 @@ public class ActorSourceSinkCompileTest { } { - final AbstractPartialFunction completionMatcher = new AbstractPartialFunction() { - @Override - public boolean isDefinedAt(String s) { - return s == "complete"; - } - }; - ActorSource .actorRef( - completionMatcher, - PartialFunction$.MODULE$.empty(), // FIXME make the API nicer + (m) -> m == "complete", + new JavaPartialFunction() { + @Override + public Throwable apply(String x, boolean isCheck) throws Exception { + throw noMatch(); + } + }, 10, OverflowStrategy.dropBuffer()) .to(Sink.seq()); } { - final AbstractPartialFunction failureMatcher = new AbstractPartialFunction() { + final JavaPartialFunction failureMatcher = new JavaPartialFunction() { @Override - public boolean isDefinedAt(Protocol p) { - return p instanceof Failure; - } - - @Override - public Throwable apply(Protocol p) { - return ((Failure)p).ex; + public Throwable apply(Protocol p, boolean isCheck) throws Exception { + if (p instanceof Failure) { + return ((Failure)p).ex; + } + else { + throw noMatch(); + } } }; ActorSource .actorRef( - PartialFunction$.MODULE$.empty(), // FIXME make the API nicer + (m) -> false, failureMatcher, 10, OverflowStrategy.dropBuffer()) .to(Sink.seq()); diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java new file mode 100644 index 0000000000..94717a8f68 --- /dev/null +++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkExample.java @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.akka.stream.typed; + +// #actor-sink-ref +import akka.NotUsed; +import akka.actor.typed.ActorRef; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.typed.javadsl.ActorSink; +// #actor-sink-ref + +public class ActorSinkExample { + + // #actor-sink-ref + + interface Protocol {} + class Message implements Protocol { + private final String msg; + public Message(String msg) { + this.msg = msg; + } + } + class Complete implements Protocol {} + class Fail implements Protocol { + private final Throwable ex; + public Fail(Throwable ex) { + this.ex = ex; + } + } + // #actor-sink-ref + + final ActorMaterializer mat = null; + + { + // #actor-sink-ref + + final ActorRef actor = null; + + final Sink sink = ActorSink.actorRef( + actor, + new Complete(), + Fail::new + ); + + Source.single(new Message("msg1")).runWith(sink, mat); + // #actor-sink-ref + } + +} diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java new file mode 100644 index 0000000000..1ccf886afd --- /dev/null +++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.akka.stream.typed; + +// #actor-sink-ref-with-ack +import akka.NotUsed; +import akka.actor.typed.ActorRef; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.typed.javadsl.ActorSink; +// #actor-sink-ref-with-ack + +public class ActorSinkWithAckExample { + + // #actor-sink-ref-with-ack + + class Ack {} + + interface Protocol {} + class Init implements Protocol { + private final ActorRef ack; + public Init(ActorRef ack) { + this.ack = ack; + } + } + class Message implements Protocol { + private final ActorRef ackTo; + private final String msg; + public Message(ActorRef ackTo, String msg) { + this.ackTo = ackTo; + this.msg = msg; + } + } + class Complete implements Protocol {} + class Fail implements Protocol { + private final Throwable ex; + public Fail(Throwable ex) { + this.ex = ex; + } + } + // #actor-sink-ref-with-ack + + final ActorMaterializer mat = null; + + { + // #actor-sink-ref-with-ack + + final ActorRef actor = null; + + final Sink sink = ActorSink.actorRefWithAck( + actor, + Message::new, + Init::new, + new Ack(), + new Complete(), + Fail::new + ); + + Source.single("msg1").runWith(sink, mat); + // #actor-sink-ref-with-ack + } + +} diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java new file mode 100644 index 0000000000..3e65476232 --- /dev/null +++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.akka.stream.typed; + +// #actor-source-ref +import akka.actor.typed.ActorRef; +import akka.japi.JavaPartialFunction; +import akka.stream.ActorMaterializer; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.typed.javadsl.ActorSource; +// #actor-source-ref + +public class ActorSourceExample { + + // #actor-source-ref + + interface Protocol {} + class Message implements Protocol { + private final String msg; + public Message(String msg) { + this.msg = msg; + } + } + class Complete implements Protocol {} + class Fail implements Protocol { + private final Exception ex; + public Fail(Exception ex) { + this.ex = ex; + } + } + // #actor-source-ref + + final ActorMaterializer mat = null; + + { + // #actor-source-ref + + final JavaPartialFunction failureMatcher = + new JavaPartialFunction() { + public Throwable apply(Protocol p, boolean isCheck) { + if (p instanceof Fail) { + return ((Fail)p).ex; + } else { + throw noMatch(); + } + } + }; + + final Source> source = ActorSource.actorRef( + (m) -> m instanceof Complete, + failureMatcher, + 8, + OverflowStrategy.fail() + ); + + final ActorRef ref = source.collect(new JavaPartialFunction() { + public String apply(Protocol p, boolean isCheck) { + if (p instanceof Message) { + return ((Message)p).msg; + } else { + throw noMatch(); + } + } + }).to(Sink.foreach(System.out::println)).run(mat); + + ref.tell(new Message("msg1")); + // ref.tell("msg2"); Does not compile + // #actor-source-ref + } +} \ No newline at end of file diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala index 0141711b44..8d5ba79249 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala @@ -1,9 +1,9 @@ -/* - * Copyright (C) 2017 Lightbend Inc. +/** + * Copyright (C) 2018 Lightbend Inc. */ package akka.stream.typed.scaladsl -import akka.actor.typed.scaladsl.Actor +import akka.actor.typed.scaladsl.Behaviors import akka.stream.OverflowStrategy import akka.actor.typed.{ ActorRef, ActorSystem } import akka.testkit.TestKit @@ -59,20 +59,20 @@ class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSin "obey protocol" in { val p = TestProbe[AckProto]() - val autoPilot = Actor.immutable[AckProto] { + val autoPilot = Behaviors.immutable[AckProto] { (ctx, msg) ⇒ msg match { case m @ Init(sender) ⇒ p.ref ! m sender ! "ACK" - Actor.same + Behaviors.same case m @ Msg(sender, _) ⇒ p.ref ! m sender ! "ACK" - Actor.same + Behaviors.same case m ⇒ p.ref ! m - Actor.same + Behaviors.same } } diff --git a/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala new file mode 100644 index 0000000000..ae34b2caa5 --- /dev/null +++ b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.akka.stream.typed + +import akka.NotUsed +import akka.stream.ActorMaterializer + +object ActorSourceSinkExample { + + implicit val mat: ActorMaterializer = ??? + + { + // #actor-source-ref + import akka.actor.typed.ActorRef + import akka.stream.OverflowStrategy + import akka.stream.scaladsl.{ Sink, Source } + import akka.stream.typed.scaladsl.ActorSource + + trait Protocol + case class Message(msg: String) extends Protocol + case object Complete extends Protocol + case class Fail(ex: Exception) extends Protocol + + val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol]( + completionMatcher = { + case Complete ⇒ + }, + failureMatcher = { + case Fail(ex) ⇒ ex + }, + bufferSize = 8, + overflowStrategy = OverflowStrategy.fail + ) + + val ref = source.collect { + case Message(msg) ⇒ msg + }.to(Sink.foreach(println)).run() + + ref ! Message("msg1") + // ref ! "msg2" Does not compile + // #actor-source-ref + } + + { + // #actor-sink-ref + import akka.actor.typed.ActorRef + import akka.stream.scaladsl.{ Sink, Source } + import akka.stream.typed.scaladsl.ActorSink + + trait Protocol + case class Message(msg: String) extends Protocol + case object Complete extends Protocol + case class Fail(ex: Throwable) extends Protocol + + val actor: ActorRef[Protocol] = ??? + + val sink: Sink[Protocol, NotUsed] = ActorSink.actorRef[Protocol]( + ref = actor, + onCompleteMessage = Complete, + onFailureMessage = Fail.apply + ) + + Source.single(Message("msg1")).runWith(sink) + // #actor-sink-ref + } + + { + // #actor-sink-ref-with-ack + import akka.actor.typed.ActorRef + import akka.stream.scaladsl.{ Sink, Source } + import akka.stream.typed.scaladsl.ActorSink + + trait Ack + object Ack extends Ack + + trait Protocol + case class Init(ackTo: ActorRef[Ack]) extends Protocol + case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol + case object Complete extends Protocol + case class Fail(ex: Throwable) extends Protocol + + val actor: ActorRef[Protocol] = ??? + + val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck( + ref = actor, + onCompleteMessage = Complete, + onFailureMessage = Fail.apply, + messageAdapter = Message.apply, + onInitMessage = Init.apply, + ackMessage = Ack + ) + + Source.single("msg1").runWith(sink) + // #actor-sink-ref-with-ack + } +} diff --git a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes index 6cc828a75b..af2a648a6b 100644 --- a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes @@ -3,3 +3,12 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Restart ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffFlow.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffSource.this") + +# #23604 Typed stream adapters +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSinkActor.props") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSink.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSinkActor.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.props") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.this") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index c3cc53c0f4..f00da21463 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.{ Done, NotUsed } import akka.dispatch.ExecutionContexts import akka.actor.{ ActorRef, Props, Status } +import akka.annotation.InternalApi import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl._ @@ -369,6 +370,8 @@ object Sink { } /** + * INTERNAL API + * * Sends the elements of the stream to the given `ActorRef`. * If the target actor terminates the stream will be canceled. * When the stream is completed successfully the given `onCompleteMessage` @@ -383,7 +386,7 @@ object Sink { * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * limiting stage in front of this `Sink`. */ - def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Sink[T, NotUsed] = + @InternalApi private[akka] def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Sink[T, NotUsed] = fromGraph(new ActorRefSink(ref, onCompleteMessage, onFailureMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink"))) @@ -402,12 +405,13 @@ object Sink { * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * limiting stage in front of this `Sink`. */ - @deprecated("Use `actorRef` that takes onFailureMessage instead. It allows controling the message that will be sent to the actor on failure.", since = "2.5.10") def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = fromGraph(new ActorRefSink(ref, onCompleteMessage, t ⇒ Status.Failure(t), DefaultAttributes.actorRefSink, shape("ActorRefSink"))) /** + * INTERNAL API + * * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. * First element is created by calling `onInitMessage` with an `ActorRef` of the actor that * expects acknowledgements. Then stream is waiting for acknowledgement message @@ -425,9 +429,9 @@ object Sink { * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. */ - def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef ⇒ T ⇒ Any, - onInitMessage: ActorRef ⇒ Any, ackMessage: Any, onCompleteMessage: Any, - onFailureMessage: (Throwable) ⇒ Any): Sink[T, NotUsed] = + @InternalApi private[akka] def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef ⇒ T ⇒ Any, + onInitMessage: ActorRef ⇒ Any, ackMessage: Any, onCompleteMessage: Any, + onFailureMessage: (Throwable) ⇒ Any): Sink[T, NotUsed] = Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, messageAdapter, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)) /** @@ -443,9 +447,7 @@ object Sink { * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. * - * @deprecated Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`. */ - @deprecated("Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.", since = "2.5.10") def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, NotUsed] = actorRefWithAck(ref, _ ⇒ identity, _ ⇒ onInitMessage, ackMessage, onCompleteMessage, onFailureMessage) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 650a13b09e..7e9dd39eec 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import java.util.concurrent.CompletionStage import akka.actor.{ ActorRef, Cancellable, Props } +import akka.annotation.InternalApi import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages @@ -431,6 +432,8 @@ object Source { } /** + * INTERNAL API + * * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, * otherwise they will be buffered until request for demand is received. @@ -462,7 +465,7 @@ object Source { * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def actorRef[T]( + @InternalApi private[akka] def actorRef[T]( completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { @@ -500,12 +503,10 @@ object Source { * * See also [[akka.stream.scaladsl.Source.queue]]. * - * @deprecated Use `actorRef` that takes matchers instead. It allows controlling the completion and failure messages that are sent to the actor. * * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ - @deprecated("Use `actorRef` that takes matchers instead. It allows controlling the messages that are used for completion and failure.", since = "2.5.10") def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = actorRef( { case akka.actor.Status.Success(_) ⇒ }, diff --git a/build.sbt b/build.sbt index 2023878170..ec9cd5c2f0 100644 --- a/build.sbt +++ b/build.sbt @@ -435,6 +435,7 @@ lazy val streamTyped = akkaModule("akka-stream-typed") .settings(AkkaBuild.mayChangeSettings) .settings(AutomaticModuleName.settings("akka.stream.typed")) .disablePlugins(MimaPlugin) + .enablePlugins(ScaladocNoVerificationOfDiagrams) lazy val typedTestkit = akkaModule("akka-testkit-typed") .dependsOn(actorTyped, testkit % "compile->compile;test->test")