summary refs log tree commit diff
diff options
context:
space:
mode:
authorWill Hunt <will@half-shot.uk>2020-10-15 17:33:28 +0100
committerGitHub <noreply@github.com>2020-10-15 12:33:28 -0400
commitc276bd996916adce899410b9c4c891892f51b992 (patch)
tree4e6e2737c82d963e8f00945f4e1b0fafcf835423
parentAdd option to scripts-dev/lint.sh to only lint files changed since the last g... (diff)
downloadsynapse-c276bd996916adce899410b9c4c891892f51b992.tar.xz
Send some ephemeral events to appservices (#8437)
Optionally sends typing, presence, and read receipt information to appservices.
-rw-r--r--changelog.d/8437.feature1
-rw-r--r--mypy.ini1
-rw-r--r--synapse/appservice/__init__.py180
-rw-r--r--synapse/appservice/api.py27
-rw-r--r--synapse/appservice/scheduler.py48
-rw-r--r--synapse/config/appservice.py3
-rw-r--r--synapse/handlers/appservice.py109
-rw-r--r--synapse/handlers/receipts.py35
-rw-r--r--synapse/handlers/sync.py1
-rw-r--r--synapse/handlers/typing.py31
-rw-r--r--synapse/notifier.py25
-rw-r--r--synapse/storage/databases/main/appservice.py66
-rw-r--r--synapse/storage/databases/main/receipts.py55
-rw-r--r--synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql18
-rw-r--r--tests/appservice/test_scheduler.py77
-rw-r--r--tests/storage/test_appservice.py8
16 files changed, 563 insertions, 122 deletions
diff --git a/changelog.d/8437.feature b/changelog.d/8437.feature
new file mode 100644
index 0000000000..4abcccb326
--- /dev/null
+++ b/changelog.d/8437.feature
@@ -0,0 +1 @@
+Implement [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409) to send typing, read receipts, and presence events to appservices.
diff --git a/mypy.ini b/mypy.ini
index 9748f6258c..b5db54ee3b 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -15,6 +15,7 @@ files =
   synapse/events/builder.py,
   synapse/events/spamcheck.py,
   synapse/federation,
+  synapse/handlers/appservice.py,
   synapse/handlers/account_data.py,
   synapse/handlers/auth.py,
   synapse/handlers/cas_handler.py,
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 13ec1f71a6..3862d9c08f 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -14,14 +14,15 @@
 # limitations under the License.
 import logging
 import re
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Iterable, List, Match, Optional
 
 from synapse.api.constants import EventTypes
-from synapse.appservice.api import ApplicationServiceApi
-from synapse.types import GroupID, get_domain_from_id
+from synapse.events import EventBase
+from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
 from synapse.util.caches.descriptors import cached
 
 if TYPE_CHECKING:
+    from synapse.appservice.api import ApplicationServiceApi
     from synapse.storage.databases.main import DataStore
 
 logger = logging.getLogger(__name__)
@@ -32,38 +33,6 @@ class ApplicationServiceState:
     UP = "up"
 
 
-class AppServiceTransaction:
-    """Represents an application service transaction."""
-
-    def __init__(self, service, id, events):
-        self.service = service
-        self.id = id
-        self.events = events
-
-    async def send(self, as_api: ApplicationServiceApi) -> bool:
-        """Sends this transaction using the provided AS API interface.
-
-        Args:
-            as_api: The API to use to send.
-        Returns:
-            True if the transaction was sent.
-        """
-        return await as_api.push_bulk(
-            service=self.service, events=self.events, txn_id=self.id
-        )
-
-    async def complete(self, store: "DataStore") -> None:
-        """Completes this transaction as successful.
-
-        Marks this transaction ID on the application service and removes the
-        transaction contents from the database.
-
-        Args:
-            store: The database store to operate on.
-        """
-        await store.complete_appservice_txn(service=self.service, txn_id=self.id)
-
-
 class ApplicationService:
     """Defines an application service. This definition is mostly what is
     provided to the /register AS API.
@@ -91,6 +60,7 @@ class ApplicationService:
         protocols=None,
         rate_limited=True,
         ip_range_whitelist=None,
+        supports_ephemeral=False,
     ):
         self.token = token
         self.url = (
@@ -102,6 +72,7 @@ class ApplicationService:
         self.namespaces = self._check_namespaces(namespaces)
         self.id = id
         self.ip_range_whitelist = ip_range_whitelist
+        self.supports_ephemeral = supports_ephemeral
 
         if "|" in self.id:
             raise Exception("application service ID cannot contain '|' character")
@@ -161,19 +132,21 @@ class ApplicationService:
                     raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
         return namespaces
 
-    def _matches_regex(self, test_string, namespace_key):
+    def _matches_regex(self, test_string: str, namespace_key: str) -> Optional[Match]:
         for regex_obj in self.namespaces[namespace_key]:
             if regex_obj["regex"].match(test_string):
                 return regex_obj
         return None
 
-    def _is_exclusive(self, ns_key, test_string):
+    def _is_exclusive(self, ns_key: str, test_string: str) -> bool:
         regex_obj = self._matches_regex(test_string, ns_key)
         if regex_obj:
             return regex_obj["exclusive"]
         return False
 
-    async def _matches_user(self, event, store):
+    async def _matches_user(
+        self, event: Optional[EventBase], store: Optional["DataStore"] = None
+    ) -> bool:
         if not event:
             return False
 
@@ -188,14 +161,23 @@ class ApplicationService:
         if not store:
             return False
 
-        does_match = await self._matches_user_in_member_list(event.room_id, store)
+        does_match = await self.matches_user_in_member_list(event.room_id, store)
         return does_match
 
-    @cached(num_args=1, cache_context=True)
-    async def _matches_user_in_member_list(self, room_id, store, cache_context):
-        member_list = await store.get_users_in_room(
-            room_id, on_invalidate=cache_context.invalidate
-        )
+    @cached(num_args=1)
+    async def matches_user_in_member_list(
+        self, room_id: str, store: "DataStore"
+    ) -> bool:
+        """Check if this service is interested a room based upon it's membership
+
+        Args:
+            room_id: The room to check.
+            store: The datastore to query.
+
+        Returns:
+            True if this service would like to know about this room.
+        """
+        member_list = await store.get_users_in_room(room_id)
 
         # check joined member events
         for user_id in member_list:
@@ -203,12 +185,14 @@ class ApplicationService:
                 return True
         return False
 
-    def _matches_room_id(self, event):
+    def _matches_room_id(self, event: EventBase) -> bool:
         if hasattr(event, "room_id"):
             return self.is_interested_in_room(event.room_id)
         return False
 
-    async def _matches_aliases(self, event, store):
+    async def _matches_aliases(
+        self, event: EventBase, store: Optional["DataStore"] = None
+    ) -> bool:
         if not store or not event:
             return False
 
@@ -218,12 +202,15 @@ class ApplicationService:
                 return True
         return False
 
-    async def is_interested(self, event, store=None) -> bool:
+    async def is_interested(
+        self, event: EventBase, store: Optional["DataStore"] = None
+    ) -> bool:
         """Check if this service is interested in this event.
 
         Args:
-            event(Event): The event to check.
-            store(DataStore)
+            event: The event to check.
+            store: The datastore to query.
+
         Returns:
             True if this service would like to know about this event.
         """
@@ -231,39 +218,66 @@ class ApplicationService:
         if self._matches_room_id(event):
             return True
 
+        # This will check the namespaces first before
+        # checking the store, so should be run before _matches_aliases
+        if await self._matches_user(event, store):
+            return True
+
+        # This will check the store, so should be run last
         if await self._matches_aliases(event, store):
             return True
 
-        if await self._matches_user(event, store):
+        return False
+
+    @cached(num_args=1)
+    async def is_interested_in_presence(
+        self, user_id: UserID, store: "DataStore"
+    ) -> bool:
+        """Check if this service is interested a user's presence
+
+        Args:
+            user_id: The user to check.
+            store: The datastore to query.
+
+        Returns:
+            True if this service would like to know about presence for this user.
+        """
+        # Find all the rooms the sender is in
+        if self.is_interested_in_user(user_id.to_string()):
             return True
+        room_ids = await store.get_rooms_for_user(user_id.to_string())
 
+        # Then find out if the appservice is interested in any of those rooms
+        for room_id in room_ids:
+            if await self.matches_user_in_member_list(room_id, store):
+                return True
         return False
 
-    def is_interested_in_user(self, user_id):
+    def is_interested_in_user(self, user_id: str) -> bool:
         return (
-            self._matches_regex(user_id, ApplicationService.NS_USERS)
+            bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
             or user_id == self.sender
         )
 
-    def is_interested_in_alias(self, alias):
+    def is_interested_in_alias(self, alias: str) -> bool:
         return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
 
-    def is_interested_in_room(self, room_id):
+    def is_interested_in_room(self, room_id: str) -> bool:
         return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))
 
-    def is_exclusive_user(self, user_id):
+    def is_exclusive_user(self, user_id: str) -> bool:
         return (
             self._is_exclusive(ApplicationService.NS_USERS, user_id)
             or user_id == self.sender
         )
 
-    def is_interested_in_protocol(self, protocol):
+    def is_interested_in_protocol(self, protocol: str) -> bool:
         return protocol in self.protocols
 
-    def is_exclusive_alias(self, alias):
+    def is_exclusive_alias(self, alias: str) -> bool:
         return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
 
-    def is_exclusive_room(self, room_id):
+    def is_exclusive_room(self, room_id: str) -> bool:
         return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
 
     def get_exclusive_user_regexes(self):
@@ -276,14 +290,14 @@ class ApplicationService:
             if regex_obj["exclusive"]
         ]
 
-    def get_groups_for_user(self, user_id):
+    def get_groups_for_user(self, user_id: str) -> Iterable[str]:
         """Get the groups that this user is associated with by this AS
 
         Args:
-            user_id (str): The ID of the user.
+            user_id: The ID of the user.
 
         Returns:
-            iterable[str]: an iterable that yields group_id strings.
+            An iterable that yields group_id strings.
         """
         return (
             regex_obj["group_id"]
@@ -291,7 +305,7 @@ class ApplicationService:
             if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
         )
 
-    def is_rate_limited(self):
+    def is_rate_limited(self) -> bool:
         return self.rate_limited
 
     def __str__(self):
@@ -300,3 +314,45 @@ class ApplicationService:
         dict_copy["token"] = "<redacted>"
         dict_copy["hs_token"] = "<redacted>"
         return "ApplicationService: %s" % (dict_copy,)
+
+
+class AppServiceTransaction:
+    """Represents an application service transaction."""
+
+    def __init__(
+        self,
+        service: ApplicationService,
+        id: int,
+        events: List[EventBase],
+        ephemeral: List[JsonDict],
+    ):
+        self.service = service
+        self.id = id
+        self.events = events
+        self.ephemeral = ephemeral
+
+    async def send(self, as_api: "ApplicationServiceApi") -> bool:
+        """Sends this transaction using the provided AS API interface.
+
+        Args:
+            as_api: The API to use to send.
+        Returns:
+            True if the transaction was sent.
+        """
+        return await as_api.push_bulk(
+            service=self.service,
+            events=self.events,
+            ephemeral=self.ephemeral,
+            txn_id=self.id,
+        )
+
+    async def complete(self, store: "DataStore") -> None:
+        """Completes this transaction as successful.
+
+        Marks this transaction ID on the application service and removes the
+        transaction contents from the database.
+
+        Args:
+            store: The database store to operate on.
+        """
+        await store.complete_appservice_txn(service=self.service, txn_id=self.id)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index e8f0793795..e366a982b8 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -14,12 +14,13 @@
 # limitations under the License.
 import logging
 import urllib
-from typing import TYPE_CHECKING, Optional, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from prometheus_client import Counter
 
 from synapse.api.constants import EventTypes, ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
+from synapse.events import EventBase
 from synapse.events.utils import serialize_event
 from synapse.http.client import SimpleHttpClient
 from synapse.types import JsonDict, ThirdPartyInstanceID
@@ -201,7 +202,13 @@ class ApplicationServiceApi(SimpleHttpClient):
         key = (service.id, protocol)
         return await self.protocol_meta_cache.wrap(key, _get)
 
-    async def push_bulk(self, service, events, txn_id=None):
+    async def push_bulk(
+        self,
+        service: "ApplicationService",
+        events: List[EventBase],
+        ephemeral: List[JsonDict],
+        txn_id: Optional[int] = None,
+    ):
         if service.url is None:
             return True
 
@@ -211,15 +218,19 @@ class ApplicationServiceApi(SimpleHttpClient):
             logger.warning(
                 "push_bulk: Missing txn ID sending events to %s", service.url
             )
-            txn_id = str(0)
-        txn_id = str(txn_id)
+            txn_id = 0
+
+        uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
+
+        # Never send ephemeral events to appservices that do not support it
+        if service.supports_ephemeral:
+            body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
+        else:
+            body = {"events": events}
 
-        uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
         try:
             await self.put_json(
-                uri=uri,
-                json_body={"events": events},
-                args={"access_token": service.hs_token},
+                uri=uri, json_body=body, args={"access_token": service.hs_token},
             )
             sent_transactions_counter.labels(service.id).inc()
             sent_events_counter.labels(service.id).inc(len(events))
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 8eb8c6f51c..ad3c408519 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -49,10 +49,13 @@ This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
 import logging
+from typing import List
 
-from synapse.appservice import ApplicationServiceState
+from synapse.appservice import ApplicationService, ApplicationServiceState
+from synapse.events import EventBase
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
 
@@ -82,8 +85,13 @@ class ApplicationServiceScheduler:
         for service in services:
             self.txn_ctrl.start_recoverer(service)
 
-    def submit_event_for_as(self, service, event):
-        self.queuer.enqueue(service, event)
+    def submit_event_for_as(self, service: ApplicationService, event: EventBase):
+        self.queuer.enqueue_event(service, event)
+
+    def submit_ephemeral_events_for_as(
+        self, service: ApplicationService, events: List[JsonDict]
+    ):
+        self.queuer.enqueue_ephemeral(service, events)
 
 
 class _ServiceQueuer:
@@ -96,17 +104,15 @@ class _ServiceQueuer:
 
     def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
+        self.queued_ephemeral = {}  # dict of {service_id: [events]}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
 
-    def enqueue(self, service, event):
-        self.queued_events.setdefault(service.id, []).append(event)
-
+    def _start_background_request(self, service):
         # start a sender for this appservice if we don't already have one
-
         if service.id in self.requests_in_flight:
             return
 
@@ -114,7 +120,15 @@ class _ServiceQueuer:
             "as-sender-%s" % (service.id,), self._send_request, service
         )
 
-    async def _send_request(self, service):
+    def enqueue_event(self, service: ApplicationService, event: EventBase):
+        self.queued_events.setdefault(service.id, []).append(event)
+        self._start_background_request(service)
+
+    def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]):
+        self.queued_ephemeral.setdefault(service.id, []).extend(events)
+        self._start_background_request(service)
+
+    async def _send_request(self, service: ApplicationService):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
         assert service.id not in self.requests_in_flight
@@ -123,10 +137,11 @@ class _ServiceQueuer:
         try:
             while True:
                 events = self.queued_events.pop(service.id, [])
-                if not events:
+                ephemeral = self.queued_ephemeral.pop(service.id, [])
+                if not events and not ephemeral:
                     return
                 try:
-                    await self.txn_ctrl.send(service, events)
+                    await self.txn_ctrl.send(service, events, ephemeral)
                 except Exception:
                     logger.exception("AS request failed")
         finally:
@@ -158,9 +173,16 @@ class _TransactionController:
         # for UTs
         self.RECOVERER_CLASS = _Recoverer
 
-    async def send(self, service, events):
+    async def send(
+        self,
+        service: ApplicationService,
+        events: List[EventBase],
+        ephemeral: List[JsonDict] = [],
+    ):
         try:
-            txn = await self.store.create_appservice_txn(service=service, events=events)
+            txn = await self.store.create_appservice_txn(
+                service=service, events=events, ephemeral=ephemeral
+            )
             service_is_up = await self._is_service_up(service)
             if service_is_up:
                 sent = await txn.send(self.as_api)
@@ -204,7 +226,7 @@ class _TransactionController:
         recoverer.recover()
         logger.info("Now %i active recoverers", len(self.recoverers))
 
-    async def _is_service_up(self, service):
+    async def _is_service_up(self, service: ApplicationService) -> bool:
         state = await self.store.get_appservice_state(service)
         return state == ApplicationServiceState.UP or state is None
 
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index 8ed3e24258..746fc3cc02 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename):
     if as_info.get("ip_range_whitelist"):
         ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist"))
 
+    supports_ephemeral = as_info.get("de.sorunome.msc2409.push_ephemeral", False)
+
     return ApplicationService(
         token=as_info["as_token"],
         hostname=hostname,
@@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename):
         hs_token=as_info["hs_token"],
         sender=user_id,
         id=as_info["id"],
+        supports_ephemeral=supports_ephemeral,
         protocols=protocols,
         rate_limited=rate_limited,
         ip_range_whitelist=ip_range_whitelist,
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index c8d5e58035..07240d3a14 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+from typing import Dict, List, Optional
 
 from prometheus_client import Counter
 
@@ -21,13 +22,16 @@ from twisted.internet import defer
 
 import synapse
 from synapse.api.constants import EventTypes
+from synapse.appservice import ApplicationService
+from synapse.events import EventBase
+from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics import (
     event_processing_loop_counter,
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import RoomStreamToken
+from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -44,6 +48,7 @@ class ApplicationServicesHandler:
         self.started_scheduler = False
         self.clock = hs.get_clock()
         self.notify_appservices = hs.config.notify_appservices
+        self.event_sources = hs.get_event_sources()
 
         self.current_max = 0
         self.is_processing = False
@@ -82,7 +87,7 @@ class ApplicationServicesHandler:
                     if not events:
                         break
 
-                    events_by_room = {}
+                    events_by_room = {}  # type: Dict[str, List[EventBase]]
                     for event in events:
                         events_by_room.setdefault(event.room_id, []).append(event)
 
@@ -161,6 +166,104 @@ class ApplicationServicesHandler:
             finally:
                 self.is_processing = False
 
+    async def notify_interested_services_ephemeral(
+        self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+    ):
+        """This is called by the notifier in the background
+        when a ephemeral event handled by the homeserver.
+
+        This will determine which appservices
+        are interested in the event, and submit them.
+
+        Events will only be pushed to appservices
+        that have opted into ephemeral events
+
+        Args:
+            stream_key: The stream the event came from.
+            new_token: The latest stream token
+            users: The user(s) involved with the event.
+        """
+        services = [
+            service
+            for service in self.store.get_app_services()
+            if service.supports_ephemeral
+        ]
+        if not services or not self.notify_appservices:
+            return
+        logger.info("Checking interested services for %s" % (stream_key))
+        with Measure(self.clock, "notify_interested_services_ephemeral"):
+            for service in services:
+                # Only handle typing if we have the latest token
+                if stream_key == "typing_key" and new_token is not None:
+                    events = await self._handle_typing(service, new_token)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    # We don't persist the token for typing_key for performance reasons
+                elif stream_key == "receipt_key":
+                    events = await self._handle_receipts(service)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "read_receipt", new_token
+                        )
+                elif stream_key == "presence_key":
+                    events = await self._handle_presence(service, users)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "presence", new_token
+                        )
+
+    async def _handle_typing(self, service: ApplicationService, new_token: int):
+        typing_source = self.event_sources.sources["typing"]
+        # Get the typing events from just before current
+        typing, _ = await typing_source.get_new_events_as(
+            service=service,
+            # For performance reasons, we don't persist the previous
+            # token in the DB and instead fetch the latest typing information
+            # for appservices.
+            from_key=new_token - 1,
+        )
+        return typing
+
+    async def _handle_receipts(self, service: ApplicationService):
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "read_receipt"
+        )
+        receipts_source = self.event_sources.sources["receipt"]
+        receipts, _ = await receipts_source.get_new_events_as(
+            service=service, from_key=from_key
+        )
+        return receipts
+
+    async def _handle_presence(
+        self, service: ApplicationService, users: Collection[UserID]
+    ):
+        events = []  # type: List[JsonDict]
+        presence_source = self.event_sources.sources["presence"]
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "presence"
+        )
+        for user in users:
+            interested = await service.is_interested_in_presence(user, self.store)
+            if not interested:
+                continue
+            presence_events, _ = await presence_source.get_new_events(
+                user=user, service=service, from_key=from_key,
+            )
+            time_now = self.clock.time_msec()
+            presence_events = [
+                {
+                    "type": "m.presence",
+                    "sender": event.user_id,
+                    "content": format_user_presence_state(
+                        event, time_now, include_user_id=False
+                    ),
+                }
+                for event in presence_events
+            ]
+            events = events + presence_events
+
     async def query_user_exists(self, user_id):
         """Check if any application service knows this user_id exists.
 
@@ -223,7 +326,7 @@ class ApplicationServicesHandler:
 
     async def get_3pe_protocols(self, only_protocol=None):
         services = self.store.get_app_services()
-        protocols = {}
+        protocols = {}  # type: Dict[str, List[JsonDict]]
 
         # Collect up all the individual protocol responses out of the ASes
         for s in services:
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 7225923757..c242c409cf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -13,9 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import List, Tuple
 
+from synapse.appservice import ApplicationService
 from synapse.handlers._base import BaseHandler
-from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
 from synapse.util.async_helpers import maybe_awaitable
 
 logger = logging.getLogger(__name__)
@@ -140,5 +142,36 @@ class ReceiptEventSource:
 
         return (events, to_key)
 
+    async def get_new_events_as(
+        self, from_key: int, service: ApplicationService
+    ) -> Tuple[List[JsonDict], int]:
+        """Returns a set of new receipt events that an appservice
+        may be interested in.
+
+        Args:
+            from_key: the stream position at which events should be fetched from
+            service: The appservice which may be interested
+        """
+        from_key = int(from_key)
+        to_key = self.get_current_key()
+
+        if from_key == to_key:
+            return [], to_key
+
+        # We first need to fetch all new receipts
+        rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
+            from_key=from_key, to_key=to_key
+        )
+
+        # Then filter down to rooms that the AS can read
+        events = []
+        for room_id, event in rooms_to_events.items():
+            if not await service.matches_user_in_member_list(room_id, self.store):
+                continue
+
+            events.append(event)
+
+        return (events, to_key)
+
     def get_current_key(self, direction="f"):
         return self.store.get_max_receipt_stream_id()
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a306631094..b527724bc4 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import itertools
 import logging
 from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3cbfc2d780..d3692842e3 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -12,16 +12,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 import random
 from collections import namedtuple
 from typing import TYPE_CHECKING, List, Set, Tuple
 
 from synapse.api.errors import AuthError, ShadowBanError, SynapseError
+from synapse.appservice import ApplicationService
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.streams import TypingStream
-from synapse.types import UserID, get_domain_from_id
+from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -430,6 +430,33 @@ class TypingNotificationEventSource:
             "content": {"user_ids": list(typing)},
         }
 
+    async def get_new_events_as(
+        self, from_key: int, service: ApplicationService
+    ) -> Tuple[List[JsonDict], int]:
+        """Returns a set of new typing events that an appservice
+        may be interested in.
+
+        Args:
+            from_key: the stream position at which events should be fetched from
+            service: The appservice which may be interested
+        """
+        with Measure(self.clock, "typing.get_new_events_as"):
+            from_key = int(from_key)
+            handler = self.get_typing_handler()
+
+            events = []
+            for room_id in handler._room_serials.keys():
+                if handler._room_serials[room_id] <= from_key:
+                    continue
+                if not await service.matches_user_in_member_list(
+                    room_id, handler.store
+                ):
+                    continue
+
+                events.append(self._make_event_for(room_id))
+
+            return (events, handler._latest_room_serial)
+
     async def get_new_events(self, from_key, room_ids, **kwargs):
         with Measure(self.clock, "typing.get_new_events"):
             from_key = int(from_key)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 51c830c91e..2e993411b9 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -329,6 +329,22 @@ class Notifier:
         except Exception:
             logger.exception("Error notifying application services of event")
 
+    async def _notify_app_services_ephemeral(
+        self,
+        stream_key: str,
+        new_token: Union[int, RoomStreamToken],
+        users: Collection[UserID] = [],
+    ):
+        try:
+            stream_token = None
+            if isinstance(new_token, int):
+                stream_token = new_token
+            await self.appservice_handler.notify_interested_services_ephemeral(
+                stream_key, stream_token, users
+            )
+        except Exception:
+            logger.exception("Error notifying application services of event")
+
     async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
             await self._pusher_pool.on_new_notifications(max_room_stream_token)
@@ -367,6 +383,15 @@ class Notifier:
 
                 self.notify_replication()
 
+                # Notify appservices
+                run_as_background_process(
+                    "_notify_app_services_ephemeral",
+                    self._notify_app_services_ephemeral,
+                    stream_key,
+                    new_token,
+                    users,
+                )
+
     def on_new_replication_data(self) -> None:
         """Used to inform replication listeners that something has happend
         without waking up any of the normal user event streams"""
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 85f6b1e3fd..43bf0f649a 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import logging
 import re
+from typing import List
 
-from synapse.appservice import AppServiceTransaction
+from synapse.appservice import ApplicationService, AppServiceTransaction
 from synapse.config.appservice import load_appservices
+from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.types import JsonDict
 from synapse.util import json_encoder
 
 logger = logging.getLogger(__name__)
@@ -172,15 +175,23 @@ class ApplicationServiceTransactionWorkerStore(
             "application_services_state", {"as_id": service.id}, {"state": state}
         )
 
-    async def create_appservice_txn(self, service, events):
+    async def create_appservice_txn(
+        self,
+        service: ApplicationService,
+        events: List[EventBase],
+        ephemeral: List[JsonDict],
+    ) -> AppServiceTransaction:
         """Atomically creates a new transaction for this application service
-        with the given list of events.
+        with the given list of events. Ephemeral events are NOT persisted to the
+        database and are not resent if a transaction is retried.
 
         Args:
-            service(ApplicationService): The service who the transaction is for.
-            events(list<Event>): A list of events to put in the transaction.
+            service: The service who the transaction is for.
+            events: A list of persistent events to put in the transaction.
+            ephemeral: A list of ephemeral events to put in the transaction.
+
         Returns:
-            AppServiceTransaction: A new transaction.
+            A new transaction.
         """
 
         def _create_appservice_txn(txn):
@@ -207,7 +218,9 @@ class ApplicationServiceTransactionWorkerStore(
                 "VALUES(?,?,?)",
                 (service.id, new_txn_id, event_ids),
             )
-            return AppServiceTransaction(service=service, id=new_txn_id, events=events)
+            return AppServiceTransaction(
+                service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+            )
 
         return await self.db_pool.runInteraction(
             "create_appservice_txn", _create_appservice_txn
@@ -296,7 +309,9 @@ class ApplicationServiceTransactionWorkerStore(
 
         events = await self.get_events_as_list(event_ids)
 
-        return AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
+        return AppServiceTransaction(
+            service=service, id=entry["txn_id"], events=events, ephemeral=[]
+        )
 
     def _get_last_txn(self, txn, service_id):
         txn.execute(
@@ -320,7 +335,7 @@ class ApplicationServiceTransactionWorkerStore(
         )
 
     async def get_new_events_for_appservice(self, current_id, limit):
-        """Get all new evnets"""
+        """Get all new events for an appservice"""
 
         def get_new_events_for_appservice_txn(txn):
             sql = (
@@ -351,6 +366,39 @@ class ApplicationServiceTransactionWorkerStore(
 
         return upper_bound, events
 
+    async def get_type_stream_id_for_appservice(
+        self, service: ApplicationService, type: str
+    ) -> int:
+        def get_type_stream_id_for_appservice_txn(txn):
+            stream_id_type = "%s_stream_id" % type
+            txn.execute(
+                "SELECT ? FROM application_services_state WHERE as_id=?",
+                (stream_id_type, service.id,),
+            )
+            last_txn_id = txn.fetchone()
+            if last_txn_id is None or last_txn_id[0] is None:  # no row exists
+                return 0
+            else:
+                return int(last_txn_id[0])
+
+        return await self.db_pool.runInteraction(
+            "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
+        )
+
+    async def set_type_stream_id_for_appservice(
+        self, service: ApplicationService, type: str, pos: int
+    ) -> None:
+        def set_type_stream_id_for_appservice_txn(txn):
+            stream_id_type = "%s_stream_id" % type
+            txn.execute(
+                "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
+                (stream_id_type, pos, service.id),
+            )
+
+        await self.db_pool.runInteraction(
+            "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn
+        )
+
 
 class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
     # This is currently empty due to there not being any AS storage functions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index c79ddff680..5cdf16521c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
 from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedList
@@ -274,6 +275,60 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
         }
         return results
 
+    @cached(num_args=2,)
+    async def get_linearized_receipts_for_all_rooms(
+        self, to_key: int, from_key: Optional[int] = None
+    ) -> Dict[str, JsonDict]:
+        """Get receipts for all rooms between two stream_ids.
+
+        Args:
+            to_key: Max stream id to fetch receipts upto.
+            from_key: Min stream id to fetch receipts from. None fetches
+                from the start.
+
+        Returns:
+            A dictionary of roomids to a list of receipts.
+        """
+
+        def f(txn):
+            if from_key:
+                sql = """
+                    SELECT * FROM receipts_linearized WHERE
+                    stream_id > ? AND stream_id <= ?
+                """
+                txn.execute(sql, [from_key, to_key])
+            else:
+                sql = """
+                    SELECT * FROM receipts_linearized WHERE
+                    stream_id <= ?
+                """
+
+                txn.execute(sql, [to_key])
+
+            return self.db_pool.cursor_to_dict(txn)
+
+        txn_results = await self.db_pool.runInteraction(
+            "get_linearized_receipts_for_all_rooms", f
+        )
+
+        results = {}
+        for row in txn_results:
+            # We want a single event per room, since we want to batch the
+            # receipts by room, event and type.
+            room_event = results.setdefault(
+                row["room_id"],
+                {"type": "m.receipt", "room_id": row["room_id"], "content": {}},
+            )
+
+            # The content is of the form:
+            # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
+            event_entry = room_event["content"].setdefault(row["event_id"], {})
+            receipt_type = event_entry.setdefault(row["receipt_type"], {})
+
+            receipt_type[row["user_id"]] = db_to_json(row["data"])
+
+        return results
+
     async def get_users_sent_receipts_between(
         self, last_id: int, current_id: int
     ) -> List[str]:
diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
new file mode 100644
index 0000000000..20f5a95a24
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE application_services_state
+    ADD COLUMN read_receipt_stream_id INT,
+    ADD COLUMN presence_stream_id INT;
\ No newline at end of file
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 68a4caabbf..2acb8b7603 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -60,7 +60,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
 
         self.store.create_appservice_txn.assert_called_once_with(
-            service=service, events=events  # txn made and saved
+            service=service, events=events, ephemeral=[]  # txn made and saved
         )
         self.assertEquals(0, len(self.txnctrl.recoverers))  # no recoverer made
         txn.complete.assert_called_once_with(self.store)  # txn completed
@@ -81,7 +81,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
 
         self.store.create_appservice_txn.assert_called_once_with(
-            service=service, events=events  # txn made and saved
+            service=service, events=events, ephemeral=[]  # txn made and saved
         )
         self.assertEquals(0, txn.send.call_count)  # txn not sent though
         self.assertEquals(0, txn.complete.call_count)  # or completed
@@ -106,7 +106,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
 
         self.store.create_appservice_txn.assert_called_once_with(
-            service=service, events=events
+            service=service, events=events, ephemeral=[]
         )
         self.assertEquals(1, self.recoverer_fn.call_count)  # recoverer made
         self.assertEquals(1, self.recoverer.recover.call_count)  # and invoked
@@ -202,26 +202,28 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
         # Expect the event to be sent immediately.
         service = Mock(id=4)
         event = Mock()
-        self.queuer.enqueue(service, event)
-        self.txn_ctrl.send.assert_called_once_with(service, [event])
+        self.queuer.enqueue_event(service, event)
+        self.txn_ctrl.send.assert_called_once_with(service, [event], [])
 
     def test_send_single_event_with_queue(self):
         d = defer.Deferred()
-        self.txn_ctrl.send = Mock(side_effect=lambda x, y: make_deferred_yieldable(d))
+        self.txn_ctrl.send = Mock(
+            side_effect=lambda x, y, z: make_deferred_yieldable(d)
+        )
         service = Mock(id=4)
         event = Mock(event_id="first")
         event2 = Mock(event_id="second")
         event3 = Mock(event_id="third")
         # Send an event and don't resolve it just yet.
-        self.queuer.enqueue(service, event)
+        self.queuer.enqueue_event(service, event)
         # Send more events: expect send() to NOT be called multiple times.
-        self.queuer.enqueue(service, event2)
-        self.queuer.enqueue(service, event3)
-        self.txn_ctrl.send.assert_called_with(service, [event])
+        self.queuer.enqueue_event(service, event2)
+        self.queuer.enqueue_event(service, event3)
+        self.txn_ctrl.send.assert_called_with(service, [event], [])
         self.assertEquals(1, self.txn_ctrl.send.call_count)
         # Resolve the send event: expect the queued events to be sent
         d.callback(service)
-        self.txn_ctrl.send.assert_called_with(service, [event2, event3])
+        self.txn_ctrl.send.assert_called_with(service, [event2, event3], [])
         self.assertEquals(2, self.txn_ctrl.send.call_count)
 
     def test_multiple_service_queues(self):
@@ -239,21 +241,58 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
 
         send_return_list = [srv_1_defer, srv_2_defer]
 
-        def do_send(x, y):
+        def do_send(x, y, z):
             return make_deferred_yieldable(send_return_list.pop(0))
 
         self.txn_ctrl.send = Mock(side_effect=do_send)
 
         # send events for different ASes and make sure they are sent
-        self.queuer.enqueue(srv1, srv_1_event)
-        self.queuer.enqueue(srv1, srv_1_event2)
-        self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event])
-        self.queuer.enqueue(srv2, srv_2_event)
-        self.queuer.enqueue(srv2, srv_2_event2)
-        self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event])
+        self.queuer.enqueue_event(srv1, srv_1_event)
+        self.queuer.enqueue_event(srv1, srv_1_event2)
+        self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], [])
+        self.queuer.enqueue_event(srv2, srv_2_event)
+        self.queuer.enqueue_event(srv2, srv_2_event2)
+        self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], [])
 
         # make sure callbacks for a service only send queued events for THAT
         # service
         srv_2_defer.callback(srv2)
-        self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2])
+        self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], [])
         self.assertEquals(3, self.txn_ctrl.send.call_count)
+
+    def test_send_single_ephemeral_no_queue(self):
+        # Expect the event to be sent immediately.
+        service = Mock(id=4, name="service")
+        event_list = [Mock(name="event")]
+        self.queuer.enqueue_ephemeral(service, event_list)
+        self.txn_ctrl.send.assert_called_once_with(service, [], event_list)
+
+    def test_send_multiple_ephemeral_no_queue(self):
+        # Expect the event to be sent immediately.
+        service = Mock(id=4, name="service")
+        event_list = [Mock(name="event1"), Mock(name="event2"), Mock(name="event3")]
+        self.queuer.enqueue_ephemeral(service, event_list)
+        self.txn_ctrl.send.assert_called_once_with(service, [], event_list)
+
+    def test_send_single_ephemeral_with_queue(self):
+        d = defer.Deferred()
+        self.txn_ctrl.send = Mock(
+            side_effect=lambda x, y, z: make_deferred_yieldable(d)
+        )
+        service = Mock(id=4)
+        event_list_1 = [Mock(event_id="event1"), Mock(event_id="event2")]
+        event_list_2 = [Mock(event_id="event3"), Mock(event_id="event4")]
+        event_list_3 = [Mock(event_id="event5"), Mock(event_id="event6")]
+
+        # Send an event and don't resolve it just yet.
+        self.queuer.enqueue_ephemeral(service, event_list_1)
+        # Send more events: expect send() to NOT be called multiple times.
+        self.queuer.enqueue_ephemeral(service, event_list_2)
+        self.queuer.enqueue_ephemeral(service, event_list_3)
+        self.txn_ctrl.send.assert_called_with(service, [], event_list_1)
+        self.assertEquals(1, self.txn_ctrl.send.call_count)
+        # Resolve txn_ctrl.send
+        d.callback(service)
+        # Expect the queued events to be sent
+        self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3)
+        self.assertEquals(2, self.txn_ctrl.send.call_count)
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index c905a38930..c5c7987349 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -244,7 +244,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         service = Mock(id=self.as_list[0]["id"])
         events = [Mock(event_id="e1"), Mock(event_id="e2")]
         txn = yield defer.ensureDeferred(
-            self.store.create_appservice_txn(service, events)
+            self.store.create_appservice_txn(service, events, [])
         )
         self.assertEquals(txn.id, 1)
         self.assertEquals(txn.events, events)
@@ -258,7 +258,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         yield self._insert_txn(service.id, 9644, events)
         yield self._insert_txn(service.id, 9645, events)
         txn = yield defer.ensureDeferred(
-            self.store.create_appservice_txn(service, events)
+            self.store.create_appservice_txn(service, events, [])
         )
         self.assertEquals(txn.id, 9646)
         self.assertEquals(txn.events, events)
@@ -270,7 +270,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         events = [Mock(event_id="e1"), Mock(event_id="e2")]
         yield self._set_last_txn(service.id, 9643)
         txn = yield defer.ensureDeferred(
-            self.store.create_appservice_txn(service, events)
+            self.store.create_appservice_txn(service, events, [])
         )
         self.assertEquals(txn.id, 9644)
         self.assertEquals(txn.events, events)
@@ -293,7 +293,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         yield self._insert_txn(self.as_list[3]["id"], 9643, events)
 
         txn = yield defer.ensureDeferred(
-            self.store.create_appservice_txn(service, events)
+            self.store.create_appservice_txn(service, events, [])
         )
         self.assertEquals(txn.id, 9644)
         self.assertEquals(txn.events, events)