diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 7a4fec4a66..32113c175c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -127,13 +127,6 @@ class FederationSenderServer(HomeServer):
replication_url = self.config.worker_replication_url
send_handler = self._get_send_handler()
- def replicate(results):
- stream = results.get("events")
- if stream:
- # max_stream_id = stream["position"]
- # TODO
- pass
-
while True:
try:
args = store.stream_positions()
@@ -142,7 +135,6 @@ class FederationSenderServer(HomeServer):
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
send_handler.process_replication(result)
- replicate(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(30)
@@ -242,16 +234,17 @@ class FederationSenderHandler(object):
return {"federation": self._latest_room_serial}
def process_replication(self, result):
- stream = result.get("federation")
- if stream:
- self._latest_room_serial = int(stream["position"])
+ fed_stream = result.get("federation")
+ if fed_stream:
+ self._latest_room_serial = int(fed_stream["position"])
presence_to_send = {}
keyed_edus = {}
edus = {}
failures = {}
+ device_destinations = set()
- for row in stream["rows"]:
+ for row in fed_stream["rows"]:
position, typ, content_js = row
content = json.loads(content_js)
@@ -264,7 +257,9 @@ class FederationSenderHandler(object):
key = content["key"]
edu = Edu(**content["edu"])
- keyed_edus.setdefault(edu.destination, {})[key] = edu
+ keyed_edus.setdefault(
+ edu.destination, {}
+ )[(edu.destination, tuple(key))] = edu
elif typ == send_queue.EDU_TYPE:
edu = Edu(**content)
@@ -274,6 +269,8 @@ class FederationSenderHandler(object):
failure = content["failure"]
failures.setdefault(destination, []).append(failure)
+ elif typ == send_queue.DEVICE_MESSAGE_TYPE:
+ device_destinations.add(content["destination"])
else:
raise Exception("Unrecognised federation type: %r", typ)
@@ -296,6 +293,14 @@ class FederationSenderHandler(object):
for failure in failure_list:
self.federation_sender.send_failure(destination, failure)
+ for destination in device_destinations:
+ self.federation_sender.send_device_messages(destination)
+
+ event_stream = result.get("events")
+ if event_stream:
+ latest_pos = event_stream["position"]
+ self.federation_sender.notify_new_events(latest_pos)
+
if __name__ == '__main__':
with LoggingContext("main"):
|