diff options
-rw-r--r-- | synapse/app/federation_sender.py | 12 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 26 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 3 |
3 files changed, 41 insertions, 0 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 6678667c35..ba2b4c2615 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -215,6 +215,9 @@ def start(config_options): class FederationSenderHandler(object): + """Processes the replication stream and forwards the appropriate entries + to the federation sender. + """ def __init__(self, hs): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() @@ -236,16 +239,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def process_replication(self, result): + # The federation stream contains things that we want to send out, e.g. + # presence, typing, etc. fed_stream = result.get("federation") if fed_stream: latest_id = int(fed_stream["position"]) + # The federation stream containis a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. presence_to_send = {} keyed_edus = {} edus = {} failures = {} device_destinations = set() + # Parse the rows in the stream for row in fed_stream["rows"]: position, typ, content_js = row content = json.loads(content_js) @@ -276,6 +285,7 @@ class FederationSenderHandler(object): else: raise Exception("Unrecognised federation type: %r", typ) + # We've finished collecting, send everything off for destination, states in presence_to_send.items(): self.federation_sender.send_presence(destination, states) @@ -298,10 +308,12 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) + # Record where we are in the stream. yield self.store.update_federation_out_pos( "federation", latest_id ) + # We also need to poke the federation sender when new events happen event_stream = result.get("events") if event_stream: latest_pos = event_stream["position"] diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 99b5835780..98cf125cb5 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -13,6 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""A federation sender that forwards things to be sent across replication to +a worker process. + +It assumes there is a single worker process feeding off of it. + +Each row in the replication stream consists of a type and some json, where the +types indicate whether they are presence, or edus, etc. + +Ephemeral or non-event data are queued up in-memory. When the worker requests +updates since a particular point, all in-memory data since before that point is +dropped. We also expire things in the queue after 5 minutes, to ensure that a +dead worker doesn't cause the queues to grow limitlessly. + +Events are replicated via a separate events stream. +""" + from .units import Edu from blist import sorteddict @@ -27,6 +43,7 @@ DEVICE_MESSAGE_TYPE = "d" class FederationRemoteSendQueue(object): + """A drop in replacement for TransactionQueue""" def __init__(self, hs): self.server_name = hs.hostname @@ -58,6 +75,7 @@ class FederationRemoteSendQueue(object): return pos def _clear_queue(self): + """Clear the queues for anything older than N minutes""" # TODO measure this function time. FIVE_MINUTES_AGO = 5 * 60 * 1000 @@ -75,6 +93,7 @@ class FederationRemoteSendQueue(object): self._clear_queue_before_pos(position_to_delete) def _clear_queue_before_pos(self, position_to_delete): + """Clear all the queues from before a given position""" # Delete things out of presence maps keys = self.presence_changed.keys() i = keys.bisect_left(position_to_delete) @@ -122,9 +141,13 @@ class FederationRemoteSendQueue(object): del self.device_messages[key] def notify_new_events(self, current_id): + """As per TransactionQueue""" + # We don't need to replicate this as it gets sent down a different + # stream. pass def send_edu(self, destination, edu_type, content, key=None): + """As per TransactionQueue""" pos = self._next_pos() edu = Edu( @@ -142,6 +165,7 @@ class FederationRemoteSendQueue(object): self.edus[pos] = edu def send_presence(self, destination, states): + """As per TransactionQueue""" pos = self._next_pos() self.presence_map.update({ @@ -154,11 +178,13 @@ class FederationRemoteSendQueue(object): ] def send_failure(self, failure, destination): + """As per TransactionQueue""" pos = self._next_pos() self.failures[pos] = (destination, str(failure)) def send_device_messages(self, destination): + """As per TransactionQueue""" pos = self._next_pos() self.device_messages[pos] = destination diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 0b3fdc1067..c94c74a67e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -131,6 +131,9 @@ class TransactionQueue(object): @defer.inlineCallbacks def notify_new_events(self, current_id): + """This gets called when we have some new events we might want to + send out to other servers. + """ self._last_poked_id = max(current_id, self._last_poked_id) if self._is_processing: |