diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index db417d60de..ee4666337a 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -234,7 +234,9 @@ class DeviceWorkerHandler(BaseHandler):
return result
async def on_federation_query_user_devices(self, user_id):
- stream_id, devices = await self.store.get_devices_with_keys_by_user(user_id)
+ stream_id, devices = await self.store.get_e2e_device_keys_for_federation_query(
+ user_id
+ )
master_key = await self.store.get_e2e_cross_signing_key(user_id, "master")
self_signing_key = await self.store.get_e2e_cross_signing_key(
user_id, "self_signing"
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d8def45e38..dfd1c78549 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -353,7 +353,7 @@ class E2eKeysHandler(object):
# make sure that each queried user appears in the result dict
result_dict[user_id] = {}
- results = await self.store.get_e2e_device_keys(local_query)
+ results = await self.store.get_e2e_device_keys_for_cs_api(local_query)
# Build the result structure
for user_id, device_keys in results.items():
@@ -734,7 +734,7 @@ class E2eKeysHandler(object):
# fetch our stored devices. This is used to 1. verify
# signatures on the master key, and 2. compare with what
# was sent if the device was signed
- devices = await self.store.get_e2e_device_keys([(user_id, None)])
+ devices = await self.store.get_e2e_device_keys_for_cs_api([(user_id, None)])
if user_id not in devices:
raise NotFoundError("No device keys found")
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 16389a0dca..bd8efbb768 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -923,7 +923,8 @@ class FederationHandler(BaseHandler):
)
)
- await self._handle_new_events(dest, ev_infos, backfilled=True)
+ if ev_infos:
+ await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
@@ -1216,7 +1217,7 @@ class FederationHandler(BaseHandler):
event_infos.append(_NewEventInfo(event, None, auth))
await self._handle_new_events(
- destination, event_infos,
+ destination, room_id, event_infos,
)
def _sanity_check_event(self, ev):
@@ -1363,15 +1364,15 @@ class FederationHandler(BaseHandler):
)
max_stream_id = await self._persist_auth_tree(
- origin, auth_chain, state, event, room_version_obj
+ origin, room_id, auth_chain, state, event, room_version_obj
)
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the code below uses caches.
- #
- # TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- self.config.worker.writers.events, "events", max_stream_id
+ self.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ max_stream_id,
)
# Check whether this room is the result of an upgrade of a room we already know
@@ -1625,7 +1626,7 @@ class FederationHandler(BaseHandler):
)
context = await self.state_handler.compute_event_context(event)
- await self.persist_events_and_notify([(event, context)])
+ await self.persist_events_and_notify(event.room_id, [(event, context)])
return event
@@ -1652,7 +1653,9 @@ class FederationHandler(BaseHandler):
await self.federation_client.send_leave(host_list, event)
context = await self.state_handler.compute_event_context(event)
- stream_id = await self.persist_events_and_notify([(event, context)])
+ stream_id = await self.persist_events_and_notify(
+ event.room_id, [(event, context)]
+ )
return event, stream_id
@@ -1900,7 +1903,7 @@ class FederationHandler(BaseHandler):
)
await self.persist_events_and_notify(
- [(event, context)], backfilled=backfilled
+ event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
@@ -1913,6 +1916,7 @@ class FederationHandler(BaseHandler):
async def _handle_new_events(
self,
origin: str,
+ room_id: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
) -> None:
@@ -1944,6 +1948,7 @@ class FederationHandler(BaseHandler):
)
await self.persist_events_and_notify(
+ room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
@@ -1954,6 +1959,7 @@ class FederationHandler(BaseHandler):
async def _persist_auth_tree(
self,
origin: str,
+ room_id: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
@@ -1968,6 +1974,7 @@ class FederationHandler(BaseHandler):
Args:
origin: Where the events came from
+ room_id
auth_events
state
event
@@ -2042,17 +2049,20 @@ class FederationHandler(BaseHandler):
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
await self.persist_events_and_notify(
+ room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
- ]
+ ],
)
new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)
- return await self.persist_events_and_notify([(event, new_event_context)])
+ return await self.persist_events_and_notify(
+ room_id, [(event, new_event_context)]
+ )
async def _prep_event(
self,
@@ -2903,6 +2913,7 @@ class FederationHandler(BaseHandler):
async def persist_events_and_notify(
self,
+ room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
@@ -2910,14 +2921,19 @@ class FederationHandler(BaseHandler):
necessary.
Args:
- event_and_contexts:
+ room_id: The room ID of the events being persisted.
+ event_and_contexts: Sequence of events with their associated
+ context that should be persisted. All events must belong to
+ the same room.
backfilled: Whether these events are a result of
backfilling or not
"""
- if self.config.worker.writers.events != self._instance_name:
+ instance = self.config.worker.events_shard_config.get_instance(room_id)
+ if instance != self._instance_name:
result = await self._send_events(
- instance_name=self.config.worker.writers.events,
+ instance_name=instance,
store=self.store,
+ room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
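
The recurring pattern in this file is the switch from a single configured event writer (`config.worker.writers.events`) to a per-room lookup (`events_shard_config.get_instance(room_id)`). A minimal self-contained sketch of how such a shard config can route rooms to writers, assuming a stable hash over the configured instances (the class and method names are illustrative, not necessarily Synapse's exact implementation):

    from hashlib import sha256
    from typing import List


    class ShardedWriterConfig:
        """Toy stand-in for the events shard config used above."""

        def __init__(self, instances: List[str]):
            self.instances = instances

        def get_instance(self, key: str) -> str:
            # Deterministically map a key (here: a room ID) to one writer,
            # falling back to the main process when none are configured.
            if not self.instances:
                return "master"
            digest = sha256(key.encode("utf8")).digest()
            index = int.from_bytes(digest, byteorder="little") % len(self.instances)
            return self.instances[index]

        def should_handle(self, instance_name: str, key: str) -> bool:
            # True when this instance is the designated writer for the key.
            return self.get_instance(key) == instance_name


    config = ShardedWriterConfig(["event_persister1", "event_persister2"])
    # The same room always routes to the same writer:
    assert config.get_instance("!room:example.org") == config.get_instance("!room:example.org")

The property persist_events_and_notify relies on is that every event in a given room is persisted by the same instance, which is why room_id is now threaded through all of these call sites.
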
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7a48c69163..0016af44be 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -49,14 +49,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
-from synapse.types import (
- Collection,
- Requester,
- RoomAlias,
- StreamToken,
- UserID,
- create_requester,
-)
+from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -383,9 +376,8 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
- self._is_event_writer = (
- self.config.worker.writers.events == hs.get_instance_name()
- )
+ self._events_shard_config = self.config.worker.events_shard_config
+ self._instance_name = hs.get_instance_name()
self.room_invite_state_types = self.hs.config.room_invite_state_types
@@ -448,7 +440,7 @@ class EventCreationHandler(object):
event_dict: dict,
token_id: Optional[str] = None,
txn_id: Optional[str] = None,
- prev_event_ids: Optional[Collection[str]] = None,
+ prev_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
) -> Tuple[EventBase, EventContext]:
"""
@@ -788,7 +780,7 @@ class EventCreationHandler(object):
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
- prev_event_ids: Optional[Collection[str]] = None,
+ prev_event_ids: Optional[List[str]] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -913,9 +905,10 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
- if not self._is_event_writer:
+ writer_instance = self._events_shard_config.get_instance(event.room_id)
+ if writer_instance != self._instance_name:
result = await self.send_event(
- instance_name=self.config.worker.writers.events,
+ instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
@@ -983,7 +976,9 @@ class EventCreationHandler(object):
This should only be run on the instance in charge of persisting events.
"""
- assert self._is_event_writer
+ assert self._events_shard_config.should_handle(
+ self._instance_name, event.room_id
+ )
if ratelimit:
# We check if this is a room admin redacting an event so that we
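handle_new_client_event follows the same pattern: resolve the room's writer, then either proxy the event over replication or persist it locally. Under the ShardedWriterConfig sketch above, the routing decision reduces to (hedged, names mirror the hunks):

    def pick_route(room_id: str, instance_name: str, config: ShardedWriterConfig) -> str:
        # Mirrors the check in handle_new_client_event: hand the event to the
        # room's writer over replication if that is a different worker,
        # otherwise persist it on this instance.
        writer_instance = config.get_instance(room_id)
        if writer_instance != instance_name:
            return "proxy to " + writer_instance
        return "persist locally"

The assert in persist_and_notify_client_event becomes should_handle(...) for the same reason: with several writers there is no single "event writer" role, so correctness depends on this instance being the writer for this specific room.
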
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index ac3418d69d..5a1aa7d830 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -14,15 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import Any, Dict, Optional
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
+from synapse.api.filtering import Filter
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
-from synapse.types import RoomStreamToken
+from synapse.streams.config import PaginationConfig
+from synapse.types import Requester, RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -247,15 +250,16 @@ class PaginationHandler(object):
)
return purge_id
- async def _purge_history(self, purge_id, room_id, token, delete_local_events):
+ async def _purge_history(
+ self, purge_id: str, room_id: str, token: str, delete_local_events: bool
+ ) -> None:
"""Carry out a history purge on a room.
Args:
- purge_id (str): The id for this purge
- room_id (str): The room to purge from
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
+ purge_id: The id for this purge
+ room_id: The room to purge from
+ token: topological token to delete events before
+ delete_local_events: True to delete local events as well as remote ones
"""
self._purges_in_progress_by_room.add(room_id)
try:
@@ -291,9 +295,9 @@ class PaginationHandler(object):
"""
return self._purges_by_id.get(purge_id)
- async def purge_room(self, room_id):
+ async def purge_room(self, room_id: str) -> None:
"""Purge the given room from the database"""
- with (await self.pagination_lock.write(room_id)):
+ with await self.pagination_lock.write(room_id):
# check we know about the room
await self.store.get_room_version_id(room_id)
@@ -307,23 +311,22 @@ class PaginationHandler(object):
async def get_messages(
self,
- requester,
- room_id=None,
- pagin_config=None,
- as_client_event=True,
- event_filter=None,
- ):
+ requester: Requester,
+ room_id: Optional[str] = None,
+ pagin_config: Optional[PaginationConfig] = None,
+ as_client_event: bool = True,
+ event_filter: Optional[Filter] = None,
+ ) -> Dict[str, Any]:
"""Get messages in a room.
Args:
- requester (Requester): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- as_client_event (bool): True to get events in client-server format.
- event_filter (Filter): Filter to apply to results or None
+ requester: The user requesting messages.
+ room_id: The room they want messages from.
+ pagin_config: The pagination config rules to apply, if any.
+ as_client_event: True to get events in client-server format.
+ event_filter: Filter to apply to results or None
Returns:
- dict: Pagination API results
+ Pagination API results
"""
user_id = requester.user.to_string()
@@ -343,7 +346,7 @@ class PaginationHandler(object):
source_config = pagin_config.get_source_config("room")
- with (await self.pagination_lock.read(room_id)):
+ with await self.pagination_lock.read(room_id):
(
membership,
member_event_id,
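
The lock changes above only drop redundant parentheses: awaiting pagination_lock.read(...) already yields a context manager. A short usage sketch, assuming ReadWriteLock's documented semantics (multiple concurrent readers, exclusive writers):

    from synapse.util.async_helpers import ReadWriteLock


    async def read_then_purge(lock: ReadWriteLock, room_id: str) -> None:
        with await lock.read(room_id):
            pass  # read-side work, e.g. paginating messages
        with await lock.write(room_id):
            pass  # write-side work, e.g. purging history
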
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 96c9d6bab4..0cb8fad89a 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -161,6 +161,9 @@ class BaseProfileHandler(BaseHandler):
Codes.FORBIDDEN,
)
+ if not isinstance(new_displayname, str):
+ raise SynapseError(400, "Invalid displayname")
+
if len(new_displayname) > MAX_DISPLAYNAME_LEN:
raise SynapseError(
400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN,)
@@ -235,6 +238,9 @@ class BaseProfileHandler(BaseHandler):
400, "Changing avatar is disabled on this server", Codes.FORBIDDEN
)
+ if not isinstance(new_avatar_url, str):
+ raise SynapseError(400, "Invalid avatar_url")
+
if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
raise SynapseError(
400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN,)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 9d5b1828df..55794c3057 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler):
# Always wait for room creation to propagate before returning
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", last_stream_id
+ self.hs.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ last_stream_id,
)
return result, last_stream_id
@@ -1260,10 +1262,10 @@ class RoomShutdownHandler(object):
# We now wait for the create room to come back in via replication so
# that we can assume that all the joins/invites have propagated before
# we try to auto-join below.
- #
- # TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", stream_id
+ self.hs.config.worker.events_shard_config.get_instance(new_room_id),
+ "events",
+ stream_id,
)
else:
new_room_id = None
@@ -1293,7 +1295,9 @@ class RoomShutdownHandler(object):
# Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", stream_id
+ self.hs.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ stream_id,
)
await self.room_member_handler.forget(target_requester.user, room_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 1017ae6b19..ed1d1bd83d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -38,15 +38,7 @@ from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
-from synapse.types import (
- Collection,
- JsonDict,
- Requester,
- RoomAlias,
- RoomID,
- StateMap,
- UserID,
-)
+from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -91,13 +83,6 @@ class RoomMemberHandler(object):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles
- self._event_stream_writer_instance = hs.config.worker.writers.events
- self._is_on_event_persistence_instance = (
- self._event_stream_writer_instance == hs.get_instance_name()
- )
- if self._is_on_event_persistence_instance:
- self.persist_event_storage = hs.get_storage().persistence
-
self._join_rate_limiter_local = Ratelimiter(
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
@@ -185,7 +170,7 @@ class RoomMemberHandler(object):
target: UserID,
room_id: str,
membership: str,
- prev_event_ids: Collection[str],
+ prev_event_ids: List[str],
txn_id: Optional[str] = None,
ratelimit: bool = True,
content: Optional[dict] = None,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 8118206f8e..c281ff163a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -16,7 +16,7 @@
import itertools
import logging
-from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
import attr
from prometheus_client import Counter
@@ -44,6 +44,9 @@ from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure, measure_func
from synapse.visibility import filter_events_for_client
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
@@ -96,7 +99,12 @@ class TimelineBatch:
__bool__ = __nonzero__ # python3
-@attr.s(slots=True, frozen=True)
+# We can't freeze this class because we need to update its unread count in
+# place after it has been instantiated: the unread count for a room is only
+# calculated if the room has updates, which we check after creating the
+# instance. This isn't a big deal, since we update the notification counts
+# afterwards anyway.
+@attr.s(slots=True)
class JoinedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
@@ -105,6 +113,7 @@ class JoinedSyncResult:
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
+ unread_count = attr.ib(type=int)
def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -239,7 +248,7 @@ class SyncResult:
class SyncHandler(object):
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.hs_config = hs.config
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
@@ -714,9 +723,8 @@ class SyncHandler(object):
]
missing_hero_state = await self.store.get_events(missing_hero_event_ids)
- missing_hero_state = missing_hero_state.values()
- for s in missing_hero_state:
+ for s in missing_hero_state.values():
cache.set(s.state_key, s.event_id)
state[(EventTypes.Member, s.state_key)] = s
@@ -934,7 +942,7 @@ class SyncHandler(object):
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
- ) -> Optional[Dict[str, str]]:
+ ) -> Dict[str, int]:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
@@ -942,15 +950,10 @@ class SyncHandler(object):
receipt_type="m.read",
)
- if last_unread_event_id:
- notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
- room_id, sync_config.user.to_string(), last_unread_event_id
- )
- return notifs
-
- # There is no new information in this period, so your notification
- # count is whatever it was last time.
- return None
+ notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
+ room_id, sync_config.user.to_string(), last_unread_event_id
+ )
+ return notifs
async def generate_sync_result(
self,
@@ -1773,7 +1776,7 @@ class SyncHandler(object):
ignored_users: Set[str],
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
- tags: Optional[List[JsonDict]],
+ tags: Optional[Dict[str, Dict[str, Any]]],
account_data: Dict[str, JsonDict],
always_include: bool = False,
):
@@ -1889,7 +1892,7 @@ class SyncHandler(object):
)
if room_builder.rtype == "joined":
- unread_notifications = {} # type: Dict[str, str]
+ unread_notifications = {} # type: Dict[str, int]
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
@@ -1898,14 +1901,16 @@ class SyncHandler(object):
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
+ unread_count=0,
)
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
- if notifs is not None:
- unread_notifications["notification_count"] = notifs["notify_count"]
- unread_notifications["highlight_count"] = notifs["highlight_count"]
+ unread_notifications["notification_count"] = notifs["notify_count"]
+ unread_notifications["highlight_count"] = notifs["highlight_count"]
+
+ room_sync.unread_count = notifs["unread_count"]
sync_result_builder.joined.append(room_sync)
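
To make the frozen-attrs comment above concrete: attrs only forbids assignment after construction when frozen=True, so dropping frozen is what permits the late room_sync.unread_count = ... assignment. A toy demonstration (not the real JoinedSyncResult):

    import attr


    @attr.s(slots=True)  # frozen=True would make the assignment below raise
    class JoinedSyncResultSketch:
        room_id = attr.ib(type=str)
        unread_count = attr.ib(type=int, default=0)


    result = JoinedSyncResultSketch(room_id="!room:example.org")
    result.unread_count = 3  # FrozenInstanceError if the class were frozen
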
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 521b6d620d..e21f8dbc58 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -234,7 +234,7 @@ class UserDirectoryHandler(StateDeltasHandler):
async def _handle_room_publicity_change(
self, room_id, prev_event_id, event_id, typ
):
- """Handle a room having potentially changed from/to world_readable/publically
+ """Handle a room having potentially changed from/to world_readable/publicly
joinable.
Args:
@@ -388,9 +388,15 @@ class UserDirectoryHandler(StateDeltasHandler):
prev_name = prev_event.content.get("displayname")
new_name = event.content.get("displayname")
+ # If the new name is not a string, do not update the directory.
+ if not isinstance(new_name, str):
+ new_name = prev_name
prev_avatar = prev_event.content.get("avatar_url")
new_avatar = event.content.get("avatar_url")
+ # If the new avatar is not a string, do not update the directory.
+ if not isinstance(new_avatar, str):
+ new_avatar = prev_avatar
if prev_name != new_name or prev_avatar != new_avatar:
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
|