!str #15026 change thunk publisher api to () => Option[T]

This commit is contained in:
Patrik Nordwall 2014-08-26 14:03:47 +02:00
parent 9dd281428d
commit 2ef9962eb0
7 changed files with 32 additions and 32 deletions

View file

@ -4,11 +4,9 @@
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import akka.stream.{ MaterializerSettings, Stop } import akka.stream.{ MaterializerSettings }
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration

View file

@ -5,7 +5,6 @@ package akka.stream.impl
import akka.actor.Props import akka.actor.Props
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import akka.stream.Stop
/** /**
* INTERNAL API * INTERNAL API

View file

@ -1,18 +1,15 @@
/** /**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream package akka.stream.impl
import scala.util.control.NoStackTrace import scala.util.control.ControlThrowable
/** /**
* INTERNAL API
*
* This exception must be thrown from a callback-based stream publisher to * This exception must be thrown from a callback-based stream publisher to
* signal the end of stream (if the produced stream is not infinite). This is used for example in * signal the end of stream (if the produced stream is not infinite). This is used for example in
* [[akka.stream.scaladsl.Flow#apply]] (the variant which takes a closure). * [[akka.stream.scaladsl.Flow#apply]] (the variant which takes a closure).
*/ */
case object Stop extends RuntimeException("Stop this flow") with NoStackTrace { private[akka] case object Stop extends ControlThrowable
/**
* Java API: get the singleton instance
*/
def getInstance = this
}

View file

@ -10,7 +10,12 @@ import scala.concurrent.Future
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import akka.japi._ import akka.japi.Creator
import akka.japi.Function
import akka.japi.Function2
import akka.japi.Pair
import akka.japi.Predicate
import akka.japi.Procedure
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.stream._ import akka.stream._
import akka.stream.scaladsl.{ Flow SFlow } import akka.stream.scaladsl.{ Flow SFlow }
@ -55,11 +60,10 @@ object Flow {
/** /**
* Define the sequence of elements to be produced by the given Callable. * Define the sequence of elements to be produced by the given Callable.
* The stream ends normally when evaluation of the Callable results in * The stream ends normally when evaluation of the `Callable` returns a `null`.
* a [[akka.stream.Stop]] exception being thrown; it ends exceptionally * The stream ends exceptionally when an exception is thrown from the `Callable`.
* when any other exception is thrown.
*/ */
def create[T](block: Callable[T]): Flow[T] = new FlowAdapter(SFlow.apply(() block.call())) def create[T](block: Callable[T]): Flow[T] = new FlowAdapter(SFlow.apply(() Option(block.call())))
/** /**
* Elements are produced from the tick `Callable` periodically with the specified interval. * Elements are produced from the tick `Callable` periodically with the specified interval.

View file

@ -14,6 +14,7 @@ import akka.stream.impl.FlowImpl
import akka.stream.impl.Ast.TickPublisherNode import akka.stream.impl.Ast.TickPublisherNode
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.stream.impl.Stop
/** /**
* Scala API * Scala API
@ -46,11 +47,11 @@ object Flow {
/** /**
* Define the sequence of elements to be produced by the given closure. * Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure results in * The stream ends normally when evaluation of the `Callable` returns a `None`.
* a [[akka.stream.Stop]] exception being thrown; it ends exceptionally * The stream ends exceptionally when an exception is thrown from the `Callable`.
* when any other exception is thrown.
*/ */
def apply[T](f: () T): Flow[T] = FlowImpl(ThunkPublisherNode(f), Nil) def apply[T](f: () Option[T]): Flow[T] =
FlowImpl(ThunkPublisherNode(() f().getOrElse(throw Stop)), Nil)
/** /**
* Start a new `Flow` from the given `Future`. The stream will consist of * Start a new `Flow` from the given `Future`. The stream will consist of

View file

@ -333,7 +333,7 @@ public class FlowTest {
@Override @Override
public Integer call() { public Integer call() {
if (countdown == 0) if (countdown == 0)
throw akka.stream.Stop.getInstance(); return null;
else { else {
countdown -= 1; countdown -= 1;
return countdown; return countdown;
@ -358,7 +358,7 @@ public class FlowTest {
Flow.create(input).onComplete(new OnCompleteCallback() { Flow.create(input).onComplete(new OnCompleteCallback() {
@Override @Override
public void onComplete(Throwable e) { public void onComplete(Throwable e) {
probe.getRef().tell( (e == null) ? "done" : e, ActorRef.noSender()); probe.getRef().tell((e == null) ? "done" : e, ActorRef.noSender());
} }
}, materializer); }, materializer);
@ -490,11 +490,12 @@ public class FlowTest {
return "tick-" + (count++); return "tick-" + (count++);
} }
}; };
Flow.create(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick).foreach(new Procedure<String>() { Flow.create(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick)
public void apply(String elem) { .foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender()); probe.getRef().tell(elem, ActorRef.noSender());
} }
}, materializer); }, materializer);
probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS)); probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS));
probe.expectMsgEquals("tick-1"); probe.expectMsgEquals("tick-1");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));

View file

@ -320,11 +320,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"if an internal error occurs future subscribers' onError should be called instead of onSubscribed" in pending "if an internal error occurs future subscribers' onError should be called instead of onSubscribed" in pending
"be covariant" in { "be covariant" in {
val f1: Flow[Fruit] = Flow(() new Apple) val f1: Flow[Fruit] = Flow(() Some(new Apple))
val p1: Publisher[Fruit] = Flow(() new Apple).toPublisher() val p1: Publisher[Fruit] = Flow(() Some(new Apple)).toPublisher()
val f2: Flow[Publisher[Fruit]] = Flow(() new Apple).splitWhen(_ true) val f2: Flow[Publisher[Fruit]] = Flow(() Some(new Apple)).splitWhen(_ true)
val f3: Flow[(Boolean, Publisher[Fruit])] = Flow(() new Apple).groupBy(_ true) val f3: Flow[(Boolean, Publisher[Fruit])] = Flow(() Some(new Apple)).groupBy(_ true)
val f4: Flow[(immutable.Seq[Apple], Publisher[Fruit])] = Flow(() new Apple).prefixAndTail(1) val f4: Flow[(immutable.Seq[Apple], Publisher[Fruit])] = Flow(() Some(new Apple)).prefixAndTail(1)
} }
} }