diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index e6a42a53bb..fed4ea3610 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -57,9 +57,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
" ORDER BY stream_id ASC"
" LIMIT ?"
)
- txn.execute(sql, (
- user_id, device_id, last_stream_id, current_stream_id, limit
- ))
+ txn.execute(
+ sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
+ )
messages = []
for row in txn:
stream_pos = row[0]
@@ -69,7 +69,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return (messages, stream_pos)
return self.runInteraction(
- "get_new_messages_for_device", get_new_messages_for_device_txn,
+ "get_new_messages_for_device", get_new_messages_for_device_txn
)
@defer.inlineCallbacks
@@ -146,9 +146,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
" ORDER BY stream_id ASC"
" LIMIT ?"
)
- txn.execute(sql, (
- destination, last_stream_id, current_stream_id, limit
- ))
+ txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
messages = []
for row in txn:
stream_pos = row[0]
@@ -172,6 +170,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
Returns:
A deferred that resolves when the messages have been deleted.
"""
+
def delete_messages_for_remote_destination_txn(txn):
sql = (
"DELETE FROM device_federation_outbox"
@@ -181,8 +180,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
txn.execute(sql, (destination, up_to_stream_id))
return self.runInteraction(
- "delete_device_msgs_for_remote",
- delete_messages_for_remote_destination_txn
+ "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
)
@@ -200,8 +198,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
)
self.register_background_update_handler(
- self.DEVICE_INBOX_STREAM_ID,
- self._background_drop_index_device_inbox,
+ self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
)
# Map of (user_id, device_id) to the last stream_id that has been
@@ -214,8 +211,9 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
)
@defer.inlineCallbacks
- def add_messages_to_device_inbox(self, local_messages_by_user_then_device,
- remote_messages_by_destination):
+ def add_messages_to_device_inbox(
+ self, local_messages_by_user_then_device, remote_messages_by_destination
+ ):
"""Used to send messages from this server.
Args:
@@ -252,15 +250,10 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
yield self.runInteraction(
- "add_messages_to_device_inbox",
- add_messages_txn,
- now_ms,
- stream_id,
+ "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
)
for user_id in local_messages_by_user_then_device.keys():
- self._device_inbox_stream_cache.entity_has_changed(
- user_id, stream_id
- )
+ self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
for destination in remote_messages_by_destination.keys():
self._device_federation_outbox_stream_cache.entity_has_changed(
destination, stream_id
@@ -277,7 +270,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
# origin. This can happen if the origin doesn't receive our
# acknowledgement from the first time we received the message.
already_inserted = self._simple_select_one_txn(
- txn, table="device_federation_inbox",
+ txn,
+ table="device_federation_inbox",
keyvalues={"origin": origin, "message_id": message_id},
retcols=("message_id",),
allow_none=True,
@@ -288,7 +282,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
# Add an entry for this message_id so that we know we've processed
# it.
self._simple_insert_txn(
- txn, table="device_federation_inbox",
+ txn,
+ table="device_federation_inbox",
values={
"origin": origin,
"message_id": message_id,
@@ -311,19 +306,14 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
stream_id,
)
for user_id in local_messages_by_user_then_device.keys():
- self._device_inbox_stream_cache.entity_has_changed(
- user_id, stream_id
- )
+ self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
defer.returnValue(stream_id)
- def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
- messages_by_user_then_device):
- sql = (
- "UPDATE device_max_stream_id"
- " SET stream_id = ?"
- " WHERE stream_id < ?"
- )
+ def _add_messages_to_local_device_inbox_txn(
+ self, txn, stream_id, messages_by_user_then_device
+ ):
+ sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
txn.execute(sql, (stream_id, stream_id))
local_by_user_then_device = {}
@@ -332,10 +322,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
- sql = (
- "SELECT device_id FROM devices"
- " WHERE user_id = ?"
- )
+ sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
txn.execute(sql, (user_id,))
message_json = json.dumps(messages_by_device["*"])
for row in txn:
@@ -428,9 +415,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
def _background_drop_index_device_inbox(self, progress, batch_size):
def reindex_txn(conn):
txn = conn.cursor()
- txn.execute(
- "DROP INDEX IF EXISTS device_inbox_stream_id"
- )
+ txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
txn.close()
yield self.runWithConnection(reindex_txn)
|