!str #15289 Java API for ActorPublisher and ActorSubscriber

* had to change api because of trait+object not useable from java
* ActorSubscriber.RequestStrategy and its implementations moved to
  top level
* messages moved to ActorSubscriberMessage and ActorPublisherMessage
  object
This commit is contained in:
Patrik Nordwall 2014-08-18 13:19:30 +02:00
parent 587af91ff9
commit b6a915a68c
9 changed files with 393 additions and 112 deletions

View file

@ -5,6 +5,7 @@ package akka.stream.actor
import java.util.concurrent.ConcurrentHashMap
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import akka.actor.AbstractActor
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
@ -12,29 +13,17 @@ import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.UntypedActor
object ActorPublisher {
/**
* Create a [[org.reactivestreams.Publisher]] backed by a [[ActorPublisher]] actor. It can be
* attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a
* [[akka.stream.Flow]].
* [[akka.stream.scaladsl.Flow]].
*/
def apply[T](ref: ActorRef): Publisher[T] = ActorPublisherImpl(ref)
/**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber requests
* more elements.
* @param n number of requested elements
*/
@SerialVersionUID(1L) case class Request(n: Int)
/**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the
* subscription.
*/
@SerialVersionUID(1L) case object Cancel
/**
* INTERNAL API
*/
@ -50,14 +39,40 @@ object ActorPublisher {
}
}
sealed abstract class ActorPublisherMessage
object ActorPublisherMessage {
/**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber requests
* more elements.
* @param n number of requested elements
*/
@SerialVersionUID(1L) case class Request(n: Int) extends ActorPublisherMessage
/**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the
* subscription.
*/
@SerialVersionUID(1L) case object Cancel extends ActorPublisherMessage
/**
* Java API: get the singleton instance of the `Cancel` message
*/
def cancelInstance = Cancel
}
/**
* Extend/mixin this trait in your [[akka.actor.Actor]] to make it a
* stream publisher that keeps track of the subscription life cycle and
* requested elements.
*
* Create a [[org.reactivestreams.Publisher]] backed by this actor with [[ActorPublisher#apply]].
* Create a [[org.reactivestreams.Publisher]] backed by this actor with Scala API [[ActorPublisher#apply]],
* or Java API [[UntypedActorPublisher#create]] or Java API compatible with lambda expressions
* [[AbstractActorPublisher#create]].
*
* It can be attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a
* [[akka.stream.Flow]]. You can only attach one subscriber to this publisher.
* [[akka.stream.scaladsl.Flow]]. You can only attach one subscriber to this publisher.
*
* The life cycle state of the subscription is tracked with the following boolean members:
* [[#isActive]], [[#isCompleted]], [[#isErrorEmitted]], and [[#isCanceled]].
@ -86,6 +101,7 @@ object ActorPublisher {
trait ActorPublisher[T] extends Actor {
import ActorPublisher._
import ActorPublisher.Internal._
import ActorPublisherMessage._
private val state = ActorPublisherState(context.system)
private var subscriber: Subscriber[Any] = _
@ -99,7 +115,7 @@ trait ActorPublisher[T] extends Actor {
* allowed to call [[#onNext]] in this state when [[#totalDemand]]
* is greater than zero.
*/
final def isActive = lifecycleState == Active || lifecycleState == PreSubscriber
final def isActive: Boolean = lifecycleState == Active || lifecycleState == PreSubscriber
/**
* Total number of requested elements from the stream subscriber.
@ -186,6 +202,9 @@ trait ActorPublisher[T] extends Actor {
case Canceled // drop
}
/**
* INTERNAL API
*/
protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match {
case Request(n)
demand += n
@ -212,12 +231,18 @@ trait ActorPublisher[T] extends Actor {
super.aroundReceive(receive, msg)
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
// some state must survive restart
state.set(self, ActorPublisherState.State(Option(subscriber), demand, lifecycleState))
super.aroundPreRestart(reason, message)
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPostRestart(reason: Throwable): Unit = {
state.get(self) foreach { s
// restore previous state
@ -229,6 +254,9 @@ trait ActorPublisher[T] extends Actor {
super.aroundPostRestart(reason)
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPostStop(): Unit = {
state.remove(self)
if (lifecycleState == Active) subscriber.onComplete()
@ -253,6 +281,8 @@ private[akka] case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[
*/
private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscription {
import ActorPublisher._
import ActorPublisherMessage._
override def request(n: Int): Unit =
if (n <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else ref ! Request(n)
@ -290,3 +320,39 @@ private[akka] class ActorPublisherState extends Extension {
def remove(ref: ActorRef): Unit = state.remove(ref)
}
/**
* Java API
*/
object UntypedActorPublisher {
/**
* Java API: Create a [[org.reactivestreams.Publisher]] backed by a [[UntypedActorPublisher]] actor. It can be
* attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a
* [[akka.stream.javadsl.Flow]].
*/
def create[T](ref: ActorRef): Publisher[T] = ActorPublisher.apply(ref)
}
/**
* Java API
* @see [[akka.stream.actor.ActorPublisher]]
*/
abstract class UntypedActorPublisher[T] extends UntypedActor with ActorPublisher[T]
/**
* Java API compatible with lambda expressions
*/
object AbstractActorPublisher {
/**
* Java API compatible with lambda expressions: Create a [[org.reactivestreams.Publisher]]
* backed by a [[AbstractActorPublisher]] actor. It can be attached to a [[org.reactivestreams.Subscriber]]
* or be used as an input source for a [[akka.stream.javadsl.Flow]].
*/
def create[T](ref: ActorRef): Publisher[T] = ActorPublisher.apply(ref)
}
/**
* Java API compatible with lambda expressions
* @see [[akka.stream.actor.ActorPublisher]]
*/
abstract class AbstractActorPublisher[T] extends AbstractActor with ActorPublisher[T]

View file

@ -4,7 +4,8 @@
package akka.stream.actor
import java.util.concurrent.ConcurrentHashMap
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import org.reactivestreams.{ Subscriber, Subscription }
import akka.actor.AbstractActor
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
@ -12,136 +13,159 @@ import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.UntypedActor
object ActorSubscriber {
/**
* Attach a [[ActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]]
* to a [[org.reactivestreams.Publisher]] or [[akka.stream.Flow]].
* to a [[org.reactivestreams.Publisher]] or [[akka.stream.scaladsl.Flow]].
*/
def apply[T](ref: ActorRef): Subscriber[T] = new ActorSubscriberImpl(ref)
/**
* Java API: Attach a [[ActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]]
* to a [[org.reactivestreams.Publisher]] or [[akka.stream.Flow]].
*/
def create[T](ref: ActorRef): Subscriber[T] = apply(ref)
@SerialVersionUID(1L) case class OnNext(element: Any)
@SerialVersionUID(1L) case object OnComplete
@SerialVersionUID(1L) case class OnError(cause: Throwable)
/**
* INTERNAL API
*/
@SerialVersionUID(1L) private[akka] case class OnSubscribe(subscription: Subscription)
/**
* An [[ActorSubscriber]] defines a `RequestStrategy` to control the stream back pressure.
*/
trait RequestStrategy {
/**
* Invoked by the [[ActorSubscriber]] after each incoming message to
* determine how many more elements to request from the stream.
*
* @param remainingRequested current remaining number of elements that
* have been requested from upstream but not received yet
* @return demand of more elements from the stream, returning 0 means that no
* more elements will be requested
*/
def requestDemand(remainingRequested: Int): Int
}
}
sealed abstract class ActorSubscriberMessage
object ActorSubscriberMessage {
@SerialVersionUID(1L) case class OnNext(element: Any) extends ActorSubscriberMessage
@SerialVersionUID(1L) case class OnError(cause: Throwable) extends ActorSubscriberMessage
@SerialVersionUID(1L) case object OnComplete extends ActorSubscriberMessage
/**
* Requests one more element when `remainingRequested` is 0, i.e.
* max one element in flight.
* Java API: get the singleton instance of the `OnComplete` message
*/
case object OneByOneRequestStrategy extends RequestStrategy {
def requestDemand(remainingRequested: Int): Int =
if (remainingRequested == 0) 1 else 0
}
def onCompleteInstance = OnComplete
}
/**
* An [[ActorSubscriber]] defines a `RequestStrategy` to control the stream back pressure.
*/
trait RequestStrategy {
/**
* Invoked by the [[ActorSubscriber]] after each incoming message to
* determine how many more elements to request from the stream.
*
* @param remainingRequested current remaining number of elements that
* have been requested from upstream but not received yet
* @return demand of more elements from the stream, returning 0 means that no
* more elements will be requested for now
*/
def requestDemand(remainingRequested: Int): Int
}
/**
* Requests one more element when `remainingRequested` is 0, i.e.
* max one element in flight.
*/
case object OneByOneRequestStrategy extends RequestStrategy {
def requestDemand(remainingRequested: Int): Int =
if (remainingRequested == 0) 1 else 0
/**
* When request is only controlled with manual calls to
* [[ActorSubscriber#request]].
* Java API: get the singleton instance
*/
case object ZeroRequestStrategy extends RequestStrategy {
def requestDemand(remainingRequested: Int): Int = 0
}
def getInstance = this
}
object WatermarkRequestStrategy {
/**
* Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of
* the specifed `highWatermark`.
*/
def apply(highWatermark: Int): WatermarkRequestStrategy =
WatermarkRequestStrategy(highWatermark, lowWatermark = math.max(1, highWatermark / 2))
}
/**
* When request is only controlled with manual calls to
* [[ActorSubscriber#request]].
*/
case object ZeroRequestStrategy extends RequestStrategy {
def requestDemand(remainingRequested: Int): Int = 0
/**
* Requests up to the `highWatermark` when the `remainingRequested` is
* below the `lowWatermark`. This a good strategy when the actor performs work itself.
* Java API: get the singleton instance
*/
case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy {
def requestDemand(remainingRequested: Int): Int =
if (remainingRequested < lowWatermark)
highWatermark - remainingRequested
else 0
}
def getInstance = this
}
object WatermarkRequestStrategy {
/**
* Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of
* the specifed `highWatermark`.
*/
def apply(highWatermark: Int): WatermarkRequestStrategy = new WatermarkRequestStrategy(highWatermark)
}
/**
* Requests up to the `highWatermark` when the `remainingRequested` is
* below the `lowWatermark`. This a good strategy when the actor performs work itself.
*/
case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy {
/**
* Requests up to the `max` and also takes the number of messages
* that have been queued internally or delegated to other actors into account.
* Concrete subclass must implement [[#inFlightInternally]].
* It will request elements in minimum batches of the defined [[#batchSize]].
* Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of
* the specifed `highWatermark`.
*/
abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy {
def this(highWatermark: Int) = this(highWatermark, lowWatermark = math.max(1, highWatermark / 2))
/**
* Concrete subclass must implement this method to define how many
* messages that are currently in progress or queued.
*/
def inFlightInternally: Int
def requestDemand(remainingRequested: Int): Int =
if (remainingRequested < lowWatermark)
highWatermark - remainingRequested
else 0
}
/**
* Elements will be requested in minimum batches of this size.
* Default is 5. Subclass may override to define the batch size.
*/
def batchSize: Int = 5
/**
* Requests up to the `max` and also takes the number of messages
* that have been queued internally or delegated to other actors into account.
* Concrete subclass must implement [[#inFlightInternally]].
* It will request elements in minimum batches of the defined [[#batchSize]].
*/
abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy {
override def requestDemand(remainingRequested: Int): Int = {
val batch = math.min(batchSize, max)
if ((remainingRequested + inFlightInternally) <= (max - batch))
math.max(0, max - remainingRequested - inFlightInternally)
else 0
}
/**
* Concrete subclass must implement this method to define how many
* messages that are currently in progress or queued.
*/
def inFlightInternally: Int
/**
* Elements will be requested in minimum batches of this size.
* Default is 5. Subclass may override to define the batch size.
*/
def batchSize: Int = 5
override def requestDemand(remainingRequested: Int): Int = {
val batch = math.min(batchSize, max)
if ((remainingRequested + inFlightInternally) <= (max - batch))
math.max(0, max - remainingRequested - inFlightInternally)
else 0
}
}
/**
* Extend/mixin this trait in your [[akka.actor.Actor]] to make it a
* stream subscriber with full control of stream back pressure. It will receive
* [[ActorSubscriber.OnNext]], [[ActorSubscriber.OnComplete]] and [[ActorSubscriber.OnError]]
* [[ActorSubscriberMessage.OnNext]], [[ActorSubscriberMessage.OnComplete]] and [[ActorSubscriberMessage.OnError]]
* messages from the stream. It can also receive other, non-stream messages, in
* the same way as any actor.
*
* Attach the actor as a [[org.reactivestreams.Subscriber]] to the stream with
* [[ActorSubscriber#apply]].
* Scala API [[ActorSubscriber#apply]], or Java API [[UntypedActorSubscriber#create]] or
* Java API compatible with lambda expressions [[AbstractActorSubscriber#create]].
*
* Subclass must define the [[RequestStrategy]] to control stream back pressure.
* After each incoming message the `ActorSubscriber` will automatically invoke
* the [[RequestStrategy#requestDemand]] and propagate the returned demand to the stream.
* The provided [[ActorSubscriber.WatermarkRequestStrategy]] is a good strategy if the actor
* The provided [[WatermarkRequestStrategy]] is a good strategy if the actor
* performs work itself.
* The provided [[ActorSubscriber.MaxInFlightRequestStrategy]] is useful if messages are
* The provided [[MaxInFlightRequestStrategy]] is useful if messages are
* queued internally or delegated to other actors.
* You can also implement a custom [[RequestStrategy]] or call [[#request]] manually
* together with [[ActorSubscriber.ZeroRequestStrategy]] or some other strategy. In that case
* together with [[ZeroRequestStrategy]] or some other strategy. In that case
* you must also call [[#request]] when the actor is started or when it is ready, otherwise
* it will not receive any elements.
*/
trait ActorSubscriber extends Actor {
import ActorSubscriber._
import ActorSubscriberMessage._
private val state = ActorSubscriberState(context.system)
private var subscription: Option[Subscription] = None
@ -150,6 +174,9 @@ trait ActorSubscriber extends Actor {
protected def requestStrategy: RequestStrategy
/**
* INTERNAL API
*/
protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match {
case _: OnNext
requested -= 1
@ -173,11 +200,17 @@ trait ActorSubscriber extends Actor {
request(requestStrategy.requestDemand(remainingRequested))
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPreStart(): Unit = {
super.aroundPreStart()
request(requestStrategy.requestDemand(remainingRequested))
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPostRestart(reason: Throwable): Unit = {
state.get(self) foreach { s
// restore previous state
@ -190,12 +223,18 @@ trait ActorSubscriber extends Actor {
request(requestStrategy.requestDemand(remainingRequested))
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
// some state must survive restart
state.set(self, ActorSubscriberState.State(subscription, requested, canceled))
super.aroundPreRestart(reason, message)
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPostStop(): Unit = {
state.remove(self)
if (!canceled) subscription.foreach(_.cancel())
@ -232,9 +271,10 @@ trait ActorSubscriber extends Actor {
* INTERNAL API
*/
private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! ActorSubscriber.OnError(cause)
override def onComplete(): Unit = impl ! ActorSubscriber.OnComplete
override def onNext(element: T): Unit = impl ! ActorSubscriber.OnNext(element)
import ActorSubscriberMessage._
override def onError(cause: Throwable): Unit = impl ! OnError(cause)
override def onComplete(): Unit = impl ! OnComplete
override def onNext(element: T): Unit = impl ! OnNext(element)
override def onSubscribe(subscription: Subscription): Unit = impl ! ActorSubscriber.OnSubscribe(subscription)
}
@ -267,3 +307,38 @@ private[akka] class ActorSubscriberState extends Extension {
def remove(ref: ActorRef): Unit = state.remove(ref)
}
/**
* Java API
*/
object UntypedActorSubscriber {
/**
* Java API: Attach a [[UntypedActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]]
* to a [[org.reactivestreams.Publisher]] or [[akka.stream.javadsl.Flow]].
*/
def create[T](ref: ActorRef): Subscriber[T] = ActorSubscriber.apply(ref)
}
/**
* Java API
* @see [[akka.stream.actor.ActorSubscriber]]
*/
abstract class UntypedActorSubscriber extends UntypedActor with ActorSubscriber
/**
* Java API compatible with lambda expressions
*/
object AbstractActorSubscriber {
/**
* Java API compatible with lambda expressions: Attach a [[AbstractActorSubscriber]] actor
* as a [[org.reactivestreams.Subscriber]] o a [[org.reactivestreams.Publisher]] or
* [[akka.stream.javadsl.Flow]].
*/
def create[T](ref: ActorRef): Subscriber[T] = ActorSubscriber.apply(ref)
}
/**
* Java API compatible with lambda expressions
* @see [[akka.stream.actor.ActorSubscriber]]
*/
abstract class AbstractActorSubscriber extends AbstractActor with ActorSubscriber

View file

@ -6,7 +6,8 @@ package akka.stream.impl
import org.reactivestreams.{ Publisher, Subscriber, Subscription, Processor }
import akka.actor._
import akka.stream.MaterializerSettings
import akka.stream.actor.ActorSubscriber.{ OnSubscribe, OnNext, OnComplete, OnError }
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError }
import java.util.Arrays
import akka.stream.TimerTransformer

View file

@ -6,7 +6,8 @@ package akka.stream.impl
import akka.stream.MaterializerSettings
import akka.actor.{ Actor, Terminated, ActorRef }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import akka.stream.actor.ActorSubscriber.{ OnNext, OnError, OnComplete, OnSubscribe }
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
import akka.actor.Stash
/**

View file

@ -0,0 +1,61 @@
package akka.stream.actor;
import org.reactivestreams.Publisher;
import org.junit.ClassRule;
import org.junit.Test;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Flow;
import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
import akka.japi.Procedure;
import static akka.stream.actor.ActorPublisherMessage.Request;
public class ActorPublisherTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest",
AkkaSpec.testConf());
public static class TestPublisher extends UntypedActorPublisher<Integer> {
@Override
public void onReceive(Object msg) {
if (msg instanceof Request) {
onNext(1);
onComplete();
} else if (msg == ActorPublisherMessage.cancelInstance()) {
getContext().stop(getSelf());
} else {
unhandled(msg);
}
}
}
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher");
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test
public void mustHaveJavaAPI() {
final JavaTestKit probe = new JavaTestKit(system);
final ActorRef ref = system
.actorOf(Props.create(TestPublisher.class).withDispatcher("akka.test.stream-dispatcher"));
final Publisher<Integer> publisher = UntypedActorPublisher.create(ref);
Flow.create(publisher).foreach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgEquals(1);
}
}

View file

@ -0,0 +1,79 @@
package akka.stream.actor;
import org.reactivestreams.Subscriber;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Arrays;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Flow;
import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
import akka.japi.Procedure;
import static akka.stream.actor.ActorSubscriberMessage.OnNext;
import static akka.stream.actor.ActorSubscriberMessage.OnError;
public class ActorSubscriberTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest",
AkkaSpec.testConf());
public static class TestSubscriber extends UntypedActorSubscriber {
final ActorRef probe;
public TestSubscriber(ActorRef probe) {
this.probe = probe;
}
@Override
public RequestStrategy requestStrategy() {
return ZeroRequestStrategy.getInstance();
}
@Override
public void onReceive(Object msg) {
if (msg.equals("run")) {
request(4);
} else if (msg instanceof OnNext) {
probe.tell(((OnNext) msg).element(), getSelf());
} else if (msg == ActorSubscriberMessage.onCompleteInstance()) {
probe.tell("done", getSelf());
getContext().stop(getSelf());
} else if (msg instanceof OnError) {
probe.tell("err", getSelf());
getContext().stop(getSelf());
} else {
unhandled(msg);
}
}
}
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher");
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test
public void mustHaveJavaAPI() {
final JavaTestKit probe = new JavaTestKit(system);
final ActorRef ref = system.actorOf(Props.create(TestSubscriber.class, probe.getRef()).withDispatcher(
"akka.test.stream-dispatcher"));
final Subscriber<Integer> subscriber = UntypedActorSubscriber.create(ref);
final java.util.Iterator<Integer> input = Arrays.asList(1, 2, 3).iterator();
Flow.create(input).produceTo(subscriber, materializer);
ref.tell("run", null);
probe.expectMsgEquals(1);
probe.expectMsgEquals(2);
probe.expectMsgEquals(3);
probe.expectMsgEquals("done");
}
}

View file

@ -6,7 +6,7 @@ package akka.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.ScriptedTest
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
import akka.stream.actor.ActorSubscriber.{ OnNext, OnComplete }
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete }
import akka.stream.impl.RequestMore
class FlowTakeSpec extends AkkaSpec with ScriptedTest {

View file

@ -10,7 +10,6 @@ import akka.actor.PoisonPill
import akka.actor.Props
import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.actor.ActorSubscriber.WatermarkRequestStrategy
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
@ -32,6 +31,7 @@ object ActorPublisherSpec {
class TestPublisher(probe: ActorRef) extends ActorPublisher[String] {
import ActorPublisher._
import ActorPublisherMessage._
def receive = {
case Request(element) probe ! TotalDemand(totalDemand)
@ -45,8 +45,7 @@ object ActorPublisherSpec {
def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher")
class Sender extends ActorPublisher[Int] {
import ActorPublisher.Cancel
import ActorPublisher.Request
import ActorPublisherMessage._
var buf = Vector.empty[Int]
@ -76,7 +75,7 @@ object ActorPublisherSpec {
Props(new Receiver(probe)).withDispatcher("akka.test.stream-dispatcher")
class Receiver(probe: ActorRef) extends ActorSubscriber {
import ActorSubscriber._
import ActorSubscriberMessage._
override val requestStrategy = WatermarkRequestStrategy(10)

View file

@ -10,7 +10,6 @@ import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.actor.ActorSubscriber.RequestStrategy
import akka.actor.Actor
import akka.routing.ActorRefRoutee
import akka.routing.Router
@ -24,7 +23,7 @@ object ActorSubscriberSpec {
Props(new ManualSubscriber(probe)).withDispatcher("akka.test.stream-dispatcher")
class ManualSubscriber(probe: ActorRef) extends ActorSubscriber {
import ActorSubscriber._
import ActorSubscriberMessage._
override val requestStrategy = ZeroRequestStrategy
@ -42,7 +41,7 @@ object ActorSubscriberSpec {
Props(new RequestStrategySubscriber(probe, strat)).withDispatcher("akka.test.stream-dispatcher")
class RequestStrategySubscriber(probe: ActorRef, strat: RequestStrategy) extends ActorSubscriber {
import ActorSubscriber._
import ActorSubscriberMessage._
override val requestStrategy = strat
@ -61,7 +60,7 @@ object ActorSubscriberSpec {
Props(new Streamer).withDispatcher("akka.test.stream-dispatcher")
class Streamer extends ActorSubscriber {
import ActorSubscriber._
import ActorSubscriberMessage._
var queue = Map.empty[Int, ActorRef]
val router = {
@ -98,7 +97,7 @@ object ActorSubscriberSpec {
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
import ActorSubscriberSpec._
import ActorSubscriber._
import ActorSubscriberMessage._
implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))