diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 145c01f3a3..477e16e0fa 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -23,7 +23,6 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.http.site import SynapseSite
from synapse.federation import send_queue
-from synapse.federation.units import Edu
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.events import SlavedEventStore
@@ -33,7 +32,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
-from synapse.storage.presence import UserPresenceState
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -277,69 +275,7 @@ class FederationSenderHandler(object):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
- # The federation stream containis a bunch of different types of
- # rows that need to be handled differently. We parse the rows, put
- # them into the appropriate collection and then send them off.
- presence_to_send = {}
- keyed_edus = {}
- edus = {}
- failures = {}
- device_destinations = set()
-
- # Parse the rows in the stream
- for row in rows:
- typ = row.type
- content = row.data
-
- if typ == send_queue.PRESENCE_TYPE:
- destination = content["destination"]
- state = UserPresenceState.from_dict(content["state"])
-
- presence_to_send.setdefault(destination, []).append(state)
- elif typ == send_queue.KEYED_EDU_TYPE:
- key = content["key"]
- edu = Edu(**content["edu"])
-
- keyed_edus.setdefault(
- edu.destination, {}
- )[(edu.destination, tuple(key))] = edu
- elif typ == send_queue.EDU_TYPE:
- edu = Edu(**content)
-
- edus.setdefault(edu.destination, []).append(edu)
- elif typ == send_queue.FAILURE_TYPE:
- destination = content["destination"]
- failure = content["failure"]
-
- failures.setdefault(destination, []).append(failure)
- elif typ == send_queue.DEVICE_MESSAGE_TYPE:
- device_destinations.add(content["destination"])
- else:
- raise Exception("Unrecognised federation type: %r", typ)
-
- # We've finished collecting, send everything off
- for destination, states in presence_to_send.items():
- self.federation_sender.send_presence(destination, states)
-
- for destination, edu_map in keyed_edus.items():
- for key, edu in edu_map.items():
- self.federation_sender.send_edu(
- edu.destination, edu.edu_type, edu.content, key=key,
- )
-
- for destination, edu_list in edus.items():
- for edu in edu_list:
- self.federation_sender.send_edu(
- edu.destination, edu.edu_type, edu.content, key=None,
- )
-
- for destination, failure_list in failures.items():
- for failure in failure_list:
- self.federation_sender.send_failure(destination, failure)
-
- for destination in device_destinations:
- self.federation_sender.send_device_messages(destination)
-
+ send_queue.process_rows_for_federation(self.federation_sender, rows)
preserve_fn(self.update_token)(token)
# We also need to poke the federation sender when new events happen
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 78c852ed69..748548bbe2 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,20 +31,19 @@ Events are replicated via a separate events stream.
from .units import Edu
+from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
import synapse.metrics
from blist import sorteddict
+from collections import namedtuple
+import logging
-metrics = synapse.metrics.get_metrics_for(__name__)
+logger = logging.getLogger(__name__)
-PRESENCE_TYPE = "p"
-KEYED_EDU_TYPE = "k"
-EDU_TYPE = "e"
-FAILURE_TYPE = "f"
-DEVICE_MESSAGE_TYPE = "d"
+metrics = synapse.metrics.get_metrics_for(__name__)
class FederationRemoteSendQueue(object):
@@ -239,6 +238,8 @@ class FederationRemoteSendQueue(object):
if from_token > self.pos:
from_token = -1
+ # list of tuple(int, BaseFederationRow), where the first is the position
+ # of the federation stream.
rows = []
# There should be only one reader, so lets delete everything its
@@ -257,10 +258,10 @@ class FederationRemoteSendQueue(object):
)
for (key, (dest, user_id)) in dest_user_ids:
- rows.append((key, PRESENCE_TYPE, {
- "destination": dest,
- "state": self.presence_map[user_id].as_dict(),
- }))
+ rows.append((key, PresenceRow(
+ destination=dest,
+ state=self.presence_map[user_id],
+ )))
# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
@@ -269,12 +270,10 @@ class FederationRemoteSendQueue(object):
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
for (pos, (destination, edu_key)) in keyed_edus:
- rows.append(
- (pos, KEYED_EDU_TYPE, {
- "key": edu_key,
- "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
- })
- )
+ rows.append((pos, KeyedEduRow(
+ key=edu_key,
+ edu=self.keyed_edu[(destination, edu_key)],
+ )))
# Fetch changed edus
keys = self.edus.keys()
@@ -283,7 +282,7 @@ class FederationRemoteSendQueue(object):
edus = set((k, self.edus[k]) for k in keys[i:j])
for (pos, edu) in edus:
- rows.append((pos, EDU_TYPE, edu.get_internal_dict()))
+ rows.append((pos, EduRow(edu)))
# Fetch changed failures
keys = self.failures.keys()
@@ -292,10 +291,10 @@ class FederationRemoteSendQueue(object):
failures = set((k, self.failures[k]) for k in keys[i:j])
for (pos, (destination, failure)) in failures:
- rows.append((pos, FAILURE_TYPE, {
- "destination": destination,
- "failure": failure,
- }))
+ rows.append((pos, FailureRow(
+ destination=destination,
+ failure=failure,
+ )))
# Fetch changed device messages
keys = self.device_messages.keys()
@@ -304,11 +303,229 @@ class FederationRemoteSendQueue(object):
device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
for (pos, destination) in device_messages:
- rows.append((pos, DEVICE_MESSAGE_TYPE, {
- "destination": destination,
- }))
+ rows.append((pos, DeviceRow(
+ destination=destination,
+ )))
# Sort rows based on pos
rows.sort()
- return rows
+ return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
+
+
+class BaseFederationRow(object):
+ """Base class for rows to be sent in the federation stream.
+
+ Specifies how to identify, serialize and deserialize the different types.
+ """
+
+ TypeId = None # Unique string that ids the type. Must be overriden in sub classes.
+
+ @staticmethod
+ def from_data(data):
+ """Parse the data from the federation stream into a row.
+
+ Args:
+ data: The value of ``data`` from FederationStreamRow.data, type
+ depends on the type of stream
+ """
+ raise NotImplementedError()
+
+ def to_data(self):
+ """Serialize this row to be sent over the federation stream.
+
+ Returns:
+ The value to be sent in FederationStreamRow.data. The type depends
+ on the type of stream.
+ """
+ raise NotImplementedError()
+
+ def add_to_buffer(self, buff):
+ """Add this row to the appropriate field in the buffer ready for this
+ to be sent over federation.
+
+ We use a buffer so that we can batch up events that have come in at
+ the same time and send them all at once.
+
+ Args:
+ buff (BufferedToSend)
+ """
+ raise NotImplementedError()
+
+
+class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
+ "destination", # str
+ "state", # UserPresenceState
+))):
+ TypeId = "p"
+
+ @staticmethod
+ def from_data(data):
+ return PresenceRow(
+ destination=data["destination"],
+ state=UserPresenceState.from_dict(data["state"])
+ )
+
+ def to_data(self):
+ return {
+ "destination": self.destination,
+ "state": self.state.as_dict()
+ }
+
+ def add_to_buffer(self, buff):
+ buff.presence.setdefault(self.destination, []).append(self.state)
+
+
+class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
+ "key", # tuple(str) - the edu key passed to send_edu
+ "edu", # Edu
+))):
+ TypeId = "k"
+
+ @staticmethod
+ def from_data(data):
+ return KeyedEduRow(
+ key=tuple(data["key"]),
+ edu=Edu(**data["edu"]),
+ )
+
+ def to_data(self):
+ return {
+ "key": self.key,
+ "edu": self.edu.get_internal_dict(),
+ }
+
+ def add_to_buffer(self, buff):
+ buff.keyed_edus.setdefault(
+ self.edu.destination, {}
+ )[self.key] = self.edu
+
+
+class EduRow(BaseFederationRow, namedtuple("EduRow", (
+ "edu", # Edu
+))):
+ TypeId = "e"
+
+ @staticmethod
+ def from_data(data):
+ return EduRow(Edu(**data))
+
+ def to_data(self):
+ return self.edu.get_internal_dict()
+
+ def add_to_buffer(self, buff):
+ buff.edus.setdefault(self.edu.destination, []).append(self.edu)
+
+
+class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
+ "destination", # str
+ "failure",
+))):
+ TypeId = "f"
+
+ @staticmethod
+ def from_data(data):
+ return FailureRow(
+ destination=data["destination"],
+ failure=data["failure"],
+ )
+
+ def to_data(self):
+ return {
+ "destination": self.destination,
+ "failure": self.failure,
+ }
+
+ def add_to_buffer(self, buff):
+ buff.failures.setdefault(self.destination, []).append(self.failure)
+
+
+class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
+ "destination", # str
+))):
+ TypeId = "d"
+
+ @staticmethod
+ def from_data(data):
+ return DeviceRow(destination=data["destination"])
+
+ def to_data(self):
+ return {"destination": self.destination}
+
+ def add_to_buffer(self, buff):
+ buff.device_destinations.add(self.destination)
+
+
+TypeToRow = {
+ Row.TypeId: Row
+ for Row in (
+ PresenceRow,
+ KeyedEduRow,
+ EduRow,
+ FailureRow,
+ DeviceRow,
+ )
+}
+
+
+ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
+ "presence", # dict of destination -> [UserPresenceState]
+ "keyed_edus", # dict of destination -> { key -> Edu }
+ "edus", # dict of destination -> [Edu]
+ "failures", # dict of destination -> [failures]
+ "device_destinations", # set of destinations
+))
+
+
+def process_rows_for_federation(transaction_queue, rows):
+ """Parse a list of rows from the federation stream and put them in the
+ transaction queue ready for sending to the relevant homeservers.
+
+ Args:
+ transaction_queue (TransactionQueue)
+ rows (list(synapse.replication.tcp.streams.FederationStreamRow))
+ """
+
+ # The federation stream contains a bunch of different types of
+ # rows that need to be handled differently. We parse the rows, put
+ # them into the appropriate collection and then send them off.
+
+ buff = ParsedFederationStreamData(
+ presence={},
+ keyed_edus={},
+ edus={},
+ failures={},
+ device_destinations=set(),
+ )
+
+ # Parse the rows in the stream and add to the buffer
+ for row in rows:
+ if row.type not in TypeToRow:
+ logger.error("Unrecognized federation row type %r", row.type)
+ continue
+
+ RowType = TypeToRow[row.type]
+ parsed_row = RowType.from_data(row.data)
+ parsed_row.add_to_buffer(buff)
+
+ for destination, states in buff.presence.iteritems():
+ transaction_queue.send_presence(destination, states)
+
+ for destination, edu_map in buff.keyed_edus.iteritems():
+ for key, edu in edu_map.items():
+ transaction_queue.send_edu(
+ edu.destination, edu.edu_type, edu.content, key=key,
+ )
+
+ for destination, edu_list in buff.edus.iteritems():
+ for edu in edu_list:
+ transaction_queue.send_edu(
+ edu.destination, edu.edu_type, edu.content, key=None,
+ )
+
+ for destination, failure_list in buff.failures.iteritems():
+ for failure in failure_list:
+ transaction_queue.send_failure(destination, failure)
+
+ for destination in buff.device_destinations:
+ transaction_queue.send_device_messages(destination)
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 967b459e0e..369d5f2428 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -98,8 +98,8 @@ ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str
))
FederationStreamRow = namedtuple("FederationStreamRow", (
- "type", # str
- "data", # dict
+ "type", # str, the type of data as defined in the BaseFederationRows
+ "data", # dict, serialization of a federation.send_queue.BaseFederationRow
))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str
|