summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-08-04 07:21:47 -0400
committerGitHub <noreply@github.com>2020-08-04 07:21:47 -0400
commite19de43eb5903c3b6ccca82334971ebc57fc38de (patch)
tree5925175ac5a8c43303bc5e262fc9bd4f2d6e76f3 /synapse
parentre-implement daemonize (#8011) (diff)
downloadsynapse-e19de43eb5903c3b6ccca82334971ebc57fc38de.tar.xz
Convert streams to async. (#8014)
Diffstat (limited to '')
-rw-r--r--synapse/handlers/initial_sync.py4
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/room.py10
-rw-r--r--synapse/handlers/search.py2
-rw-r--r--synapse/handlers/sync.py2
-rw-r--r--synapse/notifier.py4
-rw-r--r--synapse/storage/data_stores/main/stream.py8
-rw-r--r--synapse/streams/events.py22
8 files changed, 25 insertions, 29 deletions
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index f88bad5f25..ae6bd1d352 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -109,7 +109,7 @@ class InitialSyncHandler(BaseHandler):
 
         rooms_ret = []
 
-        now_token = await self.hs.get_event_sources().get_current_token()
+        now_token = self.hs.get_event_sources().get_current_token()
 
         presence_stream = self.hs.get_event_sources().sources["presence"]
         pagination_config = PaginationConfig(from_token=now_token)
@@ -360,7 +360,7 @@ class InitialSyncHandler(BaseHandler):
             current_state.values(), time_now
         )
 
-        now_token = await self.hs.get_event_sources().get_current_token()
+        now_token = self.hs.get_event_sources().get_current_token()
 
         limit = pagin_config.limit if pagin_config else None
         if limit is None:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index da06582d4b..487420bb5d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -309,7 +309,7 @@ class PaginationHandler(object):
             room_token = pagin_config.from_token.room_key
         else:
             pagin_config.from_token = (
-                await self.hs.get_event_sources().get_current_token_for_pagination()
+                self.hs.get_event_sources().get_current_token_for_pagination()
             )
             room_token = pagin_config.from_token.room_key
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 0c5b99234d..a8545255b1 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,7 +22,7 @@ import logging
 import math
 import string
 from collections import OrderedDict
-from typing import Optional, Tuple
+from typing import Awaitable, Optional, Tuple
 
 from synapse.api.constants import (
     EventTypes,
@@ -1041,7 +1041,7 @@ class RoomEventSource(object):
     ):
         # We just ignore the key for now.
 
-        to_key = await self.get_current_key()
+        to_key = self.get_current_key()
 
         from_token = RoomStreamToken.parse(from_key)
         if from_token.topological:
@@ -1081,10 +1081,10 @@ class RoomEventSource(object):
 
         return (events, end_key)
 
-    def get_current_key(self):
-        return self.store.get_room_events_max_id()
+    def get_current_key(self) -> str:
+        return "s%d" % (self.store.get_room_max_stream_ordering(),)
 
-    def get_current_key_for_room(self, room_id):
+    def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
         return self.store.get_room_events_max_id(room_id)
 
 
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9b312a1558..d58f9788c5 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -340,7 +340,7 @@ class SearchHandler(BaseHandler):
         # If client has asked for "context" for each event (i.e. some surrounding
         # events and state), fetch that
         if event_context is not None:
-            now_token = await self.hs.get_event_sources().get_current_token()
+            now_token = self.hs.get_event_sources().get_current_token()
 
             contexts = {}
             for event in allowed_events:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index eaa4eeadf7..5a19bac929 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -961,7 +961,7 @@ class SyncHandler(object):
         # this is due to some of the underlying streams not supporting the ability
         # to query up to a given point.
         # Always use the `now_token` in `SyncResultBuilder`
-        now_token = await self.event_sources.get_current_token()
+        now_token = self.event_sources.get_current_token()
 
         logger.debug(
             "Calculating sync response for %r between %s and %s",
diff --git a/synapse/notifier.py b/synapse/notifier.py
index bd41f77852..22ab4a9da5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -320,7 +320,7 @@ class Notifier(object):
         """
         user_stream = self.user_to_user_stream.get(user_id)
         if user_stream is None:
-            current_token = await self.event_sources.get_current_token()
+            current_token = self.event_sources.get_current_token()
             if room_ids is None:
                 room_ids = await self.store.get_rooms_for_user(user_id)
             user_stream = _NotifierUserStream(
@@ -397,7 +397,7 @@ class Notifier(object):
         """
         from_token = pagination_config.from_token
         if not from_token:
-            from_token = await self.event_sources.get_current_token()
+            from_token = self.event_sources.get_current_token()
 
         limit = pagination_config.limit
 
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 10d39b3699..f1334a6efc 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -39,6 +39,7 @@ what sort order was used:
 import abc
 import logging
 from collections import namedtuple
+from typing import Optional
 
 from twisted.internet import defer
 
@@ -557,19 +558,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return self.db.runInteraction("get_room_event_before_stream_ordering", _f)
 
-    @defer.inlineCallbacks
-    def get_room_events_max_id(self, room_id=None):
+    async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
         """Returns the current token for rooms stream.
 
         By default, it returns the current global stream token. Specifying a
         `room_id` causes it to return the current room specific topological
         token.
         """
-        token = yield self.get_room_max_stream_ordering()
+        token = self.get_room_max_stream_ordering()
         if room_id is None:
             return "s%d" % (token,)
         else:
-            topo = yield self.db.runInteraction(
+            topo = await self.db.runInteraction(
                 "_get_max_topological_txn", self._get_max_topological_txn, room_id
             )
             return "t%d-%d" % (topo, token)
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 5d3eddcfdc..393e34b9fb 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -15,8 +15,6 @@
 
 from typing import Any, Dict
 
-from twisted.internet import defer
-
 from synapse.handlers.account_data import AccountDataEventSource
 from synapse.handlers.presence import PresenceEventSource
 from synapse.handlers.receipts import ReceiptEventSource
@@ -40,19 +38,18 @@ class EventSources(object):
         }  # type: Dict[str, Any]
         self.store = hs.get_datastore()
 
-    @defer.inlineCallbacks
-    def get_current_token(self):
+    def get_current_token(self) -> StreamToken:
         push_rules_key, _ = self.store.get_push_rules_stream_token()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
         groups_key = self.store.get_group_stream_token()
 
         token = StreamToken(
-            room_key=(yield self.sources["room"].get_current_key()),
-            presence_key=(yield self.sources["presence"].get_current_key()),
-            typing_key=(yield self.sources["typing"].get_current_key()),
-            receipt_key=(yield self.sources["receipt"].get_current_key()),
-            account_data_key=(yield self.sources["account_data"].get_current_key()),
+            room_key=self.sources["room"].get_current_key(),
+            presence_key=self.sources["presence"].get_current_key(),
+            typing_key=self.sources["typing"].get_current_key(),
+            receipt_key=self.sources["receipt"].get_current_key(),
+            account_data_key=self.sources["account_data"].get_current_key(),
             push_rules_key=push_rules_key,
             to_device_key=to_device_key,
             device_list_key=device_list_key,
@@ -60,8 +57,7 @@ class EventSources(object):
         )
         return token
 
-    @defer.inlineCallbacks
-    def get_current_token_for_pagination(self):
+    def get_current_token_for_pagination(self) -> StreamToken:
         """Get the current token for a given room to be used to paginate
         events.
 
@@ -69,10 +65,10 @@ class EventSources(object):
         than `room`, since they are not used during pagination.
 
         Returns:
-            Deferred[StreamToken]
+            The current token for pagination.
         """
         token = StreamToken(
-            room_key=(yield self.sources["room"].get_current_key()),
+            room_key=self.sources["room"].get_current_key(),
             presence_key=0,
             typing_key=0,
             receipt_key=0,