summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/federation_sender.py38
1 files changed, 22 insertions, 16 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 32113c175c..6678667c35 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -125,27 +125,22 @@ class FederationSenderServer(HomeServer):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
         replication_url = self.config.worker_replication_url
-        send_handler = self._get_send_handler()
+        send_handler = FederationSenderHandler(self)
+
+        send_handler.on_start()
 
         while True:
             try:
                 args = store.stream_positions()
-                args.update(send_handler.stream_positions())
+                args.update((yield send_handler.stream_positions()))
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
                 yield store.process_replication(result)
-                send_handler.process_replication(result)
+                yield send_handler.process_replication(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(30)
 
-    def _get_send_handler(self):
-        try:
-            return self._send_handler
-        except AttributeError:
-            self._send_handler = FederationSenderHandler(self)
-            return self._send_handler
-
 
 def start(config_options):
     try:
@@ -221,22 +216,29 @@ def start(config_options):
 
 class FederationSenderHandler(object):
     def __init__(self, hs):
+        self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
 
-        self._latest_room_serial = -1
         self._room_serials = {}
         self._room_typing = {}
 
+    def on_start(self):
+        # There may be some events that are persisted but haven't been sent,
+        # so send them now.
+        self.federation_sender.notify_new_events(
+            self.store.get_room_max_stream_ordering()
+        )
+
+    @defer.inlineCallbacks
     def stream_positions(self):
-        # We must update this token from the response of the previous
-        # sync. In particular, the stream id may "reset" back to zero/a low
-        # value which we *must* use for the next replication request.
-        return {"federation": self._latest_room_serial}
+        stream_id = yield self.store.get_federation_out_pos("federation")
+        defer.returnValue({"federation": stream_id})
 
+    @defer.inlineCallbacks
     def process_replication(self, result):
         fed_stream = result.get("federation")
         if fed_stream:
-            self._latest_room_serial = int(fed_stream["position"])
+            latest_id = int(fed_stream["position"])
 
             presence_to_send = {}
             keyed_edus = {}
@@ -296,6 +298,10 @@ class FederationSenderHandler(object):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
+            yield self.store.update_federation_out_pos(
+                "federation", latest_id
+            )
+
         event_stream = result.get("events")
         if event_stream:
             latest_pos = event_stream["position"]