summary refs log tree commit diff
path: root/synapse/replication/tcp/streams
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-03-27 10:06:21 +0000
committerRichard van der Hoff <richard@matrix.org>2019-03-27 21:13:14 +0000
commitaa1e0178641c5dfbc039cb547bcafe990222bf90 (patch)
tree1f41aa350ed20a8f89b2a0fbb741868c1853fd72 /synapse/replication/tcp/streams
parentMove replication.tcp.streams into a package (diff)
downloadsynapse-aa1e0178641c5dfbc039cb547bcafe990222bf90.tar.xz
move EventsStream out to its own file
Diffstat (limited to 'synapse/replication/tcp/streams')
-rw-r--r--synapse/replication/tcp/streams/__init__.py4
-rw-r--r--synapse/replication/tcp/streams/_base.py21
-rw-r--r--synapse/replication/tcp/streams/events.py40
3 files changed, 42 insertions, 23 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 1d5227971e..edad37aef8 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -25,12 +25,12 @@ Each stream is defined by the following information:
     update_function:    The function that returns a list of updates between two tokens
 """
 
-from . import _base
+from . import _base, events
 
 STREAMS_MAP = {
     stream.NAME: stream
     for stream in (
-        _base.EventsStream,
+        events.EventsStream,
         _base.BackfillStream,
         _base.PresenceStream,
         _base.TypingStream,
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 344c8ab916..04e585f8f2 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -26,13 +26,6 @@ logger = logging.getLogger(__name__)
 
 MAX_EVENTS_BEHIND = 10000
 
-EventStreamRow = namedtuple("EventStreamRow", (
-    "event_id",  # str
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str, optional
-    "redacts",  # str, optional
-))
 BackfillStreamRow = namedtuple("BackfillStreamRow", (
     "event_id",  # str
     "room_id",  # str
@@ -227,20 +220,6 @@ class Stream(object):
         raise NotImplementedError()
 
 
-class EventsStream(Stream):
-    """We received a new event, or an event went from being an outlier to not
-    """
-    NAME = "events"
-    ROW_TYPE = EventStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-        self.current_token = store.get_current_events_token
-        self.update_function = store.get_all_new_forward_event_rows
-
-        super(EventsStream, self).__init__(hs)
-
-
 class BackfillStream(Stream):
     """We fetched some old events and either we had never seen that event before
     or it went from being an outlier to not.
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
new file mode 100644
index 0000000000..511dd6bcc7
--- /dev/null
+++ b/synapse/replication/tcp/streams/events.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+EventStreamRow = namedtuple("EventStreamRow", (
+    "event_id",  # str
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str, optional
+    "redacts",  # str, optional
+))
+
+
+class EventsStream(Stream):
+    """We received a new event, or an event went from being an outlier to not
+    """
+    NAME = "events"
+    ROW_TYPE = EventStreamRow
+
+    def __init__(self, hs):
+        store = hs.get_datastore()
+        self.current_token = store.get_current_events_token
+        self.update_function = store.get_all_new_forward_event_rows
+
+        super(EventsStream, self).__init__(hs)