summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py42
1 files changed, 7 insertions, 35 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b14a3d9fca..7c5d6c76e7 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -15,18 +15,7 @@
 # limitations under the License.
 
 import logging
-from typing import (
-    Any,
-    Callable,
-    Dict,
-    Iterable,
-    Iterator,
-    List,
-    Optional,
-    Set,
-    Tuple,
-    TypeVar,
-)
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar
 
 from prometheus_client import Counter
 
@@ -38,7 +27,6 @@ from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
     Command,
     FederationAckCommand,
-    InvalidateCacheCommand,
     PositionCommand,
     RdataCommand,
     RemoteServerUpCommand,
@@ -171,7 +159,7 @@ class ReplicationCommandHandler:
             return
 
         for stream_name, stream in self._streams.items():
-            current_token = stream.current_token()
+            current_token = stream.current_token(self._instance_name)
             self.send_command(
                 PositionCommand(stream_name, self._instance_name, current_token)
             )
@@ -210,18 +198,6 @@ class ReplicationCommandHandler:
 
             self._notifier.on_new_replication_data()
 
-    async def on_INVALIDATE_CACHE(
-        self, conn: AbstractConnection, cmd: InvalidateCacheCommand
-    ):
-        invalidate_cache_counter.inc()
-
-        if self._is_master:
-            # We invalidate the cache locally, but then also stream that to other
-            # workers.
-            await self._store.invalidate_cache_and_stream(
-                cmd.cache_func, tuple(cmd.keys)
-            )
-
     async def on_USER_IP(self, conn: AbstractConnection, cmd: UserIpCommand):
         user_ip_cache_counter.inc()
 
@@ -295,7 +271,7 @@ class ReplicationCommandHandler:
             rows: a list of Stream.ROW_TYPE objects as returned by
                 Stream.parse_row.
         """
-        logger.debug("Received rdata %s -> %s", stream_name, token)
+        logger.debug("Received rdata %s (%s) -> %s", stream_name, instance_name, token)
         await self._replication_data_handler.on_rdata(
             stream_name, instance_name, token, rows
         )
@@ -326,7 +302,7 @@ class ReplicationCommandHandler:
             self._pending_batches.pop(stream_name, [])
 
             # Find where we previously streamed up to.
-            current_token = stream.current_token()
+            current_token = stream.current_token(cmd.instance_name)
 
             # If the position token matches our current token then we're up to
             # date and there's nothing to do. Otherwise, fetch all updates
@@ -363,7 +339,9 @@ class ReplicationCommandHandler:
             logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token)
 
             # We've now caught up to position sent to us, notify handler.
-            await self._replication_data_handler.on_position(stream_name, cmd.token)
+            await self._replication_data_handler.on_position(
+                cmd.stream_name, cmd.instance_name, cmd.token
+            )
 
             self._streams_by_connection.setdefault(conn, set()).add(stream_name)
 
@@ -491,12 +469,6 @@ class ReplicationCommandHandler:
         cmd = RemovePusherCommand(app_id, push_key, user_id)
         self.send_command(cmd)
 
-    def send_invalidate_cache(self, cache_func: Callable, keys: tuple):
-        """Poke the master to invalidate a cache.
-        """
-        cmd = InvalidateCacheCommand(cache_func.__name__, keys)
-        self.send_command(cmd)
-
     def send_user_ip(
         self,
         user_id: str,