diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index b6df362cdc..952753c9cb 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -4,6 +4,7 @@ package akka.stream.javadsl; import akka.actor.ActorRef; +import akka.actor.Cancellable; import akka.dispatch.Foreach; import akka.dispatch.Futures; import akka.dispatch.OnSuccess; @@ -414,12 +415,13 @@ public class SourceTest extends StreamTest { return "tick-" + (count++); } }; - Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick) - .foreach(new Procedure() { - public void apply(String elem) { - probe.getRef().tell(elem, ActorRef.noSender()); - } - }, materializer); + KeyedSource tickSource = Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick); + MaterializedMap map = tickSource.to(Sink.foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + })).run(materializer); + Cancellable cancellable = map.get(tickSource); // validates we can obtain the cancellable probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS)); probe.expectMsgEquals("tick-1"); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectSpec.scala index f9cb782416..d84b8fa8f3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectSpec.scala @@ -3,7 +3,7 @@ */ package akka.stream.scaladsl -import java.io.{File, FileInputStream} +import java.io.{ File, FileInputStream } import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index fcc0ca8d7c..6c5751ccb3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -4,8 +4,7 @@ package akka.stream.javadsl import java.util.concurrent.Callable -import akka.actor.ActorRef -import akka.actor.Props +import akka.actor.{ Cancellable, ActorRef, Props } import akka.japi.Util import akka.stream._ import akka.stream.scaladsl.PropsSource @@ -102,8 +101,8 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.Source[O] = - new Source(scaladsl.Source(initialDelay, interval, () ⇒ tick.call())) + def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.KeyedSource[O, Cancellable] = + new KeyedSource(scaladsl.Source(initialDelay, interval, () ⇒ tick.call())) /** * Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects