summary refs log tree commit diff
path: root/synapse/streams/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/streams/events.py')
-rw-r--r--synapse/streams/events.py28
1 files changed, 12 insertions, 16 deletions
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index fcd2aaa9c9..92fd5d489f 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -15,8 +15,6 @@
 
 from typing import Any, Dict
 
-from twisted.internet import defer
-
 from synapse.handlers.account_data import AccountDataEventSource
 from synapse.handlers.presence import PresenceEventSource
 from synapse.handlers.receipts import ReceiptEventSource
@@ -25,7 +23,7 @@ from synapse.handlers.typing import TypingNotificationEventSource
 from synapse.types import StreamToken
 
 
-class EventSources(object):
+class EventSources:
     SOURCE_TYPES = {
         "room": RoomEventSource,
         "presence": PresenceEventSource,
@@ -40,19 +38,18 @@ class EventSources(object):
         }  # type: Dict[str, Any]
         self.store = hs.get_datastore()
 
-    @defer.inlineCallbacks
-    def get_current_token(self):
-        push_rules_key, _ = self.store.get_push_rules_stream_token()
+    def get_current_token(self) -> StreamToken:
+        push_rules_key = self.store.get_max_push_rules_stream_id()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
         groups_key = self.store.get_group_stream_token()
 
         token = StreamToken(
-            room_key=(yield self.sources["room"].get_current_key()),
-            presence_key=(yield self.sources["presence"].get_current_key()),
-            typing_key=(yield self.sources["typing"].get_current_key()),
-            receipt_key=(yield self.sources["receipt"].get_current_key()),
-            account_data_key=(yield self.sources["account_data"].get_current_key()),
+            room_key=self.sources["room"].get_current_key(),
+            presence_key=self.sources["presence"].get_current_key(),
+            typing_key=self.sources["typing"].get_current_key(),
+            receipt_key=self.sources["receipt"].get_current_key(),
+            account_data_key=self.sources["account_data"].get_current_key(),
             push_rules_key=push_rules_key,
             to_device_key=to_device_key,
             device_list_key=device_list_key,
@@ -60,19 +57,18 @@ class EventSources(object):
         )
         return token
 
-    @defer.inlineCallbacks
-    def get_current_token_for_pagination(self):
+    def get_current_token_for_pagination(self) -> StreamToken:
         """Get the current token for a given room to be used to paginate
         events.
 
         The returned token does not have the current values for fields other
         than `room`, since they are not used during pagination.
 
-        Retuns:
-            Deferred[StreamToken]
+        Returns:
+            The current token for pagination.
         """
         token = StreamToken(
-            room_key=(yield self.sources["room"].get_current_key()),
+            room_key=self.sources["room"].get_current_key(),
             presence_key=0,
             typing_key=0,
             receipt_key=0,