diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 99 |
1 files changed, 89 insertions, 10 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bfc1ab86f2..7253f56322 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,6 +20,9 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function +from synapse.federation.pdu_codec import PduCodec + +from synapse.api.errors import AuthError from twisted.internet import defer @@ -30,8 +33,14 @@ logger = logging.getLogger(__name__) class FederationHandler(BaseHandler): + """Handles events that originated from federation. + Responsible for: + a) handling received Pdus before handing them on as Events to the rest + of the home server (including auth and state conflict resoultion) + b) converting events that were produced by local clients that may need + to be sent to remote home servers. + """ - """Handles events that originated from federation.""" def __init__(self, hs): super(FederationHandler, self).__init__(hs) @@ -42,9 +51,67 @@ class FederationHandler(BaseHandler): self.waiting_for_join_list = {} + self.store = hs.get_datastore() + self.replication_layer = hs.get_replication_layer() + self.state_handler = hs.get_state_handler() + # self.auth_handler = gs.get_auth_handler() + self.server_name = hs.hostname + + self.lock_manager = hs.get_room_lock_manager() + + self.replication_layer.set_handler(self) + + self.pdu_codec = PduCodec(hs) + @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state, backfilled): + def handle_new_event(self, event, snapshot): + """ Takes in an event from the client to server side, that has already + been authed and handled by the state module, and sends it to any + remote home servers that may be interested. + + Args: + event + snapshot (.storage.Snapshot): THe snapshot the event happened after + + Returns: + Deferred: Resolved when it has successfully been queued for + processing. + """ + + pdu = self.pdu_codec.pdu_from_event(event) + + if not hasattr(pdu, "destinations") or not pdu.destinations: + pdu.destinations = [] + + yield self.replication_layer.send_pdu(pdu) + + @log_function + def get_state_for_room(self, destination, room_id): + return self.replication_layer.get_state_for_context( + destination, room_id + ) + + @log_function + @defer.inlineCallbacks + def on_receive_pdu(self, pdu, backfilled): + """ Called by the ReplicationLayer when we have a new pdu. We need to + do auth checks and put it throught the StateHandler. + """ + event = self.pdu_codec.event_from_pdu(pdu) + + with (yield self.lock_manager.lock(pdu.context)): + if event.is_state and not backfilled: + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + if not is_new_state: + return + else: + is_new_state = False + # TODO: Implement something in federation that allows us to + # respond to PDU. + if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -86,8 +153,7 @@ class FederationHandler(BaseHandler): if not room: # Huh, let's try and get the current state try: - federation = self.hs.get_federation() - yield federation.get_state_for_room( + yield self.get_state_for_room( event.origin, event.room_id ) @@ -119,11 +185,10 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit): - events = yield self.hs.get_federation().backfill(dest, room_id, limit) + events = yield self._backfill(dest, room_id, limit) for event in events: try: @@ -133,10 +198,23 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def _backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) + @log_function @defer.inlineCallbacks - def do_invite_join(self, target_host, room_id, joinee, content): - federation = self.hs.get_federation() + def do_invite_join(self, target_host, room_id, joinee, content, snapshot): hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -146,7 +224,7 @@ class FederationHandler(BaseHandler): # First get current state to see if we are already joined. try: - yield federation.get_state_for_room(target_host, room_id) + yield self.get_state_for_room(target_host, room_id) hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -166,7 +244,8 @@ class FederationHandler(BaseHandler): new_event.destinations = [target_host] - yield federation.handle_new_event(new_event) + snapshot.fill_out_prev_events(new_event) + yield self.handle_new_event(new_event, snapshot) # TODO (erikj): Time out here. d = defer.Deferred() |