diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 1d5227971e..edad37aef8 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -25,12 +25,12 @@ Each stream is defined by the following information:
update_function: The function that returns a list of updates between two tokens
"""
-from . import _base
+from . import _base, events
STREAMS_MAP = {
stream.NAME: stream
for stream in (
- _base.EventsStream,
+ events.EventsStream,
_base.BackfillStream,
_base.PresenceStream,
_base.TypingStream,
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 344c8ab916..04e585f8f2 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -26,13 +26,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
-EventStreamRow = namedtuple("EventStreamRow", (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
-))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
"event_id", # str
"room_id", # str
@@ -227,20 +220,6 @@ class Stream(object):
raise NotImplementedError()
-class EventsStream(Stream):
- """We received a new event, or an event went from being an outlier to not
- """
- NAME = "events"
- ROW_TYPE = EventStreamRow
-
- def __init__(self, hs):
- store = hs.get_datastore()
- self.current_token = store.get_current_events_token
- self.update_function = store.get_all_new_forward_event_rows
-
- super(EventsStream, self).__init__(hs)
-
-
class BackfillStream(Stream):
"""We fetched some old events and either we had never seen that event before
or it went from being an outlier to not.
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
new file mode 100644
index 0000000000..511dd6bcc7
--- /dev/null
+++ b/synapse/replication/tcp/streams/events.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+EventStreamRow = namedtuple("EventStreamRow", (
+ "event_id", # str
+ "room_id", # str
+ "type", # str
+ "state_key", # str, optional
+ "redacts", # str, optional
+))
+
+
+class EventsStream(Stream):
+ """We received a new event, or an event went from being an outlier to not
+ """
+ NAME = "events"
+ ROW_TYPE = EventStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+ self.current_token = store.get_current_events_token
+ self.update_function = store.get_all_new_forward_event_rows
+
+ super(EventsStream, self).__init__(hs)
|