diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index efaa0c8d6e..1b007d4945 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -296,7 +296,7 @@ class DeviceHandler(BaseHandler):
# ordering: treat it the same as a new room
event_ids = []
- current_state_ids = yield self.state.get_current_state_ids(room_id)
+ current_state_ids = yield self.store.get_current_state_ids(room_id)
# special-case for an empty prev state: include all members
# in the changed list
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 19eebbd43f..516cd9a6ac 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -21,6 +21,7 @@ from synapse.api.constants import (
EventTypes, JoinRules,
)
from synapse.util.async import concurrently_execute
+from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID
@@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler):
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
"""
+ logger.info(
+ "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
+ limit, since_token, bool(search_filter), network_tuple,
+ )
if search_filter:
# We explicitly don't bother caching searches or requests for
# appservice specific lists.
@@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler):
rooms_to_order_value = {}
rooms_to_num_joined = {}
- rooms_to_latest_event_ids = {}
newly_visible = []
newly_unpublished = []
@@ -116,19 +120,26 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_order_for_room(room_id):
- latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
- if not latest_event_ids:
+ # Most of the rooms won't have changed between the since token and
+ # now (especially if the since token is "now"). So, we can ask what
+ # the current users are in a room (that will hit a cache) and then
+ # check if the room has changed since the since token. (We have to
+ # do it in that order to avoid races).
+ # If things have changed then fall back to getting the current state
+ # at the since token.
+ joined_users = yield self.store.get_users_in_room(room_id)
+ if self.store.has_room_changed_since(room_id, stream_token):
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_token
)
- rooms_to_latest_event_ids[room_id] = latest_event_ids
- if not latest_event_ids:
- return
+ if not latest_event_ids:
+ return
+
+ joined_users = yield self.state_handler.get_current_user_in_room(
+ room_id, latest_event_ids,
+ )
- joined_users = yield self.state_handler.get_current_user_in_room(
- room_id, latest_event_ids,
- )
num_joined_users = len(joined_users)
rooms_to_num_joined[room_id] = num_joined_users
@@ -165,19 +176,19 @@ class RoomListHandler(BaseHandler):
rooms_to_scan = rooms_to_scan[:since_token.current_limit]
rooms_to_scan.reverse()
- # Actually generate the entries. _generate_room_entry will append to
+ # Actually generate the entries. _append_room_entry_to_chunk will append to
# chunk but will stop if len(chunk) > limit
chunk = []
if limit and not search_filter:
step = limit + 1
for i in xrange(0, len(rooms_to_scan), step):
# We iterate here because the vast majority of cases we'll stop
- # at first iteration, but occaisonally _generate_room_entry
+ # at first iteration, but occaisonally _append_room_entry_to_chunk
# won't append to the chunk and so we need to loop again.
# We don't want to scan over the entire range either as that
# would potentially waste a lot of work.
yield concurrently_execute(
- lambda r: self._generate_room_entry(
+ lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
@@ -187,7 +198,7 @@ class RoomListHandler(BaseHandler):
break
else:
yield concurrently_execute(
- lambda r: self._generate_room_entry(
+ lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
@@ -256,21 +267,35 @@ class RoomListHandler(BaseHandler):
defer.returnValue(results)
@defer.inlineCallbacks
- def _generate_room_entry(self, room_id, num_joined_users, chunk, limit,
- search_filter):
+ def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
+ search_filter):
+ """Generate the entry for a room in the public room list and append it
+ to the `chunk` if it matches the search filter
+ """
if limit and len(chunk) > limit + 1:
# We've already got enough, so lets just drop it.
return
+ result = yield self._generate_room_entry(room_id, num_joined_users)
+
+ if result and _matches_room_entry(result, search_filter):
+ chunk.append(result)
+
+ @cachedInlineCallbacks(num_args=1, cache_context=True)
+ def _generate_room_entry(self, room_id, num_joined_users, cache_context):
+ """Returns the entry for a room
+ """
result = {
"room_id": room_id,
"num_joined_members": num_joined_users,
}
- current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+ current_state_ids = yield self.store.get_current_state_ids(
+ room_id, on_invalidate=cache_context.invalidate,
+ )
event_map = yield self.store.get_events([
- event_id for key, event_id in current_state_ids.items()
+ event_id for key, event_id in current_state_ids.iteritems()
if key[0] in (
EventTypes.JoinRules,
EventTypes.Name,
@@ -294,7 +319,9 @@ class RoomListHandler(BaseHandler):
if join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
- aliases = yield self.store.get_aliases_for_room(room_id)
+ aliases = yield self.store.get_aliases_for_room(
+ room_id, on_invalidate=cache_context.invalidate
+ )
if aliases:
result["aliases"] = aliases
@@ -334,8 +361,7 @@ class RoomListHandler(BaseHandler):
if avatar_url:
result["avatar_url"] = avatar_url
- if _matches_room_entry(result, search_filter):
- chunk.append(result)
+ defer.returnValue(result)
@defer.inlineCallbacks
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 62d794f22b..3a50c72e0b 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -139,7 +139,7 @@ class Mailer(object):
@defer.inlineCallbacks
def _fetch_room_state(room_id):
- room_state = yield self.state_handler.get_current_state_ids(room_id)
+ room_state = yield self.store.get_current_state_ids(room_id)
state_by_room[room_id] = room_state
# Run at most 3 of these at once: sync does 10 at a time but email
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 622b2d8540..518c9ea2e9 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -109,6 +109,10 @@ class SlavedEventStore(BaseSlavedStore):
get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"]
)
+ get_current_state_ids = (
+ StateStore.__dict__["get_current_state_ids"]
+ )
+ has_room_changed_since = DataStore.has_room_changed_since.__func__
get_unread_push_actions_for_user_in_range_for_http = (
DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 03881ea3dc..72319c35ae 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -467,14 +467,9 @@ class EventsStore(SQLBaseStore):
else:
return
- existing_state_rows = yield self._simple_select_list(
- table="current_state_events",
- keyvalues={"room_id": room_id},
- retcols=["event_id", "type", "state_key"],
- desc="_calculate_state_delta",
- )
+ existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(row["event_id"] for row in existing_state_rows)
+ existing_events = set(existing_state.itervalues())
new_events = set(ev_id for ev_id in current_state.itervalues())
changed_events = existing_events ^ new_events
@@ -482,9 +477,8 @@ class EventsStore(SQLBaseStore):
return
to_delete = {
- (row["type"], row["state_key"]): row["event_id"]
- for row in existing_state_rows
- if row["event_id"] in changed_events
+ key: ev_id for key, ev_id in existing_state.iteritems()
+ if ev_id in changed_events
}
events_to_insert = (new_events - existing_events)
to_insert = {
@@ -610,6 +604,10 @@ class EventsStore(SQLBaseStore):
txn, self.get_users_in_room, (room_id,)
)
+ self._invalidate_cache_and_stream(
+ txn, self.get_current_state_ids, (room_id,)
+ )
+
for room_id, new_extrem in new_forward_extremeties.items():
self._simple_delete_txn(
txn,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 84482d8285..27f1ec89ec 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,7 +14,7 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
from synapse.util.caches import intern_string
from synapse.storage.engines import PostgresEngine
@@ -69,6 +69,18 @@ class StateStore(SQLBaseStore):
where_clause="type='m.room.member'",
)
+ @cachedInlineCallbacks(max_entries=100000, iterable=True)
+ def get_current_state_ids(self, room_id):
+ rows = yield self._simple_select_list(
+ table="current_state_events",
+ keyvalues={"room_id": room_id},
+ retcols=["event_id", "type", "state_key"],
+ desc="_calculate_state_delta",
+ )
+ defer.returnValue({
+ (r["type"], r["state_key"]): r["event_id"] for r in rows
+ })
+
@defer.inlineCallbacks
def get_state_groups_ids(self, room_id, event_ids):
if not event_ids:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 200d124632..dddd5fc0e7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -829,3 +829,6 @@ class StreamStore(SQLBaseStore):
updatevalues={"stream_id": stream_id},
desc="update_federation_out_pos",
)
+
+ def has_room_changed_since(self, room_id, stream_id):
+ return self._events_stream_cache.has_entity_changed(room_id, stream_id)
|