diff options
author | Erik Johnston <erik@matrix.org> | 2021-01-07 20:19:26 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-07 20:19:26 +0000 |
commit | b530eaa262b9c8af378f976e5d2628e8c02b10d8 (patch) | |
tree | 0ef6713632c385060feb27328bcbc790be84dd18 /synapse/replication | |
parent | Support routing edu's to multiple instances (#9042) (diff) | |
download | synapse-b530eaa262b9c8af378f976e5d2628e8c02b10d8.tar.xz |
Allow running sendToDevice on workers (#9044)
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/slave/storage/deviceinbox.py | 32 | ||||
-rw-r--r-- | synapse/replication/tcp/handler.py | 9 |
2 files changed, 10 insertions, 31 deletions
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 62b68dd6e9..1260f6d141 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -14,38 +14,8 @@ # limitations under the License. from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker -from synapse.replication.tcp.streams import ToDeviceStream -from synapse.storage.database import DatabasePool from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore -from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - self._device_inbox_id_gen = SlavedIdTracker( - db_conn, "device_inbox", "stream_id" - ) - self._device_inbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", - self._device_inbox_id_gen.get_current_token(), - ) - self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceFederationOutboxStreamChangeCache", - self._device_inbox_id_gen.get_current_token(), - ) - - def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == ToDeviceStream.NAME: - self._device_inbox_id_gen.advance(instance_name, token) - for row in rows: - if row.entity.startswith("@"): - self._device_inbox_stream_cache.entity_has_changed( - row.entity, token - ) - else: - self._device_federation_outbox_stream_cache.entity_has_changed( - row.entity, token - ) - return super().process_replication_rows(stream_name, instance_name, token, rows) + pass diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 95e5502bf2..1f89249475 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -56,6 +56,7 @@ from synapse.replication.tcp.streams import ( EventsStream, FederationStream, Stream, + ToDeviceStream, TypingStream, ) @@ -115,6 +116,14 @@ class ReplicationCommandHandler: continue + if isinstance(stream, ToDeviceStream): + # Only add ToDeviceStream as a source on instances in charge of + # sending to device messages. + if hs.get_instance_name() in hs.config.worker.writers.to_device: + self._streams_to_replicate.append(stream) + + continue + if isinstance(stream, TypingStream): # Only add TypingStream as a source on the instance in charge of # typing. |