From 31c15dcb80c8f11fd03dbb9b0ccff4777dc8e457 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 18 Sep 2018 18:17:15 +0100 Subject: Refactor matrixfederationclient to fix logging (#3906) We want to wait until we have read the response body before we log the request as complete, otherwise a confusing thing happens where the request appears to have completed, but we later fail it. To do this, we factor the salient details of a request out to a separate object, which can then keep track of the txn_id, so that it can be logged. --- tests/server.py | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) (limited to 'tests/server.py') diff --git a/tests/server.py b/tests/server.py index 420ec4e088..ccea3baa55 100644 --- a/tests/server.py +++ b/tests/server.py @@ -280,3 +280,84 @@ def get_clock(): clock = ThreadedMemoryReactorClock() hs_clock = Clock(clock) return (clock, hs_clock) + + +@attr.s +class FakeTransport(object): + """ + A twisted.internet.interfaces.ITransport implementation which sends all its data + straight into an IProtocol object: it exists to connect two IProtocols together. + + To use it, instantiate it with the receiving IProtocol, and then pass it to the + sending IProtocol's makeConnection method: + + server = HTTPChannel() + client.makeConnection(FakeTransport(server, self.reactor)) + + If you want bidirectional communication, you'll need two instances. + """ + + other = attr.ib() + """The Protocol object which will receive any data written to this transport. + + :type: twisted.internet.interfaces.IProtocol + """ + + _reactor = attr.ib() + """Test reactor + + :type: twisted.internet.interfaces.IReactorTime + """ + + disconnecting = False + buffer = attr.ib(default=b'') + producer = attr.ib(default=None) + + def getPeer(self): + return None + + def getHost(self): + return None + + def loseConnection(self): + self.disconnecting = True + + def abortConnection(self): + self.disconnecting = True + + def pauseProducing(self): + self.producer.pauseProducing() + + def unregisterProducer(self): + if not self.producer: + return + + self.producer = None + + def registerProducer(self, producer, streaming): + self.producer = producer + self.producerStreaming = streaming + + def _produce(): + d = self.producer.resumeProducing() + d.addCallback(lambda x: self._reactor.callLater(0.1, _produce)) + + if not streaming: + self._reactor.callLater(0.0, _produce) + + def write(self, byt): + self.buffer = self.buffer + byt + + def _write(): + if getattr(self.other, "transport") is not None: + self.other.dataReceived(self.buffer) + self.buffer = b"" + return + + self._reactor.callLater(0.0, _write) + + _write() + + def writeSequence(self, seq): + for x in seq: + self.write(x) -- cgit 1.4.1