Merge pull request #16239 from akka/wip-16235-rename-Sink.future-patriknw
!str #16235 Rename Sink.future to Sink.head
commit d9d905071b
28 changed files with 83 additions and 83 deletions
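The change itself is a pure rename: every call site that used `Sink.future` — the sink that materializes a `Future` completed with the first element of the stream — now uses `Sink.head`, and the backing classes `FutureSink`/`FutureSinkSpec` become `HeadSink`/`HeadSinkSpec`. As a minimal migration sketch (assuming an implicit `FlowMaterializer` is in scope, as in the specs touched below):

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import akka.stream.scaladsl.{ Sink, Source }

// before this PR:
//   val first: Future[Int] = Source(List(1, 2, 3)).runWith(Sink.future)
// after this PR:
val first: Future[Int] = Source(List(1, 2, 3)).runWith(Sink.head)
Await.result(first, 1.second) // 1, the head of the stream
```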
@@ -59,7 +59,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
         throw new java.util.concurrent.TimeoutException(
           s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data")
       }
-    dataBytes.timerTransform("toStrict", transformer).runWith(Sink.future)
+    dataBytes.timerTransform("toStrict", transformer).runWith(Sink.head)
   }
 
   /**
@@ -108,7 +108,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
       private val HttpRequest(POST, uri, List(`User-Agent`(_), Host(_, _), Accept(Vector(MediaRanges.`*/*`))),
         Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext()
       uri shouldEqual Uri(s"http://$hostname:$port/chunked")
-      Await.result(chunkStream.grouped(4).runWith(Sink.future), 100.millis) shouldEqual chunks
+      Await.result(chunkStream.grouped(4).runWith(Sink.head), 100.millis) shouldEqual chunks
 
       val serverOutSub = serverOut.expectSubscription()
       serverOutSub.sendNext(HttpResponse(206, List(RawHeader("Age", "42")), chunkedEntity))

@@ -117,7 +117,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
       clientInSub.request(1)
       val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")),
         Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext()
-      Await.result(chunkStream2.grouped(1000).runWith(Sink.future), 100.millis) shouldEqual chunks
+      Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks
     }
 
   }
@@ -41,7 +41,7 @@ object TestClient extends App {
     Source(List(HttpRequest() -> 'NoContext))
       .to(Sink(connection.requestSubscriber))
       .run()
-    Source(connection.responsePublisher).map(_._1).runWith(Sink.future)
+    Source(connection.responsePublisher).map(_._1).runWith(Sink.head)
   }
 
   result onComplete {
@@ -437,7 +437,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
         }
         .flatten(FlattenStrategy.concat)
         .map(strictEqualify)
-        .grouped(1000).runWith(Sink.future)
+        .grouped(1000).runWith(Sink.head)
     Await.result(future, 250.millis)
   }
 

@@ -451,7 +451,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
   }
 
   private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Seq[ChunkStreamPart]] =
-    data.grouped(1000).runWith(Sink.future)
+    data.grouped(1000).runWith(Sink.head)
       .fast.recover { case _: NoSuchElementException ⇒ Nil }
 
   def prep(response: String) = response.stripMarginWithNewline("\r\n")
@@ -272,7 +272,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
         }
         .flatten(FlattenStrategy.concat)
         .map(strictEqualify)
-        .grouped(1000).runWith(Sink.future)
+        .grouped(1000).runWith(Sink.head)
     Await.result(future, 500.millis)
   }
 

@@ -290,7 +290,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
   }
 
   private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Source[ChunkStreamPart]] =
-    data.grouped(1000).runWith(Sink.future)
+    data.grouped(1000).runWith(Sink.head)
       .fast.map(source(_: _*))
       .fast.recover { case _: NoSuchElementException ⇒ source() }
 
@@ -253,7 +253,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
     equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒
       val renderer = newRenderer
       val byteStringSource :: Nil = renderer.onNext(RequestRenderingContext(request, serverAddress))
-      val future = byteStringSource.grouped(1000).runWith(Sink.future).map(_.reduceLeft(_ ++ _).utf8String)
+      val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
       Await.result(future, 250.millis)
     }
   }
@@ -403,7 +403,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
     equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx ⇒
       val renderer = newRenderer
       val byteStringSource :: Nil = renderer.onNext(ctx)
-      val future = byteStringSource.grouped(1000).runWith(Sink.future).map(_.reduceLeft(_ ++ _).utf8String)
+      val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
       Await.result(future, 250.millis) -> renderer.isComplete
     }
 
@@ -107,7 +107,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
 
   def collectBytesTo(bytes: ByteString*): Matcher[HttpEntity] =
     equal(bytes.toVector).matcher[Seq[ByteString]].compose { entity ⇒
-      val future = entity.dataBytes.grouped(1000).runWith(Sink.future)
+      val future = entity.dataBytes.grouped(1000).runWith(Sink.head)
       Await.result(future, 250.millis)
     }
 
@@ -95,6 +95,6 @@ trait RouteTestResultComponent {
         failTest("Request was neither completed nor rejected within " + timeout)
 
     private def awaitAllElements[T](data: Source[T]): immutable.Seq[T] =
-      Await.result(data.grouped(Int.MaxValue).runWith(Sink.future), timeout)
+      Await.result(data.grouped(Int.MaxValue).runWith(Sink.head), timeout)
   }
 }
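Most call sites above follow the same collection idiom: `grouped(n)` folds the whole (bounded) stream into a single `Seq`, and `Sink.head` then materializes that one element as a `Future`. A minimal sketch of the idiom, with an illustrative element type and timeout (implicit `FlowMaterializer` assumed):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.scaladsl.{ Sink, Source }

// grouped(Int.MaxValue) emits exactly one Seq containing all elements,
// so Sink.head yields the whole stream as a single collection.
val allElements: Seq[Int] =
  Await.result(Source(1 to 5).grouped(Int.MaxValue).runWith(Sink.head), 1.second)
// allElements contains 1, 2, 3, 4, 5
```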
@@ -213,7 +213,7 @@ class UnmarshallingSpec extends FreeSpec with Matchers with BeforeAndAfterAll wi
   def haveParts[T <: Multipart](parts: Multipart.BodyPart*): Matcher[Future[T]] =
     equal(parts).matcher[Seq[Multipart.BodyPart]] compose { x ⇒
       Await.result(x
-        .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.future))
+        .fast.flatMap(x ⇒ x.parts.grouped(100).runWith(Sink.head))
         .fast.recover { case _: NoSuchElementException ⇒ Nil }, 1.second)
     }
 }
@@ -45,7 +45,7 @@ public class FlexiMergeTest {
         .addEdge(merge.out(), out1).build().run(materializer);
 
     final Publisher<String> pub = m.get(out1);
-    final Future<List<String>> all = Source.from(pub).grouped(100).runWith(Sink.<List<String>>future(), materializer);
+    final Future<List<String>> all = Source.from(pub).grouped(100).runWith(Sink.<List<String>>head(), materializer);
     final List<String> result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS));
     assertEquals(
       new HashSet<String>(Arrays.asList("a", "b", "c", "d", "e", "f")),

@@ -60,7 +60,7 @@ public class FlexiMergeTest {
         .addEdge(merge.out(), out1).build().run(materializer);
 
     final Publisher<String> pub = m.get(out1);
-    final Future<List<String>> all = Source.from(pub).grouped(100).runWith(Sink.<List<String>>future(), materializer);
+    final Future<List<String>> all = Source.from(pub).grouped(100).runWith(Sink.<List<String>>head(), materializer);
     final List<String> result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS));
     assertEquals(Arrays.asList("a", "e", "b", "f", "c", "d"), result);
   }

@@ -78,7 +78,7 @@ public class FlexiMergeTest {
 
     final Publisher<Pair<Integer, String>> pub = m.get(out);
     final Future<List<Pair<Integer, String>>> all = Source.from(pub).grouped(100).
-        runWith(Sink.<List<Pair<Integer, String>>>future(), materializer);
+        runWith(Sink.<List<Pair<Integer, String>>>head(), materializer);
     final List<Pair<Integer, String>> result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS));
     assertEquals(
       Arrays.asList(new Pair(1, "a"), new Pair(2, "b"), new Pair(3, "c")),
@@ -32,8 +32,8 @@ public class FlexiRouteTest {
 
   final Source<String> in = Source.from(Arrays.asList("a", "b", "c", "d", "e"));
 
-  final KeyedSink<List<String>, Future<List<String>>> out1 = Sink.<List<String>>future();
-  final KeyedSink<List<String>, Future<List<String>>> out2 = Sink.<List<String>>future();
+  final KeyedSink<List<String>, Future<List<String>>> out1 = Sink.<List<String>>head();
+  final KeyedSink<List<String>, Future<List<String>>> out2 = Sink.<List<String>>head();
 
   @Test
   public void mustBuildSimpleFairRoute() throws Exception {

@@ -71,8 +71,8 @@ public class FlexiRouteTest {
     Source<Pair<Integer, String>> input = Source.from(Arrays.<Pair<Integer, String>>asList(new Pair(1, "A"), new Pair(
         2, "B"), new Pair(3, "C"), new Pair(4, "D")));
 
-    final KeyedSink<List<Integer>, Future<List<Integer>>> outA = Sink.<List<Integer>>future();
-    final KeyedSink<List<String>, Future<List<String>>> outB = Sink.<List<String>>future();
+    final KeyedSink<List<Integer>, Future<List<Integer>>> outA = Sink.<List<Integer>>head();
+    final KeyedSink<List<String>, Future<List<String>>> outB = Sink.<List<String>>head();
 
     MaterializedMap m = FlowGraph.builder().addEdge(input, unzip.in())
         .addEdge(unzip.outputA, Flow.of(Integer.class).grouped(100), outA)
@@ -327,7 +327,7 @@ public class FlowTest {
 
     // collecting
     final Publisher<String> pub = m.get(publisher);
-    final Future<List<String>> all = Source.from(pub).grouped(100).runWith(Sink.<List<String>> future(), materializer);
+    final Future<List<String>> all = Source.from(pub).grouped(100).runWith(Sink.<List<String>>head(), materializer);
 
     final List<String> result = Await.result(all, Duration.apply(200, TimeUnit.MILLISECONDS));
     assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result));

@@ -466,7 +466,7 @@ public class FlowTest {
       public String apply(String arg0) throws Exception {
         throw new RuntimeException("simulated err");
       }
-    }).runWith(Sink.<String> future(), materializer).onComplete(new OnSuccess<Try<String>>() {
+    }).runWith(Sink.<String>head(), materializer).onComplete(new OnSuccess<Try<String>>() {
       @Override
       public void onSuccess(Try<String> e) throws Throwable {
         if (e == null) {

@@ -484,7 +484,7 @@ public class FlowTest {
   public void mustBeAbleToUseToFuture() throws Exception {
     final JavaTestKit probe = new JavaTestKit(system);
     final Iterable<String> input = Arrays.asList("A", "B", "C");
-    Future<String> future = Source.from(input).runWith(Sink.<String> future(), materializer);
+    Future<String> future = Source.from(input).runWith(Sink.<String>head(), materializer);
     String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
     assertEquals("A", result);
   }

@@ -494,12 +494,12 @@ public class FlowTest {
     final JavaTestKit probe = new JavaTestKit(system);
     final Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
     Future<Pair<List<Integer>, Source<Integer>>> future = Source.from(input).prefixAndTail(3)
-        .runWith(Sink.<Pair<List<Integer>, Source<Integer>>> future(), materializer);
+        .runWith(Sink.<Pair<List<Integer>, Source<Integer>>>head(), materializer);
     Pair<List<Integer>, Source<Integer>> result = Await.result(future,
         probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
     assertEquals(Arrays.asList(1, 2, 3), result.first());
 
-    Future<List<Integer>> tailFuture = result.second().grouped(4).runWith(Sink.<List<Integer>> future(), materializer);
+    Future<List<Integer>> tailFuture = result.second().grouped(4).runWith(Sink.<List<Integer>>head(), materializer);
     List<Integer> tailResult = Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
     assertEquals(Arrays.asList(4, 5, 6), tailResult);
   }

@@ -514,7 +514,7 @@ public class FlowTest {
 
     Future<List<Integer>> future = Source.from(mainInputs)
         .flatten(akka.stream.javadsl.FlattenStrategy.<Integer> concat()).grouped(6)
-        .runWith(Sink.<List<Integer>> future(), materializer);
+        .runWith(Sink.<List<Integer>>head(), materializer);
 
     List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
 

@@ -526,7 +526,7 @@ public class FlowTest {
     final JavaTestKit probe = new JavaTestKit(system);
     final List<String> input = Arrays.asList("A", "B", "C");
     Future<List<String>> future = Source.from(input).buffer(2, OverflowStrategy.backpressure()).grouped(4)
-        .runWith(Sink.<List<String>> future(), materializer);
+        .runWith(Sink.<List<String>>head(), materializer);
 
     List<String> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
     assertEquals(input, result);

@@ -571,7 +571,7 @@ public class FlowTest {
       public Pair<String, String> apply(String in) throws Exception {
         return new Pair<String, String>(in, in);
       }
-    }).runWith(Sink.<String> future(), materializer);
+    }).runWith(Sink.<String>head(), materializer);
     String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
     assertEquals("A", result);
   }
@@ -37,7 +37,7 @@ public class SinkTest {
 
   @Test
   public void mustBeAbleToUseFuture() throws Exception {
-    final KeyedSink<Integer, Future<Integer>> futSink = Sink.future();
+    final KeyedSink<Integer, Future<Integer>> futSink = Sink.head();
     final List<Integer> list = new ArrayList<Integer>();
     list.add(1);
     final Future<Integer> future = Source.from(list).runWith(futSink, materializer);
@@ -116,7 +116,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type =
   }
 
   def replyFirstLineInUpperCase(scipher: SslTlsCipher): Unit = {
-    val ssessionf = Source(scipher.sessionInbound).runWith(Sink.future)
+    val ssessionf = Source(scipher.sessionInbound).runWith(Sink.head)
     val ssession = Await.result(ssessionf, duration)
     val sdata = ssession.data
     Source(sdata).map(bs ⇒ ByteString(bs.decodeString("utf-8").split('\n').head.toUpperCase + '\n')).

@@ -128,11 +128,11 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type =
   }
 
   def sendLineAndReceiveResponse(ccipher: SslTlsCipher, message: String): String = {
-    val csessionf = Source(ccipher.sessionInbound).runWith(Sink.future)
+    val csessionf = Source(ccipher.sessionInbound).runWith(Sink.head)
     Source(List(ByteString(message + '\n'))).runWith(Sink(ccipher.plainTextOutbound))
     val csession = Await.result(csessionf, duration)
     val cdata = csession.data
-    Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.future), duration)
+    Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.head), duration)
   }
 
   def sendLineAndReceiveResponse(connection: JavaSslConnection, message: String): String = {
@@ -25,14 +25,14 @@ class FlowBufferSpec extends AkkaSpec {
 
   "pass elements through normally in backpressured mode" in {
     val future: Future[Seq[Int]] = Source((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
-      runWith(Sink.future)
+      runWith(Sink.head)
     Await.result(future, 3.seconds) should be(1 to 1000)
   }
 
   "pass elements through normally in backpressured mode with buffer size one" in {
-    val futureSink = Sink.future[Seq[Int]]
+    val futureSink = Sink.head[Seq[Int]]
     val future = Source((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
-      runWith(Sink.future)
+      runWith(Sink.head)
     Await.result(future, 3.seconds) should be(1 to 1000)
   }
 

@@ -45,7 +45,7 @@ class FlowBufferSpec extends AkkaSpec {
       .buffer(5, overflowStrategy = OverflowStrategy.backpressure)
       .buffer(128, overflowStrategy = OverflowStrategy.backpressure)
      .grouped(1001)
-      .runWith(Sink.future)
+      .runWith(Sink.head)
     Await.result(future, 3.seconds) should be(1 to 1000)
   }
 
@@ -50,7 +50,7 @@ class FlowCompileSpec extends AkkaSpec {
       val closedSink: Sink[String] = Flow[String].map(_.hashCode).to(Sink.publisher[Int])
       val appended: Sink[Int] = open.to(closedSink)
       "appended.run()" shouldNot compile
-      "appended.connect(Sink.future[Int])" shouldNot compile
+      "appended.connect(Sink.head[Int])" shouldNot compile
       intSeq.to(appended).run
     }
     "be appended to Source" in {

@@ -70,7 +70,7 @@ class FlowCompileSpec extends AkkaSpec {
       intSeq.to(openSource)
     }
     "not accept Sink" in {
-      "openSource.connect(Sink.future[String])" shouldNot compile
+      "openSource.connect(Sink.head[String])" shouldNot compile
     }
     "not run()" in {
       "openSource.run()" shouldNot compile

@@ -92,7 +92,7 @@ class FlowCompileSpec extends AkkaSpec {
   }
 
   "RunnableFlow" should {
-    Sink.future[String]
+    Sink.head[String]
     val closed: RunnableFlow =
       Source(Seq(1, 2, 3)).map(_.toString).to(Sink.publisher[String])
     "run" in {

@@ -103,7 +103,7 @@ class FlowCompileSpec extends AkkaSpec {
     }
 
     "not accept Sink" in {
-      "closed.connect(Sink.future[String])" shouldNot compile
+      "closed.connect(Sink.head[String])" shouldNot compile
     }
   }
 }
@@ -34,7 +34,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
   val in1 = Source(List("a", "b", "c"))
   val in2 = Source(List("d", "e", "f"))
   val out1 = Sink.publisher[String]
-  val out2 = Sink.future[String]
+  val out2 = Sink.head[String]
 
   "FlowGraph" should {
     "build simple merge" in {
@@ -25,10 +25,10 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
 
   val testException = new Exception("test") with NoStackTrace
 
-  def newFutureSink = Sink.future[(immutable.Seq[Int], Source[Int])]
+  def newHeadSink = Sink.head[(immutable.Seq[Int], Source[Int])]
 
   "work on empty input" in {
-    val futureSink = newFutureSink
+    val futureSink = newHeadSink
     val fut = Source.empty.prefixAndTail(10).runWith(futureSink)
     val (prefix, tailFlow) = Await.result(fut, 3.seconds)
     prefix should be(Nil)

@@ -38,7 +38,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
   }
 
   "work on short input" in {
-    val futureSink = newFutureSink
+    val futureSink = newHeadSink
     val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureSink)
     val (prefix, tailFlow) = Await.result(fut, 3.seconds)
     prefix should be(List(1, 2, 3))

@@ -48,40 +48,40 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
   }
 
   "work on longer inputs" in {
-    val futureSink = newFutureSink
+    val futureSink = newHeadSink
     val fut = Source((1 to 10).iterator).prefixAndTail(5).runWith(futureSink)
     val (takes, tail) = Await.result(fut, 3.seconds)
     takes should be(1 to 5)
 
-    val futureSink2 = Sink.future[immutable.Seq[Int]]
+    val futureSink2 = Sink.head[immutable.Seq[Int]]
     val fut2 = tail.grouped(6).runWith(futureSink2)
     Await.result(fut2, 3.seconds) should be(6 to 10)
   }
 
   "handle zero take count" in {
-    val futureSink = newFutureSink
+    val futureSink = newHeadSink
     val fut = Source((1 to 10).iterator).prefixAndTail(0).runWith(futureSink)
     val (takes, tail) = Await.result(fut, 3.seconds)
     takes should be(Nil)
 
-    val futureSink2 = Sink.future[immutable.Seq[Int]]
+    val futureSink2 = Sink.head[immutable.Seq[Int]]
     val fut2 = tail.grouped(11).runWith(futureSink2)
     Await.result(fut2, 3.seconds) should be(1 to 10)
   }
 
   "handle negative take count" in {
-    val futureSink = newFutureSink
+    val futureSink = newHeadSink
     val fut = Source((1 to 10).iterator).prefixAndTail(-1).runWith(futureSink)
     val (takes, tail) = Await.result(fut, 3.seconds)
     takes should be(Nil)
 
-    val futureSink2 = Sink.future[immutable.Seq[Int]]
+    val futureSink2 = Sink.head[immutable.Seq[Int]]
     val fut2 = tail.grouped(11).runWith(futureSink2)
     Await.result(fut2, 3.seconds) should be(1 to 10)
   }
 
   "work if size of take is equal to stream size" in {
-    val futureSink = newFutureSink
+    val futureSink = newHeadSink
     val fut = Source((1 to 10).iterator).prefixAndTail(10).runWith(futureSink)
     val (takes, tail) = Await.result(fut, 3.seconds)
     takes should be(1 to 10)
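`prefixAndTail(n)` emits exactly one element — the prefix together with the tail `Source` — so `Sink.head` is the natural way to materialize its result, which is what these tests do. A small sketch (implicit `FlowMaterializer` assumed; values chosen for illustration):

```scala
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.scaladsl.{ Sink, Source }

// prefixAndTail(2) emits one (prefix, tail) pair; Sink.head materializes it.
val fut = Source(List(1, 2, 3, 4, 5)).prefixAndTail(2)
  .runWith(Sink.head[(immutable.Seq[Int], Source[Int])])
val (prefix, tail) = Await.result(fut, 3.seconds)
// prefix is Seq(1, 2); tail is a Source that will emit 3, 4, 5
```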
@@ -13,7 +13,7 @@ import akka.stream.MaterializerSettings
 import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
 import akka.stream.testkit.ScriptedTest
 
-class FutureSinkSpec extends AkkaSpec with ScriptedTest {
+class HeadSinkSpec extends AkkaSpec with ScriptedTest {
 
   val settings = MaterializerSettings(system)
     .withInputBuffer(initialSize = 2, maxSize = 16)

@@ -21,11 +21,11 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest {
 
   implicit val materializer = FlowMaterializer(settings)
 
-  "A Flow with Sink.future" must {
+  "A Flow with Sink.head" must {
 
     "yield the first value" in {
       val p = StreamTestKit.PublisherProbe[Int]()
-      val f: Future[Int] = Source(p).map(identity).runWith(Sink.future)
+      val f: Future[Int] = Source(p).map(identity).runWith(Sink.head)
       val proc = p.expectSubscription
       proc.expectRequest()
       proc.sendNext(42)

@@ -35,7 +35,7 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest {
 
     "yield the first value when actively constructing" in {
       val p = StreamTestKit.PublisherProbe[Int]()
-      val f = Sink.future[Int]
+      val f = Sink.head[Int]
       val s = Source.subscriber[Int]
       val m = s.to(f).run()
       p.subscribe(m.get(s))

@@ -48,7 +48,7 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest {
 
     "yield the first error" in {
       val p = StreamTestKit.PublisherProbe[Int]()
-      val f = Source(p).runWith(Sink.future)
+      val f = Source(p).runWith(Sink.head)
       val proc = p.expectSubscription
       proc.expectRequest()
       val ex = new RuntimeException("ex")

@@ -59,7 +59,7 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest {
 
     "yield NoSuchElementExcption for empty stream" in {
       val p = StreamTestKit.PublisherProbe[Int]()
-      val f = Source(p).runWith(Sink.future)
+      val f = Source(p).runWith(Sink.head)
       val proc = p.expectSubscription
       proc.expectRequest()
       proc.sendComplete()
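As the spec above and the `HeadSink` scaladoc state, an empty stream fails the materialized future with a `NoSuchElementException`; several call sites in this PR (for example `compactEntityChunks`) recover that failure into an empty result. A small sketch of that behaviour (implicit `FlowMaterializer` and `ExecutionContext` assumed):

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import akka.stream.scaladsl.{ Sink, Source }

// Sink.head on an empty stream fails with NoSuchElementException;
// recover turns that into an empty Seq, as the specs in this PR do.
val collected: Future[Seq[Int]] =
  Source(List.empty[Int]).grouped(1000).runWith(Sink.head)
    .recover { case _: NoSuchElementException ⇒ Nil }
Await.result(collected, 1.second) // Nil
```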
@@ -115,11 +115,11 @@ class GraphBalanceSpec extends AkkaSpec {
     }
 
     "work with 5-way balance" in {
-      val f1 = Sink.future[Seq[Int]]
-      val f2 = Sink.future[Seq[Int]]
-      val f3 = Sink.future[Seq[Int]]
-      val f4 = Sink.future[Seq[Int]]
-      val f5 = Sink.future[Seq[Int]]
+      val f1 = Sink.head[Seq[Int]]
+      val f2 = Sink.head[Seq[Int]]
+      val f3 = Sink.head[Seq[Int]]
+      val f4 = Sink.head[Seq[Int]]
+      val f5 = Sink.head[Seq[Int]]
 
       val g = FlowGraph { implicit b ⇒
         val balance = Balance[Int]("balance", waitForAllDownstreams = true)
@@ -48,11 +48,11 @@ class GraphBroadcastSpec extends AkkaSpec {
     }
 
     "work with n-way broadcast" in {
-      val f1 = Sink.future[Seq[Int]]
-      val f2 = Sink.future[Seq[Int]]
-      val f3 = Sink.future[Seq[Int]]
-      val f4 = Sink.future[Seq[Int]]
-      val f5 = Sink.future[Seq[Int]]
+      val f1 = Sink.head[Seq[Int]]
+      val f2 = Sink.head[Seq[Int]]
+      val f3 = Sink.head[Seq[Int]]
+      val f4 = Sink.head[Seq[Int]]
+      val f5 = Sink.head[Seq[Int]]
 
       val g = FlowGraph { implicit b ⇒
         val bcast = Broadcast[Int]("broadcast")
@@ -64,7 +64,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
   "FlowGraphs" must {
 
     "support broadcast - merge layouts" in {
-      val resultFuture = Sink.future[Seq[Int]]
+      val resultFuture = Sink.head[Seq[Int]]
 
       val g = FlowGraph { implicit b ⇒
         val bcast = Broadcast[Int]("broadcast")

@@ -83,7 +83,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
       val elements = 0 to 10
       val in = Source(elements)
       val f = Flow[Int]
-      val out = Sink.future[Seq[Int]]
+      val out = Sink.head[Seq[Int]]
 
       val g = FlowGraph { implicit b ⇒
         val balance = Balance[Int]

@@ -101,9 +101,9 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
 
     "support wikipedia Topological_sorting 2" in {
       // see https://en.wikipedia.org/wiki/Topological_sorting#mediaviewer/File:Directed_acyclic_graph.png
-      val resultFuture2 = Sink.future[Seq[Int]]
-      val resultFuture9 = Sink.future[Seq[Int]]
-      val resultFuture10 = Sink.future[Seq[Int]]
+      val resultFuture2 = Sink.head[Seq[Int]]
+      val resultFuture9 = Sink.head[Seq[Int]]
+      val resultFuture10 = Sink.head[Seq[Int]]
 
       val g = FlowGraph { implicit b ⇒
         val b3 = Broadcast[Int]("b3")

@@ -149,7 +149,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
     }
 
     "allow adding of flows to sources and sinks to flows" in {
-      val resultFuture = Sink.future[Seq[Int]]
+      val resultFuture = Sink.head[Seq[Int]]
 
       val g = FlowGraph { implicit b ⇒
         val bcast = Broadcast[Int]("broadcast")
@@ -25,7 +25,7 @@ class GraphPreferredMergeSpec extends TwoStreamsSetup {
 
       val preferred = Source(Stream.fill(numElements)(1))
       val aux1, aux2, aux3 = Source(Stream.fill(numElements)(2))
-      val sink = Sink.future[Seq[Int]]
+      val sink = Sink.head[Seq[Int]]
 
       val g = FlowGraph { implicit b ⇒
         val merge = MergePreferred[Int]

@@ -45,7 +45,7 @@ class GraphPreferredMergeSpec extends TwoStreamsSetup {
       val g = FlowGraph { implicit b ⇒
         val merge = MergePreferred[Int]
 
-        s1 ~> merge.preferred ~> Sink.future[Int]
+        s1 ~> merge.preferred ~> Sink.head[Int]
         s2 ~> merge.preferred
         s3 ~> merge
       }
@@ -73,7 +73,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
       // sleep long enough for it to be cleaned up
       Thread.sleep(1000)
 
-      val f = s3.runWith(Sink.future).recover { case _: SubscriptionTimeoutException ⇒ "expected" }
+      val f = s3.runWith(Sink.head).recover { case _: SubscriptionTimeoutException ⇒ "expected" }
       Await.result(f, 300.millis) should equal("expected")
     }
 
@@ -106,8 +106,8 @@ object Sink {
   /**
    * A `Sink` that materializes into a `Future` of the first value received.
    */
-  def future[In]: KeyedSink[In, Future[In]] =
-    new KeyedSink(scaladsl.Sink.future[In])
+  def head[In]: KeyedSink[In, Future[In]] =
+    new KeyedSink(scaladsl.Sink.head[In])
 
 }
 
@@ -95,8 +95,8 @@ final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSi
     }
   }
 
-object FutureSink {
-  def apply[T](): FutureSink[T] = new FutureSink[T]
+object HeadSink {
+  def apply[T](): HeadSink[T] = new HeadSink[T]
 }
 
 /**

@@ -106,7 +106,7 @@ object FutureSink {
  * the Future into the corresponding failed state) or the end-of-stream
  * (failing the Future with a NoSuchElementException).
  */
-class FutureSink[In] extends KeyedActorFlowSink[In] {
+class HeadSink[In] extends KeyedActorFlowSink[In] {
 
   type MaterializedType = Future[In]
 

@@ -130,7 +130,7 @@ class FutureSink[In] extends KeyedActorFlowSink[In] {
     (sub, p.future)
   }
 
-  override def toString: String = "FutureSink"
+  override def toString: String = "HeadSink"
 }
 
 /**
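Apart from the name, `HeadSink` is unchanged: it materializes a `Promise`/`Future` pair and a subscriber that requests a single element, completes the future with it (or with the stream's failure, or with a `NoSuchElementException` when the stream ends empty) and then cancels. The snippet below is only a rough, generic Reactive Streams illustration of that idea, with hypothetical names — it is not the actual akka-stream `HeadSink` code:

```scala
import scala.concurrent.{ Future, Promise }
import org.reactivestreams.{ Subscriber, Subscription }

// Illustration only: a subscriber that exposes the first element as a Future.
class FirstElementSubscriber[T] extends Subscriber[T] {
  private val promise = Promise[T]()
  private var subscription: Subscription = _

  def future: Future[T] = promise.future

  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    s.request(1) // one element is all we need
  }
  override def onNext(elem: T): Unit = {
    promise.trySuccess(elem)   // first element completes the future
    subscription.cancel()      // no further demand
  }
  override def onError(t: Throwable): Unit =
    promise.tryFailure(t)      // stream failure fails the future
  override def onComplete(): Unit =
    promise.tryFailure(new NoSuchElementException("head of empty stream"))
}
```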
@@ -64,7 +64,7 @@ object Sink {
   /**
    * A `Sink` that materializes into a `Future` of the first value received.
    */
-  def future[T]: FutureSink[T] = FutureSink[T]
+  def head[T]: HeadSink[T] = HeadSink[T]
 
   /**
    * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].