summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-10-20 17:52:25 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-10-20 17:52:25 +0100
commit581445c6290a4482c920f9d8a0113867acf1bd6f (patch)
tree20d09f7cafb9cb71f52897512590818d6ff85500 /synapse/handlers/federation.py
parentMerge commit 'be16ee59a' into anoa/dinsic_release_1_21_x (diff)
parentRevert "Add experimental support for sharding event persister. (#8170)" (#8242) (diff)
downloadsynapse-581445c6290a4482c920f9d8a0113867acf1bd6f.tar.xz
Merge commit '9f8abdcc3' into anoa/dinsic_release_1_21_x
* commit '9f8abdcc3':
  Revert "Add experimental support for sharding event persister. (#8170)" (#8242)
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py44
1 files changed, 14 insertions, 30 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py

index 0472322b0b..72ba98bbca 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -926,8 +926,7 @@ class FederationHandler(BaseHandler): ) ) - if ev_infos: - await self._handle_new_events(dest, room_id, ev_infos, backfilled=True) + await self._handle_new_events(dest, ev_infos, backfilled=True) # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) @@ -1220,7 +1219,7 @@ class FederationHandler(BaseHandler): event_infos.append(_NewEventInfo(event, None, auth)) await self._handle_new_events( - destination, room_id, event_infos, + destination, event_infos, ) def _sanity_check_event(self, ev): @@ -1367,15 +1366,15 @@ class FederationHandler(BaseHandler): ) max_stream_id = await self._persist_auth_tree( - origin, room_id, auth_chain, state, event, room_version_obj + origin, auth_chain, state, event, room_version_obj ) # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. + # + # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - self.config.worker.events_shard_config.get_instance(room_id), - "events", - max_stream_id, + self.config.worker.writers.events, "events", max_stream_id ) # Check whether this room is the result of an upgrade of a room we already know @@ -1636,7 +1635,7 @@ class FederationHandler(BaseHandler): ) context = await self.state_handler.compute_event_context(event) - await self.persist_events_and_notify(event.room_id, [(event, context)]) + await self.persist_events_and_notify([(event, context)]) return event @@ -1663,9 +1662,7 @@ class FederationHandler(BaseHandler): await self.federation_client.send_leave(host_list, event) context = await self.state_handler.compute_event_context(event) - stream_id = await self.persist_events_and_notify( - event.room_id, [(event, context)] - ) + stream_id = await self.persist_events_and_notify([(event, context)]) return event, stream_id @@ -1913,7 +1910,7 @@ class FederationHandler(BaseHandler): ) await self.persist_events_and_notify( - event.room_id, [(event, context)], backfilled=backfilled + [(event, context)], backfilled=backfilled ) except Exception: run_in_background( @@ -1926,7 +1923,6 @@ class FederationHandler(BaseHandler): async def _handle_new_events( self, origin: str, - room_id: str, event_infos: Iterable[_NewEventInfo], backfilled: bool = False, ) -> None: @@ -1958,7 +1954,6 @@ class FederationHandler(BaseHandler): ) await self.persist_events_and_notify( - room_id, [ (ev_info.event, context) for ev_info, context in zip(event_infos, contexts) @@ -1969,7 +1964,6 @@ class FederationHandler(BaseHandler): async def _persist_auth_tree( self, origin: str, - room_id: str, auth_events: List[EventBase], state: List[EventBase], event: EventBase, @@ -1984,7 +1978,6 @@ class FederationHandler(BaseHandler): Args: origin: Where the events came from - room_id, auth_events state event @@ -2059,20 +2052,17 @@ class FederationHandler(BaseHandler): events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR await self.persist_events_and_notify( - room_id, [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) - ], + ] ) new_event_context = await self.state_handler.compute_event_context( event, old_state=state ) - return await self.persist_events_and_notify( - room_id, [(event, new_event_context)] - ) + return await self.persist_events_and_notify([(event, new_event_context)]) async def _prep_event( self, @@ -2923,7 +2913,6 @@ class FederationHandler(BaseHandler): async def persist_events_and_notify( self, - room_id: str, event_and_contexts: Sequence[Tuple[EventBase, EventContext]], backfilled: bool = False, ) -> int: @@ -2931,19 +2920,14 @@ class FederationHandler(BaseHandler): necessary. Args: - room_id: The room ID of events being persisted. - event_and_contexts: Sequence of events with their associated - context that should be persisted. All events must belong to - the same room. + event_and_contexts: backfilled: Whether these events are a result of backfilling or not """ - instance = self.config.worker.events_shard_config.get_instance(room_id) - if instance != self._instance_name: + if self.config.worker.writers.events != self._instance_name: result = await self._send_events( - instance_name=instance, + instance_name=self.config.worker.writers.events, store=self.store, - room_id=room_id, event_and_contexts=event_and_contexts, backfilled=backfilled, )