diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 709c69a926..dd183cebce 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from synapse.api.errors import NotFoundError
from ._base import SQLBaseStore
from .util.id_generators import StreamIdGenerator
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
@@ -332,6 +332,41 @@ class ReceiptsStore(ReceiptsWorkerStore):
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id):
+ res = self._simple_select_one_txn(
+ txn,
+ table="events",
+ retcols=["topological_ordering", "stream_ordering"],
+ keyvalues={"event_id": event_id},
+ allow_none=True
+ )
+
+ if not res:
+ raise NotFoundError(
+ "Cannot set read receipt on unknown event %s" % (
+ event_id,
+ ),
+ )
+
+ stream_ordering = int(res["stream_ordering"])
+
+ # We don't want to clobber receipts for more recent events, so we
+ # have to compare orderings of existing receipts
+ sql = (
+ "SELECT stream_ordering, event_id FROM events"
+ " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
+ " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+ )
+ txn.execute(sql, (room_id, receipt_type, user_id))
+
+ for so, eid in txn:
+ if int(so) >= stream_ordering:
+ logger.debug(
+ "Ignoring new receipt for %s in favour of existing "
+ "one for later event %s",
+ event_id, eid,
+ )
+ return False
+
txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
@@ -355,34 +390,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
(user_id, room_id, receipt_type)
)
- res = self._simple_select_one_txn(
- txn,
- table="events",
- retcols=["topological_ordering", "stream_ordering"],
- keyvalues={"event_id": event_id},
- allow_none=True
- )
-
- topological_ordering = int(res["topological_ordering"]) if res else None
- stream_ordering = int(res["stream_ordering"]) if res else None
-
- # We don't want to clobber receipts for more recent events, so we
- # have to compare orderings of existing receipts
- sql = (
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
- " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
- " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
- )
-
- txn.execute(sql, (room_id, receipt_type, user_id))
-
- if topological_ordering:
- for to, so, _ in txn:
- if int(to) > topological_ordering:
- return False
- elif int(to) == topological_ordering and int(so) >= stream_ordering:
- return False
-
self._simple_delete_txn(
txn,
table="receipts_linearized",
@@ -406,7 +413,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
}
)
- if receipt_type == "m.read" and topological_ordering:
+ if receipt_type == "m.read":
self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
|