summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2022-03-28 13:54:02 +0100
committerBrendan Abolivier <babolivier@matrix.org>2022-03-28 13:54:02 +0100
commit25507bffc67c40e83cbcd4a79fdfee3667855a7c (patch)
tree5620b2a06a5a9894ac875ddcf3b232db45cae48d /synapse/replication/tcp
parentMerge branch 'develop' of github.com:matrix-org/synapse into babolivier/sign_... (diff)
parentAdd restrictions by default to open registration in Synapse (#12091) (diff)
downloadsynapse-github/babolivier/sign_json_module.tar.xz
Merge branch 'develop' into babolivier/sign_json_module github/babolivier/sign_json_module babolivier/sign_json_module
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py6
-rw-r--r--synapse/replication/tcp/external_cache.py4
-rw-r--r--synapse/replication/tcp/handler.py6
-rw-r--r--synapse/replication/tcp/redis.py8
-rw-r--r--synapse/replication/tcp/resource.py10
-rw-r--r--synapse/replication/tcp/streams/_base.py12
6 files changed, 29 insertions, 17 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index 1b8479b0b4..deeaaec4e6 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -380,7 +380,7 @@ class FederationSenderHandler: # changes. hosts = {row.entity for row in rows if not row.entity.startswith("@")} for host in hosts: - self.federation_sender.send_device_messages(host) + self.federation_sender.send_device_messages(host, immediate=False) elif stream_name == ToDeviceStream.NAME: # The to_device stream includes stuff to be pushed to both local @@ -462,6 +462,8 @@ class FederationSenderHandler: # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack(current_position) + self._hs.get_replication_command_handler().send_federation_ack( + current_position + ) except Exception: logger.exception("Error updating federation stream position") diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index aaf91e5e02..bf7d017968 100644 --- a/synapse/replication/tcp/external_cache.py +++ b/synapse/replication/tcp/external_cache.py
@@ -21,7 +21,7 @@ from synapse.logging.context import make_deferred_yieldable from synapse.util import json_decoder, json_encoder if TYPE_CHECKING: - from txredisapi import RedisProtocol + from txredisapi import ConnectionHandler from synapse.server import HomeServer @@ -63,7 +63,7 @@ class ExternalCache: def __init__(self, hs: "HomeServer"): if hs.config.redis.redis_enabled: self._redis_connection: Optional[ - "RedisProtocol" + "ConnectionHandler" ] = hs.get_outbound_redis_connection() else: self._redis_connection = None diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0d2013a3cf..b217c35f99 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -295,9 +295,7 @@ class ReplicationCommandHandler: raise Exception("Unrecognised command %s in stream queue", cmd.NAME) def start_replication(self, hs: "HomeServer") -> None: - """Helper method to start a replication connection to the remote server - using TCP. - """ + """Helper method to start replication.""" if hs.config.redis.redis_enabled: from synapse.replication.tcp.redis import ( RedisDirectTcpReplicationClientFactory, @@ -711,7 +709,7 @@ class ReplicationCommandHandler: self.send_command(RemoteServerUpCommand(server)) def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None: - """Called when a new update is available to stream to clients. + """Called when a new update is available to stream to Redis subscribers. We need to check if the client is interested in the stream or not """ diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 3170f7c59b..989c5be032 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py
@@ -93,7 +93,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): synapse_handler: "ReplicationCommandHandler" synapse_stream_name: str - synapse_outbound_redis_connection: txredisapi.RedisProtocol + synapse_outbound_redis_connection: txredisapi.ConnectionHandler def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) @@ -313,7 +313,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): protocol = RedisSubscriber def __init__( - self, hs: "HomeServer", outbound_redis_connection: txredisapi.RedisProtocol + self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler ): super().__init__( @@ -325,7 +325,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): password=hs.config.redis.redis_password, ) - self.synapse_handler = hs.get_tcp_replication() + self.synapse_handler = hs.get_replication_command_handler() self.synapse_stream_name = hs.hostname self.synapse_outbound_redis_connection = outbound_redis_connection @@ -353,7 +353,7 @@ def lazyConnection( reconnect: bool = True, password: Optional[str] = None, replyTimeout: int = 30, -) -> txredisapi.RedisProtocol: +) -> txredisapi.ConnectionHandler: """Creates a connection to Redis that is lazily set up and reconnects if the connections is lost. """ diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 494e42a2be..c6870df8f9 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory): """Factory for new replication connections.""" def __init__(self, hs: "HomeServer"): - self.command_handler = hs.get_tcp_replication() + self.command_handler = hs.get_replication_command_handler() self.clock = hs.get_clock() self.server_name = hs.config.server.server_name @@ -67,8 +67,8 @@ class ReplicationStreamProtocolFactory(ServerFactory): class ReplicationStreamer: """Handles replication connections. - This needs to be poked when new replication data may be available. When new - data is available it will propagate to all connected clients. + This needs to be poked when new replication data may be available. + When new data is available it will propagate to all Redis subscribers. """ def __init__(self, hs: "HomeServer"): @@ -85,7 +85,7 @@ class ReplicationStreamer: self.is_looping = False self.pending_updates = False - self.command_handler = hs.get_tcp_replication() + self.command_handler = hs.get_replication_command_handler() # Set of streams to replicate. self.streams = self.command_handler.get_streams_to_replicate() @@ -109,7 +109,7 @@ class ReplicationStreamer: def on_notifier_poke(self) -> None: """Checks if there is actually any new data and sends it to the - connections if there are. + Redis subscribers if there are. This should get called each time new data is available, even if it is currently being executed, so that nothing gets missed diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 23d631a769..495f2f0285 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -316,7 +316,19 @@ class PresenceFederationStream(Stream): class TypingStream(Stream): @attr.s(slots=True, frozen=True, auto_attribs=True) class TypingStreamRow: + """ + An entry in the typing stream. + Describes all the users that are 'typing' right now in one room. + + When a user stops typing, it will be streamed as a new update with that + user absent; you can think of the `user_ids` list as overwriting the + entire list that was there previously. + """ + + # The room that this update is for. room_id: str + + # All the users that are 'typing' right now in the specified room. user_ids: List[str] NAME = "typing"