diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 328c049b03..1fdf978313 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -514,13 +514,6 @@ class SyncHandler(BaseHandler):
timeline_limit = sync_config.filter_collection.timeline_limit()
- room_events, _ = yield self.store.get_room_events_stream(
- sync_config.user.to_string(),
- from_key=since_token.room_key,
- to_key=now_token.room_key,
- limit=timeline_limit + 1,
- )
-
tags_by_room = yield self.store.get_updated_tags(
sync_config.user.to_string(),
since_token.account_data_key,
@@ -533,6 +526,32 @@ class SyncHandler(BaseHandler):
)
)
+ rooms_changed = yield self.store.get_room_changes_for_user(
+ sync_config.user.to_string(), since_token.room_key, now_token.room_key
+ )
+
+ room_to_events = yield self.store.get_room_events_stream_for_rooms(
+ room_ids=room_ids,
+ from_key=since_token.room_key,
+ to_key=now_token.room_key,
+ limit=timeline_limit + 1,
+ )
+
+ room_events = [
+ event
+ for events, _ in room_to_events.values()
+ for event in events
+ ]
+
+ room_events.extend(rooms_changed)
+
+ # room_events, _ = yield self.store.get_room_events_stream(
+ # sync_config.user.to_string(),
+ # from_key=since_token.room_key,
+ # to_key=now_token.room_key,
+ # limit=timeline_limit + 1,
+ # )
+
joined = []
archived = []
if len(room_events) <= timeline_limit:
@@ -694,14 +713,12 @@ class SyncHandler(BaseHandler):
end_key = room_key
while limited and len(recents) < timeline_limit and max_repeat:
- events, keys = yield self.store.get_recent_events_for_room(
+ events, end_key = yield self.store.get_recent_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
- from_token=since_token.room_key if since_token else None,
- end_token=end_key,
+ from_key=since_token.room_key if since_token else None,
+ to_key=end_key,
)
- room_key, _ = keys
- end_key = "s" + room_key.split('-')[-1]
loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client(
sync_config.user.to_string(),
@@ -712,6 +729,7 @@ class SyncHandler(BaseHandler):
recents = loaded_recents
if len(events) <= load_limit:
limited = False
+ break
max_repeat -= 1
if len(recents) > timeline_limit:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 298cb9bada..d96ea3a30e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -128,6 +128,8 @@ class EventsStore(SQLBaseStore):
is_new_state=is_new_state,
current_state=current_state,
)
+ logger.info("Invalidating %r at %r", event.room_id, stream_ordering)
+ self._events_stream_cache.room_has_changed(None, event.room_id, stream_ordering)
except _RollbackButIsFineException:
pass
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c0593e23ee..b7a4e77748 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -16,6 +16,7 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches.room_change_cache import RoomStreamChangeCache
from twisted.internet import defer
@@ -31,8 +32,8 @@ class ReceiptsStore(SQLBaseStore):
def __init__(self, hs):
super(ReceiptsStore, self).__init__(hs)
- self._receipts_stream_cache = _RoomStreamChangeCache(
- self._receipts_id_gen.get_max_token(None)
+ self._receipts_stream_cache = RoomStreamChangeCache(
+ "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
)
@cached(num_args=2)
@@ -370,63 +371,3 @@ class ReceiptsStore(SQLBaseStore):
"data": json.dumps(data),
}
)
-
-
-class _RoomStreamChangeCache(object):
- """Keeps track of the stream_id of the latest change in rooms.
-
- Given a list of rooms and stream key, it will give a subset of rooms that
- may have changed since that key. If the key is too old then the cache
- will simply return all rooms.
- """
- def __init__(self, current_key, size_of_cache=10000):
- self._size_of_cache = size_of_cache
- self._room_to_key = {}
- self._cache = sorteddict()
- self._earliest_key = current_key
- self.name = "ReceiptsRoomChangeCache"
- caches_by_name[self.name] = self._cache
-
- @defer.inlineCallbacks
- def get_rooms_changed(self, store, room_ids, key):
- """Returns subset of room ids that have had new receipts since the
- given key. If the key is too old it will just return the given list.
- """
- if key > (yield self._get_earliest_key(store)):
- keys = self._cache.keys()
- i = keys.bisect_right(key)
-
- result = set(
- self._cache[k] for k in keys[i:]
- ).intersection(room_ids)
-
- cache_counter.inc_hits(self.name)
- else:
- result = room_ids
- cache_counter.inc_misses(self.name)
-
- defer.returnValue(result)
-
- @defer.inlineCallbacks
- def room_has_changed(self, store, room_id, key):
- """Informs the cache that the room has been changed at the given key.
- """
- if key > (yield self._get_earliest_key(store)):
- old_key = self._room_to_key.get(room_id, None)
- if old_key:
- key = max(key, old_key)
- self._cache.pop(old_key, None)
- self._cache[key] = room_id
-
- while len(self._cache) > self._size_of_cache:
- k, r = self._cache.popitem()
- self._earliest_key = max(k, self._earliest_key)
- self._room_to_key.pop(r, None)
-
- @defer.inlineCallbacks
- def _get_earliest_key(self, store):
- if self._earliest_key is None:
- self._earliest_key = yield store.get_max_receipt_stream_id()
- self._earliest_key = int(self._earliest_key)
-
- defer.returnValue(self._earliest_key)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index e31bad258a..3a32a0019a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,6 +37,7 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.room_change_cache import RoomStreamChangeCache
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function
@@ -77,6 +78,12 @@ def upper_bound(token):
class StreamStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(StreamStore, self).__init__(hs)
+
+ self._events_stream_cache = RoomStreamChangeCache(
+ "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
+ )
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
@@ -157,6 +164,132 @@ class StreamStore(SQLBaseStore):
results = yield self.runInteraction("get_appservice_room_stream", f)
defer.returnValue(results)
+ @defer.inlineCallbacks
+ def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+
+ room_ids = yield self._events_stream_cache.get_rooms_changed(
+ self, room_ids, from_id
+ )
+
+ if not room_ids:
+ defer.returnValue({})
+
+ results = {}
+ room_ids = list(room_ids)
+ for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+ res = yield defer.gatherResults([
+ self.get_recent_room_events_stream_for_room(
+ room_id, from_key, to_key, limit
+ ).addCallback(lambda r, rm: (rm, r), room_id)
+ for room_id in room_ids
+ ])
+ results.update(dict(res))
+
+ defer.returnValue(results)
+
+ @defer.inlineCallbacks
+ def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
+ if from_key is not None:
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+ else:
+ from_id = None
+ to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+ if from_key == to_key:
+ defer.returnValue(([], from_key))
+
+ has_changed = yield self._events_stream_cache.get_room_has_changed(
+ room_id, from_id
+ )
+
+ if not has_changed:
+ defer.returnValue(([], from_key))
+
+ def f(txn):
+ if from_id is not None:
+ sql = (
+ "SELECT event_id, stream_ordering FROM events WHERE"
+ " room_id = ?"
+ " AND not outlier"
+ " AND stream_ordering > ? AND stream_ordering <= ?"
+ " ORDER BY stream_ordering DESC LIMIT ?"
+ )
+ txn.execute(sql, (room_id, from_id, to_id, limit))
+ else:
+ sql = (
+ "SELECT event_id, stream_ordering FROM events WHERE"
+ " room_id = ?"
+ " AND not outlier"
+ " AND stream_ordering <= ?"
+ " ORDER BY stream_ordering DESC LIMIT ?"
+ )
+ txn.execute(sql, (room_id, to_id, limit))
+
+ rows = self.cursor_to_dict(txn)
+
+ ret = self._get_events_txn(
+ txn,
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ ret.reverse()
+
+ self._set_before_and_after(ret, rows)
+
+ if rows:
+ key = "s%d" % min(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = from_key
+
+ return ret, key
+ res = yield self.runInteraction("get_recent_room_events_stream_for_room", f)
+ defer.returnValue(res)
+
+ def get_room_changes_for_user(self, user_id, from_key, to_key):
+ if from_key is not None:
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+ else:
+ from_id = None
+ to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+ if from_key == to_key:
+ return defer.succeed([])
+
+ def f(txn):
+ if from_id is not None:
+ sql = (
+ "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m"
+ " WHERE e.event_id = m.event_id"
+ " AND m.user_id = ?"
+ " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ )
+ txn.execute(sql, (user_id, from_id, to_id,))
+ else:
+ sql = (
+ "SELECT m.event_id, stream_ordering FROM events AS e, room_memberships AS m"
+ " WHERE e.event_id = m.event_id"
+ " AND m.user_id = ?"
+ " AND stream_ordering <= ?"
+ " ORDER BY stream_ordering ASC"
+ )
+ txn.execute(sql, (user_id, to_id,))
+ rows = self.cursor_to_dict(txn)
+
+ ret = self._get_events_txn(
+ txn,
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ return ret
+
+ return self.runInteraction("get_room_changes_for_user", f)
+
@log_function
def get_room_events_stream(
self,
diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py
new file mode 100644
index 0000000000..3a873c9c30
--- /dev/null
+++ b/synapse/util/caches/room_change_cache.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.caches import cache_counter, caches_by_name
+
+
+from blist import sorteddict
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class RoomStreamChangeCache(object):
+ """Keeps track of the stream_id of the latest change in rooms.
+
+ Given a list of rooms and stream key, it will give a subset of rooms that
+ may have changed since that key. If the key is too old then the cache
+ will simply return all rooms.
+ """
+ def __init__(self, name, current_key, size_of_cache=10000):
+ self._size_of_cache = size_of_cache
+ self._room_to_key = {}
+ self._cache = sorteddict()
+ self._earliest_known_key = current_key
+ self.name = name
+ caches_by_name[self.name] = self._cache
+
+ def get_room_has_changed(self, room_id, key):
+ if key <= self._earliest_known_key:
+ return True
+
+ room_key = self._room_to_key.get(room_id, None)
+ if room_key is None:
+ return True
+
+ if key < room_key:
+ return True
+
+ return False
+
+ def get_rooms_changed(self, store, room_ids, key):
+ """Returns subset of room ids that have had new things since the
+ given key. If the key is too old it will just return the given list.
+ """
+ if key > self._earliest_known_key:
+ keys = self._cache.keys()
+ i = keys.bisect_right(key)
+
+ result = set(
+ self._cache[k] for k in keys[i:]
+ ).intersection(room_ids)
+
+ cache_counter.inc_hits(self.name)
+ else:
+ result = room_ids
+ cache_counter.inc_misses(self.name)
+
+ return result
+
+ def room_has_changed(self, store, room_id, key):
+ """Informs the cache that the room has been changed at the given key.
+ """
+ if key > self._earliest_known_key:
+ old_key = self._room_to_key.get(room_id, None)
+ if old_key:
+ key = max(key, old_key)
+ self._cache.pop(old_key, None)
+ self._cache[key] = room_id
+
+ while len(self._cache) > self._size_of_cache:
+ k, r = self._cache.popitem()
+ self._earliest_key = max(k, self._earliest_key)
+ self._room_to_key.pop(r, None)
|