summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/message.py60
1 files changed, 33 insertions, 27 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fd09397226..7a57a69bd3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -24,7 +24,7 @@ from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
-from synapse.util.async import run_on_reactor, ReadWriteLock
+from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
 from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
@@ -50,6 +50,10 @@ class MessageHandler(BaseHandler):
 
         self.pagination_lock = ReadWriteLock()
 
+        # We arbitrarily limit concurrent event creation for a room to 5.
+        # This is to stop us from diverging history *too* much.
+        self.limiter = Limiter(max_count=5)
+
     @defer.inlineCallbacks
     def purge_history(self, room_id, event_id):
         event = yield self.store.get_event(event_id)
@@ -191,36 +195,38 @@ class MessageHandler(BaseHandler):
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        self.validator.validate_new(builder)
-
-        if builder.type == EventTypes.Member:
-            membership = builder.content.get("membership", None)
-            target = UserID.from_string(builder.state_key)
+        with (yield self.limiter.queue(builder.room_id)):
+            self.validator.validate_new(builder)
+
+            if builder.type == EventTypes.Member:
+                membership = builder.content.get("membership", None)
+                target = UserID.from_string(builder.state_key)
+
+                if membership in {Membership.JOIN, Membership.INVITE}:
+                    # If event doesn't include a display name, add one.
+                    profile = self.hs.get_handlers().profile_handler
+                    content = builder.content
+
+                    try:
+                        content["displayname"] = yield profile.get_displayname(target)
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                    except Exception as e:
+                        logger.info(
+                            "Failed to get profile information for %r: %s",
+                            target, e
+                        )
 
-            if membership in {Membership.JOIN, Membership.INVITE}:
-                # If event doesn't include a display name, add one.
-                profile = self.hs.get_handlers().profile_handler
-                content = builder.content
+            if token_id is not None:
+                builder.internal_metadata.token_id = token_id
 
-                try:
-                    content["displayname"] = yield profile.get_displayname(target)
-                    content["avatar_url"] = yield profile.get_avatar_url(target)
-                except Exception as e:
-                    logger.info(
-                        "Failed to get profile information for %r: %s",
-                        target, e
-                    )
+            if txn_id is not None:
+                builder.internal_metadata.txn_id = txn_id
 
-        if token_id is not None:
-            builder.internal_metadata.token_id = token_id
-
-        if txn_id is not None:
-            builder.internal_metadata.txn_id = txn_id
+            event, context = yield self._create_new_client_event(
+                builder=builder,
+                prev_event_ids=prev_event_ids,
+            )
 
-        event, context = yield self._create_new_client_event(
-            builder=builder,
-            prev_event_ids=prev_event_ids,
-        )
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks