diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index c8d5e58035..07240d3a14 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+from typing import Dict, List, Optional
from prometheus_client import Counter
@@ -21,13 +22,16 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.appservice import ApplicationService
+from synapse.events import EventBase
+from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import RoomStreamToken
+from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -44,6 +48,7 @@ class ApplicationServicesHandler:
self.started_scheduler = False
self.clock = hs.get_clock()
self.notify_appservices = hs.config.notify_appservices
+ self.event_sources = hs.get_event_sources()
self.current_max = 0
self.is_processing = False
@@ -82,7 +87,7 @@ class ApplicationServicesHandler:
if not events:
break
- events_by_room = {}
+ events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@@ -161,6 +166,104 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
+ async def notify_interested_services_ephemeral(
+ self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+ ):
+ """This is called by the notifier in the background
+ when a ephemeral event handled by the homeserver.
+
+ This will determine which appservices
+ are interested in the event, and submit them.
+
+ Events will only be pushed to appservices
+ that have opted into ephemeral events
+
+ Args:
+ stream_key: The stream the event came from.
+ new_token: The latest stream token
+ users: The user(s) involved with the event.
+ """
+ services = [
+ service
+ for service in self.store.get_app_services()
+ if service.supports_ephemeral
+ ]
+ if not services or not self.notify_appservices:
+ return
+ logger.info("Checking interested services for %s" % (stream_key))
+ with Measure(self.clock, "notify_interested_services_ephemeral"):
+ for service in services:
+ # Only handle typing if we have the latest token
+ if stream_key == "typing_key" and new_token is not None:
+ events = await self._handle_typing(service, new_token)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ # We don't persist the token for typing_key for performance reasons
+ elif stream_key == "receipt_key":
+ events = await self._handle_receipts(service)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ await self.store.set_type_stream_id_for_appservice(
+ service, "read_receipt", new_token
+ )
+ elif stream_key == "presence_key":
+ events = await self._handle_presence(service, users)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ await self.store.set_type_stream_id_for_appservice(
+ service, "presence", new_token
+ )
+
+ async def _handle_typing(self, service: ApplicationService, new_token: int):
+ typing_source = self.event_sources.sources["typing"]
+ # Get the typing events from just before current
+ typing, _ = await typing_source.get_new_events_as(
+ service=service,
+ # For performance reasons, we don't persist the previous
+ # token in the DB and instead fetch the latest typing information
+ # for appservices.
+ from_key=new_token - 1,
+ )
+ return typing
+
+ async def _handle_receipts(self, service: ApplicationService):
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "read_receipt"
+ )
+ receipts_source = self.event_sources.sources["receipt"]
+ receipts, _ = await receipts_source.get_new_events_as(
+ service=service, from_key=from_key
+ )
+ return receipts
+
+ async def _handle_presence(
+ self, service: ApplicationService, users: Collection[UserID]
+ ):
+ events = [] # type: List[JsonDict]
+ presence_source = self.event_sources.sources["presence"]
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "presence"
+ )
+ for user in users:
+ interested = await service.is_interested_in_presence(user, self.store)
+ if not interested:
+ continue
+ presence_events, _ = await presence_source.get_new_events(
+ user=user, service=service, from_key=from_key,
+ )
+ time_now = self.clock.time_msec()
+ presence_events = [
+ {
+ "type": "m.presence",
+ "sender": event.user_id,
+ "content": format_user_presence_state(
+ event, time_now, include_user_id=False
+ ),
+ }
+ for event in presence_events
+ ]
+ events = events + presence_events
+
async def query_user_exists(self, user_id):
"""Check if any application service knows this user_id exists.
@@ -223,7 +326,7 @@ class ApplicationServicesHandler:
async def get_3pe_protocols(self, only_protocol=None):
services = self.store.get_app_services()
- protocols = {}
+ protocols = {} # type: Dict[str, List[JsonDict]]
# Collect up all the individual protocol responses out of the ASes
for s in services:
|