summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-21 16:55:23 +0000
committerErik Johnston <erik@matrix.org>2016-11-21 16:55:23 +0000
commit50934ce4604001898707f75179dd748884659f12 (patch)
treebaf00a1d89d246fa4dd0027d10164f1479a510a1
parentRemove explicit calls to send_pdu (diff)
downloadsynapse-50934ce4604001898707f75179dd748884659f12.tar.xz
Comments
-rw-r--r--synapse/app/federation_sender.py12
-rw-r--r--synapse/federation/send_queue.py26
-rw-r--r--synapse/federation/transaction_queue.py3
3 files changed, 41 insertions, 0 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 6678667c35..ba2b4c2615 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -215,6 +215,9 @@ def start(config_options):
 
 
 class FederationSenderHandler(object):
+    """Processes the replication stream and forwards the appropriate entries
+    to the federation sender.
+    """
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
@@ -236,16 +239,22 @@ class FederationSenderHandler(object):
 
     @defer.inlineCallbacks
     def process_replication(self, result):
+        # The federation stream contains things that we want to send out, e.g.
+        # presence, typing, etc.
         fed_stream = result.get("federation")
         if fed_stream:
             latest_id = int(fed_stream["position"])
 
+            # The federation stream containis a bunch of different types of
+            # rows that need to be handled differently. We parse the rows, put
+            # them into the appropriate collection and then send them off.
             presence_to_send = {}
             keyed_edus = {}
             edus = {}
             failures = {}
             device_destinations = set()
 
+            # Parse the rows in the stream
             for row in fed_stream["rows"]:
                 position, typ, content_js = row
                 content = json.loads(content_js)
@@ -276,6 +285,7 @@ class FederationSenderHandler(object):
                 else:
                     raise Exception("Unrecognised federation type: %r", typ)
 
+            # We've finished collecting, send everything off
             for destination, states in presence_to_send.items():
                 self.federation_sender.send_presence(destination, states)
 
@@ -298,10 +308,12 @@ class FederationSenderHandler(object):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
+            # Record where we are in the stream.
             yield self.store.update_federation_out_pos(
                 "federation", latest_id
             )
 
+        # We also need to poke the federation sender when new events happen
         event_stream = result.get("events")
         if event_stream:
             latest_pos = event_stream["position"]
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 99b5835780..98cf125cb5 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -13,6 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""A federation sender that forwards things to be sent across replication to
+a worker process.
+
+It assumes there is a single worker process feeding off of it.
+
+Each row in the replication stream consists of a type and some json, where the
+types indicate whether they are presence, or edus, etc.
+
+Ephemeral or non-event data are queued up in-memory. When the worker requests
+updates since a particular point, all in-memory data since before that point is
+dropped. We also expire things in the queue after 5 minutes, to ensure that a
+dead worker doesn't cause the queues to grow limitlessly.
+
+Events are replicated via a separate events stream.
+"""
+
 from .units import Edu
 
 from blist import sorteddict
@@ -27,6 +43,7 @@ DEVICE_MESSAGE_TYPE = "d"
 
 
 class FederationRemoteSendQueue(object):
+    """A drop in replacement for TransactionQueue"""
 
     def __init__(self, hs):
         self.server_name = hs.hostname
@@ -58,6 +75,7 @@ class FederationRemoteSendQueue(object):
         return pos
 
     def _clear_queue(self):
+        """Clear the queues for anything older than N minutes"""
         # TODO measure this function time.
 
         FIVE_MINUTES_AGO = 5 * 60 * 1000
@@ -75,6 +93,7 @@ class FederationRemoteSendQueue(object):
         self._clear_queue_before_pos(position_to_delete)
 
     def _clear_queue_before_pos(self, position_to_delete):
+        """Clear all the queues from before a given position"""
         # Delete things out of presence maps
         keys = self.presence_changed.keys()
         i = keys.bisect_left(position_to_delete)
@@ -122,9 +141,13 @@ class FederationRemoteSendQueue(object):
             del self.device_messages[key]
 
     def notify_new_events(self, current_id):
+        """As per TransactionQueue"""
+        # We don't need to replicate this as it gets sent down a different
+        # stream.
         pass
 
     def send_edu(self, destination, edu_type, content, key=None):
+        """As per TransactionQueue"""
         pos = self._next_pos()
 
         edu = Edu(
@@ -142,6 +165,7 @@ class FederationRemoteSendQueue(object):
             self.edus[pos] = edu
 
     def send_presence(self, destination, states):
+        """As per TransactionQueue"""
         pos = self._next_pos()
 
         self.presence_map.update({
@@ -154,11 +178,13 @@ class FederationRemoteSendQueue(object):
         ]
 
     def send_failure(self, failure, destination):
+        """As per TransactionQueue"""
         pos = self._next_pos()
 
         self.failures[pos] = (destination, str(failure))
 
     def send_device_messages(self, destination):
+        """As per TransactionQueue"""
         pos = self._next_pos()
         self.device_messages[pos] = destination
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 0b3fdc1067..c94c74a67e 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -131,6 +131,9 @@ class TransactionQueue(object):
 
     @defer.inlineCallbacks
     def notify_new_events(self, current_id):
+        """This gets called when we have some new events we might want to
+        send out to other servers.
+        """
         self._last_poked_id = max(current_id, self._last_poked_id)
 
         if self._is_processing: