diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 5601ecea6e..5eeb7042c6 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -266,8 +266,7 @@ class BaseHandler(object):
context = yield state_handler.compute_event_context(
builder,
- old_state=(prev_member_event,),
- outlier=True
+ old_state=(prev_member_event,)
)
if builder.is_state():
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 092802b973..adafd06b24 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1118,11 +1118,9 @@ class FederationHandler(BaseHandler):
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
- ctx = yield self.state_handler.compute_event_context(
- e, outlier=True,
- )
- events_to_context[e.event_id] = ctx
e.internal_metadata.outlier = True
+ ctx = yield self.state_handler.compute_event_context(e)
+ events_to_context[e.event_id] = ctx
event_map = {
e.event_id: e
@@ -1169,7 +1167,7 @@ class FederationHandler(BaseHandler):
)
new_event_context = yield self.state_handler.compute_event_context(
- event, old_state=state, outlier=False,
+ event, old_state=state
)
event_stream_id, max_stream_id = yield self.store.persist_event(
@@ -1181,10 +1179,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
- outlier = event.internal_metadata.is_outlier()
context = yield self.state_handler.compute_event_context(
- event, old_state=state, outlier=outlier,
+ event, old_state=state,
)
if not auth_events:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5c50c611ba..0bb111d047 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,6 +21,7 @@ from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.types import UserID, RoomStreamToken, StreamToken
@@ -556,14 +557,7 @@ class MessageHandler(BaseHandler):
except:
logger.exception("Failed to get snapshot")
- # Only do N rooms at once
- n = 5
- d_list = [handle_room(e) for e in room_list]
- for i in range(0, len(d_list), n):
- yield defer.gatherResults(
- d_list[i:i + n],
- consumeErrors=True
- ).addErrback(unwrapFirstError)
+ yield concurrently_execute(handle_room, room_list, 10)
account_data_events = []
for account_data_type, content in account_data.items():
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ee99ded214..3e1d9282d7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,8 @@ from synapse.api.constants import (
EventTypes, JoinRules, RoomCreationPreset,
)
from synapse.api.errors import AuthError, StoreError, SynapseError
-from synapse.util import stringutils, unwrapFirstError
+from synapse.util import stringutils
+from synapse.util.async import concurrently_execute
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.caches.response_cache import ResponseCache
@@ -368,6 +369,8 @@ class RoomListHandler(BaseHandler):
def _get_public_room_list(self):
room_ids = yield self.store.get_public_room_ids()
+ results = []
+
@defer.inlineCallbacks
def handle_room(room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
@@ -428,18 +431,12 @@ class RoomListHandler(BaseHandler):
joined_users = yield self.store.get_users_in_room(room_id)
result["num_joined_members"] = len(joined_users)
- defer.returnValue(result)
+ results.append(result)
- result = []
- for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
- chunk_result = yield defer.gatherResults([
- handle_room(room_id)
- for room_id in chunk
- ], consumeErrors=True).addErrback(unwrapFirstError)
- result.extend(v for v in chunk_result if v)
+ yield concurrently_execute(handle_room, room_ids, 10)
# FIXME (erikj): START is no longer a valid value
- defer.returnValue({"start": "START", "end": "END", "chunk": result})
+ defer.returnValue({"start": "START", "end": "END", "chunk": results})
class RoomContextHandler(BaseHandler):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 20a0626574..231140b655 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -17,8 +17,8 @@ from ._base import BaseHandler
from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.async import concurrently_execute
+from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
@@ -250,58 +250,50 @@ class SyncHandler(BaseHandler):
joined = []
invited = []
archived = []
- deferreds = []
-
- room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
- for room_list_chunk in room_list_chunks:
- for event in room_list_chunk:
- if event.membership == Membership.JOIN:
- room_sync_deferred = preserve_fn(
- self.full_state_sync_for_joined_room
- )(
- room_id=event.room_id,
- sync_config=sync_config,
- now_token=now_token,
- timeline_since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(joined.append)
- deferreds.append(room_sync_deferred)
- elif event.membership == Membership.INVITE:
- invite = yield self.store.get_event(event.event_id)
- invited.append(InvitedSyncResult(
- room_id=event.room_id,
- invite=invite,
- ))
- elif event.membership in (Membership.LEAVE, Membership.BAN):
- # Always send down rooms we were banned or kicked from.
- if not sync_config.filter_collection.include_leave:
- if event.membership == Membership.LEAVE:
- if sync_config.user.to_string() == event.sender:
- continue
-
- leave_token = now_token.copy_and_replace(
- "room_key", "s%d" % (event.stream_ordering,)
- )
- room_sync_deferred = preserve_fn(
- self.full_state_sync_for_archived_room
- )(
- sync_config=sync_config,
- room_id=event.room_id,
- leave_event_id=event.event_id,
- leave_token=leave_token,
- timeline_since_token=timeline_since_token,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(archived.append)
- deferreds.append(room_sync_deferred)
- yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
+ user_id = sync_config.user.to_string()
+
+ @defer.inlineCallbacks
+ def _generate_room_entry(event):
+ if event.membership == Membership.JOIN:
+ room_result = yield self.full_state_sync_for_joined_room(
+ room_id=event.room_id,
+ sync_config=sync_config,
+ now_token=now_token,
+ timeline_since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ joined.append(room_result)
+ elif event.membership == Membership.INVITE:
+ invite = yield self.store.get_event(event.event_id)
+ invited.append(InvitedSyncResult(
+ room_id=event.room_id,
+ invite=invite,
+ ))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ # Always send down rooms we were banned or kicked from.
+ if not sync_config.filter_collection.include_leave:
+ if event.membership == Membership.LEAVE:
+ if user_id == event.sender:
+ return
+
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_result = yield self.full_state_sync_for_archived_room(
+ sync_config=sync_config,
+ room_id=event.room_id,
+ leave_event_id=event.event_id,
+ leave_token=leave_token,
+ timeline_since_token=timeline_since_token,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ archived.append(room_result)
+
+ yield concurrently_execute(_generate_room_entry, room_list, 10)
account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data)
|