summary refs log tree commit diff
path: root/synapse/streams
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/streams')
-rw-r--r--synapse/streams/config.py27
-rw-r--r--synapse/streams/events.py65
2 files changed, 37 insertions, 55 deletions
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 451e4fa441..f7f5906a99 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -30,34 +30,34 @@ class SourcePaginationConfig(object):
     """A configuration object which stores pagination parameters for a
     specific event source."""
 
-    def __init__(self, from_key=None, to_key=None, direction='f',
-                 limit=None):
+    def __init__(self, from_key=None, to_key=None, direction="f", limit=None):
         self.from_key = from_key
         self.to_key = to_key
-        self.direction = 'f' if direction == 'f' else 'b'
+        self.direction = "f" if direction == "f" else "b"
         self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None
 
     def __repr__(self):
-        return (
-            "StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)"
-        ) % (self.from_key, self.to_key, self.direction, self.limit)
+        return ("StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)") % (
+            self.from_key,
+            self.to_key,
+            self.direction,
+            self.limit,
+        )
 
 
 class PaginationConfig(object):
 
     """A configuration object which stores pagination parameters."""
 
-    def __init__(self, from_token=None, to_token=None, direction='f',
-                 limit=None):
+    def __init__(self, from_token=None, to_token=None, direction="f", limit=None):
         self.from_token = from_token
         self.to_token = to_token
-        self.direction = 'f' if direction == 'f' else 'b'
+        self.direction = "f" if direction == "f" else "b"
         self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None
 
     @classmethod
-    def from_request(cls, request, raise_invalid_params=True,
-                     default_limit=None):
-        direction = parse_string(request, "dir", default='f', allowed_values=['f', 'b'])
+    def from_request(cls, request, raise_invalid_params=True, default_limit=None):
+        direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
 
         from_tok = parse_string(request, "from")
         to_tok = parse_string(request, "to")
@@ -89,8 +89,7 @@ class PaginationConfig(object):
 
     def __repr__(self):
         return (
-            "PaginationConfig(from_tok=%r, to_tok=%r,"
-            " direction=%r, limit=%r)"
+            "PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)"
         ) % (self.from_token, self.to_token, self.direction, self.limit)
 
     def get_source_config(self, source_name):
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index e5220132a3..488c49747a 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -34,8 +34,7 @@ class EventSources(object):
 
     def __init__(self, hs):
         self.sources = {
-            name: cls(hs)
-            for name, cls in EventSources.SOURCE_TYPES.items()
+            name: cls(hs) for name, cls in EventSources.SOURCE_TYPES.items()
         }
         self.store = hs.get_datastore()
 
@@ -47,21 +46,11 @@ class EventSources(object):
         groups_key = self.store.get_group_stream_token()
 
         token = StreamToken(
-            room_key=(
-                yield self.sources["room"].get_current_key()
-            ),
-            presence_key=(
-                yield self.sources["presence"].get_current_key()
-            ),
-            typing_key=(
-                yield self.sources["typing"].get_current_key()
-            ),
-            receipt_key=(
-                yield self.sources["receipt"].get_current_key()
-            ),
-            account_data_key=(
-                yield self.sources["account_data"].get_current_key()
-            ),
+            room_key=(yield self.sources["room"].get_current_key()),
+            presence_key=(yield self.sources["presence"].get_current_key()),
+            typing_key=(yield self.sources["typing"].get_current_key()),
+            receipt_key=(yield self.sources["receipt"].get_current_key()),
+            account_data_key=(yield self.sources["account_data"].get_current_key()),
             push_rules_key=push_rules_key,
             to_device_key=to_device_key,
             device_list_key=device_list_key,
@@ -70,31 +59,25 @@ class EventSources(object):
         defer.returnValue(token)
 
     @defer.inlineCallbacks
-    def get_current_token_for_room(self, room_id):
-        push_rules_key, _ = self.store.get_push_rules_stream_token()
-        to_device_key = self.store.get_to_device_stream_token()
-        device_list_key = self.store.get_device_stream_token()
-        groups_key = self.store.get_group_stream_token()
+    def get_current_token_for_pagination(self):
+        """Get the current token for a given room to be used to paginate
+        events.
+
+        The returned token does not have the current values for fields other
+        than `room`, since they are not used during pagination.
 
+        Retuns:
+            Deferred[StreamToken]
+        """
         token = StreamToken(
-            room_key=(
-                yield self.sources["room"].get_current_key_for_room(room_id)
-            ),
-            presence_key=(
-                yield self.sources["presence"].get_current_key()
-            ),
-            typing_key=(
-                yield self.sources["typing"].get_current_key()
-            ),
-            receipt_key=(
-                yield self.sources["receipt"].get_current_key()
-            ),
-            account_data_key=(
-                yield self.sources["account_data"].get_current_key()
-            ),
-            push_rules_key=push_rules_key,
-            to_device_key=to_device_key,
-            device_list_key=device_list_key,
-            groups_key=groups_key,
+            room_key=(yield self.sources["room"].get_current_key()),
+            presence_key=0,
+            typing_key=0,
+            receipt_key=0,
+            account_data_key=0,
+            push_rules_key=0,
+            to_device_key=0,
+            device_list_key=0,
+            groups_key=0,
         )
         defer.returnValue(token)