WIP initial typed streams adapters

Adapt ref source and sink for typed

 * do not use the typed teskit temporarily
This commit is contained in:
Konrad `ktoso` Malawski 2017-09-07 21:07:41 +02:00 committed by Patrik Nordwall
parent 84b8f3ac29
commit 171bb6c231
16 changed files with 657 additions and 39 deletions

View file

@ -0,0 +1,57 @@
package akka.stream.typed
import akka.actor.typed.ActorSystem
import akka.stream.ActorMaterializerSettings
object ActorMaterializer {
import akka.actor.typed.scaladsl.adapter._
/**
* Scala API: Creates an ActorMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]]
* will be used to create one actor that in turn creates actors for the transformation steps.
*
* The materializer's [[akka.stream.ActorMaterializerSettings]] will be obtained from the
* configuration of the `context`'s underlying [[akka.actor.typed.ActorSystem]].
*
* The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def apply[T](materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None)(implicit actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
akka.stream.ActorMaterializer(materializerSettings, namePrefix)(actorSystem.toUntyped)
/**
* Java API: Creates an ActorMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]]
* will be used to create these actors, therefore it is *forbidden* to pass this object
* to another actor if the factory is an ActorContext.
*
* Defaults the actor name prefix used to name actors running the processing steps to `"flow"`.
* The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def create[T](actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
apply()(actorSystem)
/**
* Java API: Creates an ActorMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]]
* will be used to create one actor that in turn creates actors for the transformation steps.
*/
def create[T](settings: ActorMaterializerSettings, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
apply(Option(settings), None)(actorSystem)
/**
* Java API: Creates an ActorMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.typed.ActorSystem]]
* will be used to create these actors, therefore it is *forbidden* to pass this object
* to another actor if the factory is an ActorContext.
*
* The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def create[T](settings: ActorMaterializerSettings, namePrefix: String, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
apply(Option(settings), Option(namePrefix))(actorSystem)
}

View file

@ -0,0 +1,57 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.stream.typed.javadsl
import akka.actor.typed._
import akka.NotUsed
import akka.stream.scaladsl._
import akka.stream.typed
/**
* Collection of Sinks aimed at integrating with typed Actors.
*/
object ActorSink {
/**
* Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure a the throwable that was signaled
* to the stream is adapted to the Actors protocol using `onFailureMessage` and
* then then sent to the destination actor.
*
* It will request at most `maxInputBufferSize` number of elements from
* upstream, but there is no back-pressure signal from the destination actor,
* i.e. if the actor is not consuming the messages fast enough the mailbox
* of the actor will grow. For potentially slow consumer actors it is recommended
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this `Sink`.
*/
def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: akka.japi.function.Function[Throwable, T]): Sink[T, NotUsed] =
typed.scaladsl.ActorSink.actorRef(ref, onCompleteMessage, onFailureMessage.apply)
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*/
def actorRefWithAck[T, M, A](
ref: ActorRef[M],
messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M],
onInitMessage: akka.japi.function.Function[ActorRef[A], M],
ackMessage: A,
onCompleteMessage: M,
onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] =
typed.scaladsl.ActorSink.actorRefWithAck(
ref, messageAdapter.apply, onInitMessage.apply, ackMessage, onCompleteMessage, onFailureMessage.apply)
}

View file

@ -0,0 +1,56 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.stream.typed.javadsl
import akka.actor.typed._
import akka.stream.OverflowStrategy
import akka.stream.javadsl._
/**
* Collection of Sources aimed at integrating with typed Actors.
*/
object ActorSource {
/**
* Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an
* IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
* from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after
* this Source; as such, it is never safe to assume the downstream will always generate demand.
*
* The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]
* (whose content will be ignored) in which case already buffered elements will be signaled before signaling
* completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately.
*
* The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
* actor reference. In case the Actor is still draining its internal buffer (after having received
* a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]],
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* See also [[akka.stream.javadsl.Source.queue]].
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def actorRef[T](
completionMatcher: PartialFunction[T, Unit],
failureMatcher: PartialFunction[T, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {
akka.stream.typed.scaladsl.ActorSource.actorRef(
completionMatcher, failureMatcher,
bufferSize, overflowStrategy).asJava
}
}

View file

@ -0,0 +1,61 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.stream.typed.scaladsl
import akka.actor.typed._
import akka.stream.scaladsl._
import akka.NotUsed
/**
* Collection of Sinks aimed at integrating with typed Actors.
*/
object ActorSink {
import akka.actor.typed.scaladsl.adapter._
/**
* Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure a the throwable that was signaled
* to the stream is adapted to the Actors protocol using `onFailureMessage` and
* then then sent to the destination actor.
*
* It will request at most `maxInputBufferSize` number of elements from
* upstream, but there is no back-pressure signal from the destination actor,
* i.e. if the actor is not consuming the messages fast enough the mailbox
* of the actor will grow. For potentially slow consumer actors it is recommended
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this `Sink`.
*/
def actorRef[T](ref: ActorRef[T], onCompleteMessage: T, onFailureMessage: Throwable T): Sink[T, NotUsed] =
Sink.actorRef(ref.toUntyped, onCompleteMessage, onFailureMessage)
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*/
def actorRefWithAck[T, M, A](
ref: ActorRef[M],
messageAdapter: (ActorRef[A], T) M,
onInitMessage: ActorRef[A] M,
ackMessage: A,
onCompleteMessage: M,
onFailureMessage: Throwable M): Sink[T, NotUsed] =
Sink.actorRefWithAck(
ref.toUntyped,
messageAdapter.curried.compose(actorRefAdapter),
onInitMessage.compose(actorRefAdapter),
ackMessage, onCompleteMessage, onFailureMessage)
}

View file

@ -0,0 +1,58 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.stream.typed.scaladsl
import akka.actor.typed._
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._
/**
* Collection of Sources aimed at integrating with typed Actors.
*/
object ActorSource {
import akka.actor.typed.scaladsl.adapter._
/**
* Creates a `Source` that is materialized as an [[akka.actor.typed.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an
* IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
* from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after
* this Source; as such, it is never safe to assume the downstream will always generate demand.
*
* The stream can be completed successfully by sending the actor reference a message that is matched by
* `completionMatcher` in which case already buffered elements will be signaled before signaling
* completion.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* See also [[akka.stream.scaladsl.Source.queue]].
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def actorRef[T](
completionMatcher: PartialFunction[T, Unit],
failureMatcher: PartialFunction[T, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] =
Source.actorRef[T](
completionMatcher.asInstanceOf[PartialFunction[Any, Unit]],
failureMatcher.asInstanceOf[PartialFunction[Any, Throwable]],
bufferSize, overflowStrategy).mapMaterializedValue(actorRefAdapter)
}

View file

@ -0,0 +1,87 @@
package akka.stream.typed.javadsl;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.PartialFunction$;
import scala.runtime.AbstractPartialFunction;
import scala.runtime.BoxedUnit;
public class ActorSourceSinkCompileTest {
interface Protocol {}
class Init implements Protocol {}
class Msg implements Protocol {}
class Complete implements Protocol {}
class Failure implements Protocol {
public Exception ex;
}
{
final ActorSystem<String> system = null;
final ActorMaterializer mat = akka.stream.typed.ActorMaterializer.create(system);
}
{
final ActorRef<String> ref = null;
Source.<String>queue(10, OverflowStrategy.dropBuffer())
.map(s -> s + "!")
.to(ActorSink.actorRef(ref, "DONE", ex -> "FAILED: " + ex.getMessage()));
}
{
final ActorRef<Protocol> ref = null;
Source.<String>queue(10, OverflowStrategy.dropBuffer())
.to(ActorSink.actorRefWithAck(
ref,
(sender, msg) -> new Init(),
(sender) -> new Msg(),
"ACK",
new Complete(),
(f) -> new Failure()));
}
{
final AbstractPartialFunction<String, BoxedUnit> completionMatcher = new AbstractPartialFunction<String, BoxedUnit>() {
@Override
public boolean isDefinedAt(String s) {
return s == "complete";
}
};
ActorSource
.actorRef(
completionMatcher,
PartialFunction$.MODULE$.empty(), // FIXME make the API nicer
10,
OverflowStrategy.dropBuffer())
.to(Sink.seq());
}
{
final AbstractPartialFunction<Protocol, Throwable> failureMatcher = new AbstractPartialFunction<Protocol, Throwable>() {
@Override
public boolean isDefinedAt(Protocol p) {
return p instanceof Failure;
}
@Override
public Throwable apply(Protocol p) {
return ((Failure)p).ex;
}
};
ActorSource
.actorRef(
PartialFunction$.MODULE$.empty(), // FIXME make the API nicer
failureMatcher, 10,
OverflowStrategy.dropBuffer())
.to(Sink.seq());
}
}

View file

@ -0,0 +1,123 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.stream.typed.scaladsl
import akka.actor.typed.scaladsl.Actor
import akka.stream.OverflowStrategy
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.testkit.TestKit
import akka.testkit.typed.scaladsl._
import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.stream.typed.ActorMaterializer
import akka.testkit.typed.TestKitSettings
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import org.scalatest.concurrent.ScalaFutures
object ActorSourceSinkSpec {
sealed trait AckProto
case class Init(sender: ActorRef[String]) extends AckProto
case class Msg(sender: ActorRef[String], msg: String) extends AckProto
case object Complete extends AckProto
case object Failed extends AckProto
}
class ActorSourceSinkSpec extends TestKit(akka.actor.ActorSystem("ActorSourceSinkSpec")) with WordSpecLike with BeforeAndAfterAll with Matchers with ScalaFutures {
import ActorSourceSinkSpec._
import akka.actor.typed.scaladsl.adapter._
// FIXME use Typed Teskit
// The materializer creates a top-level actor when materializing a stream.
// Currently that is not supported, because a Typed Teskit uses a typed actor system
// with a custom guardian. Because of custom guardian, an exception is being thrown
// when trying to create a top level actor during materialization.
implicit val sys = ActorSystem.wrap(system)
implicit val testkitSettings = TestKitSettings(sys)
implicit val mat = ActorMaterializer()
override protected def afterAll(): Unit =
sys.terminate()
"ActorSink" should {
"accept messages" in {
val p = TestProbe[String]()
val in =
Source.queue[String](10, OverflowStrategy.dropBuffer)
.map(_ + "!")
.to(ActorSink.actorRef(p.ref, "DONE", ex "FAILED: " + ex.getMessage))
.run()
val msg = "Zug zug"
in.offer(msg)
p.expectMsg(msg + "!")
}
"obey protocol" in {
val p = TestProbe[AckProto]()
val autoPilot = Actor.immutable[AckProto] {
(ctx, msg)
msg match {
case m @ Init(sender)
p.ref ! m
sender ! "ACK"
Actor.same
case m @ Msg(sender, _)
p.ref ! m
sender ! "ACK"
Actor.same
case m
p.ref ! m
Actor.same
}
}
val pilotRef: ActorRef[AckProto] = system.actorOf(PropsAdapter(autoPilot))
val in =
Source.queue[String](10, OverflowStrategy.dropBuffer)
.to(ActorSink.actorRefWithAck(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ Failed))
.run()
p.expectMsgType[Init]
in.offer("Dabu!")
p.expectMsgType[Msg].msg shouldBe "Dabu!"
in.offer("Lok'tar!")
p.expectMsgType[Msg].msg shouldBe "Lok'tar!"
in.offer("Swobu!")
p.expectMsgType[Msg].msg shouldBe "Swobu!"
}
}
"ActorSource" should {
"send messages and complete" in {
val (in, out) = ActorSource.actorRef[String]({ case "complete" }, PartialFunction.empty, 10, OverflowStrategy.dropBuffer)
.toMat(Sink.seq)(Keep.both)
.run()
in ! "one"
in ! "two"
in ! "complete"
out.futureValue should contain theSameElementsAs Seq("one", "two")
}
"fail the stream" in {
val (in, out) = ActorSource.actorRef[String](PartialFunction.empty, { case msg new Error(msg) }, 10, OverflowStrategy.dropBuffer)
.toMat(Sink.seq)(Keep.both)
.run()
in ! "boom!"
out.failed.futureValue.getCause.getMessage shouldBe "boom!"
}
}
}

View file

@ -15,10 +15,13 @@ import akka.stream.stage._
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any, @InternalApi private[akka] class ActorRefBackpressureSinkStage[In](
ackMessage: Any, ref: ActorRef,
onCompleteMessage: Any, messageAdapter: ActorRef In Any,
onFailureMessage: (Throwable) Any) onInitMessage: ActorRef Any,
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: (Throwable) Any)
extends GraphStage[SinkShape[In]] { extends GraphStage[SinkShape[In]] {
val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in")
override def initialAttributes = DefaultAttributes.actorRefWithAck override def initialAttributes = DefaultAttributes.actorRefWithAck
@ -55,12 +58,12 @@ import akka.stream.stage._
override def preStart() = { override def preStart() = {
setKeepGoing(true) setKeepGoing(true)
getStageActor(receive).watch(ref) getStageActor(receive).watch(ref)
ref ! onInitMessage ref ! onInitMessage(self)
pull(in) pull(in)
} }
private def dequeueAndSend(): Unit = { private def dequeueAndSend(): Unit = {
ref ! buffer.poll() ref ! messageAdapter(self)(buffer.poll())
if (buffer.isEmpty && completeReceived) finish() if (buffer.isEmpty && completeReceived) finish()
} }

View file

@ -16,14 +16,15 @@ import akka.annotation.InternalApi
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] object ActorRefSinkActor { @InternalApi private[akka] object ActorRefSinkActor {
def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any): Props = def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable Any): Props =
Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage)) Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage, onFailureMessage))
} }
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber { @InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any, onFailureMessage: Throwable Any)
extends ActorSubscriber {
import ActorSubscriberMessage._ import ActorSubscriberMessage._
override val requestStrategy = WatermarkRequestStrategy(highWatermark) override val requestStrategy = WatermarkRequestStrategy(highWatermark)
@ -34,7 +35,7 @@ import akka.annotation.InternalApi
case OnNext(elem) case OnNext(elem)
ref.tell(elem, ActorRef.noSender) ref.tell(elem, ActorRef.noSender)
case OnError(cause) case OnError(cause)
ref.tell(Status.Failure(cause), ActorRef.noSender) ref.tell(onFailureMessage(cause), ActorRef.noSender)
context.stop(self) context.stop(self)
case OnComplete case OnComplete
ref.tell(onCompleteMessage, ActorRef.noSender) ref.tell(onCompleteMessage, ActorRef.noSender)

View file

@ -15,17 +15,20 @@ import akka.stream.ActorMaterializerSettings
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] object ActorRefSourceActor { @InternalApi private[akka] object ActorRefSourceActor {
def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = { def props(completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = {
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
val maxFixedBufferSize = settings.maxFixedBufferSize val maxFixedBufferSize = settings.maxFixedBufferSize
Props(new ActorRefSourceActor(bufferSize, overflowStrategy, maxFixedBufferSize)) Props(new ActorRefSourceActor(completionMatcher, failureMatcher, bufferSize, overflowStrategy, maxFixedBufferSize))
} }
} }
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int) @InternalApi private[akka] class ActorRefSourceActor(
completionMatcher: PartialFunction[Any, Unit], failureMatcher: PartialFunction[Any, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int)
extends akka.stream.actor.ActorPublisher[Any] with ActorLogging { extends akka.stream.actor.ActorPublisher[Any] with ActorLogging {
import akka.stream.actor.ActorPublisherMessage._ import akka.stream.actor.ActorPublisherMessage._
@ -35,15 +38,21 @@ import akka.stream.ActorMaterializerSettings
def receive = ({ def receive = ({
case Cancel case Cancel
context.stop(self) context.stop(self)
}: Receive)
.orElse(requestElem)
.orElse(receiveFailure)
.orElse(receiveComplete)
.orElse(receiveElem)
case _: Status.Success def receiveComplete: Receive = completionMatcher.andThen { _
if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully
else context.become(drainBufferThenComplete) else context.become(drainBufferThenComplete)
}
case Status.Failure(cause) if isActive def receiveFailure: Receive = failureMatcher.andThen { cause
if (isActive)
onErrorThenStop(cause) onErrorThenStop(cause)
}
}: Receive).orElse(requestElem).orElse(receiveElem)
def requestElem: Receive = { def requestElem: Receive = {
case _: Request case _: Request

View file

@ -99,19 +99,24 @@ import akka.util.OptionVal
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final class ActorRefSource[Out]( @InternalApi private[akka] final class ActorRefSource[Out](
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) completionMatcher: PartialFunction[Any, Unit],
failureMatcher: PartialFunction[Any, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out])
extends SourceModule[Out, ActorRef](shape) { extends SourceModule[Out, ActorRef](shape) {
override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)" override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)"
override def create(context: MaterializationContext) = { override def create(context: MaterializationContext) = {
val mat = ActorMaterializerHelper.downcast(context.materializer) val mat = ActorMaterializerHelper.downcast(context.materializer)
val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings)) val ref = mat.actorOf(context, ActorRefSourceActor.props(
completionMatcher,
failureMatcher,
bufferSize, overflowStrategy, mat.settings))
(akka.stream.actor.ActorPublisher[Out](ref), ref) (akka.stream.actor.ActorPublisher[Out](ref), ref)
} }
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape) new ActorRefSource[Out](completionMatcher, failureMatcher, bufferSize, overflowStrategy, attributes, shape)
override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] = override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] =
new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr)) new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, attr, amendShape(attr))
} }

View file

@ -168,7 +168,7 @@ import scala.collection.generic.CanBuildFrom
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, @InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable Any,
val attributes: Attributes, val attributes: Attributes,
shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
@ -177,14 +177,14 @@ import scala.collection.generic.CanBuildFrom
val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max
val subscriberRef = actorMaterializer.actorOf( val subscriberRef = actorMaterializer.actorOf(
context, context,
ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage)) ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage, onFailureMessage))
(akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed) (akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed)
} }
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
new ActorRefSink[In](ref, onCompleteMessage, attributes, shape) new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] = override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] =
new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attr, amendShape(attr))
} }
/** /**

View file

@ -210,7 +210,7 @@ object Sink {
*/ */
def actorRefWithAck[In](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, def actorRefWithAck[In](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] = onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] =
new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply)) new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _))
/** /**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor

View file

@ -368,6 +368,25 @@ object Sink {
Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink") Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink")
} }
/**
* Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure the `onFailureMessage` will be invoked
* and its result will be sent to the destination actor.
*
* It will request at most `maxInputBufferSize` number of elements from
* upstream, but there is no back-pressure signal from the destination actor,
* i.e. if the actor is not consuming the messages fast enough the mailbox
* of the actor will grow. For potentially slow consumer actors it is recommended
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this `Sink`.
*/
def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable Any): Sink[T, NotUsed] =
fromGraph(new ActorRefSink(ref, onCompleteMessage, onFailureMessage,
DefaultAttributes.actorRefSink, shape("ActorRefSink")))
/** /**
* Sends the elements of the stream to the given `ActorRef`. * Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be canceled. * If the target actor terminates the stream will be canceled.
@ -383,8 +402,33 @@ object Sink {
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this `Sink`. * limiting stage in front of this `Sink`.
*/ */
@deprecated("Use `actorRef` that takes onFailureMessage instead. It allows controling the message that will be sent to the actor on failure.", since = "2.5.10")
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] =
fromGraph(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink"))) fromGraph(new ActorRefSink(ref, onCompleteMessage, t Status.Failure(t),
DefaultAttributes.actorRefSink, shape("ActorRefSink")))
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is created by calling `onInitMessage` with an `ActorRef` of the actor that
* expects acknowledgements. Then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* Every message that is sent to the actor is first transformed using `messageAdapter`.
* This can be used to capture the ActorRef of the actor that expects acknowledgments as
* well as transforming messages from the stream to the ones that actor under `ref` handles.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*/
def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef T Any,
onInitMessage: ActorRef Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: (Throwable) Any): Sink[T, NotUsed] =
Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, messageAdapter, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))
/** /**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
@ -398,10 +442,13 @@ object Sink {
* will be sent to the destination actor. * will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)` * When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor. * function will be sent to the destination actor.
*
* @deprecated Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.
*/ */
@deprecated("Use `actorRefWithAck` that takes `messageAdapter` instead. It allows capturing the original sender of the messages as well as transforming messages before sending them to the actor under the `ref`.", since = "2.5.10")
def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: (Throwable) Any = Status.Failure): Sink[T, NotUsed] = onFailureMessage: (Throwable) Any = Status.Failure): Sink[T, NotUsed] =
Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)) actorRefWithAck(ref, _ identity, _ onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
/** /**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor

View file

@ -430,6 +430,47 @@ object Source {
fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource"))) fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
} }
/**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an
* IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped if there is no demand
* from downstream. When `bufferSize` is 0 the `overflowStrategy` does not matter. An async boundary is added after
* this Source; as such, it is never safe to assume the downstream will always generate demand.
*
* The stream can be completed successfully by sending the actor reference a message that is matched by
* `completionMatcher` in which case already buffered elements will be signaled before signaling
* completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* See also [[akka.stream.scaladsl.Source.queue]].
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def actorRef[T](
completionMatcher: PartialFunction[Any, Unit],
failureMatcher: PartialFunction[Any, Throwable],
bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
fromGraph(new ActorRefSource(completionMatcher, failureMatcher, bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource")))
}
/** /**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream, * Messages sent to this actor will be emitted to the stream if there is demand from downstream,
@ -459,14 +500,17 @@ object Source {
* *
* See also [[akka.stream.scaladsl.Source.queue]]. * See also [[akka.stream.scaladsl.Source.queue]].
* *
* @deprecated Use `actorRef` that takes matchers instead. It allows controlling the completion and failure messages that are sent to the actor.
*
* @param bufferSize The size of the buffer in element count * @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { @deprecated("Use `actorRef` that takes matchers instead. It allows controlling the messages that are used for completion and failure.", since = "2.5.10")
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") actorRef(
fromGraph(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) { case akka.actor.Status.Success(_) },
} { case akka.actor.Status.Failure(cause) cause },
bufferSize, overflowStrategy)
/** /**
* Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.

View file

@ -37,7 +37,10 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq(
slf4j, slf4j,
stream, streamTestkit, streamTests, streamTestsTck, stream, streamTestkit, streamTests, streamTestsTck,
testkit, testkit,
actorTyped, actorTypedTests, typedTestkit, persistenceTyped, clusterTyped, clusterShardingTyped actorTyped, actorTypedTests, typedTestkit,
persistenceTyped,
clusterTyped, clusterShardingTyped,
streamTyped
) )
lazy val root = Project( lazy val root = Project(
@ -385,7 +388,6 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed")
.dependsOn( .dependsOn(
actorTyped, actorTyped,
persistence, persistence,
testkit % "test->test",
typedTestkit % "test->test", typedTestkit % "test->test",
actorTypedTests % "test->test" actorTypedTests % "test->test"
) )
@ -401,7 +403,6 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed")
distributedData, distributedData,
persistence % "provided->test", persistence % "provided->test",
persistenceTyped % "provided->test", persistenceTyped % "provided->test",
testkit % "test->test",
typedTestkit % "test->test", typedTestkit % "test->test",
actorTypedTests % "test->test" actorTypedTests % "test->test"
) )
@ -414,7 +415,6 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
clusterTyped, clusterTyped,
persistenceTyped, persistenceTyped,
clusterSharding, clusterSharding,
testkit % "test->test",
typedTestkit % "test->test", typedTestkit % "test->test",
actorTypedTests % "test->test", actorTypedTests % "test->test",
persistenceTyped % "test->test" persistenceTyped % "test->test"
@ -425,6 +425,16 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" )) .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" ))
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
lazy val streamTyped = akkaModule("akka-stream-typed")
.dependsOn(
actorTyped,
stream,
typedTestkit % "test->test",
actorTypedTests % "test->test"
)
.settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.stream.typed"))
.disablePlugins(MimaPlugin)
lazy val typedTestkit = akkaModule("akka-testkit-typed") lazy val typedTestkit = akkaModule("akka-testkit-typed")
.dependsOn(actorTyped, testkit % "compile->compile;test->test") .dependsOn(actorTyped, testkit % "compile->compile;test->test")