diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 19decc2c63..c3882313e6 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -42,41 +42,17 @@ class ReadMarkerHandler(BaseHandler):
"""
# Get ordering for existing read marker
- with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)):
+ with (yield self.read_marker_linearizer.queue((room_id, user_id))):
account_data = yield self.store.get_account_data_for_room(user_id, room_id)
existing_read_marker = account_data["m.read_marker"]
should_update = True
- res = yield self.store._simple_select_one(
- table="events",
- retcols=["topological_ordering", "stream_ordering"],
- keyvalues={"event_id": event_id},
- allow_none=True
- )
-
- if not res:
- raise SynapseError(404, 'Event does not exist')
-
if existing_read_marker:
- new_to = int(res["topological_ordering"])
- new_so = int(res["stream_ordering"])
-
- # Get ordering for existing read marker
- res = yield self.store._simple_select_one(
- table="events",
- retcols=["topological_ordering", "stream_ordering"],
- keyvalues={"event_id": existing_read_marker['marker']},
- allow_none=True
+ should_update = yield self.store.is_event_after(
+ existing_read_marker['marker'],
+ event_id
)
- existing_to = int(res["topological_ordering"]) if res else None
- existing_so = int(res["stream_ordering"]) if res else None
-
- # Prevent updating if the existing marker is ahead in the stream
- if existing_to > new_to:
- should_update = False
- elif existing_to == new_to and existing_so >= new_so:
- should_update = False
if should_update:
content = {
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 64fe937bdc..3c6df5c2d2 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2159,6 +2159,34 @@ class EventsStore(SQLBaseStore):
]
)
+ @defer.inlineCallbacks
+ def is_event_after(self, event_id1, event_id2):
+ is_after = True
+
+ to_1, so_1 = yield self._get_event_ordering(event_id1)
+ to_2, so_2 = yield self._get_event_ordering(event_id2)
+
+ # Prevent updating if the existing marker is ahead in the stream
+ if to_1 > to_2:
+ is_after = False
+ elif to_1 == to_2 and so_1 >= so_2:
+ is_after = False
+
+ defer.returnValue(is_after)
+
+ @defer.inlineCallbacks
+ def _get_event_ordering(self, event_id):
+ res = yield self._simple_select_one(
+ table="events",
+ retcols=["topological_ordering", "stream_ordering"],
+ keyvalues={"event_id": event_id},
+ allow_none=True
+ )
+
+ if not res:
+ raise SynapseError(404, "Could not find event %s" % (event_id,))
+
+ defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"])))
AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events",
|