diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 9711a7147c..1d43f2b075 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams import ReceiptsStream
+from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
new file mode 100644
index 0000000000..1d5227971e
--- /dev/null
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Defines all the valid streams that clients can subscribe to, and the format
+of the rows returned by each stream.
+
+Each stream is defined by the following information:
+
+ stream name: The name of the stream
+ row type: The type that is used to serialise/deserialse the row
+ current_token: The function that returns the current token for the stream
+ update_function: The function that returns a list of updates between two tokens
+"""
+
+from . import _base
+
+STREAMS_MAP = {
+ stream.NAME: stream
+ for stream in (
+ _base.EventsStream,
+ _base.BackfillStream,
+ _base.PresenceStream,
+ _base.TypingStream,
+ _base.ReceiptsStream,
+ _base.PushRulesStream,
+ _base.PushersStream,
+ _base.CachesStream,
+ _base.PublicRoomsStream,
+ _base.DeviceListsStream,
+ _base.ToDeviceStream,
+ _base.FederationStream,
+ _base.TagAccountDataStream,
+ _base.AccountDataStream,
+ _base.CurrentStateDeltaStream,
+ _base.GroupServerStream,
+ )
+}
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams/_base.py
index 42b8a25bd3..344c8ab916 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,16 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Defines all the valid streams that clients can subscribe to, and the format
-of the rows returned by each stream.
-Each stream is defined by the following information:
-
- stream name: The name of the stream
- row type: The type that is used to serialise/deserialse the row
- current_token: The function that returns the current token for the stream
- update_function: The function that returns a list of updates between two tokens
-"""
import itertools
import logging
from collections import namedtuple
@@ -34,7 +26,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
-
EventStreamRow = namedtuple("EventStreamRow", (
"event_id", # str
"room_id", # str
@@ -489,26 +480,3 @@ class GroupServerStream(Stream):
self.update_function = store.get_all_groups_changes
super(GroupServerStream, self).__init__(hs)
-
-
-STREAMS_MAP = {
- stream.NAME: stream
- for stream in (
- EventsStream,
- BackfillStream,
- PresenceStream,
- TypingStream,
- ReceiptsStream,
- PushRulesStream,
- PushersStream,
- CachesStream,
- PublicRoomsStream,
- DeviceListsStream,
- ToDeviceStream,
- FederationStream,
- TagAccountDataStream,
- AccountDataStream,
- CurrentStateDeltaStream,
- GroupServerStream,
- )
-}
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index 9aa9dfe82e..d5a99f6caa 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.replication.tcp.streams import ReceiptsStreamRow
+from synapse.replication.tcp.streams._base import ReceiptsStreamRow
from tests.replication.tcp.streams._base import BaseStreamTestCase
|