summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/8648.bugfix1
-rw-r--r--synapse/handlers/appservice.py12
-rw-r--r--synapse/storage/databases/main/appservice.py29
-rw-r--r--tests/storage/test_appservice.py56
4 files changed, 85 insertions, 13 deletions
diff --git a/changelog.d/8648.bugfix b/changelog.d/8648.bugfix
new file mode 100644
index 0000000000..aa71ad0ff2
--- /dev/null
+++ b/changelog.d/8648.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.22.0rc1 which would cause ephemeral events to not be sent to appservices.
\ No newline at end of file
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7826387e53..03e9ec4d4e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -236,16 +236,16 @@ class ApplicationServicesHandler:
                     events = await self._handle_receipts(service)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
-                        await self.store.set_type_stream_id_for_appservice(
-                            service, "read_receipt", new_token
-                        )
+                    await self.store.set_type_stream_id_for_appservice(
+                        service, "read_receipt", new_token
+                    )
                 elif stream_key == "presence_key":
                     events = await self._handle_presence(service, users)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
-                        await self.store.set_type_stream_id_for_appservice(
-                            service, "presence", new_token
-                        )
+                    await self.store.set_type_stream_id_for_appservice(
+                        service, "presence", new_token
+                    )
 
     async def _handle_typing(self, service: ApplicationService, new_token: int):
         typing_source = self.event_sources.sources["typing"]
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 43bf0f649a..637a938bac 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -369,17 +369,25 @@ class ApplicationServiceTransactionWorkerStore(
     async def get_type_stream_id_for_appservice(
         self, service: ApplicationService, type: str
     ) -> int:
+        if type not in ("read_receipt", "presence"):
+            raise ValueError(
+                "Expected type to be a valid application stream id type, got %s"
+                % (type,)
+            )
+
         def get_type_stream_id_for_appservice_txn(txn):
             stream_id_type = "%s_stream_id" % type
             txn.execute(
-                "SELECT ? FROM application_services_state WHERE as_id=?",
-                (stream_id_type, service.id,),
+                # We do NOT want to escape `stream_id_type`.
+                "SELECT %s FROM application_services_state WHERE as_id=?"
+                % stream_id_type,
+                (service.id,),
             )
-            last_txn_id = txn.fetchone()
-            if last_txn_id is None or last_txn_id[0] is None:  # no row exists
+            last_stream_id = txn.fetchone()
+            if last_stream_id is None or last_stream_id[0] is None:  # no row exists
                 return 0
             else:
-                return int(last_txn_id[0])
+                return int(last_stream_id[0])
 
         return await self.db_pool.runInteraction(
             "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
@@ -388,11 +396,18 @@ class ApplicationServiceTransactionWorkerStore(
     async def set_type_stream_id_for_appservice(
         self, service: ApplicationService, type: str, pos: int
     ) -> None:
+        if type not in ("read_receipt", "presence"):
+            raise ValueError(
+                "Expected type to be a valid application stream id type, got %s"
+                % (type,)
+            )
+
         def set_type_stream_id_for_appservice_txn(txn):
             stream_id_type = "%s_stream_id" % type
             txn.execute(
-                "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
-                (stream_id_type, pos, service.id),
+                "UPDATE application_services_state SET %s = ? WHERE as_id=?"
+                % stream_id_type,
+                (pos, service.id),
             )
 
         await self.db_pool.runInteraction(
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index c5c7987349..1ce29af5fd 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -410,6 +410,62 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         )
 
 
+class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        hs = self.setup_test_homeserver()
+        return hs
+
+    def prepare(self, hs, reactor, clock):
+        self.service = Mock(id="foo")
+        self.store = self.hs.get_datastore()
+        self.get_success(self.store.set_appservice_state(self.service, "up"))
+
+    def test_get_type_stream_id_for_appservice_no_value(self):
+        value = self.get_success(
+            self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
+        )
+        self.assertEquals(value, 0)
+
+        value = self.get_success(
+            self.store.get_type_stream_id_for_appservice(self.service, "presence")
+        )
+        self.assertEquals(value, 0)
+
+    def test_get_type_stream_id_for_appservice_invalid_type(self):
+        self.get_failure(
+            self.store.get_type_stream_id_for_appservice(self.service, "foobar"),
+            ValueError,
+        )
+
+    def test_set_type_stream_id_for_appservice(self):
+        read_receipt_value = 1024
+        self.get_success(
+            self.store.set_type_stream_id_for_appservice(
+                self.service, "read_receipt", read_receipt_value
+            )
+        )
+        result = self.get_success(
+            self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
+        )
+        self.assertEqual(result, read_receipt_value)
+
+        self.get_success(
+            self.store.set_type_stream_id_for_appservice(
+                self.service, "presence", read_receipt_value
+            )
+        )
+        result = self.get_success(
+            self.store.get_type_stream_id_for_appservice(self.service, "presence")
+        )
+        self.assertEqual(result, read_receipt_value)
+
+    def test_set_type_stream_id_for_appservice_invalid_type(self):
+        self.get_failure(
+            self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024),
+            ValueError,
+        )
+
+
 # required for ApplicationServiceTransactionStoreTestCase tests
 class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore):
     def __init__(self, database: DatabasePool, db_conn, hs):