diff options
author | Patrick Cloke <clokep@users.noreply.github.com> | 2020-08-04 07:21:47 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-04 07:21:47 -0400 |
commit | e19de43eb5903c3b6ccca82334971ebc57fc38de (patch) | |
tree | 5925175ac5a8c43303bc5e262fc9bd4f2d6e76f3 /synapse/streams | |
parent | re-implement daemonize (#8011) (diff) | |
download | synapse-e19de43eb5903c3b6ccca82334971ebc57fc38de.tar.xz |
Convert streams to async. (#8014)
Diffstat (limited to 'synapse/streams')
-rw-r--r-- | synapse/streams/events.py | 22 |
1 files changed, 9 insertions, 13 deletions
diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 5d3eddcfdc..393e34b9fb 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -15,8 +15,6 @@ from typing import Any, Dict -from twisted.internet import defer - from synapse.handlers.account_data import AccountDataEventSource from synapse.handlers.presence import PresenceEventSource from synapse.handlers.receipts import ReceiptEventSource @@ -40,19 +38,18 @@ class EventSources(object): } # type: Dict[str, Any] self.store = hs.get_datastore() - @defer.inlineCallbacks - def get_current_token(self): + def get_current_token(self) -> StreamToken: push_rules_key, _ = self.store.get_push_rules_stream_token() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() groups_key = self.store.get_group_stream_token() token = StreamToken( - room_key=(yield self.sources["room"].get_current_key()), - presence_key=(yield self.sources["presence"].get_current_key()), - typing_key=(yield self.sources["typing"].get_current_key()), - receipt_key=(yield self.sources["receipt"].get_current_key()), - account_data_key=(yield self.sources["account_data"].get_current_key()), + room_key=self.sources["room"].get_current_key(), + presence_key=self.sources["presence"].get_current_key(), + typing_key=self.sources["typing"].get_current_key(), + receipt_key=self.sources["receipt"].get_current_key(), + account_data_key=self.sources["account_data"].get_current_key(), push_rules_key=push_rules_key, to_device_key=to_device_key, device_list_key=device_list_key, @@ -60,8 +57,7 @@ class EventSources(object): ) return token - @defer.inlineCallbacks - def get_current_token_for_pagination(self): + def get_current_token_for_pagination(self) -> StreamToken: """Get the current token for a given room to be used to paginate events. @@ -69,10 +65,10 @@ class EventSources(object): than `room`, since they are not used during pagination. Returns: - Deferred[StreamToken] + The current token for pagination. """ token = StreamToken( - room_key=(yield self.sources["room"].get_current_key()), + room_key=self.sources["room"].get_current_key(), presence_key=0, typing_key=0, receipt_key=0, |