diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/handler.py | 157 | ||||
-rw-r--r-- | synapse/federation/persistence.py | 64 | ||||
-rw-r--r-- | synapse/federation/replication.py | 6 |
3 files changed, 2 insertions, 225 deletions
diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py deleted file mode 100644 index 984c1558e9..0000000000 --- a/synapse/federation/handler.py +++ /dev/null @@ -1,157 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -from .pdu_codec import PduCodec - -from synapse.api.errors import AuthError -from synapse.util.logutils import log_function - -import logging - - -logger = logging.getLogger(__name__) - - -class FederationEventHandler(object): - """ Responsible for: - a) handling received Pdus before handing them on as Events to the rest - of the home server (including auth and state conflict resoultion) - b) converting events that were produced by local clients that may need - to be sent to remote home servers. - """ - - def __init__(self, hs): - self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_layer() - self.state_handler = hs.get_state_handler() - # self.auth_handler = gs.get_auth_handler() - self.event_handler = hs.get_handlers().federation_handler - self.server_name = hs.hostname - - self.lock_manager = hs.get_room_lock_manager() - - self.replication_layer.set_handler(self) - - self.pdu_codec = PduCodec(hs) - - @log_function - @defer.inlineCallbacks - def handle_new_event(self, event): - """ Takes in an event from the client to server side, that has already - been authed and handled by the state module, and sends it to any - remote home servers that may be interested. - - Args: - event - - Returns: - Deferred: Resolved when it has successfully been queued for - processing. - """ - yield self.fill_out_prev_events(event) - - pdu = self.pdu_codec.pdu_from_event(event) - - if not hasattr(pdu, "destinations") or not pdu.destinations: - pdu.destinations = [] - - yield self.replication_layer.send_pdu(pdu) - - @log_function - @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): - pdus = yield self.replication_layer.backfill(dest, room_id, limit) - - if not pdus: - defer.returnValue([]) - - events = [ - self.pdu_codec.event_from_pdu(pdu) - for pdu in pdus - ] - - defer.returnValue(events) - - @log_function - def get_state_for_room(self, destination, room_id): - return self.replication_layer.get_state_for_context( - destination, room_id - ) - - @log_function - @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled): - """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it throught the StateHandler. - """ - event = self.pdu_codec.event_from_pdu(pdu) - - try: - with (yield self.lock_manager.lock(pdu.context)): - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - if not is_new_state: - return - else: - is_new_state = False - - yield self.event_handler.on_receive(event, is_new_state, backfilled) - - except AuthError: - # TODO: Implement something in federation that allows us to - # respond to PDU. - raise - - return - - @defer.inlineCallbacks - def _on_new_state(self, pdu, new_state_event): - # TODO: Do any store stuff here. Notifiy C2S about this new - # state. - - yield self.store.update_current_state( - pdu_id=pdu.pdu_id, - origin=pdu.origin, - context=pdu.context, - pdu_type=pdu.pdu_type, - state_key=pdu.state_key - ) - - yield self.event_handler.on_receive(new_state_event) - - @defer.inlineCallbacks - def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): - return - - results = yield self.store.get_latest_pdus_in_context( - event.room_id - ) - - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in results - ] - - event.prev_events = [e for e in es if e != event.event_id] - - if results: - event.depth = max([int(v) for _, _, v in results]) + 1 - else: - event.depth = 0 diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index e0e4de4e8c..4cf72b2e42 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -25,7 +25,6 @@ from .units import Pdu from synapse.util.logutils import log_function -import copy import json import logging @@ -41,28 +40,6 @@ class PduActions(object): self.store = datastore @log_function - def persist_received(self, pdu): - """ Persists the given `Pdu` that was received from a remote home - server. - - Returns: - Deferred - """ - return self._persist(pdu) - - @defer.inlineCallbacks - @log_function - def persist_outgoing(self, pdu): - """ Persists the given `Pdu` that this home server created. - - Returns: - Deferred - """ - ret = yield self._persist(pdu) - - defer.returnValue(ret) - - @log_function def mark_as_processed(self, pdu): """ Persist the fact that we have fully processed the given `Pdu` @@ -73,25 +50,6 @@ class PduActions(object): @defer.inlineCallbacks @log_function - def populate_previous_pdus(self, pdu): - """ Given an outgoing `Pdu` fill out its `prev_ids` key with the `Pdu`s - that we have received. - - Returns: - Deferred - """ - results = yield self.store.get_latest_pdus_in_context(pdu.context) - - pdu.prev_pdus = [(p_id, origin) for p_id, origin, _ in results] - - vs = [int(v) for _, _, v in results] - if vs: - pdu.depth = max(vs) + 1 - else: - pdu.depth = 0 - - @defer.inlineCallbacks - @log_function def after_transaction(self, transaction_id, destination, origin): """ Returns all `Pdu`s that we sent to the given remote home server after a given transaction id. @@ -143,28 +101,6 @@ class PduActions(object): depth=pdu.depth ) - @defer.inlineCallbacks - @log_function - def _persist(self, pdu): - kwargs = copy.copy(pdu.__dict__) - unrec_keys = copy.copy(pdu.unrecognized_keys) - del kwargs["content"] - kwargs["content_json"] = json.dumps(pdu.content) - kwargs["unrecognized_keys"] = json.dumps(unrec_keys) - - logger.debug("Persisting: %s", repr(kwargs)) - - if pdu.is_state: - ret = yield self.store.persist_state(**kwargs) - else: - ret = yield self.store.persist_pdu(**kwargs) - - yield self.store.update_min_depth_for_context( - pdu.context, pdu.depth - ) - - defer.returnValue(ret) - class TransactionActions(object): """ Defines persistence actions that relate to handling Transactions. diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index cf634a64b2..38ae360bcd 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -134,10 +134,8 @@ class ReplicationLayer(object): logger.debug("[%s] Persisting PDU", pdu.pdu_id) - #yield self.pdu_actions.populate_previous_pdus(pdu) - # Save *before* trying to send - yield self.pdu_actions.persist_outgoing(pdu) + yield self.store.persist_event(pdu=pdu) logger.debug("[%s] Persisted PDU", pdu.pdu_id) logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id) @@ -450,7 +448,7 @@ class ReplicationLayer(object): logger.exception("Failed to get PDU") # Persist the Pdu, but don't mark it as processed yet. - yield self.pdu_actions.persist_received(pdu) + yield self.store.persist_event(pdu=pdu) if not backfilled: ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) |