summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/_base.py3
-rw-r--r--synapse/handlers/federation.py11
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/room.py17
-rw-r--r--synapse/handlers/sync.py98
-rw-r--r--synapse/state.py4
-rw-r--r--synapse/storage/events.py2
-rw-r--r--synapse/util/async.py32
8 files changed, 93 insertions, 84 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 90eabb6eb7..d407eaeee9 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -261,8 +261,7 @@ class BaseHandler(object):
 
                 context = yield state_handler.compute_event_context(
                     builder,
-                    old_state=(prev_member_event,),
-                    outlier=True
+                    old_state=(prev_member_event,)
                 )
 
         if builder.is_state():
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4a35344d32..4049c01d26 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1118,11 +1118,9 @@ class FederationHandler(BaseHandler):
         """
         events_to_context = {}
         for e in itertools.chain(auth_events, state):
-            ctx = yield self.state_handler.compute_event_context(
-                e, outlier=True,
-            )
-            events_to_context[e.event_id] = ctx
             e.internal_metadata.outlier = True
+            ctx = yield self.state_handler.compute_event_context(e)
+            events_to_context[e.event_id] = ctx
 
         event_map = {
             e.event_id: e
@@ -1169,7 +1167,7 @@ class FederationHandler(BaseHandler):
         )
 
         new_event_context = yield self.state_handler.compute_event_context(
-            event, old_state=state, outlier=False,
+            event, old_state=state
         )
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
@@ -1181,10 +1179,9 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
-        outlier = event.internal_metadata.is_outlier()
 
         context = yield self.state_handler.compute_event_context(
-            event, old_state=state, outlier=outlier,
+            event, old_state=state,
         )
 
         if not auth_events:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5c50c611ba..0bb111d047 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,6 +21,7 @@ from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.types import UserID, RoomStreamToken, StreamToken
 
@@ -556,14 +557,7 @@ class MessageHandler(BaseHandler):
             except:
                 logger.exception("Failed to get snapshot")
 
-        # Only do N rooms at once
-        n = 5
-        d_list = [handle_room(e) for e in room_list]
-        for i in range(0, len(d_list), n):
-            yield defer.gatherResults(
-                d_list[i:i + n],
-                consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        yield concurrently_execute(handle_room, room_list, 10)
 
         account_data_events = []
         for account_data_type, content in account_data.items():
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ee99ded214..3e1d9282d7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,8 @@ from synapse.api.constants import (
     EventTypes, JoinRules, RoomCreationPreset,
 )
 from synapse.api.errors import AuthError, StoreError, SynapseError
-from synapse.util import stringutils, unwrapFirstError
+from synapse.util import stringutils
+from synapse.util.async import concurrently_execute
 from synapse.util.logcontext import preserve_context_over_fn
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -368,6 +369,8 @@ class RoomListHandler(BaseHandler):
     def _get_public_room_list(self):
         room_ids = yield self.store.get_public_room_ids()
 
+        results = []
+
         @defer.inlineCallbacks
         def handle_room(room_id):
             aliases = yield self.store.get_aliases_for_room(room_id)
@@ -428,18 +431,12 @@ class RoomListHandler(BaseHandler):
             joined_users = yield self.store.get_users_in_room(room_id)
             result["num_joined_members"] = len(joined_users)
 
-            defer.returnValue(result)
+            results.append(result)
 
-        result = []
-        for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
-            chunk_result = yield defer.gatherResults([
-                handle_room(room_id)
-                for room_id in chunk
-            ], consumeErrors=True).addErrback(unwrapFirstError)
-            result.extend(v for v in chunk_result if v)
+        yield concurrently_execute(handle_room, room_ids, 10)
 
         # FIXME (erikj): START is no longer a valid value
-        defer.returnValue({"start": "START", "end": "END", "chunk": result})
+        defer.returnValue({"start": "START", "end": "END", "chunk": results})
 
 
 class RoomContextHandler(BaseHandler):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 48ab5707e1..e38fe1ef9c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -17,8 +17,8 @@ from ._base import BaseHandler
 
 from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.async import concurrently_execute
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -250,58 +250,50 @@ class SyncHandler(BaseHandler):
         joined = []
         invited = []
         archived = []
-        deferreds = []
-
-        room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
-        for room_list_chunk in room_list_chunks:
-            for event in room_list_chunk:
-                if event.membership == Membership.JOIN:
-                    room_sync_deferred = preserve_fn(
-                        self.full_state_sync_for_joined_room
-                    )(
-                        room_id=event.room_id,
-                        sync_config=sync_config,
-                        now_token=now_token,
-                        timeline_since_token=timeline_since_token,
-                        ephemeral_by_room=ephemeral_by_room,
-                        tags_by_room=tags_by_room,
-                        account_data_by_room=account_data_by_room,
-                    )
-                    room_sync_deferred.addCallback(joined.append)
-                    deferreds.append(room_sync_deferred)
-                elif event.membership == Membership.INVITE:
-                    invite = yield self.store.get_event(event.event_id)
-                    invited.append(InvitedSyncResult(
-                        room_id=event.room_id,
-                        invite=invite,
-                    ))
-                elif event.membership in (Membership.LEAVE, Membership.BAN):
-                    # Always send down rooms we were banned or kicked from.
-                    if not sync_config.filter_collection.include_leave:
-                        if event.membership == Membership.LEAVE:
-                            if sync_config.user.to_string() == event.sender:
-                                continue
-
-                    leave_token = now_token.copy_and_replace(
-                        "room_key", "s%d" % (event.stream_ordering,)
-                    )
-                    room_sync_deferred = preserve_fn(
-                        self.full_state_sync_for_archived_room
-                    )(
-                        sync_config=sync_config,
-                        room_id=event.room_id,
-                        leave_event_id=event.event_id,
-                        leave_token=leave_token,
-                        timeline_since_token=timeline_since_token,
-                        tags_by_room=tags_by_room,
-                        account_data_by_room=account_data_by_room,
-                    )
-                    room_sync_deferred.addCallback(archived.append)
-                    deferreds.append(room_sync_deferred)
 
-            yield defer.gatherResults(
-                deferreds, consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        user_id = sync_config.user.to_string()
+
+        @defer.inlineCallbacks
+        def _generate_room_entry(event):
+            if event.membership == Membership.JOIN:
+                room_result = yield self.full_state_sync_for_joined_room(
+                    room_id=event.room_id,
+                    sync_config=sync_config,
+                    now_token=now_token,
+                    timeline_since_token=timeline_since_token,
+                    ephemeral_by_room=ephemeral_by_room,
+                    tags_by_room=tags_by_room,
+                    account_data_by_room=account_data_by_room,
+                )
+                joined.append(room_result)
+            elif event.membership == Membership.INVITE:
+                invite = yield self.store.get_event(event.event_id)
+                invited.append(InvitedSyncResult(
+                    room_id=event.room_id,
+                    invite=invite,
+                ))
+            elif event.membership in (Membership.LEAVE, Membership.BAN):
+                # Always send down rooms we were banned or kicked from.
+                if not sync_config.filter_collection.include_leave:
+                    if event.membership == Membership.LEAVE:
+                        if user_id == event.sender:
+                            return
+
+                leave_token = now_token.copy_and_replace(
+                    "room_key", "s%d" % (event.stream_ordering,)
+                )
+                room_result = yield self.full_state_sync_for_archived_room(
+                    sync_config=sync_config,
+                    room_id=event.room_id,
+                    leave_event_id=event.event_id,
+                    leave_token=leave_token,
+                    timeline_since_token=timeline_since_token,
+                    tags_by_room=tags_by_room,
+                    account_data_by_room=account_data_by_room,
+                )
+                archived.append(room_result)
+
+        yield concurrently_execute(_generate_room_entry, room_list, 10)
 
         account_data_for_user = sync_config.filter_collection.filter_account_data(
             self.account_data_for_user(account_data)
diff --git a/synapse/state.py b/synapse/state.py
index 41d32e664a..4672ada1b3 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -100,7 +100,7 @@ class StateHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
-    def compute_event_context(self, event, old_state=None, outlier=False):
+    def compute_event_context(self, event, old_state=None):
         """ Fills out the context with the `current state` of the graph. The
         `current state` here is defined to be the state of the event graph
         just before the event - i.e. it never includes `event`
@@ -115,7 +115,7 @@ class StateHandler(object):
         """
         context = EventContext()
 
-        if outlier:
+        if event.internal_metadata.is_outlier():
             # If this is an outlier, then we know it shouldn't have any current
             # state. Certainly store.get_current_state won't return any, and
             # persisting the event won't store the state group.
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4ab23c1597..c4dc3b3d51 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -192,7 +192,7 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_room_name_and_aliases, event.room_id)
+            txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
 
             # Add an entry to the current_state_resets table to record the point
             # where we clobbered the current state
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 640fae3890..cd4d90f3cf 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -16,7 +16,8 @@
 
 from twisted.internet import defer, reactor
 
-from .logcontext import PreserveLoggingContext
+from .logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util import unwrapFirstError
 
 
 @defer.inlineCallbacks
@@ -107,3 +108,32 @@ class ObservableDeferred(object):
         return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
             id(self), self._result, self._deferred,
         )
+
+
+def concurrently_execute(func, args, limit):
+    """Executes the function with each argument conncurrently while limiting
+    the number of concurrent executions.
+
+    Args:
+        func (func): Function to execute, should return a deferred.
+        args (list): List of arguments to pass to func, each invocation of func
+            gets a signle argument.
+        limit (int): Maximum number of conccurent executions.
+
+    Returns:
+        deferred: Resolved when all function invocations have finished.
+    """
+    it = iter(args)
+
+    @defer.inlineCallbacks
+    def _concurrently_execute_inner():
+        try:
+            while True:
+                yield func(it.next())
+        except StopIteration:
+            pass
+
+    return defer.gatherResults([
+        preserve_fn(_concurrently_execute_inner)()
+        for _ in xrange(limit)
+    ], consumeErrors=True).addErrback(unwrapFirstError)