diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 6678667c35..ba2b4c2615 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -215,6 +215,9 @@ def start(config_options):
class FederationSenderHandler(object):
+ """Processes the replication stream and forwards the appropriate entries
+ to the federation sender.
+ """
def __init__(self, hs):
self.store = hs.get_datastore()
self.federation_sender = hs.get_federation_sender()
@@ -236,16 +239,22 @@ class FederationSenderHandler(object):
@defer.inlineCallbacks
def process_replication(self, result):
+ # The federation stream contains things that we want to send out, e.g.
+ # presence, typing, etc.
fed_stream = result.get("federation")
if fed_stream:
latest_id = int(fed_stream["position"])
+ # The federation stream containis a bunch of different types of
+ # rows that need to be handled differently. We parse the rows, put
+ # them into the appropriate collection and then send them off.
presence_to_send = {}
keyed_edus = {}
edus = {}
failures = {}
device_destinations = set()
+ # Parse the rows in the stream
for row in fed_stream["rows"]:
position, typ, content_js = row
content = json.loads(content_js)
@@ -276,6 +285,7 @@ class FederationSenderHandler(object):
else:
raise Exception("Unrecognised federation type: %r", typ)
+ # We've finished collecting, send everything off
for destination, states in presence_to_send.items():
self.federation_sender.send_presence(destination, states)
@@ -298,10 +308,12 @@ class FederationSenderHandler(object):
for destination in device_destinations:
self.federation_sender.send_device_messages(destination)
+ # Record where we are in the stream.
yield self.store.update_federation_out_pos(
"federation", latest_id
)
+ # We also need to poke the federation sender when new events happen
event_stream = result.get("events")
if event_stream:
latest_pos = event_stream["position"]
|