summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-03-28 13:43:25 +0000
committerGitHub <noreply@github.com>2019-03-28 13:43:25 +0000
commit4eeb2c2f0757cee4b258143dfa6aa6588d020175 (patch)
tree74804850760cf5625d893ec1e4d2218b1f23bbdd
parentRun `black` on some storage modules that the stats branch touches (#4959) (diff)
parentchangelog (diff)
downloadsynapse-4eeb2c2f0757cee4b258143dfa6aa6588d020175.tar.xz
Merge pull request #4953 from matrix-org/rav/refactor_replication_streams
Split up replication.tcp.streams into smaller files
-rw-r--r--changelog.d/4953.misc2
-rw-r--r--synapse/app/federation_sender.py2
-rw-r--r--synapse/replication/tcp/resource.py3
-rw-r--r--synapse/replication/tcp/streams/__init__.py50
-rw-r--r--synapse/replication/tcp/streams/_base.py (renamed from synapse/replication/tcp/streams.py)75
-rw-r--r--synapse/replication/tcp/streams/events.py40
-rw-r--r--synapse/replication/tcp/streams/federation.py39
-rw-r--r--tests/replication/tcp/streams/test_receipts.py2
8 files changed, 136 insertions, 77 deletions
diff --git a/changelog.d/4953.misc b/changelog.d/4953.misc
new file mode 100644
index 0000000000..06a084e6ef
--- /dev/null
+++ b/changelog.d/4953.misc
@@ -0,0 +1,2 @@
+Split synapse.replication.tcp.streams into smaller files.
+
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 9711a7147c..1d43f2b075 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams import ReceiptsStream
+from synapse.replication.tcp.streams._base import ReceiptsStream
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.types import ReadReceipt
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 7fc346c7b6..f6a38f5140 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.metrics import Measure, measure_func
 
 from .protocol import ServerReplicationStreamProtocol
-from .streams import STREAMS_MAP, FederationStream
+from .streams import STREAMS_MAP
+from .streams.federation import FederationStream
 
 stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
                                  "", ["stream_name"])
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
new file mode 100644
index 0000000000..5c715e3bfa
--- /dev/null
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Defines all the valid streams that clients can subscribe to, and the format
+of the rows returned by each stream.
+
+Each stream is defined by the following information:
+
+    stream name:        The name of the stream
+    row type:           The type that is used to serialise/deserialse the row
+    current_token:      The function that returns the current token for the stream
+    update_function:    The function that returns a list of updates between two tokens
+"""
+
+from . import _base, events, federation
+
+STREAMS_MAP = {
+    stream.NAME: stream
+    for stream in (
+        events.EventsStream,
+        _base.BackfillStream,
+        _base.PresenceStream,
+        _base.TypingStream,
+        _base.ReceiptsStream,
+        _base.PushRulesStream,
+        _base.PushersStream,
+        _base.CachesStream,
+        _base.PublicRoomsStream,
+        _base.DeviceListsStream,
+        _base.ToDeviceStream,
+        federation.FederationStream,
+        _base.TagAccountDataStream,
+        _base.AccountDataStream,
+        _base.CurrentStateDeltaStream,
+        _base.GroupServerStream,
+    )
+}
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams/_base.py
index 42b8a25bd3..18df89deed 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,16 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Defines all the valid streams that clients can subscribe to, and the format
-of the rows returned by each stream.
 
-Each stream is defined by the following information:
-
-    stream name:        The name of the stream
-    row type:           The type that is used to serialise/deserialse the row
-    current_token:      The function that returns the current token for the stream
-    update_function:    The function that returns a list of updates between two tokens
-"""
 import itertools
 import logging
 from collections import namedtuple
@@ -34,14 +26,6 @@ logger = logging.getLogger(__name__)
 
 MAX_EVENTS_BEHIND = 10000
 
-
-EventStreamRow = namedtuple("EventStreamRow", (
-    "event_id",  # str
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str, optional
-    "redacts",  # str, optional
-))
 BackfillStreamRow = namedtuple("BackfillStreamRow", (
     "event_id",  # str
     "room_id",  # str
@@ -96,10 +80,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
 ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
     "entity",  # str
 ))
-FederationStreamRow = namedtuple("FederationStreamRow", (
-    "type",  # str, the type of data as defined in the BaseFederationRows
-    "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
-))
 TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
     "user_id",  # str
     "room_id",  # str
@@ -236,20 +216,6 @@ class Stream(object):
         raise NotImplementedError()
 
 
-class EventsStream(Stream):
-    """We received a new event, or an event went from being an outlier to not
-    """
-    NAME = "events"
-    ROW_TYPE = EventStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-        self.current_token = store.get_current_events_token
-        self.update_function = store.get_all_new_forward_event_rows
-
-        super(EventsStream, self).__init__(hs)
-
-
 class BackfillStream(Stream):
     """We fetched some old events and either we had never seen that event before
     or it went from being an outlier to not.
@@ -404,22 +370,6 @@ class ToDeviceStream(Stream):
         super(ToDeviceStream, self).__init__(hs)
 
 
-class FederationStream(Stream):
-    """Data to be sent over federation. Only available when master has federation
-    sending disabled.
-    """
-    NAME = "federation"
-    ROW_TYPE = FederationStreamRow
-
-    def __init__(self, hs):
-        federation_sender = hs.get_federation_sender()
-
-        self.current_token = federation_sender.get_current_token
-        self.update_function = federation_sender.get_replication_rows
-
-        super(FederationStream, self).__init__(hs)
-
-
 class TagAccountDataStream(Stream):
     """Someone added/removed a tag for a room
     """
@@ -489,26 +439,3 @@ class GroupServerStream(Stream):
         self.update_function = store.get_all_groups_changes
 
         super(GroupServerStream, self).__init__(hs)
-
-
-STREAMS_MAP = {
-    stream.NAME: stream
-    for stream in (
-        EventsStream,
-        BackfillStream,
-        PresenceStream,
-        TypingStream,
-        ReceiptsStream,
-        PushRulesStream,
-        PushersStream,
-        CachesStream,
-        PublicRoomsStream,
-        DeviceListsStream,
-        ToDeviceStream,
-        FederationStream,
-        TagAccountDataStream,
-        AccountDataStream,
-        CurrentStateDeltaStream,
-        GroupServerStream,
-    )
-}
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
new file mode 100644
index 0000000000..511dd6bcc7
--- /dev/null
+++ b/synapse/replication/tcp/streams/events.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+EventStreamRow = namedtuple("EventStreamRow", (
+    "event_id",  # str
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str, optional
+    "redacts",  # str, optional
+))
+
+
+class EventsStream(Stream):
+    """We received a new event, or an event went from being an outlier to not
+    """
+    NAME = "events"
+    ROW_TYPE = EventStreamRow
+
+    def __init__(self, hs):
+        store = hs.get_datastore()
+        self.current_token = store.get_current_events_token
+        self.update_function = store.get_all_new_forward_event_rows
+
+        super(EventsStream, self).__init__(hs)
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
new file mode 100644
index 0000000000..9aa43aa8d2
--- /dev/null
+++ b/synapse/replication/tcp/streams/federation.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+FederationStreamRow = namedtuple("FederationStreamRow", (
+    "type",  # str, the type of data as defined in the BaseFederationRows
+    "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
+))
+
+
+class FederationStream(Stream):
+    """Data to be sent over federation. Only available when master has federation
+    sending disabled.
+    """
+    NAME = "federation"
+    ROW_TYPE = FederationStreamRow
+
+    def __init__(self, hs):
+        federation_sender = hs.get_federation_sender()
+
+        self.current_token = federation_sender.get_current_token
+        self.update_function = federation_sender.get_replication_rows
+
+        super(FederationStream, self).__init__(hs)
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index 9aa9dfe82e..d5a99f6caa 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from synapse.replication.tcp.streams import ReceiptsStreamRow
+from synapse.replication.tcp.streams._base import ReceiptsStreamRow
 
 from tests.replication.tcp.streams._base import BaseStreamTestCase