diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/send_queue.py | 26 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 3 |
2 files changed, 29 insertions, 0 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 99b5835780..98cf125cb5 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -13,6 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""A federation sender that forwards things to be sent across replication to +a worker process. + +It assumes there is a single worker process feeding off of it. + +Each row in the replication stream consists of a type and some json, where the +types indicate whether they are presence, or edus, etc. + +Ephemeral or non-event data are queued up in-memory. When the worker requests +updates since a particular point, all in-memory data since before that point is +dropped. We also expire things in the queue after 5 minutes, to ensure that a +dead worker doesn't cause the queues to grow limitlessly. + +Events are replicated via a separate events stream. +""" + from .units import Edu from blist import sorteddict @@ -27,6 +43,7 @@ DEVICE_MESSAGE_TYPE = "d" class FederationRemoteSendQueue(object): + """A drop in replacement for TransactionQueue""" def __init__(self, hs): self.server_name = hs.hostname @@ -58,6 +75,7 @@ class FederationRemoteSendQueue(object): return pos def _clear_queue(self): + """Clear the queues for anything older than N minutes""" # TODO measure this function time. FIVE_MINUTES_AGO = 5 * 60 * 1000 @@ -75,6 +93,7 @@ class FederationRemoteSendQueue(object): self._clear_queue_before_pos(position_to_delete) def _clear_queue_before_pos(self, position_to_delete): + """Clear all the queues from before a given position""" # Delete things out of presence maps keys = self.presence_changed.keys() i = keys.bisect_left(position_to_delete) @@ -122,9 +141,13 @@ class FederationRemoteSendQueue(object): del self.device_messages[key] def notify_new_events(self, current_id): + """As per TransactionQueue""" + # We don't need to replicate this as it gets sent down a different + # stream. pass def send_edu(self, destination, edu_type, content, key=None): + """As per TransactionQueue""" pos = self._next_pos() edu = Edu( @@ -142,6 +165,7 @@ class FederationRemoteSendQueue(object): self.edus[pos] = edu def send_presence(self, destination, states): + """As per TransactionQueue""" pos = self._next_pos() self.presence_map.update({ @@ -154,11 +178,13 @@ class FederationRemoteSendQueue(object): ] def send_failure(self, failure, destination): + """As per TransactionQueue""" pos = self._next_pos() self.failures[pos] = (destination, str(failure)) def send_device_messages(self, destination): + """As per TransactionQueue""" pos = self._next_pos() self.device_messages[pos] = destination diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 0b3fdc1067..c94c74a67e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -131,6 +131,9 @@ class TransactionQueue(object): @defer.inlineCallbacks def notify_new_events(self, current_id): + """This gets called when we have some new events we might want to + send out to other servers. + """ self._last_poked_id = max(current_id, self._last_poked_id) if self._is_processing: |