summary refs log tree commit diff
path: root/synapse/storage/deviceinbox.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2019-03-05 09:06:25 +0000
committerGitHub <noreply@github.com>2019-03-05 09:06:25 +0000
commitc3c542bb4a8b62a765792218e982a293e7abf3b7 (patch)
tree6ee996e4b630284dc271ba28d48a735c5974fc7e /synapse/storage/deviceinbox.py
parentMerge pull request #4798 from matrix-org/rav/rr_debug (diff)
parentNewsfile (diff)
downloadsynapse-c3c542bb4a8b62a765792218e982a293e7abf3b7.tar.xz
Merge pull request #4796 from matrix-org/erikj/factor_out_e2e_keys
 Allow /keys/{changes,query} API to run on worker
Diffstat (limited to 'synapse/storage/deviceinbox.py')
-rw-r--r--synapse/storage/deviceinbox.py324
1 files changed, 163 insertions, 161 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index e06b0bc56d..e6a42a53bb 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -19,14 +19,174 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.expiringcache import ExpiringCache
 
-from .background_updates import BackgroundUpdateStore
-
 logger = logging.getLogger(__name__)
 
 
-class DeviceInboxStore(BackgroundUpdateStore):
+class DeviceInboxWorkerStore(SQLBaseStore):
+    def get_to_device_stream_token(self):
+        return self._device_inbox_id_gen.get_current_token()
+
+    def get_new_messages_for_device(
+        self, user_id, device_id, last_stream_id, current_stream_id, limit=100
+    ):
+        """
+        Args:
+            user_id(str): The recipient user_id.
+            device_id(str): The recipient device_id.
+            current_stream_id(int): The current position of the to device
+                message stream.
+        Returns:
+            Deferred ([dict], int): List of messages for the device and where
+                in the stream the messages got to.
+        """
+        has_changed = self._device_inbox_stream_cache.has_entity_changed(
+            user_id, last_stream_id
+        )
+        if not has_changed:
+            return defer.succeed(([], current_stream_id))
+
+        def get_new_messages_for_device_txn(txn):
+            sql = (
+                "SELECT stream_id, message_json FROM device_inbox"
+                " WHERE user_id = ? AND device_id = ?"
+                " AND ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC"
+                " LIMIT ?"
+            )
+            txn.execute(sql, (
+                user_id, device_id, last_stream_id, current_stream_id, limit
+            ))
+            messages = []
+            for row in txn:
+                stream_pos = row[0]
+                messages.append(json.loads(row[1]))
+            if len(messages) < limit:
+                stream_pos = current_stream_id
+            return (messages, stream_pos)
+
+        return self.runInteraction(
+            "get_new_messages_for_device", get_new_messages_for_device_txn,
+        )
+
+    @defer.inlineCallbacks
+    def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
+        """
+        Args:
+            user_id(str): The recipient user_id.
+            device_id(str): The recipient device_id.
+            up_to_stream_id(int): Where to delete messages up to.
+        Returns:
+            A deferred that resolves to the number of messages deleted.
+        """
+        # If we have cached the last stream id we've deleted up to, we can
+        # check if there is likely to be anything that needs deleting
+        last_deleted_stream_id = self._last_device_delete_cache.get(
+            (user_id, device_id), None
+        )
+        if last_deleted_stream_id:
+            has_changed = self._device_inbox_stream_cache.has_entity_changed(
+                user_id, last_deleted_stream_id
+            )
+            if not has_changed:
+                defer.returnValue(0)
+
+        def delete_messages_for_device_txn(txn):
+            sql = (
+                "DELETE FROM device_inbox"
+                " WHERE user_id = ? AND device_id = ?"
+                " AND stream_id <= ?"
+            )
+            txn.execute(sql, (user_id, device_id, up_to_stream_id))
+            return txn.rowcount
+
+        count = yield self.runInteraction(
+            "delete_messages_for_device", delete_messages_for_device_txn
+        )
+
+        # Update the cache, ensuring that we only ever increase the value
+        last_deleted_stream_id = self._last_device_delete_cache.get(
+            (user_id, device_id), 0
+        )
+        self._last_device_delete_cache[(user_id, device_id)] = max(
+            last_deleted_stream_id, up_to_stream_id
+        )
+
+        defer.returnValue(count)
+
+    def get_new_device_msgs_for_remote(
+        self, destination, last_stream_id, current_stream_id, limit=100
+    ):
+        """
+        Args:
+            destination(str): The name of the remote server.
+            last_stream_id(int|long): The last position of the device message stream
+                that the server sent up to.
+            current_stream_id(int|long): The current position of the device
+                message stream.
+        Returns:
+            Deferred ([dict], int|long): List of messages for the device and where
+                in the stream the messages got to.
+        """
+
+        has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
+            destination, last_stream_id
+        )
+        if not has_changed or last_stream_id == current_stream_id:
+            return defer.succeed(([], current_stream_id))
+
+        def get_new_messages_for_remote_destination_txn(txn):
+            sql = (
+                "SELECT stream_id, messages_json FROM device_federation_outbox"
+                " WHERE destination = ?"
+                " AND ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC"
+                " LIMIT ?"
+            )
+            txn.execute(sql, (
+                destination, last_stream_id, current_stream_id, limit
+            ))
+            messages = []
+            for row in txn:
+                stream_pos = row[0]
+                messages.append(json.loads(row[1]))
+            if len(messages) < limit:
+                stream_pos = current_stream_id
+            return (messages, stream_pos)
+
+        return self.runInteraction(
+            "get_new_device_msgs_for_remote",
+            get_new_messages_for_remote_destination_txn,
+        )
+
+    def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
+        """Used to delete messages when the remote destination acknowledges
+        their receipt.
+
+        Args:
+            destination(str): The destination server_name
+            up_to_stream_id(int): Where to delete messages up to.
+        Returns:
+            A deferred that resolves when the messages have been deleted.
+        """
+        def delete_messages_for_remote_destination_txn(txn):
+            sql = (
+                "DELETE FROM device_federation_outbox"
+                " WHERE destination = ?"
+                " AND stream_id <= ?"
+            )
+            txn.execute(sql, (destination, up_to_stream_id))
+
+        return self.runInteraction(
+            "delete_device_msgs_for_remote",
+            delete_messages_for_remote_destination_txn
+        )
+
+
+class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
     DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
 
     def __init__(self, db_conn, hs):
@@ -220,93 +380,6 @@ class DeviceInboxStore(BackgroundUpdateStore):
 
         txn.executemany(sql, rows)
 
-    def get_new_messages_for_device(
-        self, user_id, device_id, last_stream_id, current_stream_id, limit=100
-    ):
-        """
-        Args:
-            user_id(str): The recipient user_id.
-            device_id(str): The recipient device_id.
-            current_stream_id(int): The current position of the to device
-                message stream.
-        Returns:
-            Deferred ([dict], int): List of messages for the device and where
-                in the stream the messages got to.
-        """
-        has_changed = self._device_inbox_stream_cache.has_entity_changed(
-            user_id, last_stream_id
-        )
-        if not has_changed:
-            return defer.succeed(([], current_stream_id))
-
-        def get_new_messages_for_device_txn(txn):
-            sql = (
-                "SELECT stream_id, message_json FROM device_inbox"
-                " WHERE user_id = ? AND device_id = ?"
-                " AND ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-                " LIMIT ?"
-            )
-            txn.execute(sql, (
-                user_id, device_id, last_stream_id, current_stream_id, limit
-            ))
-            messages = []
-            for row in txn:
-                stream_pos = row[0]
-                messages.append(json.loads(row[1]))
-            if len(messages) < limit:
-                stream_pos = current_stream_id
-            return (messages, stream_pos)
-
-        return self.runInteraction(
-            "get_new_messages_for_device", get_new_messages_for_device_txn,
-        )
-
-    @defer.inlineCallbacks
-    def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
-        """
-        Args:
-            user_id(str): The recipient user_id.
-            device_id(str): The recipient device_id.
-            up_to_stream_id(int): Where to delete messages up to.
-        Returns:
-            A deferred that resolves to the number of messages deleted.
-        """
-        # If we have cached the last stream id we've deleted up to, we can
-        # check if there is likely to be anything that needs deleting
-        last_deleted_stream_id = self._last_device_delete_cache.get(
-            (user_id, device_id), None
-        )
-        if last_deleted_stream_id:
-            has_changed = self._device_inbox_stream_cache.has_entity_changed(
-                user_id, last_deleted_stream_id
-            )
-            if not has_changed:
-                defer.returnValue(0)
-
-        def delete_messages_for_device_txn(txn):
-            sql = (
-                "DELETE FROM device_inbox"
-                " WHERE user_id = ? AND device_id = ?"
-                " AND stream_id <= ?"
-            )
-            txn.execute(sql, (user_id, device_id, up_to_stream_id))
-            return txn.rowcount
-
-        count = yield self.runInteraction(
-            "delete_messages_for_device", delete_messages_for_device_txn
-        )
-
-        # Update the cache, ensuring that we only ever increase the value
-        last_deleted_stream_id = self._last_device_delete_cache.get(
-            (user_id, device_id), 0
-        )
-        self._last_device_delete_cache[(user_id, device_id)] = max(
-            last_deleted_stream_id, up_to_stream_id
-        )
-
-        defer.returnValue(count)
-
     def get_all_new_device_messages(self, last_pos, current_pos, limit):
         """
         Args:
@@ -351,77 +424,6 @@ class DeviceInboxStore(BackgroundUpdateStore):
             "get_all_new_device_messages", get_all_new_device_messages_txn
         )
 
-    def get_to_device_stream_token(self):
-        return self._device_inbox_id_gen.get_current_token()
-
-    def get_new_device_msgs_for_remote(
-        self, destination, last_stream_id, current_stream_id, limit=100
-    ):
-        """
-        Args:
-            destination(str): The name of the remote server.
-            last_stream_id(int|long): The last position of the device message stream
-                that the server sent up to.
-            current_stream_id(int|long): The current position of the device
-                message stream.
-        Returns:
-            Deferred ([dict], int|long): List of messages for the device and where
-                in the stream the messages got to.
-        """
-
-        has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
-            destination, last_stream_id
-        )
-        if not has_changed or last_stream_id == current_stream_id:
-            return defer.succeed(([], current_stream_id))
-
-        def get_new_messages_for_remote_destination_txn(txn):
-            sql = (
-                "SELECT stream_id, messages_json FROM device_federation_outbox"
-                " WHERE destination = ?"
-                " AND ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-                " LIMIT ?"
-            )
-            txn.execute(sql, (
-                destination, last_stream_id, current_stream_id, limit
-            ))
-            messages = []
-            for row in txn:
-                stream_pos = row[0]
-                messages.append(json.loads(row[1]))
-            if len(messages) < limit:
-                stream_pos = current_stream_id
-            return (messages, stream_pos)
-
-        return self.runInteraction(
-            "get_new_device_msgs_for_remote",
-            get_new_messages_for_remote_destination_txn,
-        )
-
-    def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
-        """Used to delete messages when the remote destination acknowledges
-        their receipt.
-
-        Args:
-            destination(str): The destination server_name
-            up_to_stream_id(int): Where to delete messages up to.
-        Returns:
-            A deferred that resolves when the messages have been deleted.
-        """
-        def delete_messages_for_remote_destination_txn(txn):
-            sql = (
-                "DELETE FROM device_federation_outbox"
-                " WHERE destination = ?"
-                " AND stream_id <= ?"
-            )
-            txn.execute(sql, (destination, up_to_stream_id))
-
-        return self.runInteraction(
-            "delete_device_msgs_for_remote",
-            delete_messages_for_remote_destination_txn
-        )
-
     @defer.inlineCallbacks
     def _background_drop_index_device_inbox(self, progress, batch_size):
         def reindex_txn(conn):