summary refs log tree commit diff
path: root/synapse/streams
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-08-04 07:21:47 -0400
committerGitHub <noreply@github.com>2020-08-04 07:21:47 -0400
commite19de43eb5903c3b6ccca82334971ebc57fc38de (patch)
tree5925175ac5a8c43303bc5e262fc9bd4f2d6e76f3 /synapse/streams
parentre-implement daemonize (#8011) (diff)
downloadsynapse-e19de43eb5903c3b6ccca82334971ebc57fc38de.tar.xz
Convert streams to async. (#8014)
Diffstat (limited to 'synapse/streams')
-rw-r--r--synapse/streams/events.py22
1 files changed, 9 insertions, 13 deletions
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 5d3eddcfdc..393e34b9fb 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -15,8 +15,6 @@
 
 from typing import Any, Dict
 
-from twisted.internet import defer
-
 from synapse.handlers.account_data import AccountDataEventSource
 from synapse.handlers.presence import PresenceEventSource
 from synapse.handlers.receipts import ReceiptEventSource
@@ -40,19 +38,18 @@ class EventSources(object):
         }  # type: Dict[str, Any]
         self.store = hs.get_datastore()
 
-    @defer.inlineCallbacks
-    def get_current_token(self):
+    def get_current_token(self) -> StreamToken:
         push_rules_key, _ = self.store.get_push_rules_stream_token()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
         groups_key = self.store.get_group_stream_token()
 
         token = StreamToken(
-            room_key=(yield self.sources["room"].get_current_key()),
-            presence_key=(yield self.sources["presence"].get_current_key()),
-            typing_key=(yield self.sources["typing"].get_current_key()),
-            receipt_key=(yield self.sources["receipt"].get_current_key()),
-            account_data_key=(yield self.sources["account_data"].get_current_key()),
+            room_key=self.sources["room"].get_current_key(),
+            presence_key=self.sources["presence"].get_current_key(),
+            typing_key=self.sources["typing"].get_current_key(),
+            receipt_key=self.sources["receipt"].get_current_key(),
+            account_data_key=self.sources["account_data"].get_current_key(),
             push_rules_key=push_rules_key,
             to_device_key=to_device_key,
             device_list_key=device_list_key,
@@ -60,8 +57,7 @@ class EventSources(object):
         )
         return token
 
-    @defer.inlineCallbacks
-    def get_current_token_for_pagination(self):
+    def get_current_token_for_pagination(self) -> StreamToken:
         """Get the current token for a given room to be used to paginate
         events.
 
@@ -69,10 +65,10 @@ class EventSources(object):
         than `room`, since they are not used during pagination.
 
         Returns:
-            Deferred[StreamToken]
+            The current token for pagination.
         """
         token = StreamToken(
-            room_key=(yield self.sources["room"].get_current_key()),
+            room_key=self.sources["room"].get_current_key(),
             presence_key=0,
             typing_key=0,
             receipt_key=0,