=htt,str,doc #20009 Fix for found unsafe graph stages
This commit is contained in:
parent
ed8ba7873c
commit
7fe4b01f01
4 changed files with 7 additions and 4 deletions
|
|
@ -384,7 +384,8 @@ public class GraphStageDocTest extends AbstractJavaTest {
|
||||||
|
|
||||||
|
|
||||||
//#async-side-channel
|
//#async-side-channel
|
||||||
// will close upstream when the future completes
|
// will close upstream in all materializations of the stage instance
|
||||||
|
// when the completion stage completes
|
||||||
public class KillSwitch<A> extends GraphStage<FlowShape<A, A>> {
|
public class KillSwitch<A> extends GraphStage<FlowShape<A, A>> {
|
||||||
|
|
||||||
private final CompletionStage<Done> switchF;
|
private final CompletionStage<Done> switchF;
|
||||||
|
|
|
||||||
|
|
@ -274,7 +274,8 @@ class GraphStageDocSpec extends AkkaSpec {
|
||||||
import system.dispatcher
|
import system.dispatcher
|
||||||
|
|
||||||
//#async-side-channel
|
//#async-side-channel
|
||||||
// will close upstream when the future completes
|
// will close upstream in all materializations of the graph stage instance
|
||||||
|
// when the future completes
|
||||||
class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
|
class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
|
||||||
|
|
||||||
val in = Inlet[A]("KillSwitch.in")
|
val in = Inlet[A]("KillSwitch.in")
|
||||||
|
|
|
||||||
|
|
@ -115,10 +115,11 @@ private[http] object StreamUtils {
|
||||||
def limitByteChunksStage(maxBytesPerChunk: Int): GraphStage[FlowShape[ByteString, ByteString]] =
|
def limitByteChunksStage(maxBytesPerChunk: Int): GraphStage[FlowShape[ByteString, ByteString]] =
|
||||||
new SimpleLinearGraphStage[ByteString] {
|
new SimpleLinearGraphStage[ByteString] {
|
||||||
override def initialAttributes = Attributes.name("limitByteChunksStage")
|
override def initialAttributes = Attributes.name("limitByteChunksStage")
|
||||||
var remaining = ByteString.empty
|
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||||
|
|
||||||
|
var remaining = ByteString.empty
|
||||||
|
|
||||||
def splitAndPush(elem: ByteString): Unit = {
|
def splitAndPush(elem: ByteString): Unit = {
|
||||||
val toPush = remaining.take(maxBytesPerChunk)
|
val toPush = remaining.take(maxBytesPerChunk)
|
||||||
val toKeep = remaining.drop(maxBytesPerChunk)
|
val toKeep = remaining.drop(maxBytesPerChunk)
|
||||||
|
|
|
||||||
|
|
@ -76,9 +76,9 @@ private[stream] object Timers {
|
||||||
}
|
}
|
||||||
|
|
||||||
final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
|
final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
|
||||||
private var nextDeadline: Deadline = Deadline.now + timeout
|
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||||
|
private var nextDeadline: Deadline = Deadline.now + timeout
|
||||||
setHandler(in, new InHandler {
|
setHandler(in, new InHandler {
|
||||||
override def onPush(): Unit = {
|
override def onPush(): Unit = {
|
||||||
nextDeadline = Deadline.now + timeout
|
nextDeadline = Deadline.now + timeout
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue