diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 145c01f3a3..477e16e0fa 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -23,7 +23,6 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.http.site import SynapseSite
from synapse.federation import send_queue
-from synapse.federation.units import Edu
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.events import SlavedEventStore
@@ -33,7 +32,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
-from synapse.storage.presence import UserPresenceState
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -277,69 +275,7 @@ class FederationSenderHandler(object):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
- # The federation stream containis a bunch of different types of
- # rows that need to be handled differently. We parse the rows, put
- # them into the appropriate collection and then send them off.
- presence_to_send = {}
- keyed_edus = {}
- edus = {}
- failures = {}
- device_destinations = set()
-
- # Parse the rows in the stream
- for row in rows:
- typ = row.type
- content = row.data
-
- if typ == send_queue.PRESENCE_TYPE:
- destination = content["destination"]
- state = UserPresenceState.from_dict(content["state"])
-
- presence_to_send.setdefault(destination, []).append(state)
- elif typ == send_queue.KEYED_EDU_TYPE:
- key = content["key"]
- edu = Edu(**content["edu"])
-
- keyed_edus.setdefault(
- edu.destination, {}
- )[(edu.destination, tuple(key))] = edu
- elif typ == send_queue.EDU_TYPE:
- edu = Edu(**content)
-
- edus.setdefault(edu.destination, []).append(edu)
- elif typ == send_queue.FAILURE_TYPE:
- destination = content["destination"]
- failure = content["failure"]
-
- failures.setdefault(destination, []).append(failure)
- elif typ == send_queue.DEVICE_MESSAGE_TYPE:
- device_destinations.add(content["destination"])
- else:
- raise Exception("Unrecognised federation type: %r", typ)
-
- # We've finished collecting, send everything off
- for destination, states in presence_to_send.items():
- self.federation_sender.send_presence(destination, states)
-
- for destination, edu_map in keyed_edus.items():
- for key, edu in edu_map.items():
- self.federation_sender.send_edu(
- edu.destination, edu.edu_type, edu.content, key=key,
- )
-
- for destination, edu_list in edus.items():
- for edu in edu_list:
- self.federation_sender.send_edu(
- edu.destination, edu.edu_type, edu.content, key=None,
- )
-
- for destination, failure_list in failures.items():
- for failure in failure_list:
- self.federation_sender.send_failure(destination, failure)
-
- for destination in device_destinations:
- self.federation_sender.send_device_messages(destination)
-
+ send_queue.process_rows_for_federation(self.federation_sender, rows)
preserve_fn(self.update_token)(token)
# We also need to poke the federation sender when new events happen
|