diff options
-rw-r--r-- | synapse/appservice/__init__.py | 6 | ||||
-rw-r--r-- | synapse/appservice/api.py | 22 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 8 | ||||
-rw-r--r-- | synapse/config/appservice.py | 3 |
4 files changed, 35 insertions, 4 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 13ec1f71a6..8737b34e54 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -91,6 +91,7 @@ class ApplicationService: protocols=None, rate_limited=True, ip_range_whitelist=None, + supports_ephemeral=False, ): self.token = token self.url = ( @@ -102,6 +103,7 @@ class ApplicationService: self.namespaces = self._check_namespaces(namespaces) self.id = id self.ip_range_whitelist = ip_range_whitelist + self.supports_ephemeral = supports_ephemeral if "|" in self.id: raise Exception("application service ID cannot contain '|' character") @@ -188,11 +190,11 @@ class ApplicationService: if not store: return False - does_match = await self._matches_user_in_member_list(event.room_id, store) + does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match @cached(num_args=1, cache_context=True) - async def _matches_user_in_member_list(self, room_id, store, cache_context): + async def matches_user_in_member_list(self, room_id, store, cache_context): member_list = await store.get_users_in_room( room_id, on_invalidate=cache_context.invalidate ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 1514c0f691..48982d2ad3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -201,6 +201,28 @@ class ApplicationServiceApi(SimpleHttpClient): key = (service.id, protocol) return await self.protocol_meta_cache.wrap(key, _get) + async def push_ephemeral(self, service, events): + if service.url is None: + return True + if service.supports_ephemeral is False: + return True + + uri = service.url + ( + "%s/uk.half-shot.appservice/ephemeral" % APP_SERVICE_PREFIX + ) + try: + await self.put_json( + uri=uri, + json_body={"events": events}, + args={"access_token": service.hs_token}, + ) + return True + except CodeMessageException as e: + logger.warning("push_ephemeral to %s received %s", uri, e.code) + except Exception as ex: + logger.warning("push_ephemeral to %s threw exception %s", uri, ex) + return False + async def push_bulk(self, service, events, txn_id=None): if service.url is None: return True diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 8eb8c6f51c..74bd095677 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -85,6 +85,10 @@ class ApplicationServiceScheduler: def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) + async def submit_ephemeral_events_for_as(self, service, events): + if self.txn_ctrl.is_service_up(service): + await self.as_api.push_ephemeral(service, events) + class _ServiceQueuer: """Queue of events waiting to be sent to appservices. @@ -161,7 +165,7 @@ class _TransactionController: async def send(self, service, events): try: txn = await self.store.create_appservice_txn(service=service, events=events) - service_is_up = await self._is_service_up(service) + service_is_up = await self.is_service_up(service) if service_is_up: sent = await txn.send(self.as_api) if sent: @@ -204,7 +208,7 @@ class _TransactionController: recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) - async def _is_service_up(self, service): + async def is_service_up(self, service): state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 8ed3e24258..013cd0fd96 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename): if as_info.get("ip_range_whitelist"): ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist")) + supports_ephemeral = as_info.get("uk.half-shot.appservice.push_ephemeral", False) + return ApplicationService( token=as_info["as_token"], hostname=hostname, @@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename): hs_token=as_info["hs_token"], sender=user_id, id=as_info["id"], + supports_ephemeral=supports_ephemeral, protocols=protocols, rate_limited=rate_limited, ip_range_whitelist=ip_range_whitelist, |