diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index d439be050a..3fc625c4dd 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -23,11 +23,13 @@ PRESENCE_TYPE = "p"
KEYED_EDU_TYPE = "k"
EDU_TYPE = "e"
FAILURE_TYPE = "f"
+DEVICE_MESSAGE_TYPE = "d"
class FederationRemoteSendQueue(object):
def __init__(self, hs):
+ self.server_name = hs.hostname
self.clock = hs.get_clock()
# TODO: Add metrics for size of lists below
@@ -45,6 +47,8 @@ class FederationRemoteSendQueue(object):
self.pos = 1
self.pos_time = sorteddict()
+ self.device_messages = sorteddict()
+
self.clock.looping_call(self._clear_queue, 30 * 1000)
def _next_pos(self):
@@ -111,6 +115,15 @@ class FederationRemoteSendQueue(object):
for key in keys[:i]:
del self.failures[key]
+ # Delete things out of device map
+ keys = self.device_messages.keys()
+ i = keys.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.device_messages[key]
+
+ def notify_new_events(self, current_id):
+ pass
+
def send_edu(self, destination, edu_type, content, key=None):
pos = self._next_pos()
@@ -122,6 +135,7 @@ class FederationRemoteSendQueue(object):
)
if key:
+ assert isinstance(key, tuple)
self.keyed_edu[(destination, key)] = edu
self.keyed_edu_changed[pos] = (destination, key)
else:
@@ -148,9 +162,9 @@ class FederationRemoteSendQueue(object):
# This gets sent down a separate path
pass
- def notify_new_device_message(self, destination):
- # TODO
- pass
+ def send_device_messages(self, destination):
+ pos = self._next_pos()
+ self.device_messages[pos] = destination
def get_current_token(self):
return self.pos - 1
@@ -188,11 +202,11 @@ class FederationRemoteSendQueue(object):
i = keys.bisect_right(token)
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
- for (pos, edu_key) in keyed_edus:
+ for (pos, (destination, edu_key)) in keyed_edus:
rows.append(
(pos, KEYED_EDU_TYPE, ujson.dumps({
"key": edu_key,
- "edu": self.keyed_edu[edu_key].get_dict(),
+ "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
}))
)
@@ -202,7 +216,7 @@ class FederationRemoteSendQueue(object):
edus = set((k, self.edus[k]) for k in keys[i:])
for (pos, edu) in edus:
- rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_dict())))
+ rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
# Fetch changed failures
keys = self.failures.keys()
@@ -210,11 +224,21 @@ class FederationRemoteSendQueue(object):
failures = set((k, self.failures[k]) for k in keys[i:])
for (pos, (destination, failure)) in failures:
- rows.append((pos, None, FAILURE_TYPE, ujson.dumps({
+ rows.append((pos, FAILURE_TYPE, ujson.dumps({
"destination": destination,
"failure": failure,
})))
+ # Fetch changed device messages
+ keys = self.device_messages.keys()
+ i = keys.bisect_right(token)
+ device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+
+ for (pos, destination) in device_messages:
+ rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
+ "destination": destination,
+ })))
+
# Sort rows based on pos
rows.sort()
|