summary refs log tree commit diff
diff options
context:
space:
mode:
author Patrick Cloke <patrickc@matrix.org> 2022-05-26 16:06:30 -0400
committer Patrick Cloke <patrickc@matrix.org> 2022-06-13 09:57:05 -0400
commit f43e0b4b1a2eb77a9365276fff8b49e25f65e0fd (patch)
tree 1db485f8ba94717623db2602b77583f6c2b23758
parent Accept an optional starting event ID for the /receipts API. (diff)
download synapse-f43e0b4b1a2eb77a9365276fff8b49e25f65e0fd.tar.xz
Add a ranged receipts table and insert into it.
-rw-r--r-- synapse/storage/databases/main/purge_events.py 1
-rw-r--r-- synapse/storage/databases/main/receipts.py 80
-rw-r--r-- synapse/storage/schema/main/delta/70/02ranged_read_receipts.sql 32
3 files changed, 72 insertions, 41 deletions
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ba385f9fc4..f51372810c 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -417,6 +417,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             # "rooms" happens last, to keep the foreign keys in the other tables
             # happy
             "rooms",
+            "receipts_ranged",
         ):
             logger.info("[purge] removing %s from %s", room_id, table)
             txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index d318753650..7c586cf8f3 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -604,7 +604,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
         room_id: str,
         receipt_type: str,
         user_id: str,
-        event_id: str,
+        start_event_id: Optional[str],
+        end_event_id: str,
         data: JsonDict,
         stream_id: int,
     ) -> Optional[int]:
@@ -617,37 +618,31 @@ class ReceiptsWorkerStore(SQLBaseStore):
         """
         assert self._can_write_to_receipts
 
+        if start_event_id is not None:
+            res = self.db_pool.simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                # XXX Use topo ordering
+                retcol="stream_ordering",
+                keyvalues={"event_id": start_event_id},
+                allow_none=True,
+            )
+            start_stream_ordering = int(res) if res else None
+        else:
+            start_stream_ordering = None
+
         res = self.db_pool.simple_select_one_txn(
             txn,
             table="events",
+            # XXX Use topo ordering
             retcols=["stream_ordering", "received_ts"],
-            keyvalues={"event_id": event_id},
+            keyvalues={"event_id": end_event_id},
             allow_none=True,
         )
-
-        stream_ordering = int(res["stream_ordering"]) if res else None
+        end_stream_ordering = int(res["stream_ordering"]) if res else None
+        # XXX This is just for logging in the caller; can it be removed?
         rx_ts = res["received_ts"] if res else 0
 
-        # We don't want to clobber receipts for more recent events, so we
-        # have to compare orderings of existing receipts
-        if stream_ordering is not None:
-            sql = (
-                "SELECT stream_ordering, event_id FROM events"
-                " INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
-                " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
-            )
-            txn.execute(sql, (room_id, receipt_type, user_id))
-
-            for so, eid in txn:
-                if int(so) >= stream_ordering:
-                    logger.debug(
-                        "Ignoring new receipt for %s in favour of existing "
-                        "one for later event %s",
-                        event_id,
-                        eid,
-                    )
-                    return None
-
         txn.call_after(
             self.invalidate_caches_for_receipt, room_id, receipt_type, user_id
         )
@@ -656,33 +651,33 @@ class ReceiptsWorkerStore(SQLBaseStore):
             self._receipts_stream_cache.entity_has_changed, room_id, stream_id
         )
 
-        self.db_pool.simple_upsert_txn(
+        # Splat the receipt into the table.
+        # XXX This might overlap other ranges, should coalesce.
+        self.db_pool.simple_insert_txn(
             txn,
-            table="receipts_linearized",
-            keyvalues={
+            table="receipts_ranged",
+            values={
                 "room_id": room_id,
-                "receipt_type": receipt_type,
                 "user_id": user_id,
-            },
-            values={
+                "receipt_type": receipt_type,
+                "start_event_id": start_event_id,
+                "end_event_id": end_event_id,
                 "stream_id": stream_id,
-                "event_id": event_id,
                 "data": json_encoder.encode(data),
             },
-            # receipts_linearized has a unique constraint on
-            # (user_id, room_id, receipt_type), so no need to lock
-            lock=False,
         )
 
+        # XXX How do we migrate receipts_linearized? Or do we keep using it for non-ranged receipts?
+
         # When updating a local users read receipt, remove any push actions
         # which resulted from the receipt's event and all earlier events.
         if (
             self.hs.is_mine_id(user_id)
             and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
-            and stream_ordering is not None
+            and end_stream_ordering is not None
         ):
             self._remove_old_push_actions_before_txn(  # type: ignore[attr-defined]
-                txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering
+                txn, room_id=room_id, user_id=user_id, stream_ordering=end_stream_ordering
             )
 
         return rx_ts
@@ -742,18 +737,20 @@ class ReceiptsWorkerStore(SQLBaseStore):
             if not event_ids:
                 return None
 
+            start_event_id = None
             if len(event_ids) == 1:
-                linearized_event_id = event_ids[0]
+                end_event_id = event_ids[0]
             else:
                 # we need to points in graph -> linearized form.
-                linearized_event_id = await self.db_pool.runInteraction(
+                end_event_id = await self.db_pool.runInteraction(
                     "insert_receipt_conv",
                     self._graph_to_linear,
                     receipt.room_id,
                     event_ids,
                 )
         elif isinstance(receipt, RangedReadReceipt):
-            linearized_event_id = receipt.end_event_id
+            start_event_id = receipt.start_event_id
+            end_event_id = receipt.end_event_id
         else:
             raise ValueError("Unexpected receipt type: %s", type(receipt))
 
@@ -764,7 +761,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 receipt.room_id,
                 receipt.receipt_type,
                 receipt.user_id,
-                linearized_event_id,
+                start_event_id,
+                end_event_id,
                 receipt.data,
                 stream_id=stream_id,
                 # Read committed is actually beneficial here because we check for a receipt with
@@ -780,7 +778,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
         now = self._clock.time_msec()
         logger.debug(
             "RR for event %s in %s (%i ms old)",
-            linearized_event_id,
+            end_event_id,  # XXX log start?
             receipt.room_id,
             now - event_ts,
         )
diff --git a/synapse/storage/schema/main/delta/70/02ranged_read_receipts.sql b/synapse/storage/schema/main/delta/70/02ranged_read_receipts.sql
new file mode 100644
index 0000000000..8ae0865f14
--- /dev/null
+++ b/synapse/storage/schema/main/delta/70/02ranged_read_receipts.sql
@@ -0,0 +1,32 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE receipts_ranged (
+    stream_id bigint NOT NULL,
+    room_id text NOT NULL,
+    receipt_type text NOT NULL,
+    user_id text NOT NULL,
+    -- A null start means "everything before this".
+    start_event_id text,
+    end_event_id text NOT NULL,
+    data text NOT NULL,
+    instance_name text
+);
+
+
+CREATE INDEX receipts_ranged_id ON receipts_ranged (stream_id);
+CREATE INDEX receipts_ranged_room_type_user ON receipts_ranged (room_id, receipt_type, user_id);
+CREATE INDEX receipts_ranged_room_stream ON receipts_ranged (room_id, stream_id);
+CREATE INDEX receipts_ranged_user ON receipts_ranged (user_id);