diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 174f6e42be..001bb304ae 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -69,8 +69,6 @@ class FederationRemoteSendQueue(object):
self.edus = SortedDict() # stream position -> Edu
- self.device_messages = SortedDict() # stream position -> destination
-
self.pos = 1
self.pos_time = SortedDict()
@@ -92,7 +90,6 @@ class FederationRemoteSendQueue(object):
"keyed_edu",
"keyed_edu_changed",
"edus",
- "device_messages",
"pos_time",
"presence_destinations",
]:
@@ -171,12 +168,6 @@ class FederationRemoteSendQueue(object):
for key in keys[:i]:
del self.edus[key]
- # Delete things out of device map
- keys = self.device_messages.keys()
- i = self.device_messages.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.device_messages[key]
-
def notify_new_events(self, current_id):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
@@ -249,9 +240,8 @@ class FederationRemoteSendQueue(object):
def send_device_messages(self, destination):
"""As per FederationSender"""
- pos = self._next_pos()
- self.device_messages[pos] = destination
- self.notifier.on_new_replication_data()
+ # We don't need to replicate this as it gets sent down a different
+ # stream.
def get_current_token(self):
return self.pos - 1
@@ -339,14 +329,6 @@ class FederationRemoteSendQueue(object):
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
- # Fetch changed device messages
- i = self.device_messages.bisect_right(from_token)
- j = self.device_messages.bisect_right(to_token) + 1
- device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
-
- for (destination, pos) in iteritems(device_messages):
- rows.append((pos, DeviceRow(destination=destination)))
-
# Sort rows based on pos
rows.sort()
@@ -472,28 +454,9 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))): # str
- """Streams the fact that either a) there is pending to device messages for
- users on the remote, or b) a local users device has changed and needs to
- be sent to the remote.
- """
-
- TypeId = "d"
-
- @staticmethod
- def from_data(data):
- return DeviceRow(destination=data["destination"])
-
- def to_data(self):
- return {"destination": self.destination}
-
- def add_to_buffer(self, buff):
- buff.device_destinations.add(self.destination)
-
-
TypeToRow = {
Row.TypeId: Row
- for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
+ for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow,)
}
@@ -504,7 +467,6 @@ ParsedFederationStreamData = namedtuple(
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
- "device_destinations", # set of destinations
),
)
@@ -523,11 +485,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
- presence=[],
- presence_destinations=[],
- keyed_edus={},
- edus={},
- device_destinations=set(),
+ presence=[], presence_destinations=[], keyed_edus={}, edus={},
)
# Parse the rows in the stream and add to the buffer
@@ -555,6 +513,3 @@ def process_rows_for_federation(transaction_queue, rows):
for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(edu, None)
-
- for destination in buff.device_destinations:
- transaction_queue.send_device_messages(destination)
|