summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py53
1 files changed, 4 insertions, 49 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 174f6e42be..001bb304ae 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -69,8 +69,6 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
-        self.device_messages = SortedDict()  # stream position -> destination
-
         self.pos = 1
         self.pos_time = SortedDict()
 
@@ -92,7 +90,6 @@ class FederationRemoteSendQueue(object):
             "keyed_edu",
             "keyed_edu_changed",
             "edus",
-            "device_messages",
             "pos_time",
             "presence_destinations",
         ]:
@@ -171,12 +168,6 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.edus[key]
 
-            # Delete things out of device map
-            keys = self.device_messages.keys()
-            i = self.device_messages.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.device_messages[key]
-
     def notify_new_events(self, current_id):
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
@@ -249,9 +240,8 @@ class FederationRemoteSendQueue(object):
 
     def send_device_messages(self, destination):
         """As per FederationSender"""
-        pos = self._next_pos()
-        self.device_messages[pos] = destination
-        self.notifier.on_new_replication_data()
+        # We don't need to replicate this as it gets sent down a different
+        # stream.
 
     def get_current_token(self):
         return self.pos - 1
@@ -339,14 +329,6 @@ class FederationRemoteSendQueue(object):
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
-        # Fetch changed device messages
-        i = self.device_messages.bisect_right(from_token)
-        j = self.device_messages.bisect_right(to_token) + 1
-        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
-
-        for (destination, pos) in iteritems(device_messages):
-            rows.append((pos, DeviceRow(destination=destination)))
-
         # Sort rows based on pos
         rows.sort()
 
@@ -472,28 +454,9 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))):  # str
-    """Streams the fact that either a) there is pending to device messages for
-    users on the remote, or b) a local users device has changed and needs to
-    be sent to the remote.
-    """
-
-    TypeId = "d"
-
-    @staticmethod
-    def from_data(data):
-        return DeviceRow(destination=data["destination"])
-
-    def to_data(self):
-        return {"destination": self.destination}
-
-    def add_to_buffer(self, buff):
-        buff.device_destinations.add(self.destination)
-
-
 TypeToRow = {
     Row.TypeId: Row
-    for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
+    for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow,)
 }
 
 
@@ -504,7 +467,6 @@ ParsedFederationStreamData = namedtuple(
         "presence_destinations",  # list of tuples of UserPresenceState and destinations
         "keyed_edus",  # dict of destination -> { key -> Edu }
         "edus",  # dict of destination -> [Edu]
-        "device_destinations",  # set of destinations
     ),
 )
 
@@ -523,11 +485,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence=[],
-        presence_destinations=[],
-        keyed_edus={},
-        edus={},
-        device_destinations=set(),
+        presence=[], presence_destinations=[], keyed_edus={}, edus={},
     )
 
     # Parse the rows in the stream and add to the buffer
@@ -555,6 +513,3 @@ def process_rows_for_federation(transaction_queue, rows):
     for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(edu, None)
-
-    for destination in buff.device_destinations:
-        transaction_queue.send_device_messages(destination)