summary refs log tree commit diff
path: root/synapse/replication/slave/storage/devices.py
diff options
context:
space:
mode:
authorBen Banfield-Zanin <benbz@matrix.org>2020-09-15 11:44:49 +0100
committerBen Banfield-Zanin <benbz@matrix.org>2020-09-15 11:44:49 +0100
commit1a7d96aa6ff81638f2ea696fdee2ec44e7bff75a (patch)
tree1839e80f89c53b34ff1b36974305c6cb0c94aab4 /synapse/replication/slave/storage/devices.py
parentFix group server for older synapse (diff)
parentClarify changelog. (diff)
downloadsynapse-1a7d96aa6ff81638f2ea696fdee2ec44e7bff75a.tar.xz
Merge remote-tracking branch 'origin/release-v1.20.0' into bbz/info-mainline-1.20.0 github/bbz/info-mainline-1.20.0 bbz/info-mainline-1.20.0
Diffstat (limited to 'synapse/replication/slave/storage/devices.py')
-rw-r--r--synapse/replication/slave/storage/devices.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py

index 9d8067342f..3b788c9625 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py
@@ -16,14 +16,14 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream -from synapse.storage.data_stores.main.devices import DeviceWorkerStore -from synapse.storage.data_stores.main.end_to_end_keys import EndToEndKeyWorkerStore -from synapse.storage.database import Database +from synapse.storage.database import DatabasePool +from synapse.storage.databases.main.devices import DeviceWorkerStore +from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore): - def __init__(self, database: Database, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs): super(SlavedDeviceStore, self).__init__(database, db_conn, hs) self.hs = hs @@ -48,12 +48,15 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto "DeviceListFederationStreamChangeCache", device_list_max ) + def get_device_stream_token(self) -> int: + return self._device_list_id_gen.get_current_token() + def process_replication_rows(self, stream_name, instance_name, token, rows): if stream_name == DeviceListsStream.NAME: - self._device_list_id_gen.advance(token) + self._device_list_id_gen.advance(instance_name, token) self._invalidate_caches_for_devices(token, rows) elif stream_name == UserSignatureStream.NAME: - self._device_list_id_gen.advance(token) + self._device_list_id_gen.advance(instance_name, token) for row in rows: self._user_signature_stream_cache.entity_has_changed(row.user_id, token) return super().process_replication_rows(stream_name, instance_name, token, rows)