diff --git a/changelog.d/4953.misc b/changelog.d/4953.misc
new file mode 100644
index 0000000000..06a084e6ef
--- /dev/null
+++ b/changelog.d/4953.misc
@@ -0,0 +1,2 @@
+Split synapse.replication.tcp.streams into smaller files.
+
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 9711a7147c..1d43f2b075 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams import ReceiptsStream
+from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 7fc346c7b6..f6a38f5140 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol
-from .streams import STREAMS_MAP, FederationStream
+from .streams import STREAMS_MAP
+from .streams.federation import FederationStream
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
new file mode 100644
index 0000000000..5c715e3bfa
--- /dev/null
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Defines all the valid streams that clients can subscribe to, and the format
+of the rows returned by each stream.
+
+Each stream is defined by the following information:
+
+ stream name: The name of the stream
+ row type: The type that is used to serialise/deserialse the row
+ current_token: The function that returns the current token for the stream
+ update_function: The function that returns a list of updates between two tokens
+"""
+
+from . import _base, events, federation
+
+STREAMS_MAP = {
+ stream.NAME: stream
+ for stream in (
+ events.EventsStream,
+ _base.BackfillStream,
+ _base.PresenceStream,
+ _base.TypingStream,
+ _base.ReceiptsStream,
+ _base.PushRulesStream,
+ _base.PushersStream,
+ _base.CachesStream,
+ _base.PublicRoomsStream,
+ _base.DeviceListsStream,
+ _base.ToDeviceStream,
+ federation.FederationStream,
+ _base.TagAccountDataStream,
+ _base.AccountDataStream,
+ _base.CurrentStateDeltaStream,
+ _base.GroupServerStream,
+ )
+}
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams/_base.py
index 42b8a25bd3..18df89deed 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,16 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Defines all the valid streams that clients can subscribe to, and the format
-of the rows returned by each stream.
-Each stream is defined by the following information:
-
- stream name: The name of the stream
- row type: The type that is used to serialise/deserialse the row
- current_token: The function that returns the current token for the stream
- update_function: The function that returns a list of updates between two tokens
-"""
import itertools
import logging
from collections import namedtuple
@@ -34,14 +26,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
-
-EventStreamRow = namedtuple("EventStreamRow", (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
-))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
"event_id", # str
"room_id", # str
@@ -96,10 +80,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str
))
-FederationStreamRow = namedtuple("FederationStreamRow", (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
-))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str
"room_id", # str
@@ -236,20 +216,6 @@ class Stream(object):
raise NotImplementedError()
-class EventsStream(Stream):
- """We received a new event, or an event went from being an outlier to not
- """
- NAME = "events"
- ROW_TYPE = EventStreamRow
-
- def __init__(self, hs):
- store = hs.get_datastore()
- self.current_token = store.get_current_events_token
- self.update_function = store.get_all_new_forward_event_rows
-
- super(EventsStream, self).__init__(hs)
-
-
class BackfillStream(Stream):
"""We fetched some old events and either we had never seen that event before
or it went from being an outlier to not.
@@ -404,22 +370,6 @@ class ToDeviceStream(Stream):
super(ToDeviceStream, self).__init__(hs)
-class FederationStream(Stream):
- """Data to be sent over federation. Only available when master has federation
- sending disabled.
- """
- NAME = "federation"
- ROW_TYPE = FederationStreamRow
-
- def __init__(self, hs):
- federation_sender = hs.get_federation_sender()
-
- self.current_token = federation_sender.get_current_token
- self.update_function = federation_sender.get_replication_rows
-
- super(FederationStream, self).__init__(hs)
-
-
class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
@@ -489,26 +439,3 @@ class GroupServerStream(Stream):
self.update_function = store.get_all_groups_changes
super(GroupServerStream, self).__init__(hs)
-
-
-STREAMS_MAP = {
- stream.NAME: stream
- for stream in (
- EventsStream,
- BackfillStream,
- PresenceStream,
- TypingStream,
- ReceiptsStream,
- PushRulesStream,
- PushersStream,
- CachesStream,
- PublicRoomsStream,
- DeviceListsStream,
- ToDeviceStream,
- FederationStream,
- TagAccountDataStream,
- AccountDataStream,
- CurrentStateDeltaStream,
- GroupServerStream,
- )
-}
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
new file mode 100644
index 0000000000..511dd6bcc7
--- /dev/null
+++ b/synapse/replication/tcp/streams/events.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+EventStreamRow = namedtuple("EventStreamRow", (
+ "event_id", # str
+ "room_id", # str
+ "type", # str
+ "state_key", # str, optional
+ "redacts", # str, optional
+))
+
+
+class EventsStream(Stream):
+ """We received a new event, or an event went from being an outlier to not
+ """
+ NAME = "events"
+ ROW_TYPE = EventStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_current_events_token
+ self.update_function = store.get_all_new_forward_event_rows
+
+ super(EventsStream, self).__init__(hs)
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
new file mode 100644
index 0000000000..9aa43aa8d2
--- /dev/null
+++ b/synapse/replication/tcp/streams/federation.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+FederationStreamRow = namedtuple("FederationStreamRow", (
+ "type", # str, the type of data as defined in the BaseFederationRows
+ "data", # dict, serialization of a federation.send_queue.BaseFederationRow
+))
+
+
+class FederationStream(Stream):
+ """Data to be sent over federation. Only available when master has federation
+ sending disabled.
+ """
+ NAME = "federation"
+ ROW_TYPE = FederationStreamRow
+
+ def __init__(self, hs):
+ federation_sender = hs.get_federation_sender()
+
+ self.current_token = federation_sender.get_current_token
+ self.update_function = federation_sender.get_replication_rows
+
+ super(FederationStream, self).__init__(hs)
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index 9aa9dfe82e..d5a99f6caa 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.replication.tcp.streams import ReceiptsStreamRow
+from synapse.replication.tcp.streams._base import ReceiptsStreamRow
from tests.replication.tcp.streams._base import BaseStreamTestCase
|