diff options
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r-- | synapse/handlers/appservice.py | 141 |
1 files changed, 86 insertions, 55 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dbbde3db18..6abc2891cf 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -14,36 +14,24 @@ # limitations under the License. import logging +from typing import Collection, List, Union from prometheus_client import Counter from twisted.internet import defer import synapse -from typing import ( - Awaitable, - Callable, - Dict, - Iterable, - List, - Optional, - Set, - Tuple, - TypeVar, - Union, - Collection, -) - -from synapse.types import RoomStreamToken, UserID from synapse.api.constants import EventTypes +from synapse.appservice import ApplicationService +from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import ( event_processing_loop_counter, event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken, UserID from synapse.util.metrics import Measure -from synapse.handlers.presence import format_user_presence_state logger = logging.getLogger(__name__) @@ -175,8 +163,17 @@ class ApplicationServicesHandler: finally: self.is_processing = False - async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []): - services = [service for service in self.store.get_app_services() if service.supports_ephemeral] + async def notify_interested_services_ephemeral( + self, + stream_key: str, + new_token: Union[int, RoomStreamToken], + users: Collection[UserID] = [], + ): + services = [ + service + for service in self.store.get_app_services() + if service.supports_ephemeral + ] if not services or not self.notify_appservices: return logger.info("Checking interested services for %s" % (stream_key)) @@ -184,65 +181,99 @@ class ApplicationServicesHandler: for service in services: events = [] if stream_key == "typing_key": - from_key = new_token - 1 - typing_source = self.event_sources.sources["typing"] - # Get the typing events from just before current - typing, _key = await typing_source.get_new_events_as( - service=service, - from_key=from_key - ) - events = typing + events = await self._handle_typing(service, new_token) elif stream_key == "receipt_key": - from_key = new_token - 1 - receipts_source = self.event_sources.sources["receipt"] - receipts, _key = await receipts_source.get_new_events_as( - service=service, - from_key=from_key - ) - events = receipts + events = await self._handle_receipts(service) elif stream_key == "presence_key": events = await self._handle_as_presence(service, users) elif stream_key == "device_list_key": # Check if the device lists have changed for any of the users we are interested in - print("device_list_key", users) + events = await self._handle_device_list(service, users, new_token) elif stream_key == "to_device_key": - # Check the inbox for any users the bridge owns - events, to_device_token = await self._handle_to_device(service, users, new_token) - if events: - # TODO: Do in background? - await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token) - if stream_key == "to_device_key": - # Update database with new token - await self.store.set_device_messages_token_for_appservice(service, to_device_token) - return + # Check the inbox for any users the bridge owns + events = await self._handle_to_device(service, users, new_token) if events: # TODO: Do in background? - await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token) + await self.scheduler.submit_ephemeral_events_for_as( + service, events, new_token + ) + # We don't persist the token for typing_key + if stream_key == "presence_key": + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) + elif stream_key == "receipt_key": + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) + elif stream_key == "to_device_key": + await self.store.set_type_stream_id_for_appservice( + service, "to_device", new_token + ) - async def _handle_device_list(self, service, users, token): - if not any([True for u in users if service.is_interested_in_user(u)]): - return False + async def _handle_typing(self, service, new_token): + typing_source = self.event_sources.sources["typing"] + # Get the typing events from just before current + typing, _key = await typing_source.get_new_events_as( + service=service, + # For performance reasons, we don't persist the previous + # token in the DB and instead fetch the latest typing information + # for appservices. + from_key=new_token - 1, + ) + return typing + + async def _handle_receipts(self, service, token: int): + from_key = await self.store.get_type_stream_id_for_appservice( + service, "read_receipt" + ) + receipts_source = self.event_sources.sources["receipt"] + receipts, _ = await receipts_source.get_new_events_as( + service=service, from_key=from_key + ) + return receipts + + async def _handle_device_list( + self, service: ApplicationService, users: List[str], new_token: int + ): + # TODO: Determine if any user have left and report those + from_token = await self.store.get_type_stream_id_for_appservice( + service, "device_list" + ) + changed_user_ids = await self.store.get_device_changes_for_as( + service, from_token, new_token + ) + # Return the + return { + "type": "m.device_list_update", + "content": {"changed": changed_user_ids,}, + } async def _handle_to_device(self, service, users, token): if not any([True for u in users if service.is_interested_in_user(u)]): return False - - since_token = await self.store.get_device_messages_token_for_appservice(service) - - messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token) - return messages, new_token + + since_token = await self.store.get_type_stream_id_for_appservice( + service, "to_device" + ) + messages, _ = await self.store.get_new_messages_for_as( + service, since_token, token + ) + # This returns user_id -> device_id -> message + return messages async def _handle_as_presence(self, service, users): events = [] presence_source = self.event_sources.sources["presence"] + from_key = await self.store.get_type_stream_id_for_appservice( + service, "presence" + ) for user in users: interested = await service.is_interested_in_presence(user, self.store) if not interested: continue presence_events, _key = await presence_source.get_new_events( - user=user, - service=service, - from_key=None, # TODO: I don't think this is required? + user=user, service=service, from_key=from_key, ) time_now = self.clock.time_msec() presence_events = [ |