diff options
Diffstat (limited to 'synapse/storage/deviceinbox.py')
-rw-r--r-- | synapse/storage/deviceinbox.py | 51 |
1 files changed, 29 insertions, 22 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 6b7458304e..f04aad0743 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -20,7 +20,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.logging.opentracing import log_kv, set_tag, trace -from synapse.storage._base import SQLBaseStore +from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache @@ -208,11 +208,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) -class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): +class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" def __init__(self, db_conn, hs): - super(DeviceInboxStore, self).__init__(db_conn, hs) + super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_index_update( "device_inbox_stream_index", @@ -225,6 +225,26 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) + @defer.inlineCallbacks + def _background_drop_index_device_inbox(self, progress, batch_size): + def reindex_txn(conn): + txn = conn.cursor() + txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") + txn.close() + + yield self.runWithConnection(reindex_txn) + + yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) + + return 1 + + +class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): + DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" + + def __init__(self, db_conn, hs): + super(DeviceInboxStore, self).__init__(db_conn, hs) + # Map of (user_id, device_id) to the last stream_id that has been # deleted up to. This is so that we can no op deletions. self._last_device_delete_cache = ExpiringCache( @@ -358,15 +378,15 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): else: if not devices: continue - sql = ( - "SELECT device_id FROM devices" - " WHERE user_id = ? AND device_id IN (" - + ",".join("?" * len(devices)) - + ")" + + clause, args = make_in_list_sql_clause( + txn.database_engine, "device_id", devices ) + sql = "SELECT device_id FROM devices WHERE user_id = ? AND " + clause + # TODO: Maybe this needs to be done in batches if there are # too many local devices for a given user. - txn.execute(sql, [user_id] + devices) + txn.execute(sql, [user_id] + list(args)) for row in txn: # Only insert into the local inbox if the device exists on # this server @@ -435,16 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn ) - - @defer.inlineCallbacks - def _background_drop_index_device_inbox(self, progress, batch_size): - def reindex_txn(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") - txn.close() - - yield self.runWithConnection(reindex_txn) - - yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) - - return 1 |