summary refs log tree commit diff
path: root/synapse/handlers/appservice.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r--synapse/handlers/appservice.py141
1 files changed, 86 insertions, 55 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index dbbde3db18..6abc2891cf 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,36 +14,24 @@
 # limitations under the License.
 
 import logging
+from typing import Collection, List, Union
 
 from prometheus_client import Counter
 
 from twisted.internet import defer
 
 import synapse
-from typing import (
-    Awaitable,
-    Callable,
-    Dict,
-    Iterable,
-    List,
-    Optional,
-    Set,
-    Tuple,
-    TypeVar,
-    Union,
-    Collection,
-)
-
-from synapse.types import RoomStreamToken, UserID
 from synapse.api.constants import EventTypes
+from synapse.appservice import ApplicationService
+from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics import (
     event_processing_loop_counter,
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken, UserID
 from synapse.util.metrics import Measure
-from synapse.handlers.presence import format_user_presence_state
 
 logger = logging.getLogger(__name__)
 
@@ -175,8 +163,17 @@ class ApplicationServicesHandler:
             finally:
                 self.is_processing = False
 
-    async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []):
-        services = [service for service in self.store.get_app_services() if service.supports_ephemeral]
+    async def notify_interested_services_ephemeral(
+        self,
+        stream_key: str,
+        new_token: Union[int, RoomStreamToken],
+        users: Collection[UserID] = [],
+    ):
+        services = [
+            service
+            for service in self.store.get_app_services()
+            if service.supports_ephemeral
+        ]
         if not services or not self.notify_appservices:
             return
         logger.info("Checking interested services for %s" % (stream_key))
@@ -184,65 +181,99 @@ class ApplicationServicesHandler:
             for service in services:
                 events = []
                 if stream_key == "typing_key":
-                    from_key = new_token - 1
-                    typing_source = self.event_sources.sources["typing"]
-                    # Get the typing events from just before current
-                    typing, _key = await typing_source.get_new_events_as(
-                        service=service,
-                        from_key=from_key
-                    )
-                    events = typing
+                    events = await self._handle_typing(service, new_token)
                 elif stream_key == "receipt_key":
-                    from_key = new_token - 1
-                    receipts_source = self.event_sources.sources["receipt"]
-                    receipts, _key = await receipts_source.get_new_events_as(
-                        service=service,
-                        from_key=from_key
-                    )
-                    events = receipts
+                    events = await self._handle_receipts(service)
                 elif stream_key == "presence_key":
                     events = await self._handle_as_presence(service, users)
                 elif stream_key == "device_list_key":
                     # Check if the device lists have changed for any of the users we are interested in
-                    print("device_list_key", users)
+                    events = await self._handle_device_list(service, users, new_token)
                 elif stream_key == "to_device_key":
-                    # Check the inbox for any users the bridge owns 
-                    events, to_device_token = await self._handle_to_device(service, users, new_token)
-                    if events:
-                        # TODO: Do in background?
-                        await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
-                        if stream_key == "to_device_key":
-                            # Update database with new token
-                            await self.store.set_device_messages_token_for_appservice(service, to_device_token)
-                        return
+                    # Check the inbox for any users the bridge owns
+                    events = await self._handle_to_device(service, users, new_token)
                 if events:
                     # TODO: Do in background?
-                    await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
+                    await self.scheduler.submit_ephemeral_events_for_as(
+                        service, events, new_token
+                    )
+                    # We don't persist the token for typing_key
+                    if stream_key == "presence_key":
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "presence", new_token
+                        )
+                    elif stream_key == "receipt_key":
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "read_receipt", new_token
+                        )
+                    elif stream_key == "to_device_key":
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "to_device", new_token
+                        )
 
-    async def _handle_device_list(self, service, users, token):
-        if not any([True for u in users if service.is_interested_in_user(u)]):
-            return False
+    async def _handle_typing(self, service, new_token):
+        typing_source = self.event_sources.sources["typing"]
+        # Get the typing events from just before current
+        typing, _key = await typing_source.get_new_events_as(
+            service=service,
+            # For performance reasons, we don't persist the previous
+            # token in the DB and instead fetch the latest typing information
+            # for appservices.
+            from_key=new_token - 1,
+        )
+        return typing
+
+    async def _handle_receipts(self, service, token: int):
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "read_receipt"
+        )
+        receipts_source = self.event_sources.sources["receipt"]
+        receipts, _ = await receipts_source.get_new_events_as(
+            service=service, from_key=from_key
+        )
+        return receipts
+
+    async def _handle_device_list(
+        self, service: ApplicationService, users: List[str], new_token: int
+    ):
+        # TODO: Determine if any user have left and report those
+        from_token = await self.store.get_type_stream_id_for_appservice(
+            service, "device_list"
+        )
+        changed_user_ids = await self.store.get_device_changes_for_as(
+            service, from_token, new_token
+        )
+        # Return the
+        return {
+            "type": "m.device_list_update",
+            "content": {"changed": changed_user_ids,}, 
+        }
 
     async def _handle_to_device(self, service, users, token):
         if not any([True for u in users if service.is_interested_in_user(u)]):
             return False
-        
-        since_token = await self.store.get_device_messages_token_for_appservice(service)
-        
-        messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token)
-        return messages, new_token
+
+        since_token = await self.store.get_type_stream_id_for_appservice(
+            service, "to_device"
+        )
+        messages, _ = await self.store.get_new_messages_for_as(
+            service, since_token, token
+        )
+        # This returns user_id -> device_id -> message
+        return messages
 
     async def _handle_as_presence(self, service, users):
         events = []
         presence_source = self.event_sources.sources["presence"]
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "presence"
+        )
         for user in users:
             interested = await service.is_interested_in_presence(user, self.store)
             if not interested:
                 continue
             presence_events, _key = await presence_source.get_new_events(
-                user=user,
-                service=service,
-                from_key=None, # TODO: I don't think this is required?
+                user=user, service=service, from_key=from_key,
             )
             time_now = self.clock.time_msec()
             presence_events = [