summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py4
-rw-r--r--synapse/replication/tcp/protocol.py2
-rw-r--r--synapse/replication/tcp/resource.py9
3 files changed, 11 insertions, 4 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index fc06a7b053..02ab5b66ea 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -31,6 +31,7 @@ from .commands import ( Command, FederationAckCommand, InvalidateCacheCommand, + RemoteServerUpCommand, RemovePusherCommand, UserIpCommand, UserSyncCommand, @@ -210,6 +211,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler): cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) self.send_command(cmd) + def send_remote_server_up(self, server: str): + self.send_command(RemoteServerUpCommand(server)) + def await_sync(self, data): """Returns a deferred that is resolved when we receive a SYNC command with given data. diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 131e5acb09..bc1482a9bb 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py
@@ -459,7 +459,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) async def on_INVALIDATE_CACHE(self, cmd): - self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): self.streamer.on_remote_server_up(cmd.data) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 6ebf944f66..ce60ae2e07 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -17,7 +17,7 @@ import logging import random -from typing import List +from typing import Any, List from six import itervalues @@ -271,11 +271,14 @@ class ReplicationStreamer(object): self.notifier.on_new_replication_data() @measure_func("repl.on_invalidate_cache") - def on_invalidate_cache(self, cache_func, keys): + async def on_invalidate_cache(self, cache_func: str, keys: List[Any]): """The client has asked us to invalidate a cache """ invalidate_cache_counter.inc() - getattr(self.store, cache_func).invalidate(tuple(keys)) + + # We invalidate the cache locally, but then also stream that to other + # workers. + await self.store.invalidate_cache_and_stream(cache_func, tuple(keys)) @measure_func("repl.on_user_ip") async def on_user_ip(