diff options
author | Richard van der Hoff <richard@matrix.org> | 2019-03-04 18:18:11 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2019-03-04 18:19:40 +0000 |
commit | 2db49ea476ce3fdf32b14a2b376a508be2540da9 (patch) | |
tree | 98184aec7c6ca1c28d61c3c359d028753677768a /synapse | |
parent | Clean up read-receipt handling. (diff) | |
download | synapse-2db49ea476ce3fdf32b14a2b376a508be2540da9.tar.xz |
Add some debug about processing read receipts.
I'm hoping to establish which rooms are having lots of RRs sent for them, and how old the events are when they are sent.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/receipts.py | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 0ac665e967..0fd1ccc40a 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -346,15 +346,23 @@ class ReceiptsStore(ReceiptsWorkerStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + """Inserts a read-receipt into the database if it's newer than the current RR + + Returns: int|None + None if the RR is older than the current RR + otherwise, the rx timestamp of the event that the RR corresponds to + (or 0 if the event is unknown) + """ res = self._simple_select_one_txn( txn, table="events", - retcols=["topological_ordering", "stream_ordering"], + retcols=["stream_ordering", "received_ts"], keyvalues={"event_id": event_id}, allow_none=True ) stream_ordering = int(res["stream_ordering"]) if res else None + rx_ts = res["received_ts"] if res else 0 # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -373,7 +381,7 @@ class ReceiptsStore(ReceiptsWorkerStore): "one for later event %s", event_id, eid, ) - return False + return None txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) @@ -429,7 +437,7 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_ordering=stream_ordering, ) - return True + return rx_ts @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): @@ -466,7 +474,7 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_id_manager = self._receipts_id_gen.get_next() with stream_id_manager as stream_id: - have_persisted = yield self.runInteraction( + event_ts = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, @@ -474,8 +482,14 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_id=stream_id, ) - if not have_persisted: - defer.returnValue(None) + if event_ts is None: + defer.returnValue(None) + + now = self._clock.time_msec() + logger.debug( + "RR for event %s in %s (%i ms old)", + linearized_event_id, room_id, now - event_ts, + ) yield self.insert_graph_receipt( room_id, receipt_type, user_id, event_ids, data |