summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py112
1 files changed, 74 insertions, 38 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f1d231b9d8..2a589524a4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Transaction, Edu
 
+from synapse.util.async import Linearizer
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 import synapse.metrics
@@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=[
 
 
 class FederationServer(FederationBase):
+    def __init__(self, hs):
+        super(FederationServer, self).__init__(hs)
+
+        self._room_pdu_linearizer = Linearizer()
+
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
         receipt of new PDUs from other home servers. The required methods are
@@ -187,13 +193,16 @@ class FederationServer(FederationBase):
             )
 
             for event in auth_chain:
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
+                # We sign these again because there was a bug where we
+                # incorrectly signed things the first time round
+                if self.hs.is_mine_id(event.event_id):
+                    event.signatures.update(
+                        compute_event_signature(
+                            event,
+                            self.hs.hostname,
+                            self.hs.config.signing_key[0]
+                        )
                     )
-                )
         else:
             raise NotImplementedError("Specify an event")
 
@@ -377,10 +386,20 @@ class FederationServer(FederationBase):
     @log_function
     def on_get_missing_events(self, origin, room_id, earliest_events,
                               latest_events, limit, min_depth):
+        logger.info(
+            "on_get_missing_events: earliest_events: %r, latest_events: %r,"
+            " limit: %d, min_depth: %d",
+            earliest_events, latest_events, limit, min_depth
+        )
         missing_events = yield self.handler.on_get_missing_events(
             origin, room_id, earliest_events, latest_events, limit, min_depth
         )
 
+        if len(missing_events) < 5:
+            logger.info("Returning %d events: %r", len(missing_events), missing_events)
+        else:
+            logger.info("Returning %d events", len(missing_events))
+
         time_now = self._clock.time_msec()
 
         defer.returnValue({
@@ -481,42 +500,59 @@ class FederationServer(FederationBase):
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
                 if get_missing and prevs - seen:
-                    latest = yield self.store.get_latest_event_ids_in_room(
-                        pdu.room_id
-                    )
-
-                    # We add the prev events that we have seen to the latest
-                    # list to ensure the remote server doesn't give them to us
-                    latest = set(latest)
-                    latest |= seen
-
-                    missing_events = yield self.get_missing_events(
-                        origin,
-                        pdu.room_id,
-                        earliest_events_ids=list(latest),
-                        latest_events=[pdu],
-                        limit=10,
-                        min_depth=min_depth,
-                    )
-
-                    # We want to sort these by depth so we process them and
-                    # tell clients about them in order.
-                    missing_events.sort(key=lambda x: x.depth)
-
-                    for e in missing_events:
-                        yield self._handle_new_pdu(
-                            origin,
-                            e,
-                            get_missing=False
-                        )
-
-                    have_seen = yield self.store.have_events(
-                        [ev for ev, _ in pdu.prev_events]
-                    )
+                    # If we're missing stuff, ensure we only fetch stuff one
+                    # at a time.
+                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+                        # We recalculate seen, since it may have changed.
+                        have_seen = yield self.store.have_events(prevs)
+                        seen = set(have_seen.keys())
+
+                        if prevs - seen:
+                            latest = yield self.store.get_latest_event_ids_in_room(
+                                pdu.room_id
+                            )
+
+                            # We add the prev events that we have seen to the latest
+                            # list to ensure the remote server doesn't give them to us
+                            latest = set(latest)
+                            latest |= seen
+
+                            logger.info(
+                                "Missing %d events for room %r: %r...",
+                                len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                            )
+
+                            missing_events = yield self.get_missing_events(
+                                origin,
+                                pdu.room_id,
+                                earliest_events_ids=list(latest),
+                                latest_events=[pdu],
+                                limit=10,
+                                min_depth=min_depth,
+                            )
+
+                            # We want to sort these by depth so we process them and
+                            # tell clients about them in order.
+                            missing_events.sort(key=lambda x: x.depth)
+
+                            for e in missing_events:
+                                yield self._handle_new_pdu(
+                                    origin,
+                                    e,
+                                    get_missing=False
+                                )
+
+                            have_seen = yield self.store.have_events(
+                                [ev for ev, _ in pdu.prev_events]
+                            )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
             seen = set(have_seen.keys())
             if prevs - seen:
+                logger.info(
+                    "Still missing %d events for room %r: %r...",
+                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                )
                 fetch_state = True
 
         if fetch_state: