From 3b4556cf87666bb6f40d89c8c7fff42d336237b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Aug 2020 17:12:45 +0100 Subject: Fix `wait_for_stream_position` for multiple waiters. (#8196) This fixes a bug where having multiple callers waiting on the same stream and position will cause it to try and compare two deferreds, which fails (due to the sorted list having an entry of `Tuple[int, Deferred]`). --- changelog.d/8196.misc | 1 + synapse/python_dependencies.py | 4 +++- synapse/replication/tcp/client.py | 6 ++---- 3 files changed, 6 insertions(+), 5 deletions(-) create mode 100644 changelog.d/8196.misc diff --git a/changelog.d/8196.misc b/changelog.d/8196.misc new file mode 100644 index 0000000000..c42baf0e56 --- /dev/null +++ b/changelog.d/8196.misc @@ -0,0 +1 @@ +Fix `wait_for_stream_position` to allow multiple waiters on same stream ID. diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index dd77a44b8d..2d995ec456 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -66,7 +66,9 @@ REQUIREMENTS = [ "msgpack>=0.5.2", "phonenumbers>=8.2.0", "prometheus_client>=0.0.18,<0.9.0", - # we use attr.validators.deep_iterable, which arrived in 19.1.0 + # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note: + # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33 + # is out in November.) "attrs>=19.1.0", "netaddr>=0.7.18", "Jinja2>=2.9", diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index fcf8ebf1e7..d6ecf5b327 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,7 +14,6 @@ # limitations under the License. """A replication client for use by synapse workers. """ -import heapq import logging from typing import TYPE_CHECKING, Dict, List, Tuple @@ -219,9 +218,8 @@ class ReplicationDataHandler: waiting_list = self._streams_to_waiters.setdefault(stream_name, []) - # We insert into the list using heapq as it is more efficient than - # pushing then resorting each time. - heapq.heappush(waiting_list, (position, deferred)) + waiting_list.append((position, deferred)) + waiting_list.sort(key=lambda t: t[0]) # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"): -- cgit 1.5.1 From b4826d6eb18eeb05c973882efd4c0f5d68359449 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 28 Aug 2020 17:39:14 +0100 Subject: Fix incorrect return signature --- synapse/storage/databases/main/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 000b8ccff4..01f20c03c2 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -99,7 +99,7 @@ class RegistrationWorkerStore(SQLBaseStore): ) @cached() - async def get_expiration_ts_for_user(self, user_id: str) -> Optional[None]: + async def get_expiration_ts_for_user(self, user_id: str) -> Optional[int]: """Get the expiration timestamp for the account bearing a given user ID. Args: -- cgit 1.5.1 From d2ac767de2ce895a965fb2fcdcb883636f19a5c5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 28 Aug 2020 16:47:11 -0400 Subject: Convert ReadWriteLock to async/await. (#8202) --- changelog.d/8202.misc | 1 + synapse/handlers/pagination.py | 49 ++++++++++++++++++++++-------------------- synapse/util/async_helpers.py | 16 +++++++------- tests/util/test_rwlock.py | 6 ++++-- 4 files changed, 39 insertions(+), 33 deletions(-) create mode 100644 changelog.d/8202.misc diff --git a/changelog.d/8202.misc b/changelog.d/8202.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8202.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index ac3418d69d..5a1aa7d830 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -14,15 +14,18 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import Any, Dict, Optional from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError +from synapse.api.filtering import Filter from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter -from synapse.types import RoomStreamToken +from synapse.streams.config import PaginationConfig +from synapse.types import Requester, RoomStreamToken from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client @@ -247,15 +250,16 @@ class PaginationHandler(object): ) return purge_id - async def _purge_history(self, purge_id, room_id, token, delete_local_events): + async def _purge_history( + self, purge_id: str, room_id: str, token: str, delete_local_events: bool + ) -> None: """Carry out a history purge on a room. Args: - purge_id (str): The id for this purge - room_id (str): The room to purge from - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones + purge_id: The id for this purge + room_id: The room to purge from + token: topological token to delete events before + delete_local_events: True to delete local events as well as remote ones """ self._purges_in_progress_by_room.add(room_id) try: @@ -291,9 +295,9 @@ class PaginationHandler(object): """ return self._purges_by_id.get(purge_id) - async def purge_room(self, room_id): + async def purge_room(self, room_id: str) -> None: """Purge the given room from the database""" - with (await self.pagination_lock.write(room_id)): + with await self.pagination_lock.write(room_id): # check we know about the room await self.store.get_room_version_id(room_id) @@ -307,23 +311,22 @@ class PaginationHandler(object): async def get_messages( self, - requester, - room_id=None, - pagin_config=None, - as_client_event=True, - event_filter=None, - ): + requester: Requester, + room_id: Optional[str] = None, + pagin_config: Optional[PaginationConfig] = None, + as_client_event: bool = True, + event_filter: Optional[Filter] = None, + ) -> Dict[str, Any]: """Get messages in a room. Args: - requester (Requester): The user requesting messages. - room_id (str): The room they want messages from. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - as_client_event (bool): True to get events in client-server format. - event_filter (Filter): Filter to apply to results or None + requester: The user requesting messages. + room_id: The room they want messages from. + pagin_config: The pagination config rules to apply, if any. + as_client_event: True to get events in client-server format. + event_filter: Filter to apply to results or None Returns: - dict: Pagination API results + Pagination API results """ user_id = requester.user.to_string() @@ -343,7 +346,7 @@ class PaginationHandler(object): source_config = pagin_config.get_source_config("room") - with (await self.pagination_lock.read(room_id)): + with await self.pagination_lock.read(room_id): ( membership, member_event_id, diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index f562770922..dfefbd996d 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -20,6 +20,7 @@ from contextlib import contextmanager from typing import Dict, Sequence, Set, Union import attr +from typing_extensions import ContextManager from twisted.internet import defer from twisted.internet.defer import CancelledError @@ -338,11 +339,11 @@ class Linearizer(object): class ReadWriteLock(object): - """A deferred style read write lock. + """An async read write lock. Example: - with (yield read_write_lock.read("test_key")): + with await read_write_lock.read("test_key"): # do some work """ @@ -365,8 +366,7 @@ class ReadWriteLock(object): # Latest writer queued self.key_to_current_writer = {} # type: Dict[str, defer.Deferred] - @defer.inlineCallbacks - def read(self, key): + async def read(self, key: str) -> ContextManager: new_defer = defer.Deferred() curr_readers = self.key_to_current_readers.setdefault(key, set()) @@ -376,7 +376,8 @@ class ReadWriteLock(object): # We wait for the latest writer to finish writing. We can safely ignore # any existing readers... as they're readers. - yield make_deferred_yieldable(curr_writer) + if curr_writer: + await make_deferred_yieldable(curr_writer) @contextmanager def _ctx_manager(): @@ -388,8 +389,7 @@ class ReadWriteLock(object): return _ctx_manager() - @defer.inlineCallbacks - def write(self, key): + async def write(self, key: str) -> ContextManager: new_defer = defer.Deferred() curr_readers = self.key_to_current_readers.get(key, set()) @@ -405,7 +405,7 @@ class ReadWriteLock(object): curr_readers.clear() self.key_to_current_writer[key] = new_defer - yield make_deferred_yieldable(defer.gatherResults(to_wait_on)) + await make_deferred_yieldable(defer.gatherResults(to_wait_on)) @contextmanager def _ctx_manager(): diff --git a/tests/util/test_rwlock.py b/tests/util/test_rwlock.py index bd32e2cee7..d3dea3b52a 100644 --- a/tests/util/test_rwlock.py +++ b/tests/util/test_rwlock.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.util.async_helpers import ReadWriteLock @@ -43,6 +44,7 @@ class ReadWriteLockTestCase(unittest.TestCase): rwlock.read(key), # 5 rwlock.write(key), # 6 ] + ds = [defer.ensureDeferred(d) for d in ds] self._assert_called_before_not_after(ds, 2) @@ -73,12 +75,12 @@ class ReadWriteLockTestCase(unittest.TestCase): with ds[6].result: pass - d = rwlock.write(key) + d = defer.ensureDeferred(rwlock.write(key)) self.assertTrue(d.called) with d.result: pass - d = rwlock.read(key) + d = defer.ensureDeferred(rwlock.read(key)) self.assertTrue(d.called) with d.result: pass -- cgit 1.5.1