From a93f3121f8fd1c2b77e003d8e43ce881635bb098 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 9 Oct 2020 07:20:51 -0400 Subject: Add type hints to some handlers (#8505) --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 59415f6f88..13adeed01e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -339,7 +339,7 @@ class Notifier: self, stream_key: str, new_token: Union[int, RoomStreamToken], - users: Collection[UserID] = [], + users: Collection[Union[str, UserID]] = [], rooms: Collection[str] = [], ): """ Used to inform listeners that something has happened event wise. -- cgit 1.5.1 From 921a3f8a59da0f8fe706a22627f464a74b54c992 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Oct 2020 13:27:51 +0100 Subject: Fix not sending events over federation when using sharded event persisters (#8536) * Fix outbound federaion with multiple event persisters. We incorrectly notified federation senders that the minimum persisted stream position had advanced when we got an `RDATA` from an event persister. Notifying of federation senders already correctly happens in the notifier, so we just delete the offending line. * Change some interfaces to use RoomStreamToken. By enforcing use of `RoomStreamTokens` we make it less likely that people pass in random ints that they got from somewhere random. --- changelog.d/8536.bugfix | 1 + synapse/app/generic_worker.py | 4 ---- synapse/federation/send_queue.py | 2 +- synapse/federation/sender/__init__.py | 9 +++++++-- synapse/handlers/appservice.py | 11 +++++++---- synapse/notifier.py | 6 +++--- synapse/push/emailpusher.py | 8 +++++++- synapse/push/httppusher.py | 8 +++++++- synapse/push/pusherpool.py | 10 ++++++++-- tests/handlers/test_appservice.py | 13 ++++++++++--- 10 files changed, 51 insertions(+), 21 deletions(-) create mode 100644 changelog.d/8536.bugfix (limited to 'synapse/notifier.py') diff --git a/changelog.d/8536.bugfix b/changelog.d/8536.bugfix new file mode 100644 index 0000000000..8d238cc008 --- /dev/null +++ b/changelog.d/8536.bugfix @@ -0,0 +1 @@ +Fix not sending events over federation when using sharded event writers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d53181deb1..1b511890aa 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -790,10 +790,6 @@ class FederationSenderHandler: send_queue.process_rows_for_federation(self.federation_sender, rows) await self.update_token(token) - # We also need to poke the federation sender when new events happen - elif stream_name == "events": - self.federation_sender.notify_new_events(token) - # ... and when new receipts happen elif stream_name == ReceiptsStream.NAME: await self._on_new_receipts(rows) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 8e46957d15..5f1bf492c1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -188,7 +188,7 @@ class FederationRemoteSendQueue: for key in keys[:i]: del self.edus[key] - def notify_new_events(self, current_id): + def notify_new_events(self, max_token): """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index e33b29a42c..604cfd1935 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -40,7 +40,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -154,10 +154,15 @@ class FederationSender: self._per_destination_queues[destination] = queue return queue - def notify_new_events(self, current_id: int) -> None: + def notify_new_events(self, max_token: RoomStreamToken) -> None: """This gets called when we have some new events we might want to send out to other servers. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + self._last_poked_id = max(current_id, self._last_poked_id) if self._is_processing: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9d4e87dad6..c8d5e58035 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,6 +27,7 @@ from synapse.metrics import ( event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -47,15 +48,17 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False - async def notify_interested_services(self, current_id): + async def notify_interested_services(self, max_token: RoomStreamToken): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. - - Args: - current_id(int): The current maximum ID. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + services = self.store.get_app_services() if not services or not self.notify_appservices: return diff --git a/synapse/notifier.py b/synapse/notifier.py index 13adeed01e..51c830c91e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -319,19 +319,19 @@ class Notifier: ) if self.federation_sender: - self.federation_sender.notify_new_events(max_room_stream_token.stream) + self.federation_sender.notify_new_events(max_room_stream_token) async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: await self.appservice_handler.notify_interested_services( - max_room_stream_token.stream + max_room_stream_token ) except Exception: logger.exception("Error notifying application services of event") async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: - await self._pusher_pool.on_new_notifications(max_room_stream_token.stream) + await self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: logger.exception("Error pusher pool of event") diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 28bd8ab748..c6763971ee 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -18,6 +18,7 @@ import logging from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) @@ -91,7 +92,12 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 26706bf3e1..793d0db2d9 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException +from synapse.types import RoomStreamToken from . import push_rule_evaluator, push_tools @@ -114,7 +115,12 @@ class HttpPusher: if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering or 0 ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 76150e117b..0080c68ce2 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -24,6 +24,7 @@ from synapse.push import PusherConfigException from synapse.push.emailpusher import EmailPusher from synapse.push.httppusher import HttpPusher from synapse.push.pusher import PusherFactory +from synapse.types import RoomStreamToken from synapse.util.async_helpers import concurrently_execute if TYPE_CHECKING: @@ -186,11 +187,16 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_stream_id: int): + async def on_new_notifications(self, max_token: RoomStreamToken): if not self.pushers: # nothing to do here. return + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_id = max_token.stream + if max_stream_id < self._last_room_stream_id_seen: # Nothing to do return @@ -214,7 +220,7 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(max_stream_id) + p.on_new_notifications(max_token) except Exception: logger.exception("Exception in pusher on_new_notifications") diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 2a0b7c1b56..ee4f3da31c 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -18,6 +18,7 @@ from mock import Mock from twisted.internet import defer from synapse.handlers.appservice import ApplicationServicesHandler +from synapse.types import RoomStreamToken from tests.test_utils import make_awaitable from tests.utils import MockClock @@ -61,7 +62,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) @@ -80,7 +83,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) @defer.inlineCallbacks @@ -97,7 +102,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been.", -- cgit 1.5.1 From c276bd996916adce899410b9c4c891892f51b992 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Thu, 15 Oct 2020 17:33:28 +0100 Subject: Send some ephemeral events to appservices (#8437) Optionally sends typing, presence, and read receipt information to appservices. --- changelog.d/8437.feature | 1 + mypy.ini | 1 + synapse/appservice/__init__.py | 180 ++++++++++++++------- synapse/appservice/api.py | 27 +++- synapse/appservice/scheduler.py | 48 ++++-- synapse/config/appservice.py | 3 + synapse/handlers/appservice.py | 109 ++++++++++++- synapse/handlers/receipts.py | 35 +++- synapse/handlers/sync.py | 1 - synapse/handlers/typing.py | 31 +++- synapse/notifier.py | 25 +++ synapse/storage/databases/main/appservice.py | 66 ++++++-- synapse/storage/databases/main/receipts.py | 55 +++++++ .../main/schema/delta/59/19as_device_stream.sql | 18 +++ tests/appservice/test_scheduler.py | 77 ++++++--- tests/storage/test_appservice.py | 8 +- 16 files changed, 563 insertions(+), 122 deletions(-) create mode 100644 changelog.d/8437.feature create mode 100644 synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql (limited to 'synapse/notifier.py') diff --git a/changelog.d/8437.feature b/changelog.d/8437.feature new file mode 100644 index 0000000000..4abcccb326 --- /dev/null +++ b/changelog.d/8437.feature @@ -0,0 +1 @@ +Implement [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409) to send typing, read receipts, and presence events to appservices. diff --git a/mypy.ini b/mypy.ini index 9748f6258c..b5db54ee3b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -15,6 +15,7 @@ files = synapse/events/builder.py, synapse/events/spamcheck.py, synapse/federation, + synapse/handlers/appservice.py, synapse/handlers/account_data.py, synapse/handlers/auth.py, synapse/handlers/cas_handler.py, diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 13ec1f71a6..3862d9c08f 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -14,14 +14,15 @@ # limitations under the License. import logging import re -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Iterable, List, Match, Optional from synapse.api.constants import EventTypes -from synapse.appservice.api import ApplicationServiceApi -from synapse.types import GroupID, get_domain_from_id +from synapse.events import EventBase +from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id from synapse.util.caches.descriptors import cached if TYPE_CHECKING: + from synapse.appservice.api import ApplicationServiceApi from synapse.storage.databases.main import DataStore logger = logging.getLogger(__name__) @@ -32,38 +33,6 @@ class ApplicationServiceState: UP = "up" -class AppServiceTransaction: - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - async def send(self, as_api: ApplicationServiceApi) -> bool: - """Sends this transaction using the provided AS API interface. - - Args: - as_api: The API to use to send. - Returns: - True if the transaction was sent. - """ - return await as_api.push_bulk( - service=self.service, events=self.events, txn_id=self.id - ) - - async def complete(self, store: "DataStore") -> None: - """Completes this transaction as successful. - - Marks this transaction ID on the application service and removes the - transaction contents from the database. - - Args: - store: The database store to operate on. - """ - await store.complete_appservice_txn(service=self.service, txn_id=self.id) - - class ApplicationService: """Defines an application service. This definition is mostly what is provided to the /register AS API. @@ -91,6 +60,7 @@ class ApplicationService: protocols=None, rate_limited=True, ip_range_whitelist=None, + supports_ephemeral=False, ): self.token = token self.url = ( @@ -102,6 +72,7 @@ class ApplicationService: self.namespaces = self._check_namespaces(namespaces) self.id = id self.ip_range_whitelist = ip_range_whitelist + self.supports_ephemeral = supports_ephemeral if "|" in self.id: raise Exception("application service ID cannot contain '|' character") @@ -161,19 +132,21 @@ class ApplicationService: raise ValueError("Expected string for 'regex' in ns '%s'" % ns) return namespaces - def _matches_regex(self, test_string, namespace_key): + def _matches_regex(self, test_string: str, namespace_key: str) -> Optional[Match]: for regex_obj in self.namespaces[namespace_key]: if regex_obj["regex"].match(test_string): return regex_obj return None - def _is_exclusive(self, ns_key, test_string): + def _is_exclusive(self, ns_key: str, test_string: str) -> bool: regex_obj = self._matches_regex(test_string, ns_key) if regex_obj: return regex_obj["exclusive"] return False - async def _matches_user(self, event, store): + async def _matches_user( + self, event: Optional[EventBase], store: Optional["DataStore"] = None + ) -> bool: if not event: return False @@ -188,14 +161,23 @@ class ApplicationService: if not store: return False - does_match = await self._matches_user_in_member_list(event.room_id, store) + does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match - @cached(num_args=1, cache_context=True) - async def _matches_user_in_member_list(self, room_id, store, cache_context): - member_list = await store.get_users_in_room( - room_id, on_invalidate=cache_context.invalidate - ) + @cached(num_args=1) + async def matches_user_in_member_list( + self, room_id: str, store: "DataStore" + ) -> bool: + """Check if this service is interested a room based upon it's membership + + Args: + room_id: The room to check. + store: The datastore to query. + + Returns: + True if this service would like to know about this room. + """ + member_list = await store.get_users_in_room(room_id) # check joined member events for user_id in member_list: @@ -203,12 +185,14 @@ class ApplicationService: return True return False - def _matches_room_id(self, event): + def _matches_room_id(self, event: EventBase) -> bool: if hasattr(event, "room_id"): return self.is_interested_in_room(event.room_id) return False - async def _matches_aliases(self, event, store): + async def _matches_aliases( + self, event: EventBase, store: Optional["DataStore"] = None + ) -> bool: if not store or not event: return False @@ -218,12 +202,15 @@ class ApplicationService: return True return False - async def is_interested(self, event, store=None) -> bool: + async def is_interested( + self, event: EventBase, store: Optional["DataStore"] = None + ) -> bool: """Check if this service is interested in this event. Args: - event(Event): The event to check. - store(DataStore) + event: The event to check. + store: The datastore to query. + Returns: True if this service would like to know about this event. """ @@ -231,39 +218,66 @@ class ApplicationService: if self._matches_room_id(event): return True + # This will check the namespaces first before + # checking the store, so should be run before _matches_aliases + if await self._matches_user(event, store): + return True + + # This will check the store, so should be run last if await self._matches_aliases(event, store): return True - if await self._matches_user(event, store): + return False + + @cached(num_args=1) + async def is_interested_in_presence( + self, user_id: UserID, store: "DataStore" + ) -> bool: + """Check if this service is interested a user's presence + + Args: + user_id: The user to check. + store: The datastore to query. + + Returns: + True if this service would like to know about presence for this user. + """ + # Find all the rooms the sender is in + if self.is_interested_in_user(user_id.to_string()): return True + room_ids = await store.get_rooms_for_user(user_id.to_string()) + # Then find out if the appservice is interested in any of those rooms + for room_id in room_ids: + if await self.matches_user_in_member_list(room_id, store): + return True return False - def is_interested_in_user(self, user_id): + def is_interested_in_user(self, user_id: str) -> bool: return ( - self._matches_regex(user_id, ApplicationService.NS_USERS) + bool(self._matches_regex(user_id, ApplicationService.NS_USERS)) or user_id == self.sender ) - def is_interested_in_alias(self, alias): + def is_interested_in_alias(self, alias: str) -> bool: return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES)) - def is_interested_in_room(self, room_id): + def is_interested_in_room(self, room_id: str) -> bool: return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS)) - def is_exclusive_user(self, user_id): + def is_exclusive_user(self, user_id: str) -> bool: return ( self._is_exclusive(ApplicationService.NS_USERS, user_id) or user_id == self.sender ) - def is_interested_in_protocol(self, protocol): + def is_interested_in_protocol(self, protocol: str) -> bool: return protocol in self.protocols - def is_exclusive_alias(self, alias): + def is_exclusive_alias(self, alias: str) -> bool: return self._is_exclusive(ApplicationService.NS_ALIASES, alias) - def is_exclusive_room(self, room_id): + def is_exclusive_room(self, room_id: str) -> bool: return self._is_exclusive(ApplicationService.NS_ROOMS, room_id) def get_exclusive_user_regexes(self): @@ -276,14 +290,14 @@ class ApplicationService: if regex_obj["exclusive"] ] - def get_groups_for_user(self, user_id): + def get_groups_for_user(self, user_id: str) -> Iterable[str]: """Get the groups that this user is associated with by this AS Args: - user_id (str): The ID of the user. + user_id: The ID of the user. Returns: - iterable[str]: an iterable that yields group_id strings. + An iterable that yields group_id strings. """ return ( regex_obj["group_id"] @@ -291,7 +305,7 @@ class ApplicationService: if "group_id" in regex_obj and regex_obj["regex"].match(user_id) ) - def is_rate_limited(self): + def is_rate_limited(self) -> bool: return self.rate_limited def __str__(self): @@ -300,3 +314,45 @@ class ApplicationService: dict_copy["token"] = "" dict_copy["hs_token"] = "" return "ApplicationService: %s" % (dict_copy,) + + +class AppServiceTransaction: + """Represents an application service transaction.""" + + def __init__( + self, + service: ApplicationService, + id: int, + events: List[EventBase], + ephemeral: List[JsonDict], + ): + self.service = service + self.id = id + self.events = events + self.ephemeral = ephemeral + + async def send(self, as_api: "ApplicationServiceApi") -> bool: + """Sends this transaction using the provided AS API interface. + + Args: + as_api: The API to use to send. + Returns: + True if the transaction was sent. + """ + return await as_api.push_bulk( + service=self.service, + events=self.events, + ephemeral=self.ephemeral, + txn_id=self.id, + ) + + async def complete(self, store: "DataStore") -> None: + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + """ + await store.complete_appservice_txn(service=self.service, txn_id=self.id) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index e8f0793795..e366a982b8 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -14,12 +14,13 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple from prometheus_client import Counter from synapse.api.constants import EventTypes, ThirdPartyEntityKind from synapse.api.errors import CodeMessageException +from synapse.events import EventBase from synapse.events.utils import serialize_event from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict, ThirdPartyInstanceID @@ -201,7 +202,13 @@ class ApplicationServiceApi(SimpleHttpClient): key = (service.id, protocol) return await self.protocol_meta_cache.wrap(key, _get) - async def push_bulk(self, service, events, txn_id=None): + async def push_bulk( + self, + service: "ApplicationService", + events: List[EventBase], + ephemeral: List[JsonDict], + txn_id: Optional[int] = None, + ): if service.url is None: return True @@ -211,15 +218,19 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning( "push_bulk: Missing txn ID sending events to %s", service.url ) - txn_id = str(0) - txn_id = str(txn_id) + txn_id = 0 + + uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) + + # Never send ephemeral events to appservices that do not support it + if service.supports_ephemeral: + body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral} + else: + body = {"events": events} - uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id)) try: await self.put_json( - uri=uri, - json_body={"events": events}, - args={"access_token": service.hs_token}, + uri=uri, json_body=body, args={"access_token": service.hs_token}, ) sent_transactions_counter.labels(service.id).inc() sent_events_counter.labels(service.id).inc(len(events)) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 8eb8c6f51c..ad3c408519 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,10 +49,13 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ import logging +from typing import List -from synapse.appservice import ApplicationServiceState +from synapse.appservice import ApplicationService, ApplicationServiceState +from synapse.events import EventBase from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -82,8 +85,13 @@ class ApplicationServiceScheduler: for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as(self, service, event): - self.queuer.enqueue(service, event) + def submit_event_for_as(self, service: ApplicationService, event: EventBase): + self.queuer.enqueue_event(service, event) + + def submit_ephemeral_events_for_as( + self, service: ApplicationService, events: List[JsonDict] + ): + self.queuer.enqueue_ephemeral(service, events) class _ServiceQueuer: @@ -96,17 +104,15 @@ class _ServiceQueuer: def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} + self.queued_ephemeral = {} # dict of {service_id: [events]} # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock - def enqueue(self, service, event): - self.queued_events.setdefault(service.id, []).append(event) - + def _start_background_request(self, service): # start a sender for this appservice if we don't already have one - if service.id in self.requests_in_flight: return @@ -114,7 +120,15 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) - async def _send_request(self, service): + def enqueue_event(self, service: ApplicationService, event: EventBase): + self.queued_events.setdefault(service.id, []).append(event) + self._start_background_request(service) + + def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]): + self.queued_ephemeral.setdefault(service.id, []).extend(events) + self._start_background_request(service) + + async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender # running. assert service.id not in self.requests_in_flight @@ -123,10 +137,11 @@ class _ServiceQueuer: try: while True: events = self.queued_events.pop(service.id, []) - if not events: + ephemeral = self.queued_ephemeral.pop(service.id, []) + if not events and not ephemeral: return try: - await self.txn_ctrl.send(service, events) + await self.txn_ctrl.send(service, events, ephemeral) except Exception: logger.exception("AS request failed") finally: @@ -158,9 +173,16 @@ class _TransactionController: # for UTs self.RECOVERER_CLASS = _Recoverer - async def send(self, service, events): + async def send( + self, + service: ApplicationService, + events: List[EventBase], + ephemeral: List[JsonDict] = [], + ): try: - txn = await self.store.create_appservice_txn(service=service, events=events) + txn = await self.store.create_appservice_txn( + service=service, events=events, ephemeral=ephemeral + ) service_is_up = await self._is_service_up(service) if service_is_up: sent = await txn.send(self.as_api) @@ -204,7 +226,7 @@ class _TransactionController: recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) - async def _is_service_up(self, service): + async def _is_service_up(self, service: ApplicationService) -> bool: state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 8ed3e24258..746fc3cc02 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename): if as_info.get("ip_range_whitelist"): ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist")) + supports_ephemeral = as_info.get("de.sorunome.msc2409.push_ephemeral", False) + return ApplicationService( token=as_info["as_token"], hostname=hostname, @@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename): hs_token=as_info["hs_token"], sender=user_id, id=as_info["id"], + supports_ephemeral=supports_ephemeral, protocols=protocols, rate_limited=rate_limited, ip_range_whitelist=ip_range_whitelist, diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index c8d5e58035..07240d3a14 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Dict, List, Optional from prometheus_client import Counter @@ -21,13 +22,16 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.appservice import ApplicationService +from synapse.events import EventBase +from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import ( event_processing_loop_counter, event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import RoomStreamToken +from synapse.types import Collection, JsonDict, RoomStreamToken, UserID from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -44,6 +48,7 @@ class ApplicationServicesHandler: self.started_scheduler = False self.clock = hs.get_clock() self.notify_appservices = hs.config.notify_appservices + self.event_sources = hs.get_event_sources() self.current_max = 0 self.is_processing = False @@ -82,7 +87,7 @@ class ApplicationServicesHandler: if not events: break - events_by_room = {} + events_by_room = {} # type: Dict[str, List[EventBase]] for event in events: events_by_room.setdefault(event.room_id, []).append(event) @@ -161,6 +166,104 @@ class ApplicationServicesHandler: finally: self.is_processing = False + async def notify_interested_services_ephemeral( + self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [], + ): + """This is called by the notifier in the background + when a ephemeral event handled by the homeserver. + + This will determine which appservices + are interested in the event, and submit them. + + Events will only be pushed to appservices + that have opted into ephemeral events + + Args: + stream_key: The stream the event came from. + new_token: The latest stream token + users: The user(s) involved with the event. + """ + services = [ + service + for service in self.store.get_app_services() + if service.supports_ephemeral + ] + if not services or not self.notify_appservices: + return + logger.info("Checking interested services for %s" % (stream_key)) + with Measure(self.clock, "notify_interested_services_ephemeral"): + for service in services: + # Only handle typing if we have the latest token + if stream_key == "typing_key" and new_token is not None: + events = await self._handle_typing(service, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + # We don't persist the token for typing_key for performance reasons + elif stream_key == "receipt_key": + events = await self._handle_receipts(service) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) + elif stream_key == "presence_key": + events = await self._handle_presence(service, users) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) + + async def _handle_typing(self, service: ApplicationService, new_token: int): + typing_source = self.event_sources.sources["typing"] + # Get the typing events from just before current + typing, _ = await typing_source.get_new_events_as( + service=service, + # For performance reasons, we don't persist the previous + # token in the DB and instead fetch the latest typing information + # for appservices. + from_key=new_token - 1, + ) + return typing + + async def _handle_receipts(self, service: ApplicationService): + from_key = await self.store.get_type_stream_id_for_appservice( + service, "read_receipt" + ) + receipts_source = self.event_sources.sources["receipt"] + receipts, _ = await receipts_source.get_new_events_as( + service=service, from_key=from_key + ) + return receipts + + async def _handle_presence( + self, service: ApplicationService, users: Collection[UserID] + ): + events = [] # type: List[JsonDict] + presence_source = self.event_sources.sources["presence"] + from_key = await self.store.get_type_stream_id_for_appservice( + service, "presence" + ) + for user in users: + interested = await service.is_interested_in_presence(user, self.store) + if not interested: + continue + presence_events, _ = await presence_source.get_new_events( + user=user, service=service, from_key=from_key, + ) + time_now = self.clock.time_msec() + presence_events = [ + { + "type": "m.presence", + "sender": event.user_id, + "content": format_user_presence_state( + event, time_now, include_user_id=False + ), + } + for event in presence_events + ] + events = events + presence_events + async def query_user_exists(self, user_id): """Check if any application service knows this user_id exists. @@ -223,7 +326,7 @@ class ApplicationServicesHandler: async def get_3pe_protocols(self, only_protocol=None): services = self.store.get_app_services() - protocols = {} + protocols = {} # type: Dict[str, List[JsonDict]] # Collect up all the individual protocol responses out of the ASes for s in services: diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 7225923757..c242c409cf 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -13,9 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List, Tuple +from synapse.appservice import ApplicationService from synapse.handlers._base import BaseHandler -from synapse.types import ReadReceipt, get_domain_from_id +from synapse.types import JsonDict, ReadReceipt, get_domain_from_id from synapse.util.async_helpers import maybe_awaitable logger = logging.getLogger(__name__) @@ -140,5 +142,36 @@ class ReceiptEventSource: return (events, to_key) + async def get_new_events_as( + self, from_key: int, service: ApplicationService + ) -> Tuple[List[JsonDict], int]: + """Returns a set of new receipt events that an appservice + may be interested in. + + Args: + from_key: the stream position at which events should be fetched from + service: The appservice which may be interested + """ + from_key = int(from_key) + to_key = self.get_current_key() + + if from_key == to_key: + return [], to_key + + # We first need to fetch all new receipts + rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms( + from_key=from_key, to_key=to_key + ) + + # Then filter down to rooms that the AS can read + events = [] + for room_id, event in rooms_to_events.items(): + if not await service.matches_user_in_member_list(room_id, self.store): + continue + + events.append(event) + + return (events, to_key) + def get_current_key(self, direction="f"): return self.store.get_max_receipt_stream_id() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a306631094..b527724bc4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import itertools import logging from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3cbfc2d780..d3692842e3 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -12,16 +12,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import random from collections import namedtuple from typing import TYPE_CHECKING, List, Set, Tuple from synapse.api.errors import AuthError, ShadowBanError, SynapseError +from synapse.appservice import ApplicationService from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams import TypingStream -from synapse.types import UserID, get_domain_from_id +from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -430,6 +430,33 @@ class TypingNotificationEventSource: "content": {"user_ids": list(typing)}, } + async def get_new_events_as( + self, from_key: int, service: ApplicationService + ) -> Tuple[List[JsonDict], int]: + """Returns a set of new typing events that an appservice + may be interested in. + + Args: + from_key: the stream position at which events should be fetched from + service: The appservice which may be interested + """ + with Measure(self.clock, "typing.get_new_events_as"): + from_key = int(from_key) + handler = self.get_typing_handler() + + events = [] + for room_id in handler._room_serials.keys(): + if handler._room_serials[room_id] <= from_key: + continue + if not await service.matches_user_in_member_list( + room_id, handler.store + ): + continue + + events.append(self._make_event_for(room_id)) + + return (events, handler._latest_room_serial) + async def get_new_events(self, from_key, room_ids, **kwargs): with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) diff --git a/synapse/notifier.py b/synapse/notifier.py index 51c830c91e..2e993411b9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -329,6 +329,22 @@ class Notifier: except Exception: logger.exception("Error notifying application services of event") + async def _notify_app_services_ephemeral( + self, + stream_key: str, + new_token: Union[int, RoomStreamToken], + users: Collection[UserID] = [], + ): + try: + stream_token = None + if isinstance(new_token, int): + stream_token = new_token + await self.appservice_handler.notify_interested_services_ephemeral( + stream_key, stream_token, users + ) + except Exception: + logger.exception("Error notifying application services of event") + async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: await self._pusher_pool.on_new_notifications(max_room_stream_token) @@ -367,6 +383,15 @@ class Notifier: self.notify_replication() + # Notify appservices + run_as_background_process( + "_notify_app_services_ephemeral", + self._notify_app_services_ephemeral, + stream_key, + new_token, + users, + ) + def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 85f6b1e3fd..43bf0f649a 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -15,12 +15,15 @@ # limitations under the License. import logging import re +from typing import List -from synapse.appservice import AppServiceTransaction +from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.config.appservice import load_appservices +from synapse.events import EventBase from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.types import JsonDict from synapse.util import json_encoder logger = logging.getLogger(__name__) @@ -172,15 +175,23 @@ class ApplicationServiceTransactionWorkerStore( "application_services_state", {"as_id": service.id}, {"state": state} ) - async def create_appservice_txn(self, service, events): + async def create_appservice_txn( + self, + service: ApplicationService, + events: List[EventBase], + ephemeral: List[JsonDict], + ) -> AppServiceTransaction: """Atomically creates a new transaction for this application service - with the given list of events. + with the given list of events. Ephemeral events are NOT persisted to the + database and are not resent if a transaction is retried. Args: - service(ApplicationService): The service who the transaction is for. - events(list): A list of events to put in the transaction. + service: The service who the transaction is for. + events: A list of persistent events to put in the transaction. + ephemeral: A list of ephemeral events to put in the transaction. + Returns: - AppServiceTransaction: A new transaction. + A new transaction. """ def _create_appservice_txn(txn): @@ -207,7 +218,9 @@ class ApplicationServiceTransactionWorkerStore( "VALUES(?,?,?)", (service.id, new_txn_id, event_ids), ) - return AppServiceTransaction(service=service, id=new_txn_id, events=events) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events, ephemeral=ephemeral + ) return await self.db_pool.runInteraction( "create_appservice_txn", _create_appservice_txn @@ -296,7 +309,9 @@ class ApplicationServiceTransactionWorkerStore( events = await self.get_events_as_list(event_ids) - return AppServiceTransaction(service=service, id=entry["txn_id"], events=events) + return AppServiceTransaction( + service=service, id=entry["txn_id"], events=events, ephemeral=[] + ) def _get_last_txn(self, txn, service_id): txn.execute( @@ -320,7 +335,7 @@ class ApplicationServiceTransactionWorkerStore( ) async def get_new_events_for_appservice(self, current_id, limit): - """Get all new evnets""" + """Get all new events for an appservice""" def get_new_events_for_appservice_txn(txn): sql = ( @@ -351,6 +366,39 @@ class ApplicationServiceTransactionWorkerStore( return upper_bound, events + async def get_type_stream_id_for_appservice( + self, service: ApplicationService, type: str + ) -> int: + def get_type_stream_id_for_appservice_txn(txn): + stream_id_type = "%s_stream_id" % type + txn.execute( + "SELECT ? FROM application_services_state WHERE as_id=?", + (stream_id_type, service.id,), + ) + last_txn_id = txn.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) + + return await self.db_pool.runInteraction( + "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn + ) + + async def set_type_stream_id_for_appservice( + self, service: ApplicationService, type: str, pos: int + ) -> None: + def set_type_stream_id_for_appservice_txn(txn): + stream_id_type = "%s_stream_id" % type + txn.execute( + "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?", + (stream_id_type, pos, service.id), + ) + + await self.db_pool.runInteraction( + "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn + ) + class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore): # This is currently empty due to there not being any AS storage functions diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index c79ddff680..5cdf16521c 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -23,6 +23,7 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedList @@ -274,6 +275,60 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta): } return results + @cached(num_args=2,) + async def get_linearized_receipts_for_all_rooms( + self, to_key: int, from_key: Optional[int] = None + ) -> Dict[str, JsonDict]: + """Get receipts for all rooms between two stream_ids. + + Args: + to_key: Max stream id to fetch receipts upto. + from_key: Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + A dictionary of roomids to a list of receipts. + """ + + def f(txn): + if from_key: + sql = """ + SELECT * FROM receipts_linearized WHERE + stream_id > ? AND stream_id <= ? + """ + txn.execute(sql, [from_key, to_key]) + else: + sql = """ + SELECT * FROM receipts_linearized WHERE + stream_id <= ? + """ + + txn.execute(sql, [to_key]) + + return self.db_pool.cursor_to_dict(txn) + + txn_results = await self.db_pool.runInteraction( + "get_linearized_receipts_for_all_rooms", f + ) + + results = {} + for row in txn_results: + # We want a single event per room, since we want to batch the + # receipts by room, event and type. + room_event = results.setdefault( + row["room_id"], + {"type": "m.receipt", "room_id": row["room_id"], "content": {}}, + ) + + # The content is of the form: + # {"$foo:bar": { "read": { "@user:host": }, .. }, .. } + event_entry = room_event["content"].setdefault(row["event_id"], {}) + receipt_type = event_entry.setdefault(row["receipt_type"], {}) + + receipt_type[row["user_id"]] = db_to_json(row["data"]) + + return results + async def get_users_sent_receipts_between( self, last_id: int, current_id: int ) -> List[str]: diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql new file mode 100644 index 0000000000..20f5a95a24 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE application_services_state + ADD COLUMN read_receipt_stream_id INT, + ADD COLUMN presence_stream_id INT; \ No newline at end of file diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 68a4caabbf..2acb8b7603 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -60,7 +60,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events # txn made and saved + service=service, events=events, ephemeral=[] # txn made and saved ) self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made txn.complete.assert_called_once_with(self.store) # txn completed @@ -81,7 +81,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events # txn made and saved + service=service, events=events, ephemeral=[] # txn made and saved ) self.assertEquals(0, txn.send.call_count) # txn not sent though self.assertEquals(0, txn.complete.call_count) # or completed @@ -106,7 +106,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events + service=service, events=events, ephemeral=[] ) self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made self.assertEquals(1, self.recoverer.recover.call_count) # and invoked @@ -202,26 +202,28 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): # Expect the event to be sent immediately. service = Mock(id=4) event = Mock() - self.queuer.enqueue(service, event) - self.txn_ctrl.send.assert_called_once_with(service, [event]) + self.queuer.enqueue_event(service, event) + self.txn_ctrl.send.assert_called_once_with(service, [event], []) def test_send_single_event_with_queue(self): d = defer.Deferred() - self.txn_ctrl.send = Mock(side_effect=lambda x, y: make_deferred_yieldable(d)) + self.txn_ctrl.send = Mock( + side_effect=lambda x, y, z: make_deferred_yieldable(d) + ) service = Mock(id=4) event = Mock(event_id="first") event2 = Mock(event_id="second") event3 = Mock(event_id="third") # Send an event and don't resolve it just yet. - self.queuer.enqueue(service, event) + self.queuer.enqueue_event(service, event) # Send more events: expect send() to NOT be called multiple times. - self.queuer.enqueue(service, event2) - self.queuer.enqueue(service, event3) - self.txn_ctrl.send.assert_called_with(service, [event]) + self.queuer.enqueue_event(service, event2) + self.queuer.enqueue_event(service, event3) + self.txn_ctrl.send.assert_called_with(service, [event], []) self.assertEquals(1, self.txn_ctrl.send.call_count) # Resolve the send event: expect the queued events to be sent d.callback(service) - self.txn_ctrl.send.assert_called_with(service, [event2, event3]) + self.txn_ctrl.send.assert_called_with(service, [event2, event3], []) self.assertEquals(2, self.txn_ctrl.send.call_count) def test_multiple_service_queues(self): @@ -239,21 +241,58 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): send_return_list = [srv_1_defer, srv_2_defer] - def do_send(x, y): + def do_send(x, y, z): return make_deferred_yieldable(send_return_list.pop(0)) self.txn_ctrl.send = Mock(side_effect=do_send) # send events for different ASes and make sure they are sent - self.queuer.enqueue(srv1, srv_1_event) - self.queuer.enqueue(srv1, srv_1_event2) - self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event]) - self.queuer.enqueue(srv2, srv_2_event) - self.queuer.enqueue(srv2, srv_2_event2) - self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event]) + self.queuer.enqueue_event(srv1, srv_1_event) + self.queuer.enqueue_event(srv1, srv_1_event2) + self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], []) + self.queuer.enqueue_event(srv2, srv_2_event) + self.queuer.enqueue_event(srv2, srv_2_event2) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], []) # make sure callbacks for a service only send queued events for THAT # service srv_2_defer.callback(srv2) - self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2]) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], []) self.assertEquals(3, self.txn_ctrl.send.call_count) + + def test_send_single_ephemeral_no_queue(self): + # Expect the event to be sent immediately. + service = Mock(id=4, name="service") + event_list = [Mock(name="event")] + self.queuer.enqueue_ephemeral(service, event_list) + self.txn_ctrl.send.assert_called_once_with(service, [], event_list) + + def test_send_multiple_ephemeral_no_queue(self): + # Expect the event to be sent immediately. + service = Mock(id=4, name="service") + event_list = [Mock(name="event1"), Mock(name="event2"), Mock(name="event3")] + self.queuer.enqueue_ephemeral(service, event_list) + self.txn_ctrl.send.assert_called_once_with(service, [], event_list) + + def test_send_single_ephemeral_with_queue(self): + d = defer.Deferred() + self.txn_ctrl.send = Mock( + side_effect=lambda x, y, z: make_deferred_yieldable(d) + ) + service = Mock(id=4) + event_list_1 = [Mock(event_id="event1"), Mock(event_id="event2")] + event_list_2 = [Mock(event_id="event3"), Mock(event_id="event4")] + event_list_3 = [Mock(event_id="event5"), Mock(event_id="event6")] + + # Send an event and don't resolve it just yet. + self.queuer.enqueue_ephemeral(service, event_list_1) + # Send more events: expect send() to NOT be called multiple times. + self.queuer.enqueue_ephemeral(service, event_list_2) + self.queuer.enqueue_ephemeral(service, event_list_3) + self.txn_ctrl.send.assert_called_with(service, [], event_list_1) + self.assertEquals(1, self.txn_ctrl.send.call_count) + # Resolve txn_ctrl.send + d.callback(service) + # Expect the queued events to be sent + self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3) + self.assertEquals(2, self.txn_ctrl.send.call_count) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index c905a38930..c5c7987349 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -244,7 +244,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events) + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 1) self.assertEquals(txn.events, events) @@ -258,7 +258,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): yield self._insert_txn(service.id, 9644, events) yield self._insert_txn(service.id, 9645, events) txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events) + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 9646) self.assertEquals(txn.events, events) @@ -270,7 +270,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events) + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) @@ -293,7 +293,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): yield self._insert_txn(self.as_list[3]["id"], 9643, events) txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events) + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) -- cgit 1.5.1 From 34a5696f9338f1a1ec52203e3871a797a02138a9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 23 Oct 2020 12:38:40 -0400 Subject: Fix typos and spelling errors. (#8639) --- changelog.d/8639.misc | 1 + docs/sample_config.yaml | 6 +++--- docs/sample_log_config.yaml | 2 +- synapse/config/jwt_config.py | 2 +- synapse/config/logger.py | 2 +- synapse/config/registration.py | 2 +- synapse/config/room_directory.py | 2 +- synapse/config/tracer.py | 2 +- synapse/crypto/context_factory.py | 2 +- synapse/events/__init__.py | 2 +- synapse/events/utils.py | 2 +- synapse/groups/attestations.py | 2 +- synapse/groups/groups_server.py | 4 ++-- synapse/handlers/admin.py | 4 ++-- synapse/handlers/auth.py | 2 +- synapse/handlers/federation.py | 14 +++++++------- synapse/handlers/groups_local.py | 4 ++-- synapse/handlers/message.py | 2 +- synapse/handlers/oidc_handler.py | 6 +++--- synapse/handlers/presence.py | 4 ++-- synapse/handlers/profile.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/search.py | 2 +- synapse/handlers/state_deltas.py | 2 +- synapse/handlers/sync.py | 4 ++-- synapse/handlers/typing.py | 2 +- synapse/handlers/user_directory.py | 2 +- synapse/http/federation/well_known_resolver.py | 2 +- synapse/http/matrixfederationclient.py | 6 +++--- synapse/http/request_metrics.py | 2 +- synapse/http/server.py | 6 +++--- synapse/http/site.py | 4 +++- synapse/metrics/background_process_metrics.py | 2 +- synapse/notifier.py | 2 +- synapse/push/baserules.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 4 ++-- synapse/server_notices/consent_server_notices.py | 2 +- synapse/state/__init__.py | 2 +- synapse/state/v1.py | 2 +- synapse/state/v2.py | 2 +- synapse/static/client/login/js/login.js | 2 +- 41 files changed, 63 insertions(+), 60 deletions(-) create mode 100644 changelog.d/8639.misc (limited to 'synapse/notifier.py') diff --git a/changelog.d/8639.misc b/changelog.d/8639.misc new file mode 100644 index 0000000000..20a213df39 --- /dev/null +++ b/changelog.d/8639.misc @@ -0,0 +1 @@ +Fix typos and spelling errors in the code. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 061226ea6f..07f1628568 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1886,7 +1886,7 @@ sso: # and issued at ("iat") claims are validated if present. # # Note that this is a non-standard login type and client support is -# expected to be non-existant. +# expected to be non-existent. # # See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md. # @@ -2402,7 +2402,7 @@ spam_checker: # # Options for the rules include: # -# user_id: Matches agaisnt the creator of the alias +# user_id: Matches against the creator of the alias # room_id: Matches against the room ID being published # alias: Matches against any current local or canonical aliases # associated with the room @@ -2448,7 +2448,7 @@ opentracing: # This is a list of regexes which are matched against the server_name of the # homeserver. # - # By defult, it is empty, so no servers are matched. + # By default, it is empty, so no servers are matched. # #homeserver_whitelist: # - ".*" diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml index 55a48a9ed6..e26657f9fe 100644 --- a/docs/sample_log_config.yaml +++ b/docs/sample_log_config.yaml @@ -59,7 +59,7 @@ root: # then write them to a file. # # Replace "buffer" with "console" to log to stderr instead. (Note that you'll - # also need to update the configuation for the `twisted` logger above, in + # also need to update the configuration for the `twisted` logger above, in # this case.) # handlers: [buffer] diff --git a/synapse/config/jwt_config.py b/synapse/config/jwt_config.py index 3252ad9e7f..f30330abb6 100644 --- a/synapse/config/jwt_config.py +++ b/synapse/config/jwt_config.py @@ -63,7 +63,7 @@ class JWTConfig(Config): # and issued at ("iat") claims are validated if present. # # Note that this is a non-standard login type and client support is - # expected to be non-existant. + # expected to be non-existent. # # See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md. # diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 13d6f6a3ea..6b7be28aee 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -105,7 +105,7 @@ root: # then write them to a file. # # Replace "buffer" with "console" to log to stderr instead. (Note that you'll - # also need to update the configuation for the `twisted` logger above, in + # also need to update the configuration for the `twisted` logger above, in # this case.) # handlers: [buffer] diff --git a/synapse/config/registration.py b/synapse/config/registration.py index d7e3690a32..b0a77a2e43 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -143,7 +143,7 @@ class RegistrationConfig(Config): RoomCreationPreset.TRUSTED_PRIVATE_CHAT, } - # Pull the creater/inviter from the configuration, this gets used to + # Pull the creator/inviter from the configuration, this gets used to # send invites for invite-only rooms. mxid_localpart = config.get("auto_join_mxid_localpart") self.auto_join_user_id = None diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index 6de1f9d103..92e1b67528 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -99,7 +99,7 @@ class RoomDirectoryConfig(Config): # # Options for the rules include: # - # user_id: Matches agaisnt the creator of the alias + # user_id: Matches against the creator of the alias # room_id: Matches against the room ID being published # alias: Matches against any current local or canonical aliases # associated with the room diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py index 8be1346113..0c1a854f09 100644 --- a/synapse/config/tracer.py +++ b/synapse/config/tracer.py @@ -67,7 +67,7 @@ class TracerConfig(Config): # This is a list of regexes which are matched against the server_name of the # homeserver. # - # By defult, it is empty, so no servers are matched. + # By default, it is empty, so no servers are matched. # #homeserver_whitelist: # - ".*" diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index 79668a402e..57fd426e87 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -149,7 +149,7 @@ class FederationPolicyForHTTPS: return SSLClientConnectionCreator(host, ssl_context, should_verify) def creatorForNetloc(self, hostname, port): - """Implements the IPolicyForHTTPS interace so that this can be passed + """Implements the IPolicyForHTTPS interface so that this can be passed directly to agents. """ return self.get_options(hostname) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 65df62107f..e203206865 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -59,7 +59,7 @@ class DictProperty: # # To exclude the KeyError from the traceback, we explicitly # 'raise from e1.__context__' (which is better than 'raise from None', - # becuase that would omit any *earlier* exceptions). + # because that would omit any *earlier* exceptions). # raise AttributeError( "'%s' has no '%s' property" % (type(instance), self.key) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 355cbe05f1..14f7f1156f 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -180,7 +180,7 @@ def only_fields(dictionary, fields): in 'fields'. If there are no event fields specified then all fields are included. - The entries may include '.' charaters to indicate sub-fields. + The entries may include '.' characters to indicate sub-fields. So ['content.body'] will include the 'body' field of the 'content' object. A literal '.' character in a field name may be escaped using a '\'. diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index a86b3debc5..41cf07cc88 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -22,7 +22,7 @@ attestations have a validity period so need to be periodically renewed. If a user leaves (or gets kicked out of) a group, either side can still use their attestation to "prove" their membership, until the attestation expires. Therefore attestations shouldn't be relied on to prove membership in important -cases, but can for less important situtations, e.g. showing a users membership +cases, but can for less important situations, e.g. showing a users membership of groups on their profile, showing flairs, etc. An attestation is a signed blob of json that looks like: diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index e5f85b472d..0d042cbfac 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -113,7 +113,7 @@ class GroupsServerWorkerHandler: entry = await self.room_list_handler.generate_room_entry( room_id, len(joined_users), with_alias=False, allow_private=True ) - entry = dict(entry) # so we don't change whats cached + entry = dict(entry) # so we don't change what's cached entry.pop("room_id", None) room_entry["profile"] = entry @@ -550,7 +550,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): group_id, room_id, is_public=is_public ) else: - raise SynapseError(400, "Uknown config option") + raise SynapseError(400, "Unknown config option") return {} diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 1ce2091b46..a703944543 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -88,7 +88,7 @@ class AdminHandler(BaseHandler): # We only try and fetch events for rooms the user has been in. If # they've been e.g. invited to a room without joining then we handle - # those seperately. + # those separately. rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id) for index, room in enumerate(rooms): @@ -226,7 +226,7 @@ class ExfiltrationWriter: """ def finished(self): - """Called when all data has succesfully been exported and written. + """Called when all data has successfully been exported and written. This functions return value is passed to the caller of `export_user_data`. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 48d60feaab..dd14ab69d7 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -690,7 +690,7 @@ class AuthHandler(BaseHandler): Creates a new access token for the user with the given user ID. The user is assumed to have been authenticated by some other - machanism (e.g. CAS), and the user_id converted to the canonical case. + mechanism (e.g. CAS), and the user_id converted to the canonical case. The device will be recorded in the table if it is not there already. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fde8f00531..c386957706 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -112,7 +112,7 @@ class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: a) handling received Pdus before handing them on as Events to the rest - of the homeserver (including auth and state conflict resoultion) + of the homeserver (including auth and state conflict resolutions) b) converting events that were produced by local clients that may need to be sent to remote homeservers. c) doing the necessary dances to invite remote users and join remote @@ -477,7 +477,7 @@ class FederationHandler(BaseHandler): # ---- # # Update richvdh 2018/09/18: There are a number of problems with timing this - # request out agressively on the client side: + # request out aggressively on the client side: # # - it plays badly with the server-side rate-limiter, which starts tarpitting you # if you send too many requests at once, so you end up with the server carefully @@ -495,13 +495,13 @@ class FederationHandler(BaseHandler): # we'll end up back here for the *next* PDU in the list, which exacerbates the # problem. # - # - the agressive 10s timeout was introduced to deal with incoming federation + # - the aggressive 10s timeout was introduced to deal with incoming federation # requests taking 8 hours to process. It's not entirely clear why that was going # on; certainly there were other issues causing traffic storms which are now # resolved, and I think in any case we may be more sensible about our locking # now. We're *certainly* more sensible about our logging. # - # All that said: Let's try increasing the timout to 60s and see what happens. + # All that said: Let's try increasing the timeout to 60s and see what happens. try: missing_events = await self.federation_client.get_missing_events( @@ -1120,7 +1120,7 @@ class FederationHandler(BaseHandler): logger.info(str(e)) continue except RequestSendFailed as e: - logger.info("Falied to get backfill from %s because %s", dom, e) + logger.info("Failed to get backfill from %s because %s", dom, e) continue except FederationDeniedError as e: logger.info(e) @@ -1545,7 +1545,7 @@ class FederationHandler(BaseHandler): # # The reasons we have the destination server rather than the origin # server send it are slightly mysterious: the origin server should have - # all the neccessary state once it gets the response to the send_join, + # all the necessary state once it gets the response to the send_join, # so it could send the event itself if it wanted to. It may be that # doing it this way reduces failure modes, or avoids certain attacks # where a new server selectively tells a subset of the federation that @@ -1649,7 +1649,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = True event.internal_metadata.out_of_band_membership = True - # Try the host that we succesfully called /make_leave/ on first for + # Try the host that we successfully called /make_leave/ on first for # the /send_leave/ request. host_list = list(target_hosts) try: diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index b2def93bb1..abd8d2af44 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -349,7 +349,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): server_name=get_domain_from_id(group_id), ) - # TODO: Check that the group is public and we're being added publically + # TODO: Check that the group is public and we're being added publicly is_publicised = content.get("publicise", False) token = await self.store.register_user_group_membership( @@ -394,7 +394,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): server_name=get_domain_from_id(group_id), ) - # TODO: Check that the group is public and we're being added publically + # TODO: Check that the group is public and we're being added publicly is_publicised = content.get("publicise", False) token = await self.store.register_user_group_membership( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d6855c60ea..f1b4d35182 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -657,7 +657,7 @@ class EventCreationHandler: context: The event context. Returns: - The previous verion of the event is returned, if it is found in the + The previous version of the event is returned, if it is found in the event context. Otherwise, None is returned. """ prev_state_ids = await context.get_prev_state_ids() diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py index a312610635..331d4e7e96 100644 --- a/synapse/handlers/oidc_handler.py +++ b/synapse/handlers/oidc_handler.py @@ -217,7 +217,7 @@ class OidcHandler: This is based on the requested scopes: if the scopes include ``openid``, the provider should give use an ID token containing the - user informations. If not, we should fetch them using the + user information. If not, we should fetch them using the ``access_token`` with the ``userinfo_endpoint``. """ @@ -426,7 +426,7 @@ class OidcHandler: return resp async def _fetch_userinfo(self, token: Token) -> UserInfo: - """Fetch user informations from the ``userinfo_endpoint``. + """Fetch user information from the ``userinfo_endpoint``. Args: token: the token given by the ``token_endpoint``. @@ -754,7 +754,7 @@ class OidcHandler: Defaults to an hour. Returns: - A signed macaroon token with the session informations. + A signed macaroon token with the session information. """ macaroon = pymacaroons.Macaroon( location=self._server_name, identifier="key", key=self._macaroon_secret_key, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1000ac95ff..49a00eed9c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -802,7 +802,7 @@ class PresenceHandler(BasePresenceHandler): between the requested tokens due to the limit. The token returned can be used in a subsequent call to this - function to get further updatees. + function to get further updates. The updates are a list of 2-tuples of stream ID and the row data """ @@ -977,7 +977,7 @@ def should_notify(old_state, new_state): new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY ): - # Only notify about last active bumps if we're not currently acive + # Only notify about last active bumps if we're not currently active if not new_state.currently_active: notify_reason_counter.labels("last_active_change_online").inc() return True diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 92700b589c..da5692e03e 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -102,7 +102,7 @@ class ProfileHandler(BaseHandler): async def get_profile_from_cache(self, user_id: str) -> JsonDict: """Get the profile information from our local cache. If the user is - ours then the profile information will always be corect. Otherwise, + ours then the profile information will always be correct. Otherwise, it may be out of date/missing. """ target_user = UserID.from_string(user_id) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ec300d8877..c5b1f1f1e1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1268,7 +1268,7 @@ class RoomShutdownHandler: ) # We now wait for the create room to come back in via replication so - # that we can assume that all the joins/invites have propogated before + # that we can assume that all the joins/invites have propagated before # we try and auto join below. await self._replication.wait_for_stream_position( self.hs.config.worker.events_shard_config.get_instance(new_room_id), diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index e9402e6e2e..66f1bbcfc4 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -139,7 +139,7 @@ class SearchHandler(BaseHandler): # Filter to apply to results filter_dict = room_cat.get("filter", {}) - # What to order results by (impacts whether pagination can be doen) + # What to order results by (impacts whether pagination can be done) order_by = room_cat.get("order_by", "rank") # Return the current state of the rooms? diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py index 7a4ae0727a..fb4f70e8e2 100644 --- a/synapse/handlers/state_deltas.py +++ b/synapse/handlers/state_deltas.py @@ -32,7 +32,7 @@ class StateDeltasHandler: Returns: None if the field in the events either both match `public_value` or if neither do, i.e. there has been no change. - True if it didnt match `public_value` but now does + True if it didn't match `public_value` but now does False if it did match `public_value` but now doesn't """ prev_event = None diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b527724bc4..32e53c2d25 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -754,7 +754,7 @@ class SyncHandler: """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state - # updates even if they occured logically before the previous event. + # updates even if they occurred logically before the previous event. # TODO(mjark) Check for new redactions in the state events. with Measure(self.clock, "compute_state_delta"): @@ -1882,7 +1882,7 @@ class SyncHandler: # members (as the client otherwise doesn't have enough info to form # the name itself). if sync_config.filter_collection.lazy_load_members() and ( - # we recalulate the summary: + # we recalculate the summary: # if there are membership changes in the timeline, or # if membership has changed during a gappy sync, or # if this is an initial sync. diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d3692842e3..8758066c74 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -371,7 +371,7 @@ class TypingWriterHandler(FollowerTypingHandler): between the requested tokens due to the limit. The token returned can be used in a subsequent call to this - function to get further updatees. + function to get further updates. The updates are a list of 2-tuples of stream ID and the row data """ diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 79393c8829..afbebfc200 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -31,7 +31,7 @@ class UserDirectoryHandler(StateDeltasHandler): N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY The user directory is filled with users who this server can see are joined to a - world_readable or publically joinable room. We keep a database table up to date + world_readable or publicly joinable room. We keep a database table up to date by streaming changes of the current state and recalculating whether users should be in the directory or not when necessary. """ diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index a306faa267..1cc666fbf6 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -172,7 +172,7 @@ class WellKnownResolver: had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False) # We do this in two steps to differentiate between possibly transient - # errors (e.g. can't connect to host, 503 response) and more permenant + # errors (e.g. can't connect to host, 503 response) and more permanent # errors (such as getting a 404 response). response, body = await self._make_well_known_request( server_name, retry=had_valid_well_known diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c23a4d7c0c..04766ca965 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -587,7 +587,7 @@ class MatrixFederationHttpClient: """ Builds the Authorization headers for a federation request Args: - destination (bytes|None): The desination homeserver of the request. + destination (bytes|None): The destination homeserver of the request. May be None if the destination is an identity server, in which case destination_is must be non-None. method (bytes): The HTTP method of the request @@ -640,7 +640,7 @@ class MatrixFederationHttpClient: backoff_on_404=False, try_trailing_slash_on_400=False, ): - """ Sends the specifed json data using PUT + """ Sends the specified json data using PUT Args: destination (str): The remote server to send the HTTP request @@ -729,7 +729,7 @@ class MatrixFederationHttpClient: ignore_backoff=False, args={}, ): - """ Sends the specifed json data using POST + """ Sends the specified json data using POST Args: destination (str): The remote server to send the HTTP request diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index cd94e789e8..7c5defec82 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -109,7 +109,7 @@ in_flight_requests_db_sched_duration = Counter( # The set of all in flight requests, set[RequestMetrics] _in_flight_requests = set() -# Protects the _in_flight_requests set from concurrent accesss +# Protects the _in_flight_requests set from concurrent access _in_flight_requests_lock = threading.Lock() diff --git a/synapse/http/server.py b/synapse/http/server.py index 00b98af3d4..65dbd339ac 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -182,7 +182,7 @@ class HttpServer: """ Register a callback that gets fired if we receive a http request with the given method for a path that matches the given regex. - If the regex contains groups these gets passed to the calback via + If the regex contains groups these gets passed to the callback via an unpacked tuple. Args: @@ -241,7 +241,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): async def _async_render(self, request: Request): """Delegates to `_async_render_` methods, or returns a 400 if - no appropriate method exists. Can be overriden in sub classes for + no appropriate method exists. Can be overridden in sub classes for different routing. """ # Treat HEAD requests as GET requests. @@ -386,7 +386,7 @@ class JsonResource(DirectServeJsonResource): async def _async_render(self, request): callback, servlet_classname, group_dict = self._get_handler_for_request(request) - # Make sure we have an appopriate name for this handler in prometheus + # Make sure we have an appropriate name for this handler in prometheus # (rather than the default of JsonResource). request.request_metrics.name = servlet_classname diff --git a/synapse/http/site.py b/synapse/http/site.py index ca673028e4..ddb1770b09 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -167,7 +167,9 @@ class SynapseRequest(Request): yield except Exception: # this should already have been caught, and sent back to the client as a 500. - logger.exception("Asynchronous messge handler raised an uncaught exception") + logger.exception( + "Asynchronous message handler raised an uncaught exception" + ) finally: # the request handler has finished its work and either sent the whole response # back, or handed over responsibility to a Producer. diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index ea5f1c7b62..08fbf78eee 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -266,7 +266,7 @@ class BackgroundProcessLoggingContext(LoggingContext): super().__exit__(type, value, traceback) - # The background process has finished. We explictly remove and manually + # The background process has finished. We explicitly remove and manually # update the metrics here so that if nothing is scraping metrics the set # doesn't infinitely grow. with _bg_metrics_lock: diff --git a/synapse/notifier.py b/synapse/notifier.py index 2e993411b9..858b487bec 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -393,7 +393,7 @@ class Notifier: ) def on_new_replication_data(self) -> None: - """Used to inform replication listeners that something has happend + """Used to inform replication listeners that something has happened without waking up any of the normal user event streams""" self.notify_replication() diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 8047873ff1..2858b61fb1 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -37,7 +37,7 @@ def list_with_base_rules(rawrules, use_new_defaults=False): modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0} # Remove the modified base rules from the list, They'll be added back - # in the default postions in the list. + # in the default positions in the list. rawrules = [r for r in rawrules if r["priority_class"] >= 0] # shove the server default rules for each kind onto the end of each diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index a701defcdd..d9b5478b53 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -390,12 +390,12 @@ class RulesForRoom: continue # If a user has left a room we remove their push rule. If they - # joined then we readd it later in _update_rules_with_member_event_ids + # joined then we re-add it later in _update_rules_with_member_event_ids ret_rules_by_user.pop(user_id, None) missing_member_event_ids[user_id] = event_id if missing_member_event_ids: - # If we have some memebr events we haven't seen, look them up + # If we have some member events we haven't seen, look them up # and fetch push rules for them if appropriate. logger.debug("Found new member events %r", missing_member_event_ids) await self._update_rules_with_member_event_ids( diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index 3673e7f47e..9137c4edb1 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -104,7 +104,7 @@ class ConsentServerNotices: def copy_with_str_subst(x: Any, substitutions: Any) -> Any: - """Deep-copy a structure, carrying out string substitions on any strings + """Deep-copy a structure, carrying out string substitutions on any strings Args: x (object): structure to be copied diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 5b0900aa3c..1fa3b280b4 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -547,7 +547,7 @@ class StateResolutionHandler: event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing + used as a starting point for finding the state we need; any missing events will be requested via state_res_store. If None, all events will be fetched via state_res_store. diff --git a/synapse/state/v1.py b/synapse/state/v1.py index a493279cbd..85edae053d 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -56,7 +56,7 @@ async def resolve_events_with_store( event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing + used as a starting point for finding the state we need; any missing events will be requested via state_map_factory. If None, all events will be fetched via state_map_factory. diff --git a/synapse/state/v2.py b/synapse/state/v2.py index edf94e7ad6..f57df0d728 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -69,7 +69,7 @@ async def resolve_events_with_store( event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing + used as a starting point for finding the state we need; any missing events will be requested via state_res_store. If None, all events will be fetched via state_res_store. diff --git a/synapse/static/client/login/js/login.js b/synapse/static/client/login/js/login.js index 3678670ec7..744800ec77 100644 --- a/synapse/static/client/login/js/login.js +++ b/synapse/static/client/login/js/login.js @@ -182,7 +182,7 @@ matrixLogin.passwordLogin = function() { }; /* - * The onLogin function gets called after a succesful login. + * The onLogin function gets called after a successful login. * * It is expected that implementations override this to be notified when the * login is complete. The response to the login call is provided as the single -- cgit 1.5.1 From 2b7c180879e5d62145feed88375ba55f18fc2ae5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Oct 2020 09:30:19 +0000 Subject: Start fewer opentracing spans (#8640) #8567 started a span for every background process. This is good as it means all Synapse code that gets run should be in a span (unless in the sentinel logging context), but it means we generate about 15x the number of spans as we did previously. This PR attempts to reduce that number by a) not starting one for send commands to Redis, and b) deferring starting background processes until after we're sure they're necessary. I don't really know how much this will help. --- changelog.d/8640.misc | 1 + synapse/handlers/appservice.py | 50 +++++++++++++++++++++++---- synapse/logging/opentracing.py | 10 +++--- synapse/metrics/background_process_metrics.py | 12 +++++-- synapse/notifier.py | 34 ++++++------------ synapse/push/pusherpool.py | 18 ++++++++-- synapse/replication/tcp/redis.py | 4 ++- tests/handlers/test_appservice.py | 20 +++++------ 8 files changed, 96 insertions(+), 53 deletions(-) create mode 100644 changelog.d/8640.misc (limited to 'synapse/notifier.py') diff --git a/changelog.d/8640.misc b/changelog.d/8640.misc new file mode 100644 index 0000000000..cf6023f783 --- /dev/null +++ b/changelog.d/8640.misc @@ -0,0 +1 @@ +Reduce number of OpenTracing spans started. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 07240d3a14..7826387e53 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union from prometheus_client import Counter @@ -30,7 +30,10 @@ from synapse.metrics import ( event_processing_loop_counter, event_processing_loop_room_count, ) -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.types import Collection, JsonDict, RoomStreamToken, UserID from synapse.util.metrics import Measure @@ -53,7 +56,7 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False - async def notify_interested_services(self, max_token: RoomStreamToken): + def notify_interested_services(self, max_token: RoomStreamToken): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any @@ -72,6 +75,12 @@ class ApplicationServicesHandler: if self.is_processing: return + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._notify_interested_services(max_token) + + @wrap_as_background_process("notify_interested_services") + async def _notify_interested_services(self, max_token: RoomStreamToken): with Measure(self.clock, "notify_interested_services"): self.is_processing = True try: @@ -166,8 +175,11 @@ class ApplicationServicesHandler: finally: self.is_processing = False - async def notify_interested_services_ephemeral( - self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [], + def notify_interested_services_ephemeral( + self, + stream_key: str, + new_token: Optional[int], + users: Collection[Union[str, UserID]] = [], ): """This is called by the notifier in the background when a ephemeral event handled by the homeserver. @@ -183,13 +195,34 @@ class ApplicationServicesHandler: new_token: The latest stream token users: The user(s) involved with the event. """ + if not self.notify_appservices: + return + + if stream_key not in ("typing_key", "receipt_key", "presence_key"): + return + services = [ service for service in self.store.get_app_services() if service.supports_ephemeral ] - if not services or not self.notify_appservices: + if not services: return + + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._notify_interested_services_ephemeral( + services, stream_key, new_token, users + ) + + @wrap_as_background_process("notify_interested_services_ephemeral") + async def _notify_interested_services_ephemeral( + self, + services: List[ApplicationService], + stream_key: str, + new_token: Optional[int], + users: Collection[Union[str, UserID]], + ): logger.info("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: @@ -237,7 +270,7 @@ class ApplicationServicesHandler: return receipts async def _handle_presence( - self, service: ApplicationService, users: Collection[UserID] + self, service: ApplicationService, users: Collection[Union[str, UserID]] ): events = [] # type: List[JsonDict] presence_source = self.event_sources.sources["presence"] @@ -245,6 +278,9 @@ class ApplicationServicesHandler: service, "presence" ) for user in users: + if isinstance(user, str): + user = UserID.from_string(user) + interested = await service.is_interested_in_presence(user, self.store) if not interested: continue diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index e58850faff..ab586c318c 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -317,7 +317,7 @@ def ensure_active_span(message, ret=None): @contextlib.contextmanager -def _noop_context_manager(*args, **kwargs): +def noop_context_manager(*args, **kwargs): """Does exactly what it says on the tin""" yield @@ -413,7 +413,7 @@ def start_active_span( """ if opentracing is None: - return _noop_context_manager() + return noop_context_manager() return opentracing.tracer.start_active_span( operation_name, @@ -428,7 +428,7 @@ def start_active_span( def start_active_span_follows_from(operation_name, contexts): if opentracing is None: - return _noop_context_manager() + return noop_context_manager() references = [opentracing.follows_from(context) for context in contexts] scope = start_active_span(operation_name, references=references) @@ -459,7 +459,7 @@ def start_active_span_from_request( # Also, twisted uses byte arrays while opentracing expects strings. if opentracing is None: - return _noop_context_manager() + return noop_context_manager() header_dict = { k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() @@ -497,7 +497,7 @@ def start_active_span_from_edu( """ if opentracing is None: - return _noop_context_manager() + return noop_context_manager() carrier = json_decoder.decode(edu_content.get("context", "{}")).get( "opentracing", {} diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 08fbf78eee..658f6ecd72 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -24,7 +24,7 @@ from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer from synapse.logging.context import LoggingContext, PreserveLoggingContext -from synapse.logging.opentracing import start_active_span +from synapse.logging.opentracing import noop_context_manager, start_active_span if TYPE_CHECKING: import resource @@ -167,7 +167,7 @@ class _BackgroundProcess: ) -def run_as_background_process(desc: str, func, *args, **kwargs): +def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs): """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs): Args: desc: a description for this background process type func: a function, which may return a Deferred or a coroutine + bg_start_span: Whether to start an opentracing span. Defaults to True. + Should only be disabled for processes that will not log to or tag + a span. args: positional args for func kwargs: keyword args for func @@ -199,7 +202,10 @@ def run_as_background_process(desc: str, func, *args, **kwargs): with BackgroundProcessLoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) try: - with start_active_span(desc, tags={"request_id": context.request}): + ctx = noop_context_manager() + if bg_start_span: + ctx = start_active_span(desc, tags={"request_id": context.request}) + with ctx: result = func(*args, **kwargs) if inspect.isawaitable(result): diff --git a/synapse/notifier.py b/synapse/notifier.py index 858b487bec..eb56b26f21 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -40,7 +40,6 @@ from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import PreserveLoggingContext from synapse.logging.utils import log_function from synapse.metrics import LaterGauge -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.streams.config import PaginationConfig from synapse.types import ( Collection, @@ -310,44 +309,37 @@ class Notifier: """ # poke any interested application service. - run_as_background_process( - "_notify_app_services", self._notify_app_services, max_room_stream_token - ) - - run_as_background_process( - "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token - ) + self._notify_app_services(max_room_stream_token) + self._notify_pusher_pool(max_room_stream_token) if self.federation_sender: self.federation_sender.notify_new_events(max_room_stream_token) - async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): + def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: - await self.appservice_handler.notify_interested_services( - max_room_stream_token - ) + self.appservice_handler.notify_interested_services(max_room_stream_token) except Exception: logger.exception("Error notifying application services of event") - async def _notify_app_services_ephemeral( + def _notify_app_services_ephemeral( self, stream_key: str, new_token: Union[int, RoomStreamToken], - users: Collection[UserID] = [], + users: Collection[Union[str, UserID]] = [], ): try: stream_token = None if isinstance(new_token, int): stream_token = new_token - await self.appservice_handler.notify_interested_services_ephemeral( + self.appservice_handler.notify_interested_services_ephemeral( stream_key, stream_token, users ) except Exception: logger.exception("Error notifying application services of event") - async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): + def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: - await self._pusher_pool.on_new_notifications(max_room_stream_token) + self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: logger.exception("Error pusher pool of event") @@ -384,12 +376,8 @@ class Notifier: self.notify_replication() # Notify appservices - run_as_background_process( - "_notify_app_services_ephemeral", - self._notify_app_services_ephemeral, - stream_key, - new_token, - users, + self._notify_app_services_ephemeral( + stream_key, new_token, users, ) def on_new_replication_data(self) -> None: diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 0080c68ce2..f325964983 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Union from prometheus_client import Gauge -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.push import PusherConfigException from synapse.push.emailpusher import EmailPusher from synapse.push.httppusher import HttpPusher @@ -187,7 +190,7 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_token: RoomStreamToken): + def on_new_notifications(self, max_token: RoomStreamToken): if not self.pushers: # nothing to do here. return @@ -201,6 +204,17 @@ class PusherPool: # Nothing to do return + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._on_new_notifications(max_token) + + @wrap_as_background_process("on_new_notifications") + async def _on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_id = max_token.stream + prev_stream_id = self._last_room_stream_id_seen self._last_room_stream_id_seen = max_stream_id diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index de19705c1f..bc6ba709a7 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -166,7 +166,9 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): Args: cmd (Command) """ - run_as_background_process("send-cmd", self._async_send_command, cmd) + run_as_background_process( + "send-cmd", self._async_send_command, cmd, bg_start_span=False + ) async def _async_send_command(self, cmd: Command): """Encode a replication command and send it over our outbound connection""" diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index ee4f3da31c..53763cd0f9 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -42,7 +42,6 @@ class AppServiceHandlerTestCase(unittest.TestCase): hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) - @defer.inlineCallbacks def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) services = [ @@ -62,14 +61,12 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) - @defer.inlineCallbacks def test_query_user_exists_unknown_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] @@ -83,12 +80,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) - @defer.inlineCallbacks def test_query_user_exists_known_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] @@ -102,9 +98,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been.", -- cgit 1.5.1 From a6ea1a957e8e38ca3f98d4da32ee49a40fcb4807 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Oct 2020 12:11:45 +0000 Subject: Don't pull event from DB when handling replication traffic. (#8669) I was trying to make it so that we didn't have to start a background task when handling RDATA, but that is a bigger job (due to all the code in `generic_worker`). However I still think not pulling the event from the DB may help reduce some DB usage due to replication, even if most workers will simply go and pull that event from the DB later anyway. Co-authored-by: Patrick Cloke --- changelog.d/8669.misc | 1 + synapse/notifier.py | 68 ++++++++++++++++++++----- synapse/replication/tcp/client.py | 20 +++++--- synapse/replication/tcp/streams/events.py | 21 +++++--- synapse/storage/databases/main/events_worker.py | 8 ++- 5 files changed, 87 insertions(+), 31 deletions(-) create mode 100644 changelog.d/8669.misc (limited to 'synapse/notifier.py') diff --git a/changelog.d/8669.misc b/changelog.d/8669.misc new file mode 100644 index 0000000000..5228105cd3 --- /dev/null +++ b/changelog.d/8669.misc @@ -0,0 +1 @@ +Don't pull event from DB when handling replication traffic. diff --git a/synapse/notifier.py b/synapse/notifier.py index eb56b26f21..a17352ef46 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -28,6 +28,7 @@ from typing import ( Union, ) +import attr from prometheus_client import Counter from twisted.internet import defer @@ -173,6 +174,17 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): return bool(self.events) +@attr.s(slots=True, frozen=True) +class _PendingRoomEventEntry: + event_pos = attr.ib(type=PersistedEventPosition) + extra_users = attr.ib(type=Collection[UserID]) + + room_id = attr.ib(type=str) + type = attr.ib(type=str) + state_key = attr.ib(type=Optional[str]) + membership = attr.ib(type=Optional[str]) + + class Notifier: """ This class is responsible for notifying any listeners when there are new events available for it. @@ -190,9 +202,7 @@ class Notifier: self.storage = hs.get_storage() self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() - self.pending_new_room_events = ( - [] - ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]] + self.pending_new_room_events = [] # type: List[_PendingRoomEventEntry] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -255,7 +265,29 @@ class Notifier: max_room_stream_token: RoomStreamToken, extra_users: Collection[UserID] = [], ): - """ Used by handlers to inform the notifier something has happened + """Unwraps event and calls `on_new_room_event_args`. + """ + self.on_new_room_event_args( + event_pos=event_pos, + room_id=event.room_id, + event_type=event.type, + state_key=event.get("state_key"), + membership=event.content.get("membership"), + max_room_stream_token=max_room_stream_token, + extra_users=extra_users, + ) + + def on_new_room_event_args( + self, + room_id: str, + event_type: str, + state_key: Optional[str], + membership: Optional[str], + event_pos: PersistedEventPosition, + max_room_stream_token: RoomStreamToken, + extra_users: Collection[UserID] = [], + ): + """Used by handlers to inform the notifier something has happened in the room, room event wise. This triggers the notifier to wake up any listeners that are @@ -266,7 +298,16 @@ class Notifier: until all previous events have been persisted before notifying the client streams. """ - self.pending_new_room_events.append((event_pos, event, extra_users)) + self.pending_new_room_events.append( + _PendingRoomEventEntry( + event_pos=event_pos, + extra_users=extra_users, + room_id=room_id, + type=event_type, + state_key=state_key, + membership=membership, + ) + ) self._notify_pending_new_room_events(max_room_stream_token) self.notify_replication() @@ -284,18 +325,19 @@ class Notifier: users = set() # type: Set[UserID] rooms = set() # type: Set[str] - for event_pos, event, extra_users in pending: - if event_pos.persisted_after(max_room_stream_token): - self.pending_new_room_events.append((event_pos, event, extra_users)) + for entry in pending: + if entry.event_pos.persisted_after(max_room_stream_token): + self.pending_new_room_events.append(entry) else: if ( - event.type == EventTypes.Member - and event.membership == Membership.JOIN + entry.type == EventTypes.Member + and entry.membership == Membership.JOIN + and entry.state_key ): - self._user_joined_room(event.state_key, event.room_id) + self._user_joined_room(entry.state_key, entry.room_id) - users.update(extra_users) - rooms.add(event.room_id) + users.update(entry.extra_users) + rooms.add(entry.room_id) if users or rooms: self.on_new_event( diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e27ee216f0..2618eb1e53 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -141,21 +141,25 @@ class ReplicationDataHandler: if row.type != EventsStreamEventRow.TypeId: continue assert isinstance(row, EventsStreamRow) + assert isinstance(row.data, EventsStreamEventRow) - event = await self.store.get_event( - row.data.event_id, allow_rejected=True - ) - if event.rejected_reason: + if row.data.rejected: continue extra_users = () # type: Tuple[UserID, ...] - if event.type == EventTypes.Member: - extra_users = (UserID.from_string(event.state_key),) + if row.data.type == EventTypes.Member and row.data.state_key: + extra_users = (UserID.from_string(row.data.state_key),) max_token = self.store.get_room_max_token() event_pos = PersistedEventPosition(instance_name, token) - self.notifier.on_new_room_event( - event, event_pos, max_token, extra_users + self.notifier.on_new_room_event_args( + event_pos=event_pos, + max_room_stream_token=max_token, + extra_users=extra_users, + room_id=row.data.room_id, + event_type=row.data.type, + state_key=row.data.state_key, + membership=row.data.membership, ) # Notify any waiting deferreds. The list is ordered by position so we diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 82e9e0d64e..86a62b71eb 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -15,12 +15,15 @@ # limitations under the License. import heapq from collections.abc import Iterable -from typing import List, Tuple, Type +from typing import TYPE_CHECKING, List, Optional, Tuple, Type import attr from ._base import Stream, StreamUpdateResult, Token +if TYPE_CHECKING: + from synapse.server import HomeServer + """Handling of the 'events' replication stream This stream contains rows of various types. Each row therefore contains a 'type' @@ -81,12 +84,14 @@ class BaseEventsStreamRow: class EventsStreamEventRow(BaseEventsStreamRow): TypeId = "ev" - event_id = attr.ib() # str - room_id = attr.ib() # str - type = attr.ib() # str - state_key = attr.ib() # str, optional - redacts = attr.ib() # str, optional - relates_to = attr.ib() # str, optional + event_id = attr.ib(type=str) + room_id = attr.ib(type=str) + type = attr.ib(type=str) + state_key = attr.ib(type=Optional[str]) + redacts = attr.ib(type=Optional[str]) + relates_to = attr.ib(type=Optional[str]) + membership = attr.ib(type=Optional[str]) + rejected = attr.ib(type=bool) @attr.s(slots=True, frozen=True) @@ -113,7 +118,7 @@ class EventsStream(Stream): NAME = "events" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self._store = hs.get_datastore() super().__init__( hs.get_instance_name(), diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index cd1f31aa62..5ae263827d 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1117,11 +1117,13 @@ class EventsWorkerStore(SQLBaseStore): def get_all_new_forward_event_rows(txn): sql = ( "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" + " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events USING (event_id)" " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " LEFT JOIN rejections USING (event_id)" " WHERE ? < stream_ordering AND stream_ordering <= ?" " AND instance_name = ?" " ORDER BY stream_ordering ASC" @@ -1152,12 +1154,14 @@ class EventsWorkerStore(SQLBaseStore): def get_ex_outlier_stream_rows_txn(txn): sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" + " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" " FROM events AS e" " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events USING (event_id)" " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " LEFT JOIN rejections USING (event_id)" " WHERE ? < event_stream_ordering" " AND event_stream_ordering <= ?" " AND out.instance_name = ?" -- cgit 1.5.1