diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala new file mode 100644 index 0000000000..df3c6b32ab --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/ActorMaterializer.scala @@ -0,0 +1,57 @@ +package akka.stream.typed + +import akka.actor.typed.ActorSystem +import akka.stream.ActorMaterializerSettings + +object ActorMaterializer { + import akka.actor.typed.scaladsl.adapter._ + + /** + * Scala API: Creates an ActorMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]] + * will be used to create one actor that in turn creates actors for the transformation steps. + * + * The materializer's [[akka.stream.ActorMaterializerSettings]] will be obtained from the + * configuration of the `context`'s underlying [[akka.actor.typed.ActorSystem]]. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply[T](materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None)(implicit actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = + akka.stream.ActorMaterializer(materializerSettings, namePrefix)(actorSystem.toUntyped) + + /** + * Java API: Creates an ActorMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]] + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * Defaults the actor name prefix used to name actors running the processing steps to `"flow"`. + * The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create[T](actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = + apply()(actorSystem) + + /** + * Java API: Creates an ActorMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]] + * will be used to create one actor that in turn creates actors for the transformation steps. + */ + def create[T](settings: ActorMaterializerSettings, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = + apply(Option(settings), None)(actorSystem) + + /** + * Java API: Creates an ActorMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]] + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create[T](settings: ActorMaterializerSettings, namePrefix: String, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer = + apply(Option(settings), Option(namePrefix))(actorSystem) + +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala new file mode 100644 index 0000000000..da75a2bc0e --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.typed.javadsl + +import akka.actor.typed._ +import akka.NotUsed +import akka.stream.scaladsl._ +import akka.stream.typed + +/** + * Collection of Sinks aimed at integrating with typed Actors. + */ +object ActorSink { + /** + * Sends the elements of the stream to the given `ActorRef`. + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure a the throwable that was signaled + * to the stream is adapted to the Actors protocol using `onFailureMessage` and + * then then sent to the destination actor. + * + * It will request at most `maxInputBufferSize` number of elements from + * upstream, but there is no back-pressure signal from the destination actor, + * i.e. if the actor is not consuming the messages fast enough the mailbox + * of the actor will grow. For potentially slow consumer actors it is recommended + * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + * limiting stage in front of this `Sink`. + */ + def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: akka.japi.function.Function[Throwable, T]): Sink[T, NotUsed] = + typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + def actorRefWithAck[T, M, A]( + ref: ActorRef[M], + messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M], + onInitMessage: akka.japi.function.Function[ActorRef[A], M], + ackMessage: A, + onCompleteMessage: M, + onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] = + typed.scaladsl.ActorSink.actorRefWithAck( + ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply) + +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala new file mode 100644 index 0000000000..72ec384862 --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.stream.typed.javadsl + +import akka.actor.typed._ +import akka.stream.OverflowStrategy +import akka.stream.javadsl._ + +/** + * Collection of Sources aimed at integrating with typed Actors. + */ +object ActorSource { + + /** + * Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand + * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after + * this Source; as such, it is never safe to assume the downstream will always generate demand. + * + * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]] + * (whose content will be ignored) in which case already buffered elements will be signaled before signaling + * completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. + * + * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the + * actor reference. In case the Actor is still draining its internal buffer (after having received + * a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]], + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * See also [[akka.stream.javadsl.Source.queue]]. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T]( + completionMatcher: PartialFunction[T, Unit], + failureMatcher: PartialFunction[T, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = { + akka.stream.typed.scaladsl.ActorSource.actorRef( + completionMatcher, failureMatcher, + bufferSize, overflowStrategy).asJava + } +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala new file mode 100644 index 0000000000..f3d5e405ac --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.typed.scaladsl + +import akka.actor.typed._ +import akka.stream.scaladsl._ +import akka.NotUsed + +/** + * Collection of Sinks aimed at integrating with typed Actors. + */ +object ActorSink { + import akka.actor.typed.scaladsl.adapter._ + + /** + * Sends the elements of the stream to the given `ActorRef`. + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure a the throwable that was signaled + * to the stream is adapted to the Actors protocol using `onFailureMessage` and + * then then sent to the destination actor. + * + * It will request at most `maxInputBufferSize` number of elements from + * upstream, but there is no back-pressure signal from the destination actor, + * i.e. if the actor is not consuming the messages fast enough the mailbox + * of the actor will grow. For potentially slow consumer actors it is recommended + * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + * limiting stage in front of this `Sink`. + */ + def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: Throwable ⇒ T): Sink[T, NotUsed] = + Sink.actorRef(ref.toUntyped, onCompleteMessage, onFailureMessage) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + def actorRefWithAck[T, M, A]( + ref: ActorRef[M], + messageAdapter: (ActorRef[A], T) ⇒ M, + onInitMessage: ActorRef[A] ⇒ M, + ackMessage: A, + onCompleteMessage: M, + onFailureMessage: Throwable ⇒ M): Sink[T, NotUsed] = + Sink.actorRefWithAck( + ref.toUntyped, + messageAdapter.curried.compose(actorRefAdapter), + onInitMessage.compose(actorRefAdapter), + ackMessage, onCompleteMessage, onFailureMessage) + +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala new file mode 100644 index 0000000000..bc5539e329 --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.stream.typed.scaladsl + +import akka.actor.typed._ +import akka.stream.OverflowStrategy +import akka.stream.scaladsl._ + +/** + * Collection of Sources aimed at integrating with typed Actors. + */ +object ActorSource { + + import akka.actor.typed.scaladsl.adapter._ + + /** + * Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand + * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after + * this Source; as such, it is never safe to assume the downstream will always generate demand. + * + * The stream can be completed successfully by sending the actor reference a message that is matched by + * `completionMatcher` in which case already buffered elements will be signaled before signaling + * completion. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * See also [[akka.stream.scaladsl.Source.queue]]. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T]( + completionMatcher: PartialFunction[T, Unit], + failureMatcher: PartialFunction[T, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = + Source.actorRef[T]( + completionMatcher.asInstanceOf[PartialFunction[Any, Unit]], + failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]], + bufferSize, overflowStrategy).mapMaterializedValue(actorRefAdapter) +} diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java new file mode 100644 index 0000000000..bf3d097f0c --- /dev/null +++ b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java @@ -0,0 +1,87 @@ +package akka.stream.typed.javadsl; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import scala.PartialFunction$; +import scala.runtime.AbstractPartialFunction; +import scala.runtime.BoxedUnit; + +public class ActorSourceSinkCompileTest { + + interface Protocol {} + class Init implements Protocol {} + class Msg implements Protocol {} + class Complete implements Protocol {} + class Failure implements Protocol { + public Exception ex; + } + + { + final ActorSystem system = null; + final ActorMaterializer mat = akka.stream.typed.ActorMaterializer.create(system); + } + + { + final ActorRef ref = null; + + Source.queue(10, OverflowStrategy.dropBuffer()) + .map(s -> s + "!") + .to(ActorSink.actorRef(ref, "DONE", ex -> "FAILED: " + ex.getMessage())); + } + + { + final ActorRef ref = null; + + Source.queue(10, OverflowStrategy.dropBuffer()) + .to(ActorSink.actorRefWithAck( + ref, + (sender, msg) -> new Init(), + (sender) -> new Msg(), + "ACK", + new Complete(), + (f) -> new Failure())); + } + + { + final AbstractPartialFunction completionMatcher = new AbstractPartialFunction() { + @Override + public boolean isDefinedAt(String s) { + return s == "complete"; + } + }; + + ActorSource + .actorRef( + completionMatcher, + PartialFunction$.MODULE$.empty(), // FIXME make the API nicer + 10, + OverflowStrategy.dropBuffer()) + .to(Sink.seq()); + } + + { + final AbstractPartialFunction failureMatcher = new AbstractPartialFunction() { + @Override + public boolean isDefinedAt(Protocol p) { + return p instanceof Failure; + } + + @Override + public Throwable apply(Protocol p) { + return ((Failure)p).ex; + } + }; + + ActorSource + .actorRef( + PartialFunction$.MODULE$.empty(), // FIXME make the API nicer + failureMatcher, 10, + OverflowStrategy.dropBuffer()) + .to(Sink.seq()); + } + +} diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala new file mode 100644 index 0000000000..0141711b44 --- /dev/null +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.typed.scaladsl + +import akka.actor.typed.scaladsl.Actor +import akka.stream.OverflowStrategy +import akka.actor.typed.{ ActorRef, ActorSystem } +import akka.testkit.TestKit +import akka.testkit.typed.scaladsl._ +import akka.stream.scaladsl.{ Keep, Sink, Source } +import akka.stream.typed.ActorMaterializer +import akka.testkit.typed.TestKitSettings +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import org.scalatest.concurrent.ScalaFutures + +object ActorSourceSinkSpec { + + sealed trait AckProto + case class Init(sender: ActorRef[String]) extends AckProto + case class Msg(sender: ActorRef[String], msg: String) extends AckProto + case object Complete extends AckProto + case object Failed extends AckProto +} + +class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSinkSpec")) with WordSpecLike with BeforeAndAfterAll with Matchers with ScalaFutures { + import ActorSourceSinkSpec._ + import akka.actor.typed.scaladsl.adapter._ + + // FIXME use Typed Teskit + // The materializer creates a top-level actor when materializing a stream. + // Currently that is not supported, because a Typed Teskit uses a typed actor system + // with a custom guardian. Because of custom guardian, an exception is being thrown + // when trying to create a top level actor during materialization. + implicit val sys = ActorSystem.wrap(system) + implicit val testkitSettings = TestKitSettings(sys) + implicit val mat = ActorMaterializer() + + override protected def afterAll(): Unit = + sys.terminate() + + "ActorSink" should { + + "accept messages" in { + val p = TestProbe[String]() + + val in = + Source.queue[String](10, OverflowStrategy.dropBuffer) + .map(_ + "!") + .to(ActorSink.actorRef(p.ref, "DONE", ex ⇒ "FAILED: " + ex.getMessage)) + .run() + + val msg = "Zug zug" + + in.offer(msg) + p.expectMsg(msg + "!") + } + + "obey protocol" in { + val p = TestProbe[AckProto]() + + val autoPilot = Actor.immutable[AckProto] { + (ctx, msg) ⇒ + msg match { + case m @ Init(sender) ⇒ + p.ref ! m + sender ! "ACK" + Actor.same + case m @ Msg(sender, _) ⇒ + p.ref ! m + sender ! "ACK" + Actor.same + case m ⇒ + p.ref ! m + Actor.same + } + } + + val pilotRef: ActorRef[AckProto] = system.actorOf(PropsAdapter(autoPilot)) + + val in = + Source.queue[String](10, OverflowStrategy.dropBuffer) + .to(ActorSink.actorRefWithAck(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ ⇒ Failed)) + .run() + + p.expectMsgType[Init] + + in.offer("Dabu!") + p.expectMsgType[Msg].msg shouldBe "Dabu!" + + in.offer("Lok'tar!") + p.expectMsgType[Msg].msg shouldBe "Lok'tar!" + + in.offer("Swobu!") + p.expectMsgType[Msg].msg shouldBe "Swobu!" + } + } + + "ActorSource" should { + "send messages and complete" in { + val (in, out) = ActorSource.actorRef[String]({ case "complete" ⇒ }, PartialFunction.empty, 10, OverflowStrategy.dropBuffer) + .toMat(Sink.seq)(Keep.both) + .run() + + in ! "one" + in ! "two" + in ! "complete" + + out.futureValue should contain theSameElementsAs Seq("one", "two") + } + + "fail the stream" in { + val (in, out) = ActorSource.actorRef[String](PartialFunction.empty, { case msg ⇒ new Error(msg) }, 10, OverflowStrategy.dropBuffer) + .toMat(Sink.seq)(Keep.both) + .run() + + in ! "boom!" + + out.failed.futureValue.getCause.getMessage shouldBe "boom!" + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index bd8fd86594..c2affc9e4e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -15,10 +15,13 @@ import akka.stream.stage._ /** * INTERNAL API */ -@InternalApi private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any, - ackMessage: Any, - onCompleteMessage: Any, - onFailureMessage: (Throwable) ⇒ Any) +@InternalApi private[akka] class ActorRefBackpressureSinkStage[In]( + ref: ActorRef, + messageAdapter: ActorRef ⇒ In ⇒ Any, + onInitMessage: ActorRef ⇒ Any, + ackMessage: Any, + onCompleteMessage: Any, + onFailureMessage: (Throwable) ⇒ Any) extends GraphStage[SinkShape[In]] { val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") override def initialAttributes = DefaultAttributes.actorRefWithAck @@ -55,12 +58,12 @@ import akka.stream.stage._ override def preStart() = { setKeepGoing(true) getStageActor(receive).watch(ref) - ref ! onInitMessage + ref ! onInitMessage(self) pull(in) } private def dequeueAndSend(): Unit = { - ref ! buffer.poll() + ref ! messageAdapter(self)(buffer.poll()) if (buffer.isEmpty && completeReceived) finish() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala index 6859d7dd5a..e487c3acaf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala @@ -16,14 +16,15 @@ import akka.annotation.InternalApi * INTERNAL API */ @InternalApi private[akka] object ActorRefSinkActor { - def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any): Props = - Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage)) + def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Props = + Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage, onFailureMessage)) } /** * INTERNAL API */ -@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber { +@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any) + extends ActorSubscriber { import ActorSubscriberMessage._ override val requestStrategy = WatermarkRequestStrategy(highWatermark) @@ -34,7 +35,7 @@ import akka.annotation.InternalApi case OnNext(elem) ⇒ ref.tell(elem, ActorRef.noSender) case OnError(cause) ⇒ - ref.tell(Status.Failure(cause), ActorRef.noSender) + ref.tell(onFailureMessage(cause), ActorRef.noSender) context.stop(self) case OnComplete ⇒ ref.tell(onCompleteMessage, ActorRef.noSender) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 356ba3647c..a0eb40ef20 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -15,17 +15,20 @@ import akka.stream.ActorMaterializerSettings * INTERNAL API */ @InternalApi private[akka] object ActorRefSourceActor { - def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = { + def props(completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = { require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") val maxFixedBufferSize = settings.maxFixedBufferSize - Props(new ActorRefSourceActor(bufferSize, overflowStrategy, maxFixedBufferSize)) + Props(new ActorRefSourceActor(completionMatcher, failureMatcher, bufferSize, overflowStrategy, maxFixedBufferSize)) } } /** * INTERNAL API */ -@InternalApi private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int) +@InternalApi private[akka] class ActorRefSourceActor( + completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int) extends akka.stream.actor.ActorPublisher[Any] with ActorLogging { import akka.stream.actor.ActorPublisherMessage._ @@ -35,15 +38,21 @@ import akka.stream.ActorMaterializerSettings def receive = ({ case Cancel ⇒ context.stop(self) + }: Receive) + .orElse(requestElem) + .orElse(receiveFailure) + .orElse(receiveComplete) + .orElse(receiveElem) - case _: Status.Success ⇒ - if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully - else context.become(drainBufferThenComplete) + def receiveComplete: Receive = completionMatcher.andThen { _ ⇒ + if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully + else context.become(drainBufferThenComplete) + } - case Status.Failure(cause) if isActive ⇒ + def receiveFailure: Receive = failureMatcher.andThen { cause ⇒ + if (isActive) onErrorThenStop(cause) - - }: Receive).orElse(requestElem).orElse(receiveElem) + } def requestElem: Receive = { case _: Request ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index a590ea0a61..c922582dea 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -99,19 +99,24 @@ import akka.util.OptionVal * INTERNAL API */ @InternalApi private[akka] final class ActorRefSource[Out]( - bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) + completionMatcher: PartialFunction[Any, Unit], + failureMatcher: PartialFunction[Any, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)" override def create(context: MaterializationContext) = { val mat = ActorMaterializerHelper.downcast(context.materializer) - val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings)) + val ref = mat.actorOf(context, ActorRefSourceActor.props( + completionMatcher, + failureMatcher, + bufferSize, overflowStrategy, mat.settings)) (akka.stream.actor.ActorPublisher[Out](ref), ref) } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = - new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape) + new ActorRefSource[Out](completionMatcher, failureMatcher, bufferSize, overflowStrategy, attributes, shape) override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] = - new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr)) + new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, attr, amendShape(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index b0a9cca2e9..badf68c3f0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -168,7 +168,7 @@ import scala.collection.generic.CanBuildFrom /** * INTERNAL API */ -@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, +@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { @@ -177,14 +177,14 @@ import scala.collection.generic.CanBuildFrom val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max val subscriberRef = actorMaterializer.actorOf( context, - ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage)) + ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage, onFailureMessage)) (akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed) } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = - new ActorRefSink[In](ref, onCompleteMessage, attributes, shape) + new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attributes, shape) override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] = - new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) + new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attr, amendShape(attr)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 20c482dcf6..a4d9c6cc0d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -210,7 +210,7 @@ object Sink { */ def actorRefWithAck[In](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] = - new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply)) + new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _)) /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index de29f07f95..c3cc53c0f4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -368,6 +368,25 @@ object Sink { Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink") } + /** + * Sends the elements of the stream to the given `ActorRef`. + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure the `onFailureMessage` will be invoked + * and its result will be sent to the destination actor. + * + * It will request at most `maxInputBufferSize` number of elements from + * upstream, but there is no back-pressure signal from the destination actor, + * i.e. if the actor is not consuming the messages fast enough the mailbox + * of the actor will grow. For potentially slow consumer actors it is recommended + * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + * limiting stage in front of this `Sink`. + */ + def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Sink[T, NotUsed] = + fromGraph(new ActorRefSink(ref, onCompleteMessage, onFailureMessage, + DefaultAttributes.actorRefSink, shape("ActorRefSink"))) + /** * Sends the elements of the stream to the given `ActorRef`. * If the target actor terminates the stream will be canceled. @@ -383,8 +402,33 @@ object Sink { * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * limiting stage in front of this `Sink`. */ + @deprecated("Use `actorRef` that takes onFailureMessage instead. It allows controling the message that will be sent to the actor on failure.", since = "2.5.10") def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = - fromGraph(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink"))) + fromGraph(new ActorRefSink(ref, onCompleteMessage, t ⇒ Status.Failure(t), + DefaultAttributes.actorRefSink, shape("ActorRefSink"))) + + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is created by calling `onInitMessage` with an `ActorRef` of the actor that + * expects acknowledgements. Then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * Every message that is sent to the actor is first transformed using `messageAdapter`. + * This can be used to capture the ActorRef of the actor that expects acknowledgments as + * well as transforming messages from the stream to the ones that actor under `ref` handles. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef ⇒ T ⇒ Any, + onInitMessage: ActorRef ⇒ Any, ackMessage: Any, onCompleteMessage: Any, + onFailureMessage: (Throwable) ⇒ Any): Sink[T, NotUsed] = + Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, messageAdapter, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)) /** * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. @@ -398,10 +442,13 @@ object Sink { * will be sent to the destination actor. * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. + * + * @deprecated Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`. */ + @deprecated("Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.", since = "2.5.10") def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, NotUsed] = - Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)) + actorRefWithAck(ref, _ ⇒ identity, _ ⇒ onInitMessage, ackMessage, onCompleteMessage, onFailureMessage) /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 1cf78b5383..650a13b09e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -430,6 +430,47 @@ object Source { fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource"))) } + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand + * from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after + * this Source; as such, it is never safe to assume the downstream will always generate demand. + * + * The stream can be completed successfully by sending the actor reference a message that is matched by + * `completionMatcher` in which case already buffered elements will be signaled before signaling + * completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. + * + * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received + * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, + * the failure will be signaled downstream immediately (instead of the completion signal). + * + * The actor will be stopped when the stream is completed, failed or canceled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * See also [[akka.stream.scaladsl.Source.queue]]. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T]( + completionMatcher: PartialFunction[Any, Unit], + failureMatcher: PartialFunction[Any, Throwable], + bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { + require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") + require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") + fromGraph(new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) + } + /** * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, @@ -459,14 +500,17 @@ object Source { * * See also [[akka.stream.scaladsl.Source.queue]]. * + * @deprecated Use `actorRef` that takes matchers instead. It allows controlling the completion and failure messages that are sent to the actor. + * * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { - require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") - require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") - fromGraph(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) - } + @deprecated("Use `actorRef` that takes matchers instead. It allows controlling the messages that are used for completion and failure.", since = "2.5.10") + def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = + actorRef( + { case akka.actor.Status.Success(_) ⇒ }, + { case akka.actor.Status.Failure(cause) ⇒ cause }, + bufferSize, overflowStrategy) /** * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. diff --git a/build.sbt b/build.sbt index daac3b402d..2023878170 100644 --- a/build.sbt +++ b/build.sbt @@ -37,7 +37,10 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq( slf4j, stream, streamTestkit, streamTests, streamTestsTck, testkit, - actorTyped, actorTypedTests, typedTestkit, persistenceTyped, clusterTyped, clusterShardingTyped + actorTyped, actorTypedTests, typedTestkit, + persistenceTyped, + clusterTyped, clusterShardingTyped, + streamTyped ) lazy val root = Project( @@ -385,7 +388,6 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed") .dependsOn( actorTyped, persistence, - testkit % "test->test", typedTestkit % "test->test", actorTypedTests % "test->test" ) @@ -401,7 +403,6 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed") distributedData, persistence % "provided->test", persistenceTyped % "provided->test", - testkit % "test->test", typedTestkit % "test->test", actorTypedTests % "test->test" ) @@ -414,7 +415,6 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") clusterTyped, persistenceTyped, clusterSharding, - testkit % "test->test", typedTestkit % "test->test", actorTypedTests % "test->test", persistenceTyped % "test->test" @@ -425,6 +425,16 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" )) .disablePlugins(MimaPlugin) +lazy val streamTyped = akkaModule("akka-stream-typed") + .dependsOn( + actorTyped, + stream, + typedTestkit % "test->test", + actorTypedTests % "test->test" + ) + .settings(AkkaBuild.mayChangeSettings) + .settings(AutomaticModuleName.settings("akka.stream.typed")) + .disablePlugins(MimaPlugin) lazy val typedTestkit = akkaModule("akka-testkit-typed") .dependsOn(actorTyped, testkit % "compile->compile;test->test")