summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-21 16:55:23 +0000
committerErik Johnston <erik@matrix.org>2016-11-21 16:55:23 +0000
commit50934ce4604001898707f75179dd748884659f12 (patch)
treebaf00a1d89d246fa4dd0027d10164f1479a510a1 /synapse/app/federation_sender.py
parentRemove explicit calls to send_pdu (diff)
downloadsynapse-50934ce4604001898707f75179dd748884659f12.tar.xz
Comments
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r--synapse/app/federation_sender.py12
1 files changed, 12 insertions, 0 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 6678667c35..ba2b4c2615 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -215,6 +215,9 @@ def start(config_options):
 
 
 class FederationSenderHandler(object):
+    """Processes the replication stream and forwards the appropriate entries
+    to the federation sender.
+    """
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
@@ -236,16 +239,22 @@ class FederationSenderHandler(object):
 
     @defer.inlineCallbacks
     def process_replication(self, result):
+        # The federation stream contains things that we want to send out, e.g.
+        # presence, typing, etc.
         fed_stream = result.get("federation")
         if fed_stream:
             latest_id = int(fed_stream["position"])
 
+            # The federation stream containis a bunch of different types of
+            # rows that need to be handled differently. We parse the rows, put
+            # them into the appropriate collection and then send them off.
             presence_to_send = {}
             keyed_edus = {}
             edus = {}
             failures = {}
             device_destinations = set()
 
+            # Parse the rows in the stream
             for row in fed_stream["rows"]:
                 position, typ, content_js = row
                 content = json.loads(content_js)
@@ -276,6 +285,7 @@ class FederationSenderHandler(object):
                 else:
                     raise Exception("Unrecognised federation type: %r", typ)
 
+            # We've finished collecting, send everything off
             for destination, states in presence_to_send.items():
                 self.federation_sender.send_presence(destination, states)
 
@@ -298,10 +308,12 @@ class FederationSenderHandler(object):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
+            # Record where we are in the stream.
             yield self.store.update_federation_out_pos(
                 "federation", latest_id
             )
 
+        # We also need to poke the federation sender when new events happen
         event_stream = result.get("events")
         if event_stream:
             latest_pos = event_stream["position"]