diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bf89d568af..3e0cd294a1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -27,14 +27,24 @@ from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ FederationError,
+ IncompatibleRoomVersionError,
+ NotFoundError,
+ SynapseError,
+)
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+ ReplicationFederationSendEduRestServlet,
+ ReplicationGetQueryRestServlet,
+)
from synapse.types import get_domain_from_id
-from synapse.util import async
+from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logutils import log_function
@@ -61,8 +71,8 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
- self._server_linearizer = async.Linearizer("fed_server")
- self._transaction_linearizer = async.Linearizer("fed_txn_handler")
+ self._server_linearizer = Linearizer("fed_server")
+ self._transaction_linearizer = Linearizer("fed_txn_handler")
self.transaction_actions = TransactionActions(self.store)
@@ -194,7 +204,7 @@ class FederationServer(FederationBase):
event_id, f.getTraceback().rstrip(),
)
- yield async.concurrently_execute(
+ yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
TRANSACTION_CONCURRENCY_LIMIT,
)
@@ -323,12 +333,21 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp))
@defer.inlineCallbacks
- def on_make_join_request(self, origin, room_id, user_id):
+ def on_make_join_request(self, origin, room_id, user_id, supported_versions):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
+
+ room_version = yield self.store.get_room_version(room_id)
+ if room_version not in supported_versions:
+ logger.warn("Room version %s not in %s", room_version, supported_versions)
+ raise IncompatibleRoomVersionError(room_version=room_version)
+
pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec()
- defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+ defer.returnValue({
+ "event": pdu.get_pdu_json(time_now),
+ "room_version": room_version,
+ })
@defer.inlineCallbacks
def on_invite_request(self, origin, content):
@@ -745,6 +764,8 @@ class FederationHandlerRegistry(object):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))
+ logger.info("Registering federation EDU handler for %r", edu_type)
+
self.edu_handlers[edu_type] = handler
def register_query_handler(self, query_type, handler):
@@ -763,6 +784,8 @@ class FederationHandlerRegistry(object):
"Already have a Query handler for %s" % (query_type,)
)
+ logger.info("Registering federation query handler for %r", query_type)
+
self.query_handlers[query_type] = handler
@defer.inlineCallbacks
@@ -785,3 +808,49 @@ class FederationHandlerRegistry(object):
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+ """A FederationHandlerRegistry for worker processes.
+
+ When receiving EDU or queries it will check if an appropriate handler has
+ been registered on the worker, if there isn't one then it calls off to the
+ master process.
+ """
+
+ def __init__(self, hs):
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+ self.clock = hs.get_clock()
+
+ self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+ self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+ super(ReplicationFederationHandlerRegistry, self).__init__()
+
+ def on_edu(self, edu_type, origin, content):
+ """Overrides FederationHandlerRegistry
+ """
+ handler = self.edu_handlers.get(edu_type)
+ if handler:
+ return super(ReplicationFederationHandlerRegistry, self).on_edu(
+ edu_type, origin, content,
+ )
+
+ return self._send_edu(
+ edu_type=edu_type,
+ origin=origin,
+ content=content,
+ )
+
+ def on_query(self, query_type, args):
+ """Overrides FederationHandlerRegistry
+ """
+ handler = self.query_handlers.get(query_type)
+ if handler:
+ return handler(args)
+
+ return self._get_query_client(
+ query_type=query_type,
+ args=args,
+ )
|