Merge pull request #1453 from akka/wip-3363-remotingpsec-failure-ban
Make associate in TestTransport wait for the association event linstener #3363
This commit is contained in:
commit
e92e698bd9
1 changed files with 23 additions and 22 deletions
|
|
@ -44,36 +44,37 @@ class TestTransport(
|
|||
private val associationListenerPromise = Promise[AssociationEventListener]()
|
||||
|
||||
private def defaultListen: Future[(Address, Promise[AssociationEventListener])] = {
|
||||
associationListenerPromise.future.onSuccess {
|
||||
case listener: AssociationEventListener ⇒ registry.registerTransport(this, listener)
|
||||
}
|
||||
registry.registerTransport(this, associationListenerPromise.future)
|
||||
Future.successful((localAddress, associationListenerPromise))
|
||||
}
|
||||
|
||||
private def defaultAssociate(remoteAddress: Address): Future[AssociationHandle] = {
|
||||
registry.transportFor(remoteAddress) match {
|
||||
|
||||
case Some((remoteTransport, remoteListener)) ⇒
|
||||
case Some((remoteTransport, remoteListenerFuture)) ⇒
|
||||
val (localHandle, remoteHandle) = createHandlePair(remoteTransport, remoteAddress)
|
||||
localHandle.writable = false
|
||||
remoteHandle.writable = false
|
||||
|
||||
// Pass a non-writable handle to remote first
|
||||
remoteListener notify InboundAssociation(remoteHandle)
|
||||
val remoteHandlerFuture = remoteHandle.readHandlerPromise.future
|
||||
remoteListenerFuture flatMap {
|
||||
case listener ⇒
|
||||
listener notify InboundAssociation(remoteHandle)
|
||||
val remoteHandlerFuture = remoteHandle.readHandlerPromise.future
|
||||
|
||||
// Registration of reader at local finishes the registration and enables communication
|
||||
for {
|
||||
remoteListener ← remoteHandlerFuture
|
||||
localListener ← localHandle.readHandlerPromise.future
|
||||
} {
|
||||
registry.registerListenerPair(localHandle.key, (localListener, remoteListener))
|
||||
localHandle.writable = true
|
||||
remoteHandle.writable = true
|
||||
// Registration of reader at local finishes the registration and enables communication
|
||||
for {
|
||||
remoteListener ← remoteHandlerFuture
|
||||
localListener ← localHandle.readHandlerPromise.future
|
||||
} {
|
||||
registry.registerListenerPair(localHandle.key, (localListener, remoteListener))
|
||||
localHandle.writable = true
|
||||
remoteHandle.writable = true
|
||||
}
|
||||
|
||||
remoteHandlerFuture.map { _ ⇒ localHandle }
|
||||
}
|
||||
|
||||
remoteHandlerFuture.map { _ ⇒ localHandle }
|
||||
|
||||
case None ⇒
|
||||
Future.failed(new InvalidAssociationException(s"No registered transport: $remoteAddress", null))
|
||||
}
|
||||
|
|
@ -285,7 +286,7 @@ object TestTransport {
|
|||
class AssociationRegistry {
|
||||
|
||||
private val activityLog = new CopyOnWriteArrayList[Activity]()
|
||||
private val transportTable = new ConcurrentHashMap[Address, (TestTransport, AssociationEventListener)]()
|
||||
private val transportTable = new ConcurrentHashMap[Address, (TestTransport, Future[AssociationEventListener])]()
|
||||
private val listenersTable = new ConcurrentHashMap[(Address, Address), (HandleEventListener, HandleEventListener)]()
|
||||
|
||||
/**
|
||||
|
|
@ -336,11 +337,11 @@ object TestTransport {
|
|||
*
|
||||
* @param transport
|
||||
* The transport that is to be registered. The address of this transport will be used as key.
|
||||
* @param associationEventListener
|
||||
* The listener that will handle the events for the given transport.
|
||||
* @param associationEventListenerFuture
|
||||
* The future that will be completed with the listener that will handle the events for the given transport.
|
||||
*/
|
||||
def registerTransport(transport: TestTransport, associationEventListener: AssociationEventListener): Unit = {
|
||||
transportTable.put(transport.localAddress, (transport, associationEventListener))
|
||||
def registerTransport(transport: TestTransport, associationEventListenerFuture: Future[AssociationEventListener]): Unit = {
|
||||
transportTable.put(transport.localAddress, (transport, associationEventListenerFuture))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -410,7 +411,7 @@ object TestTransport {
|
|||
* @param address The address bound to the transport.
|
||||
* @return The transport if exists.
|
||||
*/
|
||||
def transportFor(address: Address): Option[(TestTransport, AssociationEventListener)] =
|
||||
def transportFor(address: Address): Option[(TestTransport, Future[AssociationEventListener])] =
|
||||
Option(transportTable.get(address))
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue