diff options
Diffstat (limited to 'synapse/federation/persistence.py')
-rw-r--r-- | synapse/federation/persistence.py | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py new file mode 100644 index 0000000000..ad4111c683 --- /dev/null +++ b/synapse/federation/persistence.py @@ -0,0 +1,240 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" This module contains all the persistence actions done by the federation +package. + +These actions are mostly only used by the :py:mod:`.replication` module. +""" + +from twisted.internet import defer + +from .units import Pdu + +from synapse.util.logutils import log_function + +import copy +import json +import logging + + +logger = logging.getLogger(__name__) + + +class PduActions(object): + """ Defines persistence actions that relate to handling PDUs. + """ + + def __init__(self, datastore): + self.store = datastore + + @log_function + def persist_received(self, pdu): + """ Persists the given `Pdu` that was received from a remote home + server. + + Returns: + Deferred + """ + return self._persist(pdu) + + @defer.inlineCallbacks + @log_function + def persist_outgoing(self, pdu): + """ Persists the given `Pdu` that this home server created. + + Returns: + Deferred + """ + ret = yield self._persist(pdu) + + defer.returnValue(ret) + + @log_function + def mark_as_processed(self, pdu): + """ Persist the fact that we have fully processed the given `Pdu` + + Returns: + Deferred + """ + return self.store.mark_pdu_as_processed(pdu.pdu_id, pdu.origin) + + @defer.inlineCallbacks + @log_function + def populate_previous_pdus(self, pdu): + """ Given an outgoing `Pdu` fill out its `prev_ids` key with the `Pdu`s + that we have received. + + Returns: + Deferred + """ + results = yield self.store.get_latest_pdus_in_context(pdu.context) + + pdu.prev_pdus = [(p_id, origin) for p_id, origin, _ in results] + + vs = [int(v) for _, _, v in results] + if vs: + pdu.depth = max(vs) + 1 + else: + pdu.depth = 0 + + @defer.inlineCallbacks + @log_function + def after_transaction(self, transaction_id, destination, origin): + """ Returns all `Pdu`s that we sent to the given remote home server + after a given transaction id. + + Returns: + Deferred: Results in a list of `Pdu`s + """ + results = yield self.store.get_pdus_after_transaction( + transaction_id, + destination + ) + + defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) + + @defer.inlineCallbacks + @log_function + def get_all_pdus_from_context(self, context): + results = yield self.store.get_all_pdus_from_context(context) + defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) + + @defer.inlineCallbacks + @log_function + def paginate(self, context, pdu_list, limit): + """ For a given list of PDU id and origins return the proceeding + `limit` `Pdu`s in the given `context`. + + Returns: + Deferred: Results in a list of `Pdu`s. + """ + results = yield self.store.get_pagination( + context, pdu_list, limit + ) + + defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) + + @log_function + def is_new(self, pdu): + """ When we receive a `Pdu` from a remote home server, we want to + figure out whether it is `new`, i.e. it is not some historic PDU that + we haven't seen simply because we haven't paginated back that far. + + Returns: + Deferred: Results in a `bool` + """ + return self.store.is_pdu_new( + pdu_id=pdu.pdu_id, + origin=pdu.origin, + context=pdu.context, + depth=pdu.depth + ) + + @defer.inlineCallbacks + @log_function + def _persist(self, pdu): + kwargs = copy.copy(pdu.__dict__) + unrec_keys = copy.copy(pdu.unrecognized_keys) + del kwargs["content"] + kwargs["content_json"] = json.dumps(pdu.content) + kwargs["unrecognized_keys"] = json.dumps(unrec_keys) + + logger.debug("Persisting: %s", repr(kwargs)) + + if pdu.is_state: + ret = yield self.store.persist_state(**kwargs) + else: + ret = yield self.store.persist_pdu(**kwargs) + + yield self.store.update_min_depth_for_context( + pdu.context, pdu.depth + ) + + defer.returnValue(ret) + + +class TransactionActions(object): + """ Defines persistence actions that relate to handling Transactions. + """ + + def __init__(self, datastore): + self.store = datastore + + @log_function + def have_responded(self, transaction): + """ Have we already responded to a transaction with the same id and + origin? + + Returns: + Deferred: Results in `None` if we have not previously responded to + this transaction or a 2-tuple of `(int, dict)` representing the + response code and response body. + """ + if not transaction.transaction_id: + raise RuntimeError("Cannot persist a transaction with no " + "transaction_id") + + return self.store.get_received_txn_response( + transaction.transaction_id, transaction.origin + ) + + @log_function + def set_response(self, transaction, code, response): + """ Persist how we responded to a transaction. + + Returns: + Deferred + """ + if not transaction.transaction_id: + raise RuntimeError("Cannot persist a transaction with no " + "transaction_id") + + return self.store.set_received_txn_response( + transaction.transaction_id, + transaction.origin, + code, + json.dumps(response) + ) + + @defer.inlineCallbacks + @log_function + def prepare_to_send(self, transaction): + """ Persists the `Transaction` we are about to send and works out the + correct value for the `prev_ids` key. + + Returns: + Deferred + """ + transaction.prev_ids = yield self.store.prep_send_transaction( + transaction.transaction_id, + transaction.destination, + transaction.ts, + [(p["pdu_id"], p["origin"]) for p in transaction.pdus] + ) + + @log_function + def delivered(self, transaction, response_code, response_dict): + """ Marks the given `Transaction` as having been successfully + delivered to the remote homeserver, and what the response was. + + Returns: + Deferred + """ + return self.store.delivered_txn( + transaction.transaction_id, + transaction.destination, + response_code, + json.dumps(response_dict) + ) |