diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8a7b4916cd..ee271e85e5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -376,9 +376,8 @@ class EventCreationHandler:
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
- self._is_event_writer = (
- self.config.worker.writers.events == hs.get_instance_name()
- )
+ self._events_shard_config = self.config.worker.events_shard_config
+ self._instance_name = hs.get_instance_name()
self.room_invite_state_types = self.hs.config.room_invite_state_types
@@ -387,8 +386,6 @@ class EventCreationHandler:
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
- self.pusher_pool = hs.get_pusherpool()
-
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
@@ -904,9 +901,10 @@ class EventCreationHandler:
try:
# If we're a worker we need to hit out to the master.
- if not self._is_event_writer:
+ writer_instance = self._events_shard_config.get_instance(event.room_id)
+ if writer_instance != self._instance_name:
result = await self.send_event(
- instance_name=self.config.worker.writers.events,
+ instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
@@ -974,7 +972,10 @@ class EventCreationHandler:
This should only be run on the instance in charge of persisting events.
"""
- assert self._is_event_writer
+ assert self.storage.persistence is not None
+ assert self._events_shard_config.should_handle(
+ self._instance_name, event.room_id
+ )
if ratelimit:
# We check if this is a room admin redacting an event so that we
@@ -1137,7 +1138,7 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
- event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
+ event_pos, max_stream_token = await self.storage.persistence.persist_event(
event, context=context
)
@@ -1145,12 +1146,10 @@ class EventCreationHandler:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
- await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
-
def _notify():
try:
self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
+ event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
@@ -1162,7 +1161,7 @@ class EventCreationHandler:
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
- return event_stream_id
+ return event_pos.stream
async def _bump_active_time(self, user: UserID) -> None:
try:
@@ -1183,54 +1182,7 @@ class EventCreationHandler:
)
for room_id in room_ids:
- # For each room we need to find a joined member we can use to send
- # the dummy event with.
-
- latest_event_ids = await self.store.get_prev_events_for_room(room_id)
-
- members = await self.state.get_current_users_in_room(
- room_id, latest_event_ids=latest_event_ids
- )
- dummy_event_sent = False
- for user_id in members:
- if not self.hs.is_mine_id(user_id):
- continue
- requester = create_requester(user_id)
- try:
- event, context = await self.create_event(
- requester,
- {
- "type": "org.matrix.dummy_event",
- "content": {},
- "room_id": room_id,
- "sender": user_id,
- },
- prev_event_ids=latest_event_ids,
- )
-
- event.internal_metadata.proactively_send = False
-
- # Since this is a dummy-event it is OK if it is sent by a
- # shadow-banned user.
- await self.send_nonmember_event(
- requester,
- event,
- context,
- ratelimit=False,
- ignore_shadow_ban=True,
- )
- dummy_event_sent = True
- break
- except ConsentNotGivenError:
- logger.info(
- "Failed to send dummy event into room %s for user %s due to "
- "lack of consent. Will try another user" % (room_id, user_id)
- )
- except AuthError:
- logger.info(
- "Failed to send dummy event into room %s for user %s due to "
- "lack of power. Will try another user" % (room_id, user_id)
- )
+ dummy_event_sent = await self._send_dummy_event_for_room(room_id)
if not dummy_event_sent:
# Did not find a valid user in the room, so remove from future attempts
@@ -1243,6 +1195,59 @@ class EventCreationHandler:
now = self.clock.time_msec()
self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
+ async def _send_dummy_event_for_room(self, room_id: str) -> bool:
+ """Attempt to send a dummy event for the given room.
+
+ Args:
+ room_id: room to try to send an event from
+
+ Returns:
+ True if a dummy event was successfully sent. False if no user was able
+ to send an event.
+ """
+
+ # For each room we need to find a joined member we can use to send
+ # the dummy event with.
+ latest_event_ids = await self.store.get_prev_events_for_room(room_id)
+ members = await self.state.get_current_users_in_room(
+ room_id, latest_event_ids=latest_event_ids
+ )
+ for user_id in members:
+ if not self.hs.is_mine_id(user_id):
+ continue
+ requester = create_requester(user_id)
+ try:
+ event, context = await self.create_event(
+ requester,
+ {
+ "type": "org.matrix.dummy_event",
+ "content": {},
+ "room_id": room_id,
+ "sender": user_id,
+ },
+ prev_event_ids=latest_event_ids,
+ )
+
+ event.internal_metadata.proactively_send = False
+
+ # Since this is a dummy-event it is OK if it is sent by a
+ # shadow-banned user.
+ await self.send_nonmember_event(
+ requester, event, context, ratelimit=False, ignore_shadow_ban=True,
+ )
+ return True
+ except ConsentNotGivenError:
+ logger.info(
+ "Failed to send dummy event into room %s for user %s due to "
+ "lack of consent. Will try another user" % (room_id, user_id)
+ )
+ except AuthError:
+ logger.info(
+ "Failed to send dummy event into room %s for user %s due to "
+ "lack of power. Will try another user" % (room_id, user_id)
+ )
+ return False
+
def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
to_expire = set()
|