summary refs log tree commit diff
path: root/synapse/storage/deviceinbox.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-04-17 19:44:40 +0100
committerErik Johnston <erik@matrix.org>2019-04-17 19:44:40 +0100
commitca90336a6935b36b5761244005b0f68b496d5d79 (patch)
tree6bbce5eafc0db3b24ccc3b59b051da850382ae09 /synapse/storage/deviceinbox.py
parentAdd management endpoints for account validity (diff)
parentMerge pull request #5047 from matrix-org/babolivier/account_expiration (diff)
downloadsynapse-ca90336a6935b36b5761244005b0f68b496d5d79.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/account_expiration
Diffstat (limited to 'synapse/storage/deviceinbox.py')
-rw-r--r--synapse/storage/deviceinbox.py63
1 files changed, 24 insertions, 39 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index e6a42a53bb..fed4ea3610 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -57,9 +57,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 " ORDER BY stream_id ASC"
                 " LIMIT ?"
             )
-            txn.execute(sql, (
-                user_id, device_id, last_stream_id, current_stream_id, limit
-            ))
+            txn.execute(
+                sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
+            )
             messages = []
             for row in txn:
                 stream_pos = row[0]
@@ -69,7 +69,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             return (messages, stream_pos)
 
         return self.runInteraction(
-            "get_new_messages_for_device", get_new_messages_for_device_txn,
+            "get_new_messages_for_device", get_new_messages_for_device_txn
         )
 
     @defer.inlineCallbacks
@@ -146,9 +146,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 " ORDER BY stream_id ASC"
                 " LIMIT ?"
             )
-            txn.execute(sql, (
-                destination, last_stream_id, current_stream_id, limit
-            ))
+            txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
             messages = []
             for row in txn:
                 stream_pos = row[0]
@@ -172,6 +170,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         Returns:
             A deferred that resolves when the messages have been deleted.
         """
+
         def delete_messages_for_remote_destination_txn(txn):
             sql = (
                 "DELETE FROM device_federation_outbox"
@@ -181,8 +180,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             txn.execute(sql, (destination, up_to_stream_id))
 
         return self.runInteraction(
-            "delete_device_msgs_for_remote",
-            delete_messages_for_remote_destination_txn
+            "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
         )
 
 
@@ -200,8 +198,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
         )
 
         self.register_background_update_handler(
-            self.DEVICE_INBOX_STREAM_ID,
-            self._background_drop_index_device_inbox,
+            self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
         )
 
         # Map of (user_id, device_id) to the last stream_id that has been
@@ -214,8 +211,9 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
         )
 
     @defer.inlineCallbacks
-    def add_messages_to_device_inbox(self, local_messages_by_user_then_device,
-                                     remote_messages_by_destination):
+    def add_messages_to_device_inbox(
+        self, local_messages_by_user_then_device, remote_messages_by_destination
+    ):
         """Used to send messages from this server.
 
         Args:
@@ -252,15 +250,10 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
         with self._device_inbox_id_gen.get_next() as stream_id:
             now_ms = self.clock.time_msec()
             yield self.runInteraction(
-                "add_messages_to_device_inbox",
-                add_messages_txn,
-                now_ms,
-                stream_id,
+                "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
             )
             for user_id in local_messages_by_user_then_device.keys():
-                self._device_inbox_stream_cache.entity_has_changed(
-                    user_id, stream_id
-                )
+                self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
             for destination in remote_messages_by_destination.keys():
                 self._device_federation_outbox_stream_cache.entity_has_changed(
                     destination, stream_id
@@ -277,7 +270,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
             # origin. This can happen if the origin doesn't receive our
             # acknowledgement from the first time we received the message.
             already_inserted = self._simple_select_one_txn(
-                txn, table="device_federation_inbox",
+                txn,
+                table="device_federation_inbox",
                 keyvalues={"origin": origin, "message_id": message_id},
                 retcols=("message_id",),
                 allow_none=True,
@@ -288,7 +282,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
             # Add an entry for this message_id so that we know we've processed
             # it.
             self._simple_insert_txn(
-                txn, table="device_federation_inbox",
+                txn,
+                table="device_federation_inbox",
                 values={
                     "origin": origin,
                     "message_id": message_id,
@@ -311,19 +306,14 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
                 stream_id,
             )
             for user_id in local_messages_by_user_then_device.keys():
-                self._device_inbox_stream_cache.entity_has_changed(
-                    user_id, stream_id
-                )
+                self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
 
         defer.returnValue(stream_id)
 
-    def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
-                                                messages_by_user_then_device):
-        sql = (
-            "UPDATE device_max_stream_id"
-            " SET stream_id = ?"
-            " WHERE stream_id < ?"
-        )
+    def _add_messages_to_local_device_inbox_txn(
+        self, txn, stream_id, messages_by_user_then_device
+    ):
+        sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
         txn.execute(sql, (stream_id, stream_id))
 
         local_by_user_then_device = {}
@@ -332,10 +322,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
             devices = list(messages_by_device.keys())
             if len(devices) == 1 and devices[0] == "*":
                 # Handle wildcard device_ids.
-                sql = (
-                    "SELECT device_id FROM devices"
-                    " WHERE user_id = ?"
-                )
+                sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
                 txn.execute(sql, (user_id,))
                 message_json = json.dumps(messages_by_device["*"])
                 for row in txn:
@@ -428,9 +415,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
     def _background_drop_index_device_inbox(self, progress, batch_size):
         def reindex_txn(conn):
             txn = conn.cursor()
-            txn.execute(
-                "DROP INDEX IF EXISTS device_inbox_stream_id"
-            )
+            txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
             txn.close()
 
         yield self.runWithConnection(reindex_txn)