diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/federation.py | 68 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 25 |
2 files changed, 69 insertions, 24 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d0c2b4d6ed..0cd5501b05 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,6 +14,7 @@ # limitations under the License. """Contains handlers for federation events.""" +import synapse.util.logcontext from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -114,6 +115,14 @@ class FederationHandler(BaseHandler): logger.debug("Already seen pdu %s", pdu.event_id) return + # If we are currently in the process of joining this room, then we + # queue up events for later processing. + if pdu.room_id in self.room_queues: + logger.info("Ignoring PDU %s for room %s from %s for now; join " + "in progress", pdu.event_id, pdu.room_id, origin) + self.room_queues[pdu.room_id].append((pdu, origin)) + return + state = None auth_chain = [] @@ -274,26 +283,13 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None): + def _process_received_pdu(self, origin, pdu, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. - - auth_chain and state are None if we already have the necessary state - and prev_events in the db """ event = pdu - logger.debug("Got event: %s", event.event_id) - - # If we are currently in the process of joining this room, then we - # queue up events for later processing. - if event.room_id in self.room_queues: - self.room_queues[event.room_id].append((pdu, origin)) - return - - logger.debug("Processing event: %s", event.event_id) - - logger.debug("Event: %s", event) + logger.debug("Processing event: %s", event) # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work @@ -862,8 +858,6 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - yield self.store.clean_room_for_join(room_id) - origin, event = yield self._make_and_verify_event( target_hosts, room_id, @@ -872,7 +866,15 @@ class FederationHandler(BaseHandler): content, ) + # This shouldn't happen, because the RoomMemberHandler has a + # linearizer lock which only allows one operation per user per room + # at a time - so this is just paranoia. + assert (room_id not in self.room_queues) + self.room_queues[room_id] = [] + + yield self.store.clean_room_for_join(room_id) + handled_events = set() try: @@ -925,18 +927,36 @@ class FederationHandler(BaseHandler): room_queue = self.room_queues[room_id] del self.room_queues[room_id] - for p, origin in room_queue: - if p.event_id in handled_events: - continue + # we don't need to wait for the queued events to be processed - + # it's just a best-effort thing at this point. We do want to do + # them roughly in order, though, otherwise we'll end up making + # lots of requests for missing prev_events which we do actually + # have. Hence we fire off the deferred, but don't wait for it. - try: - self._process_received_pdu(origin, p) - except: - logger.exception("Couldn't handle pdu") + synapse.util.logcontext.reset_context_after_deferred( + self._handle_queued_pdus(room_queue)) defer.returnValue(True) @defer.inlineCallbacks + def _handle_queued_pdus(self, room_queue): + """Process PDUs which got queued up while we were busy send_joining. + + Args: + room_queue (list[FrozenEvent, str]): list of PDUs to be processed + and the servers that sent them + """ + for p, origin in room_queue: + try: + logger.info("Processing queued PDU %s which was received " + "while we were joining %s", p.event_id, p.room_id) + yield self.on_receive_pdu(origin, p) + except Exception as e: + logger.warn( + "Error handling queued PDU %s from %s: %s", + p.event_id, origin, e) + + @defer.inlineCallbacks @log_function def on_make_join_request(self, room_id, user_id): """ We've received a /make_join/ request, so we create a partial diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 6c83eb213d..d73670f9f2 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -308,6 +308,31 @@ def preserve_context_over_deferred(deferred, context=None): return d +def reset_context_after_deferred(deferred): + """If the deferred is incomplete, add a callback which will reset the + context. + + This is useful when you want to fire off a deferred, but don't want to + wait for it to complete. (The deferred will restore the current log context + when it completes, so if you don't do anything, it will leak log context.) + + (If this feels asymmetric, consider it this way: we are effectively forking + a new thread of execution. We are probably currently within a + ``with LoggingContext()`` block, which is supposed to have a single entry + and exit point. But by spawning off another deferred, we are effectively + adding a new exit point.) + + Args: + deferred (defer.Deferred): deferred + """ + def reset_context(result): + LoggingContext.set_current_context(LoggingContext.sentinel) + return result + + if not deferred.called: + deferred.addBoth(reset_context) + + def preserve_fn(f): """Ensures that function is called with correct context and that context is restored after return. Useful for wrapping functions that return a deferred |