diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2019-03-04 19:04:05 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-04 19:04:05 +0000 |
commit | 48583cef7e8d6c994a139ad7ae7ba3f83c2c2c79 (patch) | |
tree | 98184aec7c6ca1c28d61c3c359d028753677768a /synapse | |
parent | Merge pull request #4797 from matrix-org/rav/inline_rr_send (diff) | |
parent | Add some debug about processing read receipts. (diff) | |
download | synapse-48583cef7e8d6c994a139ad7ae7ba3f83c2c2c79.tar.xz |
Merge pull request #4798 from matrix-org/rav/rr_debug
Add some debug about processing read receipts.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/receipts.py | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 0ac665e967..0fd1ccc40a 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -346,15 +346,23 @@ class ReceiptsStore(ReceiptsWorkerStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + """Inserts a read-receipt into the database if it's newer than the current RR + + Returns: int|None + None if the RR is older than the current RR + otherwise, the rx timestamp of the event that the RR corresponds to + (or 0 if the event is unknown) + """ res = self._simple_select_one_txn( txn, table="events", - retcols=["topological_ordering", "stream_ordering"], + retcols=["stream_ordering", "received_ts"], keyvalues={"event_id": event_id}, allow_none=True ) stream_ordering = int(res["stream_ordering"]) if res else None + rx_ts = res["received_ts"] if res else 0 # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -373,7 +381,7 @@ class ReceiptsStore(ReceiptsWorkerStore): "one for later event %s", event_id, eid, ) - return False + return None txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) @@ -429,7 +437,7 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_ordering=stream_ordering, ) - return True + return rx_ts @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): @@ -466,7 +474,7 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_id_manager = self._receipts_id_gen.get_next() with stream_id_manager as stream_id: - have_persisted = yield self.runInteraction( + event_ts = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, @@ -474,8 +482,14 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_id=stream_id, ) - if not have_persisted: - defer.returnValue(None) + if event_ts is None: + defer.returnValue(None) + + now = self._clock.time_msec() + logger.debug( + "RR for event %s in %s (%i ms old)", + linearized_event_id, room_id, now - event_ts, + ) yield self.insert_graph_receipt( room_id, receipt_type, user_id, event_ids, data |