summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2020-09-04 10:19:42 +0100
committerGitHub <noreply@github.com>2020-09-04 10:19:42 +0100
commit9f8abdcc3828e942f09cc62e29dff93dbaa01ec7 (patch)
tree5811a075386a883af3e9da43c6123beb812d70c4 /synapse/replication
parentAdd type hints to more handlers (#8244) (diff)
downloadsynapse-9f8abdcc3828e942f09cc62e29dff93dbaa01ec7.tar.xz
Revert "Add experimental support for sharding event persister. (#8170)" (#8242)
* Revert "Add experimental support for sharding event persister. (#8170)"

This reverts commit 82c1ee1c22a87b9e6e3179947014b0f11c0a1ac3.

* Changelog
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/federation.py12
-rw-r--r--synapse/replication/tcp/handler.py2
-rw-r--r--synapse/replication/tcp/streams/events.py4
3 files changed, 6 insertions, 12 deletions
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py

index 5c8be747e1..6b56315148 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py
@@ -65,11 +65,10 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.federation_handler = hs.get_handlers().federation_handler @staticmethod - async def _serialize_payload(store, room_id, event_and_contexts, backfilled): + async def _serialize_payload(store, event_and_contexts, backfilled): """ Args: store - room_id (str) event_and_contexts (list[tuple[FrozenEvent, EventContext]]) backfilled (bool): Whether or not the events are the result of backfilling @@ -89,11 +88,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): } ) - payload = { - "events": event_payloads, - "backfilled": backfilled, - "room_id": room_id, - } + payload = {"events": event_payloads, "backfilled": backfilled} return payload @@ -101,7 +96,6 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): with Measure(self.clock, "repl_fed_send_events_parse"): content = parse_json_object_from_request(request) - room_id = content["room_id"] backfilled = content["backfilled"] event_payloads = content["events"] @@ -126,7 +120,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): logger.info("Got %d events from federation", len(event_and_contexts)) max_stream_id = await self.federation_handler.persist_events_and_notify( - room_id, event_and_contexts, backfilled + event_and_contexts, backfilled ) return 200, {"max_stream_id": max_stream_id} diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b323841f73..1c303f3a46 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ class ReplicationCommandHandler: if isinstance(stream, (EventsStream, BackfillStream)): # Only add EventStream and BackfillStream as a source on the # instance in charge of event persistence. - if hs.get_instance_name() in hs.config.worker.writers.events: + if hs.config.worker.writers.events == hs.get_instance_name(): self._streams_to_replicate.append(stream) continue diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 3705618b4f..16c63ff4ec 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import List, Tuple, Type import attr -from ._base import Stream, StreamUpdateResult, Token +from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance """Handling of the 'events' replication stream @@ -117,7 +117,7 @@ class EventsStream(Stream): self._store = hs.get_datastore() super().__init__( hs.get_instance_name(), - self._store._stream_id_gen.get_current_token_for_writer, + current_token_without_instance(self._store.get_current_events_token), self._update_function, )