summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r--synapse/app/federation_sender.py66
1 files changed, 1 insertions, 65 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 145c01f3a3..477e16e0fa 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -23,7 +23,6 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.http.site import SynapseSite
 from synapse.federation import send_queue
-from synapse.federation.units import Edu
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.events import SlavedEventStore
@@ -33,7 +32,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
-from synapse.storage.presence import UserPresenceState
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -277,69 +275,7 @@ class FederationSenderHandler(object):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
         if stream_name == "federation":
-            # The federation stream containis a bunch of different types of
-            # rows that need to be handled differently. We parse the rows, put
-            # them into the appropriate collection and then send them off.
-            presence_to_send = {}
-            keyed_edus = {}
-            edus = {}
-            failures = {}
-            device_destinations = set()
-
-            # Parse the rows in the stream
-            for row in rows:
-                typ = row.type
-                content = row.data
-
-                if typ == send_queue.PRESENCE_TYPE:
-                    destination = content["destination"]
-                    state = UserPresenceState.from_dict(content["state"])
-
-                    presence_to_send.setdefault(destination, []).append(state)
-                elif typ == send_queue.KEYED_EDU_TYPE:
-                    key = content["key"]
-                    edu = Edu(**content["edu"])
-
-                    keyed_edus.setdefault(
-                        edu.destination, {}
-                    )[(edu.destination, tuple(key))] = edu
-                elif typ == send_queue.EDU_TYPE:
-                    edu = Edu(**content)
-
-                    edus.setdefault(edu.destination, []).append(edu)
-                elif typ == send_queue.FAILURE_TYPE:
-                    destination = content["destination"]
-                    failure = content["failure"]
-
-                    failures.setdefault(destination, []).append(failure)
-                elif typ == send_queue.DEVICE_MESSAGE_TYPE:
-                    device_destinations.add(content["destination"])
-                else:
-                    raise Exception("Unrecognised federation type: %r", typ)
-
-            # We've finished collecting, send everything off
-            for destination, states in presence_to_send.items():
-                self.federation_sender.send_presence(destination, states)
-
-            for destination, edu_map in keyed_edus.items():
-                for key, edu in edu_map.items():
-                    self.federation_sender.send_edu(
-                        edu.destination, edu.edu_type, edu.content, key=key,
-                    )
-
-            for destination, edu_list in edus.items():
-                for edu in edu_list:
-                    self.federation_sender.send_edu(
-                        edu.destination, edu.edu_type, edu.content, key=None,
-                    )
-
-            for destination, failure_list in failures.items():
-                for failure in failure_list:
-                    self.federation_sender.send_failure(destination, failure)
-
-            for destination in device_destinations:
-                self.federation_sender.send_device_messages(destination)
-
+            send_queue.process_rows_for_federation(self.federation_sender, rows)
             preserve_fn(self.update_token)(token)
 
         # We also need to poke the federation sender when new events happen