Artery test transport blackhole only let HandshakeReq pass (#30219)
* only let HandshakeReq is blackholed
This commit is contained in:
parent
5abc119577
commit
3dd0c4c86b
1 changed files with 19 additions and 7 deletions
|
|
@ -5,11 +5,10 @@
|
|||
package akka.remote.artery
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import akka.actor.Address
|
||||
import akka.event.Logging
|
||||
import akka.remote.artery.OutboundHandshake.HandshakeReq
|
||||
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.FlowShape
|
||||
|
|
@ -173,14 +172,27 @@ private[remote] class InboundTestStage(inboundContext: InboundContext, state: Sh
|
|||
push(out, env)
|
||||
case _ =>
|
||||
// unknown, handshake not completed
|
||||
if (state.anyBlackholePresent())
|
||||
if (state.anyBlackholePresent()) {
|
||||
env.message match {
|
||||
case _: HandshakeReq =>
|
||||
log.debug(
|
||||
"inbound message [{}] before handshake completed, cannot check if remote is blackholed, letting through",
|
||||
Logging.messageClassName(env.message))
|
||||
push(out, env) // let it through
|
||||
|
||||
case anyOther =>
|
||||
log.debug(
|
||||
"dropping inbound message [{}] with UID [{}] because of blackhole",
|
||||
Logging.messageClassName(anyOther),
|
||||
env.originUid)
|
||||
pull(in) // drop message
|
||||
}
|
||||
} else {
|
||||
push(out, env)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OutHandler
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue