diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a0b7cb7963..da2f5e8cfd 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -31,6 +31,9 @@ logger = logging.getLogger(__name__)
class FederationBase(object):
+ def __init__(self, hs):
+ pass
+
@defer.inlineCallbacks
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
include_none=False):
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index d835c1b038..b06387051c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -52,6 +52,8 @@ sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
class FederationClient(FederationBase):
+ def __init__(self, hs):
+ super(FederationClient, self).__init__(hs)
def start_get_pdu_cache(self):
self._get_pdu_cache = ExpiringCache(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f1d231b9d8..2a589524a4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from .federation_base import FederationBase
from .units import Transaction, Edu
+from synapse.util.async import Linearizer
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
import synapse.metrics
@@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=[
class FederationServer(FederationBase):
+ def __init__(self, hs):
+ super(FederationServer, self).__init__(hs)
+
+ self._room_pdu_linearizer = Linearizer()
+
def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
receipt of new PDUs from other home servers. The required methods are
@@ -187,13 +193,16 @@ class FederationServer(FederationBase):
)
for event in auth_chain:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
+ # We sign these again because there was a bug where we
+ # incorrectly signed things the first time round
+ if self.hs.is_mine_id(event.event_id):
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
)
- )
else:
raise NotImplementedError("Specify an event")
@@ -377,10 +386,20 @@ class FederationServer(FederationBase):
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
+ logger.info(
+ "on_get_missing_events: earliest_events: %r, latest_events: %r,"
+ " limit: %d, min_depth: %d",
+ earliest_events, latest_events, limit, min_depth
+ )
missing_events = yield self.handler.on_get_missing_events(
origin, room_id, earliest_events, latest_events, limit, min_depth
)
+ if len(missing_events) < 5:
+ logger.info("Returning %d events: %r", len(missing_events), missing_events)
+ else:
+ logger.info("Returning %d events", len(missing_events))
+
time_now = self._clock.time_msec()
defer.returnValue({
@@ -481,42 +500,59 @@ class FederationServer(FederationBase):
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
if get_missing and prevs - seen:
- latest = yield self.store.get_latest_event_ids_in_room(
- pdu.room_id
- )
-
- # We add the prev events that we have seen to the latest
- # list to ensure the remote server doesn't give them to us
- latest = set(latest)
- latest |= seen
-
- missing_events = yield self.get_missing_events(
- origin,
- pdu.room_id,
- earliest_events_ids=list(latest),
- latest_events=[pdu],
- limit=10,
- min_depth=min_depth,
- )
-
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- missing_events.sort(key=lambda x: x.depth)
-
- for e in missing_events:
- yield self._handle_new_pdu(
- origin,
- e,
- get_missing=False
- )
-
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
+ # If we're missing stuff, ensure we only fetch stuff one
+ # at a time.
+ with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+ # We recalculate seen, since it may have changed.
+ have_seen = yield self.store.have_events(prevs)
+ seen = set(have_seen.keys())
+
+ if prevs - seen:
+ latest = yield self.store.get_latest_event_ids_in_room(
+ pdu.room_id
+ )
+
+ # We add the prev events that we have seen to the latest
+ # list to ensure the remote server doesn't give them to us
+ latest = set(latest)
+ latest |= seen
+
+ logger.info(
+ "Missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
+
+ missing_events = yield self.get_missing_events(
+ origin,
+ pdu.room_id,
+ earliest_events_ids=list(latest),
+ latest_events=[pdu],
+ limit=10,
+ min_depth=min_depth,
+ )
+
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ missing_events.sort(key=lambda x: x.depth)
+
+ for e in missing_events:
+ yield self._handle_new_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+
+ have_seen = yield self.store.have_events(
+ [ev for ev, _ in pdu.prev_events]
+ )
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if prevs - seen:
+ logger.info(
+ "Still missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
fetch_state = True
if fetch_state:
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 3e062a5eab..ea66a5dcbc 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer):
self.hs = hs
+ super(ReplicationLayer, self).__init__(hs)
+
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a1a334955f..8a1965f45a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -37,7 +37,7 @@ class TransportLayerServer(JsonResource):
self.hs = hs
self.clock = hs.get_clock()
- super(TransportLayerServer, self).__init__(hs)
+ super(TransportLayerServer, self).__init__(hs, canonical_json=False)
self.authenticator = Authenticator(hs)
self.ratelimiter = FederationRateLimiter(
@@ -528,15 +528,10 @@ class PublicRoomList(BaseFederationServlet):
PATH = "/publicRooms"
@defer.inlineCallbacks
- def on_GET(self, request):
+ def on_GET(self, origin, content, query):
data = yield self.room_list_handler.get_local_public_room_list()
defer.returnValue((200, data))
- # Avoid doing remote HS authorization checks which are done by default by
- # BaseFederationServlet.
- def _wrap(self, code):
- return code
-
SERVLET_CLASSES = (
FederationSendServlet,
|