summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/receipts.py67
1 files changed, 37 insertions, 30 deletions
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 709c69a926..dd183cebce 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -13,7 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+from synapse.api.errors import NotFoundError
 from ._base import SQLBaseStore
 from .util.id_generators import StreamIdGenerator
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
@@ -332,6 +332,41 @@ class ReceiptsStore(ReceiptsWorkerStore):
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
+        res = self._simple_select_one_txn(
+            txn,
+            table="events",
+            retcols=["topological_ordering", "stream_ordering"],
+            keyvalues={"event_id": event_id},
+            allow_none=True
+        )
+
+        if not res:
+            raise NotFoundError(
+                "Cannot set read receipt on unknown event %s" % (
+                    event_id,
+                ),
+            )
+
+        stream_ordering = int(res["stream_ordering"])
+
+        # We don't want to clobber receipts for more recent events, so we
+        # have to compare orderings of existing receipts
+        sql = (
+            "SELECT stream_ordering, event_id FROM events"
+            " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
+            " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+        )
+        txn.execute(sql, (room_id, receipt_type, user_id))
+
+        for so, eid in txn:
+            if int(so) >= stream_ordering:
+                logger.debug(
+                    "Ignoring new receipt for %s in favour of existing "
+                    "one for later event %s",
+                    event_id, eid,
+                )
+                return False
+
         txn.call_after(
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
@@ -355,34 +390,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
             (user_id, room_id, receipt_type)
         )
 
-        res = self._simple_select_one_txn(
-            txn,
-            table="events",
-            retcols=["topological_ordering", "stream_ordering"],
-            keyvalues={"event_id": event_id},
-            allow_none=True
-        )
-
-        topological_ordering = int(res["topological_ordering"]) if res else None
-        stream_ordering = int(res["stream_ordering"]) if res else None
-
-        # We don't want to clobber receipts for more recent events, so we
-        # have to compare orderings of existing receipts
-        sql = (
-            "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
-            " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
-        )
-
-        txn.execute(sql, (room_id, receipt_type, user_id))
-
-        if topological_ordering:
-            for to, so, _ in txn:
-                if int(to) > topological_ordering:
-                    return False
-                elif int(to) == topological_ordering and int(so) >= stream_ordering:
-                    return False
-
         self._simple_delete_txn(
             txn,
             table="receipts_linearized",
@@ -406,7 +413,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             }
         )
 
-        if receipt_type == "m.read" and topological_ordering:
+        if receipt_type == "m.read":
             self._remove_old_push_actions_before_txn(
                 txn,
                 room_id=room_id,