diff options
author | Mark Haines <mark.haines@matrix.org> | 2014-08-26 19:49:42 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2014-08-26 19:49:42 +0100 |
commit | d2798de660c84dea775f3c79ff8200baf084d244 (patch) | |
tree | 11501e9446fe13244e2e5e71d0747f32a124e1e1 /synapse/federation | |
parent | Use store.persist_event rather than pdu_actions.persist_outgoing/pdu_actions.... (diff) | |
download | synapse-d2798de660c84dea775f3c79ff8200baf084d244.tar.xz |
Fold federation/handler into handlers/federation
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/handler.py | 156 |
1 files changed, 0 insertions, 156 deletions
diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py deleted file mode 100644 index ce98f4f94a..0000000000 --- a/synapse/federation/handler.py +++ /dev/null @@ -1,156 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -from .pdu_codec import PduCodec - -from synapse.api.errors import AuthError -from synapse.util.logutils import log_function - -import logging - - -logger = logging.getLogger(__name__) - - -class FederationEventHandler(object): - """ Responsible for: - a) handling received Pdus before handing them on as Events to the rest - of the home server (including auth and state conflict resoultion) - b) converting events that were produced by local clients that may need - to be sent to remote home servers. - """ - - def __init__(self, hs): - self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_layer() - self.state_handler = hs.get_state_handler() - # self.auth_handler = gs.get_auth_handler() - self.event_handler = hs.get_handlers().federation_handler - self.server_name = hs.hostname - - self.lock_manager = hs.get_room_lock_manager() - - self.replication_layer.set_handler(self) - - self.pdu_codec = PduCodec(hs) - - @log_function - @defer.inlineCallbacks - def handle_new_event(self, event, snapshot): - """ Takes in an event from the client to server side, that has already - been authed and handled by the state module, and sends it to any - remote home servers that may be interested. - - Args: - event - snapshot (.storage.Snapshot): THe snapshot the event happened after - - Returns: - Deferred: Resolved when it has successfully been queued for - processing. - """ - yield self.fill_out_prev_events(event, snapshot) - - pdu = self.pdu_codec.pdu_from_event(event) - - if not hasattr(pdu, "destinations") or not pdu.destinations: - pdu.destinations = [] - - yield self.replication_layer.send_pdu(pdu) - - @log_function - @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): - pdus = yield self.replication_layer.backfill(dest, room_id, limit) - - if not pdus: - defer.returnValue([]) - - events = [ - self.pdu_codec.event_from_pdu(pdu) - for pdu in pdus - ] - - defer.returnValue(events) - - @log_function - def get_state_for_room(self, destination, room_id): - return self.replication_layer.get_state_for_context( - destination, room_id - ) - - @log_function - @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled): - """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it throught the StateHandler. - """ - event = self.pdu_codec.event_from_pdu(pdu) - - try: - with (yield self.lock_manager.lock(pdu.context)): - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - if not is_new_state: - return - else: - is_new_state = False - - yield self.event_handler.on_receive(event, is_new_state, backfilled) - - except AuthError: - # TODO: Implement something in federation that allows us to - # respond to PDU. - raise - - return - - @defer.inlineCallbacks - def _on_new_state(self, pdu, new_state_event): - # TODO: Do any store stuff here. Notifiy C2S about this new - # state. - - yield self.store.update_current_state( - pdu_id=pdu.pdu_id, - origin=pdu.origin, - context=pdu.context, - pdu_type=pdu.pdu_type, - state_key=pdu.state_key - ) - - yield self.event_handler.on_receive(new_state_event) - - @defer.inlineCallbacks - def fill_out_prev_events(self, event, snapshot): - if hasattr(event, "prev_events"): - return - - results = snapshot.prev_pdus - - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in results - ] - - event.prev_events = [e for e in es if e != event.event_id] - - if results: - event.depth = max([int(v) for _, _, v in results]) + 1 - else: - event.depth = 0 |