summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/federation_sender.py31
1 files changed, 18 insertions, 13 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 7a4fec4a66..32113c175c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -127,13 +127,6 @@ class FederationSenderServer(HomeServer):
         replication_url = self.config.worker_replication_url
         send_handler = self._get_send_handler()
 
-        def replicate(results):
-            stream = results.get("events")
-            if stream:
-                # max_stream_id = stream["position"]
-                # TODO
-                pass
-
         while True:
             try:
                 args = store.stream_positions()
@@ -142,7 +135,6 @@ class FederationSenderServer(HomeServer):
                 result = yield http_client.get_json(replication_url, args=args)
                 yield store.process_replication(result)
                 send_handler.process_replication(result)
-                replicate(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(30)
@@ -242,16 +234,17 @@ class FederationSenderHandler(object):
         return {"federation": self._latest_room_serial}
 
     def process_replication(self, result):
-        stream = result.get("federation")
-        if stream:
-            self._latest_room_serial = int(stream["position"])
+        fed_stream = result.get("federation")
+        if fed_stream:
+            self._latest_room_serial = int(fed_stream["position"])
 
             presence_to_send = {}
             keyed_edus = {}
             edus = {}
             failures = {}
+            device_destinations = set()
 
-            for row in stream["rows"]:
+            for row in fed_stream["rows"]:
                 position, typ, content_js = row
                 content = json.loads(content_js)
 
@@ -264,7 +257,9 @@ class FederationSenderHandler(object):
                     key = content["key"]
                     edu = Edu(**content["edu"])
 
-                    keyed_edus.setdefault(edu.destination, {})[key] = edu
+                    keyed_edus.setdefault(
+                        edu.destination, {}
+                    )[(edu.destination, tuple(key))] = edu
                 elif typ == send_queue.EDU_TYPE:
                     edu = Edu(**content)
 
@@ -274,6 +269,8 @@ class FederationSenderHandler(object):
                     failure = content["failure"]
 
                     failures.setdefault(destination, []).append(failure)
+                elif typ == send_queue.DEVICE_MESSAGE_TYPE:
+                    device_destinations.add(content["destination"])
                 else:
                     raise Exception("Unrecognised federation type: %r", typ)
 
@@ -296,6 +293,14 @@ class FederationSenderHandler(object):
                 for failure in failure_list:
                     self.federation_sender.send_failure(destination, failure)
 
+            for destination in device_destinations:
+                self.federation_sender.send_device_messages(destination)
+
+        event_stream = result.get("events")
+        if event_stream:
+            latest_pos = event_stream["position"]
+            self.federation_sender.notify_new_events(latest_pos)
+
 
 if __name__ == '__main__':
     with LoggingContext("main"):