summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py267
1 files changed, 242 insertions, 25 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 78c852ed69..748548bbe2 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,20 +31,19 @@ Events are replicated via a separate events stream.
 
 from .units import Edu
 
+from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 import synapse.metrics
 
 from blist import sorteddict
+from collections import namedtuple
 
+import logging
 
-metrics = synapse.metrics.get_metrics_for(__name__)
+logger = logging.getLogger(__name__)
 
 
-PRESENCE_TYPE = "p"
-KEYED_EDU_TYPE = "k"
-EDU_TYPE = "e"
-FAILURE_TYPE = "f"
-DEVICE_MESSAGE_TYPE = "d"
+metrics = synapse.metrics.get_metrics_for(__name__)
 
 
 class FederationRemoteSendQueue(object):
@@ -239,6 +238,8 @@ class FederationRemoteSendQueue(object):
         if from_token > self.pos:
             from_token = -1
 
+        # list of tuple(int, BaseFederationRow), where the first is the position
+        # of the federation stream.
         rows = []
 
         # There should be only one reader, so lets delete everything its
@@ -257,10 +258,10 @@ class FederationRemoteSendQueue(object):
         )
 
         for (key, (dest, user_id)) in dest_user_ids:
-            rows.append((key, PRESENCE_TYPE, {
-                "destination": dest,
-                "state": self.presence_map[user_id].as_dict(),
-            }))
+            rows.append((key, PresenceRow(
+                destination=dest,
+                state=self.presence_map[user_id],
+            )))
 
         # Fetch changes keyed edus
         keys = self.keyed_edu_changed.keys()
@@ -269,12 +270,10 @@ class FederationRemoteSendQueue(object):
         keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
 
         for (pos, (destination, edu_key)) in keyed_edus:
-            rows.append(
-                (pos, KEYED_EDU_TYPE, {
-                    "key": edu_key,
-                    "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
-                })
-            )
+            rows.append((pos, KeyedEduRow(
+                key=edu_key,
+                edu=self.keyed_edu[(destination, edu_key)],
+            )))
 
         # Fetch changed edus
         keys = self.edus.keys()
@@ -283,7 +282,7 @@ class FederationRemoteSendQueue(object):
         edus = set((k, self.edus[k]) for k in keys[i:j])
 
         for (pos, edu) in edus:
-            rows.append((pos, EDU_TYPE, edu.get_internal_dict()))
+            rows.append((pos, EduRow(edu)))
 
         # Fetch changed failures
         keys = self.failures.keys()
@@ -292,10 +291,10 @@ class FederationRemoteSendQueue(object):
         failures = set((k, self.failures[k]) for k in keys[i:j])
 
         for (pos, (destination, failure)) in failures:
-            rows.append((pos, FAILURE_TYPE, {
-                "destination": destination,
-                "failure": failure,
-            }))
+            rows.append((pos, FailureRow(
+                destination=destination,
+                failure=failure,
+            )))
 
         # Fetch changed device messages
         keys = self.device_messages.keys()
@@ -304,11 +303,229 @@ class FederationRemoteSendQueue(object):
         device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
 
         for (pos, destination) in device_messages:
-            rows.append((pos, DEVICE_MESSAGE_TYPE, {
-                "destination": destination,
-            }))
+            rows.append((pos, DeviceRow(
+                destination=destination,
+            )))
 
         # Sort rows based on pos
         rows.sort()
 
-        return rows
+        return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
+
+
+class BaseFederationRow(object):
+    """Base class for rows to be sent in the federation stream.
+
+    Specifies how to identify, serialize and deserialize the different types.
+    """
+
+    TypeId = None  # Unique string that ids the type. Must be overriden in sub classes.
+
+    @staticmethod
+    def from_data(data):
+        """Parse the data from the federation stream into a row.
+
+        Args:
+            data: The value of ``data`` from FederationStreamRow.data, type
+                depends on the type of stream
+        """
+        raise NotImplementedError()
+
+    def to_data(self):
+        """Serialize this row to be sent over the federation stream.
+
+        Returns:
+            The value to be sent in FederationStreamRow.data. The type depends
+            on the type of stream.
+        """
+        raise NotImplementedError()
+
+    def add_to_buffer(self, buff):
+        """Add this row to the appropriate field in the buffer ready for this
+        to be sent over federation.
+
+        We use a buffer so that we can batch up events that have come in at
+        the same time and send them all at once.
+
+        Args:
+            buff (BufferedToSend)
+        """
+        raise NotImplementedError()
+
+
+class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
+    "destination",  # str
+    "state",  # UserPresenceState
+))):
+    TypeId = "p"
+
+    @staticmethod
+    def from_data(data):
+        return PresenceRow(
+            destination=data["destination"],
+            state=UserPresenceState.from_dict(data["state"])
+        )
+
+    def to_data(self):
+        return {
+            "destination": self.destination,
+            "state": self.state.as_dict()
+        }
+
+    def add_to_buffer(self, buff):
+        buff.presence.setdefault(self.destination, []).append(self.state)
+
+
+class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
+    "key",  # tuple(str) - the edu key passed to send_edu
+    "edu",  # Edu
+))):
+    TypeId = "k"
+
+    @staticmethod
+    def from_data(data):
+        return KeyedEduRow(
+            key=tuple(data["key"]),
+            edu=Edu(**data["edu"]),
+        )
+
+    def to_data(self):
+        return {
+            "key": self.key,
+            "edu": self.edu.get_internal_dict(),
+        }
+
+    def add_to_buffer(self, buff):
+        buff.keyed_edus.setdefault(
+            self.edu.destination, {}
+        )[self.key] = self.edu
+
+
+class EduRow(BaseFederationRow, namedtuple("EduRow", (
+    "edu",  # Edu
+))):
+    TypeId = "e"
+
+    @staticmethod
+    def from_data(data):
+        return EduRow(Edu(**data))
+
+    def to_data(self):
+        return self.edu.get_internal_dict()
+
+    def add_to_buffer(self, buff):
+        buff.edus.setdefault(self.edu.destination, []).append(self.edu)
+
+
+class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
+    "destination",  # str
+    "failure",
+))):
+    TypeId = "f"
+
+    @staticmethod
+    def from_data(data):
+        return FailureRow(
+            destination=data["destination"],
+            failure=data["failure"],
+        )
+
+    def to_data(self):
+        return {
+            "destination": self.destination,
+            "failure": self.failure,
+        }
+
+    def add_to_buffer(self, buff):
+        buff.failures.setdefault(self.destination, []).append(self.failure)
+
+
+class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
+    "destination",  # str
+))):
+    TypeId = "d"
+
+    @staticmethod
+    def from_data(data):
+        return DeviceRow(destination=data["destination"])
+
+    def to_data(self):
+        return {"destination": self.destination}
+
+    def add_to_buffer(self, buff):
+        buff.device_destinations.add(self.destination)
+
+
+TypeToRow = {
+    Row.TypeId: Row
+    for Row in (
+        PresenceRow,
+        KeyedEduRow,
+        EduRow,
+        FailureRow,
+        DeviceRow,
+    )
+}
+
+
+ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
+    "presence",  # dict of destination -> [UserPresenceState]
+    "keyed_edus",  # dict of destination -> { key -> Edu }
+    "edus",  # dict of destination -> [Edu]
+    "failures",  # dict of destination -> [failures]
+    "device_destinations",  # set of destinations
+))
+
+
+def process_rows_for_federation(transaction_queue, rows):
+    """Parse a list of rows from the federation stream and put them in the
+    transaction queue ready for sending to the relevant homeservers.
+
+    Args:
+        transaction_queue (TransactionQueue)
+        rows (list(synapse.replication.tcp.streams.FederationStreamRow))
+    """
+
+    # The federation stream contains a bunch of different types of
+    # rows that need to be handled differently. We parse the rows, put
+    # them into the appropriate collection and then send them off.
+
+    buff = ParsedFederationStreamData(
+        presence={},
+        keyed_edus={},
+        edus={},
+        failures={},
+        device_destinations=set(),
+    )
+
+    # Parse the rows in the stream and add to the buffer
+    for row in rows:
+        if row.type not in TypeToRow:
+            logger.error("Unrecognized federation row type %r", row.type)
+            continue
+
+        RowType = TypeToRow[row.type]
+        parsed_row = RowType.from_data(row.data)
+        parsed_row.add_to_buffer(buff)
+
+    for destination, states in buff.presence.iteritems():
+        transaction_queue.send_presence(destination, states)
+
+    for destination, edu_map in buff.keyed_edus.iteritems():
+        for key, edu in edu_map.items():
+            transaction_queue.send_edu(
+                edu.destination, edu.edu_type, edu.content, key=key,
+            )
+
+    for destination, edu_list in buff.edus.iteritems():
+        for edu in edu_list:
+            transaction_queue.send_edu(
+                edu.destination, edu.edu_type, edu.content, key=None,
+            )
+
+    for destination, failure_list in buff.failures.iteritems():
+        for failure in failure_list:
+            transaction_queue.send_failure(destination, failure)
+
+    for destination in buff.device_destinations:
+        transaction_queue.send_device_messages(destination)