diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index dbbde3db18..6abc2891cf 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,36 +14,24 @@
# limitations under the License.
import logging
+from typing import Collection, List, Union
from prometheus_client import Counter
from twisted.internet import defer
import synapse
-from typing import (
- Awaitable,
- Callable,
- Dict,
- Iterable,
- List,
- Optional,
- Set,
- Tuple,
- TypeVar,
- Union,
- Collection,
-)
-
-from synapse.types import RoomStreamToken, UserID
from synapse.api.constants import EventTypes
+from synapse.appservice import ApplicationService
+from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken, UserID
from synapse.util.metrics import Measure
-from synapse.handlers.presence import format_user_presence_state
logger = logging.getLogger(__name__)
@@ -175,8 +163,17 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
- async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []):
- services = [service for service in self.store.get_app_services() if service.supports_ephemeral]
+ async def notify_interested_services_ephemeral(
+ self,
+ stream_key: str,
+ new_token: Union[int, RoomStreamToken],
+ users: Collection[UserID] = [],
+ ):
+ services = [
+ service
+ for service in self.store.get_app_services()
+ if service.supports_ephemeral
+ ]
if not services or not self.notify_appservices:
return
logger.info("Checking interested services for %s" % (stream_key))
@@ -184,65 +181,99 @@ class ApplicationServicesHandler:
for service in services:
events = []
if stream_key == "typing_key":
- from_key = new_token - 1
- typing_source = self.event_sources.sources["typing"]
- # Get the typing events from just before current
- typing, _key = await typing_source.get_new_events_as(
- service=service,
- from_key=from_key
- )
- events = typing
+ events = await self._handle_typing(service, new_token)
elif stream_key == "receipt_key":
- from_key = new_token - 1
- receipts_source = self.event_sources.sources["receipt"]
- receipts, _key = await receipts_source.get_new_events_as(
- service=service,
- from_key=from_key
- )
- events = receipts
+ events = await self._handle_receipts(service)
elif stream_key == "presence_key":
events = await self._handle_as_presence(service, users)
elif stream_key == "device_list_key":
# Check if the device lists have changed for any of the users we are interested in
- print("device_list_key", users)
+ events = await self._handle_device_list(service, users, new_token)
elif stream_key == "to_device_key":
- # Check the inbox for any users the bridge owns
- events, to_device_token = await self._handle_to_device(service, users, new_token)
- if events:
- # TODO: Do in background?
- await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
- if stream_key == "to_device_key":
- # Update database with new token
- await self.store.set_device_messages_token_for_appservice(service, to_device_token)
- return
+ # Check the inbox for any users the bridge owns
+ events = await self._handle_to_device(service, users, new_token)
if events:
# TODO: Do in background?
- await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
+ await self.scheduler.submit_ephemeral_events_for_as(
+ service, events, new_token
+ )
+ # We don't persist the token for typing_key
+ if stream_key == "presence_key":
+ await self.store.set_type_stream_id_for_appservice(
+ service, "presence", new_token
+ )
+ elif stream_key == "receipt_key":
+ await self.store.set_type_stream_id_for_appservice(
+ service, "read_receipt", new_token
+ )
+ elif stream_key == "to_device_key":
+ await self.store.set_type_stream_id_for_appservice(
+ service, "to_device", new_token
+ )
- async def _handle_device_list(self, service, users, token):
- if not any([True for u in users if service.is_interested_in_user(u)]):
- return False
+ async def _handle_typing(self, service, new_token):
+ typing_source = self.event_sources.sources["typing"]
+ # Get the typing events from just before current
+ typing, _key = await typing_source.get_new_events_as(
+ service=service,
+ # For performance reasons, we don't persist the previous
+ # token in the DB and instead fetch the latest typing information
+ # for appservices.
+ from_key=new_token - 1,
+ )
+ return typing
+
+ async def _handle_receipts(self, service, token: int):
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "read_receipt"
+ )
+ receipts_source = self.event_sources.sources["receipt"]
+ receipts, _ = await receipts_source.get_new_events_as(
+ service=service, from_key=from_key
+ )
+ return receipts
+
+ async def _handle_device_list(
+ self, service: ApplicationService, users: List[str], new_token: int
+ ):
+ # TODO: Determine if any user have left and report those
+ from_token = await self.store.get_type_stream_id_for_appservice(
+ service, "device_list"
+ )
+ changed_user_ids = await self.store.get_device_changes_for_as(
+ service, from_token, new_token
+ )
+ # Return the
+ return {
+ "type": "m.device_list_update",
+ "content": {"changed": changed_user_ids,},
+ }
async def _handle_to_device(self, service, users, token):
if not any([True for u in users if service.is_interested_in_user(u)]):
return False
-
- since_token = await self.store.get_device_messages_token_for_appservice(service)
-
- messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token)
- return messages, new_token
+
+ since_token = await self.store.get_type_stream_id_for_appservice(
+ service, "to_device"
+ )
+ messages, _ = await self.store.get_new_messages_for_as(
+ service, since_token, token
+ )
+ # This returns user_id -> device_id -> message
+ return messages
async def _handle_as_presence(self, service, users):
events = []
presence_source = self.event_sources.sources["presence"]
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "presence"
+ )
for user in users:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
presence_events, _key = await presence_source.get_new_events(
- user=user,
- service=service,
- from_key=None, # TODO: I don't think this is required?
+ user=user, service=service, from_key=from_key,
)
time_now = self.clock.time_msec()
presence_events = [
|