From 8f8798bc0d572af103274fc07d3adac67ce7f51a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Jul 2016 15:30:25 +0100 Subject: Add ReadWriteLock for pagination and history prune --- synapse/handlers/message.py | 70 +++++++++++++++++++++++---------------------- synapse/storage/stream.py | 4 +-- 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 878809d50d..ad2753c1b5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -26,7 +26,7 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id ) from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute, run_on_reactor +from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.logcontext import preserve_fn from synapse.visibility import filter_events_for_client @@ -50,6 +50,8 @@ class MessageHandler(BaseHandler): self.validator = EventValidator() self.snapshot_cache = SnapshotCache() + self.pagination_lock = ReadWriteLock() + @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -59,9 +61,8 @@ class MessageHandler(BaseHandler): depth = event.depth - # TODO: Lock. - - yield self.store.delete_old_state(room_id, depth) + with (yield self.pagination_lock.write(room_id)): + yield self.store.delete_old_state(room_id, depth) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, @@ -98,42 +99,43 @@ class MessageHandler(BaseHandler): source_config = pagin_config.get_source_config("room") - membership, member_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) + with (yield self.pagination_lock.read(room_id)): + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) - if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token_for_stream_and_room( - room_id, room_token.stream - ) + if source_config.direction == 'b': + # if we're going backwards, we might need to backfill. This + # requires that we have a topo token. + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token( + room_id, room_token.stream + ) + + if membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + leave_token = yield self.store.get_topological_token_for_event( + member_event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < max_topo: + source_config.from_key = str(leave_token) - if membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - leave_token = yield self.store.get_topological_token_for_event( - member_event_id + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, max_topo ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo + events, next_key = yield data_source.get_pagination_rows( + requester.user, source_config, room_id ) - events, next_key = yield data_source.get_pagination_rows( - requester.user, source_config, room_id - ) - - next_token = pagin_config.from_token.copy_and_replace( - "room_key", next_key - ) + next_token = pagin_config.from_token.copy_and_replace( + "room_key", next_key + ) if not events: defer.returnValue({ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b9ad965fd6..3dda2dab55 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -487,13 +487,13 @@ class StreamStore(SQLBaseStore): row["topological_ordering"], row["stream_ordering"],) ) - def get_max_topological_token_for_stream_and_room(self, room_id, stream_key): + def get_max_topological_token(self, room_id, stream_key): sql = ( "SELECT max(topological_ordering) FROM events" " WHERE room_id = ? AND stream_ordering < ?" ) return self._execute( - "get_max_topological_token_for_stream_and_room", None, + "get_max_topological_token", None, sql, room_id, stream_key, ).addCallback( lambda r: r[0][0] if r else 0 -- cgit 1.4.1