From 62b1ce85398f52e7d6137e77083294d0c90af459 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Sun, 5 Jul 2020 16:32:02 +0100 Subject: isort 5 compatibility (#7786) The CI appears to use the latest version of isort, which is a problem when isort gets a major version bump. Rather than try to pin the version, I've done the necessary to make isort5 happy with synapse. --- synapse/replication/tcp/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/replication/tcp/client.py') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index df29732f51..4985e40b1f 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -33,8 +33,8 @@ from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure if TYPE_CHECKING: - from synapse.server import HomeServer from synapse.replication.tcp.handler import ReplicationCommandHandler + from synapse.server import HomeServer logger = logging.getLogger(__name__) -- cgit 1.5.1 From 84d099ae1192af0f38d26f9a32e38bd4c0ad304e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jul 2020 14:10:53 +0100 Subject: Fix typing replication not being handled on master (#7959) Handling of incoming typing stream updates from replication was not hooked up on master, effecting set ups where typing was handled on a different worker. This is really only a problem if the master process is also handling sync requests, which is unlikely for those that are at the stage of moving typing off. The other observable effect is that if a worker restarts or a replication connect drops then the typing worker will issue a `POSITION typing`, triggering master process to try and stream *all* typing updates from position 0. Fixes #7907 --- changelog.d/7959.bugfix | 1 + synapse/app/generic_worker.py | 7 ------- synapse/replication/tcp/client.py | 8 ++++++++ synapse/server.pyi | 3 +++ 4 files changed, 12 insertions(+), 7 deletions(-) create mode 100644 changelog.d/7959.bugfix (limited to 'synapse/replication/tcp/client.py') diff --git a/changelog.d/7959.bugfix b/changelog.d/7959.bugfix new file mode 100644 index 0000000000..1982049a52 --- /dev/null +++ b/changelog.d/7959.bugfix @@ -0,0 +1 @@ +Add experimental support for moving typing off master. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index c1b76d827b..ec0dbddb8c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -87,7 +87,6 @@ from synapse.replication.tcp.streams import ( ReceiptsStream, TagAccountDataStream, ToDeviceStream, - TypingStream, ) from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client.v1 import events @@ -644,7 +643,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): super(GenericWorkerReplicationHandler, self).__init__(hs) self.store = hs.get_datastore() - self.typing_handler = hs.get_typing_handler() self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence self.notifier = hs.get_notifier() @@ -681,11 +679,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): await self.pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} ) - elif stream_name == TypingStream.NAME: - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] - ) elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 4985e40b1f..fcf8ebf1e7 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -24,6 +24,7 @@ from twisted.internet.protocol import ReconnectingClientFactory from synapse.api.constants import EventTypes from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol +from synapse.replication.tcp.streams import TypingStream from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, @@ -104,6 +105,7 @@ class ReplicationDataHandler: self._clock = hs.get_clock() self._streams = hs.get_replication_streams() self._instance_name = hs.get_instance_name() + self._typing_handler = hs.get_typing_handler() # Map from stream to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. @@ -127,6 +129,12 @@ class ReplicationDataHandler: """ self.store.process_replication_rows(stream_name, instance_name, token, rows) + if stream_name == TypingStream.NAME: + self._typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows] + ) + if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. diff --git a/synapse/server.pyi b/synapse/server.pyi index 90a673778f..1aba408c21 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -31,6 +31,7 @@ import synapse.server_notices.server_notices_sender import synapse.state import synapse.storage from synapse.events.builder import EventBuilderFactory +from synapse.handlers.typing import FollowerTypingHandler from synapse.replication.tcp.streams import Stream class HomeServer(object): @@ -150,3 +151,5 @@ class HomeServer(object): pass def should_send_federation(self) -> bool: pass + def get_typing_handler(self) -> FollowerTypingHandler: + pass -- cgit 1.5.1 From 3b4556cf87666bb6f40d89c8c7fff42d336237b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Aug 2020 17:12:45 +0100 Subject: Fix `wait_for_stream_position` for multiple waiters. (#8196) This fixes a bug where having multiple callers waiting on the same stream and position will cause it to try and compare two deferreds, which fails (due to the sorted list having an entry of `Tuple[int, Deferred]`). --- changelog.d/8196.misc | 1 + synapse/python_dependencies.py | 4 +++- synapse/replication/tcp/client.py | 6 ++---- 3 files changed, 6 insertions(+), 5 deletions(-) create mode 100644 changelog.d/8196.misc (limited to 'synapse/replication/tcp/client.py') diff --git a/changelog.d/8196.misc b/changelog.d/8196.misc new file mode 100644 index 0000000000..c42baf0e56 --- /dev/null +++ b/changelog.d/8196.misc @@ -0,0 +1 @@ +Fix `wait_for_stream_position` to allow multiple waiters on same stream ID. diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index dd77a44b8d..2d995ec456 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -66,7 +66,9 @@ REQUIREMENTS = [ "msgpack>=0.5.2", "phonenumbers>=8.2.0", "prometheus_client>=0.0.18,<0.9.0", - # we use attr.validators.deep_iterable, which arrived in 19.1.0 + # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note: + # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33 + # is out in November.) "attrs>=19.1.0", "netaddr>=0.7.18", "Jinja2>=2.9", diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index fcf8ebf1e7..d6ecf5b327 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,7 +14,6 @@ # limitations under the License. """A replication client for use by synapse workers. """ -import heapq import logging from typing import TYPE_CHECKING, Dict, List, Tuple @@ -219,9 +218,8 @@ class ReplicationDataHandler: waiting_list = self._streams_to_waiters.setdefault(stream_name, []) - # We insert into the list using heapq as it is more efficient than - # pushing then resorting each time. - heapq.heappush(waiting_list, (position, deferred)) + waiting_list.append((position, deferred)) + waiting_list.sort(key=lambda t: t[0]) # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"): -- cgit 1.5.1