diff options
Diffstat (limited to 'synapse/handlers/read_marker.py')
-rw-r--r-- | synapse/handlers/read_marker.py | 87 |
1 files changed, 48 insertions, 39 deletions
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 77164f3f49..021faff376 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -18,65 +18,74 @@ from ._base import BaseHandler from twisted.internet import defer from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.async import Linearizer from synapse.types import get_domain_from_id +from synapse.api.errors import SynapseError import logging - - logger = logging.getLogger(__name__) - class ReadMarkerHandler(BaseHandler): def __init__(self, hs): super(ReadMarkerHandler, self).__init__(hs) - self.server_name = hs.config.server_name self.store = hs.get_datastore() + self.read_marker_linearizer = Linearizer(name="read_marker") + self.notifier = hs.get_notifier() @defer.inlineCallbacks def received_client_read_marker(self, room_id, user_id, event_id): - """NEEDS DOC - """ + """Updates the read marker for a given user in a given room if the event ID given + is ahead in the stream relative to the current read marker. - room_id = read_marker["room_id"] - user_id = read_marker["user_id"] - event_id = read_marker["event_id"] + This uses a notifier to indicate that account data should be sent down /sync if + the read marker has changed. + """ # Get ordering for existing read marker - account_data = yield self.store.get_account_data_for_room(user_id, room_id) - existing_read_marker = account_data["m.read_marker"] + with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)): + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + existing_read_marker = account_data["m.read_marker"] - if existing_read_marker: - # Get ordering for new read marker - res = self.store._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - new_to = int(res["topological_ordering"]) if res else None - new_so = int(res["stream_ordering"]) if res else None + should_update = True - res = self.store._simple_select_one_txn( - txn, + res = yield self.store._simple_select_one( table="events", retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": existing_read_marker.content.marker}, + keyvalues={"event_id": event_id}, allow_none=True ) - existing_to = int(res["topological_ordering"]) if res else None - existing_so = int(res["stream_ordering"]) if res else None - - if new_to > existing_to: - return False - elif new_to == existing_to and new_so >= existing_so: - return False - # Update account data - content = { - "marker": event_id - } - yield self.store.add_account_data_to_room( - user_id, room_id, "m.read_marker", content - ) + if not res: + raise SynapseError(404, 'Event does not exist') + + if existing_read_marker: + new_to = int(res["topological_ordering"]) + new_so = int(res["stream_ordering"]) + + # Get ordering for existing read marker + res = yield self.store._simple_select_one( + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": existing_read_marker['marker']}, + allow_none=True + ) + existing_to = int(res["topological_ordering"]) if res else None + existing_so = int(res["stream_ordering"]) if res else None + + # Prevent updating if the existing marker is ahead in the stream + if existing_to > new_to: + should_update = False + elif existing_to == new_to and existing_so >= new_so: + should_update = False + + if should_update: + content = { + "marker": event_id + } + max_id = yield self.store.add_account_data_to_room( + user_id, room_id, "m.read_marker", content + ) + self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id], rooms=[room_id] + ) |