diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 1b8479b0b4..deeaaec4e6 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -380,7 +380,7 @@ class FederationSenderHandler:
# changes.
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
for host in hosts:
- self.federation_sender.send_device_messages(host)
+ self.federation_sender.send_device_messages(host, immediate=False)
elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local
@@ -462,6 +462,8 @@ class FederationSenderHandler:
# We ACK this token over replication so that the master can drop
# its in memory queues
- self._hs.get_tcp_replication().send_federation_ack(current_position)
+ self._hs.get_replication_command_handler().send_federation_ack(
+ current_position
+ )
except Exception:
logger.exception("Error updating federation stream position")
diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index aaf91e5e02..bf7d017968 100644
--- a/synapse/replication/tcp/external_cache.py
+++ b/synapse/replication/tcp/external_cache.py
@@ -21,7 +21,7 @@ from synapse.logging.context import make_deferred_yieldable
from synapse.util import json_decoder, json_encoder
if TYPE_CHECKING:
- from txredisapi import RedisProtocol
+ from txredisapi import ConnectionHandler
from synapse.server import HomeServer
@@ -63,7 +63,7 @@ class ExternalCache:
def __init__(self, hs: "HomeServer"):
if hs.config.redis.redis_enabled:
self._redis_connection: Optional[
- "RedisProtocol"
+ "ConnectionHandler"
] = hs.get_outbound_redis_connection()
else:
self._redis_connection = None
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0d2013a3cf..b217c35f99 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -295,9 +295,7 @@ class ReplicationCommandHandler:
raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
def start_replication(self, hs: "HomeServer") -> None:
- """Helper method to start a replication connection to the remote server
- using TCP.
- """
+ """Helper method to start replication."""
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
@@ -711,7 +709,7 @@ class ReplicationCommandHandler:
self.send_command(RemoteServerUpCommand(server))
def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
- """Called when a new update is available to stream to clients.
+ """Called when a new update is available to stream to Redis subscribers.
We need to check if the client is interested in the stream or not
"""
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 3170f7c59b..989c5be032 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -93,7 +93,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
synapse_handler: "ReplicationCommandHandler"
synapse_stream_name: str
- synapse_outbound_redis_connection: txredisapi.RedisProtocol
+ synapse_outbound_redis_connection: txredisapi.ConnectionHandler
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
@@ -313,7 +313,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
protocol = RedisSubscriber
def __init__(
- self, hs: "HomeServer", outbound_redis_connection: txredisapi.RedisProtocol
+ self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler
):
super().__init__(
@@ -325,7 +325,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
password=hs.config.redis.redis_password,
)
- self.synapse_handler = hs.get_tcp_replication()
+ self.synapse_handler = hs.get_replication_command_handler()
self.synapse_stream_name = hs.hostname
self.synapse_outbound_redis_connection = outbound_redis_connection
@@ -353,7 +353,7 @@ def lazyConnection(
reconnect: bool = True,
password: Optional[str] = None,
replyTimeout: int = 30,
-) -> txredisapi.RedisProtocol:
+) -> txredisapi.ConnectionHandler:
"""Creates a connection to Redis that is lazily set up and reconnects if the
connections is lost.
"""
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 494e42a2be..c6870df8f9 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory):
"""Factory for new replication connections."""
def __init__(self, hs: "HomeServer"):
- self.command_handler = hs.get_tcp_replication()
+ self.command_handler = hs.get_replication_command_handler()
self.clock = hs.get_clock()
self.server_name = hs.config.server.server_name
@@ -67,8 +67,8 @@ class ReplicationStreamProtocolFactory(ServerFactory):
class ReplicationStreamer:
"""Handles replication connections.
- This needs to be poked when new replication data may be available. When new
- data is available it will propagate to all connected clients.
+ This needs to be poked when new replication data may be available.
+ When new data is available it will propagate to all Redis subscribers.
"""
def __init__(self, hs: "HomeServer"):
@@ -85,7 +85,7 @@ class ReplicationStreamer:
self.is_looping = False
self.pending_updates = False
- self.command_handler = hs.get_tcp_replication()
+ self.command_handler = hs.get_replication_command_handler()
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()
@@ -109,7 +109,7 @@ class ReplicationStreamer:
def on_notifier_poke(self) -> None:
"""Checks if there is actually any new data and sends it to the
- connections if there are.
+ Redis subscribers if there are.
This should get called each time new data is available, even if it
is currently being executed, so that nothing gets missed
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 23d631a769..495f2f0285 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -316,7 +316,19 @@ class PresenceFederationStream(Stream):
class TypingStream(Stream):
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TypingStreamRow:
+ """
+ An entry in the typing stream.
+ Describes all the users that are 'typing' right now in one room.
+
+ When a user stops typing, it will be streamed as a new update with that
+ user absent; you can think of the `user_ids` list as overwriting the
+ entire list that was there previously.
+ """
+
+ # The room that this update is for.
room_id: str
+
+ # All the users that are 'typing' right now in the specified room.
user_ids: List[str]
NAME = "typing"
|