summary refs log tree commit diff
diff options
context:
space:
mode:
authorWill Hunt <will@half-shot.uk>2021-09-24 10:52:46 +0100
committerWill Hunt <will@half-shot.uk>2021-09-24 14:00:58 +0100
commit9c4d018e4ee3507b219871aa53550658b58d1688 (patch)
tree0058eec827ec3022c0247e16a1c2792ae7500817
parentFix AuthBlocking check when requester is appservice (#10881) (diff)
downloadsynapse-9c4d018e4ee3507b219871aa53550658b58d1688.tar.xz
Setup synthetic_events structure
-rw-r--r--synapse/appservice/__init__.py21
-rw-r--r--synapse/appservice/api.py11
-rw-r--r--synapse/appservice/scheduler.py23
-rw-r--r--synapse/handlers/appservice.py58
-rw-r--r--synapse/storage/databases/main/appservice.py3
5 files changed, 104 insertions, 12 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py

index 6504c6bd3f..2597ad7917 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py
@@ -187,6 +187,7 @@ class ApplicationService: for user_id in member_list: if self.is_interested_in_user(user_id): return True + return False def _matches_room_id(self, event: EventBase) -> bool: @@ -233,6 +234,15 @@ class ApplicationService: return False + def is_interested_in_synthetic_user_event(self, event_type: str, user_id: UserID): + for regex_obj in self.namespaces["users"]: + if not regex_obj["regex"].match(user_id): + continue + # TODO: Validate structure further up. + if event_type in regex_obj.get("uk.half-shot.msc3395.synthetic_events", {"events": []})["events"]: + return True + return False + @cached(num_args=1) async def is_interested_in_presence( self, user_id: UserID, store: "DataStore" @@ -258,10 +268,10 @@ class ApplicationService: return False def is_interested_in_user(self, user_id: str) -> bool: - return ( - bool(self._matches_regex(user_id, ApplicationService.NS_USERS)) - or user_id == self.sender - ) + if user_id == self.sender: + return True + regex_obj = self._matches_regex(user_id, ApplicationService.NS_USERS) + return regex_obj and regex_obj.get("uk.half-shot.msc3395.events", True) def is_interested_in_alias(self, alias: str) -> bool: return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES)) @@ -329,11 +339,13 @@ class AppServiceTransaction: id: int, events: List[EventBase], ephemeral: List[JsonDict], + synthetic_events: Optional[JsonDict] = None, ): self.service = service self.id = id self.events = events self.ephemeral = ephemeral + self.synthetic_events = synthetic_events async def send(self, as_api: "ApplicationServiceApi") -> bool: """Sends this transaction using the provided AS API interface. @@ -347,6 +359,7 @@ class AppServiceTransaction: service=self.service, events=self.events, ephemeral=self.ephemeral, + synthetic_events=self.synthetic_events, txn_id=self.id, ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 935f24263c..0475caae1a 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py
@@ -203,6 +203,7 @@ class ApplicationServiceApi(SimpleHttpClient): service: "ApplicationService", events: List[EventBase], ephemeral: List[JsonDict], + synthetic_events: Optional[List[JsonDict]], txn_id: Optional[int] = None, ): if service.url is None: @@ -218,11 +219,15 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) + body = {"events": events} + # Never send ephemeral events to appservices that do not support it if service.supports_ephemeral: - body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral} - else: - body = {"events": events} + body["de.sorunome.msc2409.ephemeral"] = ephemeral + + # We will only populate this if the appservice requests synthetic events + if synthetic_events and len(synthetic_events): + body["uk.half-shot.msc3395.synthetic_events"] = synthetic_events try: await self.put_json( diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 6a2ce99b55..9ff677fcf6 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py
@@ -65,6 +65,8 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100 # Maximum number of ephemeral events to provide in an AS transaction. MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100 +# Maximum number of syntethci events to provide in an AS transaction. +MAX_SYNTHETIC_EVENTS_PER_TRANSACTION = 100 class ApplicationServiceScheduler: """Public facing API for this module. Does the required DI to tie the @@ -99,6 +101,11 @@ class ApplicationServiceScheduler: ): self.queuer.enqueue_ephemeral(service, events) + def submit_synthetic_events_for_as( + self, service: ApplicationService, events: List[JsonDict] + ): + self.queuer.enqueue_ephemeral(service, events) + class _ServiceQueuer: """Queue of events waiting to be sent to appservices. @@ -111,6 +118,7 @@ class _ServiceQueuer: def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} self.queued_ephemeral = {} # dict of {service_id: [events]} + self.queued_synthetic = {} # dict of {service_id: [events]} # the appservices which currently have a transaction in flight self.requests_in_flight = set() @@ -134,6 +142,10 @@ class _ServiceQueuer: self.queued_ephemeral.setdefault(service.id, []).extend(events) self._start_background_request(service) + def enqueue_syntheic(self, service: ApplicationService, events: List[JsonDict]): + self.queued_synthetic.setdefault(service.id, []).extend(events) + self._start_background_request(service) + async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender # running. @@ -150,11 +162,15 @@ class _ServiceQueuer: ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] - if not events and not ephemeral: + all_events_synthetic = self.queued_synthetic.get(service.id, []) + synthetic = all_events_ephemeral[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION] + del all_events_synthetic[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION] + + if not events and not ephemeral and not synthetic: return try: - await self.txn_ctrl.send(service, events, ephemeral) + await self.txn_ctrl.send(service, events, ephemeral, synthetic) except Exception: logger.exception("AS request failed") finally: @@ -191,10 +207,11 @@ class _TransactionController: service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, + synthetic_events: Optional[List[JsonDict]] = None, ): try: txn = await self.store.create_appservice_txn( - service=service, events=events, ephemeral=ephemeral or [] + service=service, events=events, ephemeral=ephemeral or [], synthetic_events=synthetic_events ) service_is_up = await self._is_service_up(service) if service_is_up: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 163278708c..cf9ca5b7e5 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py
@@ -179,6 +179,58 @@ class ApplicationServicesHandler: finally: self.is_processing = False + def notify_synthetic_event( + self, + event_type: str, + user_id: UserID, + content: JsonDict, + ) -> None: + """This is called when another service wishes to + notify about a synthetic event. + + This will determine which appservices + are interested in the event, and submit them. + + Events will only be pushed to appservices + that have opted into ephemeral events + + Args: + event_type: The type of event to notify about. + user_id: The user_id of the user involved in the event. + content: The content of the event itself. + """ + if not self.notify_appservices: + return + + logger.debug("Checking interested services for synthetic event %s:%s" % (event_type, user_id)) + services = self._get_services_for_user_synthetic_event(event_type, user_id) + + if not services: + return + + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._notify_synthetic_event( + services, user_id, { + "type": event_type, + "content": content, + } + ) + + @wrap_as_background_process("notify_synthetic_event") + async def _notify_synthetic_event( + self, + services: List[ApplicationService], + user_id: UserID, + event: JsonDict, + ) -> None: + logger.debug("Submitting synthetic event to interested services %s:%s" % (event["type"], user_id)) + with Measure(self.clock, "notify_synthetic_event"): + for service in services: + # TODO: Store event in DB if we can't submit it now. + self.scheduler.submit_synthetic_events_for_as(service, [event]) + + def notify_interested_services_ephemeral( self, stream_key: str, @@ -434,12 +486,16 @@ class ApplicationServicesHandler: def _get_services_for_user(self, user_id: str) -> List[ApplicationService]: services = self.store.get_app_services() - return [s for s in services if (s.is_interested_in_user(user_id))] + return [s for s in services if s.is_interested_in_user(user_id)] def _get_services_for_3pn(self, protocol: str) -> List[ApplicationService]: services = self.store.get_app_services() return [s for s in services if s.is_interested_in_protocol(protocol)] + def _get_services_for_user_synthetic_event(self, event_type: str, user_id: str) -> List[ApplicationService]: + services = self.store.get_app_services() + return [s for s in services if s.is_interested_in_synthetic_user_event(event_type, user_id)] + async def _is_unknown_user(self, user_id: str) -> bool: if not self.is_mine_id(user_id): # we don't know if they are unknown or not since it isn't one of our diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 2da2659f41..615690a603 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py
@@ -194,6 +194,7 @@ class ApplicationServiceTransactionWorkerStore( service: ApplicationService, events: List[EventBase], ephemeral: List[JsonDict], + synthetic_events: Optional[List[JsonDict]] = None, ) -> AppServiceTransaction: """Atomically creates a new transaction for this application service with the given list of events. Ephemeral events are NOT persisted to the @@ -233,7 +234,7 @@ class ApplicationServiceTransactionWorkerStore( (service.id, new_txn_id, event_ids), ) return AppServiceTransaction( - service=service, id=new_txn_id, events=events, ephemeral=ephemeral + service=service, id=new_txn_id, events=events, ephemeral=ephemeral, synthetic_events=synthetic_events ) return await self.db_pool.runInteraction(