diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0472322b0b..72ba98bbca 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -926,8 +926,7 @@ class FederationHandler(BaseHandler):
)
)
- if ev_infos:
- await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
+ await self._handle_new_events(dest, ev_infos, backfilled=True)
# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
@@ -1220,7 +1219,7 @@ class FederationHandler(BaseHandler):
event_infos.append(_NewEventInfo(event, None, auth))
await self._handle_new_events(
- destination, room_id, event_infos,
+ destination, event_infos,
)
def _sanity_check_event(self, ev):
@@ -1367,15 +1366,15 @@ class FederationHandler(BaseHandler):
)
max_stream_id = await self._persist_auth_tree(
- origin, room_id, auth_chain, state, event, room_version_obj
+ origin, auth_chain, state, event, room_version_obj
)
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
+ #
+ # TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- self.config.worker.events_shard_config.get_instance(room_id),
- "events",
- max_stream_id,
+ self.config.worker.writers.events, "events", max_stream_id
)
# Check whether this room is the result of an upgrade of a room we already know
@@ -1636,7 +1635,7 @@ class FederationHandler(BaseHandler):
)
context = await self.state_handler.compute_event_context(event)
- await self.persist_events_and_notify(event.room_id, [(event, context)])
+ await self.persist_events_and_notify([(event, context)])
return event
@@ -1663,9 +1662,7 @@ class FederationHandler(BaseHandler):
await self.federation_client.send_leave(host_list, event)
context = await self.state_handler.compute_event_context(event)
- stream_id = await self.persist_events_and_notify(
- event.room_id, [(event, context)]
- )
+ stream_id = await self.persist_events_and_notify([(event, context)])
return event, stream_id
@@ -1913,7 +1910,7 @@ class FederationHandler(BaseHandler):
)
await self.persist_events_and_notify(
- event.room_id, [(event, context)], backfilled=backfilled
+ [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
@@ -1926,7 +1923,6 @@ class FederationHandler(BaseHandler):
async def _handle_new_events(
self,
origin: str,
- room_id: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
) -> None:
@@ -1958,7 +1954,6 @@ class FederationHandler(BaseHandler):
)
await self.persist_events_and_notify(
- room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
@@ -1969,7 +1964,6 @@ class FederationHandler(BaseHandler):
async def _persist_auth_tree(
self,
origin: str,
- room_id: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
@@ -1984,7 +1978,6 @@ class FederationHandler(BaseHandler):
Args:
origin: Where the events came from
- room_id,
auth_events
state
event
@@ -2059,20 +2052,17 @@ class FederationHandler(BaseHandler):
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
await self.persist_events_and_notify(
- room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
- ],
+ ]
)
new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)
- return await self.persist_events_and_notify(
- room_id, [(event, new_event_context)]
- )
+ return await self.persist_events_and_notify([(event, new_event_context)])
async def _prep_event(
self,
@@ -2923,7 +2913,6 @@ class FederationHandler(BaseHandler):
async def persist_events_and_notify(
self,
- room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
@@ -2931,19 +2920,14 @@ class FederationHandler(BaseHandler):
necessary.
Args:
- room_id: The room ID of events being persisted.
- event_and_contexts: Sequence of events with their associated
- context that should be persisted. All events must belong to
- the same room.
+ event_and_contexts:
backfilled: Whether these events are a result of
backfilling or not
"""
- instance = self.config.worker.events_shard_config.get_instance(room_id)
- if instance != self._instance_name:
+ if self.config.worker.writers.events != self._instance_name:
result = await self._send_events(
- instance_name=instance,
+ instance_name=self.config.worker.writers.events,
store=self.store,
- room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
|