!str #16492 Remove closure in TickSource to make it shareable

This commit is contained in:
Patrik Nordwall 2015-01-26 14:16:57 +01:00
parent af4555ce1f
commit 631b4ca5ac
8 changed files with 37 additions and 60 deletions

View file

@ -48,10 +48,10 @@ class StreamBuffersRateSpec extends AkkaSpec {
val zipper = ZipWith[Tick, Int, Int]((tick, count) => count)
Source(initialDelay = 1.second, interval = 1.second, () => "message!")
Source(initialDelay = 1.second, interval = 1.second, "message!")
.conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.right
Source(initialDelay = 3.second, interval = 3.second, () => Tick()) ~> zipper.left
Source(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.left
zipper.out ~> Sink.foreach(println)
}

View file

@ -407,15 +407,8 @@ public class SourceTest extends StreamTest {
@Test
public void mustProduceTicks() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Callable<String> tick = new Callable<String>() {
private int count = 1;
@Override
public String call() {
return "tick-" + (count++);
}
};
KeyedSource<String, Cancellable> tickSource = Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick);
KeyedSource<String, Cancellable> tickSource = Source.from(FiniteDuration.create(1, TimeUnit.SECONDS),
FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick");
MaterializedMap map = tickSource.to(Sink.foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
@ -423,9 +416,9 @@ public class SourceTest extends StreamTest {
})).run(materializer);
Cancellable cancellable = map.get(tickSource); // validates we can obtain the cancellable
probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS));
probe.expectMsgEquals("tick-1");
probe.expectMsgEquals("tick");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
probe.expectMsgEquals("tick-2");
probe.expectMsgEquals("tick");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
}

View file

@ -29,8 +29,8 @@ class GraphJunctionAttributesSpec extends AkkaSpec {
val source = Source[(SlowTick, List[FastTick])]() { implicit b
import FlowGraphImplicits._
val slow = Source(0.seconds, 100.millis, () SlowTick)
val fast = Source(0.seconds, 10.millis, () FastTick)
val slow = Source(0.seconds, 100.millis, SlowTick)
val fast = Source(0.seconds, 10.millis, FastTick)
val sink = UndefinedSink[(SlowTick, List[FastTick])]
val zip = Zip[SlowTick, List[FastTick]](inputBuffer(1, 1))

View file

@ -5,10 +5,10 @@ package akka.stream.scaladsl
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.FlowMaterializer
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.stream.MaterializerSettings
class TickSourceSpec extends AkkaSpec {
@ -16,42 +16,39 @@ class TickSourceSpec extends AkkaSpec {
"A Flow based on tick publisher" must {
"produce ticks" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
Source(1.second, 500.millis, () "tick-" + tickGen.next()).to(Sink(c)).run()
Source(1.second, 500.millis, "tick").to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(3)
c.expectNoMsg(600.millis)
c.expectNext("tick-1")
c.expectNext("tick")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNext("tick")
c.expectNoMsg(200.millis)
c.expectNext("tick-3")
c.expectNext("tick")
sub.cancel()
c.expectNoMsg(200.millis)
}
"drop ticks when not requested" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
Source(1.second, 1.second, () "tick-" + tickGen.next()).to(Sink(c)).run()
Source(1.second, 1.second, "tick").to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(2)
c.expectNext("tick-1")
c.expectNext("tick")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNext("tick")
c.expectNoMsg(1400.millis)
sub.request(2)
c.expectNext("tick-4")
c.expectNext("tick")
c.expectNoMsg(200.millis)
c.expectNext("tick-5")
c.expectNext("tick")
sub.cancel()
c.expectNoMsg(200.millis)
}
"reject multiple subscribers, but keep the first" in {
val tickGen = Iterator from 1
val p = Source(1.second, 1.second, () "tick-" + tickGen.next()).runWith(Sink.publisher)
val p = Source(1.second, 1.second, "tick").runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[String]()
val c2 = StreamTestKit.SubscriberProbe[String]()
p.subscribe(c1)
@ -59,24 +56,13 @@ class TickSourceSpec extends AkkaSpec {
val sub1 = c1.expectSubscription()
c2.expectError()
sub1.request(1)
c1.expectNext("tick-1")
c1.expectNext("tick")
c1.expectNoMsg(200.millis)
sub1.request(2)
c1.expectNext("tick-2")
c1.expectNext("tick")
sub1.cancel()
}
"signal onError when tick closure throws" in {
val c = StreamTestKit.SubscriberProbe[String]()
val tickSource = Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace)
val m = tickSource.to(Sink(c)).run()
val cancellable = m.get(tickSource)
val sub = c.expectSubscription()
sub.request(3)
c.expectError.getMessage should be("tick err")
awaitCond(cancellable.isCancelled)
}
"be usable with zip for a simple form of rate limiting" in {
val c = StreamTestKit.SubscriberProbe[Int]()
@ -84,7 +70,7 @@ class TickSourceSpec extends AkkaSpec {
import FlowGraphImplicits._
val zip = Zip[Int, String]
Source(1 to 100) ~> zip.left
Source(1.second, 1.second, () "tick") ~> zip.right
Source(1.second, 1.second, "tick") ~> zip.right
zip.out ~> Flow[(Int, String)].map { case (n, _) n } ~> Sink(c)
}.run()
@ -98,19 +84,18 @@ class TickSourceSpec extends AkkaSpec {
}
"be possible to cancel" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
val tickSource = Source(1.second, 500.millis, () "tick-" + tickGen.next())
val tickSource = Source(1.second, 500.millis, "tick")
val m = tickSource.to(Sink(c)).run()
val cancellable = m.get(tickSource)
val sub = c.expectSubscription()
sub.request(3)
c.expectNoMsg(600.millis)
c.expectNext("tick-1")
c.expectNext("tick")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNext("tick")
c.expectNoMsg(200.millis)
c.expectNext("tick-3")
c.expectNext("tick")
cancellable.cancel()
awaitCond(cancellable.isCancelled)
sub.request(3)

View file

@ -17,7 +17,7 @@ import scala.util.control.NonFatal
* INTERNAL API
*/
private[akka] object TickPublisher {
def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any,
def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any,
settings: MaterializerSettings, cancelled: AtomicBoolean): Props =
Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)).withDispatcher(settings.dispatcher)
@ -39,11 +39,11 @@ private[akka] object TickPublisher {
/**
* INTERNAL API
*
* Elements are produced from the tick closure periodically with the specified interval. Supports only one subscriber.
* Elements are emitted with the specified interval. Supports only one subscriber.
* The subscriber will receive the tick element if it has requested any elements,
* otherwise the tick element is dropped.
*/
private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any,
private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any,
settings: MaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
import akka.stream.impl.TickPublisher._
@ -86,10 +86,9 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
def active: Receive = {
case Tick
try {
val tickElement = tick() // FIXME should we call this even if we shouldn't send it?
if (demand > 0) {
demand -= 1
tryOnNext(subscriber, tickElement)
tryOnNext(subscriber, tick)
}
} catch {
case NonFatal(e) handleError(e)

View file

@ -95,14 +95,14 @@ object Source {
new Source(scaladsl.Source(future))
/**
* Elements are produced from the tick closure periodically with the specified interval.
* Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.KeyedSource[O, Cancellable] =
new KeyedSource(scaladsl.Source(initialDelay, interval, () tick.call()))
def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.KeyedSource[O, Cancellable] =
new KeyedSource(scaladsl.Source(initialDelay, interval, tick))
/**
* Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects

View file

@ -151,13 +151,13 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS
}
/**
* Elements are produced from the tick closure periodically with the specified interval.
* Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends KeyedActorFlowSource[Out, Cancellable] { // FIXME Why does this have anything to do with Actors?
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out) extends KeyedActorFlowSource[Out, Cancellable] { // FIXME Why does this have anything to do with Actors?
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val (pub, cancellable) = create(materializer, flowName)
pub.subscribe(flowSubscriber)

View file

@ -130,13 +130,13 @@ object Source {
def apply[T](future: Future[T]): Source[T] = FutureSource(future)
/**
* Elements are produced from the tick closure periodically with the specified interval.
* Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): TickSource[T] =
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): TickSource[T] =
TickSource(initialDelay, interval, tick)
/**