diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 1d30b8cf57..e514bce640 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -296,7 +296,7 @@ private[http] object HttpServerBluePrint { val flow = Flow[ByteString] .transform[FrameEvent](() ⇒ new FrameEventParser) - .via(Flow.wrap(sink, source)((_, _) ⇒ ())) + .via(Flow.wrap(sink, source)(Keep.none)) .transform(() ⇒ new FrameEventRenderer) new WebsocketSetup { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketServerSpec.scala index eb2e669695..d05e6d7cba 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketServerSpec.scala @@ -6,7 +6,7 @@ package akka.http.impl.engine.ws import akka.http.impl.engine.ws.Protocol.Opcode import akka.http.scaladsl.model.ws._ -import akka.stream.scaladsl.{ Sink, Flow, Source } +import akka.stream.scaladsl.{ Keep, Sink, Flow, Source } import akka.stream.testkit.Utils import akka.util.ByteString import org.scalatest.{ Matchers, FreeSpec } @@ -42,7 +42,7 @@ class WebsocketServerSpec extends FreeSpec with Matchers with WithMaterializerSp val source = Source(List(1, 2, 3, 4, 5)).map(num ⇒ TextMessage.Strict(s"Message $num")) - val handler = Flow.wrap(Sink.ignore, source)((_, _) ⇒ ()) + val handler = Flow.wrap(Sink.ignore, source)(Keep.none) val response = upgrade.get.handleMessages(handler) responsesSub.sendNext(response) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala index 017372e67c..2dce7bb5a2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala @@ -27,8 +27,6 @@ class StreamLayoutSpec extends AkkaSpec { def testSource(): Module = testAtomic(0, 1) def testSink(): Module = testAtomic(1, 0) - val ignore: (Any, Any) ⇒ Any = (x, y) ⇒ () - "StreamLayout" must { "be able to model simple linear stages" in { @@ -42,7 +40,7 @@ class StreamLayoutSpec extends AkkaSpec { stage1.isSource should be(false) val stage2 = testStage() - val flow12 = stage1.grow(stage2, ignore).connect(stage1.outPorts.head, stage2.inPorts.head) + val flow12 = stage1.grow(stage2, Keep.none).connect(stage1.outPorts.head, stage2.inPorts.head) flow12.inPorts should be(stage1.inPorts) flow12.outPorts should be(stage2.outPorts) @@ -67,7 +65,7 @@ class StreamLayoutSpec extends AkkaSpec { sink3.isSink should be(true) sink3.isSource should be(false) - val source012 = source0.grow(flow12, ignore).connect(source0.outPorts.head, flow12.inPorts.head) + val source012 = source0.grow(flow12, Keep.none).connect(source0.outPorts.head, flow12.inPorts.head) source012.inPorts.size should be(0) source012.outPorts should be(flow12.outPorts) source012.isRunnable should be(false) @@ -75,7 +73,7 @@ class StreamLayoutSpec extends AkkaSpec { source012.isSink should be(false) source012.isSource should be(true) - val sink123 = flow12.grow(sink3, ignore).connect(flow12.outPorts.head, sink3.inPorts.head) + val sink123 = flow12.grow(sink3, Keep.none).connect(flow12.outPorts.head, sink3.inPorts.head) sink123.inPorts should be(flow12.inPorts) sink123.outPorts.size should be(0) sink123.isRunnable should be(false) @@ -83,13 +81,13 @@ class StreamLayoutSpec extends AkkaSpec { sink123.isSink should be(true) sink123.isSource should be(false) - val runnable0123a = source0.grow(sink123, ignore).connect(source0.outPorts.head, sink123.inPorts.head) - val runnable0123b = source012.grow(sink3, ignore).connect(source012.outPorts.head, sink3.inPorts.head) + val runnable0123a = source0.grow(sink123, Keep.none).connect(source0.outPorts.head, sink123.inPorts.head) + val runnable0123b = source012.grow(sink3, Keep.none).connect(source012.outPorts.head, sink3.inPorts.head) val runnable0123c = source0 - .grow(flow12, ignore).connect(source0.outPorts.head, flow12.inPorts.head) - .grow(sink3, ignore).connect(flow12.outPorts.head, sink3.inPorts.head) + .grow(flow12, Keep.none).connect(source0.outPorts.head, flow12.inPorts.head) + .grow(sink3, Keep.none).connect(flow12.outPorts.head, sink3.inPorts.head) runnable0123a.inPorts.size should be(0) runnable0123a.outPorts.size should be(0) @@ -113,9 +111,9 @@ class StreamLayoutSpec extends AkkaSpec { val stage2 = testStage() val sink = testSink() - val runnable = source.grow(stage1, ignore).connect(source.outPorts.head, stage1.inPorts.head) - .grow(stage2, ignore).connect(stage1.outPorts.head, stage2.inPorts.head) - .grow(sink, ignore).connect(stage2.outPorts.head, sink.inPorts.head) + val runnable = source.grow(stage1, Keep.none).connect(source.outPorts.head, stage1.inPorts.head) + .grow(stage2, Keep.none).connect(stage1.outPorts.head, stage2.inPorts.head) + .grow(sink, Keep.none).connect(stage2.outPorts.head, sink.inPorts.head) checkMaterialized(runnable) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index eef5b0141e..8291980619 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -25,7 +25,7 @@ class FlowFoldSpec extends AkkaSpec { "propagate an error" in assertAllStagesStopped { val error = new Exception with NoStackTrace - val future = Source[Unit](() ⇒ throw error).runFold(())((_, _) ⇒ ()) + val future = Source[Unit](() ⇒ throw error).runFold(())(Keep.none) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala index 2983e2ae40..bd08f4ea16 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala @@ -7,12 +7,16 @@ import akka.japi.function import akka.stream.scaladsl import akka.japi.Pair +import scala.runtime.BoxedUnit + object Keep { private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l } private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r } private val _both = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = new akka.japi.Pair(l, r) } + private val _none = new function.Function2[Any, Any, Unit] with ((Any, Any) ⇒ Unit) { def apply(l: Any, r: Any) = () } def left[L, R]: function.Function2[L, R, L] = _left.asInstanceOf[function.Function2[L, R, L]] def right[L, R]: function.Function2[L, R, R] = _right.asInstanceOf[function.Function2[L, R, R]] def both[L, R]: function.Function2[L, R, L Pair R] = _both.asInstanceOf[function.Function2[L, R, L Pair R]] + def none[L, R]: function.Function2[L, R, Unit] = _none.asInstanceOf[function.Function2[L, R, Unit]] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala index d0911e74b3..8537ae93cc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala @@ -13,8 +13,10 @@ object Keep { private val _left = (l: Any, r: Any) ⇒ l private val _right = (l: Any, r: Any) ⇒ r private val _both = (l: Any, r: Any) ⇒ (l, r) + private val _none = (l: Any, r: Any) ⇒ () def left[L, R]: (L, R) ⇒ L = _left.asInstanceOf[(L, R) ⇒ L] def right[L, R]: (L, R) ⇒ R = _right.asInstanceOf[(L, R) ⇒ R] def both[L, R]: (L, R) ⇒ (L, R) = _both.asInstanceOf[(L, R) ⇒ (L, R)] + def none[L, R]: (L, R) ⇒ Unit = _none.asInstanceOf[(L, R) ⇒ Unit] }