summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py15
-rw-r--r--synapse/replication/tcp/protocol.py6
-rw-r--r--synapse/replication/tcp/resource.py3
-rw-r--r--synapse/replication/tcp/streams/__init__.py49
-rw-r--r--synapse/replication/tcp/streams/_base.py (renamed from synapse/replication/tcp/streams.py)127
-rw-r--r--synapse/replication/tcp/streams/events.py146
-rw-r--r--synapse/replication/tcp/streams/federation.py39
7 files changed, 277 insertions, 108 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e558f90e1a..206dc3b397 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -103,10 +103,19 @@ class ReplicationClientHandler(object):
         hs.get_reactor().connectTCP(host, port, self.factory)
 
     def on_rdata(self, stream_name, token, rows):
-        """Called when we get new replication data. By default this just pokes
-        the slave store.
+        """Called to handle a batch of replication data with a given stream token.
 
-        Can be overriden in subclasses to handle more.
+        By default this just pokes the slave store. Can be overridden in subclasses to
+        handle more.
+
+        Args:
+            stream_name (str): name of the replication stream for this batch of rows
+            token (int): stream token for this batch of rows
+            rows (list): a list of Stream.ROW_TYPE objects as returned by
+                Stream.parse_row.
+
+        Returns:
+            Deferred|None
         """
         logger.debug("Received rdata %s -> %s", stream_name, token)
         return self.store.process_replication_rows(stream_name, token, rows)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 02e5bf6cc8..b51590cf8f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire::
     > POSITION backfill 1
     > POSITION caches 1
     > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
-    > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
-        "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
+    > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
+        "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
     < PING 1490197675618
     > ERROR server stopping
     * connection closed by server *
@@ -605,7 +605,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         inbound_rdata_count.labels(stream_name).inc()
 
         try:
-            row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
+            row = STREAMS_MAP[stream_name].parse_row(cmd.row)
         except Exception:
             logger.exception(
                 "[%s] Failed to parse RDATA: %r %r",
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 7fc346c7b6..f6a38f5140 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.metrics import Measure, measure_func
 
 from .protocol import ServerReplicationStreamProtocol
-from .streams import STREAMS_MAP, FederationStream
+from .streams import STREAMS_MAP
+from .streams.federation import FederationStream
 
 stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
                                  "", ["stream_name"])
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
new file mode 100644
index 0000000000..634f636dc9
--- /dev/null
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Defines all the valid streams that clients can subscribe to, and the format
+of the rows returned by each stream.
+
+Each stream is defined by the following information:
+
+    stream name:        The name of the stream
+    row type:           The type that is used to serialise/deserialse the row
+    current_token:      The function that returns the current token for the stream
+    update_function:    The function that returns a list of updates between two tokens
+"""
+
+from . import _base, events, federation
+
+STREAMS_MAP = {
+    stream.NAME: stream
+    for stream in (
+        events.EventsStream,
+        _base.BackfillStream,
+        _base.PresenceStream,
+        _base.TypingStream,
+        _base.ReceiptsStream,
+        _base.PushRulesStream,
+        _base.PushersStream,
+        _base.CachesStream,
+        _base.PublicRoomsStream,
+        _base.DeviceListsStream,
+        _base.ToDeviceStream,
+        federation.FederationStream,
+        _base.TagAccountDataStream,
+        _base.AccountDataStream,
+        _base.GroupServerStream,
+    )
+}
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams/_base.py
index e23084baae..8971a6a22e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,16 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Defines all the valid streams that clients can subscribe to, and the format
-of the rows returned by each stream.
 
-Each stream is defined by the following information:
-
-    stream name:        The name of the stream
-    row type:           The type that is used to serialise/deserialse the row
-    current_token:      The function that returns the current token for the stream
-    update_function:    The function that returns a list of updates between two tokens
-"""
 import itertools
 import logging
 from collections import namedtuple
@@ -34,14 +26,6 @@ logger = logging.getLogger(__name__)
 
 MAX_EVENTS_BEHIND = 10000
 
-
-EventStreamRow = namedtuple("EventStreamRow", (
-    "event_id",  # str
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str, optional
-    "redacts",  # str, optional
-))
 BackfillStreamRow = namedtuple("BackfillStreamRow", (
     "event_id",  # str
     "room_id",  # str
@@ -96,10 +80,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
 ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
     "entity",  # str
 ))
-FederationStreamRow = namedtuple("FederationStreamRow", (
-    "type",  # str, the type of data as defined in the BaseFederationRows
-    "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
-))
 TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
     "user_id",  # str
     "room_id",  # str
@@ -111,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
     "data_type",  # str
     "data",  # dict
 ))
-CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str
-    "event_id",  # str, optional
-))
 GroupsStreamRow = namedtuple("GroupsStreamRow", (
     "group_id",  # str
     "user_id",  # str
@@ -132,9 +106,24 @@ class Stream(object):
     time it was called up until the point `advance_current_token` was called.
     """
     NAME = None  # The name of the stream
-    ROW_TYPE = None  # The type of the row
+    ROW_TYPE = None  # The type of the row. Used by the default impl of parse_row.
     _LIMITED = True  # Whether the update function takes a limit
 
+    @classmethod
+    def parse_row(cls, row):
+        """Parse a row received over replication
+
+        By default, assumes that the row data is an array object and passes its contents
+        to the constructor of the ROW_TYPE for this stream.
+
+        Args:
+            row: row data from the incoming RDATA command, after json decoding
+
+        Returns:
+            ROW_TYPE object for this stream
+        """
+        return cls.ROW_TYPE(*row)
+
     def __init__(self, hs):
         # The token from which we last asked for updates
         self.last_token = self.current_token()
@@ -162,8 +151,10 @@ class Stream(object):
         until the `upto_token`
 
         Returns:
-            (list(ROW_TYPE), int): list of updates plus the token used as an
-                upper bound of the updates (i.e. the "current token")
+            Deferred[Tuple[List[Tuple[int, Any]], int]:
+                Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
+                list of ``(token, row)`` entries. ``row`` will be json-serialised and
+                sent over the replication steam.
         """
         updates, current_token = yield self.get_updates_since(self.last_token)
         self.last_token = current_token
@@ -176,8 +167,10 @@ class Stream(object):
         stream updates
 
         Returns:
-            (list(ROW_TYPE), int): list of updates plus the token used as an
-                upper bound of the updates (i.e. the "current token")
+            Deferred[Tuple[List[Tuple[int, Any]], int]:
+                Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
+                list of ``(token, row)`` entries. ``row`` will be json-serialised and
+                sent over the replication steam.
         """
         if from_token in ("NOW", "now"):
             defer.returnValue(([], self.upto_token))
@@ -202,7 +195,7 @@ class Stream(object):
                 from_token, current_token,
             )
 
-        updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
+        updates = [(row[0], row[1:]) for row in rows]
 
         # check we didn't get more rows than the limit.
         # doing it like this allows the update_function to be a generator.
@@ -232,20 +225,6 @@ class Stream(object):
         raise NotImplementedError()
 
 
-class EventsStream(Stream):
-    """We received a new event, or an event went from being an outlier to not
-    """
-    NAME = "events"
-    ROW_TYPE = EventStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-        self.current_token = store.get_current_events_token
-        self.update_function = store.get_all_new_forward_event_rows
-
-        super(EventsStream, self).__init__(hs)
-
-
 class BackfillStream(Stream):
     """We fetched some old events and either we had never seen that event before
     or it went from being an outlier to not.
@@ -400,22 +379,6 @@ class ToDeviceStream(Stream):
         super(ToDeviceStream, self).__init__(hs)
 
 
-class FederationStream(Stream):
-    """Data to be sent over federation. Only available when master has federation
-    sending disabled.
-    """
-    NAME = "federation"
-    ROW_TYPE = FederationStreamRow
-
-    def __init__(self, hs):
-        federation_sender = hs.get_federation_sender()
-
-        self.current_token = federation_sender.get_current_token
-        self.update_function = federation_sender.get_replication_rows
-
-        super(FederationStream, self).__init__(hs)
-
-
 class TagAccountDataStream(Stream):
     """Someone added/removed a tag for a room
     """
@@ -459,21 +422,6 @@ class AccountDataStream(Stream):
         defer.returnValue(results)
 
 
-class CurrentStateDeltaStream(Stream):
-    """Current state for a room was changed
-    """
-    NAME = "current_state_deltas"
-    ROW_TYPE = CurrentStateDeltaStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-
-        self.current_token = store.get_max_current_state_delta_stream_id
-        self.update_function = store.get_all_updated_current_state_deltas
-
-        super(CurrentStateDeltaStream, self).__init__(hs)
-
-
 class GroupServerStream(Stream):
     NAME = "groups"
     ROW_TYPE = GroupsStreamRow
@@ -485,26 +433,3 @@ class GroupServerStream(Stream):
         self.update_function = store.get_all_groups_changes
 
         super(GroupServerStream, self).__init__(hs)
-
-
-STREAMS_MAP = {
-    stream.NAME: stream
-    for stream in (
-        EventsStream,
-        BackfillStream,
-        PresenceStream,
-        TypingStream,
-        ReceiptsStream,
-        PushRulesStream,
-        PushersStream,
-        CachesStream,
-        PublicRoomsStream,
-        DeviceListsStream,
-        ToDeviceStream,
-        FederationStream,
-        TagAccountDataStream,
-        AccountDataStream,
-        CurrentStateDeltaStream,
-        GroupServerStream,
-    )
-}
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
new file mode 100644
index 0000000000..e0f6e29248
--- /dev/null
+++ b/synapse/replication/tcp/streams/events.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import heapq
+
+import attr
+
+from twisted.internet import defer
+
+from ._base import Stream
+
+
+"""Handling of the 'events' replication stream
+
+This stream contains rows of various types. Each row therefore contains a 'type'
+identifier before the real data. For example::
+
+    RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
+    RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]]
+
+An "ev" row is sent for each new event. The fields in the data part are:
+
+ * The new event id
+ * The room id for the event
+ * The type of the new event
+ * The state key of the event, for state events
+ * The event id of an event which is redacted by this event.
+
+A "state" row is sent whenever the "current state" in a room changes. The fields in the
+data part are:
+
+ * The room id for the state change
+ * The event type of the state which has changed
+ * The state_key of the state which has changed
+ * The event id of the new state
+
+"""
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamRow(object):
+    """A parsed row from the events replication stream"""
+    type = attr.ib()  # str: the TypeId of one of the *EventsStreamRows
+    data = attr.ib()  # BaseEventsStreamRow
+
+
+class BaseEventsStreamRow(object):
+    """Base class for rows to be sent in the events stream.
+
+    Specifies how to identify, serialize and deserialize the different types.
+    """
+
+    TypeId = None  # Unique string that ids the type. Must be overriden in sub classes.
+
+    @classmethod
+    def from_data(cls, data):
+        """Parse the data from the replication stream into a row.
+
+        By default we just call the constructor with the data list as arguments
+
+        Args:
+            data: The value of the data object from the replication stream
+        """
+        return cls(*data)
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamEventRow(BaseEventsStreamRow):
+    TypeId = "ev"
+
+    event_id = attr.ib()   # str
+    room_id = attr.ib()    # str
+    type = attr.ib()       # str
+    state_key = attr.ib()  # str, optional
+    redacts = attr.ib()    # str, optional
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamCurrentStateRow(BaseEventsStreamRow):
+    TypeId = "state"
+
+    room_id = attr.ib()    # str
+    type = attr.ib()       # str
+    state_key = attr.ib()  # str
+    event_id = attr.ib()   # str, optional
+
+
+TypeToRow = {
+    Row.TypeId: Row
+    for Row in (
+        EventsStreamEventRow,
+        EventsStreamCurrentStateRow,
+    )
+}
+
+
+class EventsStream(Stream):
+    """We received a new event, or an event went from being an outlier to not
+    """
+    NAME = "events"
+
+    def __init__(self, hs):
+        self._store = hs.get_datastore()
+        self.current_token = self._store.get_current_events_token
+
+        super(EventsStream, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def update_function(self, from_token, current_token, limit=None):
+        event_rows = yield self._store.get_all_new_forward_event_rows(
+            from_token, current_token, limit,
+        )
+        event_updates = (
+            (row[0], EventsStreamEventRow.TypeId, row[1:])
+            for row in event_rows
+        )
+
+        state_rows = yield self._store.get_all_updated_current_state_deltas(
+            from_token, current_token, limit
+        )
+        state_updates = (
+            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
+            for row in state_rows
+        )
+
+        all_updates = heapq.merge(event_updates, state_updates)
+
+        defer.returnValue(all_updates)
+
+    @classmethod
+    def parse_row(cls, row):
+        (typ, data) = row
+        data = TypeToRow[typ].from_data(data)
+        return EventsStreamRow(typ, data)
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
new file mode 100644
index 0000000000..9aa43aa8d2
--- /dev/null
+++ b/synapse/replication/tcp/streams/federation.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+FederationStreamRow = namedtuple("FederationStreamRow", (
+    "type",  # str, the type of data as defined in the BaseFederationRows
+    "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
+))
+
+
+class FederationStream(Stream):
+    """Data to be sent over federation. Only available when master has federation
+    sending disabled.
+    """
+    NAME = "federation"
+    ROW_TYPE = FederationStreamRow
+
+    def __init__(self, hs):
+        federation_sender = hs.get_federation_sender()
+
+        self.current_token = federation_sender.get_current_token
+        self.update_function = federation_sender.get_replication_rows
+
+        super(FederationStream, self).__init__(hs)