diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/federation.py | 142 |
1 files changed, 134 insertions, 8 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bfc1ab86f2..a2e935add0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,6 +20,9 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function +from synapse.federation.pdu_codec import PduCodec + +from synapse.api.errors import AuthError from twisted.internet import defer @@ -30,8 +33,14 @@ logger = logging.getLogger(__name__) class FederationHandler(BaseHandler): + """Handles events that originated from federation. + Responsible for: + a) handling received Pdus before handing them on as Events to the rest + of the home server (including auth and state conflict resoultion) + b) converting events that were produced by local clients that may need + to be sent to remote home servers. + """ - """Handles events that originated from federation.""" def __init__(self, hs): super(FederationHandler, self).__init__(hs) @@ -42,6 +51,112 @@ class FederationHandler(BaseHandler): self.waiting_for_join_list = {} + self.store = hs.get_datastore() + self.replication_layer = hs.get_replication_layer() + self.state_handler = hs.get_state_handler() + # self.auth_handler = gs.get_auth_handler() + self.server_name = hs.hostname + + self.lock_manager = hs.get_room_lock_manager() + + self.replication_layer.set_handler(self) + + self.pdu_codec = PduCodec(hs) + + @log_function + @defer.inlineCallbacks + def handle_new_event(self, event, snapshot): + """ Takes in an event from the client to server side, that has already + been authed and handled by the state module, and sends it to any + remote home servers that may be interested. + + Args: + event + snapshot (.storage.Snapshot): THe snapshot the event happened after + + Returns: + Deferred: Resolved when it has successfully been queued for + processing. + """ + yield self.fill_out_prev_events(event, snapshot) + + pdu = self.pdu_codec.pdu_from_event(event) + + if not hasattr(pdu, "destinations") or not pdu.destinations: + pdu.destinations = [] + + yield self.replication_layer.send_pdu(pdu) + + + @log_function + def get_state_for_room(self, destination, room_id): + return self.replication_layer.get_state_for_context( + destination, room_id + ) + + @log_function + @defer.inlineCallbacks + def on_receive_pdu(self, pdu, backfilled): + """ Called by the ReplicationLayer when we have a new pdu. We need to + do auth checks and put it throught the StateHandler. + """ + event = self.pdu_codec.event_from_pdu(pdu) + + try: + with (yield self.lock_manager.lock(pdu.context)): + if event.is_state and not backfilled: + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + if not is_new_state: + return + else: + is_new_state = False + + yield self.on_receive(event, is_new_state, backfilled) + + except AuthError: + # TODO: Implement something in federation that allows us to + # respond to PDU. + raise + + return + + @defer.inlineCallbacks + def _on_new_state(self, pdu, new_state_event): + # TODO: Do any store stuff here. Notifiy C2S about this new + # state. + + yield self.store.update_current_state( + pdu_id=pdu.pdu_id, + origin=pdu.origin, + context=pdu.context, + pdu_type=pdu.pdu_type, + state_key=pdu.state_key + ) + + yield self.on_receive(new_state_event) + + @defer.inlineCallbacks + def fill_out_prev_events(self, event, snapshot): + if hasattr(event, "prev_events"): + return + + results = snapshot.prev_pdus + + es = [ + "%s@%s" % (p_id, origin) for p_id, origin, _ in results + ] + + event.prev_events = [e for e in es if e != event.event_id] + + if results: + event.depth = max([int(v) for _, _, v in results]) + 1 + else: + event.depth = 0 + + + @log_function @defer.inlineCallbacks def on_receive(self, event, is_new_state, backfilled): @@ -86,8 +201,7 @@ class FederationHandler(BaseHandler): if not room: # Huh, let's try and get the current state try: - federation = self.hs.get_federation() - yield federation.get_state_for_room( + yield self.get_state_for_room( event.origin, event.room_id ) @@ -119,11 +233,10 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit): - events = yield self.hs.get_federation().backfill(dest, room_id, limit) + events = yield self._backfill(dest, room_id, limit) for event in events: try: @@ -133,10 +246,23 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def _backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) + @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content): - federation = self.hs.get_federation() hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -146,7 +272,7 @@ class FederationHandler(BaseHandler): # First get current state to see if we are already joined. try: - yield federation.get_state_for_room(target_host, room_id) + yield self.get_state_for_room(target_host, room_id) hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -166,7 +292,7 @@ class FederationHandler(BaseHandler): new_event.destinations = [target_host] - yield federation.handle_new_event(new_event) + yield self.handle_new_event(new_event) # TODO (erikj): Time out here. d = defer.Deferred() |