diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 014dab2940..9f773aefa7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,18 +69,18 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
-from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
JsonDict,
MutableStateMap,
+ PersistedEventPosition,
+ RoomStreamToken,
StateMap,
UserID,
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
@@ -88,7 +88,7 @@ from synapse.visibility import filter_events_for_server
logger = logging.getLogger(__name__)
-@attr.s
+@attr.s(slots=True)
class _NewEventInfo:
"""Holds information about a received event, ready for passing to _handle_new_events
@@ -117,7 +117,7 @@ class FederationHandler(BaseHandler):
"""
def __init__(self, hs):
- super(FederationHandler, self).__init__(hs)
+ super().__init__(hs)
self.hs = hs
@@ -130,7 +130,6 @@ class FederationHandler(BaseHandler):
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
self.is_mine_id = hs.is_mine_id
- self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._message_handler = hs.get_message_handler()
@@ -141,9 +140,6 @@ class FederationHandler(BaseHandler):
self._replication = hs.get_replication_data_handler()
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
- self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
- hs
- )
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)
@@ -704,31 +700,10 @@ class FederationHandler(BaseHandler):
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
try:
- context = await self._handle_new_event(origin, event, state=state)
+ await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
- if event.type == EventTypes.Member:
- if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has acutally
- # joined the room. Don't bother if the user is just
- # changing their profile info.
- newly_joined = True
-
- prev_state_ids = await context.get_prev_state_ids()
-
- prev_state_id = prev_state_ids.get((event.type, event.state_key))
- if prev_state_id:
- prev_state = await self.store.get_event(
- prev_state_id, allow_none=True
- )
- if prev_state and prev_state.membership == Membership.JOIN:
- newly_joined = False
-
- if newly_joined:
- user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, room_id)
-
# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encrypted:
@@ -923,7 +898,8 @@ class FederationHandler(BaseHandler):
)
)
- await self._handle_new_events(dest, ev_infos, backfilled=True)
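+        # Don't bother calling out to the persister if there is nothing to
+        # persist.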
+ if ev_infos:
+ await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
@@ -1265,7 +1241,7 @@ class FederationHandler(BaseHandler):
event_infos.append(_NewEventInfo(event, None, auth))
await self._handle_new_events(
- destination, event_infos,
+ destination, room_id, event_infos,
)
def _sanity_check_event(self, ev):
@@ -1412,15 +1388,15 @@ class FederationHandler(BaseHandler):
)
max_stream_id = await self._persist_auth_tree(
- origin, auth_chain, state, event, room_version_obj
+ origin, room_id, auth_chain, state, event, room_version_obj
)
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
- #
- # TODO: Currently the events stream is written to from master
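+        #
+        # The events stream may now be written to by any one of several
+        # sharded event persisters, so look up which instance is responsible
+        # for this room's events.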
await self._replication.wait_for_stream_position(
- self.config.worker.writers.events, "events", max_stream_id
+ self.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ max_stream_id,
)
# Check whether this room is the result of an upgrade of a room we already know
@@ -1599,11 +1575,6 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- if event.type == EventTypes.Member:
- if event.content["membership"] == Membership.JOIN:
- user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, event.room_id)
-
prev_state_ids = await context.get_prev_state_ids()
state_ids = list(prev_state_ids.values())
@@ -1674,7 +1645,7 @@ class FederationHandler(BaseHandler):
)
context = await self.state_handler.compute_event_context(event)
- await self.persist_events_and_notify([(event, context)])
+ await self.persist_events_and_notify(event.room_id, [(event, context)])
return event
@@ -1701,7 +1672,9 @@ class FederationHandler(BaseHandler):
await self.federation_client.send_leave(host_list, event)
context = await self.state_handler.compute_event_context(event)
- stream_id = await self.persist_events_and_notify([(event, context)])
+ stream_id = await self.persist_events_and_notify(
+ event.room_id, [(event, context)]
+ )
return event, stream_id
@@ -1949,7 +1922,7 @@ class FederationHandler(BaseHandler):
)
await self.persist_events_and_notify(
- [(event, context)], backfilled=backfilled
+ event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
@@ -1962,6 +1935,7 @@ class FederationHandler(BaseHandler):
async def _handle_new_events(
self,
origin: str,
+ room_id: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
) -> None:
@@ -1993,6 +1967,7 @@ class FederationHandler(BaseHandler):
)
await self.persist_events_and_notify(
+ room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
@@ -2003,6 +1978,7 @@ class FederationHandler(BaseHandler):
async def _persist_auth_tree(
self,
origin: str,
+ room_id: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
@@ -2017,6 +1993,7 @@ class FederationHandler(BaseHandler):
Args:
origin: Where the events came from
+            room_id: The room ID the events belong to.
auth_events
state
event
@@ -2091,17 +2068,20 @@ class FederationHandler(BaseHandler):
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
await self.persist_events_and_notify(
+ room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
- ]
+ ],
)
new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)
- return await self.persist_events_and_notify([(event, new_event_context)])
+ return await self.persist_events_and_notify(
+ room_id, [(event, new_event_context)]
+ )
async def _prep_event(
self,
@@ -2952,6 +2932,7 @@ class FederationHandler(BaseHandler):
async def persist_events_and_notify(
self,
+ room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
@@ -2959,20 +2940,25 @@ class FederationHandler(BaseHandler):
necessary.
Args:
- event_and_contexts:
+            room_id: The room ID of the events being persisted.
+ event_and_contexts: Sequence of events with their associated
+ context that should be persisted. All events must belong to
+ the same room.
backfilled: Whether these events are a result of
backfilling or not
"""
- if self.config.worker.writers.events != self._instance_name:
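+        # Events for this room may be handled by a different event persister;
+        # if so, send them over replication rather than persisting locally.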
+ instance = self.config.worker.events_shard_config.get_instance(room_id)
+ if instance != self._instance_name:
result = await self._send_events(
- instance_name=self.config.worker.writers.events,
+ instance_name=instance,
store=self.store,
+ room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
return result["max_stream_id"]
else:
- max_stream_id = await self.storage.persistence.persist_events(
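+            # persist_events returns the maximum position reached as a
+            # RoomStreamToken rather than a bare stream ID.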
+ max_stream_token = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)
@@ -2983,12 +2969,12 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- await self._notify_persisted_event(event, max_stream_id)
+ await self._notify_persisted_event(event, max_stream_token)
- return max_stream_id
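+            # Callers expect a plain integer stream ID, so unwrap it from the
+            # token.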
+ return max_stream_token.stream
async def _notify_persisted_event(
- self, event: EventBase, max_stream_id: int
+ self, event: EventBase, max_stream_token: RoomStreamToken
) -> None:
"""Checks to see if notifier/pushers should be notified about the
event or not.
@@ -3014,13 +3000,13 @@ class FederationHandler(BaseHandler):
elif event.internal_metadata.is_outlier():
return
- event_stream_id = event.internal_metadata.stream_ordering
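+        # Pair the event's stream ordering with the name of the instance that
+        # persisted it.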
+ event_pos = PersistedEventPosition(
+ self._instance_name, event.internal_metadata.stream_ordering
+ )
self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
+ event, event_pos, max_stream_token, extra_users=extra_users
)
- await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
-
async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
@@ -3033,16 +3019,6 @@ class FederationHandler(BaseHandler):
else:
await self.store.clean_room_for_join(room_id)
- async def user_joined_room(self, user: UserID, room_id: str) -> None:
- """Called when a new user has joined the room
- """
- if self.config.worker_app:
- await self._notify_user_membership_change(
- room_id=room_id, user_id=user.to_string(), change="joined"
- )
- else:
- user_joined_room(self.distributor, user, room_id)
-
async def get_room_complexity(
self, remote_room_hosts: List[str], room_id: str
) -> Optional[dict]: