diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 685823549e..82e894a708 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -75,7 +75,6 @@ public class FlowTest { }, system.dispatcher()); probe.expectMsgEquals("de"); - } @Test diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java new file mode 100644 index 0000000000..cf97664aa4 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.Foreach; +import akka.dispatch.Futures; +import akka.dispatch.OnSuccess; +import akka.japi.Pair; +import akka.japi.Util; +import akka.stream.FlowMaterializer; +import akka.stream.MaterializerSettings; +import akka.stream.OverflowStrategy; +import akka.stream.Transformer; +import akka.stream.javadsl.japi.*; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; +import org.junit.ClassRule; +import org.junit.Test; +import org.reactivestreams.Publisher; +import scala.Option; +import scala.collection.immutable.Seq; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; +import scala.util.Try; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class SinkTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", + AkkaSpec.testConf()); + + final ActorSystem system = actorSystemResource.getSystem(); + + final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); + final FlowMaterializer materializer = FlowMaterializer.create(settings, system); + + @Test + public void mustBeAbleToUseFanoutPublisher() throws Exception { + KeyedSink> pubSink = Sink.fanoutPublisher(2, 2); + Publisher publisher = Source.from(new ArrayList()).runWith(pubSink, materializer); + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 5579b38c8d..f88a90e0e2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -92,7 +92,7 @@ object Sink { * A `Sink` that materializes into a [[org.reactivestreams.Publisher]] * that can handle more than one [[org.reactivestreams.Subscriber]]. */ - def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): KeyedSink[Publisher[T], T] = + def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): KeyedSink[T, Publisher[T]] = new KeyedSink(scaladsl.Sink.fanoutPublisher(initialBufferSize, maximumBufferSize)) /** @@ -116,7 +116,7 @@ object Sink { * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ - def fold[U, T](zero: U, f: Function[akka.japi.Pair[U, T], U]): KeyedSink[T, U] = { + def fold[U, T](zero: U, f: Function[akka.japi.Pair[U, T], U]): KeyedSink[T, Future[U]] = { val sSink = scaladsl.Sink.fold[U, T](zero) { case (a, b) ⇒ f.apply(akka.japi.Pair(a, b)) } new KeyedSink(sSink) }