summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
authorWill Hunt <will@half-shot.uk>2020-10-15 17:33:28 +0100
committerGitHub <noreply@github.com>2020-10-15 12:33:28 -0400
commitc276bd996916adce899410b9c4c891892f51b992 (patch)
tree4e6e2737c82d963e8f00945f4e1b0fafcf835423 /synapse/appservice
parentAdd option to scripts-dev/lint.sh to only lint files changed since the last g... (diff)
downloadsynapse-c276bd996916adce899410b9c4c891892f51b992.tar.xz
Send some ephemeral events to appservices (#8437)
Optionally sends typing, presence, and read receipt information to appservices.
Diffstat (limited to 'synapse/appservice')
-rw-r--r--synapse/appservice/__init__.py180
-rw-r--r--synapse/appservice/api.py27
-rw-r--r--synapse/appservice/scheduler.py48
3 files changed, 172 insertions, 83 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 13ec1f71a6..3862d9c08f 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -14,14 +14,15 @@
 # limitations under the License.
 import logging
 import re
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Iterable, List, Match, Optional
 
 from synapse.api.constants import EventTypes
-from synapse.appservice.api import ApplicationServiceApi
-from synapse.types import GroupID, get_domain_from_id
+from synapse.events import EventBase
+from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
 from synapse.util.caches.descriptors import cached
 
 if TYPE_CHECKING:
+    from synapse.appservice.api import ApplicationServiceApi
     from synapse.storage.databases.main import DataStore
 
 logger = logging.getLogger(__name__)
@@ -32,38 +33,6 @@ class ApplicationServiceState:
     UP = "up"
 
 
-class AppServiceTransaction:
-    """Represents an application service transaction."""
-
-    def __init__(self, service, id, events):
-        self.service = service
-        self.id = id
-        self.events = events
-
-    async def send(self, as_api: ApplicationServiceApi) -> bool:
-        """Sends this transaction using the provided AS API interface.
-
-        Args:
-            as_api: The API to use to send.
-        Returns:
-            True if the transaction was sent.
-        """
-        return await as_api.push_bulk(
-            service=self.service, events=self.events, txn_id=self.id
-        )
-
-    async def complete(self, store: "DataStore") -> None:
-        """Completes this transaction as successful.
-
-        Marks this transaction ID on the application service and removes the
-        transaction contents from the database.
-
-        Args:
-            store: The database store to operate on.
-        """
-        await store.complete_appservice_txn(service=self.service, txn_id=self.id)
-
-
 class ApplicationService:
     """Defines an application service. This definition is mostly what is
     provided to the /register AS API.
@@ -91,6 +60,7 @@ class ApplicationService:
         protocols=None,
         rate_limited=True,
         ip_range_whitelist=None,
+        supports_ephemeral=False,
     ):
         self.token = token
         self.url = (
@@ -102,6 +72,7 @@ class ApplicationService:
         self.namespaces = self._check_namespaces(namespaces)
         self.id = id
         self.ip_range_whitelist = ip_range_whitelist
+        self.supports_ephemeral = supports_ephemeral
 
         if "|" in self.id:
             raise Exception("application service ID cannot contain '|' character")
@@ -161,19 +132,21 @@ class ApplicationService:
                     raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
         return namespaces
 
-    def _matches_regex(self, test_string, namespace_key):
+    def _matches_regex(self, test_string: str, namespace_key: str) -> Optional[Match]:
         for regex_obj in self.namespaces[namespace_key]:
             if regex_obj["regex"].match(test_string):
                 return regex_obj
         return None
 
-    def _is_exclusive(self, ns_key, test_string):
+    def _is_exclusive(self, ns_key: str, test_string: str) -> bool:
         regex_obj = self._matches_regex(test_string, ns_key)
         if regex_obj:
             return regex_obj["exclusive"]
         return False
 
-    async def _matches_user(self, event, store):
+    async def _matches_user(
+        self, event: Optional[EventBase], store: Optional["DataStore"] = None
+    ) -> bool:
         if not event:
             return False
 
@@ -188,14 +161,23 @@ class ApplicationService:
         if not store:
             return False
 
-        does_match = await self._matches_user_in_member_list(event.room_id, store)
+        does_match = await self.matches_user_in_member_list(event.room_id, store)
         return does_match
 
-    @cached(num_args=1, cache_context=True)
-    async def _matches_user_in_member_list(self, room_id, store, cache_context):
-        member_list = await store.get_users_in_room(
-            room_id, on_invalidate=cache_context.invalidate
-        )
+    @cached(num_args=1)
+    async def matches_user_in_member_list(
+        self, room_id: str, store: "DataStore"
+    ) -> bool:
+        """Check if this service is interested a room based upon it's membership
+
+        Args:
+            room_id: The room to check.
+            store: The datastore to query.
+
+        Returns:
+            True if this service would like to know about this room.
+        """
+        member_list = await store.get_users_in_room(room_id)
 
         # check joined member events
         for user_id in member_list:
@@ -203,12 +185,14 @@ class ApplicationService:
                 return True
         return False
 
-    def _matches_room_id(self, event):
+    def _matches_room_id(self, event: EventBase) -> bool:
         if hasattr(event, "room_id"):
             return self.is_interested_in_room(event.room_id)
         return False
 
-    async def _matches_aliases(self, event, store):
+    async def _matches_aliases(
+        self, event: EventBase, store: Optional["DataStore"] = None
+    ) -> bool:
         if not store or not event:
             return False
 
@@ -218,12 +202,15 @@ class ApplicationService:
                 return True
         return False
 
-    async def is_interested(self, event, store=None) -> bool:
+    async def is_interested(
+        self, event: EventBase, store: Optional["DataStore"] = None
+    ) -> bool:
         """Check if this service is interested in this event.
 
         Args:
-            event(Event): The event to check.
-            store(DataStore)
+            event: The event to check.
+            store: The datastore to query.
+
         Returns:
             True if this service would like to know about this event.
         """
@@ -231,39 +218,66 @@ class ApplicationService:
         if self._matches_room_id(event):
             return True
 
+        # This will check the namespaces first before
+        # checking the store, so should be run before _matches_aliases
+        if await self._matches_user(event, store):
+            return True
+
+        # This will check the store, so should be run last
         if await self._matches_aliases(event, store):
             return True
 
-        if await self._matches_user(event, store):
+        return False
+
+    @cached(num_args=1)
+    async def is_interested_in_presence(
+        self, user_id: UserID, store: "DataStore"
+    ) -> bool:
+        """Check if this service is interested a user's presence
+
+        Args:
+            user_id: The user to check.
+            store: The datastore to query.
+
+        Returns:
+            True if this service would like to know about presence for this user.
+        """
+        # Find all the rooms the sender is in
+        if self.is_interested_in_user(user_id.to_string()):
             return True
+        room_ids = await store.get_rooms_for_user(user_id.to_string())
 
+        # Then find out if the appservice is interested in any of those rooms
+        for room_id in room_ids:
+            if await self.matches_user_in_member_list(room_id, store):
+                return True
         return False
 
-    def is_interested_in_user(self, user_id):
+    def is_interested_in_user(self, user_id: str) -> bool:
         return (
-            self._matches_regex(user_id, ApplicationService.NS_USERS)
+            bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
             or user_id == self.sender
         )
 
-    def is_interested_in_alias(self, alias):
+    def is_interested_in_alias(self, alias: str) -> bool:
         return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
 
-    def is_interested_in_room(self, room_id):
+    def is_interested_in_room(self, room_id: str) -> bool:
         return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))
 
-    def is_exclusive_user(self, user_id):
+    def is_exclusive_user(self, user_id: str) -> bool:
         return (
             self._is_exclusive(ApplicationService.NS_USERS, user_id)
             or user_id == self.sender
         )
 
-    def is_interested_in_protocol(self, protocol):
+    def is_interested_in_protocol(self, protocol: str) -> bool:
         return protocol in self.protocols
 
-    def is_exclusive_alias(self, alias):
+    def is_exclusive_alias(self, alias: str) -> bool:
         return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
 
-    def is_exclusive_room(self, room_id):
+    def is_exclusive_room(self, room_id: str) -> bool:
         return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
 
     def get_exclusive_user_regexes(self):
@@ -276,14 +290,14 @@ class ApplicationService:
             if regex_obj["exclusive"]
         ]
 
-    def get_groups_for_user(self, user_id):
+    def get_groups_for_user(self, user_id: str) -> Iterable[str]:
         """Get the groups that this user is associated with by this AS
 
         Args:
-            user_id (str): The ID of the user.
+            user_id: The ID of the user.
 
         Returns:
-            iterable[str]: an iterable that yields group_id strings.
+            An iterable that yields group_id strings.
         """
         return (
             regex_obj["group_id"]
@@ -291,7 +305,7 @@ class ApplicationService:
             if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
         )
 
-    def is_rate_limited(self):
+    def is_rate_limited(self) -> bool:
         return self.rate_limited
 
     def __str__(self):
@@ -300,3 +314,45 @@ class ApplicationService:
         dict_copy["token"] = "<redacted>"
         dict_copy["hs_token"] = "<redacted>"
         return "ApplicationService: %s" % (dict_copy,)
+
+
+class AppServiceTransaction:
+    """Represents an application service transaction."""
+
+    def __init__(
+        self,
+        service: ApplicationService,
+        id: int,
+        events: List[EventBase],
+        ephemeral: List[JsonDict],
+    ):
+        self.service = service
+        self.id = id
+        self.events = events
+        self.ephemeral = ephemeral
+
+    async def send(self, as_api: "ApplicationServiceApi") -> bool:
+        """Sends this transaction using the provided AS API interface.
+
+        Args:
+            as_api: The API to use to send.
+        Returns:
+            True if the transaction was sent.
+        """
+        return await as_api.push_bulk(
+            service=self.service,
+            events=self.events,
+            ephemeral=self.ephemeral,
+            txn_id=self.id,
+        )
+
+    async def complete(self, store: "DataStore") -> None:
+        """Completes this transaction as successful.
+
+        Marks this transaction ID on the application service and removes the
+        transaction contents from the database.
+
+        Args:
+            store: The database store to operate on.
+        """
+        await store.complete_appservice_txn(service=self.service, txn_id=self.id)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index e8f0793795..e366a982b8 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -14,12 +14,13 @@
 # limitations under the License.
 import logging
 import urllib
-from typing import TYPE_CHECKING, Optional, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from prometheus_client import Counter
 
 from synapse.api.constants import EventTypes, ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
+from synapse.events import EventBase
 from synapse.events.utils import serialize_event
 from synapse.http.client import SimpleHttpClient
 from synapse.types import JsonDict, ThirdPartyInstanceID
@@ -201,7 +202,13 @@ class ApplicationServiceApi(SimpleHttpClient):
         key = (service.id, protocol)
         return await self.protocol_meta_cache.wrap(key, _get)
 
-    async def push_bulk(self, service, events, txn_id=None):
+    async def push_bulk(
+        self,
+        service: "ApplicationService",
+        events: List[EventBase],
+        ephemeral: List[JsonDict],
+        txn_id: Optional[int] = None,
+    ):
         if service.url is None:
             return True
 
@@ -211,15 +218,19 @@ class ApplicationServiceApi(SimpleHttpClient):
             logger.warning(
                 "push_bulk: Missing txn ID sending events to %s", service.url
             )
-            txn_id = str(0)
-        txn_id = str(txn_id)
+            txn_id = 0
+
+        uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
+
+        # Never send ephemeral events to appservices that do not support it
+        if service.supports_ephemeral:
+            body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
+        else:
+            body = {"events": events}
 
-        uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
         try:
             await self.put_json(
-                uri=uri,
-                json_body={"events": events},
-                args={"access_token": service.hs_token},
+                uri=uri, json_body=body, args={"access_token": service.hs_token},
             )
             sent_transactions_counter.labels(service.id).inc()
             sent_events_counter.labels(service.id).inc(len(events))
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 8eb8c6f51c..ad3c408519 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -49,10 +49,13 @@ This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
 import logging
+from typing import List
 
-from synapse.appservice import ApplicationServiceState
+from synapse.appservice import ApplicationService, ApplicationServiceState
+from synapse.events import EventBase
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
 
@@ -82,8 +85,13 @@ class ApplicationServiceScheduler:
         for service in services:
             self.txn_ctrl.start_recoverer(service)
 
-    def submit_event_for_as(self, service, event):
-        self.queuer.enqueue(service, event)
+    def submit_event_for_as(self, service: ApplicationService, event: EventBase):
+        self.queuer.enqueue_event(service, event)
+
+    def submit_ephemeral_events_for_as(
+        self, service: ApplicationService, events: List[JsonDict]
+    ):
+        self.queuer.enqueue_ephemeral(service, events)
 
 
 class _ServiceQueuer:
@@ -96,17 +104,15 @@ class _ServiceQueuer:
 
     def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
+        self.queued_ephemeral = {}  # dict of {service_id: [events]}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
 
-    def enqueue(self, service, event):
-        self.queued_events.setdefault(service.id, []).append(event)
-
+    def _start_background_request(self, service):
         # start a sender for this appservice if we don't already have one
-
         if service.id in self.requests_in_flight:
             return
 
@@ -114,7 +120,15 @@ class _ServiceQueuer:
             "as-sender-%s" % (service.id,), self._send_request, service
         )
 
-    async def _send_request(self, service):
+    def enqueue_event(self, service: ApplicationService, event: EventBase):
+        self.queued_events.setdefault(service.id, []).append(event)
+        self._start_background_request(service)
+
+    def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]):
+        self.queued_ephemeral.setdefault(service.id, []).extend(events)
+        self._start_background_request(service)
+
+    async def _send_request(self, service: ApplicationService):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
         assert service.id not in self.requests_in_flight
@@ -123,10 +137,11 @@ class _ServiceQueuer:
         try:
             while True:
                 events = self.queued_events.pop(service.id, [])
-                if not events:
+                ephemeral = self.queued_ephemeral.pop(service.id, [])
+                if not events and not ephemeral:
                     return
                 try:
-                    await self.txn_ctrl.send(service, events)
+                    await self.txn_ctrl.send(service, events, ephemeral)
                 except Exception:
                     logger.exception("AS request failed")
         finally:
@@ -158,9 +173,16 @@ class _TransactionController:
         # for UTs
         self.RECOVERER_CLASS = _Recoverer
 
-    async def send(self, service, events):
+    async def send(
+        self,
+        service: ApplicationService,
+        events: List[EventBase],
+        ephemeral: List[JsonDict] = [],
+    ):
         try:
-            txn = await self.store.create_appservice_txn(service=service, events=events)
+            txn = await self.store.create_appservice_txn(
+                service=service, events=events, ephemeral=ephemeral
+            )
             service_is_up = await self._is_service_up(service)
             if service_is_up:
                 sent = await txn.send(self.as_api)
@@ -204,7 +226,7 @@ class _TransactionController:
         recoverer.recover()
         logger.info("Now %i active recoverers", len(self.recoverers))
 
-    async def _is_service_up(self, service):
+    async def _is_service_up(self, service: ApplicationService) -> bool:
         state = await self.store.get_appservice_state(service)
         return state == ApplicationServiceState.UP or state is None