diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-21 16:55:23 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-21 16:55:23 +0000 |
commit | 50934ce4604001898707f75179dd748884659f12 (patch) | |
tree | baf00a1d89d246fa4dd0027d10164f1479a510a1 /synapse/app | |
parent | Remove explicit calls to send_pdu (diff) | |
download | synapse-50934ce4604001898707f75179dd748884659f12.tar.xz |
Comments
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/federation_sender.py | 12 |
1 files changed, 12 insertions, 0 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 6678667c35..ba2b4c2615 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -215,6 +215,9 @@ def start(config_options): class FederationSenderHandler(object): + """Processes the replication stream and forwards the appropriate entries + to the federation sender. + """ def __init__(self, hs): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() @@ -236,16 +239,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def process_replication(self, result): + # The federation stream contains things that we want to send out, e.g. + # presence, typing, etc. fed_stream = result.get("federation") if fed_stream: latest_id = int(fed_stream["position"]) + # The federation stream containis a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. presence_to_send = {} keyed_edus = {} edus = {} failures = {} device_destinations = set() + # Parse the rows in the stream for row in fed_stream["rows"]: position, typ, content_js = row content = json.loads(content_js) @@ -276,6 +285,7 @@ class FederationSenderHandler(object): else: raise Exception("Unrecognised federation type: %r", typ) + # We've finished collecting, send everything off for destination, states in presence_to_send.items(): self.federation_sender.send_presence(destination, states) @@ -298,10 +308,12 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) + # Record where we are in the stream. yield self.store.update_federation_out_pos( "federation", latest_id ) + # We also need to poke the federation sender when new events happen event_stream = result.get("events") if event_stream: latest_pos = event_stream["position"] |