diff options
author | David Robertson <davidr@element.io> | 2022-08-21 23:02:58 +0100 |
---|---|---|
committer | David Robertson <davidr@element.io> | 2022-08-21 23:02:58 +0100 |
commit | 9d4da69ffd622e126c4d18af8dfe6f747188d01a (patch) | |
tree | 3d4f1bf7f252052cc009345d3225b982cb507c95 | |
parent | Annotate ThreadPool (diff) | |
download | synapse-9d4da69ffd622e126c4d18af8dfe6f747188d01a.tar.xz |
Annotate FakeTransport
-rw-r--r-- | tests/server.py | 54 |
1 files changed, 24 insertions, 30 deletions
diff --git a/tests/server.py b/tests/server.py index 2d05077268..fa3cc51359 100644 --- a/tests/server.py +++ b/tests/server.py @@ -594,7 +594,7 @@ def get_clock() -> Tuple[ThreadedMemoryReactorClock, Clock]: @implementer(ITransport) -@attr.s(cmp=False) +@attr.s(cmp=False, auto_attribs=True) class FakeTransport: """ A twisted.internet.interfaces.ITransport implementation which sends all its data @@ -609,35 +609,29 @@ class FakeTransport: If you want bidirectional communication, you'll need two instances. """ - other = attr.ib() - """The Protocol object which will receive any data written to this transport. + other: IProtocol + """The Protocol object which will receive any data written to this transport.""" - :type: twisted.internet.interfaces.IProtocol - """ - - _reactor = attr.ib() - """Test reactor - - :type: twisted.internet.interfaces.IReactorTime - """ + _reactor: IReactorTime + """Test reactor """ - _protocol = attr.ib(default=None) + _protocol: Optional[IProtocol] = None """The Protocol which is producing data for this transport. Optional, but if set will get called back for connectionLost() notifications etc. """ - _peer_address: Optional[IAddress] = attr.ib(default=None) + _peer_address: Optional[IAddress] = None """The value to be returned by getPeer""" - _host_address: Optional[IAddress] = attr.ib(default=None) + _host_address: Optional[IAddress] = None """The value to be returned by getHost""" - disconnecting = False - disconnected = False - connected = True - buffer = attr.ib(default=b"") - producer = attr.ib(default=None) - autoflush = attr.ib(default=True) + disconnecting: bool = False + disconnected: bool = False + connected: bool = True + buffer: bytes = b"" + producer: Optional[IPushProducer] = None + autoflush: bool = True def getPeer(self) -> Optional[IAddress]: return self._peer_address @@ -645,7 +639,7 @@ class FakeTransport: def getHost(self) -> Optional[IAddress]: return self._host_address - def loseConnection(self, reason=None): + def loseConnection(self, reason: Optional[Failure] = None) -> None: if not self.disconnecting: logger.info("FakeTransport: loseConnection(%s)", reason) self.disconnecting = True @@ -661,7 +655,7 @@ class FakeTransport: self.connected = False self.disconnected = True - def abortConnection(self): + def abortConnection(self) -> None: logger.info("FakeTransport: abortConnection()") if not self.disconnecting: @@ -671,28 +665,28 @@ class FakeTransport: self.disconnected = True - def pauseProducing(self): + def pauseProducing(self) -> None: if not self.producer: return self.producer.pauseProducing() - def resumeProducing(self): + def resumeProducing(self) -> None: if not self.producer: return self.producer.resumeProducing() - def unregisterProducer(self): + def unregisterProducer(self) -> None: if not self.producer: return self.producer = None - def registerProducer(self, producer, streaming): + def registerProducer(self, producer: IPushProducer, streaming: bool) -> None: self.producer = producer self.producerStreaming = streaming - def _produce(): + def _produce() -> None: if not self.producer: # we've been unregistered return @@ -704,7 +698,7 @@ class FakeTransport: if not streaming: self._reactor.callLater(0.0, _produce) - def write(self, byt): + def write(self, byt: bytes) -> None: if self.disconnecting: raise Exception("Writing to disconnecting FakeTransport") @@ -716,11 +710,11 @@ class FakeTransport: if self.autoflush: self._reactor.callLater(0.0, self.flush) - def writeSequence(self, seq): + def writeSequence(self, seq: Iterable[bytes]) -> None: for x in seq: self.write(x) - def flush(self, maxbytes=None): + def flush(self, maxbytes: Optional[int] = None) -> None: if not self.buffer: # nothing to do. Don't write empty buffers: it upsets the # TLSMemoryBIOProtocol |