summary refs log tree commit diff
path: root/synapse/storage/deviceinbox.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-02-26 12:29:45 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2020-02-26 12:29:45 +0000
commit8ea11f49ea39e9ee083563b23fc4b684bd331212 (patch)
tree76ec510b839c7a925188855afcc0c9a372b3eae6 /synapse/storage/deviceinbox.py
parentadd M_TOO_LARGE error code for uploading a too large file (#6151) (diff)
parentMerge pull request #6178 from matrix-org/babolivier/factor_out_bg_updates (diff)
downloadsynapse-8ea11f49ea39e9ee083563b23fc4b684bd331212.tar.xz
Merge pull request #6178 from matrix-org/babolivier/factor_out_bg_updates
Diffstat (limited to 'synapse/storage/deviceinbox.py')
-rw-r--r--synapse/storage/deviceinbox.py37
1 files changed, 22 insertions, 15 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py

index 6b7458304e..70bc2bb2cc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py
@@ -208,11 +208,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) -class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): +class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" def __init__(self, db_conn, hs): - super(DeviceInboxStore, self).__init__(db_conn, hs) + super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_index_update( "device_inbox_stream_index", @@ -225,6 +225,26 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) + @defer.inlineCallbacks + def _background_drop_index_device_inbox(self, progress, batch_size): + def reindex_txn(conn): + txn = conn.cursor() + txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") + txn.close() + + yield self.runWithConnection(reindex_txn) + + yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) + + return 1 + + +class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): + DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" + + def __init__(self, db_conn, hs): + super(DeviceInboxStore, self).__init__(db_conn, hs) + # Map of (user_id, device_id) to the last stream_id that has been # deleted up to. This is so that we can no op deletions. self._last_device_delete_cache = ExpiringCache( @@ -435,16 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn ) - - @defer.inlineCallbacks - def _background_drop_index_device_inbox(self, progress, batch_size): - def reindex_txn(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") - txn.close() - - yield self.runWithConnection(reindex_txn) - - yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) - - return 1