From b97f6626b6f9b91498d06a7ae113b9d20f1fc2ef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Jan 2016 09:54:30 +0000 Subject: Add cache to room stream --- synapse/storage/stream.py | 133 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index e31bad258a..3a32a0019a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,6 +37,7 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.room_change_cache import RoomStreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -77,6 +78,12 @@ def upper_bound(token): class StreamStore(SQLBaseStore): + def __init__(self, hs): + super(StreamStore, self).__init__(hs) + + self._events_stream_cache = RoomStreamChangeCache( + "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) + ) @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): @@ -157,6 +164,132 @@ class StreamStore(SQLBaseStore): results = yield self.runInteraction("get_appservice_room_stream", f) defer.returnValue(results) + @defer.inlineCallbacks + def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0): + from_id = RoomStreamToken.parse_stream_token(from_key).stream + + room_ids = yield self._events_stream_cache.get_rooms_changed( + self, room_ids, from_id + ) + + if not room_ids: + defer.returnValue({}) + + results = {} + room_ids = list(room_ids) + for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): + res = yield defer.gatherResults([ + self.get_recent_room_events_stream_for_room( + room_id, from_key, to_key, limit + ).addCallback(lambda r, rm: (rm, r), room_id) + for room_id in room_ids + ]) + results.update(dict(res)) + + defer.returnValue(results) + + @defer.inlineCallbacks + def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + defer.returnValue(([], from_key)) + + has_changed = yield self._events_stream_cache.get_room_has_changed( + room_id, from_id + ) + + if not has_changed: + defer.returnValue(([], from_key)) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering > ? AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, from_id, to_id, limit)) + else: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, to_id, limit)) + + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + ret.reverse() + + self._set_before_and_after(ret, rows) + + if rows: + key = "s%d" % min(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = from_key + + return ret, key + res = yield self.runInteraction("get_recent_room_events_stream_for_room", f) + defer.returnValue(res) + + def get_room_changes_for_user(self, user_id, from_key, to_key): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + return defer.succeed([]) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND e.stream_ordering > ? AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + ) + txn.execute(sql, (user_id, from_id, to_id,)) + else: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + ) + txn.execute(sql, (user_id, to_id,)) + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + return ret + + return self.runInteraction("get_room_changes_for_user", f) + @log_function def get_room_events_stream( self, -- cgit 1.4.1 From aca3193efb8c5f9f20049f61c96e5ff12f328b05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Jan 2016 17:06:52 +0000 Subject: Use the same path for incremental with gap or without gap --- synapse/handlers/sync.py | 352 +++++++++++++++++++--------------------------- synapse/storage/events.py | 1 - synapse/storage/stream.py | 6 +- 3 files changed, 147 insertions(+), 212 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1fdf978313..f5e20d6a6e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -72,7 +72,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ ) -class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ +class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ "room_id", # str "timeline", # TimelineBatch "state", # dict[(str, str), FrozenEvent] @@ -429,44 +429,20 @@ class SyncHandler(BaseHandler): defer.returnValue((now_token, ephemeral_by_room)) - @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, timeline_since_token, tags_by_room, account_data_by_room): """Sync a room for a client which is starting without any state Returns: - A Deferred JoinedSyncResult. + A Deferred ArchivedSyncResult. """ - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token=timeline_since_token + return self.incremental_sync_for_archived_room( + sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, + account_data_by_room, full_state=True, leave_token=leave_token, ) - leave_state = yield self.store.get_state_for_event(leave_event_id) - - leave_state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - leave_state.values() - ) - } - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - defer.returnValue(ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=leave_state, - account_data=account_data, - )) - @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -512,173 +488,127 @@ class SyncHandler(BaseHandler): sync_config.user ) + user_id = sync_config.user.to_string() + timeline_limit = sync_config.filter_collection.timeline_limit() tags_by_room = yield self.store.get_updated_tags( - sync_config.user.to_string(), + user_id, since_token.account_data_key, ) account_data, account_data_by_room = ( yield self.store.get_updated_account_data_for_user( - sync_config.user.to_string(), + user_id, since_token.account_data_key, ) ) + # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_room_changes_for_user( - sync_config.user.to_string(), since_token.room_key, now_token.room_key + user_id, since_token.room_key, now_token.room_key ) + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + + newly_joined_rooms = [] + archived = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue + + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + room_sync = yield self.incremental_sync_for_archived_room( + sync_config, room_id, leave_event.event_id, since_token, + tags_by_room, account_data_by_room, + full_state=room_id in newly_joined_rooms + ) + if room_sync: + archived.append(room_sync) + + # Get all events for rooms we're currently joined to. room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=room_ids, + room_ids=joined_room_ids, from_key=since_token.room_key, to_key=now_token.room_key, limit=timeline_limit + 1, ) - room_events = [ - event - for events, _ in room_to_events.values() - for event in events - ] - - room_events.extend(rooms_changed) - - # room_events, _ = yield self.store.get_room_events_stream( - # sync_config.user.to_string(), - # from_key=since_token.room_key, - # to_key=now_token.room_key, - # limit=timeline_limit + 1, - # ) - joined = [] - archived = [] - if len(room_events) <= timeline_limit: - # There is no gap in any of the rooms. Therefore we can just - # partition the new events by room and return them. - logger.debug("Got %i events for incremental sync - not limited", - len(room_events)) - - invite_events = [] - leave_events = [] - events_by_room_id = {} - for event in room_events: - events_by_room_id.setdefault(event.room_id, []).append(event) - if event.room_id not in joined_room_ids: - if (event.type == EventTypes.Member - and event.state_key == sync_config.user.to_string()): - if event.membership == Membership.INVITE: - invite_events.append(event) - elif event.membership in (Membership.LEAVE, Membership.BAN): - leave_events.append(event) - - for room_id in joined_room_ids: - recents = events_by_room_id.get(room_id, []) - logger.debug("Events for room %s: %r", room_id, recents) - state = { - (event.type, event.state_key): event - for event in recents if event.is_state()} - limited = False + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) - if recents: - prev_batch = now_token.copy_and_replace( - "room_key", recents[0].internal_metadata.before - ) - else: - prev_batch = now_token - - just_joined = yield self.check_joined_room(sync_config, state) - if just_joined: - logger.debug("User has just joined %s: needs full state", - room_id) - state = yield self.get_state_at(room_id, now_token) - # the timeline is inherently limited if we've just joined - limited = True - - recents = sync_config.filter_collection.filter_room_timeline(recents) - - state = { - (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( - state.values() - ) - } - - acc_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) + if room_entry: + events, start_key = room_entry - acc_data = sync_config.filter_collection.filter_room_account_data( - acc_data - ) + prev_batch_token = now_token.copy_and_replace("room_key", start_key) - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) + newly_joined_room = room_id in newly_joined_rooms + full_state = newly_joined_room - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=TimelineBatch( - events=recents, - prev_batch=prev_batch, - limited=limited, - ), - state=state, - ephemeral=ephemeral, - account_data=acc_data, - unread_notifications={}, + batch = yield self.load_filtered_recents( + room_id, sync_config, prev_batch_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined_room, ) - logger.debug("Result for room %s: %r", room_id, room_sync) - - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, all_ephemeral_by_room - ) - - if notifs is not None: - notif_dict = room_sync.unread_notifications - notif_dict["notification_count"] = len(notifs) - notif_dict["highlight_count"] = len([ - 1 for notif in notifs - if _action_has_highlight(notif["actions"]) - ]) - - joined.append(room_sync) - - else: - logger.debug("Got %i events for incremental sync - hit limit", - len(room_events)) - - invite_events = yield self.store.get_invites_for_user( - sync_config.user.to_string() - ) - - leave_events = yield self.store.get_leave_and_ban_events_for_user( - sync_config.user.to_string() - ) - - for room_id in joined_room_ids: - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room=all_ephemeral_by_room, + else: + batch = TimelineBatch( + events=[], + prev_batch=since_token, + limited=False, ) - if room_sync: - joined.append(room_sync) + full_state = False - for leave_event in leave_events: - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room, - account_data_by_room + room_sync = yield self.incremental_sync_with_gap_for_room( + room_id=room_id, + sync_config=sync_config, + since_token=since_token, + now_token=now_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + all_ephemeral_by_room=all_ephemeral_by_room, + batch=batch, + full_state=full_state, ) if room_sync: - archived.append(room_sync) - - invited = [ - InvitedSyncResult(room_id=event.room_id, invite=event) - for event in invite_events - ] + joined.append(room_sync) account_data_for_user = sync_config.filter_collection.filter_account_data( self.account_data_for_user(account_data) @@ -699,12 +629,10 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None): + since_token=None, recents=None, newly_joined_room=False): """ :returns a Deferred TimelineBatch """ - limited = True - recents = [] filtering_factor = 2 timeline_limit = sync_config.filter_collection.timeline_limit() load_limit = max(timeline_limit * filtering_factor, 100) @@ -712,11 +640,27 @@ class SyncHandler(BaseHandler): room_key = now_token.room_key end_key = room_key + limited = recents is None or newly_joined_room or timeline_limit < len(recents) + + if recents is not None: + recents = sync_config.filter_collection.filter_room_timeline(recents) + recents = yield self._filter_events_for_client( + sync_config.user.to_string(), + recents, + is_peeking=sync_config.is_guest, + ) + else: + recents = [] + + since_key = None + if since_token and not newly_joined_room: + since_key = since_token.room_key + while limited and len(recents) < timeline_limit and max_repeat: - events, end_key = yield self.store.get_recent_room_events_stream_for_room( + events, end_key = yield self.store.get_room_events_stream_for_room( room_id, limit=load_limit + 1, - from_key=since_token.room_key if since_token else None, + from_key=since_key, to_key=end_key, ) loaded_recents = sync_config.filter_collection.filter_room_timeline(events) @@ -727,6 +671,7 @@ class SyncHandler(BaseHandler): ) loaded_recents.extend(recents) recents = loaded_recents + if len(events) <= load_limit: limited = False break @@ -742,7 +687,9 @@ class SyncHandler(BaseHandler): ) defer.returnValue(TimelineBatch( - events=recents, prev_batch=prev_batch_token, limited=limited + events=recents, + prev_batch=prev_batch_token, + limited=limited or newly_joined_room )) @defer.inlineCallbacks @@ -750,24 +697,8 @@ class SyncHandler(BaseHandler): since_token, now_token, ephemeral_by_room, tags_by_room, account_data_by_room, - all_ephemeral_by_room): - """ Get the incremental delta needed to bring the client up to date for - the room. Gives the client the most recent events and the changes to - state. - Returns: - A Deferred JoinedSyncResult - """ - logger.debug("Doing incremental sync for room %s between %s and %s", - room_id, since_token, now_token) - - # TODO(mjark): Check for redactions we might have missed. - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token, - ) - - logger.debug("Recents %r", batch) - + all_ephemeral_by_room, + batch, full_state=False): if batch.limited: current_state = yield self.get_state_at(room_id, now_token) @@ -832,43 +763,48 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, leave_event, + def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, since_token, tags_by_room, - account_data_by_room): + account_data_by_room, full_state, + leave_token=None): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: A Deferred ArchivedSyncResult """ - stream_token = yield self.store.get_stream_token_for_event( - leave_event.event_id - ) + if not leave_token: + stream_token = yield self.store.get_stream_token_for_event( + leave_event_id + ) - leave_token = since_token.copy_and_replace("room_key", stream_token) + leave_token = since_token.copy_and_replace("room_key", stream_token) - if since_token.is_after(leave_token): + if since_token and since_token.is_after(leave_token): defer.returnValue(None) batch = yield self.load_filtered_recents( - leave_event.room_id, sync_config, leave_token, since_token, + room_id, sync_config, leave_token, since_token, ) logger.debug("Recents %r", batch) state_events_at_leave = yield self.store.get_state_for_event( - leave_event.event_id + leave_event_id ) - state_at_previous_sync = yield self.get_state_at( - leave_event.room_id, stream_position=since_token - ) + if not full_state: + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token + ) - state_events_delta = yield self.compute_state_delta( - since_token=since_token, - previous_state=state_at_previous_sync, - current_state=state_events_at_leave, - ) + state_events_delta = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=state_events_at_leave, + ) + else: + state_events_delta = state_events_at_leave state_events_delta = { (e.type, e.state_key): e @@ -878,7 +814,7 @@ class SyncHandler(BaseHandler): } account_data = self.account_data_for_room( - leave_event.room_id, tags_by_room, account_data_by_room + room_id, tags_by_room, account_data_by_room ) account_data = sync_config.filter_collection.filter_room_account_data( @@ -886,7 +822,7 @@ class SyncHandler(BaseHandler): ) room_sync = ArchivedSyncResult( - room_id=leave_event.room_id, + room_id=room_id, timeline=batch, state=state_events_delta, account_data=account_data, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d96ea3a30e..0dd1daaa2e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -128,7 +128,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - logger.info("Invalidating %r at %r", event.room_id, stream_ordering) self._events_stream_cache.room_has_changed(None, event.room_id, stream_ordering) except _RollbackButIsFineException: pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3a32a0019a..563e289c4e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -179,7 +179,7 @@ class StreamStore(SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ - self.get_recent_room_events_stream_for_room( + self.get_room_events_stream_for_room( room_id, from_key, to_key, limit ).addCallback(lambda r, rm: (rm, r), room_id) for room_id in room_ids @@ -189,7 +189,7 @@ class StreamStore(SQLBaseStore): defer.returnValue(results) @defer.inlineCallbacks - def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): + def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): if from_key is not None: from_id = RoomStreamToken.parse_stream_token(from_key).stream else: @@ -246,7 +246,7 @@ class StreamStore(SQLBaseStore): key = from_key return ret, key - res = yield self.runInteraction("get_recent_room_events_stream_for_room", f) + res = yield self.runInteraction("get_room_events_stream_for_room", f) defer.returnValue(res) def get_room_changes_for_user(self, user_id, from_key, to_key): -- cgit 1.4.1 From e7febf4fbb1f1beb11e7a03252f6844f84af7f30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Jan 2016 17:11:04 +0000 Subject: PEP8 --- synapse/storage/events.py | 4 +++- synapse/storage/receipts.py | 2 -- synapse/storage/stream.py | 9 ++++++--- 3 files changed, 9 insertions(+), 6 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0dd1daaa2e..80187722ea 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -128,7 +128,9 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - self._events_stream_cache.room_has_changed(None, event.room_id, stream_ordering) + self._events_stream_cache.room_has_changed( + None, event.room_id, stream_ordering + ) except _RollbackButIsFineException: pass diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index b7a4e77748..7118368d97 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -15,12 +15,10 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached -from synapse.util.caches import cache_counter, caches_by_name from synapse.util.caches.room_change_cache import RoomStreamChangeCache from twisted.internet import defer -from blist import sorteddict import logging import ujson as json diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 563e289c4e..0b22251790 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -262,7 +262,8 @@ class StreamStore(SQLBaseStore): def f(txn): if from_id is not None: sql = ( - "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m" + "SELECT m.event_id, stream_ordering FROM events AS e," + " room_memberships AS m" " WHERE e.event_id = m.event_id" " AND m.user_id = ?" " AND e.stream_ordering > ? AND e.stream_ordering <= ?" @@ -271,7 +272,8 @@ class StreamStore(SQLBaseStore): txn.execute(sql, (user_id, from_id, to_id,)) else: sql = ( - "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m" + "SELECT m.event_id, stream_ordering FROM events AS e," + " room_memberships AS m" " WHERE e.event_id = m.event_id" " AND m.user_id = ?" " AND stream_ordering <= ?" @@ -307,7 +309,8 @@ class StreamStore(SQLBaseStore): "SELECT c.room_id FROM history_visibility AS h" " INNER JOIN current_state_events AS c" " ON h.event_id = c.event_id" - " WHERE c.room_id IN (%s) AND h.history_visibility = 'world_readable'" % ( + " WHERE c.room_id IN (%s)" + " AND h.history_visibility = 'world_readable'" % ( ",".join(map(lambda _: "?", room_ids)) ) ) -- cgit 1.4.1 From ba8931829b0b601eb14049c92e0f21a10772576d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 11:34:17 +0000 Subject: Return correct type of token --- synapse/storage/stream.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0b22251790..28721e6994 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -236,7 +236,7 @@ class StreamStore(SQLBaseStore): ret.reverse() - self._set_before_and_after(ret, rows) + self._set_before_and_after(ret, rows, topo_order=False) if rows: key = "s%d" % min(r["stream_ordering"] for r in rows) @@ -581,10 +581,13 @@ class StreamStore(SQLBaseStore): return rows[0][0] if rows else 0 @staticmethod - def _set_before_and_after(events, rows): + def _set_before_and_after(events, rows, topo_order=True): for event, row in zip(events, rows): stream = row["stream_ordering"] - topo = event.depth + if topo_order: + topo = event.depth + else: + topo = None internal = event.internal_metadata internal.before = str(RoomStreamToken(topo, stream - 1)) internal.after = str(RoomStreamToken(topo, stream)) -- cgit 1.4.1 From 4e7948b47a3f197682de82fc0cda07ebb08a581d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 11:52:34 +0000 Subject: Allow paginating backwards from stream token --- synapse/handlers/message.py | 15 +++++++++------ synapse/storage/stream.py | 16 ++++++++++++++-- tests/rest/client/v1/test_rooms.py | 9 +-------- 3 files changed, 24 insertions(+), 16 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b73ad62147..82c8cb5f0c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import SynapseError, AuthError, Codes +from synapse.api.errors import AuthError, Codes from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -119,9 +119,12 @@ class MessageHandler(BaseHandler): if source_config.direction == 'b': # if we're going backwards, we might need to backfill. This # requires that we have a topo token. - if room_token.topological is None: - raise SynapseError(400, "Invalid token: cannot paginate " - "backwards from a stream token") + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token_for_stream_and_room( + room_id, room_token.stream + ) if membership == Membership.LEAVE: # If they have left the room then clamp the token to be before @@ -131,11 +134,11 @@ class MessageHandler(BaseHandler): member_event_id ) leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < room_token.topological: + if leave_token.topological < max_topo: source_config.from_key = str(leave_token) yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, room_token.topological + room_id, max_topo ) events, next_key = yield data_source.get_pagination_rows( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 28721e6994..5096b46864 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -234,10 +234,10 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) - ret.reverse() - self._set_before_and_after(ret, rows, topo_order=False) + ret.reverse() + if rows: key = "s%d" % min(r["stream_ordering"] for r in rows) else: @@ -570,6 +570,18 @@ class StreamStore(SQLBaseStore): row["topological_ordering"], row["stream_ordering"],) ) + def get_max_topological_token_for_stream_and_room(self, room_id, stream_key): + sql = ( + "SELECT max(topological_ordering) FROM events" + " WHERE room_id = ? AND stream_ordering < ?" + ) + return self._execute( + "get_max_topological_token_for_stream_and_room", None, + sql, room_id, stream_key, + ).addCallback( + lambda r: r[0][0] if r else 0 + ) + def _get_max_topological_txn(self, txn): txn.execute( "SELECT MAX(topological_ordering) FROM events" diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 2fe6f695f5..ad5dd3bd6e 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1044,13 +1044,6 @@ class RoomMessageListTestCase(RestTestCase): self.assertTrue("chunk" in response) self.assertTrue("end" in response) - @defer.inlineCallbacks - def test_stream_token_is_rejected_for_back_pagination(self): - (code, response) = yield self.mock_resource.trigger_get( - "/rooms/%s/messages?access_token=x&from=s0_0_0_0_0&dir=b" % - self.room_id) - self.assertEquals(400, code) - @defer.inlineCallbacks def test_stream_token_is_accepted_for_fwd_pagianation(self): token = "s0_0_0_0_0" @@ -1061,4 +1054,4 @@ class RoomMessageListTestCase(RestTestCase): self.assertTrue("start" in response) self.assertEquals(token, response['start']) self.assertTrue("chunk" in response) - self.assertTrue("end" in response) \ No newline at end of file + self.assertTrue("end" in response) -- cgit 1.4.1 From e1941442d442fe62570551071edfd936304697e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 15:02:37 +0000 Subject: Invalidate caches properly. Remove unused arg --- synapse/storage/events.py | 9 ++++++--- synapse/storage/receipts.py | 10 ++++++---- synapse/storage/stream.py | 2 +- synapse/util/caches/room_change_cache.py | 4 ++-- 4 files changed, 15 insertions(+), 10 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 80187722ea..2d2270b297 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -128,9 +128,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - self._events_stream_cache.room_has_changed( - None, event.room_id, stream_ordering - ) except _RollbackButIsFineException: pass @@ -213,6 +210,12 @@ class EventsStore(SQLBaseStore): for event, _ in events_and_contexts: txn.call_after(self._invalidate_get_event_cache, event.event_id) + if not backfilled: + txn.call_after( + self._events_stream_cache.room_has_changed, + event.room_id, event.internal_metadata.stream_ordering, + ) + depth_updates = {} for event, _ in events_and_contexts: if event.internal_metadata.is_outlier(): diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 7118368d97..5ffbfdec51 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -78,7 +78,7 @@ class ReceiptsStore(SQLBaseStore): if from_key: room_ids = yield self._receipts_stream_cache.get_rooms_changed( - self, room_ids, from_key + room_ids, from_key ) results = yield self._get_linearized_receipts_for_rooms( @@ -221,6 +221,11 @@ class ReceiptsStore(SQLBaseStore): # FIXME: This shouldn't invalidate the whole cache txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after( + self._receipts_stream_cache.room_has_changed, + room_id, stream_id + ) + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = ( @@ -308,9 +313,6 @@ class ReceiptsStore(SQLBaseStore): stream_id_manager = yield self._receipts_id_gen.get_next(self) with stream_id_manager as stream_id: - yield self._receipts_stream_cache.room_has_changed( - self, room_id, stream_id - ) have_persisted = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5096b46864..67e7e6a76f 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -169,7 +169,7 @@ class StreamStore(SQLBaseStore): from_id = RoomStreamToken.parse_stream_token(from_key).stream room_ids = yield self._events_stream_cache.get_rooms_changed( - self, room_ids, from_id + room_ids, from_id ) if not room_ids: diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py index 3a873c9c30..eb2ab5f1e4 100644 --- a/synapse/util/caches/room_change_cache.py +++ b/synapse/util/caches/room_change_cache.py @@ -51,7 +51,7 @@ class RoomStreamChangeCache(object): return False - def get_rooms_changed(self, store, room_ids, key): + def get_rooms_changed(self, room_ids, key): """Returns subset of room ids that have had new things since the given key. If the key is too old it will just return the given list. """ @@ -70,7 +70,7 @@ class RoomStreamChangeCache(object): return result - def room_has_changed(self, store, room_id, key): + def room_has_changed(self, room_id, key): """Informs the cache that the room has been changed at the given key. """ if key > self._earliest_known_key: -- cgit 1.4.1 From c23a8c783382a0789c757e16e104cf08654e6cf8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 15:55:26 +0000 Subject: Ensure keys to RoomStreamChangeCache are ints --- synapse/storage/stream.py | 11 ++++++----- synapse/util/caches/room_change_cache.py | 6 ++++++ 2 files changed, 12 insertions(+), 5 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 67e7e6a76f..6a724193e1 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -199,12 +199,13 @@ class StreamStore(SQLBaseStore): if from_key == to_key: defer.returnValue(([], from_key)) - has_changed = yield self._events_stream_cache.get_room_has_changed( - room_id, from_id - ) + if from_id: + has_changed = yield self._events_stream_cache.get_room_has_changed( + room_id, from_id + ) - if not has_changed: - defer.returnValue(([], from_key)) + if not has_changed: + defer.returnValue(([], from_key)) def f(txn): if from_id is not None: diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py index eb2ab5f1e4..e8bfedd72f 100644 --- a/synapse/util/caches/room_change_cache.py +++ b/synapse/util/caches/room_change_cache.py @@ -39,6 +39,8 @@ class RoomStreamChangeCache(object): caches_by_name[self.name] = self._cache def get_room_has_changed(self, room_id, key): + assert type(key) is int + if key <= self._earliest_known_key: return True @@ -55,6 +57,8 @@ class RoomStreamChangeCache(object): """Returns subset of room ids that have had new things since the given key. If the key is too old it will just return the given list. """ + assert type(key) is int + if key > self._earliest_known_key: keys = self._cache.keys() i = keys.bisect_right(key) @@ -73,6 +77,8 @@ class RoomStreamChangeCache(object): def room_has_changed(self, room_id, key): """Informs the cache that the room has been changed at the given key. """ + assert type(key) is int + if key > self._earliest_known_key: old_key = self._room_to_key.get(room_id, None) if old_key: -- cgit 1.4.1 From 00cb3eb24b277bb37bd1b7d8449c08a37cb4b014 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 16:37:41 +0000 Subject: Cache tags and account data --- synapse/storage/account_data.py | 20 ++++++- synapse/storage/events.py | 2 +- synapse/storage/receipts.py | 8 +-- synapse/storage/stream.py | 8 +-- synapse/storage/tags.py | 14 +++++ synapse/util/caches/room_change_cache.py | 92 ----------------------------- synapse/util/caches/stream_change_cache.py | 95 ++++++++++++++++++++++++++++++ 7 files changed, 137 insertions(+), 102 deletions(-) delete mode 100644 synapse/util/caches/room_change_cache.py create mode 100644 synapse/util/caches/stream_change_cache.py (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 9c6597e012..95294c3f6c 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer import ujson as json @@ -23,6 +24,13 @@ logger = logging.getLogger(__name__) class AccountDataStore(SQLBaseStore): + def __init__(self, hs): + super(AccountDataStore, self).__init__(hs) + + self._account_data_stream_cache = StreamChangeCache( + "AccountDataChangeCache", self._account_data_id_gen.get_max_token(None), + max_size=1000, + ) def get_account_data_for_user(self, user_id): """Get all the client account_data for a user. @@ -83,7 +91,7 @@ class AccountDataStore(SQLBaseStore): "get_account_data_for_room", get_account_data_for_room_txn ) - def get_updated_account_data_for_user(self, user_id, stream_id): + def get_updated_account_data_for_user(self, user_id, stream_id, room_ids=None): """Get all the client account_data for a that's changed. Args: @@ -120,6 +128,12 @@ class AccountDataStore(SQLBaseStore): return (global_account_data, account_data_by_room) + changed = self._account_data_stream_cache.get_entity_has_changed( + user_id, int(stream_id) + ) + if not changed: + defer.returnValue(({}, {})) + return self.runInteraction( "get_updated_account_data_for_user", get_updated_account_data_for_user_txn ) @@ -186,6 +200,10 @@ class AccountDataStore(SQLBaseStore): "content": content_json, } ) + txn.call_after( + self._account_data_stream_cache.entity_has_changed, + user_id, next_id, + ) self._update_max_stream_id(txn, next_id) with (yield self._account_data_id_gen.get_next(self)) as next_id: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2d2270b297..5e85552029 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -212,7 +212,7 @@ class EventsStore(SQLBaseStore): if not backfilled: txn.call_after( - self._events_stream_cache.room_has_changed, + self._events_stream_cache.entity_has_changed, event.room_id, event.internal_metadata.stream_ordering, ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 5ffbfdec51..8068c73740 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -15,7 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached -from synapse.util.caches.room_change_cache import RoomStreamChangeCache +from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer @@ -30,7 +30,7 @@ class ReceiptsStore(SQLBaseStore): def __init__(self, hs): super(ReceiptsStore, self).__init__(hs) - self._receipts_stream_cache = RoomStreamChangeCache( + self._receipts_stream_cache = StreamChangeCache( "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None) ) @@ -77,7 +77,7 @@ class ReceiptsStore(SQLBaseStore): room_ids = set(room_ids) if from_key: - room_ids = yield self._receipts_stream_cache.get_rooms_changed( + room_ids = yield self._receipts_stream_cache.get_entities_changed( room_ids, from_key ) @@ -222,7 +222,7 @@ class ReceiptsStore(SQLBaseStore): txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) txn.call_after( - self._receipts_stream_cache.room_has_changed, + self._receipts_stream_cache.entity_has_changed, room_id, stream_id ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6a724193e1..c7d7893328 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,7 +37,7 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks -from synapse.util.caches.room_change_cache import RoomStreamChangeCache +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -81,7 +81,7 @@ class StreamStore(SQLBaseStore): def __init__(self, hs): super(StreamStore, self).__init__(hs) - self._events_stream_cache = RoomStreamChangeCache( + self._events_stream_cache = StreamChangeCache( "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) ) @@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore): def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0): from_id = RoomStreamToken.parse_stream_token(from_key).stream - room_ids = yield self._events_stream_cache.get_rooms_changed( + room_ids = yield self._events_stream_cache.get_entities_changed( room_ids, from_id ) @@ -200,7 +200,7 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], from_key)) if from_id: - has_changed = yield self._events_stream_cache.get_room_has_changed( + has_changed = yield self._events_stream_cache.get_entity_has_changed( room_id, from_id ) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 4c39e07cbd..50af899192 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer import ujson as json @@ -24,6 +25,13 @@ logger = logging.getLogger(__name__) class TagsStore(SQLBaseStore): + def __init__(self, hs): + super(TagsStore, self).__init__(hs) + + self._tags_stream_cache = StreamChangeCache( + "TagsChangeCache", self._account_data_id_gen.get_max_token(None), + max_size=1000, + ) def get_max_account_data_stream_id(self): """Get the current max stream id for the private user data stream @@ -80,6 +88,10 @@ class TagsStore(SQLBaseStore): room_ids = [row[0] for row in txn.fetchall()] return room_ids + changed = self._tags_stream_cache.get_entity_has_changed(user_id, int(stream_id)) + if not changed: + defer.returnValue({}) + room_ids = yield self.runInteraction( "get_updated_tags", get_updated_tags_txn ) @@ -177,6 +189,8 @@ class TagsStore(SQLBaseStore): next_id(int): The the revision to advance to. """ + txn.call_after(self._tags_stream_cache.entity_has_changed, user_id, next_id) + update_max_id_sql = ( "UPDATE account_data_max_stream_id" " SET stream_id = ?" diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py deleted file mode 100644 index e8bfedd72f..0000000000 --- a/synapse/util/caches/room_change_cache.py +++ /dev/null @@ -1,92 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.util.caches import cache_counter, caches_by_name - - -from blist import sorteddict -import logging - - -logger = logging.getLogger(__name__) - - -class RoomStreamChangeCache(object): - """Keeps track of the stream_id of the latest change in rooms. - - Given a list of rooms and stream key, it will give a subset of rooms that - may have changed since that key. If the key is too old then the cache - will simply return all rooms. - """ - def __init__(self, name, current_key, size_of_cache=10000): - self._size_of_cache = size_of_cache - self._room_to_key = {} - self._cache = sorteddict() - self._earliest_known_key = current_key - self.name = name - caches_by_name[self.name] = self._cache - - def get_room_has_changed(self, room_id, key): - assert type(key) is int - - if key <= self._earliest_known_key: - return True - - room_key = self._room_to_key.get(room_id, None) - if room_key is None: - return True - - if key < room_key: - return True - - return False - - def get_rooms_changed(self, room_ids, key): - """Returns subset of room ids that have had new things since the - given key. If the key is too old it will just return the given list. - """ - assert type(key) is int - - if key > self._earliest_known_key: - keys = self._cache.keys() - i = keys.bisect_right(key) - - result = set( - self._cache[k] for k in keys[i:] - ).intersection(room_ids) - - cache_counter.inc_hits(self.name) - else: - result = room_ids - cache_counter.inc_misses(self.name) - - return result - - def room_has_changed(self, room_id, key): - """Informs the cache that the room has been changed at the given key. - """ - assert type(key) is int - - if key > self._earliest_known_key: - old_key = self._room_to_key.get(room_id, None) - if old_key: - key = max(key, old_key) - self._cache.pop(old_key, None) - self._cache[key] = room_id - - while len(self._cache) > self._size_of_cache: - k, r = self._cache.popitem() - self._earliest_key = max(k, self._earliest_key) - self._room_to_key.pop(r, None) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py new file mode 100644 index 0000000000..33b37f7f29 --- /dev/null +++ b/synapse/util/caches/stream_change_cache.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.caches import cache_counter, caches_by_name + + +from blist import sorteddict +import logging + + +logger = logging.getLogger(__name__) + + +class StreamChangeCache(object): + """Keeps track of the stream positions of the latest change in a set of entities. + + Typically the entity will be a room or user id. + + Given a list of entities and a stream position, it will give a subset of + entities that may have changed since that position. If position key is too + old then the cache will simply return all given entities. + """ + def __init__(self, name, current_stream_pos, max_size=10000): + self._max_size = max_size + self._entity_to_key = {} + self._cache = sorteddict() + self._earliest_known_stream_pos = current_stream_pos + self.name = name + caches_by_name[self.name] = self._cache + + def get_entity_has_changed(self, entity, stream_pos): + assert type(stream_pos) is int + + if stream_pos <= self._earliest_known_stream_pos: + return True + + latest_entity_change_pos = self._entity_to_key.get(entity, None) + if latest_entity_change_pos is None: + return True + + if stream_pos < latest_entity_change_pos: + return True + + return False + + def get_entities_changed(self, entities, stream_pos): + """Returns subset of entities that have had new things since the + given position. If the position is too old it will just return the given list. + """ + assert type(stream_pos) is int + + if stream_pos > self._earliest_known_stream_pos: + keys = self._cache.keys() + i = keys.bisect_right(stream_pos) + + result = set( + self._cache[k] for k in keys[i:] + ).intersection(entities) + + cache_counter.inc_hits(self.name) + else: + result = entities + cache_counter.inc_misses(self.name) + + return result + + def entity_has_changed(self, entitiy, stream_pos): + """Informs the cache that the entitiy has been changed at the given + position. + """ + assert type(stream_pos) is int + + if stream_pos > self._earliest_known_stream_pos: + old_pos = self._entity_to_key.get(entitiy, None) + if old_pos: + stream_pos = max(stream_pos, old_pos) + self._cache.pop(old_pos, None) + self._cache[stream_pos] = entitiy + + while len(self._cache) > self._max_size: + k, r = self._cache.popitem() + self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) + self._entity_to_key.pop(r, None) -- cgit 1.4.1 From 45cf827c8fe7163a51f1d0d7c9e2531da9b58c8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 16:39:18 +0000 Subject: Change name and doc has_entity_changed --- synapse/storage/account_data.py | 2 +- synapse/storage/stream.py | 2 +- synapse/storage/tags.py | 2 +- synapse/util/caches/stream_change_cache.py | 4 +++- 4 files changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 95294c3f6c..62e49e1c0e 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -128,7 +128,7 @@ class AccountDataStore(SQLBaseStore): return (global_account_data, account_data_by_room) - changed = self._account_data_stream_cache.get_entity_has_changed( + changed = self._account_data_stream_cache.has_entity_changed( user_id, int(stream_id) ) if not changed: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c7d7893328..6e81d46c60 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -200,7 +200,7 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], from_key)) if from_id: - has_changed = yield self._events_stream_cache.get_entity_has_changed( + has_changed = yield self._events_stream_cache.has_entity_changed( room_id, from_id ) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 50af899192..75ce04092d 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -88,7 +88,7 @@ class TagsStore(SQLBaseStore): room_ids = [row[0] for row in txn.fetchall()] return room_ids - changed = self._tags_stream_cache.get_entity_has_changed(user_id, int(stream_id)) + changed = self._tags_stream_cache.has_entity_changed(user_id, int(stream_id)) if not changed: defer.returnValue({}) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 33b37f7f29..3ca0e57780 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -40,7 +40,9 @@ class StreamChangeCache(object): self.name = name caches_by_name[self.name] = self._cache - def get_entity_has_changed(self, entity, stream_pos): + def has_entity_changed(self, entity, stream_pos): + """Returns True if the entity may have been updated since stream_pos + """ assert type(stream_pos) is int if stream_pos <= self._earliest_known_stream_pos: -- cgit 1.4.1