diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 4 | ||||
-rw-r--r-- | synapse/handlers/initial_sync.py | 4 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 50 |
3 files changed, 34 insertions, 24 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 543bf28aec..feca3e4c10 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn import logging @@ -159,7 +159,7 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield preserve_context_over_deferred(defer.DeferredList([ + results = yield make_deferred_yieldable(defer.DeferredList([ preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services ], consumeErrors=True)) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 9718d4abc5..c5267b4b84 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -163,7 +163,7 @@ class InitialSyncHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield preserve_context_over_deferred( + (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ preserve_fn(self.store.get_recent_events_for_room)( diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 2cf34e51cb..bb40075387 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -154,6 +154,8 @@ class RoomListHandler(BaseHandler): # We want larger rooms to be first, hence negating num_joined_users rooms_to_order_value[room_id] = (-num_joined_users, room_id) + logger.info("Getting ordering for %i rooms since %s", + len(room_ids), stream_token) yield concurrently_execute(get_order_for_room, room_ids, 10) sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) @@ -181,34 +183,42 @@ class RoomListHandler(BaseHandler): rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan.reverse() - # Actually generate the entries. _append_room_entry_to_chunk will append to - # chunk but will stop if len(chunk) > limit - chunk = [] - if limit and not search_filter: + logger.info("After sorting and filtering, %i rooms remain", + len(rooms_to_scan)) + + # _append_room_entry_to_chunk will append to chunk but will stop if + # len(chunk) > limit + # + # Normally we will generate enough results on the first iteration here, + # but if there is a search filter, _append_room_entry_to_chunk may + # filter some results out, in which case we loop again. + # + # We don't want to scan over the entire range either as that + # would potentially waste a lot of work. + # + # XXX if there is no limit, we may end up DoSing the server with + # calls to get_current_state_ids for every single room on the + # server. Surely we should cap this somehow? + # + if limit: step = limit + 1 - for i in xrange(0, len(rooms_to_scan), step): - # We iterate here because the vast majority of cases we'll stop - # at first iteration, but occaisonally _append_room_entry_to_chunk - # won't append to the chunk and so we need to loop again. - # We don't want to scan over the entire range either as that - # would potentially waste a lot of work. - yield concurrently_execute( - lambda r: self._append_room_entry_to_chunk( - r, rooms_to_num_joined[r], - chunk, limit, search_filter - ), - rooms_to_scan[i:i + step], 10 - ) - if len(chunk) >= limit + 1: - break else: + step = len(rooms_to_scan) + + chunk = [] + for i in xrange(0, len(rooms_to_scan), step): + batch = rooms_to_scan[i:i + step] + logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), - rooms_to_scan, 5 + batch, 5, ) + logger.info("Now %i rooms in result", len(chunk)) + if len(chunk) >= limit + 1: + break chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) |