+str #17702: Add Keep.none
This commit is contained in:
parent
0e6694a102
commit
f3bcf89c52
6 changed files with 20 additions and 16 deletions
|
|
@ -296,7 +296,7 @@ private[http] object HttpServerBluePrint {
|
|||
val flow =
|
||||
Flow[ByteString]
|
||||
.transform[FrameEvent](() ⇒ new FrameEventParser)
|
||||
.via(Flow.wrap(sink, source)((_, _) ⇒ ()))
|
||||
.via(Flow.wrap(sink, source)(Keep.none))
|
||||
.transform(() ⇒ new FrameEventRenderer)
|
||||
|
||||
new WebsocketSetup {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.http.impl.engine.ws
|
|||
|
||||
import akka.http.impl.engine.ws.Protocol.Opcode
|
||||
import akka.http.scaladsl.model.ws._
|
||||
import akka.stream.scaladsl.{ Sink, Flow, Source }
|
||||
import akka.stream.scaladsl.{ Keep, Sink, Flow, Source }
|
||||
import akka.stream.testkit.Utils
|
||||
import akka.util.ByteString
|
||||
import org.scalatest.{ Matchers, FreeSpec }
|
||||
|
|
@ -42,7 +42,7 @@ class WebsocketServerSpec extends FreeSpec with Matchers with WithMaterializerSp
|
|||
|
||||
val source =
|
||||
Source(List(1, 2, 3, 4, 5)).map(num ⇒ TextMessage.Strict(s"Message $num"))
|
||||
val handler = Flow.wrap(Sink.ignore, source)((_, _) ⇒ ())
|
||||
val handler = Flow.wrap(Sink.ignore, source)(Keep.none)
|
||||
val response = upgrade.get.handleMessages(handler)
|
||||
responsesSub.sendNext(response)
|
||||
|
||||
|
|
|
|||
|
|
@ -27,8 +27,6 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
def testSource(): Module = testAtomic(0, 1)
|
||||
def testSink(): Module = testAtomic(1, 0)
|
||||
|
||||
val ignore: (Any, Any) ⇒ Any = (x, y) ⇒ ()
|
||||
|
||||
"StreamLayout" must {
|
||||
|
||||
"be able to model simple linear stages" in {
|
||||
|
|
@ -42,7 +40,7 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
stage1.isSource should be(false)
|
||||
|
||||
val stage2 = testStage()
|
||||
val flow12 = stage1.grow(stage2, ignore).connect(stage1.outPorts.head, stage2.inPorts.head)
|
||||
val flow12 = stage1.grow(stage2, Keep.none).connect(stage1.outPorts.head, stage2.inPorts.head)
|
||||
|
||||
flow12.inPorts should be(stage1.inPorts)
|
||||
flow12.outPorts should be(stage2.outPorts)
|
||||
|
|
@ -67,7 +65,7 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
sink3.isSink should be(true)
|
||||
sink3.isSource should be(false)
|
||||
|
||||
val source012 = source0.grow(flow12, ignore).connect(source0.outPorts.head, flow12.inPorts.head)
|
||||
val source012 = source0.grow(flow12, Keep.none).connect(source0.outPorts.head, flow12.inPorts.head)
|
||||
source012.inPorts.size should be(0)
|
||||
source012.outPorts should be(flow12.outPorts)
|
||||
source012.isRunnable should be(false)
|
||||
|
|
@ -75,7 +73,7 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
source012.isSink should be(false)
|
||||
source012.isSource should be(true)
|
||||
|
||||
val sink123 = flow12.grow(sink3, ignore).connect(flow12.outPorts.head, sink3.inPorts.head)
|
||||
val sink123 = flow12.grow(sink3, Keep.none).connect(flow12.outPorts.head, sink3.inPorts.head)
|
||||
sink123.inPorts should be(flow12.inPorts)
|
||||
sink123.outPorts.size should be(0)
|
||||
sink123.isRunnable should be(false)
|
||||
|
|
@ -83,13 +81,13 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
sink123.isSink should be(true)
|
||||
sink123.isSource should be(false)
|
||||
|
||||
val runnable0123a = source0.grow(sink123, ignore).connect(source0.outPorts.head, sink123.inPorts.head)
|
||||
val runnable0123b = source012.grow(sink3, ignore).connect(source012.outPorts.head, sink3.inPorts.head)
|
||||
val runnable0123a = source0.grow(sink123, Keep.none).connect(source0.outPorts.head, sink123.inPorts.head)
|
||||
val runnable0123b = source012.grow(sink3, Keep.none).connect(source012.outPorts.head, sink3.inPorts.head)
|
||||
|
||||
val runnable0123c =
|
||||
source0
|
||||
.grow(flow12, ignore).connect(source0.outPorts.head, flow12.inPorts.head)
|
||||
.grow(sink3, ignore).connect(flow12.outPorts.head, sink3.inPorts.head)
|
||||
.grow(flow12, Keep.none).connect(source0.outPorts.head, flow12.inPorts.head)
|
||||
.grow(sink3, Keep.none).connect(flow12.outPorts.head, sink3.inPorts.head)
|
||||
|
||||
runnable0123a.inPorts.size should be(0)
|
||||
runnable0123a.outPorts.size should be(0)
|
||||
|
|
@ -113,9 +111,9 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
val stage2 = testStage()
|
||||
val sink = testSink()
|
||||
|
||||
val runnable = source.grow(stage1, ignore).connect(source.outPorts.head, stage1.inPorts.head)
|
||||
.grow(stage2, ignore).connect(stage1.outPorts.head, stage2.inPorts.head)
|
||||
.grow(sink, ignore).connect(stage2.outPorts.head, sink.inPorts.head)
|
||||
val runnable = source.grow(stage1, Keep.none).connect(source.outPorts.head, stage1.inPorts.head)
|
||||
.grow(stage2, Keep.none).connect(stage1.outPorts.head, stage2.inPorts.head)
|
||||
.grow(sink, Keep.none).connect(stage2.outPorts.head, sink.inPorts.head)
|
||||
|
||||
checkMaterialized(runnable)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class FlowFoldSpec extends AkkaSpec {
|
|||
|
||||
"propagate an error" in assertAllStagesStopped {
|
||||
val error = new Exception with NoStackTrace
|
||||
val future = Source[Unit](() ⇒ throw error).runFold(())((_, _) ⇒ ())
|
||||
val future = Source[Unit](() ⇒ throw error).runFold(())(Keep.none)
|
||||
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,12 +7,16 @@ import akka.japi.function
|
|||
import akka.stream.scaladsl
|
||||
import akka.japi.Pair
|
||||
|
||||
import scala.runtime.BoxedUnit
|
||||
|
||||
object Keep {
|
||||
private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l }
|
||||
private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r }
|
||||
private val _both = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = new akka.japi.Pair(l, r) }
|
||||
private val _none = new function.Function2[Any, Any, Unit] with ((Any, Any) ⇒ Unit) { def apply(l: Any, r: Any) = () }
|
||||
|
||||
def left[L, R]: function.Function2[L, R, L] = _left.asInstanceOf[function.Function2[L, R, L]]
|
||||
def right[L, R]: function.Function2[L, R, R] = _right.asInstanceOf[function.Function2[L, R, R]]
|
||||
def both[L, R]: function.Function2[L, R, L Pair R] = _both.asInstanceOf[function.Function2[L, R, L Pair R]]
|
||||
def none[L, R]: function.Function2[L, R, Unit] = _none.asInstanceOf[function.Function2[L, R, Unit]]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,8 +13,10 @@ object Keep {
|
|||
private val _left = (l: Any, r: Any) ⇒ l
|
||||
private val _right = (l: Any, r: Any) ⇒ r
|
||||
private val _both = (l: Any, r: Any) ⇒ (l, r)
|
||||
private val _none = (l: Any, r: Any) ⇒ ()
|
||||
|
||||
def left[L, R]: (L, R) ⇒ L = _left.asInstanceOf[(L, R) ⇒ L]
|
||||
def right[L, R]: (L, R) ⇒ R = _right.asInstanceOf[(L, R) ⇒ R]
|
||||
def both[L, R]: (L, R) ⇒ (L, R) = _both.asInstanceOf[(L, R) ⇒ (L, R)]
|
||||
def none[L, R]: (L, R) ⇒ Unit = _none.asInstanceOf[(L, R) ⇒ Unit]
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue