diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 6504c6bd3f..2597ad7917 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -187,6 +187,7 @@ class ApplicationService:
for user_id in member_list:
if self.is_interested_in_user(user_id):
return True
+
return False
def _matches_room_id(self, event: EventBase) -> bool:
@@ -233,6 +234,15 @@ class ApplicationService:
return False
+ def is_interested_in_synthetic_user_event(self, event_type: str, user_id: UserID):
+ for regex_obj in self.namespaces["users"]:
+ if not regex_obj["regex"].match(user_id):
+ continue
+ # TODO: Validate structure further up.
+ if event_type in regex_obj.get("uk.half-shot.msc3395.synthetic_events", {"events": []})["events"]:
+ return True
+ return False
+
@cached(num_args=1)
async def is_interested_in_presence(
self, user_id: UserID, store: "DataStore"
@@ -258,10 +268,10 @@ class ApplicationService:
return False
def is_interested_in_user(self, user_id: str) -> bool:
- return (
- bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
- or user_id == self.sender
- )
+ if user_id == self.sender:
+ return True
+ regex_obj = self._matches_regex(user_id, ApplicationService.NS_USERS)
+ return regex_obj and regex_obj.get("uk.half-shot.msc3395.events", True)
def is_interested_in_alias(self, alias: str) -> bool:
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
@@ -329,11 +339,13 @@ class AppServiceTransaction:
id: int,
events: List[EventBase],
ephemeral: List[JsonDict],
+ synthetic_events: Optional[JsonDict] = None,
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
+ self.synthetic_events = synthetic_events
async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
@@ -347,6 +359,7 @@ class AppServiceTransaction:
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
+ synthetic_events=self.synthetic_events,
txn_id=self.id,
)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 935f24263c..0475caae1a 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -203,6 +203,7 @@ class ApplicationServiceApi(SimpleHttpClient):
service: "ApplicationService",
events: List[EventBase],
ephemeral: List[JsonDict],
+ synthetic_events: Optional[List[JsonDict]],
txn_id: Optional[int] = None,
):
if service.url is None:
@@ -218,11 +219,15 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
+ body = {"events": events}
+
# Never send ephemeral events to appservices that do not support it
if service.supports_ephemeral:
- body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
- else:
- body = {"events": events}
+ body["de.sorunome.msc2409.ephemeral"] = ephemeral
+
+ # We will only populate this if the appservice requests synthetic events
+ if synthetic_events and len(synthetic_events):
+ body["uk.half-shot.msc3395.synthetic_events"] = synthetic_events
try:
await self.put_json(
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 6a2ce99b55..9ff677fcf6 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -65,6 +65,8 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
+# Maximum number of syntethci events to provide in an AS transaction.
+MAX_SYNTHETIC_EVENTS_PER_TRANSACTION = 100
class ApplicationServiceScheduler:
"""Public facing API for this module. Does the required DI to tie the
@@ -99,6 +101,11 @@ class ApplicationServiceScheduler:
):
self.queuer.enqueue_ephemeral(service, events)
+ def submit_synthetic_events_for_as(
+ self, service: ApplicationService, events: List[JsonDict]
+ ):
+ self.queuer.enqueue_ephemeral(service, events)
+
class _ServiceQueuer:
"""Queue of events waiting to be sent to appservices.
@@ -111,6 +118,7 @@ class _ServiceQueuer:
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
self.queued_ephemeral = {} # dict of {service_id: [events]}
+ self.queued_synthetic = {} # dict of {service_id: [events]}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
@@ -134,6 +142,10 @@ class _ServiceQueuer:
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)
+ def enqueue_syntheic(self, service: ApplicationService, events: List[JsonDict]):
+ self.queued_synthetic.setdefault(service.id, []).extend(events)
+ self._start_background_request(service)
+
async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
@@ -150,11 +162,15 @@ class _ServiceQueuer:
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
- if not events and not ephemeral:
+ all_events_synthetic = self.queued_synthetic.get(service.id, [])
+ synthetic = all_events_ephemeral[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION]
+ del all_events_synthetic[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION]
+
+ if not events and not ephemeral and not synthetic:
return
try:
- await self.txn_ctrl.send(service, events, ephemeral)
+ await self.txn_ctrl.send(service, events, ephemeral, synthetic)
except Exception:
logger.exception("AS request failed")
finally:
@@ -191,10 +207,11 @@ class _TransactionController:
service: ApplicationService,
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
+ synthetic_events: Optional[List[JsonDict]] = None,
):
try:
txn = await self.store.create_appservice_txn(
- service=service, events=events, ephemeral=ephemeral or []
+ service=service, events=events, ephemeral=ephemeral or [], synthetic_events=synthetic_events
)
service_is_up = await self._is_service_up(service)
if service_is_up:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 163278708c..cf9ca5b7e5 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -179,6 +179,58 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
+ def notify_synthetic_event(
+ self,
+ event_type: str,
+ user_id: UserID,
+ content: JsonDict,
+ ) -> None:
+ """This is called when another service wishes to
+ notify about a synthetic event.
+
+ This will determine which appservices
+ are interested in the event, and submit them.
+
+ Events will only be pushed to appservices
+ that have opted into ephemeral events
+
+ Args:
+ event_type: The type of event to notify about.
+ user_id: The user_id of the user involved in the event.
+ content: The content of the event itself.
+ """
+ if not self.notify_appservices:
+ return
+
+ logger.debug("Checking interested services for synthetic event %s:%s" % (event_type, user_id))
+ services = self._get_services_for_user_synthetic_event(event_type, user_id)
+
+ if not services:
+ return
+
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._notify_synthetic_event(
+ services, user_id, {
+ "type": event_type,
+ "content": content,
+ }
+ )
+
+ @wrap_as_background_process("notify_synthetic_event")
+ async def _notify_synthetic_event(
+ self,
+ services: List[ApplicationService],
+ user_id: UserID,
+ event: JsonDict,
+ ) -> None:
+ logger.debug("Submitting synthetic event to interested services %s:%s" % (event["type"], user_id))
+ with Measure(self.clock, "notify_synthetic_event"):
+ for service in services:
+ # TODO: Store event in DB if we can't submit it now.
+ self.scheduler.submit_synthetic_events_for_as(service, [event])
+
+
def notify_interested_services_ephemeral(
self,
stream_key: str,
@@ -434,12 +486,16 @@ class ApplicationServicesHandler:
def _get_services_for_user(self, user_id: str) -> List[ApplicationService]:
services = self.store.get_app_services()
- return [s for s in services if (s.is_interested_in_user(user_id))]
+ return [s for s in services if s.is_interested_in_user(user_id)]
def _get_services_for_3pn(self, protocol: str) -> List[ApplicationService]:
services = self.store.get_app_services()
return [s for s in services if s.is_interested_in_protocol(protocol)]
+ def _get_services_for_user_synthetic_event(self, event_type: str, user_id: str) -> List[ApplicationService]:
+ services = self.store.get_app_services()
+ return [s for s in services if s.is_interested_in_synthetic_user_event(event_type, user_id)]
+
async def _is_unknown_user(self, user_id: str) -> bool:
if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 2da2659f41..615690a603 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -194,6 +194,7 @@ class ApplicationServiceTransactionWorkerStore(
service: ApplicationService,
events: List[EventBase],
ephemeral: List[JsonDict],
+ synthetic_events: Optional[List[JsonDict]] = None,
) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
with the given list of events. Ephemeral events are NOT persisted to the
@@ -233,7 +234,7 @@ class ApplicationServiceTransactionWorkerStore(
(service.id, new_txn_id, event_ids),
)
return AppServiceTransaction(
- service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+ service=service, id=new_txn_id, events=events, ephemeral=ephemeral, synthetic_events=synthetic_events
)
return await self.db_pool.runInteraction(
|