From 423cca9efe06d78aaca5f62fb74ee7e5bceebe49 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 4 Mar 2022 11:48:15 +0000 Subject: Spread out sending device lists to remote hosts (#12132) --- synapse/replication/tcp/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 1b8479b0b4..b8fc1d4db9 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -380,7 +380,7 @@ class FederationSenderHandler: # changes. hosts = {row.entity for row in rows if not row.entity.startswith("@")} for host in hosts: - self.federation_sender.send_device_messages(host) + self.federation_sender.send_device_messages(host, immediate=False) elif stream_name == ToDeviceStream.NAME: # The to_device stream includes stuff to be pushed to both local -- cgit 1.5.1 From d8bab6793c75774db4bde8aeec6897b607e08799 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 8 Mar 2022 07:26:05 -0500 Subject: Fix incorrect type hints for txredis. (#12042) Some properties were marked as RedisProtocol instead of ConnectionHandler, which wraps RedisProtocol instance(s). --- changelog.d/12042.misc | 1 + stubs/txredisapi.pyi | 9 ++++++--- synapse/replication/tcp/external_cache.py | 4 ++-- synapse/replication/tcp/redis.py | 6 +++--- synapse/server.py | 4 ++-- 5 files changed, 14 insertions(+), 10 deletions(-) create mode 100644 changelog.d/12042.misc (limited to 'synapse/replication') diff --git a/changelog.d/12042.misc b/changelog.d/12042.misc new file mode 100644 index 0000000000..6ecdc96021 --- /dev/null +++ b/changelog.d/12042.misc @@ -0,0 +1 @@ +Correct type hints for txredis. diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index 429234d7ae..2d8ca018fb 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -20,7 +20,7 @@ from twisted.internet import protocol from twisted.internet.defer import Deferred class RedisProtocol(protocol.Protocol): - def publish(self, channel: str, message: bytes): ... + def publish(self, channel: str, message: bytes) -> "Deferred[None]": ... def ping(self) -> "Deferred[None]": ... def set( self, @@ -52,11 +52,14 @@ def lazyConnection( convertNumbers: bool = ..., ) -> RedisProtocol: ... -class ConnectionHandler: ... +# ConnectionHandler doesn't actually inherit from RedisProtocol, but it proxies +# most methods to it via ConnectionHandler.__getattr__. +class ConnectionHandler(RedisProtocol): + def disconnect(self) -> "Deferred[None]": ... class RedisFactory(protocol.ReconnectingClientFactory): continueTrying: bool - handler: RedisProtocol + handler: ConnectionHandler pool: List[RedisProtocol] replyTimeout: Optional[int] def __init__( diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py index aaf91e5e02..bf7d017968 100644 --- a/synapse/replication/tcp/external_cache.py +++ b/synapse/replication/tcp/external_cache.py @@ -21,7 +21,7 @@ from synapse.logging.context import make_deferred_yieldable from synapse.util import json_decoder, json_encoder if TYPE_CHECKING: - from txredisapi import RedisProtocol + from txredisapi import ConnectionHandler from synapse.server import HomeServer @@ -63,7 +63,7 @@ class ExternalCache: def __init__(self, hs: "HomeServer"): if hs.config.redis.redis_enabled: self._redis_connection: Optional[ - "RedisProtocol" + "ConnectionHandler" ] = hs.get_outbound_redis_connection() else: self._redis_connection = None diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 3170f7c59b..b84e572da1 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -93,7 +93,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): synapse_handler: "ReplicationCommandHandler" synapse_stream_name: str - synapse_outbound_redis_connection: txredisapi.RedisProtocol + synapse_outbound_redis_connection: txredisapi.ConnectionHandler def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) @@ -313,7 +313,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): protocol = RedisSubscriber def __init__( - self, hs: "HomeServer", outbound_redis_connection: txredisapi.RedisProtocol + self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler ): super().__init__( @@ -353,7 +353,7 @@ def lazyConnection( reconnect: bool = True, password: Optional[str] = None, replyTimeout: int = 30, -) -> txredisapi.RedisProtocol: +) -> txredisapi.ConnectionHandler: """Creates a connection to Redis that is lazily set up and reconnects if the connections is lost. """ diff --git a/synapse/server.py b/synapse/server.py index b5e2a319bc..46a64418ea 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -145,7 +145,7 @@ from synapse.util.stringutils import random_string logger = logging.getLogger(__name__) if TYPE_CHECKING: - from txredisapi import RedisProtocol + from txredisapi import ConnectionHandler from synapse.handlers.oidc import OidcHandler from synapse.handlers.saml import SamlHandler @@ -807,7 +807,7 @@ class HomeServer(metaclass=abc.ABCMeta): return AccountHandler(self) @cache_in_self - def get_outbound_redis_connection(self) -> "RedisProtocol": + def get_outbound_redis_connection(self) -> "ConnectionHandler": """ The Redis connection used for replication. -- cgit 1.5.1 From 180d8ff0d4d706344fa984abbd9ed6fa02ca13dc Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 9 Mar 2022 14:53:28 +0000 Subject: Retry some http replication failures (#12182) This allows for the target process to be down for around a minute which provides time for restarts during synapse upgrades/config updates. Closes: #12178 Signed off by Nick Mills-Barrett nick@beeper.com --- changelog.d/12182.misc | 1 + synapse/replication/http/_base.py | 47 ++++++++++++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 11 deletions(-) create mode 100644 changelog.d/12182.misc (limited to 'synapse/replication') diff --git a/changelog.d/12182.misc b/changelog.d/12182.misc new file mode 100644 index 0000000000..7e9ad2c752 --- /dev/null +++ b/changelog.d/12182.misc @@ -0,0 +1 @@ +Retry HTTP replication failures, this should prevent 502's when restarting stateful workers (main, event persisters, stream writers). Contributed by Nick @ Beeper. diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 2e697c74a6..f1abb98653 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple from prometheus_client import Counter, Gauge +from twisted.internet.error import ConnectError, DNSLookupError from twisted.web.server import Request from synapse.api.errors import HttpResponseException, SynapseError @@ -87,6 +88,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): `_handle_request` must return a Deferred. RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504 is received. + RETRY_ON_CONNECT_ERROR (bool): Whether or not to retry the request when + a connection error is received. + RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when + receiving connection errors, each will backoff exponentially longer. """ NAME: str = abc.abstractproperty() # type: ignore @@ -94,6 +99,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): METHOD = "POST" CACHE = True RETRY_ON_TIMEOUT = True + RETRY_ON_CONNECT_ERROR = True + RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5 # =63s (2^6-1) def __init__(self, hs: "HomeServer"): if self.CACHE: @@ -236,18 +243,20 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): "/".join(url_args), ) + headers: Dict[bytes, List[bytes]] = {} + # Add an authorization header, if configured. + if replication_secret: + headers[b"Authorization"] = [b"Bearer " + replication_secret] + opentracing.inject_header_dict(headers, check_destination=False) + try: + # Keep track of attempts made so we can bail if we don't manage to + # connect to the target after N tries. + attempts = 0 # We keep retrying the same request for timeouts. This is so that we # have a good idea that the request has either succeeded or failed # on the master, and so whether we should clean up or not. while True: - headers: Dict[bytes, List[bytes]] = {} - # Add an authorization header, if configured. - if replication_secret: - headers[b"Authorization"] = [ - b"Bearer " + replication_secret - ] - opentracing.inject_header_dict(headers, check_destination=False) try: result = await request_func(uri, data, headers=headers) break @@ -255,11 +264,27 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): if not cls.RETRY_ON_TIMEOUT: raise - logger.warning("%s request timed out; retrying", cls.NAME) + logger.warning("%s request timed out; retrying", cls.NAME) + + # If we timed out we probably don't need to worry about backing + # off too much, but lets just wait a little anyway. + await clock.sleep(1) + except (ConnectError, DNSLookupError) as e: + if not cls.RETRY_ON_CONNECT_ERROR: + raise + if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS: + raise + + delay = 2 ** attempts + logger.warning( + "%s request connection failed; retrying in %ds: %r", + cls.NAME, + delay, + e, + ) - # If we timed out we probably don't need to worry about backing - # off too much, but lets just wait a little anyway. - await clock.sleep(1) + await clock.sleep(delay) + attempts += 1 except HttpResponseException as e: # We convert to SynapseError as we know that it was a SynapseError # on the main process that we should send to the client. (And -- cgit 1.5.1 From 3e4af36bc8515504721b3c1b1d64d4f45359bf88 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 10 Mar 2022 08:01:56 -0500 Subject: Rename get_tcp_replication to get_replication_command_handler. (#12192) Since the object it returns is a ReplicationCommandHandler. This is clean-up from adding support to Redis where the command handler was added as an additional layer of abstraction from the TCP protocol. --- changelog.d/12192.misc | 1 + synapse/app/generic_worker.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/federation/transport/server/_base.py | 2 +- synapse/handlers/presence.py | 4 ++-- synapse/replication/slave/storage/client_ips.py | 2 +- synapse/replication/tcp/client.py | 4 +++- synapse/replication/tcp/handler.py | 4 +--- synapse/replication/tcp/redis.py | 2 +- synapse/replication/tcp/resource.py | 4 ++-- synapse/server.py | 2 +- tests/replication/_base.py | 4 ++-- tests/replication/tcp/streams/test_events.py | 2 +- tests/replication/tcp/streams/test_typing.py | 2 +- tests/replication/test_federation_ack.py | 2 +- 15 files changed, 20 insertions(+), 19 deletions(-) create mode 100644 changelog.d/12192.misc (limited to 'synapse/replication') diff --git a/changelog.d/12192.misc b/changelog.d/12192.misc new file mode 100644 index 0000000000..bdfe8dad98 --- /dev/null +++ b/changelog.d/12192.misc @@ -0,0 +1 @@ +Rename `HomeServer.get_tcp_replication` to `get_replication_command_handler`. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 1536a42723..a10a63b06c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -417,7 +417,7 @@ class GenericWorkerServer(HomeServer): else: logger.warning("Unsupported listener type: %s", listener.type) - self.get_tcp_replication().start_replication(self) + self.get_replication_command_handler().start_replication(self) def start(config_options: List[str]) -> None: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a6789a840e..e4dc04c0b4 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -273,7 +273,7 @@ class SynapseHomeServer(HomeServer): # If redis is enabled we connect via the replication command handler # in the same way as the workers (since we're effectively a client # rather than a server). - self.get_tcp_replication().start_replication(self) + self.get_replication_command_handler().start_replication(self) for listener in self.config.server.listeners: if listener.type == "http": diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index 87e99c7ddf..2529dee613 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -63,7 +63,7 @@ class Authenticator: self.replication_client = None if hs.config.worker.worker_app: - self.replication_client = hs.get_tcp_replication() + self.replication_client = hs.get_replication_command_handler() # A method just so we can pass 'self' as the authenticator to the Servlets async def authenticate_request( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c155098bee..9927a30e6e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -424,13 +424,13 @@ class WorkerPresenceHandler(BasePresenceHandler): async def _on_shutdown(self) -> None: if self._presence_enabled: - self.hs.get_tcp_replication().send_command( + self.hs.get_replication_command_handler().send_command( ClearUserSyncsCommand(self.instance_id) ) def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None: if self._presence_enabled: - self.hs.get_tcp_replication().send_user_sync( + self.hs.get_replication_command_handler().send_user_sync( self.instance_id, user_id, is_syncing, last_sync_ms ) diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index b5b84c09ae..14706a0817 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -54,6 +54,6 @@ class SlavedClientIpStore(BaseSlavedStore): self.client_ip_last_seen.set(key, now) - self.hs.get_tcp_replication().send_user_ip( + self.hs.get_replication_command_handler().send_user_ip( user_id, access_token, ip, user_agent, device_id, now ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index b8fc1d4db9..deeaaec4e6 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -462,6 +462,8 @@ class FederationSenderHandler: # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack(current_position) + self._hs.get_replication_command_handler().send_federation_ack( + current_position + ) except Exception: logger.exception("Error updating federation stream position") diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 0d2013a3cf..d51f045f22 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -295,9 +295,7 @@ class ReplicationCommandHandler: raise Exception("Unrecognised command %s in stream queue", cmd.NAME) def start_replication(self, hs: "HomeServer") -> None: - """Helper method to start a replication connection to the remote server - using TCP. - """ + """Helper method to start replication.""" if hs.config.redis.redis_enabled: from synapse.replication.tcp.redis import ( RedisDirectTcpReplicationClientFactory, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index b84e572da1..989c5be032 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -325,7 +325,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): password=hs.config.redis.redis_password, ) - self.synapse_handler = hs.get_tcp_replication() + self.synapse_handler = hs.get_replication_command_handler() self.synapse_stream_name = hs.hostname self.synapse_outbound_redis_connection = outbound_redis_connection diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 494e42a2be..ab829040cd 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory): """Factory for new replication connections.""" def __init__(self, hs: "HomeServer"): - self.command_handler = hs.get_tcp_replication() + self.command_handler = hs.get_replication_command_handler() self.clock = hs.get_clock() self.server_name = hs.config.server.server_name @@ -85,7 +85,7 @@ class ReplicationStreamer: self.is_looping = False self.pending_updates = False - self.command_handler = hs.get_tcp_replication() + self.command_handler = hs.get_replication_command_handler() # Set of streams to replicate. self.streams = self.command_handler.get_streams_to_replicate() diff --git a/synapse/server.py b/synapse/server.py index 46a64418ea..1270abb5a3 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -639,7 +639,7 @@ class HomeServer(metaclass=abc.ABCMeta): return ReadMarkerHandler(self) @cache_in_self - def get_tcp_replication(self) -> ReplicationCommandHandler: + def get_replication_command_handler(self) -> ReplicationCommandHandler: return ReplicationCommandHandler(self) @cache_in_self diff --git a/tests/replication/_base.py b/tests/replication/_base.py index a7a05a564f..9c5df266bd 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -251,7 +251,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): self.connect_any_redis_attempts, ) - self.hs.get_tcp_replication().start_replication(self.hs) + self.hs.get_replication_command_handler().start_replication(self.hs) # When we see a connection attempt to the master replication listener we # automatically set up the connection. This is so that tests don't @@ -375,7 +375,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): ) if worker_hs.config.redis.redis_enabled: - worker_hs.get_tcp_replication().start_replication(worker_hs) + worker_hs.get_replication_command_handler().start_replication(worker_hs) return worker_hs diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py index f9d5da723c..641a94133b 100644 --- a/tests/replication/tcp/streams/test_events.py +++ b/tests/replication/tcp/streams/test_events.py @@ -420,7 +420,7 @@ class EventsStreamTestCase(BaseStreamTestCase): # Manually send an old RDATA command, which should get dropped. This # re-uses the row from above, but with an earlier stream token. - self.hs.get_tcp_replication().send_command( + self.hs.get_replication_command_handler().send_command( RdataCommand("events", "master", 1, row) ) diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py index 3ff5afc6e5..9a229dd23f 100644 --- a/tests/replication/tcp/streams/test_typing.py +++ b/tests/replication/tcp/streams/test_typing.py @@ -118,7 +118,7 @@ class TypingStreamTestCase(BaseStreamTestCase): # Reset the typing handler self.hs.get_replication_streams()["typing"].last_token = 0 - self.hs.get_tcp_replication()._streams["typing"].last_token = 0 + self.hs.get_replication_command_handler()._streams["typing"].last_token = 0 typing._latest_room_serial = 0 typing._typing_stream_change_cache = StreamChangeCache( "TypingStreamChangeCache", typing._latest_room_serial diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index 1b6a4bf4b0..26b8bd512a 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -48,7 +48,7 @@ class FederationAckTestCase(HomeserverTestCase): transport, rather than assuming that the implementation has a ReplicationCommandHandler. """ - rch = self.hs.get_tcp_replication() + rch = self.hs.get_replication_command_handler() # wire up the ReplicationCommandHandler to a mock connection, which needs # to implement IReplicationConnection. (Note that Mock doesn't understand -- cgit 1.5.1 From 4a53f357379c2dc407617a3d39e6da4790dec9aa Mon Sep 17 00:00:00 2001 From: reivilibre Date: Fri, 11 Mar 2022 14:00:15 +0000 Subject: Improve code documentation for the typing stream over replication. (#12211) --- changelog.d/12211.misc | 1 + synapse/handlers/typing.py | 5 +++-- synapse/replication/tcp/handler.py | 2 +- synapse/replication/tcp/resource.py | 6 +++--- synapse/replication/tcp/streams/_base.py | 12 ++++++++++++ 5 files changed, 20 insertions(+), 6 deletions(-) create mode 100644 changelog.d/12211.misc (limited to 'synapse/replication') diff --git a/changelog.d/12211.misc b/changelog.d/12211.misc new file mode 100644 index 0000000000..d11634a1ee --- /dev/null +++ b/changelog.d/12211.misc @@ -0,0 +1 @@ +Improve code documentation for the typing stream over replication. \ No newline at end of file diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b89126528..6854428b7c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -160,8 +160,9 @@ class FollowerTypingHandler: """Should be called whenever we receive updates for typing stream.""" if self._latest_room_serial > token: - # The master has gone backwards. To prevent inconsistent data, just - # clear everything. + # The typing worker has gone backwards (e.g. it may have restarted). + # To prevent inconsistent data, just clear everything. + logger.info("Typing handler stream went backwards; resetting") self._reset() # Set the latest serial token to whatever the server gave us. diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index d51f045f22..b217c35f99 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -709,7 +709,7 @@ class ReplicationCommandHandler: self.send_command(RemoteServerUpCommand(server)) def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None: - """Called when a new update is available to stream to clients. + """Called when a new update is available to stream to Redis subscribers. We need to check if the client is interested in the stream or not """ diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index ab829040cd..c6870df8f9 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -67,8 +67,8 @@ class ReplicationStreamProtocolFactory(ServerFactory): class ReplicationStreamer: """Handles replication connections. - This needs to be poked when new replication data may be available. When new - data is available it will propagate to all connected clients. + This needs to be poked when new replication data may be available. + When new data is available it will propagate to all Redis subscribers. """ def __init__(self, hs: "HomeServer"): @@ -109,7 +109,7 @@ class ReplicationStreamer: def on_notifier_poke(self) -> None: """Checks if there is actually any new data and sends it to the - connections if there are. + Redis subscribers if there are. This should get called each time new data is available, even if it is currently being executed, so that nothing gets missed diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 23d631a769..495f2f0285 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -316,7 +316,19 @@ class PresenceFederationStream(Stream): class TypingStream(Stream): @attr.s(slots=True, frozen=True, auto_attribs=True) class TypingStreamRow: + """ + An entry in the typing stream. + Describes all the users that are 'typing' right now in one room. + + When a user stops typing, it will be streamed as a new update with that + user absent; you can think of the `user_ids` list as overwriting the + entire list that was there previously. + """ + + # The room that this update is for. room_id: str + + # All the users that are 'typing' right now in the specified room. user_ids: List[str] NAME = "typing" -- cgit 1.5.1