From d7983b63a6746d92225295f1e9d521f847cf8ba7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 May 2020 13:51:08 +0100 Subject: Support any process writing to cache invalidation stream. (#7436) --- synapse/replication/tcp/handler.py | 42 +++++++------------------------------- 1 file changed, 7 insertions(+), 35 deletions(-) (limited to 'synapse/replication/tcp/handler.py') diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b14a3d9fca..7c5d6c76e7 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -15,18 +15,7 @@ # limitations under the License. import logging -from typing import ( - Any, - Callable, - Dict, - Iterable, - Iterator, - List, - Optional, - Set, - Tuple, - TypeVar, -) +from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar from prometheus_client import Counter @@ -38,7 +27,6 @@ from synapse.replication.tcp.commands import ( ClearUserSyncsCommand, Command, FederationAckCommand, - InvalidateCacheCommand, PositionCommand, RdataCommand, RemoteServerUpCommand, @@ -171,7 +159,7 @@ class ReplicationCommandHandler: return for stream_name, stream in self._streams.items(): - current_token = stream.current_token() + current_token = stream.current_token(self._instance_name) self.send_command( PositionCommand(stream_name, self._instance_name, current_token) ) @@ -210,18 +198,6 @@ class ReplicationCommandHandler: self._notifier.on_new_replication_data() - async def on_INVALIDATE_CACHE( - self, conn: AbstractConnection, cmd: InvalidateCacheCommand - ): - invalidate_cache_counter.inc() - - if self._is_master: - # We invalidate the cache locally, but then also stream that to other - # workers. - await self._store.invalidate_cache_and_stream( - cmd.cache_func, tuple(cmd.keys) - ) - async def on_USER_IP(self, conn: AbstractConnection, cmd: UserIpCommand): user_ip_cache_counter.inc() @@ -295,7 +271,7 @@ class ReplicationCommandHandler: rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ - logger.debug("Received rdata %s -> %s", stream_name, token) + logger.debug("Received rdata %s (%s) -> %s", stream_name, instance_name, token) await self._replication_data_handler.on_rdata( stream_name, instance_name, token, rows ) @@ -326,7 +302,7 @@ class ReplicationCommandHandler: self._pending_batches.pop(stream_name, []) # Find where we previously streamed up to. - current_token = stream.current_token() + current_token = stream.current_token(cmd.instance_name) # If the position token matches our current token then we're up to # date and there's nothing to do. Otherwise, fetch all updates @@ -363,7 +339,9 @@ class ReplicationCommandHandler: logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token) # We've now caught up to position sent to us, notify handler. - await self._replication_data_handler.on_position(stream_name, cmd.token) + await self._replication_data_handler.on_position( + cmd.stream_name, cmd.instance_name, cmd.token + ) self._streams_by_connection.setdefault(conn, set()).add(stream_name) @@ -491,12 +469,6 @@ class ReplicationCommandHandler: cmd = RemovePusherCommand(app_id, push_key, user_id) self.send_command(cmd) - def send_invalidate_cache(self, cache_func: Callable, keys: tuple): - """Poke the master to invalidate a cache. - """ - cmd = InvalidateCacheCommand(cache_func.__name__, keys) - self.send_command(cmd) - def send_user_ip( self, user_id: str, -- cgit 1.5.1 From 8ca79613e62201f0990a482130527fc88c4ffd40 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2020 09:57:15 +0100 Subject: Fix Redis reconnection logic (#7482) Proactively send out `POSITION` commands (as if we had just received a `REPLICATE`) when we connect to Redis. This is important as other instances won't notice we've connected to issue a `REPLICATE` command (unlike for direct TCP connections). This is only currently an issue if master process reconnects without restarting (if it restarts then it won't have written anything and so other instances probably won't have missed anything). --- changelog.d/7482.bugfix | 1 + synapse/replication/tcp/handler.py | 9 ++++++++- synapse/replication/tcp/redis.py | 7 ++++++- 3 files changed, 15 insertions(+), 2 deletions(-) create mode 100644 changelog.d/7482.bugfix (limited to 'synapse/replication/tcp/handler.py') diff --git a/changelog.d/7482.bugfix b/changelog.d/7482.bugfix new file mode 100644 index 0000000000..018bf5cc89 --- /dev/null +++ b/changelog.d/7482.bugfix @@ -0,0 +1 @@ +Fix Redis reconnection logic that can result in missed updates over replication if master reconnects to Redis without restarting. diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 1b05468483..e6a50aa74e 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -151,6 +151,13 @@ class ReplicationCommandHandler: hs.get_reactor().connectTCP(host, port, self._factory) async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand): + self.send_positions_to_connection(conn) + + def send_positions_to_connection(self, conn: AbstractConnection): + """Send current position of all streams this process is source of to + the connection. + """ + # We only want to announce positions by the writer of the streams. # Currently this is just the master process. if not self._is_master: @@ -158,7 +165,7 @@ class ReplicationCommandHandler: for stream_name, stream in self._streams.items(): current_token = stream.current_token(self._instance_name) - self.send_command( + conn.send_command( PositionCommand(stream_name, self._instance_name, current_token) ) diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 55bfa71dfd..e776b63183 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -70,7 +70,6 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): logger.info("Connected to redis") super().connectionMade() run_as_background_process("subscribe-replication", self._send_subscribe) - self.handler.new_connection(self) async def _send_subscribe(self): # it's important to make sure that we only send the REPLICATE command once we @@ -81,9 +80,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): logger.info( "Successfully subscribed to redis stream, sending REPLICATE command" ) + self.handler.new_connection(self) await self._async_send_command(ReplicateCommand()) logger.info("REPLICATE successfully sent") + # We send out our positions when there is a new connection in case the + # other side missed updates. We do this for Redis connections as the + # otherside won't know we've connected and so won't issue a REPLICATE. + self.handler.send_positions_to_connection(self) + def messageReceived(self, pattern: str, channel: str, message: str): """Received a message from redis. """ -- cgit 1.5.1 From 7ee24c5674a36dc9cd7163cfdd3e14b74570dc77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2020 10:27:02 +0100 Subject: Have all instances correctly respond to REPLICATE command. (#7475) Before all streams were only written to from master, so only master needed to respond to `REPLICATE` commands. Before all instances wrote to the cache invalidation stream, but didn't respond to `REPLICATE`. This was a bug, which could lead to missed rows from cache invalidation stream if an instance is restarted, however all the caches would be empty in that case so it wasn't a problem. --- changelog.d/7475.misc | 1 + synapse/replication/http/streams.py | 4 +-- synapse/replication/tcp/handler.py | 55 ++++++++++++++++++++++++++++++------- synapse/replication/tcp/resource.py | 39 ++------------------------ 4 files changed, 51 insertions(+), 48 deletions(-) create mode 100644 changelog.d/7475.misc (limited to 'synapse/replication/tcp/handler.py') diff --git a/changelog.d/7475.misc b/changelog.d/7475.misc new file mode 100644 index 0000000000..77759c3bd4 --- /dev/null +++ b/changelog.d/7475.misc @@ -0,0 +1 @@ +Have all instance correctly respond to REPLICATE command. diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index 0459f582bf..b705a8e16c 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -52,9 +52,9 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): self._instance_name = hs.get_instance_name() - # We pull the streams from the replication steamer (if we try and make + # We pull the streams from the replication handler (if we try and make # them ourselves we end up in an import loop). - self.streams = hs.get_replication_streamer().get_streams() + self.streams = hs.get_tcp_replication().get_streams() @staticmethod def _serialize_payload(stream_name, from_token, upto_token): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index e6a50aa74e..acfa66a7a8 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -36,7 +36,12 @@ from synapse.replication.tcp.commands import ( UserSyncCommand, ) from synapse.replication.tcp.protocol import AbstractConnection -from synapse.replication.tcp.streams import STREAMS_MAP, Stream +from synapse.replication.tcp.streams import ( + STREAMS_MAP, + CachesStream, + FederationStream, + Stream, +) from synapse.util.async_helpers import Linearizer logger = logging.getLogger(__name__) @@ -73,6 +78,26 @@ class ReplicationCommandHandler: stream.NAME: stream(hs) for stream in STREAMS_MAP.values() } # type: Dict[str, Stream] + # List of streams that this instance is the source of + self._streams_to_replicate = [] # type: List[Stream] + + for stream in self._streams.values(): + if stream.NAME == CachesStream.NAME: + # All workers can write to the cache invalidation stream. + self._streams_to_replicate.append(stream) + continue + + # Only add any other streams if we're on master. + if hs.config.worker_app is not None: + continue + + if stream.NAME == FederationStream.NAME and hs.config.send_federation: + # We only support federation stream if federation sending + # has been disabled on the master. + continue + + self._streams_to_replicate.append(stream) + self._position_linearizer = Linearizer( "replication_position", clock=self._clock ) @@ -150,6 +175,16 @@ class ReplicationCommandHandler: port = hs.config.worker_replication_port hs.get_reactor().connectTCP(host, port, self._factory) + def get_streams(self) -> Dict[str, Stream]: + """Get a map from stream name to all streams. + """ + return self._streams + + def get_streams_to_replicate(self) -> List[Stream]: + """Get a list of streams that this instances replicates. + """ + return self._streams_to_replicate + async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand): self.send_positions_to_connection(conn) @@ -158,15 +193,15 @@ class ReplicationCommandHandler: the connection. """ - # We only want to announce positions by the writer of the streams. - # Currently this is just the master process. - if not self._is_master: - return - - for stream_name, stream in self._streams.items(): - current_token = stream.current_token(self._instance_name) - conn.send_command( - PositionCommand(stream_name, self._instance_name, current_token) + # We respond with current position of all streams this instance + # replicates. + for stream in self.get_streams_to_replicate(): + self.send_command( + PositionCommand( + stream.NAME, + self._instance_name, + stream.current_token(self._instance_name), + ) ) async def on_USER_SYNC(self, conn: AbstractConnection, cmd: UserSyncCommand): diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 002171ce7c..41569305df 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,7 +17,6 @@ import logging import random -from typing import Dict, List from prometheus_client import Counter @@ -25,12 +24,6 @@ from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol -from synapse.replication.tcp.streams import ( - STREAMS_MAP, - CachesStream, - FederationStream, - Stream, -) from synapse.util.metrics import Measure stream_updates_counter = Counter( @@ -80,31 +73,7 @@ class ReplicationStreamer(object): self._replication_torture_level = hs.config.replication_torture_level - # Work out list of streams that this instance is the source of. - self.streams = [] # type: List[Stream] - - # All workers can write to the cache invalidation stream. - self.streams.append(CachesStream(hs)) - - if hs.config.worker_app is None: - for stream in STREAMS_MAP.values(): - if stream == FederationStream and hs.config.send_federation: - # We only support federation stream if federation sending - # has been disabled on the master. - continue - - if stream == CachesStream: - # We've already added it above. - continue - - self.streams.append(stream(hs)) - - self.streams_by_name = {stream.NAME: stream for stream in self.streams} - - # Only bother registering the notifier callback if we have streams to - # publish. - if self.streams: - self.notifier.add_replication_callback(self.on_notifier_poke) + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False @@ -112,10 +81,8 @@ class ReplicationStreamer(object): self.command_handler = hs.get_tcp_replication() - def get_streams(self) -> Dict[str, Stream]: - """Get a mapp from stream name to stream instance. - """ - return self.streams_by_name + # Set of streams to replicate. + self.streams = self.command_handler.get_streams_to_replicate() def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the -- cgit 1.5.1