summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py73
1 files changed, 34 insertions, 39 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81df45177a..7a57a69bd3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,9 +22,9 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken, get_domain_from_id
+    UserID, RoomAlias, RoomStreamToken,
 )
-from synapse.util.async import run_on_reactor, ReadWriteLock
+from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
 from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
@@ -50,6 +50,10 @@ class MessageHandler(BaseHandler):
 
         self.pagination_lock = ReadWriteLock()
 
+        # We arbitrarily limit concurrent event creation for a room to 5.
+        # This is to stop us from diverging history *too* much.
+        self.limiter = Limiter(max_count=5)
+
     @defer.inlineCallbacks
     def purge_history(self, room_id, event_id):
         event = yield self.store.get_event(event_id)
@@ -191,36 +195,38 @@ class MessageHandler(BaseHandler):
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        self.validator.validate_new(builder)
-
-        if builder.type == EventTypes.Member:
-            membership = builder.content.get("membership", None)
-            target = UserID.from_string(builder.state_key)
-
-            if membership in {Membership.JOIN, Membership.INVITE}:
-                # If event doesn't include a display name, add one.
-                profile = self.hs.get_handlers().profile_handler
-                content = builder.content
+        with (yield self.limiter.queue(builder.room_id)):
+            self.validator.validate_new(builder)
+
+            if builder.type == EventTypes.Member:
+                membership = builder.content.get("membership", None)
+                target = UserID.from_string(builder.state_key)
+
+                if membership in {Membership.JOIN, Membership.INVITE}:
+                    # If event doesn't include a display name, add one.
+                    profile = self.hs.get_handlers().profile_handler
+                    content = builder.content
+
+                    try:
+                        content["displayname"] = yield profile.get_displayname(target)
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                    except Exception as e:
+                        logger.info(
+                            "Failed to get profile information for %r: %s",
+                            target, e
+                        )
 
-                try:
-                    content["displayname"] = yield profile.get_displayname(target)
-                    content["avatar_url"] = yield profile.get_avatar_url(target)
-                except Exception as e:
-                    logger.info(
-                        "Failed to get profile information for %r: %s",
-                        target, e
-                    )
+            if token_id is not None:
+                builder.internal_metadata.token_id = token_id
 
-        if token_id is not None:
-            builder.internal_metadata.token_id = token_id
+            if txn_id is not None:
+                builder.internal_metadata.txn_id = txn_id
 
-        if txn_id is not None:
-            builder.internal_metadata.txn_id = txn_id
+            event, context = yield self._create_new_client_event(
+                builder=builder,
+                prev_event_ids=prev_event_ids,
+            )
 
-        event, context = yield self._create_new_client_event(
-            builder=builder,
-            prev_event_ids=prev_event_ids,
-        )
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
@@ -599,13 +605,6 @@ class MessageHandler(BaseHandler):
             event_stream_id, max_stream_id
         )
 
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = [
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        ]
-
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
@@ -618,7 +617,3 @@ class MessageHandler(BaseHandler):
 
         # If invite, remove room_state from unsigned before sending.
         event.unsigned.pop("invite_room_state", None)
-
-        preserve_fn(federation_handler.handle_new_event)(
-            event, destinations=destinations,
-        )