diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 6e81d46c60..367ffc9543 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,10 +37,9 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
-from synapse.util.logutils import log_function
+from synapse.util.logcontext import preserve_fn
import logging
@@ -78,13 +77,6 @@ def upper_bound(token):
class StreamStore(SQLBaseStore):
- def __init__(self, hs):
- super(StreamStore, self).__init__(hs)
-
- self._events_stream_cache = StreamChangeCache(
- "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
- )
-
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
# NB this lives here instead of appservice.py so we can reuse the
@@ -177,14 +169,14 @@ class StreamStore(SQLBaseStore):
results = {}
room_ids = list(room_ids)
- for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+ for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
res = yield defer.gatherResults([
- self.get_room_events_stream_for_room(
- room_id, from_key, to_key, limit
- ).addCallback(lambda r, rm: (rm, r), room_id)
+ preserve_fn(self.get_room_events_stream_for_room)(
+ room_id, from_key, to_key, limit,
+ )
for room_id in room_ids
])
- results.update(dict(res))
+ results.update(dict(zip(rm_ids, res)))
defer.returnValue(results)
@@ -229,28 +221,30 @@ class StreamStore(SQLBaseStore):
rows = self.cursor_to_dict(txn)
- ret = self._get_events_txn(
- txn,
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
+ return rows
+
+ rows = yield self.runInteraction("get_room_events_stream_for_room", f)
- self._set_before_and_after(ret, rows, topo_order=False)
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
- ret.reverse()
+ self._set_before_and_after(ret, rows, topo_order=False)
- if rows:
- key = "s%d" % min(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = from_key
+ ret.reverse()
- return ret, key
- res = yield self.runInteraction("get_room_events_stream_for_room", f)
- defer.returnValue(res)
+ if rows:
+ key = "s%d" % min(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = from_key
- def get_room_changes_for_user(self, user_id, from_key, to_key):
+ defer.returnValue((ret, key))
+
+ @defer.inlineCallbacks
+ def get_membership_changes_for_user(self, user_id, from_key, to_key):
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
@@ -258,7 +252,14 @@ class StreamStore(SQLBaseStore):
to_id = RoomStreamToken.parse_stream_token(to_key).stream
if from_key == to_key:
- return defer.succeed([])
+ defer.returnValue([])
+
+ if from_id:
+ has_changed = self._membership_stream_cache.has_entity_changed(
+ user_id, int(from_id)
+ )
+ if not has_changed:
+ defer.returnValue([])
def f(txn):
if from_id is not None:
@@ -283,17 +284,19 @@ class StreamStore(SQLBaseStore):
txn.execute(sql, (user_id, to_id,))
rows = self.cursor_to_dict(txn)
- ret = self._get_events_txn(
- txn,
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
+ return rows
+
+ rows = yield self.runInteraction("get_membership_changes_for_user", f)
- return ret
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
- return self.runInteraction("get_room_changes_for_user", f)
+ self._set_before_and_after(ret, rows, topo_order=False)
+
+ defer.returnValue(ret)
- @log_function
def get_room_events_stream(
self,
user_id,
@@ -324,11 +327,6 @@ class StreamStore(SQLBaseStore):
" WHERE m.user_id = ? AND m.membership = 'join'"
)
current_room_membership_args = [user_id]
- if room_ids:
- current_room_membership_sql += " AND m.room_id in (%s)" % (
- ",".join(map(lambda _: "?", room_ids))
- )
- current_room_membership_args = [user_id] + room_ids
# We also want to get any membership events about that user, e.g.
# invites or leave notifications.
@@ -567,6 +565,7 @@ class StreamStore(SQLBaseStore):
table="events",
keyvalues={"event_id": event_id},
retcols=("stream_ordering", "topological_ordering"),
+ desc="get_topological_token_for_event",
).addCallback(lambda row: "t%d-%d" % (
row["topological_ordering"], row["stream_ordering"],)
)
@@ -604,6 +603,10 @@ class StreamStore(SQLBaseStore):
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
+ internal.order = (
+ int(topo) if topo else 0,
+ int(stream),
+ )
@defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit):
|