diff options
Diffstat (limited to 'synapse')
42 files changed, 1104 insertions, 520 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 722b53a67d..83b8e4897f 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -48,7 +48,7 @@ try: except ImportError: pass -__version__ = "1.21.1" +__version__ = "1.21.2" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d53181deb1..1b511890aa 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -790,10 +790,6 @@ class FederationSenderHandler: send_queue.process_rows_for_federation(self.federation_sender, rows) await self.update_token(token) - # We also need to poke the federation sender when new events happen - elif stream_name == "events": - self.federation_sender.notify_new_events(token) - # ... and when new receipts happen elif stream_name == ReceiptsStream.NAME: await self._on_new_receipts(rows) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 13ec1f71a6..3862d9c08f 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -14,14 +14,15 @@ # limitations under the License. import logging import re -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Iterable, List, Match, Optional from synapse.api.constants import EventTypes -from synapse.appservice.api import ApplicationServiceApi -from synapse.types import GroupID, get_domain_from_id +from synapse.events import EventBase +from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id from synapse.util.caches.descriptors import cached if TYPE_CHECKING: + from synapse.appservice.api import ApplicationServiceApi from synapse.storage.databases.main import DataStore logger = logging.getLogger(__name__) @@ -32,38 +33,6 @@ class ApplicationServiceState: UP = "up" -class AppServiceTransaction: - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - async def send(self, as_api: ApplicationServiceApi) -> bool: - """Sends this transaction using the provided AS API interface. - - Args: - as_api: The API to use to send. - Returns: - True if the transaction was sent. - """ - return await as_api.push_bulk( - service=self.service, events=self.events, txn_id=self.id - ) - - async def complete(self, store: "DataStore") -> None: - """Completes this transaction as successful. - - Marks this transaction ID on the application service and removes the - transaction contents from the database. - - Args: - store: The database store to operate on. - """ - await store.complete_appservice_txn(service=self.service, txn_id=self.id) - - class ApplicationService: """Defines an application service. This definition is mostly what is provided to the /register AS API. @@ -91,6 +60,7 @@ class ApplicationService: protocols=None, rate_limited=True, ip_range_whitelist=None, + supports_ephemeral=False, ): self.token = token self.url = ( @@ -102,6 +72,7 @@ class ApplicationService: self.namespaces = self._check_namespaces(namespaces) self.id = id self.ip_range_whitelist = ip_range_whitelist + self.supports_ephemeral = supports_ephemeral if "|" in self.id: raise Exception("application service ID cannot contain '|' character") @@ -161,19 +132,21 @@ class ApplicationService: raise ValueError("Expected string for 'regex' in ns '%s'" % ns) return namespaces - def _matches_regex(self, test_string, namespace_key): + def _matches_regex(self, test_string: str, namespace_key: str) -> Optional[Match]: for regex_obj in self.namespaces[namespace_key]: if regex_obj["regex"].match(test_string): return regex_obj return None - def _is_exclusive(self, ns_key, test_string): + def _is_exclusive(self, ns_key: str, test_string: str) -> bool: regex_obj = self._matches_regex(test_string, ns_key) if regex_obj: return regex_obj["exclusive"] return False - async def _matches_user(self, event, store): + async def _matches_user( + self, event: Optional[EventBase], store: Optional["DataStore"] = None + ) -> bool: if not event: return False @@ -188,14 +161,23 @@ class ApplicationService: if not store: return False - does_match = await self._matches_user_in_member_list(event.room_id, store) + does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match - @cached(num_args=1, cache_context=True) - async def _matches_user_in_member_list(self, room_id, store, cache_context): - member_list = await store.get_users_in_room( - room_id, on_invalidate=cache_context.invalidate - ) + @cached(num_args=1) + async def matches_user_in_member_list( + self, room_id: str, store: "DataStore" + ) -> bool: + """Check if this service is interested a room based upon it's membership + + Args: + room_id: The room to check. + store: The datastore to query. + + Returns: + True if this service would like to know about this room. + """ + member_list = await store.get_users_in_room(room_id) # check joined member events for user_id in member_list: @@ -203,12 +185,14 @@ class ApplicationService: return True return False - def _matches_room_id(self, event): + def _matches_room_id(self, event: EventBase) -> bool: if hasattr(event, "room_id"): return self.is_interested_in_room(event.room_id) return False - async def _matches_aliases(self, event, store): + async def _matches_aliases( + self, event: EventBase, store: Optional["DataStore"] = None + ) -> bool: if not store or not event: return False @@ -218,12 +202,15 @@ class ApplicationService: return True return False - async def is_interested(self, event, store=None) -> bool: + async def is_interested( + self, event: EventBase, store: Optional["DataStore"] = None + ) -> bool: """Check if this service is interested in this event. Args: - event(Event): The event to check. - store(DataStore) + event: The event to check. + store: The datastore to query. + Returns: True if this service would like to know about this event. """ @@ -231,39 +218,66 @@ class ApplicationService: if self._matches_room_id(event): return True + # This will check the namespaces first before + # checking the store, so should be run before _matches_aliases + if await self._matches_user(event, store): + return True + + # This will check the store, so should be run last if await self._matches_aliases(event, store): return True - if await self._matches_user(event, store): + return False + + @cached(num_args=1) + async def is_interested_in_presence( + self, user_id: UserID, store: "DataStore" + ) -> bool: + """Check if this service is interested a user's presence + + Args: + user_id: The user to check. + store: The datastore to query. + + Returns: + True if this service would like to know about presence for this user. + """ + # Find all the rooms the sender is in + if self.is_interested_in_user(user_id.to_string()): return True + room_ids = await store.get_rooms_for_user(user_id.to_string()) + # Then find out if the appservice is interested in any of those rooms + for room_id in room_ids: + if await self.matches_user_in_member_list(room_id, store): + return True return False - def is_interested_in_user(self, user_id): + def is_interested_in_user(self, user_id: str) -> bool: return ( - self._matches_regex(user_id, ApplicationService.NS_USERS) + bool(self._matches_regex(user_id, ApplicationService.NS_USERS)) or user_id == self.sender ) - def is_interested_in_alias(self, alias): + def is_interested_in_alias(self, alias: str) -> bool: return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES)) - def is_interested_in_room(self, room_id): + def is_interested_in_room(self, room_id: str) -> bool: return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS)) - def is_exclusive_user(self, user_id): + def is_exclusive_user(self, user_id: str) -> bool: return ( self._is_exclusive(ApplicationService.NS_USERS, user_id) or user_id == self.sender ) - def is_interested_in_protocol(self, protocol): + def is_interested_in_protocol(self, protocol: str) -> bool: return protocol in self.protocols - def is_exclusive_alias(self, alias): + def is_exclusive_alias(self, alias: str) -> bool: return self._is_exclusive(ApplicationService.NS_ALIASES, alias) - def is_exclusive_room(self, room_id): + def is_exclusive_room(self, room_id: str) -> bool: return self._is_exclusive(ApplicationService.NS_ROOMS, room_id) def get_exclusive_user_regexes(self): @@ -276,14 +290,14 @@ class ApplicationService: if regex_obj["exclusive"] ] - def get_groups_for_user(self, user_id): + def get_groups_for_user(self, user_id: str) -> Iterable[str]: """Get the groups that this user is associated with by this AS Args: - user_id (str): The ID of the user. + user_id: The ID of the user. Returns: - iterable[str]: an iterable that yields group_id strings. + An iterable that yields group_id strings. """ return ( regex_obj["group_id"] @@ -291,7 +305,7 @@ class ApplicationService: if "group_id" in regex_obj and regex_obj["regex"].match(user_id) ) - def is_rate_limited(self): + def is_rate_limited(self) -> bool: return self.rate_limited def __str__(self): @@ -300,3 +314,45 @@ class ApplicationService: dict_copy["token"] = "<redacted>" dict_copy["hs_token"] = "<redacted>" return "ApplicationService: %s" % (dict_copy,) + + +class AppServiceTransaction: + """Represents an application service transaction.""" + + def __init__( + self, + service: ApplicationService, + id: int, + events: List[EventBase], + ephemeral: List[JsonDict], + ): + self.service = service + self.id = id + self.events = events + self.ephemeral = ephemeral + + async def send(self, as_api: "ApplicationServiceApi") -> bool: + """Sends this transaction using the provided AS API interface. + + Args: + as_api: The API to use to send. + Returns: + True if the transaction was sent. + """ + return await as_api.push_bulk( + service=self.service, + events=self.events, + ephemeral=self.ephemeral, + txn_id=self.id, + ) + + async def complete(self, store: "DataStore") -> None: + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + """ + await store.complete_appservice_txn(service=self.service, txn_id=self.id) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index e8f0793795..e366a982b8 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -14,12 +14,13 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple from prometheus_client import Counter from synapse.api.constants import EventTypes, ThirdPartyEntityKind from synapse.api.errors import CodeMessageException +from synapse.events import EventBase from synapse.events.utils import serialize_event from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict, ThirdPartyInstanceID @@ -201,7 +202,13 @@ class ApplicationServiceApi(SimpleHttpClient): key = (service.id, protocol) return await self.protocol_meta_cache.wrap(key, _get) - async def push_bulk(self, service, events, txn_id=None): + async def push_bulk( + self, + service: "ApplicationService", + events: List[EventBase], + ephemeral: List[JsonDict], + txn_id: Optional[int] = None, + ): if service.url is None: return True @@ -211,15 +218,19 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning( "push_bulk: Missing txn ID sending events to %s", service.url ) - txn_id = str(0) - txn_id = str(txn_id) + txn_id = 0 + + uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) + + # Never send ephemeral events to appservices that do not support it + if service.supports_ephemeral: + body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral} + else: + body = {"events": events} - uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id)) try: await self.put_json( - uri=uri, - json_body={"events": events}, - args={"access_token": service.hs_token}, + uri=uri, json_body=body, args={"access_token": service.hs_token}, ) sent_transactions_counter.labels(service.id).inc() sent_events_counter.labels(service.id).inc(len(events)) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 8eb8c6f51c..ad3c408519 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,10 +49,13 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ import logging +from typing import List -from synapse.appservice import ApplicationServiceState +from synapse.appservice import ApplicationService, ApplicationServiceState +from synapse.events import EventBase from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -82,8 +85,13 @@ class ApplicationServiceScheduler: for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as(self, service, event): - self.queuer.enqueue(service, event) + def submit_event_for_as(self, service: ApplicationService, event: EventBase): + self.queuer.enqueue_event(service, event) + + def submit_ephemeral_events_for_as( + self, service: ApplicationService, events: List[JsonDict] + ): + self.queuer.enqueue_ephemeral(service, events) class _ServiceQueuer: @@ -96,17 +104,15 @@ class _ServiceQueuer: def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} + self.queued_ephemeral = {} # dict of {service_id: [events]} # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock - def enqueue(self, service, event): - self.queued_events.setdefault(service.id, []).append(event) - + def _start_background_request(self, service): # start a sender for this appservice if we don't already have one - if service.id in self.requests_in_flight: return @@ -114,7 +120,15 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) - async def _send_request(self, service): + def enqueue_event(self, service: ApplicationService, event: EventBase): + self.queued_events.setdefault(service.id, []).append(event) + self._start_background_request(service) + + def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]): + self.queued_ephemeral.setdefault(service.id, []).extend(events) + self._start_background_request(service) + + async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender # running. assert service.id not in self.requests_in_flight @@ -123,10 +137,11 @@ class _ServiceQueuer: try: while True: events = self.queued_events.pop(service.id, []) - if not events: + ephemeral = self.queued_ephemeral.pop(service.id, []) + if not events and not ephemeral: return try: - await self.txn_ctrl.send(service, events) + await self.txn_ctrl.send(service, events, ephemeral) except Exception: logger.exception("AS request failed") finally: @@ -158,9 +173,16 @@ class _TransactionController: # for UTs self.RECOVERER_CLASS = _Recoverer - async def send(self, service, events): + async def send( + self, + service: ApplicationService, + events: List[EventBase], + ephemeral: List[JsonDict] = [], + ): try: - txn = await self.store.create_appservice_txn(service=service, events=events) + txn = await self.store.create_appservice_txn( + service=service, events=events, ephemeral=ephemeral + ) service_is_up = await self._is_service_up(service) if service_is_up: sent = await txn.send(self.as_api) @@ -204,7 +226,7 @@ class _TransactionController: recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) - async def _is_service_up(self, service): + async def _is_service_up(self, service: ApplicationService) -> bool: state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 8ed3e24258..746fc3cc02 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename): if as_info.get("ip_range_whitelist"): ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist")) + supports_ephemeral = as_info.get("de.sorunome.msc2409.push_ephemeral", False) + return ApplicationService( token=as_info["as_token"], hostname=hostname, @@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename): hs_token=as_info["hs_token"], sender=user_id, id=as_info["id"], + supports_ephemeral=supports_ephemeral, protocols=protocols, rate_limited=rate_limited, ip_range_whitelist=ip_range_whitelist, diff --git a/synapse/events/builder.py b/synapse/events/builder.py index b6c47be646..df4f950fec 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -97,32 +97,37 @@ class EventBuilder: def is_state(self): return self._state_key is not None - async def build(self, prev_event_ids: List[str]) -> EventBase: + async def build( + self, prev_event_ids: List[str], auth_event_ids: Optional[List[str]] + ) -> EventBase: """Transform into a fully signed and hashed event Args: prev_event_ids: The event IDs to use as the prev events + auth_event_ids: The event IDs to use as the auth events. + Should normally be set to None, which will cause them to be calculated + based on the room state at the prev_events. Returns: The signed and hashed event. """ - - state_ids = await self._state.get_current_state_ids( - self.room_id, prev_event_ids - ) - auth_ids = self._auth.compute_auth_events(self, state_ids) + if auth_event_ids is None: + state_ids = await self._state.get_current_state_ids( + self.room_id, prev_event_ids + ) + auth_event_ids = self._auth.compute_auth_events(self, state_ids) format_version = self.room_version.event_format if format_version == EventFormatVersions.V1: # The types of auth/prev events changes between event versions. auth_events = await self._store.add_event_hashes( - auth_ids + auth_event_ids ) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]] prev_events = await self._store.add_event_hashes( prev_event_ids ) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]] else: - auth_events = auth_ids + auth_events = auth_event_ids prev_events = prev_event_ids old_depth = await self._store.get_max_depth_of(prev_event_ids) diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 9df35b54ba..5f9af8529b 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -83,6 +83,9 @@ class EventValidator: Args: event (FrozenEvent): The event to validate. """ + if not event.is_state(): + raise SynapseError(code=400, msg="must be a state event") + min_lifetime = event.content.get("min_lifetime") max_lifetime = event.content.get("max_lifetime") diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 8e46957d15..5f1bf492c1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -188,7 +188,7 @@ class FederationRemoteSendQueue: for key in keys[:i]: del self.edus[key] - def notify_new_events(self, current_id): + def notify_new_events(self, max_token): """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index e33b29a42c..604cfd1935 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -40,7 +40,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -154,10 +154,15 @@ class FederationSender: self._per_destination_queues[destination] = queue return queue - def notify_new_events(self, current_id: int) -> None: + def notify_new_events(self, max_token: RoomStreamToken) -> None: """This gets called when we have some new events we might want to send out to other servers. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + self._last_poked_id = max(current_id, self._last_poked_id) if self._is_processing: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9d4e87dad6..07240d3a14 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Dict, List, Optional from prometheus_client import Counter @@ -21,12 +22,16 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.appservice import ApplicationService +from synapse.events import EventBase +from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import ( event_processing_loop_counter, event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import Collection, JsonDict, RoomStreamToken, UserID from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -43,19 +48,22 @@ class ApplicationServicesHandler: self.started_scheduler = False self.clock = hs.get_clock() self.notify_appservices = hs.config.notify_appservices + self.event_sources = hs.get_event_sources() self.current_max = 0 self.is_processing = False - async def notify_interested_services(self, current_id): + async def notify_interested_services(self, max_token: RoomStreamToken): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. - - Args: - current_id(int): The current maximum ID. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + services = self.store.get_app_services() if not services or not self.notify_appservices: return @@ -79,7 +87,7 @@ class ApplicationServicesHandler: if not events: break - events_by_room = {} + events_by_room = {} # type: Dict[str, List[EventBase]] for event in events: events_by_room.setdefault(event.room_id, []).append(event) @@ -158,6 +166,104 @@ class ApplicationServicesHandler: finally: self.is_processing = False + async def notify_interested_services_ephemeral( + self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [], + ): + """This is called by the notifier in the background + when a ephemeral event handled by the homeserver. + + This will determine which appservices + are interested in the event, and submit them. + + Events will only be pushed to appservices + that have opted into ephemeral events + + Args: + stream_key: The stream the event came from. + new_token: The latest stream token + users: The user(s) involved with the event. + """ + services = [ + service + for service in self.store.get_app_services() + if service.supports_ephemeral + ] + if not services or not self.notify_appservices: + return + logger.info("Checking interested services for %s" % (stream_key)) + with Measure(self.clock, "notify_interested_services_ephemeral"): + for service in services: + # Only handle typing if we have the latest token + if stream_key == "typing_key" and new_token is not None: + events = await self._handle_typing(service, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + # We don't persist the token for typing_key for performance reasons + elif stream_key == "receipt_key": + events = await self._handle_receipts(service) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) + elif stream_key == "presence_key": + events = await self._handle_presence(service, users) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) + + async def _handle_typing(self, service: ApplicationService, new_token: int): + typing_source = self.event_sources.sources["typing"] + # Get the typing events from just before current + typing, _ = await typing_source.get_new_events_as( + service=service, + # For performance reasons, we don't persist the previous + # token in the DB and instead fetch the latest typing information + # for appservices. + from_key=new_token - 1, + ) + return typing + + async def _handle_receipts(self, service: ApplicationService): + from_key = await self.store.get_type_stream_id_for_appservice( + service, "read_receipt" + ) + receipts_source = self.event_sources.sources["receipt"] + receipts, _ = await receipts_source.get_new_events_as( + service=service, from_key=from_key + ) + return receipts + + async def _handle_presence( + self, service: ApplicationService, users: Collection[UserID] + ): + events = [] # type: List[JsonDict] + presence_source = self.event_sources.sources["presence"] + from_key = await self.store.get_type_stream_id_for_appservice( + service, "presence" + ) + for user in users: + interested = await service.is_interested_in_presence(user, self.store) + if not interested: + continue + presence_events, _ = await presence_source.get_new_events( + user=user, service=service, from_key=from_key, + ) + time_now = self.clock.time_msec() + presence_events = [ + { + "type": "m.presence", + "sender": event.user_id, + "content": format_user_presence_state( + event, time_now, include_user_id=False + ), + } + for event in presence_events + ] + events = events + presence_events + async def query_user_exists(self, user_id): """Check if any application service knows this user_id exists. @@ -220,7 +326,7 @@ class ApplicationServicesHandler: async def get_3pe_protocols(self, only_protocol=None): services = self.store.get_app_services() - protocols = {} + protocols = {} # type: Dict[str, List[JsonDict]] # Collect up all the individual protocol responses out of the ASes for s in services: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0c6aec347e..7f00805a91 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -437,9 +437,9 @@ class EventCreationHandler: self, requester: Requester, event_dict: dict, - token_id: Optional[str] = None, txn_id: Optional[str] = None, prev_event_ids: Optional[List[str]] = None, + auth_event_ids: Optional[List[str]] = None, require_consent: bool = True, ) -> Tuple[EventBase, EventContext]: """ @@ -453,13 +453,18 @@ class EventCreationHandler: Args: requester event_dict: An entire event - token_id txn_id prev_event_ids: the forward extremities to use as the prev_events for the new event. If None, they will be requested from the database. + + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. + require_consent: Whether to check if the requester has consented to the privacy policy. Raises: @@ -511,14 +516,17 @@ class EventCreationHandler: if require_consent and not is_exempt: await self.assert_accepted_privacy_policy(requester) - if token_id is not None: - builder.internal_metadata.token_id = token_id + if requester.access_token_id is not None: + builder.internal_metadata.token_id = requester.access_token_id if txn_id is not None: builder.internal_metadata.txn_id = txn_id event, context = await self.create_new_client_event( - builder=builder, requester=requester, prev_event_ids=prev_event_ids, + builder=builder, + requester=requester, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, ) # In an ideal world we wouldn't need the second part of this condition. However, @@ -726,7 +734,7 @@ class EventCreationHandler: return event, event.internal_metadata.stream_ordering event, context = await self.create_event( - requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id + requester, event_dict, txn_id=txn_id ) assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( @@ -757,6 +765,7 @@ class EventCreationHandler: builder: EventBuilder, requester: Optional[Requester] = None, prev_event_ids: Optional[List[str]] = None, + auth_event_ids: Optional[List[str]] = None, ) -> Tuple[EventBase, EventContext]: """Create a new event for a local client @@ -769,6 +778,11 @@ class EventCreationHandler: If None, they will be requested from the database. + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. + Returns: Tuple of created event, context """ @@ -790,7 +804,9 @@ class EventCreationHandler: builder.type == EventTypes.Create or len(prev_event_ids) > 0 ), "Attempting to create an event with no prev_events" - event = await builder.build(prev_event_ids=prev_event_ids) + event = await builder.build( + prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids + ) context = await self.state.compute_event_context(event) if requester: context.app_service = requester.app_service diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 7225923757..c242c409cf 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -13,9 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List, Tuple +from synapse.appservice import ApplicationService from synapse.handlers._base import BaseHandler -from synapse.types import ReadReceipt, get_domain_from_id +from synapse.types import JsonDict, ReadReceipt, get_domain_from_id from synapse.util.async_helpers import maybe_awaitable logger = logging.getLogger(__name__) @@ -140,5 +142,36 @@ class ReceiptEventSource: return (events, to_key) + async def get_new_events_as( + self, from_key: int, service: ApplicationService + ) -> Tuple[List[JsonDict], int]: + """Returns a set of new receipt events that an appservice + may be interested in. + + Args: + from_key: the stream position at which events should be fetched from + service: The appservice which may be interested + """ + from_key = int(from_key) + to_key = self.get_current_key() + + if from_key == to_key: + return [], to_key + + # We first need to fetch all new receipts + rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms( + from_key=from_key, to_key=to_key + ) + + # Then filter down to rooms that the AS can read + events = [] + for room_id, event in rooms_to_events.items(): + if not await service.matches_user_in_member_list(room_id, self.store): + continue + + events.append(event) + + return (events, to_key) + def get_current_key(self, direction="f"): return self.store.get_max_receipt_stream_id() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 93ed51063a..ec300d8877 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -214,7 +214,6 @@ class RoomCreationHandler(BaseHandler): "replacement_room": new_room_id, }, }, - token_id=requester.access_token_id, ) old_room_version = await self.store.get_room_version_id(old_room_id) await self.auth.check_from_context( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0080eeaf8d..ec784030e9 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -17,12 +17,10 @@ import abc import logging import random from http import HTTPStatus -from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union - -from unpaddedbase64 import encode_base64 +from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple from synapse import types -from synapse.api.constants import MAX_DEPTH, AccountDataTypes, EventTypes, Membership +from synapse.api.constants import AccountDataTypes, EventTypes, Membership from synapse.api.errors import ( AuthError, Codes, @@ -31,12 +29,8 @@ from synapse.api.errors import ( SynapseError, ) from synapse.api.ratelimiting import Ratelimiter -from synapse.api.room_versions import EventFormatVersions -from synapse.crypto.event_signing import compute_event_reference_hash from synapse.events import EventBase -from synapse.events.builder import create_local_event_from_event_dict from synapse.events.snapshot import EventContext -from synapse.events.validator import EventValidator from synapse.storage.roommember import RoomsForUser from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID from synapse.util.async_helpers import Linearizer @@ -193,7 +187,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # For backwards compatibility: "membership": membership, }, - token_id=requester.access_token_id, txn_id=txn_id, prev_event_ids=prev_event_ids, require_consent=require_consent, @@ -1133,31 +1126,10 @@ class RoomMemberMasterHandler(RoomMemberHandler): room_id = invite_event.room_id target_user = invite_event.state_key - room_version = await self.store.get_room_version(room_id) content["membership"] = Membership.LEAVE - # the auth events for the new event are the same as that of the invite, plus - # the invite itself. - # - # the prev_events are just the invite. - invite_hash = invite_event.event_id # type: Union[str, Tuple] - if room_version.event_format == EventFormatVersions.V1: - alg, h = compute_event_reference_hash(invite_event) - invite_hash = (invite_event.event_id, {alg: encode_base64(h)}) - - auth_events = tuple(invite_event.auth_events) + (invite_hash,) - prev_events = (invite_hash,) - - # we cap depth of generated events, to ensure that they are not - # rejected by other servers (and so that they can be persisted in - # the db) - depth = min(invite_event.depth + 1, MAX_DEPTH) - event_dict = { - "depth": depth, - "auth_events": auth_events, - "prev_events": prev_events, "type": EventTypes.Member, "room_id": room_id, "sender": target_user, @@ -1165,24 +1137,23 @@ class RoomMemberMasterHandler(RoomMemberHandler): "state_key": target_user, } - event = create_local_event_from_event_dict( - clock=self.clock, - hostname=self.hs.hostname, - signing_key=self.hs.signing_key, - room_version=room_version, - event_dict=event_dict, + # the auth events for the new event are the same as that of the invite, plus + # the invite itself. + # + # the prev_events are just the invite. + prev_event_ids = [invite_event.event_id] + auth_event_ids = invite_event.auth_event_ids() + prev_event_ids + + event, context = await self.event_creation_handler.create_event( + requester, + event_dict, + txn_id=txn_id, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, ) event.internal_metadata.outlier = True event.internal_metadata.out_of_band_membership = True - if txn_id is not None: - event.internal_metadata.txn_id = txn_id - if requester.access_token_id is not None: - event.internal_metadata.token_id = requester.access_token_id - - EventValidator().validate_new(event, self.config) - context = await self.state_handler.compute_event_context(event) - context.app_service = requester.app_service result_event = await self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[UserID.from_string(target_user)], ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a306631094..b527724bc4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import itertools import logging from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3cbfc2d780..d3692842e3 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -12,16 +12,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import random from collections import namedtuple from typing import TYPE_CHECKING, List, Set, Tuple from synapse.api.errors import AuthError, ShadowBanError, SynapseError +from synapse.appservice import ApplicationService from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams import TypingStream -from synapse.types import UserID, get_domain_from_id +from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -430,6 +430,33 @@ class TypingNotificationEventSource: "content": {"user_ids": list(typing)}, } + async def get_new_events_as( + self, from_key: int, service: ApplicationService + ) -> Tuple[List[JsonDict], int]: + """Returns a set of new typing events that an appservice + may be interested in. + + Args: + from_key: the stream position at which events should be fetched from + service: The appservice which may be interested + """ + with Measure(self.clock, "typing.get_new_events_as"): + from_key = int(from_key) + handler = self.get_typing_handler() + + events = [] + for room_id in handler._room_serials.keys(): + if handler._room_serials[room_id] <= from_key: + continue + if not await service.matches_user_in_member_list( + room_id, handler.store + ): + continue + + events.append(self._make_event_for(room_id)) + + return (events, handler._latest_room_serial) + async def get_new_events(self, from_key, room_ids, **kwargs): with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) diff --git a/synapse/notifier.py b/synapse/notifier.py index 13adeed01e..2e993411b9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -319,19 +319,35 @@ class Notifier: ) if self.federation_sender: - self.federation_sender.notify_new_events(max_room_stream_token.stream) + self.federation_sender.notify_new_events(max_room_stream_token) async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: await self.appservice_handler.notify_interested_services( - max_room_stream_token.stream + max_room_stream_token + ) + except Exception: + logger.exception("Error notifying application services of event") + + async def _notify_app_services_ephemeral( + self, + stream_key: str, + new_token: Union[int, RoomStreamToken], + users: Collection[UserID] = [], + ): + try: + stream_token = None + if isinstance(new_token, int): + stream_token = new_token + await self.appservice_handler.notify_interested_services_ephemeral( + stream_key, stream_token, users ) except Exception: logger.exception("Error notifying application services of event") async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: - await self._pusher_pool.on_new_notifications(max_room_stream_token.stream) + await self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: logger.exception("Error pusher pool of event") @@ -367,6 +383,15 @@ class Notifier: self.notify_replication() + # Notify appservices + run_as_background_process( + "_notify_app_services_ephemeral", + self._notify_app_services_ephemeral, + stream_key, + new_token, + users, + ) + def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 28bd8ab748..c6763971ee 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -18,6 +18,7 @@ import logging from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) @@ -91,7 +92,12 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 26706bf3e1..793d0db2d9 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException +from synapse.types import RoomStreamToken from . import push_rule_evaluator, push_tools @@ -114,7 +115,12 @@ class HttpPusher: if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering or 0 ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 76150e117b..0080c68ce2 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -24,6 +24,7 @@ from synapse.push import PusherConfigException from synapse.push.emailpusher import EmailPusher from synapse.push.httppusher import HttpPusher from synapse.push.pusher import PusherFactory +from synapse.types import RoomStreamToken from synapse.util.async_helpers import concurrently_execute if TYPE_CHECKING: @@ -186,11 +187,16 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_stream_id: int): + async def on_new_notifications(self, max_token: RoomStreamToken): if not self.pushers: # nothing to do here. return + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_id = max_token.stream + if max_stream_id < self._last_room_stream_id_seen: # Nothing to do return @@ -214,7 +220,7 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(max_stream_id) + p.on_new_notifications(max_token) except Exception: logger.exception("Exception in pusher on_new_notifications") diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index 1f8dafe7ea..4b0ea0cc01 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -15,7 +15,7 @@ from synapse.storage.database import DatabasePool from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY -from synapse.util.caches.descriptors import Cache +from synapse.util.caches.deferred_cache import DeferredCache from ._base import BaseSlavedStore @@ -24,9 +24,9 @@ class SlavedClientIpStore(BaseSlavedStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) - self.client_ip_last_seen = Cache( + self.client_ip_last_seen = DeferredCache( name="client_ip_last_seen", keylen=4, max_entries=50000 - ) + ) # type: DeferredCache[tuple, int] async def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index b686cd671f..e7fcd2b1ff 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -59,7 +59,9 @@ class ProfileDisplaynameRestServlet(RestServlet): try: new_name = content["displayname"] except Exception: - return 400, "Unable to parse name" + raise SynapseError( + code=400, msg="Unable to parse name", errcode=Codes.BAD_JSON, + ) await self.profile_handler.set_displayname(user, requester, new_name, is_admin) diff --git a/synapse/server.py b/synapse/server.py index f921ee4b53..21a232bbd9 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -205,7 +205,13 @@ class HomeServer(metaclass=abc.ABCMeta): # instantiated during setup() for future return by get_datastore() DATASTORE_CLASS = abc.abstractproperty() - def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwargs): + def __init__( + self, + hostname: str, + config: HomeServerConfig, + reactor=None, + version_string="Synapse", + ): """ Args: hostname : The hostname for the server. @@ -236,11 +242,9 @@ class HomeServer(metaclass=abc.ABCMeta): burst_count=config.rc_registration.burst_count, ) - self.datastores = None # type: Optional[Databases] + self.version_string = version_string - # Other kwargs are explicit dependencies - for depname in kwargs: - setattr(self, depname, kwargs[depname]) + self.datastores = None # type: Optional[Databases] def get_instance_id(self) -> str: """A unique ID for this synapse process instance. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0ba3a025cf..763722d6bc 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -893,6 +893,12 @@ class DatabasePool: attempts = 0 while True: try: + # We can autocommit if we are going to use native upserts + autocommit = ( + self.engine.can_native_upsert + and table not in self._unsafe_to_upsert_tables + ) + return await self.runInteraction( desc, self.simple_upsert_txn, @@ -901,6 +907,7 @@ class DatabasePool: values, insertion_values, lock=lock, + db_autocommit=autocommit, ) except self.engine.module.IntegrityError as e: attempts += 1 @@ -1063,6 +1070,43 @@ class DatabasePool: ) txn.execute(sql, list(allvalues.values())) + async def simple_upsert_many( + self, + table: str, + key_names: Collection[str], + key_values: Collection[Iterable[Any]], + value_names: Collection[str], + value_values: Iterable[Iterable[Any]], + desc: str, + ) -> None: + """ + Upsert, many times. + + Args: + table: The table to upsert into + key_names: The key column names. + key_values: A list of each row's key column values. + value_names: The value column names + value_values: A list of each row's value column values. + Ignored if value_names is empty. + """ + + # We can autocommit if we are going to use native upserts + autocommit = ( + self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables + ) + + return await self.runInteraction( + desc, + self.simple_upsert_many_txn, + table, + key_names, + key_values, + value_names, + value_values, + db_autocommit=autocommit, + ) + def simple_upsert_many_txn( self, txn: LoggingTransaction, @@ -1214,7 +1258,13 @@ class DatabasePool: desc: description of the transaction, for logging and metrics """ return await self.runInteraction( - desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none + desc, + self.simple_select_one_txn, + table, + keyvalues, + retcols, + allow_none, + db_autocommit=True, ) @overload @@ -1265,6 +1315,7 @@ class DatabasePool: keyvalues, retcol, allow_none=allow_none, + db_autocommit=True, ) @overload @@ -1346,7 +1397,12 @@ class DatabasePool: Results in a list """ return await self.runInteraction( - desc, self.simple_select_onecol_txn, table, keyvalues, retcol + desc, + self.simple_select_onecol_txn, + table, + keyvalues, + retcol, + db_autocommit=True, ) async def simple_select_list( @@ -1371,7 +1427,12 @@ class DatabasePool: A list of dictionaries. """ return await self.runInteraction( - desc, self.simple_select_list_txn, table, keyvalues, retcols + desc, + self.simple_select_list_txn, + table, + keyvalues, + retcols, + db_autocommit=True, ) @classmethod @@ -1450,6 +1511,7 @@ class DatabasePool: chunk, keyvalues, retcols, + db_autocommit=True, ) results.extend(rows) @@ -1548,7 +1610,12 @@ class DatabasePool: desc: description of the transaction, for logging and metrics """ await self.runInteraction( - desc, self.simple_update_one_txn, table, keyvalues, updatevalues + desc, + self.simple_update_one_txn, + table, + keyvalues, + updatevalues, + db_autocommit=True, ) @classmethod @@ -1607,7 +1674,9 @@ class DatabasePool: keyvalues: dict of column names and values to select the row with desc: description of the transaction, for logging and metrics """ - await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues) + await self.runInteraction( + desc, self.simple_delete_one_txn, table, keyvalues, db_autocommit=True, + ) @staticmethod def simple_delete_one_txn( @@ -1646,7 +1715,9 @@ class DatabasePool: Returns: The number of deleted rows. """ - return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues) + return await self.runInteraction( + desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True + ) @staticmethod def simple_delete_txn( @@ -1694,7 +1765,13 @@ class DatabasePool: Number rows deleted """ return await self.runInteraction( - desc, self.simple_delete_many_txn, table, column, iterable, keyvalues + desc, + self.simple_delete_many_txn, + table, + column, + iterable, + keyvalues, + db_autocommit=True, ) @staticmethod @@ -1860,7 +1937,13 @@ class DatabasePool: """ return await self.runInteraction( - desc, self.simple_search_list_txn, table, term, col, retcols + desc, + self.simple_search_list_txn, + table, + term, + col, + retcols, + db_autocommit=True, ) @classmethod diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 85f6b1e3fd..43bf0f649a 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -15,12 +15,15 @@ # limitations under the License. import logging import re +from typing import List -from synapse.appservice import AppServiceTransaction +from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.config.appservice import load_appservices +from synapse.events import EventBase from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.types import JsonDict from synapse.util import json_encoder logger = logging.getLogger(__name__) @@ -172,15 +175,23 @@ class ApplicationServiceTransactionWorkerStore( "application_services_state", {"as_id": service.id}, {"state": state} ) - async def create_appservice_txn(self, service, events): + async def create_appservice_txn( + self, + service: ApplicationService, + events: List[EventBase], + ephemeral: List[JsonDict], + ) -> AppServiceTransaction: """Atomically creates a new transaction for this application service - with the given list of events. + with the given list of events. Ephemeral events are NOT persisted to the + database and are not resent if a transaction is retried. Args: - service(ApplicationService): The service who the transaction is for. - events(list<Event>): A list of events to put in the transaction. + service: The service who the transaction is for. + events: A list of persistent events to put in the transaction. + ephemeral: A list of ephemeral events to put in the transaction. + Returns: - AppServiceTransaction: A new transaction. + A new transaction. """ def _create_appservice_txn(txn): @@ -207,7 +218,9 @@ class ApplicationServiceTransactionWorkerStore( "VALUES(?,?,?)", (service.id, new_txn_id, event_ids), ) - return AppServiceTransaction(service=service, id=new_txn_id, events=events) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events, ephemeral=ephemeral + ) return await self.db_pool.runInteraction( "create_appservice_txn", _create_appservice_txn @@ -296,7 +309,9 @@ class ApplicationServiceTransactionWorkerStore( events = await self.get_events_as_list(event_ids) - return AppServiceTransaction(service=service, id=entry["txn_id"], events=events) + return AppServiceTransaction( + service=service, id=entry["txn_id"], events=events, ephemeral=[] + ) def _get_last_txn(self, txn, service_id): txn.execute( @@ -320,7 +335,7 @@ class ApplicationServiceTransactionWorkerStore( ) async def get_new_events_for_appservice(self, current_id, limit): - """Get all new evnets""" + """Get all new events for an appservice""" def get_new_events_for_appservice_txn(txn): sql = ( @@ -351,6 +366,39 @@ class ApplicationServiceTransactionWorkerStore( return upper_bound, events + async def get_type_stream_id_for_appservice( + self, service: ApplicationService, type: str + ) -> int: + def get_type_stream_id_for_appservice_txn(txn): + stream_id_type = "%s_stream_id" % type + txn.execute( + "SELECT ? FROM application_services_state WHERE as_id=?", + (stream_id_type, service.id,), + ) + last_txn_id = txn.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) + + return await self.db_pool.runInteraction( + "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn + ) + + async def set_type_stream_id_for_appservice( + self, service: ApplicationService, type: str, pos: int + ) -> None: + def set_type_stream_id_for_appservice_txn(txn): + stream_id_type = "%s_stream_id" % type + txn.execute( + "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?", + (stream_id_type, pos, service.id), + ) + + await self.db_pool.runInteraction( + "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn + ) + class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore): # This is currently empty due to there not being any AS storage functions diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index a25a888443..9e66e6648a 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -19,7 +19,7 @@ from typing import Dict, Optional, Tuple from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_tuple_comparison_clause -from synapse.util.caches.descriptors import Cache +from synapse.util.caches.deferred_cache import DeferredCache logger = logging.getLogger(__name__) @@ -410,7 +410,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): class ClientIpStore(ClientIpWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): - self.client_ip_last_seen = Cache( + self.client_ip_last_seen = DeferredCache( name="client_ip_last_seen", keylen=4, max_entries=50000 ) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 88fd97e1df..e662a20d24 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -34,7 +34,8 @@ from synapse.storage.database import ( ) from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder -from synapse.util.caches.descriptors import Cache, cached, cachedList +from synapse.util.caches.deferred_cache import DeferredCache +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.iterutils import batch_iter from synapse.util.stringutils import shortstr @@ -1004,7 +1005,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): # Map of (user_id, device_id) -> bool. If there is an entry that implies # the device exists. - self.device_id_exists_cache = Cache( + self.device_id_exists_cache = DeferredCache( name="device_id_exists", keylen=2, max_entries=10000 ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index fdb17745f6..ba3b1769b0 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1270,6 +1270,10 @@ class PersistEventsStore: ) def _store_retention_policy_for_room_txn(self, txn, event): + if not event.is_state(): + logger.debug("Ignoring non-state m.room.retention event") + return + if hasattr(event, "content") and ( "min_lifetime" in event.content or "max_lifetime" in event.content ): diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 3ec4d1d9c2..ff150f0be7 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -42,7 +42,8 @@ from synapse.storage.database import DatabasePool from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.types import Collection, get_domain_from_id -from synapse.util.caches.descriptors import Cache, cached +from synapse.util.caches.deferred_cache import DeferredCache +from synapse.util.caches.descriptors import cached from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -145,7 +146,7 @@ class EventsWorkerStore(SQLBaseStore): self._cleanup_old_transaction_ids, ) - self._get_event_cache = Cache( + self._get_event_cache = DeferredCache( "*getEvent*", keylen=3, max_entries=hs.config.caches.event_cache_size, diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index ad43bb05ab..f8f4bb9b3f 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -122,9 +122,7 @@ class KeyStore(SQLBaseStore): # param, which is itself the 2-tuple (server_name, key_id). invalidations.append((server_name, key_id)) - await self.db_pool.runInteraction( - "store_server_verify_keys", - self.db_pool.simple_upsert_many_txn, + await self.db_pool.simple_upsert_many( table="server_signature_keys", key_names=("server_name", "key_id"), key_values=key_values, @@ -135,6 +133,7 @@ class KeyStore(SQLBaseStore): "verify_key", ), value_values=value_values, + desc="store_server_verify_keys", ) invalidate = self._get_server_verify_key.invalidate diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 0acf0617ca..79b01d16f9 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -281,9 +281,14 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): a_day_in_milliseconds = 24 * 60 * 60 * 1000 now = self._clock.time_msec() + # A note on user_agent. Technically a given device can have multiple + # user agents, so we need to decide which one to pick. We could have handled this + # in number of ways, but given that we don't _that_ much have gone for MAX() + # For more details of the other options considered see + # https://github.com/matrix-org/synapse/pull/8503#discussion_r502306111 sql = """ - INSERT INTO user_daily_visits (user_id, device_id, timestamp) - SELECT u.user_id, u.device_id, ? + INSERT INTO user_daily_visits (user_id, device_id, timestamp, user_agent) + SELECT u.user_id, u.device_id, ?, MAX(u.user_agent) FROM user_ips AS u LEFT JOIN ( SELECT user_id, device_id, timestamp FROM user_daily_visits @@ -294,7 +299,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): WHERE last_seen > ? AND last_seen <= ? AND udv.timestamp IS NULL AND users.is_guest=0 AND users.appservice_id IS NULL - GROUP BY u.user_id, u.device_id + GROUP BY u.user_id, u.device_id, u.user_agent """ # This means that the day has rolled over but there could still diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index c79ddff680..5cdf16521c 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -23,6 +23,7 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedList @@ -274,6 +275,60 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta): } return results + @cached(num_args=2,) + async def get_linearized_receipts_for_all_rooms( + self, to_key: int, from_key: Optional[int] = None + ) -> Dict[str, JsonDict]: + """Get receipts for all rooms between two stream_ids. + + Args: + to_key: Max stream id to fetch receipts upto. + from_key: Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + A dictionary of roomids to a list of receipts. + """ + + def f(txn): + if from_key: + sql = """ + SELECT * FROM receipts_linearized WHERE + stream_id > ? AND stream_id <= ? + """ + txn.execute(sql, [from_key, to_key]) + else: + sql = """ + SELECT * FROM receipts_linearized WHERE + stream_id <= ? + """ + + txn.execute(sql, [to_key]) + + return self.db_pool.cursor_to_dict(txn) + + txn_results = await self.db_pool.runInteraction( + "get_linearized_receipts_for_all_rooms", f + ) + + results = {} + for row in txn_results: + # We want a single event per room, since we want to batch the + # receipts by room, event and type. + room_event = results.setdefault( + row["room_id"], + {"type": "m.receipt", "room_id": row["room_id"], "content": {}}, + ) + + # The content is of the form: + # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. } + event_entry = room_event["content"].setdefault(row["event_id"], {}) + receipt_type = event_entry.setdefault(row["receipt_type"], {}) + + receipt_type[row["user_id"]] = db_to_json(row["data"]) + + return results + async def get_users_sent_receipts_between( self, last_id: int, current_id: int ) -> List[str]: diff --git a/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql new file mode 100644 index 0000000000..b0b5dcddce --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + -- Add new column to user_daily_visits to track user agent +ALTER TABLE user_daily_visits + ADD COLUMN user_agent TEXT; diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql new file mode 100644 index 0000000000..20f5a95a24 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE application_services_state + ADD COLUMN read_receipt_stream_id INT, + ADD COLUMN presence_stream_id INT; \ No newline at end of file diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 7d46090267..59207cadd4 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -208,42 +208,56 @@ class TransactionStore(TransactionWorkerStore): """ self._destination_retry_cache.pop(destination, None) - return await self.db_pool.runInteraction( - "set_destination_retry_timings", - self._set_destination_retry_timings, - destination, - failure_ts, - retry_last_ts, - retry_interval, - ) + if self.database_engine.can_native_upsert: + return await self.db_pool.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings_native, + destination, + failure_ts, + retry_last_ts, + retry_interval, + db_autocommit=True, # Safe as its a single upsert + ) + else: + return await self.db_pool.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings_emulated, + destination, + failure_ts, + retry_last_ts, + retry_interval, + ) - def _set_destination_retry_timings( + def _set_destination_retry_timings_native( self, txn, destination, failure_ts, retry_last_ts, retry_interval ): + assert self.database_engine.can_native_upsert + + # Upsert retry time interval if retry_interval is zero (i.e. we're + # resetting it) or greater than the existing retry interval. + # + # WARNING: This is executed in autocommit, so we shouldn't add any more + # SQL calls in here (without being very careful). + sql = """ + INSERT INTO destinations ( + destination, failure_ts, retry_last_ts, retry_interval + ) + VALUES (?, ?, ?, ?) + ON CONFLICT (destination) DO UPDATE SET + failure_ts = EXCLUDED.failure_ts, + retry_last_ts = EXCLUDED.retry_last_ts, + retry_interval = EXCLUDED.retry_interval + WHERE + EXCLUDED.retry_interval = 0 + OR destinations.retry_interval IS NULL + OR destinations.retry_interval < EXCLUDED.retry_interval + """ - if self.database_engine.can_native_upsert: - # Upsert retry time interval if retry_interval is zero (i.e. we're - # resetting it) or greater than the existing retry interval. - - sql = """ - INSERT INTO destinations ( - destination, failure_ts, retry_last_ts, retry_interval - ) - VALUES (?, ?, ?, ?) - ON CONFLICT (destination) DO UPDATE SET - failure_ts = EXCLUDED.failure_ts, - retry_last_ts = EXCLUDED.retry_last_ts, - retry_interval = EXCLUDED.retry_interval - WHERE - EXCLUDED.retry_interval = 0 - OR destinations.retry_interval IS NULL - OR destinations.retry_interval < EXCLUDED.retry_interval - """ - - txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval)) - - return + txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval)) + def _set_destination_retry_timings_emulated( + self, txn, destination, failure_ts, retry_last_ts, retry_interval + ): self.database_engine.lock_table(txn, "destinations") # We need to be careful here as the data may have changed from under us diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 5a390ff2f6..d87ceec6da 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -480,21 +480,16 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): user_id_tuples: iterable of 2-tuple of user IDs. """ - def _add_users_who_share_room_txn(txn): - self.db_pool.simple_upsert_many_txn( - txn, - table="users_who_share_private_rooms", - key_names=["user_id", "other_user_id", "room_id"], - key_values=[ - (user_id, other_user_id, room_id) - for user_id, other_user_id in user_id_tuples - ], - value_names=(), - value_values=None, - ) - - await self.db_pool.runInteraction( - "add_users_who_share_room", _add_users_who_share_room_txn + await self.db_pool.simple_upsert_many( + table="users_who_share_private_rooms", + key_names=["user_id", "other_user_id", "room_id"], + key_values=[ + (user_id, other_user_id, room_id) + for user_id, other_user_id in user_id_tuples + ], + value_names=(), + value_values=None, + desc="add_users_who_share_room", ) async def add_users_in_public_rooms( @@ -508,19 +503,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): user_ids """ - def _add_users_in_public_rooms_txn(txn): - - self.db_pool.simple_upsert_many_txn( - txn, - table="users_in_public_rooms", - key_names=["user_id", "room_id"], - key_values=[(user_id, room_id) for user_id in user_ids], - value_names=(), - value_values=None, - ) - - await self.db_pool.runInteraction( - "add_users_in_public_rooms", _add_users_in_public_rooms_txn + await self.db_pool.simple_upsert_many( + table="users_in_public_rooms", + key_names=["user_id", "room_id"], + key_values=[(user_id, room_id) for user_id in user_ids], + value_names=(), + value_values=None, + desc="add_users_in_public_rooms", ) async def delete_all_from_user_dir(self) -> None: diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 3d8da48f2d..02d71302ea 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -618,14 +618,7 @@ class _MultiWriterCtxManager: db_autocommit=True, ) - # Assert the fetched ID is actually greater than any ID we've already - # seen. If not, then the sequence and table have got out of sync - # somehow. with self.id_gen._lock: - assert max(self.id_gen._current_positions.values(), default=0) < min( - self.stream_ids - ) - self.id_gen._unfinished_ids.update(self.stream_ids) if self.multiple_ids is None: diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py new file mode 100644 index 0000000000..f728cd2cf2 --- /dev/null +++ b/synapse/util/caches/deferred_cache.py @@ -0,0 +1,292 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum +import threading +from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, cast + +from prometheus_client import Gauge + +from twisted.internet import defer + +from synapse.util.async_helpers import ObservableDeferred +from synapse.util.caches import register_cache +from synapse.util.caches.lrucache import LruCache +from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry + +cache_pending_metric = Gauge( + "synapse_util_caches_cache_pending", + "Number of lookups currently pending for this cache", + ["name"], +) + + +KT = TypeVar("KT") +VT = TypeVar("VT") + + +class _Sentinel(enum.Enum): + # defining a sentinel in this way allows mypy to correctly handle the + # type of a dictionary lookup. + sentinel = object() + + +class DeferredCache(Generic[KT, VT]): + """Wraps an LruCache, adding support for Deferred results. + + It expects that each entry added with set() will be a Deferred; likewise get() + may return an ObservableDeferred. + """ + + __slots__ = ( + "cache", + "name", + "keylen", + "thread", + "metrics", + "_pending_deferred_cache", + ) + + def __init__( + self, + name: str, + max_entries: int = 1000, + keylen: int = 1, + tree: bool = False, + iterable: bool = False, + apply_cache_factor_from_config: bool = True, + ): + """ + Args: + name: The name of the cache + max_entries: Maximum amount of entries that the cache will hold + keylen: The length of the tuple used as the cache key. Ignored unless + `tree` is True. + tree: Use a TreeCache instead of a dict as the underlying cache type + iterable: If True, count each item in the cached object as an entry, + rather than each cached object + apply_cache_factor_from_config: Whether cache factors specified in the + config file affect `max_entries` + """ + cache_type = TreeCache if tree else dict + + # _pending_deferred_cache maps from the key value to a `CacheEntry` object. + self._pending_deferred_cache = ( + cache_type() + ) # type: MutableMapping[KT, CacheEntry] + + # cache is used for completed results and maps to the result itself, rather than + # a Deferred. + self.cache = LruCache( + max_size=max_entries, + keylen=keylen, + cache_type=cache_type, + size_callback=(lambda d: len(d)) if iterable else None, + evicted_callback=self._on_evicted, + apply_cache_factor_from_config=apply_cache_factor_from_config, + ) + + self.name = name + self.keylen = keylen + self.thread = None # type: Optional[threading.Thread] + self.metrics = register_cache( + "cache", + name, + self.cache, + collect_callback=self._metrics_collection_callback, + ) + + @property + def max_entries(self): + return self.cache.max_size + + def _on_evicted(self, evicted_count): + self.metrics.inc_evictions(evicted_count) + + def _metrics_collection_callback(self): + cache_pending_metric.labels(self.name).set(len(self._pending_deferred_cache)) + + def check_thread(self): + expected_thread = self.thread + if expected_thread is None: + self.thread = threading.current_thread() + else: + if expected_thread is not threading.current_thread(): + raise ValueError( + "Cache objects can only be accessed from the main thread" + ) + + def get( + self, + key: KT, + default=_Sentinel.sentinel, + callback: Optional[Callable[[], None]] = None, + update_metrics: bool = True, + ): + """Looks the key up in the caches. + + Args: + key(tuple) + default: What is returned if key is not in the caches. If not + specified then function throws KeyError instead + callback(fn): Gets called when the entry in the cache is invalidated + update_metrics (bool): whether to update the cache hit rate metrics + + Returns: + Either an ObservableDeferred or the result itself + """ + callbacks = [callback] if callback else [] + val = self._pending_deferred_cache.get(key, _Sentinel.sentinel) + if val is not _Sentinel.sentinel: + val.callbacks.update(callbacks) + if update_metrics: + self.metrics.inc_hits() + return val.deferred + + val = self.cache.get(key, _Sentinel.sentinel, callbacks=callbacks) + if val is not _Sentinel.sentinel: + self.metrics.inc_hits() + return val + + if update_metrics: + self.metrics.inc_misses() + + if default is _Sentinel.sentinel: + raise KeyError() + else: + return default + + def set( + self, + key: KT, + value: defer.Deferred, + callback: Optional[Callable[[], None]] = None, + ) -> ObservableDeferred: + if not isinstance(value, defer.Deferred): + raise TypeError("not a Deferred") + + callbacks = [callback] if callback else [] + self.check_thread() + observable = ObservableDeferred(value, consumeErrors=True) + observer = observable.observe() + entry = CacheEntry(deferred=observable, callbacks=callbacks) + + existing_entry = self._pending_deferred_cache.pop(key, None) + if existing_entry: + existing_entry.invalidate() + + self._pending_deferred_cache[key] = entry + + def compare_and_pop(): + """Check if our entry is still the one in _pending_deferred_cache, and + if so, pop it. + + Returns true if the entries matched. + """ + existing_entry = self._pending_deferred_cache.pop(key, None) + if existing_entry is entry: + return True + + # oops, the _pending_deferred_cache has been updated since + # we started our query, so we are out of date. + # + # Better put back whatever we took out. (We do it this way + # round, rather than peeking into the _pending_deferred_cache + # and then removing on a match, to make the common case faster) + if existing_entry is not None: + self._pending_deferred_cache[key] = existing_entry + + return False + + def cb(result): + if compare_and_pop(): + self.cache.set(key, result, entry.callbacks) + else: + # we're not going to put this entry into the cache, so need + # to make sure that the invalidation callbacks are called. + # That was probably done when _pending_deferred_cache was + # updated, but it's possible that `set` was called without + # `invalidate` being previously called, in which case it may + # not have been. Either way, let's double-check now. + entry.invalidate() + + def eb(_fail): + compare_and_pop() + entry.invalidate() + + # once the deferred completes, we can move the entry from the + # _pending_deferred_cache to the real cache. + # + observer.addCallbacks(cb, eb) + return observable + + def prefill(self, key: KT, value: VT, callback: Callable[[], None] = None): + callbacks = [callback] if callback else [] + self.cache.set(key, value, callbacks=callbacks) + + def invalidate(self, key): + self.check_thread() + self.cache.pop(key, None) + + # if we have a pending lookup for this key, remove it from the + # _pending_deferred_cache, which will (a) stop it being returned + # for future queries and (b) stop it being persisted as a proper entry + # in self.cache. + entry = self._pending_deferred_cache.pop(key, None) + + # run the invalidation callbacks now, rather than waiting for the + # deferred to resolve. + if entry: + entry.invalidate() + + def invalidate_many(self, key: KT): + self.check_thread() + if not isinstance(key, tuple): + raise TypeError("The cache key must be a tuple not %r" % (type(key),)) + self.cache.del_multi(key) + + # if we have a pending lookup for this key, remove it from the + # _pending_deferred_cache, as above + entry_dict = self._pending_deferred_cache.pop(cast(KT, key), None) + if entry_dict is not None: + for entry in iterate_tree_cache_entry(entry_dict): + entry.invalidate() + + def invalidate_all(self): + self.check_thread() + self.cache.clear() + for entry in self._pending_deferred_cache.values(): + entry.invalidate() + self._pending_deferred_cache.clear() + + +class CacheEntry: + __slots__ = ["deferred", "callbacks", "invalidated"] + + def __init__( + self, deferred: ObservableDeferred, callbacks: Iterable[Callable[[], None]] + ): + self.deferred = deferred + self.callbacks = set(callbacks) + self.invalidated = False + + def invalidate(self): + if not self.invalidated: + self.invalidated = True + for callback in self.callbacks: + callback() + self.callbacks.clear() diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 98b34f2223..1f43886804 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -13,25 +13,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import functools import inspect import logging -import threading from typing import Any, Callable, Generic, Optional, Tuple, TypeVar, Union, cast from weakref import WeakValueDictionary -from prometheus_client import Gauge - from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, preserve_fn from synapse.util import unwrapFirstError from synapse.util.async_helpers import ObservableDeferred -from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry - -from . import register_cache +from synapse.util.caches.deferred_cache import DeferredCache logger = logging.getLogger(__name__) @@ -55,239 +48,6 @@ class _CachedFunction(Generic[F]): __call__ = None # type: F -cache_pending_metric = Gauge( - "synapse_util_caches_cache_pending", - "Number of lookups currently pending for this cache", - ["name"], -) - -_CacheSentinel = object() - - -class CacheEntry: - __slots__ = ["deferred", "callbacks", "invalidated"] - - def __init__(self, deferred, callbacks): - self.deferred = deferred - self.callbacks = set(callbacks) - self.invalidated = False - - def invalidate(self): - if not self.invalidated: - self.invalidated = True - for callback in self.callbacks: - callback() - self.callbacks.clear() - - -class Cache: - __slots__ = ( - "cache", - "name", - "keylen", - "thread", - "metrics", - "_pending_deferred_cache", - ) - - def __init__( - self, - name: str, - max_entries: int = 1000, - keylen: int = 1, - tree: bool = False, - iterable: bool = False, - apply_cache_factor_from_config: bool = True, - ): - """ - Args: - name: The name of the cache - max_entries: Maximum amount of entries that the cache will hold - keylen: The length of the tuple used as the cache key - tree: Use a TreeCache instead of a dict as the underlying cache type - iterable: If True, count each item in the cached object as an entry, - rather than each cached object - apply_cache_factor_from_config: Whether cache factors specified in the - config file affect `max_entries` - - Returns: - Cache - """ - cache_type = TreeCache if tree else dict - self._pending_deferred_cache = cache_type() - - self.cache = LruCache( - max_size=max_entries, - keylen=keylen, - cache_type=cache_type, - size_callback=(lambda d: len(d)) if iterable else None, - evicted_callback=self._on_evicted, - apply_cache_factor_from_config=apply_cache_factor_from_config, - ) - - self.name = name - self.keylen = keylen - self.thread = None # type: Optional[threading.Thread] - self.metrics = register_cache( - "cache", - name, - self.cache, - collect_callback=self._metrics_collection_callback, - ) - - @property - def max_entries(self): - return self.cache.max_size - - def _on_evicted(self, evicted_count): - self.metrics.inc_evictions(evicted_count) - - def _metrics_collection_callback(self): - cache_pending_metric.labels(self.name).set(len(self._pending_deferred_cache)) - - def check_thread(self): - expected_thread = self.thread - if expected_thread is None: - self.thread = threading.current_thread() - else: - if expected_thread is not threading.current_thread(): - raise ValueError( - "Cache objects can only be accessed from the main thread" - ) - - def get(self, key, default=_CacheSentinel, callback=None, update_metrics=True): - """Looks the key up in the caches. - - Args: - key(tuple) - default: What is returned if key is not in the caches. If not - specified then function throws KeyError instead - callback(fn): Gets called when the entry in the cache is invalidated - update_metrics (bool): whether to update the cache hit rate metrics - - Returns: - Either an ObservableDeferred or the raw result - """ - callbacks = [callback] if callback else [] - val = self._pending_deferred_cache.get(key, _CacheSentinel) - if val is not _CacheSentinel: - val.callbacks.update(callbacks) - if update_metrics: - self.metrics.inc_hits() - return val.deferred - - val = self.cache.get(key, _CacheSentinel, callbacks=callbacks) - if val is not _CacheSentinel: - self.metrics.inc_hits() - return val - - if update_metrics: - self.metrics.inc_misses() - - if default is _CacheSentinel: - raise KeyError() - else: - return default - - def set(self, key, value, callback=None): - if not isinstance(value, defer.Deferred): - raise TypeError("not a Deferred") - - callbacks = [callback] if callback else [] - self.check_thread() - observable = ObservableDeferred(value, consumeErrors=True) - observer = observable.observe() - entry = CacheEntry(deferred=observable, callbacks=callbacks) - - existing_entry = self._pending_deferred_cache.pop(key, None) - if existing_entry: - existing_entry.invalidate() - - self._pending_deferred_cache[key] = entry - - def compare_and_pop(): - """Check if our entry is still the one in _pending_deferred_cache, and - if so, pop it. - - Returns true if the entries matched. - """ - existing_entry = self._pending_deferred_cache.pop(key, None) - if existing_entry is entry: - return True - - # oops, the _pending_deferred_cache has been updated since - # we started our query, so we are out of date. - # - # Better put back whatever we took out. (We do it this way - # round, rather than peeking into the _pending_deferred_cache - # and then removing on a match, to make the common case faster) - if existing_entry is not None: - self._pending_deferred_cache[key] = existing_entry - - return False - - def cb(result): - if compare_and_pop(): - self.cache.set(key, result, entry.callbacks) - else: - # we're not going to put this entry into the cache, so need - # to make sure that the invalidation callbacks are called. - # That was probably done when _pending_deferred_cache was - # updated, but it's possible that `set` was called without - # `invalidate` being previously called, in which case it may - # not have been. Either way, let's double-check now. - entry.invalidate() - - def eb(_fail): - compare_and_pop() - entry.invalidate() - - # once the deferred completes, we can move the entry from the - # _pending_deferred_cache to the real cache. - # - observer.addCallbacks(cb, eb) - return observable - - def prefill(self, key, value, callback=None): - callbacks = [callback] if callback else [] - self.cache.set(key, value, callbacks=callbacks) - - def invalidate(self, key): - self.check_thread() - self.cache.pop(key, None) - - # if we have a pending lookup for this key, remove it from the - # _pending_deferred_cache, which will (a) stop it being returned - # for future queries and (b) stop it being persisted as a proper entry - # in self.cache. - entry = self._pending_deferred_cache.pop(key, None) - - # run the invalidation callbacks now, rather than waiting for the - # deferred to resolve. - if entry: - entry.invalidate() - - def invalidate_many(self, key): - self.check_thread() - if not isinstance(key, tuple): - raise TypeError("The cache key must be a tuple not %r" % (type(key),)) - self.cache.del_multi(key) - - # if we have a pending lookup for this key, remove it from the - # _pending_deferred_cache, as above - entry_dict = self._pending_deferred_cache.pop(key, None) - if entry_dict is not None: - for entry in iterate_tree_cache_entry(entry_dict): - entry.invalidate() - - def invalidate_all(self): - self.check_thread() - self.cache.clear() - for entry in self._pending_deferred_cache.values(): - entry.invalidate() - self._pending_deferred_cache.clear() - - class _CacheDescriptorBase: def __init__(self, orig: _CachedFunction, num_args, cache_context=False): self.orig = orig @@ -390,13 +150,13 @@ class CacheDescriptor(_CacheDescriptorBase): self.iterable = iterable def __get__(self, obj, owner): - cache = Cache( + cache = DeferredCache( name=self.orig.__name__, max_entries=self.max_entries, keylen=self.num_args, tree=self.tree, iterable=self.iterable, - ) + ) # type: DeferredCache[Tuple, Any] def get_cache_key_gen(args, kwargs): """Given some args/kwargs return a generator that resolves into @@ -640,9 +400,9 @@ class _CacheContext: _cache_context_objects = ( WeakValueDictionary() - ) # type: WeakValueDictionary[Tuple[Cache, CacheKey], _CacheContext] + ) # type: WeakValueDictionary[Tuple[DeferredCache, CacheKey], _CacheContext] - def __init__(self, cache, cache_key): # type: (Cache, CacheKey) -> None + def __init__(self, cache, cache_key): # type: (DeferredCache, CacheKey) -> None self._cache = cache self._cache_key = cache_key @@ -651,7 +411,9 @@ class _CacheContext: self._cache.invalidate(self._cache_key) @classmethod - def get_instance(cls, cache, cache_key): # type: (Cache, CacheKey) -> _CacheContext + def get_instance( + cls, cache, cache_key + ): # type: (DeferredCache, CacheKey) -> _CacheContext """Returns an instance constructed with the given arguments. A new instance is only created if none already exists. diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 4bc1a67b58..33eae2b7c4 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -64,7 +64,8 @@ class LruCache: Args: max_size: The maximum amount of entries the cache can hold - keylen: The length of the tuple used as the cache key + keylen: The length of the tuple used as the cache key. Ignored unless + cache_type is `TreeCache`. cache_type (type): type of underlying cache to be used. Typically one of dict diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index 3e180cafd3..6ce2a3d12b 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -34,7 +34,7 @@ class TTLCache: self._data = {} # the _CacheEntries, sorted by expiry time - self._expiry_list = SortedList() + self._expiry_list = SortedList() # type: SortedList[_CacheEntry] self._timer = timer |