diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d46f4aaeb1..2b2ee8612a 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -38,7 +38,7 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.metrics import measure_func
+from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@@ -183,8 +183,8 @@ class FederationSender(object):
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
- destinations = yield self.state.get_current_hosts_in_room(
- event.room_id, latest_event_ids=event.prev_event_ids()
+ destinations = yield self.state.get_hosts_in_room_at_events(
+ event.room_id, event_ids=event.prev_event_ids()
)
except Exception:
logger.exception(
@@ -207,8 +207,9 @@ class FederationSender(object):
@defer.inlineCallbacks
def handle_room_events(events):
- for event in events:
- yield handle_event(event)
+ with Measure(self.clock, "handle_room_events"):
+ for event in events:
+ yield handle_event(event)
events_by_room = {}
for event in events:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f72b81d419..50fc0fde2a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2570,7 +2570,7 @@ class FederationHandler(BaseHandler):
)
try:
- self.auth.check_from_context(room_version, event, context)
+ yield self.auth.check_from_context(room_version, event, context)
except AuthError as e:
logger.warn("Denying third party invite %r because %s", event, e)
raise e
@@ -2599,7 +2599,12 @@ class FederationHandler(BaseHandler):
original_invite_id, allow_none=True
)
if original_invite:
- display_name = original_invite.content["display_name"]
+ # If the m.room.third_party_invite event's content is empty, it means the
+ # invite has been revoked. In this case, we don't have to raise an error here
+ # because the auth check will fail on the invite (because it's not able to
+ # fetch public keys from the m.room.third_party_invite event's content, which
+ # is empty).
+ display_name = original_invite.content.get("display_name")
event_dict["content"]["third_party_invite"]["display_name"] = display_name
else:
logger.info(
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index cbac7c347a..c62b113115 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -293,6 +293,7 @@ class StatsHandler(StateDeltasHandler):
room_state["guest_access"] = event_content.get("guest_access")
for room_id, state in room_to_state_updates.items():
+ logger.info("Updating room_stats_state for %s: %s", room_id, state)
yield self.store.update_room_state(room_id, state)
return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 5fefee4dde..65bbf00073 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -195,7 +195,7 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam
respond_404(request)
return
- logger.debug("Responding to media request with responder %s")
+ logger.debug("Responding to media request with responder %s", responder)
add_file_headers(request, media_type, file_size, upload_name)
try:
with responder:
diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py
index c995d7e043..8cf415e29d 100644
--- a/synapse/rest/media/v1/thumbnailer.py
+++ b/synapse/rest/media/v1/thumbnailer.py
@@ -82,13 +82,21 @@ class Thumbnailer(object):
else:
return (max_height * self.width) // self.height, max_height
+ def _resize(self, width, height):
+ # 1-bit or 8-bit color palette images need converting to RGB
+ # otherwise they will be scaled using nearest neighbour which
+ # looks awful
+ if self.image.mode in ["1", "P"]:
+ self.image = self.image.convert("RGB")
+ return self.image.resize((width, height), Image.ANTIALIAS)
+
def scale(self, width, height, output_type):
"""Rescales the image to the given dimensions.
Returns:
BytesIO: the bytes of the encoded image ready to be written to disk
"""
- scaled = self.image.resize((width, height), Image.ANTIALIAS)
+ scaled = self._resize(width, height)
return self._encode_image(scaled, output_type)
def crop(self, width, height, output_type):
@@ -107,13 +115,13 @@ class Thumbnailer(object):
"""
if width * self.height > height * self.width:
scaled_height = (width * self.height) // self.width
- scaled_image = self.image.resize((width, scaled_height), Image.ANTIALIAS)
+ scaled_image = self._resize(width, scaled_height)
crop_top = (scaled_height - height) // 2
crop_bottom = height + crop_top
cropped = scaled_image.crop((0, crop_top, width, crop_bottom))
else:
scaled_width = (height * self.width) // self.height
- scaled_image = self.image.resize((scaled_width, height), Image.ANTIALIAS)
+ scaled_image = self._resize(scaled_width, height)
crop_left = (scaled_width - width) // 2
crop_right = width + crop_left
cropped = scaled_image.crop((crop_left, 0, crop_right, height))
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 2b0f4c79ee..dc9f5a9008 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -33,7 +33,7 @@ from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.metrics import Measure
+from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@@ -191,11 +191,22 @@ class StateHandler(object):
return joined_users
@defer.inlineCallbacks
- def get_current_hosts_in_room(self, room_id, latest_event_ids=None):
- if not latest_event_ids:
- latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
- logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
- entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
+ def get_current_hosts_in_room(self, room_id):
+ event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+ return (yield self.get_hosts_in_room_at_events(room_id, event_ids))
+
+ @defer.inlineCallbacks
+ def get_hosts_in_room_at_events(self, room_id, event_ids):
+ """Get the hosts that were in a room at the given event ids
+
+ Args:
+ room_id (str):
+ event_ids (list[str]):
+
+ Returns:
+ Deferred[list[str]]: the hosts in the room at the given events
+ """
+ entry = yield self.resolve_state_groups_for_events(room_id, event_ids)
joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
return joined_hosts
@@ -344,6 +355,7 @@ class StateHandler(object):
return context
+ @measure_func()
@defer.inlineCallbacks
def resolve_state_groups_for_events(self, room_id, event_ids):
""" Given a list of event_ids this method fetches the state at each
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 4df8ebdacd..59a89fad60 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -32,7 +32,8 @@ from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.metrics import Measure
from synapse.util.stringutils import to_ascii
logger = logging.getLogger(__name__)
@@ -483,6 +484,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
return result
+ @defer.inlineCallbacks
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group
if not state_group:
@@ -492,9 +494,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._get_joined_users_from_context(
- room_id, state_group, state_entry.state, context=state_entry
- )
+ with Measure(self._clock, "get_joined_users_from_state"):
+ return (
+ yield self._get_joined_users_from_context(
+ room_id, state_group, state_entry.state, context=state_entry
+ )
+ )
@cachedInlineCallbacks(
num_args=2, cache_context=True, iterable=True, max_entries=100000
@@ -567,25 +572,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
missing_member_event_ids.append(event_id)
if missing_member_event_ids:
- rows = yield self._simple_select_many_batch(
- table="room_memberships",
- column="event_id",
- iterable=missing_member_event_ids,
- retcols=("user_id", "display_name", "avatar_url"),
- keyvalues={"membership": Membership.JOIN},
- batch_size=500,
- desc="_get_joined_users_from_context",
- )
-
- users_in_room.update(
- {
- to_ascii(row["user_id"]): ProfileInfo(
- avatar_url=to_ascii(row["avatar_url"]),
- display_name=to_ascii(row["display_name"]),
- )
- for row in rows
- }
+ event_to_memberships = yield self._get_joined_profiles_from_event_ids(
+ missing_member_event_ids
)
+ users_in_room.update((row for row in event_to_memberships.values() if row))
if event is not None and event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -597,6 +587,47 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return users_in_room
+ @cached(max_entries=10000)
+ def _get_joined_profile_from_event_id(self, event_id):
+ raise NotImplementedError()
+
+ @cachedList(
+ cached_method_name="_get_joined_profile_from_event_id",
+ list_name="event_ids",
+ inlineCallbacks=True,
+ )
+ def _get_joined_profiles_from_event_ids(self, event_ids):
+ """For given set of member event_ids check if they point to a join
+ event and if so return the associated user and profile info.
+
+ Args:
+ event_ids (Iterable[str]): The member event IDs to lookup
+
+ Returns:
+ Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID
+ to `user_id` and ProfileInfo (or None if not join event).
+ """
+
+ rows = yield self._simple_select_many_batch(
+ table="room_memberships",
+ column="event_id",
+ iterable=event_ids,
+ retcols=("user_id", "display_name", "avatar_url", "event_id"),
+ keyvalues={"membership": Membership.JOIN},
+ batch_size=500,
+ desc="_get_membership_from_event_ids",
+ )
+
+ return {
+ row["event_id"]: (
+ row["user_id"],
+ ProfileInfo(
+ avatar_url=row["avatar_url"], display_name=row["display_name"]
+ ),
+ )
+ for row in rows
+ }
+
@cachedInlineCallbacks(max_entries=10000)
def is_host_joined(self, room_id, host):
if "%" in host or "_" in host:
@@ -669,6 +700,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return True
+ @defer.inlineCallbacks
def get_joined_hosts(self, room_id, state_entry):
state_group = state_entry.state_group
if not state_group:
@@ -678,9 +710,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._get_joined_hosts(
- room_id, state_group, state_entry.state, state_entry=state_entry
- )
+ with Measure(self._clock, "get_joined_hosts"):
+ return (
+ yield self._get_joined_hosts(
+ room_id, state_group, state_entry.state, state_entry=state_entry
+ )
+ )
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
# @defer.inlineCallbacks
diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py
new file mode 100644
index 0000000000..60031f23ca
--- /dev/null
+++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py
@@ -0,0 +1,46 @@
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ select_clause = """
+ CREATE TEMPORARY TABLE user_filters_migration AS
+ SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
+ FROM user_filters;
+ """
+ else:
+ select_clause = """
+ CREATE TEMPORARY TABLE user_filters_migration AS
+ SELECT * FROM user_filters GROUP BY user_id, filter_id;
+ """
+ sql = (
+ """
+ BEGIN;
+ %s
+ DROP INDEX user_filters_by_user_id_filter_id;
+ DELETE FROM user_filters;
+ ALTER TABLE user_filters
+ ALTER COLUMN user_id SET NOT NULL,
+ ALTER COLUMN filter_id SET NOT NULL,
+ ALTER COLUMN filter_json SET NOT NULL;
+ INSERT INTO user_filters(user_id, filter_id, filter_json)
+ SELECT * FROM user_filters_migration;
+ DROP TABLE user_filters_migration;
+ CREATE UNIQUE INDEX user_filters_by_user_id_filter_id_unique
+ ON user_filters(user_id, filter_id);
+ END;
+ """
+ % select_clause
+ )
+ if isinstance(database_engine, PostgresEngine):
+ cur.execute(sql)
+ else:
+ cur.executescript(sql)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ pass
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 09190d684e..7c224cd3d9 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -332,6 +332,9 @@ class StatsStore(StateDeltasStore):
def _bulk_update_stats_delta_txn(txn):
for stats_type, stats_updates in updates.items():
for stats_id, fields in stats_updates.items():
+ logger.info(
+ "Updating %s stats for %s: %s", stats_type, stats_id, fields
+ )
self._update_stats_delta_txn(
txn,
ts=ts,
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 0910930c21..4b1bcdf23c 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -60,12 +60,14 @@ in_flight = InFlightGauge(
)
-def measure_func(name):
+def measure_func(name=None):
def wrapper(func):
+ block_name = func.__name__ if name is None else name
+
@wraps(func)
@defer.inlineCallbacks
def measured_func(self, *args, **kwargs):
- with Measure(self.clock, name):
+ with Measure(self.clock, block_name):
r = yield func(self, *args, **kwargs)
return r
|