diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 451e4fa441..f7f5906a99 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -30,34 +30,34 @@ class SourcePaginationConfig(object):
"""A configuration object which stores pagination parameters for a
specific event source."""
- def __init__(self, from_key=None, to_key=None, direction='f',
- limit=None):
+ def __init__(self, from_key=None, to_key=None, direction="f", limit=None):
self.from_key = from_key
self.to_key = to_key
- self.direction = 'f' if direction == 'f' else 'b'
+ self.direction = "f" if direction == "f" else "b"
self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None
def __repr__(self):
- return (
- "StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)"
- ) % (self.from_key, self.to_key, self.direction, self.limit)
+ return ("StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)") % (
+ self.from_key,
+ self.to_key,
+ self.direction,
+ self.limit,
+ )
class PaginationConfig(object):
"""A configuration object which stores pagination parameters."""
- def __init__(self, from_token=None, to_token=None, direction='f',
- limit=None):
+ def __init__(self, from_token=None, to_token=None, direction="f", limit=None):
self.from_token = from_token
self.to_token = to_token
- self.direction = 'f' if direction == 'f' else 'b'
+ self.direction = "f" if direction == "f" else "b"
self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None
@classmethod
- def from_request(cls, request, raise_invalid_params=True,
- default_limit=None):
- direction = parse_string(request, "dir", default='f', allowed_values=['f', 'b'])
+ def from_request(cls, request, raise_invalid_params=True, default_limit=None):
+ direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
from_tok = parse_string(request, "from")
to_tok = parse_string(request, "to")
@@ -89,8 +89,7 @@ class PaginationConfig(object):
def __repr__(self):
return (
- "PaginationConfig(from_tok=%r, to_tok=%r,"
- " direction=%r, limit=%r)"
+ "PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)"
) % (self.from_token, self.to_token, self.direction, self.limit)
def get_source_config(self, source_name):
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index e5220132a3..488c49747a 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -34,8 +34,7 @@ class EventSources(object):
def __init__(self, hs):
self.sources = {
- name: cls(hs)
- for name, cls in EventSources.SOURCE_TYPES.items()
+ name: cls(hs) for name, cls in EventSources.SOURCE_TYPES.items()
}
self.store = hs.get_datastore()
@@ -47,21 +46,11 @@ class EventSources(object):
groups_key = self.store.get_group_stream_token()
token = StreamToken(
- room_key=(
- yield self.sources["room"].get_current_key()
- ),
- presence_key=(
- yield self.sources["presence"].get_current_key()
- ),
- typing_key=(
- yield self.sources["typing"].get_current_key()
- ),
- receipt_key=(
- yield self.sources["receipt"].get_current_key()
- ),
- account_data_key=(
- yield self.sources["account_data"].get_current_key()
- ),
+ room_key=(yield self.sources["room"].get_current_key()),
+ presence_key=(yield self.sources["presence"].get_current_key()),
+ typing_key=(yield self.sources["typing"].get_current_key()),
+ receipt_key=(yield self.sources["receipt"].get_current_key()),
+ account_data_key=(yield self.sources["account_data"].get_current_key()),
push_rules_key=push_rules_key,
to_device_key=to_device_key,
device_list_key=device_list_key,
@@ -70,31 +59,25 @@ class EventSources(object):
defer.returnValue(token)
@defer.inlineCallbacks
- def get_current_token_for_room(self, room_id):
- push_rules_key, _ = self.store.get_push_rules_stream_token()
- to_device_key = self.store.get_to_device_stream_token()
- device_list_key = self.store.get_device_stream_token()
- groups_key = self.store.get_group_stream_token()
+ def get_current_token_for_pagination(self):
+ """Get the current token for a given room to be used to paginate
+ events.
+
+ The returned token does not have the current values for fields other
+ than `room`, since they are not used during pagination.
+ Retuns:
+ Deferred[StreamToken]
+ """
token = StreamToken(
- room_key=(
- yield self.sources["room"].get_current_key_for_room(room_id)
- ),
- presence_key=(
- yield self.sources["presence"].get_current_key()
- ),
- typing_key=(
- yield self.sources["typing"].get_current_key()
- ),
- receipt_key=(
- yield self.sources["receipt"].get_current_key()
- ),
- account_data_key=(
- yield self.sources["account_data"].get_current_key()
- ),
- push_rules_key=push_rules_key,
- to_device_key=to_device_key,
- device_list_key=device_list_key,
- groups_key=groups_key,
+ room_key=(yield self.sources["room"].get_current_key()),
+ presence_key=0,
+ typing_key=0,
+ receipt_key=0,
+ account_data_key=0,
+ push_rules_key=0,
+ to_device_key=0,
+ device_list_key=0,
+ groups_key=0,
)
defer.returnValue(token)
|