diff options
-rw-r--r-- | synapse/handlers/_base.py | 3 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 11 | ||||
-rw-r--r-- | synapse/handlers/message.py | 10 | ||||
-rw-r--r-- | synapse/handlers/room.py | 17 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 98 | ||||
-rw-r--r-- | synapse/state.py | 4 | ||||
-rw-r--r-- | synapse/storage/events.py | 2 | ||||
-rw-r--r-- | synapse/util/async.py | 32 |
8 files changed, 93 insertions, 84 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 90eabb6eb7..d407eaeee9 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -261,8 +261,7 @@ class BaseHandler(object): context = yield state_handler.compute_event_context( builder, - old_state=(prev_member_event,), - outlier=True + old_state=(prev_member_event,) ) if builder.is_state(): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4a35344d32..4049c01d26 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1118,11 +1118,9 @@ class FederationHandler(BaseHandler): """ events_to_context = {} for e in itertools.chain(auth_events, state): - ctx = yield self.state_handler.compute_event_context( - e, outlier=True, - ) - events_to_context[e.event_id] = ctx e.internal_metadata.outlier = True + ctx = yield self.state_handler.compute_event_context(e) + events_to_context[e.event_id] = ctx event_map = { e.event_id: e @@ -1169,7 +1167,7 @@ class FederationHandler(BaseHandler): ) new_event_context = yield self.state_handler.compute_event_context( - event, old_state=state, outlier=False, + event, old_state=state ) event_stream_id, max_stream_id = yield self.store.persist_event( @@ -1181,10 +1179,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): - outlier = event.internal_metadata.is_outlier() context = yield self.state_handler.compute_event_context( - event, old_state=state, outlier=outlier, + event, old_state=state, ) if not auth_events: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5c50c611ba..0bb111d047 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -21,6 +21,7 @@ from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.types import UserID, RoomStreamToken, StreamToken @@ -556,14 +557,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - # Only do N rooms at once - n = 5 - d_list = [handle_room(e) for e in room_list] - for i in range(0, len(d_list), n): - yield defer.gatherResults( - d_list[i:i + n], - consumeErrors=True - ).addErrback(unwrapFirstError) + yield concurrently_execute(handle_room, room_list, 10) account_data_events = [] for account_data_type, content in account_data.items(): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ee99ded214..3e1d9282d7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,8 @@ from synapse.api.constants import ( EventTypes, JoinRules, RoomCreationPreset, ) from synapse.api.errors import AuthError, StoreError, SynapseError -from synapse.util import stringutils, unwrapFirstError +from synapse.util import stringutils +from synapse.util.async import concurrently_execute from synapse.util.logcontext import preserve_context_over_fn from synapse.util.caches.response_cache import ResponseCache @@ -368,6 +369,8 @@ class RoomListHandler(BaseHandler): def _get_public_room_list(self): room_ids = yield self.store.get_public_room_ids() + results = [] + @defer.inlineCallbacks def handle_room(room_id): aliases = yield self.store.get_aliases_for_room(room_id) @@ -428,18 +431,12 @@ class RoomListHandler(BaseHandler): joined_users = yield self.store.get_users_in_room(room_id) result["num_joined_members"] = len(joined_users) - defer.returnValue(result) + results.append(result) - result = [] - for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)): - chunk_result = yield defer.gatherResults([ - handle_room(room_id) - for room_id in chunk - ], consumeErrors=True).addErrback(unwrapFirstError) - result.extend(v for v in chunk_result if v) + yield concurrently_execute(handle_room, room_ids, 10) # FIXME (erikj): START is no longer a valid value - defer.returnValue({"start": "START", "end": "END", "chunk": result}) + defer.returnValue({"start": "START", "end": "END", "chunk": results}) class RoomContextHandler(BaseHandler): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 48ab5707e1..e38fe1ef9c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -17,8 +17,8 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes -from synapse.util import unwrapFirstError -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.async import concurrently_execute +from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user @@ -250,58 +250,50 @@ class SyncHandler(BaseHandler): joined = [] invited = [] archived = [] - deferreds = [] - - room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)] - for room_list_chunk in room_list_chunks: - for event in room_list_chunk: - if event.membership == Membership.JOIN: - room_sync_deferred = preserve_fn( - self.full_state_sync_for_joined_room - )( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - room_sync_deferred.addCallback(joined.append) - deferreds.append(room_sync_deferred) - elif event.membership == Membership.INVITE: - invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) - elif event.membership in (Membership.LEAVE, Membership.BAN): - # Always send down rooms we were banned or kicked from. - if not sync_config.filter_collection.include_leave: - if event.membership == Membership.LEAVE: - if sync_config.user.to_string() == event.sender: - continue - - leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) - ) - room_sync_deferred = preserve_fn( - self.full_state_sync_for_archived_room - )( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - room_sync_deferred.addCallback(archived.append) - deferreds.append(room_sync_deferred) - yield defer.gatherResults( - deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + user_id = sync_config.user.to_string() + + @defer.inlineCallbacks + def _generate_room_entry(event): + if event.membership == Membership.JOIN: + room_result = yield self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) + joined.append(room_result) + elif event.membership == Membership.INVITE: + invite = yield self.store.get_event(event.event_id) + invited.append(InvitedSyncResult( + room_id=event.room_id, + invite=invite, + )) + elif event.membership in (Membership.LEAVE, Membership.BAN): + # Always send down rooms we were banned or kicked from. + if not sync_config.filter_collection.include_leave: + if event.membership == Membership.LEAVE: + if user_id == event.sender: + return + + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + room_result = yield self.full_state_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) + archived.append(room_result) + + yield concurrently_execute(_generate_room_entry, room_list, 10) account_data_for_user = sync_config.filter_collection.filter_account_data( self.account_data_for_user(account_data) diff --git a/synapse/state.py b/synapse/state.py index 41d32e664a..4672ada1b3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -100,7 +100,7 @@ class StateHandler(object): defer.returnValue(state) @defer.inlineCallbacks - def compute_event_context(self, event, old_state=None, outlier=False): + def compute_event_context(self, event, old_state=None): """ Fills out the context with the `current state` of the graph. The `current state` here is defined to be the state of the event graph just before the event - i.e. it never includes `event` @@ -115,7 +115,7 @@ class StateHandler(object): """ context = EventContext() - if outlier: + if event.internal_metadata.is_outlier(): # If this is an outlier, then we know it shouldn't have any current # state. Certainly store.get_current_state won't return any, and # persisting the event won't store the state group. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4ab23c1597..c4dc3b3d51 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -192,7 +192,7 @@ class EventsStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) - txn.call_after(self.get_room_name_and_aliases, event.room_id) + txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) # Add an entry to the current_state_resets table to record the point # where we clobbered the current state diff --git a/synapse/util/async.py b/synapse/util/async.py index 640fae3890..cd4d90f3cf 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,7 +16,8 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext +from .logcontext import PreserveLoggingContext, preserve_fn +from synapse.util import unwrapFirstError @defer.inlineCallbacks @@ -107,3 +108,32 @@ class ObservableDeferred(object): return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % ( id(self), self._result, self._deferred, ) + + +def concurrently_execute(func, args, limit): + """Executes the function with each argument conncurrently while limiting + the number of concurrent executions. + + Args: + func (func): Function to execute, should return a deferred. + args (list): List of arguments to pass to func, each invocation of func + gets a signle argument. + limit (int): Maximum number of conccurent executions. + + Returns: + deferred: Resolved when all function invocations have finished. + """ + it = iter(args) + + @defer.inlineCallbacks + def _concurrently_execute_inner(): + try: + while True: + yield func(it.next()) + except StopIteration: + pass + + return defer.gatherResults([ + preserve_fn(_concurrently_execute_inner)() + for _ in xrange(limit) + ], consumeErrors=True).addErrback(unwrapFirstError) |