diff options
Diffstat (limited to 'synapse/storage/deviceinbox.py')
-rw-r--r-- | synapse/storage/deviceinbox.py | 63 |
1 files changed, 24 insertions, 39 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index e6a42a53bb..fed4ea3610 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -57,9 +57,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): " ORDER BY stream_id ASC" " LIMIT ?" ) - txn.execute(sql, ( - user_id, device_id, last_stream_id, current_stream_id, limit - )) + txn.execute( + sql, (user_id, device_id, last_stream_id, current_stream_id, limit) + ) messages = [] for row in txn: stream_pos = row[0] @@ -69,7 +69,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): return (messages, stream_pos) return self.runInteraction( - "get_new_messages_for_device", get_new_messages_for_device_txn, + "get_new_messages_for_device", get_new_messages_for_device_txn ) @defer.inlineCallbacks @@ -146,9 +146,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): " ORDER BY stream_id ASC" " LIMIT ?" ) - txn.execute(sql, ( - destination, last_stream_id, current_stream_id, limit - )) + txn.execute(sql, (destination, last_stream_id, current_stream_id, limit)) messages = [] for row in txn: stream_pos = row[0] @@ -172,6 +170,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): Returns: A deferred that resolves when the messages have been deleted. """ + def delete_messages_for_remote_destination_txn(txn): sql = ( "DELETE FROM device_federation_outbox" @@ -181,8 +180,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): txn.execute(sql, (destination, up_to_stream_id)) return self.runInteraction( - "delete_device_msgs_for_remote", - delete_messages_for_remote_destination_txn + "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn ) @@ -200,8 +198,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): ) self.register_background_update_handler( - self.DEVICE_INBOX_STREAM_ID, - self._background_drop_index_device_inbox, + self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) # Map of (user_id, device_id) to the last stream_id that has been @@ -214,8 +211,9 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): ) @defer.inlineCallbacks - def add_messages_to_device_inbox(self, local_messages_by_user_then_device, - remote_messages_by_destination): + def add_messages_to_device_inbox( + self, local_messages_by_user_then_device, remote_messages_by_destination + ): """Used to send messages from this server. Args: @@ -252,15 +250,10 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): with self._device_inbox_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() yield self.runInteraction( - "add_messages_to_device_inbox", - add_messages_txn, - now_ms, - stream_id, + "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id ) for user_id in local_messages_by_user_then_device.keys(): - self._device_inbox_stream_cache.entity_has_changed( - user_id, stream_id - ) + self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id) for destination in remote_messages_by_destination.keys(): self._device_federation_outbox_stream_cache.entity_has_changed( destination, stream_id @@ -277,7 +270,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): # origin. This can happen if the origin doesn't receive our # acknowledgement from the first time we received the message. already_inserted = self._simple_select_one_txn( - txn, table="device_federation_inbox", + txn, + table="device_federation_inbox", keyvalues={"origin": origin, "message_id": message_id}, retcols=("message_id",), allow_none=True, @@ -288,7 +282,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): # Add an entry for this message_id so that we know we've processed # it. self._simple_insert_txn( - txn, table="device_federation_inbox", + txn, + table="device_federation_inbox", values={ "origin": origin, "message_id": message_id, @@ -311,19 +306,14 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): stream_id, ) for user_id in local_messages_by_user_then_device.keys(): - self._device_inbox_stream_cache.entity_has_changed( - user_id, stream_id - ) + self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id) defer.returnValue(stream_id) - def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, - messages_by_user_then_device): - sql = ( - "UPDATE device_max_stream_id" - " SET stream_id = ?" - " WHERE stream_id < ?" - ) + def _add_messages_to_local_device_inbox_txn( + self, txn, stream_id, messages_by_user_then_device + ): + sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?" txn.execute(sql, (stream_id, stream_id)) local_by_user_then_device = {} @@ -332,10 +322,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): devices = list(messages_by_device.keys()) if len(devices) == 1 and devices[0] == "*": # Handle wildcard device_ids. - sql = ( - "SELECT device_id FROM devices" - " WHERE user_id = ?" - ) + sql = "SELECT device_id FROM devices" " WHERE user_id = ?" txn.execute(sql, (user_id,)) message_json = json.dumps(messages_by_device["*"]) for row in txn: @@ -428,9 +415,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): txn = conn.cursor() - txn.execute( - "DROP INDEX IF EXISTS device_inbox_stream_id" - ) + txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") txn.close() yield self.runWithConnection(reindex_txn) |