summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2022-06-02 12:46:42 -0400
committerPatrick Cloke <patrickc@matrix.org>2022-06-13 09:57:06 -0400
commita8a45921fb26c938446e5589947fb417a2b08d4e (patch)
treeb7fa90b43a6e77ceb27dba292827d58dcaffcc28
parentStart splitting receipts on new events. (diff)
downloadsynapse-a8a45921fb26c938446e5589947fb417a2b08d4e.tar.xz
Coalesce receipts.
-rw-r--r--rrr_test.py20
-rw-r--r--synapse/storage/databases/main/receipts.py101
2 files changed, 100 insertions, 21 deletions
diff --git a/rrr_test.py b/rrr_test.py
index e37a84c602..4df7fbf81b 100644
--- a/rrr_test.py
+++ b/rrr_test.py
@@ -150,7 +150,7 @@ def main():
 
     # Create a fork in the DAG.
     prev_message_id = first_message_id
-    for msg in range(1):
+    for msg in range(3):
         prev_message_id = _send_and_append(f"Fork 2 Message {msg}", prev_message_id)
     sleep(1)
     # # Join the forks.
@@ -159,15 +159,15 @@ def main():
     _sync_and_show(room_id)
 
     # User 1 sends another read receipt.
-    # print("@test reads everything")
-    # result = requests.post(
-    #     f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}",
-    #     headers=USER_1_HEADERS,
-    #     json={},
-    # )
-    # _check_for_status(result)
-
-    # _sync_and_show(room_id)
+    print("@test reads everything")
+    result = requests.post(
+        f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}",
+        headers=USER_1_HEADERS,
+        json={},
+    )
+    _check_for_status(result)
+
+    _sync_and_show(room_id)
 
 
 if __name__ == "__main__":
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 269a7521c6..4622e8910e 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -651,28 +651,34 @@ class ReceiptsWorkerStore(SQLBaseStore):
         """
         assert self._can_write_to_receipts
 
+        start_topo_ordering = None
+        start_stream_ordering = None
         if start_event_id is not None:
-            res = self.db_pool.simple_select_one_onecol_txn(
+            res = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
-                # XXX Use topo ordering
-                retcol="stream_ordering",
+                retcols=("topological_ordering", "stream_ordering"),
                 keyvalues={"event_id": start_event_id},
                 allow_none=True,
             )
-            start_stream_ordering = int(res) if res else None
-        else:
-            start_stream_ordering = None
+            if res is not None:
+                start_topo_ordering = int(res["topological_ordering"])
+                start_stream_ordering = int(res["stream_ordering"])
 
         res = self.db_pool.simple_select_one_txn(
             txn,
             table="events",
-            # XXX Use topo ordering
-            retcols=["stream_ordering", "received_ts"],
+            retcols=("topological_ordering", "stream_ordering", "received_ts"),
             keyvalues={"event_id": end_event_id},
             allow_none=True,
         )
-        end_stream_ordering = int(res["stream_ordering"]) if res else None
+        end_topo_ordering = (
+            None  # XXX When is it valid to not find this event? Federation?
+        )
+        end_stream_ordering = None
+        if res is not None:
+            end_topo_ordering = int(res["topological_ordering"])
+            end_stream_ordering = int(res["stream_ordering"])
         # XXX This is just for logging in the caller, can it be removed.
         rx_ts = res["received_ts"] if res else 0
 
@@ -684,8 +690,80 @@ class ReceiptsWorkerStore(SQLBaseStore):
             self._receipts_stream_cache.entity_has_changed, room_id, stream_id
         )
 
-        # Splat the receipt into the table.
-        # XXX This might overlap other ranges, should coalesce.
+        # Find all overlapping or adjacent receipts. These receipts are found by
+        # searching for any receipts which:
+        #
+        # * Have an end topological ordering directly before or after the new
+        #   receipt's start topological ordering.
+        # * Have a start topological ordering directly after or before the new
+        #   receipt's end topological ordering.
+        #
+        # E.g. the following would be found:
+        #
+        # * [1, 7] and [8, 10] should be combined.
+        # * [1, 7] and [5, 10] should be combined.
+        # * [None, 7] and [5, 10] should be combined.
+        #
+        # XXX Do we care about stream ordering here?
+        #
+        # XXX This doesn't handle a start_topo_ordering of None.
+        sql = """
+        SELECT
+            stream_id,
+            start_event_id,
+            start_event.topological_ordering,
+            end_event_id,
+            end_event.topological_ordering
+        FROM receipts_ranged
+        LEFT JOIN events AS end_event ON (end_event.event_id = end_event_id)
+        LEFT JOIN events AS start_event ON (start_event.event_id = start_event_id)
+        WHERE
+            receipts_ranged.room_id = ? AND
+            user_id = ? AND
+            receipt_type = ? AND
+            end_event.topological_ordering >= ? AND
+            start_event.topological_ordering <= ?;
+        """
+        txn.execute(
+            sql,
+            (
+                room_id,
+                user_id,
+                receipt_type,
+                start_topo_ordering - 1 if start_topo_ordering is not None else None,
+                end_topo_ordering + 1,
+            ),
+        )
+        overlapping_receipts = txn.fetchall()
+        # Delete the overlapping receipts by stream ID.
+        self.db_pool.simple_delete_many_txn(
+            txn,
+            table="receipts_ranged",
+            column="stream_id",
+            values=[receipt[0] for receipt in overlapping_receipts],
+            keyvalues={},
+        )
+
+        # Potentially expand the start/end event based on overlapping receipts.
+        for (
+            _,
+            overlapping_start_event_id,
+            overlapping_start_topo_ordering,
+            overlapping_end_event_id,
+            overlapping_end_topo_ordering,
+        ) in overlapping_receipts:
+            if (
+                start_topo_ordering is not None
+                and overlapping_start_topo_ordering < start_topo_ordering
+            ):
+                start_topo_ordering = overlapping_start_topo_ordering
+                start_event_id = overlapping_start_event_id
+
+            if end_topo_ordering < overlapping_end_topo_ordering:
+                end_topo_ordering = overlapping_end_topo_ordering
+                end_event_id = overlapping_end_event_id
+
+        # Insert the new receipt into the table.
         self.db_pool.simple_insert_txn(
             txn,
             table="receipts_ranged",
@@ -716,6 +794,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
             and (start_stream_ordering is not None or end_stream_ordering is not None)
         ):
+            # XXX Topo ordering?
             self._remove_old_push_actions_txn(  # type: ignore[attr-defined]
                 txn, room_id, user_id, end_stream_ordering, start_stream_ordering
             )