=str #16658 tickSource in JavaDSL must expose keyed source / cancellable

This commit is contained in:
Konrad Malawski 2015-01-22 10:57:42 +01:00
parent 18e2d50aa9
commit b23d459eeb
3 changed files with 12 additions and 11 deletions

View file

@ -4,6 +4,7 @@
package akka.stream.javadsl;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.dispatch.Foreach;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
@ -414,12 +415,13 @@ public class SourceTest extends StreamTest {
return "tick-" + (count++);
}
};
Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick)
.foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
KeyedSource<String, Cancellable> tickSource = Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick);
MaterializedMap map = tickSource.to(Sink.foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
})).run(materializer);
Cancellable cancellable = map.get(tickSource); // validates we can obtain the cancellable
probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS));
probe.expectMsgEquals("tick-1");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));

View file

@ -3,7 +3,7 @@
*/
package akka.stream.scaladsl
import java.io.{File, FileInputStream}
import java.io.{ File, FileInputStream }
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }

View file

@ -4,8 +4,7 @@
package akka.stream.javadsl
import java.util.concurrent.Callable
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.{ Cancellable, ActorRef, Props }
import akka.japi.Util
import akka.stream._
import akka.stream.scaladsl.PropsSource
@ -102,8 +101,8 @@ object Source {
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.Source[O] =
new Source(scaladsl.Source(initialDelay, interval, () tick.call()))
def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.KeyedSource[O, Cancellable] =
new KeyedSource(scaladsl.Source(initialDelay, interval, () tick.call()))
/**
* Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects