summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation.py68
-rw-r--r--synapse/util/logcontext.py25
2 files changed, 69 insertions, 24 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d0c2b4d6ed..0cd5501b05 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+import synapse.util.logcontext
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -114,6 +115,14 @@ class FederationHandler(BaseHandler):
             logger.debug("Already seen pdu %s", pdu.event_id)
             return
 
+        # If we are currently in the process of joining this room, then we
+        # queue up events for later processing.
+        if pdu.room_id in self.room_queues:
+            logger.info("Ignoring PDU %s for room %s from %s for now; join "
+                        "in progress", pdu.event_id, pdu.room_id, origin)
+            self.room_queues[pdu.room_id].append((pdu, origin))
+            return
+
         state = None
 
         auth_chain = []
@@ -274,26 +283,13 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None):
+    def _process_received_pdu(self, origin, pdu, state, auth_chain):
         """ Called when we have a new pdu. We need to do auth checks and put it
         through the StateHandler.
-
-        auth_chain and state are None if we already have the necessary state
-        and prev_events in the db
         """
         event = pdu
 
-        logger.debug("Got event: %s", event.event_id)
-
-        # If we are currently in the process of joining this room, then we
-        # queue up events for later processing.
-        if event.room_id in self.room_queues:
-            self.room_queues[event.room_id].append((pdu, origin))
-            return
-
-        logger.debug("Processing event: %s", event.event_id)
-
-        logger.debug("Event: %s", event)
+        logger.debug("Processing event: %s", event)
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -862,8 +858,6 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        yield self.store.clean_room_for_join(room_id)
-
         origin, event = yield self._make_and_verify_event(
             target_hosts,
             room_id,
@@ -872,7 +866,15 @@ class FederationHandler(BaseHandler):
             content,
         )
 
+        # This shouldn't happen, because the RoomMemberHandler has a
+        # linearizer lock which only allows one operation per user per room
+        # at a time - so this is just paranoia.
+        assert (room_id not in self.room_queues)
+
         self.room_queues[room_id] = []
+
+        yield self.store.clean_room_for_join(room_id)
+
         handled_events = set()
 
         try:
@@ -925,18 +927,36 @@ class FederationHandler(BaseHandler):
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
 
-            for p, origin in room_queue:
-                if p.event_id in handled_events:
-                    continue
+            # we don't need to wait for the queued events to be processed -
+            # it's just a best-effort thing at this point. We do want to do
+            # them roughly in order, though, otherwise we'll end up making
+            # lots of requests for missing prev_events which we do actually
+            # have. Hence we fire off the deferred, but don't wait for it.
 
-                try:
-                    self._process_received_pdu(origin, p)
-                except:
-                    logger.exception("Couldn't handle pdu")
+            synapse.util.logcontext.reset_context_after_deferred(
+                self._handle_queued_pdus(room_queue))
 
         defer.returnValue(True)
 
     @defer.inlineCallbacks
+    def _handle_queued_pdus(self, room_queue):
+        """Process PDUs which got queued up while we were busy send_joining.
+
+        Args:
+            room_queue (list[FrozenEvent, str]): list of PDUs to be processed
+                and the servers that sent them
+        """
+        for p, origin in room_queue:
+            try:
+                logger.info("Processing queued PDU %s which was received "
+                            "while we were joining %s", p.event_id, p.room_id)
+                yield self.on_receive_pdu(origin, p)
+            except Exception as e:
+                logger.warn(
+                    "Error handling queued PDU %s from %s: %s",
+                    p.event_id, origin, e)
+
+    @defer.inlineCallbacks
     @log_function
     def on_make_join_request(self, room_id, user_id):
         """ We've received a /make_join/ request, so we create a partial
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 6c83eb213d..d73670f9f2 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -308,6 +308,31 @@ def preserve_context_over_deferred(deferred, context=None):
     return d
 
 
+def reset_context_after_deferred(deferred):
+    """If the deferred is incomplete, add a callback which will reset the
+    context.
+
+    This is useful when you want to fire off a deferred, but don't want to
+    wait for it to complete. (The deferred will restore the current log context
+    when it completes, so if you don't do anything, it will leak log context.)
+
+    (If this feels asymmetric, consider it this way: we are effectively forking
+    a new thread of execution. We are probably currently within a
+    ``with LoggingContext()`` block, which is supposed to have a single entry
+    and exit point. But by spawning off another deferred, we are effectively
+    adding a new exit point.)
+
+    Args:
+        deferred (defer.Deferred): deferred
+    """
+    def reset_context(result):
+        LoggingContext.set_current_context(LoggingContext.sentinel)
+        return result
+
+    if not deferred.called:
+        deferred.addBoth(reset_context)
+
+
 def preserve_fn(f):
     """Ensures that function is called with correct context and that context is
     restored after return. Useful for wrapping functions that return a deferred