=str more specific error type if idle-timeout triggers in tcp

This commit is contained in:
Konrad Malawski 2016-12-12 17:57:14 +01:00 committed by Konrad `ktoso` Malawski
parent 4207682624
commit e7e1f74427
12 changed files with 336 additions and 5 deletions

View file

@ -743,6 +743,8 @@ recover
^^^^^^^
Allow sending of one last element downstream when a failure has happened upstream.
Throwing an exception inside ``recover`` _will_ be logged on ERROR level automatically.
**emits** when the element is available from the upstream or upstream is failed and pf returns an element
**backpressures** when downstream backpressures, not when failure happened
@ -753,12 +755,45 @@ recoverWith
^^^^^^^^^^^
Allow switching to alternative Source when a failure has happened upstream.
Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically.
**emits** the element is available from the upstream or upstream is failed and pf returns alternative Source
**backpressures** downstream backpressures, after failure happened it backprssures to alternative Source
**completes** upstream completes or upstream failed with exception pf can handle
recoverWithRetries
^^^^^^^^^^^^^^^^^^
RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
a failure has been recovered up to `attempts` number of times so that each time there is a failure
it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
attempt to recover at all. Passing -1 will behave exactly the same as `recoverWith`.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.
**emits** when element is available from the upstream or upstream is failed and element is available from alternative Source
**backpressures** when downstream backpressures
**completes** when upstream completes or upstream failed with exception pf can handle
mapError
^^^^^^^^
While similar to ``recover`` this stage can be used to transform an error signal to a different one *without* logging
it as an error in the process. So in that sense it is NOT exactly equivalent to ``recover(t -> throw t2)`` since recover
would log the ``t2`` error.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.
Similarily to ``recover`` throwing an exception inside ``mapError`` _will_ be logged on ERROR level automatically.
**emits** when element is available from the upstream or upstream is failed and pf returns an element
**backpressures** when downstream backpressures
**completes** when upstream completes or upstream failed with exception pf can handle
detach
^^^^^^
Detach upstream demand from downstream demand without detaching the stream rates.

View file

@ -732,6 +732,8 @@ recover
^^^^^^^
Allow sending of one last element downstream when a failure has happened upstream.
Throwing an exception inside ``recover`` _will_ be logged on ERROR level automatically.
**emits** when the element is available from the upstream or upstream is failed and pf returns an element
**backpressures** when downstream backpressures, not when failure happened
@ -742,12 +744,45 @@ recoverWith
^^^^^^^^^^^
Allow switching to alternative Source when a failure has happened upstream.
Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically.
**emits** the element is available from the upstream or upstream is failed and pf returns alternative Source
**backpressures** downstream backpressures, after failure happened it backprssures to alternative Source
**completes** upstream completes or upstream failed with exception pf can handle
recoverWithRetries
^^^^^^^^^^^^^^^^^^
RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
a failure has been recovered up to `attempts` number of times so that each time there is a failure
it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
attempt to recover at all. Passing -1 will behave exactly the same as `recoverWith`.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.
**emits** when element is available from the upstream or upstream is failed and element is available from alternative Source
**backpressures** when downstream backpressures
**completes** when upstream completes or upstream failed with exception pf can handle
mapError
^^^^^^^^
While similar to ``recover`` this stage can be used to transform an error signal to a different one *without* logging
it as an error in the process. So in that sense it is NOT exactly equivalent to ``recover(t => throw t2)`` since recover
would log the ``t2`` error.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.
Similarily to ``recover`` throwing an exception inside ``mapError`` _will_ be logged on ERROR level automatically.
**emits** when element is available from the upstream or upstream is failed and pf returns an element
**backpressures** when downstream backpressures
**completes** when upstream completes or upstream failed with exception pf can handle
detach
^^^^^^
Detach upstream demand from downstream demand without detaching the stream rates.

View file

@ -0,0 +1,68 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import scala.util.control.NoStackTrace
class FlowMapErrorSpec extends StreamSpec {
val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1)
implicit val materializer = ActorMaterializer(settings)
val ex = new RuntimeException("ex") with NoStackTrace
val boom = new Exception("BOOM!") with NoStackTrace
"A MapError" must {
"mapError when there is a handler" in assertAllStagesStopped {
Source(1 to 4).map { a if (a == 3) throw ex else a }
.mapError { case t: Throwable boom }
.runWith(TestSink.probe[Int])
.request(3)
.expectNext(1)
.expectNext(2)
.expectError(boom)
}
"fail the stream with exception thrown in handler (and log it)" in assertAllStagesStopped {
Source(1 to 3).map { a if (a == 2) throw ex else a }
.mapError { case t: Exception throw boom }
.runWith(TestSink.probe[Int])
.requestNext(1)
.request(1)
.expectError(boom)
}
"pass through the original exception if partial function does not handle it" in assertAllStagesStopped {
Source(1 to 3).map { a if (a == 2) throw ex else a }
.mapError { case t: IndexOutOfBoundsException boom }
.runWith(TestSink.probe[Int])
.requestNext(1)
.request(1)
.expectError(ex)
}
"not influence stream when there is no exceptions" in assertAllStagesStopped {
Source(1 to 3).map(identity)
.mapError { case t: Throwable boom }
.runWith(TestSink.probe[Int])
.request(3)
.expectNextN(1 to 3)
.expectComplete()
}
"finish stream if it's empty" in assertAllStagesStopped {
Source.empty.map(identity)
.mapError { case t: Throwable boom }
.runWith(TestSink.probe[Int])
.request(1)
.expectComplete()
}
}
}

View file

@ -269,6 +269,28 @@ final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLin
}
}
/**
* Maps error with the provided function if it is defined for an error or, otherwise, passes it on unchanged.
*
* While similar to [[Recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*/
final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] {
override def createLogic(attr: Attributes) =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFailure(ex: Throwable): Unit =
if (f.isDefinedAt(ex)) super.onUpstreamFailure(f(ex))
else super.onUpstreamFailure(ex)
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/

View file

@ -4,8 +4,10 @@
package akka.stream.impl.io
import java.net.InetSocketAddress
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import akka.NotUsed
import akka.actor.{ ActorRef, Terminated }
import akka.dispatch.ExecutionContexts
import akka.io.Inet.SocketOption
@ -15,13 +17,14 @@ import akka.stream._
import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.impl.fusing.GraphStages.detacher
import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding }
import akka.stream.scaladsl.{ BidiFlow, Flow, Tcp StreamTcp }
import akka.stream.scaladsl.{ BidiFlow, Flow, TcpIdleTimeoutException, Tcp StreamTcp }
import akka.stream.stage._
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ Future, Promise }
import scala.util.Try
/**
* INTERNAL API
@ -111,7 +114,7 @@ private[stream] class ConnectionSourceStage(
// FIXME: Previous code was wrong, must add new tests
val handler = idleTimeout match {
case d: FiniteDuration tcpFlow.join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d))
case d: FiniteDuration tcpFlow.join(TcpIdleTimeout(d, Some(connected.remoteAddress)))
case _ tcpFlow
}
@ -354,3 +357,26 @@ private[stream] class OutgoingConnectionStage(
override def toString = s"TCP-to($remoteAddress)"
}
/** INTERNAL API */
private[akka] object TcpIdleTimeout {
def apply(idleTimeout: FiniteDuration, remoteAddress: Option[InetSocketAddress]): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
val connectionToString = remoteAddress match {
case Some(addr) s"on connection to [$addr]"
case _ ""
}
val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
BidiFlow.fromFlows(
Flow[ByteString].mapError { case t: TimeoutException new TcpIdleTimeoutException(s"TCP idle-timeout encountered $connectionToString, no bytes passed in the last $idleTimeout", idleTimeout) },
Flow[ByteString]
)
val fromNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
BidiFlow.fromFlows(
Flow[ByteString],
Flow[ByteString].mapError { case t: TimeoutException new TcpIdleTimeoutException(s"TCP idle-timeout encountered $connectionToString, no bytes passed in the last $idleTimeout", idleTimeout) }
)
fromNetTimeout atop BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](idleTimeout) atop toNetTimeout
}
}

View file

@ -835,6 +835,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
@ -846,6 +848,28 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.recover(pf))
/**
* While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.mapError(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
@ -854,6 +878,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
@ -877,6 +903,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*

View file

@ -929,6 +929,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
@ -941,6 +943,28 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Source[T, Mat] =
new Source(delegate.recover(pf))
/**
* While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Source[Out, Mat] =
new Source(delegate.mapError(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
@ -949,6 +973,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
@ -971,6 +997,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*

View file

@ -8,14 +8,18 @@ import akka.event.LoggingAdapter
import akka.japi.function
import akka.stream._
import akka.stream.impl.ConstantFun
import scala.collection.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import akka.japi.Util
import java.util.Comparator
import scala.compat.java8.FutureConverters._
import java.util.concurrent.CompletionStage
import akka.stream.impl.fusing.MapError
/**
* A stream of streams sub-flow of data elements, e.g. produced by `groupBy`.
* SubFlows cannot contribute to the super-flows materialized value since they
@ -664,6 +668,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
@ -684,6 +690,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
@ -707,6 +715,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
@ -720,6 +730,28 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] =
new SubFlow(delegate.recoverWithRetries(attempts, pf))
/**
* While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): SubFlow[In, Out, Mat @uncheckedVariance] =
new SubFlow(delegate.mapError(pf))
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been

View file

@ -8,12 +8,16 @@ import akka.event.LoggingAdapter
import akka.japi.function
import akka.stream._
import akka.stream.impl.ConstantFun
import scala.collection.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import akka.japi.Util
import java.util.Comparator
import java.util.concurrent.CompletionStage
import akka.stream.impl.fusing.MapError
import scala.compat.java8.FutureConverters._
/**
@ -718,6 +722,28 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] =
new SubSource(delegate.recoverWithRetries(attempts, pf))
/**
* While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): SubSource[Out, Mat] =
new SubSource(delegate.mapError(pf))
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been

View file

@ -238,4 +238,5 @@ object BidiFlow {
*/
def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, NotUsed] =
fromGraph(new Timers.IdleTimeoutBidi(timeout))
}

View file

@ -405,6 +405,8 @@ trait FlowOps[+Out, +Mat] {
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
@ -424,6 +426,8 @@ trait FlowOps[+Out, +Mat] {
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
@ -447,6 +451,8 @@ trait FlowOps[+Out, +Mat] {
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
@ -464,6 +470,27 @@ trait FlowOps[+Out, +Mat] {
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
via(new RecoverWith(attempts, pf))
/**
* While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out] = via(MapError(pf))
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.

View file

@ -4,6 +4,7 @@
package akka.stream.scaladsl
import java.net.InetSocketAddress
import java.util.concurrent.TimeoutException
import akka.NotUsed
import akka.actor._
@ -11,7 +12,7 @@ import akka.io.Inet.SocketOption
import akka.io.{ IO, Tcp IoTcp }
import akka.stream._
import akka.stream.impl.fusing.GraphStages.detacher
import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage }
import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout }
import akka.util.ByteString
import scala.collection.immutable
@ -172,7 +173,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
connectTimeout)).via(detacher[ByteString]) // must read ahead for proper completions
idleTimeout match {
case d: FiniteDuration tcpFlow.join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d))
case d: FiniteDuration tcpFlow.join(TcpIdleTimeout(d, Some(remoteAddress)))
case _ tcpFlow
}
@ -185,3 +186,5 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] =
outgoingConnection(InetSocketAddress.createUnresolved(host, port))
}
final class TcpIdleTimeoutException(msg: String, timeout: Duration) extends TimeoutException(msg: String)