Store federation stream positions in the database
1 files changed, 22 insertions, 16 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 32113c175c..6678667c35 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -125,27 +125,22 @@ class FederationSenderServer(HomeServer):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
- send_handler = self._get_send_handler()
+ send_handler = FederationSenderHandler(self)
+
+ send_handler.on_start()
while True:
try:
args = store.stream_positions()
- args.update(send_handler.stream_positions())
+ args.update((yield send_handler.stream_positions()))
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
- send_handler.process_replication(result)
+ yield send_handler.process_replication(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(30)
- def _get_send_handler(self):
- try:
- return self._send_handler
- except AttributeError:
- self._send_handler = FederationSenderHandler(self)
- return self._send_handler
-
def start(config_options):
try:
@@ -221,22 +216,29 @@ def start(config_options):
class FederationSenderHandler(object):
def __init__(self, hs):
+ self.store = hs.get_datastore()
self.federation_sender = hs.get_federation_sender()
- self._latest_room_serial = -1
self._room_serials = {}
self._room_typing = {}
+ def on_start(self):
+ # There may be some events that are persisted but haven't been sent,
+ # so send them now.
+ self.federation_sender.notify_new_events(
+ self.store.get_room_max_stream_ordering()
+ )
+
+ @defer.inlineCallbacks
def stream_positions(self):
- # We must update this token from the response of the previous
- # sync. In particular, the stream id may "reset" back to zero/a low
- # value which we *must* use for the next replication request.
- return {"federation": self._latest_room_serial}
+ stream_id = yield self.store.get_federation_out_pos("federation")
+ defer.returnValue({"federation": stream_id})
+ @defer.inlineCallbacks
def process_replication(self, result):
fed_stream = result.get("federation")
if fed_stream:
- self._latest_room_serial = int(fed_stream["position"])
+ latest_id = int(fed_stream["position"])
presence_to_send = {}
keyed_edus = {}
@@ -296,6 +298,10 @@ class FederationSenderHandler(object):
for destination in device_destinations:
self.federation_sender.send_device_messages(destination)
+ yield self.store.update_federation_out_pos(
+ "federation", latest_id
+ )
+
event_stream = result.get("events")
if event_stream:
latest_pos = event_stream["position"]
|