diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 918d0e037c..5e5a64037d 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -125,8 +125,8 @@ class AdminHandler(BaseHandler):
else:
stream_ordering = room.stream_ordering
- from_key = str(RoomStreamToken(0, 0))
- to_key = str(RoomStreamToken(None, stream_ordering))
+ from_key = RoomStreamToken(0, 0)
+ to_key = RoomStreamToken(None, stream_ordering)
written_events = set() # Events that we've processed in this room
@@ -153,7 +153,7 @@ class AdminHandler(BaseHandler):
if not events:
break
- from_key = events[-1].internal_metadata.after
+ from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
events = await filter_events_for_client(self.storage, user_id, events)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 643d71a710..4b0a4f96cc 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,6 +29,7 @@ from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
RoomStreamToken,
+ StreamToken,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
@@ -104,18 +105,15 @@ class DeviceWorkerHandler(BaseHandler):
@trace
@measure_func("device.get_user_ids_changed")
- async def get_user_ids_changed(self, user_id, from_token):
+ async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):
"""Get list of users that have had the devices updated, or have newly
joined a room, that `user_id` may be interested in.
-
- Args:
- user_id (str)
- from_token (StreamToken)
"""
set_tag("user_id", user_id)
set_tag("from_token", from_token)
- now_room_key = await self.store.get_room_events_max_id()
+ now_room_id = self.store.get_room_max_stream_ordering()
+ now_room_key = RoomStreamToken(None, now_room_id)
room_ids = await self.store.get_rooms_for_user(user_id)
@@ -142,7 +140,7 @@ class DeviceWorkerHandler(BaseHandler):
)
rooms_changed.update(event.room_id for event in member_events)
- stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key).stream
+ stream_ordering = from_token.room_key.stream
possibly_changed = set(changed)
possibly_left = set()
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index b05e32f457..fdce54c5c3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -39,10 +39,6 @@ class EventStreamHandler(BaseHandler):
def __init__(self, hs: "HomeServer"):
super(EventStreamHandler, self).__init__(hs)
- self.distributor = hs.get_distributor()
- self.distributor.declare("started_user_eventstream")
- self.distributor.declare("stopped_user_eventstream")
-
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 43f2986f89..c195eba830 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,7 +69,6 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
-from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
@@ -80,7 +79,6 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
@@ -130,7 +128,6 @@ class FederationHandler(BaseHandler):
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
self.is_mine_id = hs.is_mine_id
- self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._message_handler = hs.get_message_handler()
@@ -141,9 +138,6 @@ class FederationHandler(BaseHandler):
self._replication = hs.get_replication_data_handler()
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
- self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
- hs
- )
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)
@@ -704,31 +698,10 @@ class FederationHandler(BaseHandler):
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
try:
- context = await self._handle_new_event(origin, event, state=state)
+ await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
- if event.type == EventTypes.Member:
- if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has acutally
- # joined the room. Don't bother if the user is just
- # changing their profile info.
- newly_joined = True
-
- prev_state_ids = await context.get_prev_state_ids()
-
- prev_state_id = prev_state_ids.get((event.type, event.state_key))
- if prev_state_id:
- prev_state = await self.store.get_event(
- prev_state_id, allow_none=True
- )
- if prev_state and prev_state.membership == Membership.JOIN:
- newly_joined = False
-
- if newly_joined:
- user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, room_id)
-
# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encrypted:
@@ -1550,11 +1523,6 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- if event.type == EventTypes.Member:
- if event.content["membership"] == Membership.JOIN:
- user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, event.room_id)
-
prev_state_ids = await context.get_prev_state_ids()
state_ids = list(prev_state_ids.values())
@@ -2970,8 +2938,6 @@ class FederationHandler(BaseHandler):
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
-
async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
@@ -2984,16 +2950,6 @@ class FederationHandler(BaseHandler):
else:
await self.store.clean_room_for_join(room_id)
- async def user_joined_room(self, user: UserID, room_id: str) -> None:
- """Called when a new user has joined the room
- """
- if self.config.worker_app:
- await self._notify_user_membership_change(
- room_id=room_id, user_id=user.to_string(), change="joined"
- )
- else:
- user_joined_room(self.distributor, user, room_id)
-
async def get_room_complexity(
self, remote_room_hosts: List[str], room_id: str
) -> Optional[dict]:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index d5ddc583ad..ba4828c713 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StreamToken, UserID
+from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -116,14 +116,13 @@ class InitialSyncHandler(BaseHandler):
now_token = self.hs.get_event_sources().get_current_token()
presence_stream = self.hs.get_event_sources().sources["presence"]
- pagination_config = PaginationConfig(from_token=now_token)
- presence, _ = await presence_stream.get_pagination_rows(
- user, pagination_config.get_source_config("presence"), None
+ presence, _ = await presence_stream.get_new_events(
+ user, from_key=None, include_offline=False
)
- receipt_stream = self.hs.get_event_sources().sources["receipt"]
- receipt, _ = await receipt_stream.get_pagination_rows(
- user, pagination_config.get_source_config("receipt"), None
+ joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN]
+ receipt = await self.store.get_linearized_receipts_for_rooms(
+ joined_rooms, to_key=int(now_token.receipt_key),
)
tags_by_room = await self.store.get_tags_for_user(user_id)
@@ -168,7 +167,7 @@ class InitialSyncHandler(BaseHandler):
self.state_handler.get_current_state, event.room_id
)
elif event.membership == Membership.LEAVE:
- room_end_token = "s%d" % (event.stream_ordering,)
+ room_end_token = RoomStreamToken(None, event.stream_ordering,)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8a7b4916cd..e54e2b322b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -387,8 +387,6 @@ class EventCreationHandler:
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
- self.pusher_pool = hs.get_pusherpool()
-
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
@@ -975,6 +973,7 @@ class EventCreationHandler:
This should only be run on the instance in charge of persisting events.
"""
assert self._is_event_writer
+ assert self.storage.persistence is not None
if ratelimit:
# We check if this is a room admin redacting an event so that we
@@ -1145,8 +1144,6 @@ class EventCreationHandler:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
- await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
-
def _notify():
try:
self.notifier.on_new_room_event(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 34ed0e2921..d929a68f7d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -335,20 +335,16 @@ class PaginationHandler:
user_id = requester.user.to_string()
if pagin_config.from_token:
- room_token = pagin_config.from_token.room_key
+ from_token = pagin_config.from_token
else:
- pagin_config.from_token = (
- self.hs.get_event_sources().get_current_token_for_pagination()
- )
- room_token = pagin_config.from_token.room_key
-
- room_token = RoomStreamToken.parse(room_token)
+ from_token = self.hs.get_event_sources().get_current_token_for_pagination()
- pagin_config.from_token = pagin_config.from_token.copy_and_replace(
- "room_key", str(room_token)
- )
+ if pagin_config.limit is None:
+ # This shouldn't happen as we've set a default limit before this
+ # gets called.
+ raise Exception("limit not set")
- source_config = pagin_config.get_source_config("room")
+ room_token = from_token.room_key
with await self.pagination_lock.read(room_id):
(
@@ -358,7 +354,7 @@ class PaginationHandler:
room_id, user_id, allow_departed_users=True
)
- if source_config.direction == "b":
+ if pagin_config.direction == "b":
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
@@ -377,26 +373,35 @@ class PaginationHandler:
# case "JOIN" would have been returned.
assert member_event_id
- leave_token = await self.store.get_topological_token_for_event(
+ leave_token_str = await self.store.get_topological_token_for_event(
member_event_id
)
- if RoomStreamToken.parse(leave_token).topological < max_topo:
- source_config.from_key = str(leave_token)
+ leave_token = RoomStreamToken.parse(leave_token_str)
+ assert leave_token.topological is not None
+
+ if leave_token.topological < max_topo:
+ from_token = from_token.copy_and_replace(
+ "room_key", leave_token
+ )
await self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
+ to_room_key = None
+ if pagin_config.to_token:
+ to_room_key = pagin_config.to_token.room_key
+
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
- from_key=source_config.from_key,
- to_key=source_config.to_key,
- direction=source_config.direction,
- limit=source_config.limit,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
event_filter=event_filter,
)
- next_token = pagin_config.from_token.copy_and_replace("room_key", next_key)
+ next_token = from_token.copy_and_replace("room_key", next_key)
if events:
if event_filter:
@@ -409,7 +414,7 @@ class PaginationHandler:
if not events:
return {
"chunk": [],
- "start": pagin_config.from_token.to_string(),
+ "start": from_token.to_string(),
"end": next_token.to_string(),
}
@@ -438,7 +443,7 @@ class PaginationHandler:
events, time_now, as_client_event=as_client_event
)
),
- "start": pagin_config.from_token.to_string(),
+ "start": from_token.to_string(),
"end": next_token.to_string(),
}
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 91a3aec1cc..1000ac95ff 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1108,9 +1108,6 @@ class PresenceEventSource:
def get_current_key(self):
return self.store.get_current_presence_token()
- async def get_pagination_rows(self, user, pagination_config, key):
- return await self.get_new_events(user, from_key=None, include_offline=False)
-
@cached(num_args=2, cache_context=True)
async def _get_interested_in(self, user, explicit_room_id, cache_context):
"""Returns the set of users that the given user should see presence
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 2cc6c2eb68..bdd8e52edd 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -142,18 +142,3 @@ class ReceiptEventSource:
def get_current_key(self, direction="f"):
return self.store.get_max_receipt_stream_id()
-
- async def get_pagination_rows(self, user, config, key):
- to_key = int(config.from_key)
-
- if config.to_key:
- from_key = int(config.to_key)
- else:
- from_key = None
-
- room_ids = await self.store.get_rooms_for_user(user.to_string())
- events = await self.store.get_linearized_receipts_for_rooms(
- room_ids, from_key=from_key, to_key=to_key
- )
-
- return (events, to_key)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a29305f655..53d85ab97d 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1091,20 +1091,19 @@ class RoomEventSource:
async def get_new_events(
self,
user: UserID,
- from_key: str,
+ from_key: RoomStreamToken,
limit: int,
room_ids: List[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
- ) -> Tuple[List[EventBase], str]:
+ ) -> Tuple[List[EventBase], RoomStreamToken]:
# We just ignore the key for now.
to_key = self.get_current_key()
- from_token = RoomStreamToken.parse(from_key)
- if from_token.topological:
+ if from_key.topological:
logger.warning("Stream has topological part!!!! %r", from_key)
- from_key = "s%s" % (from_token.stream,)
+ from_key = RoomStreamToken(None, from_key.stream)
app_service = self.store.get_app_service_by_user_id(user.to_string())
if app_service:
@@ -1133,14 +1132,14 @@ class RoomEventSource:
events[:] = events[:limit]
if events:
- end_key = events[-1].internal_metadata.after
+ end_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
else:
end_key = to_key
return (events, end_key)
- def get_current_key(self) -> str:
- return "s%d" % (self.store.get_room_max_stream_ordering(),)
+ def get_current_key(self) -> RoomStreamToken:
+ return RoomStreamToken(None, self.store.get_room_max_stream_ordering())
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 32b7e323fa..100f335b80 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -40,7 +40,7 @@ from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
-from synapse.util.distributor import user_joined_room, user_left_room
+from synapse.util.distributor import user_left_room
from ._base import BaseHandler
@@ -149,17 +149,6 @@ class RoomMemberHandler:
raise NotImplementedError()
@abc.abstractmethod
- async def _user_joined_room(self, target: UserID, room_id: str) -> None:
- """Notifies distributor on master process that the user has joined the
- room.
-
- Args:
- target
- room_id
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has left the
room.
@@ -221,7 +210,6 @@ class RoomMemberHandler:
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
- newly_joined = False
if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
@@ -246,12 +234,7 @@ class RoomMemberHandler:
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)
- if event.membership == Membership.JOIN and newly_joined:
- # Only fire user_joined_room if the user has actually joined the
- # room. Don't bother if the user is just changing their profile
- # info.
- await self._user_joined_room(target, room_id)
- elif event.membership == Membership.LEAVE:
+ if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
@@ -726,17 +709,7 @@ class RoomMemberHandler:
(EventTypes.Member, event.state_key), None
)
- if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has actually joined the
- # room. Don't bother if the user is just changing their profile
- # info.
- newly_joined = True
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- newly_joined = prev_member_event.membership != Membership.JOIN
- if newly_joined:
- await self._user_joined_room(target_user, room_id)
- elif event.membership == Membership.LEAVE:
+ if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
@@ -1002,10 +975,9 @@ class RoomMemberHandler:
class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
- super(RoomMemberMasterHandler, self).__init__(hs)
+ super().__init__(hs)
self.distributor = hs.get_distributor()
- self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")
async def _is_remote_room_too_complex(
@@ -1085,7 +1057,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
- await self._user_joined_room(user, room_id)
# Check the room we just joined wasn't too large, if we didn't fetch the
# complexity of it before.
@@ -1228,11 +1199,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
)
return event.event_id, stream_id
- async def _user_joined_room(self, target: UserID, room_id: str) -> None:
- """Implements RoomMemberHandler._user_joined_room
- """
- user_joined_room(self.distributor, target, room_id)
-
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 897338fd54..e7f34737c6 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -57,8 +57,6 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
content=content,
)
- await self._user_joined_room(user, room_id)
-
return ret["event_id"], ret["stream_id"]
async def remote_reject_invite(
@@ -81,13 +79,6 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
)
return ret["event_id"], ret["stream_id"]
- async def _user_joined_room(self, target: UserID, room_id: str) -> None:
- """Implements RoomMemberHandler._user_joined_room
- """
- await self._notify_change_client(
- user_id=target.to_string(), room_id=room_id, change="joined"
- )
-
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e2ddb628ff..a615c7c2f0 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -378,7 +378,7 @@ class SyncHandler:
sync_config = sync_result_builder.sync_config
with Measure(self.clock, "ephemeral_by_room"):
- typing_key = since_token.typing_key if since_token else "0"
+ typing_key = since_token.typing_key if since_token else 0
room_ids = sync_result_builder.joined_room_ids
@@ -402,7 +402,7 @@ class SyncHandler:
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
- receipt_key = since_token.receipt_key if since_token else "0"
+ receipt_key = since_token.receipt_key if since_token else 0
receipt_source = self.event_sources.sources["receipt"]
receipts, receipt_key = await receipt_source.get_new_events(
@@ -533,7 +533,7 @@ class SyncHandler:
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
- room_key = recents[0].internal_metadata.before
+ room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)
prev_batch_token = now_token.copy_and_replace("room_key", room_key)
@@ -1310,12 +1310,11 @@ class SyncHandler:
presence_source = self.event_sources.sources["presence"]
since_token = sync_result_builder.since_token
+ presence_key = None
+ include_offline = False
if since_token and not sync_result_builder.full_state:
presence_key = since_token.presence_key
include_offline = True
- else:
- presence_key = None
- include_offline = False
presence, presence_key = await presence_source.get_new_events(
user=user,
@@ -1323,6 +1322,7 @@ class SyncHandler:
is_guest=sync_config.is_guest,
include_offline=include_offline,
)
+ assert presence_key
sync_result_builder.now_token = now_token.copy_and_replace(
"presence_key", presence_key
)
@@ -1485,7 +1485,7 @@ class SyncHandler:
if rooms_changed:
return True
- stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
+ stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
return True
@@ -1751,7 +1751,7 @@ class SyncHandler:
continue
leave_token = now_token.copy_and_replace(
- "room_key", "s%d" % (event.stream_ordering,)
+ "room_key", RoomStreamToken(None, event.stream_ordering)
)
room_entries.append(
RoomSyncResultBuilder(
|