diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 3239c27f5b..fc25fdf2e5 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -5,6 +5,7 @@ package akka.stream.actor import java.util.concurrent.ConcurrentHashMap import org.reactivestreams.{ Publisher, Subscriber, Subscription } +import akka.actor.AbstractActor import akka.actor.Actor import akka.actor.ActorRef import akka.actor.ActorSystem @@ -12,29 +13,17 @@ import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider +import akka.actor.UntypedActor object ActorPublisher { /** * Create a [[org.reactivestreams.Publisher]] backed by a [[ActorPublisher]] actor. It can be * attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a - * [[akka.stream.Flow]]. + * [[akka.stream.scaladsl.Flow]]. */ def apply[T](ref: ActorRef): Publisher[T] = ActorPublisherImpl(ref) - /** - * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber requests - * more elements. - * @param n number of requested elements - */ - @SerialVersionUID(1L) case class Request(n: Int) - - /** - * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the - * subscription. - */ - @SerialVersionUID(1L) case object Cancel - /** * INTERNAL API */ @@ -50,14 +39,40 @@ object ActorPublisher { } } +sealed abstract class ActorPublisherMessage + +object ActorPublisherMessage { + /** + * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber requests + * more elements. + * @param n number of requested elements + */ + @SerialVersionUID(1L) case class Request(n: Int) extends ActorPublisherMessage + + /** + * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the + * subscription. + */ + @SerialVersionUID(1L) case object Cancel extends ActorPublisherMessage + + /** + * Java API: get the singleton instance of the `Cancel` message + */ + def cancelInstance = Cancel + +} + /** * Extend/mixin this trait in your [[akka.actor.Actor]] to make it a * stream publisher that keeps track of the subscription life cycle and * requested elements. * - * Create a [[org.reactivestreams.Publisher]] backed by this actor with [[ActorPublisher#apply]]. + * Create a [[org.reactivestreams.Publisher]] backed by this actor with Scala API [[ActorPublisher#apply]], + * or Java API [[UntypedActorPublisher#create]] or Java API compatible with lambda expressions + * [[AbstractActorPublisher#create]]. + * * It can be attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a - * [[akka.stream.Flow]]. You can only attach one subscriber to this publisher. + * [[akka.stream.scaladsl.Flow]]. You can only attach one subscriber to this publisher. * * The life cycle state of the subscription is tracked with the following boolean members: * [[#isActive]], [[#isCompleted]], [[#isErrorEmitted]], and [[#isCanceled]]. @@ -86,6 +101,7 @@ object ActorPublisher { trait ActorPublisher[T] extends Actor { import ActorPublisher._ import ActorPublisher.Internal._ + import ActorPublisherMessage._ private val state = ActorPublisherState(context.system) private var subscriber: Subscriber[Any] = _ @@ -99,7 +115,7 @@ trait ActorPublisher[T] extends Actor { * allowed to call [[#onNext]] in this state when [[#totalDemand]] * is greater than zero. */ - final def isActive = lifecycleState == Active || lifecycleState == PreSubscriber + final def isActive: Boolean = lifecycleState == Active || lifecycleState == PreSubscriber /** * Total number of requested elements from the stream subscriber. @@ -186,6 +202,9 @@ trait ActorPublisher[T] extends Actor { case Canceled ⇒ // drop } + /** + * INTERNAL API + */ protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { case Request(n) ⇒ demand += n @@ -212,12 +231,18 @@ trait ActorPublisher[T] extends Actor { super.aroundReceive(receive, msg) } + /** + * INTERNAL API + */ protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { // some state must survive restart state.set(self, ActorPublisherState.State(Option(subscriber), demand, lifecycleState)) super.aroundPreRestart(reason, message) } + /** + * INTERNAL API + */ protected[akka] override def aroundPostRestart(reason: Throwable): Unit = { state.get(self) foreach { s ⇒ // restore previous state @@ -229,6 +254,9 @@ trait ActorPublisher[T] extends Actor { super.aroundPostRestart(reason) } + /** + * INTERNAL API + */ protected[akka] override def aroundPostStop(): Unit = { state.remove(self) if (lifecycleState == Active) subscriber.onComplete() @@ -253,6 +281,8 @@ private[akka] case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[ */ private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscription { import ActorPublisher._ + import ActorPublisherMessage._ + override def request(n: Int): Unit = if (n <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") else ref ! Request(n) @@ -290,3 +320,39 @@ private[akka] class ActorPublisherState extends Extension { def remove(ref: ActorRef): Unit = state.remove(ref) } + +/** + * Java API + */ +object UntypedActorPublisher { + /** + * Java API: Create a [[org.reactivestreams.Publisher]] backed by a [[UntypedActorPublisher]] actor. It can be + * attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a + * [[akka.stream.javadsl.Flow]]. + */ + def create[T](ref: ActorRef): Publisher[T] = ActorPublisher.apply(ref) +} + +/** + * Java API + * @see [[akka.stream.actor.ActorPublisher]] + */ +abstract class UntypedActorPublisher[T] extends UntypedActor with ActorPublisher[T] + +/** + * Java API compatible with lambda expressions + */ +object AbstractActorPublisher { + /** + * Java API compatible with lambda expressions: Create a [[org.reactivestreams.Publisher]] + * backed by a [[AbstractActorPublisher]] actor. It can be attached to a [[org.reactivestreams.Subscriber]] + * or be used as an input source for a [[akka.stream.javadsl.Flow]]. + */ + def create[T](ref: ActorRef): Publisher[T] = ActorPublisher.apply(ref) +} + +/** + * Java API compatible with lambda expressions + * @see [[akka.stream.actor.ActorPublisher]] + */ +abstract class AbstractActorPublisher[T] extends AbstractActor with ActorPublisher[T] diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index f35ba15de5..572a1fff79 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -4,7 +4,8 @@ package akka.stream.actor import java.util.concurrent.ConcurrentHashMap -import org.reactivestreams.{ Publisher, Subscriber, Subscription } +import org.reactivestreams.{ Subscriber, Subscription } +import akka.actor.AbstractActor import akka.actor.Actor import akka.actor.ActorRef import akka.actor.ActorSystem @@ -12,136 +13,159 @@ import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider +import akka.actor.UntypedActor object ActorSubscriber { /** * Attach a [[ActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]] - * to a [[org.reactivestreams.Publisher]] or [[akka.stream.Flow]]. + * to a [[org.reactivestreams.Publisher]] or [[akka.stream.scaladsl.Flow]]. */ def apply[T](ref: ActorRef): Subscriber[T] = new ActorSubscriberImpl(ref) - /** - * Java API: Attach a [[ActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]] - * to a [[org.reactivestreams.Publisher]] or [[akka.stream.Flow]]. - */ - def create[T](ref: ActorRef): Subscriber[T] = apply(ref) - - @SerialVersionUID(1L) case class OnNext(element: Any) - @SerialVersionUID(1L) case object OnComplete - @SerialVersionUID(1L) case class OnError(cause: Throwable) - /** * INTERNAL API */ @SerialVersionUID(1L) private[akka] case class OnSubscribe(subscription: Subscription) - /** - * An [[ActorSubscriber]] defines a `RequestStrategy` to control the stream back pressure. - */ - trait RequestStrategy { - /** - * Invoked by the [[ActorSubscriber]] after each incoming message to - * determine how many more elements to request from the stream. - * - * @param remainingRequested current remaining number of elements that - * have been requested from upstream but not received yet - * @return demand of more elements from the stream, returning 0 means that no - * more elements will be requested - */ - def requestDemand(remainingRequested: Int): Int - } +} + +sealed abstract class ActorSubscriberMessage + +object ActorSubscriberMessage { + @SerialVersionUID(1L) case class OnNext(element: Any) extends ActorSubscriberMessage + @SerialVersionUID(1L) case class OnError(cause: Throwable) extends ActorSubscriberMessage + @SerialVersionUID(1L) case object OnComplete extends ActorSubscriberMessage /** - * Requests one more element when `remainingRequested` is 0, i.e. - * max one element in flight. + * Java API: get the singleton instance of the `OnComplete` message */ - case object OneByOneRequestStrategy extends RequestStrategy { - def requestDemand(remainingRequested: Int): Int = - if (remainingRequested == 0) 1 else 0 - } + def onCompleteInstance = OnComplete +} + +/** + * An [[ActorSubscriber]] defines a `RequestStrategy` to control the stream back pressure. + */ +trait RequestStrategy { + /** + * Invoked by the [[ActorSubscriber]] after each incoming message to + * determine how many more elements to request from the stream. + * + * @param remainingRequested current remaining number of elements that + * have been requested from upstream but not received yet + * @return demand of more elements from the stream, returning 0 means that no + * more elements will be requested for now + */ + def requestDemand(remainingRequested: Int): Int +} + +/** + * Requests one more element when `remainingRequested` is 0, i.e. + * max one element in flight. + */ +case object OneByOneRequestStrategy extends RequestStrategy { + def requestDemand(remainingRequested: Int): Int = + if (remainingRequested == 0) 1 else 0 /** - * When request is only controlled with manual calls to - * [[ActorSubscriber#request]]. + * Java API: get the singleton instance */ - case object ZeroRequestStrategy extends RequestStrategy { - def requestDemand(remainingRequested: Int): Int = 0 - } + def getInstance = this +} - object WatermarkRequestStrategy { - /** - * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of - * the specifed `highWatermark`. - */ - def apply(highWatermark: Int): WatermarkRequestStrategy = - WatermarkRequestStrategy(highWatermark, lowWatermark = math.max(1, highWatermark / 2)) - } +/** + * When request is only controlled with manual calls to + * [[ActorSubscriber#request]]. + */ +case object ZeroRequestStrategy extends RequestStrategy { + def requestDemand(remainingRequested: Int): Int = 0 /** - * Requests up to the `highWatermark` when the `remainingRequested` is - * below the `lowWatermark`. This a good strategy when the actor performs work itself. + * Java API: get the singleton instance */ - case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy { - def requestDemand(remainingRequested: Int): Int = - if (remainingRequested < lowWatermark) - highWatermark - remainingRequested - else 0 - } + def getInstance = this +} + +object WatermarkRequestStrategy { + /** + * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of + * the specifed `highWatermark`. + */ + def apply(highWatermark: Int): WatermarkRequestStrategy = new WatermarkRequestStrategy(highWatermark) +} + +/** + * Requests up to the `highWatermark` when the `remainingRequested` is + * below the `lowWatermark`. This a good strategy when the actor performs work itself. + */ +case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy { /** - * Requests up to the `max` and also takes the number of messages - * that have been queued internally or delegated to other actors into account. - * Concrete subclass must implement [[#inFlightInternally]]. - * It will request elements in minimum batches of the defined [[#batchSize]]. + * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of + * the specifed `highWatermark`. */ - abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy { + def this(highWatermark: Int) = this(highWatermark, lowWatermark = math.max(1, highWatermark / 2)) - /** - * Concrete subclass must implement this method to define how many - * messages that are currently in progress or queued. - */ - def inFlightInternally: Int + def requestDemand(remainingRequested: Int): Int = + if (remainingRequested < lowWatermark) + highWatermark - remainingRequested + else 0 +} - /** - * Elements will be requested in minimum batches of this size. - * Default is 5. Subclass may override to define the batch size. - */ - def batchSize: Int = 5 +/** + * Requests up to the `max` and also takes the number of messages + * that have been queued internally or delegated to other actors into account. + * Concrete subclass must implement [[#inFlightInternally]]. + * It will request elements in minimum batches of the defined [[#batchSize]]. + */ +abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy { - override def requestDemand(remainingRequested: Int): Int = { - val batch = math.min(batchSize, max) - if ((remainingRequested + inFlightInternally) <= (max - batch)) - math.max(0, max - remainingRequested - inFlightInternally) - else 0 - } + /** + * Concrete subclass must implement this method to define how many + * messages that are currently in progress or queued. + */ + def inFlightInternally: Int + + /** + * Elements will be requested in minimum batches of this size. + * Default is 5. Subclass may override to define the batch size. + */ + def batchSize: Int = 5 + + override def requestDemand(remainingRequested: Int): Int = { + val batch = math.min(batchSize, max) + if ((remainingRequested + inFlightInternally) <= (max - batch)) + math.max(0, max - remainingRequested - inFlightInternally) + else 0 } } /** * Extend/mixin this trait in your [[akka.actor.Actor]] to make it a * stream subscriber with full control of stream back pressure. It will receive - * [[ActorSubscriber.OnNext]], [[ActorSubscriber.OnComplete]] and [[ActorSubscriber.OnError]] + * [[ActorSubscriberMessage.OnNext]], [[ActorSubscriberMessage.OnComplete]] and [[ActorSubscriberMessage.OnError]] * messages from the stream. It can also receive other, non-stream messages, in * the same way as any actor. * * Attach the actor as a [[org.reactivestreams.Subscriber]] to the stream with - * [[ActorSubscriber#apply]]. + * Scala API [[ActorSubscriber#apply]], or Java API [[UntypedActorSubscriber#create]] or + * Java API compatible with lambda expressions [[AbstractActorSubscriber#create]]. * * Subclass must define the [[RequestStrategy]] to control stream back pressure. * After each incoming message the `ActorSubscriber` will automatically invoke * the [[RequestStrategy#requestDemand]] and propagate the returned demand to the stream. - * The provided [[ActorSubscriber.WatermarkRequestStrategy]] is a good strategy if the actor + * The provided [[WatermarkRequestStrategy]] is a good strategy if the actor * performs work itself. - * The provided [[ActorSubscriber.MaxInFlightRequestStrategy]] is useful if messages are + * The provided [[MaxInFlightRequestStrategy]] is useful if messages are * queued internally or delegated to other actors. * You can also implement a custom [[RequestStrategy]] or call [[#request]] manually - * together with [[ActorSubscriber.ZeroRequestStrategy]] or some other strategy. In that case + * together with [[ZeroRequestStrategy]] or some other strategy. In that case * you must also call [[#request]] when the actor is started or when it is ready, otherwise * it will not receive any elements. */ trait ActorSubscriber extends Actor { import ActorSubscriber._ + import ActorSubscriberMessage._ private val state = ActorSubscriberState(context.system) private var subscription: Option[Subscription] = None @@ -150,6 +174,9 @@ trait ActorSubscriber extends Actor { protected def requestStrategy: RequestStrategy + /** + * INTERNAL API + */ protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { case _: OnNext ⇒ requested -= 1 @@ -173,11 +200,17 @@ trait ActorSubscriber extends Actor { request(requestStrategy.requestDemand(remainingRequested)) } + /** + * INTERNAL API + */ protected[akka] override def aroundPreStart(): Unit = { super.aroundPreStart() request(requestStrategy.requestDemand(remainingRequested)) } + /** + * INTERNAL API + */ protected[akka] override def aroundPostRestart(reason: Throwable): Unit = { state.get(self) foreach { s ⇒ // restore previous state @@ -190,12 +223,18 @@ trait ActorSubscriber extends Actor { request(requestStrategy.requestDemand(remainingRequested)) } + /** + * INTERNAL API + */ protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { // some state must survive restart state.set(self, ActorSubscriberState.State(subscription, requested, canceled)) super.aroundPreRestart(reason, message) } + /** + * INTERNAL API + */ protected[akka] override def aroundPostStop(): Unit = { state.remove(self) if (!canceled) subscription.foreach(_.cancel()) @@ -232,9 +271,10 @@ trait ActorSubscriber extends Actor { * INTERNAL API */ private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Subscriber[T] { - override def onError(cause: Throwable): Unit = impl ! ActorSubscriber.OnError(cause) - override def onComplete(): Unit = impl ! ActorSubscriber.OnComplete - override def onNext(element: T): Unit = impl ! ActorSubscriber.OnNext(element) + import ActorSubscriberMessage._ + override def onError(cause: Throwable): Unit = impl ! OnError(cause) + override def onComplete(): Unit = impl ! OnComplete + override def onNext(element: T): Unit = impl ! OnNext(element) override def onSubscribe(subscription: Subscription): Unit = impl ! ActorSubscriber.OnSubscribe(subscription) } @@ -267,3 +307,38 @@ private[akka] class ActorSubscriberState extends Extension { def remove(ref: ActorRef): Unit = state.remove(ref) } + +/** + * Java API + */ +object UntypedActorSubscriber { + /** + * Java API: Attach a [[UntypedActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]] + * to a [[org.reactivestreams.Publisher]] or [[akka.stream.javadsl.Flow]]. + */ + def create[T](ref: ActorRef): Subscriber[T] = ActorSubscriber.apply(ref) +} + +/** + * Java API + * @see [[akka.stream.actor.ActorSubscriber]] + */ +abstract class UntypedActorSubscriber extends UntypedActor with ActorSubscriber + +/** + * Java API compatible with lambda expressions + */ +object AbstractActorSubscriber { + /** + * Java API compatible with lambda expressions: Attach a [[AbstractActorSubscriber]] actor + * as a [[org.reactivestreams.Subscriber]] o a [[org.reactivestreams.Publisher]] or + * [[akka.stream.javadsl.Flow]]. + */ + def create[T](ref: ActorRef): Subscriber[T] = ActorSubscriber.apply(ref) +} + +/** + * Java API compatible with lambda expressions + * @see [[akka.stream.actor.ActorSubscriber]] + */ +abstract class AbstractActorSubscriber extends AbstractActor with ActorSubscriber diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index bb27554709..b3c97c123f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -6,7 +6,8 @@ package akka.stream.impl import org.reactivestreams.{ Publisher, Subscriber, Subscription, Processor } import akka.actor._ import akka.stream.MaterializerSettings -import akka.stream.actor.ActorSubscriber.{ OnSubscribe, OnNext, OnComplete, OnError } +import akka.stream.actor.ActorSubscriber.OnSubscribe +import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError } import java.util.Arrays import akka.stream.TimerTransformer diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 86f8addbc6..c336b820d4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -6,7 +6,8 @@ package akka.stream.impl import akka.stream.MaterializerSettings import akka.actor.{ Actor, Terminated, ActorRef } import org.reactivestreams.{ Publisher, Subscriber, Subscription } -import akka.stream.actor.ActorSubscriber.{ OnNext, OnError, OnComplete, OnSubscribe } +import akka.stream.actor.ActorSubscriber.OnSubscribe +import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } import akka.actor.Stash /** diff --git a/akka-stream/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream/src/test/java/akka/stream/actor/ActorPublisherTest.java new file mode 100644 index 0000000000..dc29397c6e --- /dev/null +++ b/akka-stream/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -0,0 +1,61 @@ +package akka.stream.actor; + +import org.reactivestreams.Publisher; +import org.junit.ClassRule; +import org.junit.Test; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.stream.FlowMaterializer; +import akka.stream.MaterializerSettings; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.javadsl.Flow; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; +import akka.japi.Procedure; + +import static akka.stream.actor.ActorPublisherMessage.Request; + +public class ActorPublisherTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", + AkkaSpec.testConf()); + + public static class TestPublisher extends UntypedActorPublisher { + + @Override + public void onReceive(Object msg) { + if (msg instanceof Request) { + onNext(1); + onComplete(); + } else if (msg == ActorPublisherMessage.cancelInstance()) { + getContext().stop(getSelf()); + } else { + unhandled(msg); + } + } + + } + + final ActorSystem system = actorSystemResource.getSystem(); + + final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher"); + final FlowMaterializer materializer = FlowMaterializer.create(settings, system); + + @Test + public void mustHaveJavaAPI() { + final JavaTestKit probe = new JavaTestKit(system); + final ActorRef ref = system + .actorOf(Props.create(TestPublisher.class).withDispatcher("akka.test.stream-dispatcher")); + final Publisher publisher = UntypedActorPublisher.create(ref); + Flow.create(publisher).foreach(new Procedure() { + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + probe.expectMsgEquals(1); + } + +} diff --git a/akka-stream/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream/src/test/java/akka/stream/actor/ActorSubscriberTest.java new file mode 100644 index 0000000000..a8fc39ae1f --- /dev/null +++ b/akka-stream/src/test/java/akka/stream/actor/ActorSubscriberTest.java @@ -0,0 +1,79 @@ +package akka.stream.actor; + +import org.reactivestreams.Subscriber; +import org.junit.ClassRule; +import org.junit.Test; +import java.util.Arrays; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.stream.FlowMaterializer; +import akka.stream.MaterializerSettings; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.javadsl.Flow; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; +import akka.japi.Procedure; + +import static akka.stream.actor.ActorSubscriberMessage.OnNext; +import static akka.stream.actor.ActorSubscriberMessage.OnError; + +public class ActorSubscriberTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", + AkkaSpec.testConf()); + + public static class TestSubscriber extends UntypedActorSubscriber { + + final ActorRef probe; + + public TestSubscriber(ActorRef probe) { + this.probe = probe; + } + + @Override + public RequestStrategy requestStrategy() { + return ZeroRequestStrategy.getInstance(); + } + + @Override + public void onReceive(Object msg) { + if (msg.equals("run")) { + request(4); + } else if (msg instanceof OnNext) { + probe.tell(((OnNext) msg).element(), getSelf()); + } else if (msg == ActorSubscriberMessage.onCompleteInstance()) { + probe.tell("done", getSelf()); + getContext().stop(getSelf()); + } else if (msg instanceof OnError) { + probe.tell("err", getSelf()); + getContext().stop(getSelf()); + } else { + unhandled(msg); + } + } + } + + final ActorSystem system = actorSystemResource.getSystem(); + + final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher"); + final FlowMaterializer materializer = FlowMaterializer.create(settings, system); + + @Test + public void mustHaveJavaAPI() { + final JavaTestKit probe = new JavaTestKit(system); + final ActorRef ref = system.actorOf(Props.create(TestSubscriber.class, probe.getRef()).withDispatcher( + "akka.test.stream-dispatcher")); + final Subscriber subscriber = UntypedActorSubscriber.create(ref); + final java.util.Iterator input = Arrays.asList(1, 2, 3).iterator(); + Flow.create(input).produceTo(subscriber, materializer); + ref.tell("run", null); + probe.expectMsgEquals(1); + probe.expectMsgEquals(2); + probe.expectMsgEquals(3); + probe.expectMsgEquals("done"); + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala index 75732e67a0..a3ca5cd8fe 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala @@ -6,7 +6,7 @@ package akka.stream import akka.stream.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } -import akka.stream.actor.ActorSubscriber.{ OnNext, OnComplete } +import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete } import akka.stream.impl.RequestMore class FlowTakeSpec extends AkkaSpec with ScriptedTest { diff --git a/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index c37bdb4861..f42bee3686 100644 --- a/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -10,7 +10,6 @@ import akka.actor.PoisonPill import akka.actor.Props import akka.stream.FlowMaterializer import akka.stream.MaterializerSettings -import akka.stream.actor.ActorSubscriber.WatermarkRequestStrategy import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit @@ -32,6 +31,7 @@ object ActorPublisherSpec { class TestPublisher(probe: ActorRef) extends ActorPublisher[String] { import ActorPublisher._ + import ActorPublisherMessage._ def receive = { case Request(element) ⇒ probe ! TotalDemand(totalDemand) @@ -45,8 +45,7 @@ object ActorPublisherSpec { def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher") class Sender extends ActorPublisher[Int] { - import ActorPublisher.Cancel - import ActorPublisher.Request + import ActorPublisherMessage._ var buf = Vector.empty[Int] @@ -76,7 +75,7 @@ object ActorPublisherSpec { Props(new Receiver(probe)).withDispatcher("akka.test.stream-dispatcher") class Receiver(probe: ActorRef) extends ActorSubscriber { - import ActorSubscriber._ + import ActorSubscriberMessage._ override val requestStrategy = WatermarkRequestStrategy(10) diff --git a/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index 6540f42561..515f08192d 100644 --- a/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -10,7 +10,6 @@ import akka.stream.FlowMaterializer import akka.stream.MaterializerSettings import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec -import akka.stream.actor.ActorSubscriber.RequestStrategy import akka.actor.Actor import akka.routing.ActorRefRoutee import akka.routing.Router @@ -24,7 +23,7 @@ object ActorSubscriberSpec { Props(new ManualSubscriber(probe)).withDispatcher("akka.test.stream-dispatcher") class ManualSubscriber(probe: ActorRef) extends ActorSubscriber { - import ActorSubscriber._ + import ActorSubscriberMessage._ override val requestStrategy = ZeroRequestStrategy @@ -42,7 +41,7 @@ object ActorSubscriberSpec { Props(new RequestStrategySubscriber(probe, strat)).withDispatcher("akka.test.stream-dispatcher") class RequestStrategySubscriber(probe: ActorRef, strat: RequestStrategy) extends ActorSubscriber { - import ActorSubscriber._ + import ActorSubscriberMessage._ override val requestStrategy = strat @@ -61,7 +60,7 @@ object ActorSubscriberSpec { Props(new Streamer).withDispatcher("akka.test.stream-dispatcher") class Streamer extends ActorSubscriber { - import ActorSubscriber._ + import ActorSubscriberMessage._ var queue = Map.empty[Int, ActorRef] val router = { @@ -98,7 +97,7 @@ object ActorSubscriberSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { import ActorSubscriberSpec._ - import ActorSubscriber._ + import ActorSubscriberMessage._ implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))