diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 93 |
1 files changed, 48 insertions, 45 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6e81d46c60..367ffc9543 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,10 +37,9 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks -from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_fn import logging @@ -78,13 +77,6 @@ def upper_bound(token): class StreamStore(SQLBaseStore): - def __init__(self, hs): - super(StreamStore, self).__init__(hs) - - self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) - ) - @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the @@ -177,14 +169,14 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) - for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): + for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ - self.get_room_events_stream_for_room( - room_id, from_key, to_key, limit - ).addCallback(lambda r, rm: (rm, r), room_id) + preserve_fn(self.get_room_events_stream_for_room)( + room_id, from_key, to_key, limit, + ) for room_id in room_ids ]) - results.update(dict(res)) + results.update(dict(zip(rm_ids, res))) defer.returnValue(results) @@ -229,28 +221,30 @@ class StreamStore(SQLBaseStore): rows = self.cursor_to_dict(txn) - ret = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows + + rows = yield self.runInteraction("get_room_events_stream_for_room", f) - self._set_before_and_after(ret, rows, topo_order=False) + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - ret.reverse() + self._set_before_and_after(ret, rows, topo_order=False) - if rows: - key = "s%d" % min(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = from_key + ret.reverse() - return ret, key - res = yield self.runInteraction("get_room_events_stream_for_room", f) - defer.returnValue(res) + if rows: + key = "s%d" % min(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = from_key - def get_room_changes_for_user(self, user_id, from_key, to_key): + defer.returnValue((ret, key)) + + @defer.inlineCallbacks + def get_membership_changes_for_user(self, user_id, from_key, to_key): if from_key is not None: from_id = RoomStreamToken.parse_stream_token(from_key).stream else: @@ -258,7 +252,14 @@ class StreamStore(SQLBaseStore): to_id = RoomStreamToken.parse_stream_token(to_key).stream if from_key == to_key: - return defer.succeed([]) + defer.returnValue([]) + + if from_id: + has_changed = self._membership_stream_cache.has_entity_changed( + user_id, int(from_id) + ) + if not has_changed: + defer.returnValue([]) def f(txn): if from_id is not None: @@ -283,17 +284,19 @@ class StreamStore(SQLBaseStore): txn.execute(sql, (user_id, to_id,)) rows = self.cursor_to_dict(txn) - ret = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows + + rows = yield self.runInteraction("get_membership_changes_for_user", f) - return ret + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - return self.runInteraction("get_room_changes_for_user", f) + self._set_before_and_after(ret, rows, topo_order=False) + + defer.returnValue(ret) - @log_function def get_room_events_stream( self, user_id, @@ -324,11 +327,6 @@ class StreamStore(SQLBaseStore): " WHERE m.user_id = ? AND m.membership = 'join'" ) current_room_membership_args = [user_id] - if room_ids: - current_room_membership_sql += " AND m.room_id in (%s)" % ( - ",".join(map(lambda _: "?", room_ids)) - ) - current_room_membership_args = [user_id] + room_ids # We also want to get any membership events about that user, e.g. # invites or leave notifications. @@ -567,6 +565,7 @@ class StreamStore(SQLBaseStore): table="events", keyvalues={"event_id": event_id}, retcols=("stream_ordering", "topological_ordering"), + desc="get_topological_token_for_event", ).addCallback(lambda row: "t%d-%d" % ( row["topological_ordering"], row["stream_ordering"],) ) @@ -604,6 +603,10 @@ class StreamStore(SQLBaseStore): internal = event.internal_metadata internal.before = str(RoomStreamToken(topo, stream - 1)) internal.after = str(RoomStreamToken(topo, stream)) + internal.order = ( + int(topo) if topo else 0, + int(stream), + ) @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): |