summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/sender/__init__.py11
-rw-r--r--synapse/handlers/federation.py9
-rw-r--r--synapse/handlers/stats.py1
-rw-r--r--synapse/rest/media/v1/_base.py2
-rw-r--r--synapse/rest/media/v1/thumbnailer.py14
-rw-r--r--synapse/state/__init__.py24
-rw-r--r--synapse/storage/roommember.py85
-rw-r--r--synapse/storage/schema/delta/56/unique_user_filter_index.py46
-rw-r--r--synapse/storage/stats.py3
-rw-r--r--synapse/util/metrics.py6
10 files changed, 157 insertions, 44 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d46f4aaeb1..2b2ee8612a 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -38,7 +38,7 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.metrics import measure_func
+from synapse.util.metrics import Measure, measure_func
 
 logger = logging.getLogger(__name__)
 
@@ -183,8 +183,8 @@ class FederationSender(object):
                         # Otherwise if the last member on a server in a room is
                         # banned then it won't receive the event because it won't
                         # be in the room after the ban.
-                        destinations = yield self.state.get_current_hosts_in_room(
-                            event.room_id, latest_event_ids=event.prev_event_ids()
+                        destinations = yield self.state.get_hosts_in_room_at_events(
+                            event.room_id, event_ids=event.prev_event_ids()
                         )
                     except Exception:
                         logger.exception(
@@ -207,8 +207,9 @@ class FederationSender(object):
 
                 @defer.inlineCallbacks
                 def handle_room_events(events):
-                    for event in events:
-                        yield handle_event(event)
+                    with Measure(self.clock, "handle_room_events"):
+                        for event in events:
+                            yield handle_event(event)
 
                 events_by_room = {}
                 for event in events:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f72b81d419..50fc0fde2a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2570,7 +2570,7 @@ class FederationHandler(BaseHandler):
         )
 
         try:
-            self.auth.check_from_context(room_version, event, context)
+            yield self.auth.check_from_context(room_version, event, context)
         except AuthError as e:
             logger.warn("Denying third party invite %r because %s", event, e)
             raise e
@@ -2599,7 +2599,12 @@ class FederationHandler(BaseHandler):
                 original_invite_id, allow_none=True
             )
         if original_invite:
-            display_name = original_invite.content["display_name"]
+            # If the m.room.third_party_invite event's content is empty, it means the
+            # invite has been revoked. In this case, we don't have to raise an error here
+            # because the auth check will fail on the invite (because it's not able to
+            # fetch public keys from the m.room.third_party_invite event's content, which
+            # is empty).
+            display_name = original_invite.content.get("display_name")
             event_dict["content"]["third_party_invite"]["display_name"] = display_name
         else:
             logger.info(
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index cbac7c347a..c62b113115 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -293,6 +293,7 @@ class StatsHandler(StateDeltasHandler):
                 room_state["guest_access"] = event_content.get("guest_access")
 
         for room_id, state in room_to_state_updates.items():
+            logger.info("Updating room_stats_state for %s: %s", room_id, state)
             yield self.store.update_room_state(room_id, state)
 
         return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 5fefee4dde..65bbf00073 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -195,7 +195,7 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam
         respond_404(request)
         return
 
-    logger.debug("Responding to media request with responder %s")
+    logger.debug("Responding to media request with responder %s", responder)
     add_file_headers(request, media_type, file_size, upload_name)
     try:
         with responder:
diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py
index c995d7e043..8cf415e29d 100644
--- a/synapse/rest/media/v1/thumbnailer.py
+++ b/synapse/rest/media/v1/thumbnailer.py
@@ -82,13 +82,21 @@ class Thumbnailer(object):
         else:
             return (max_height * self.width) // self.height, max_height
 
+    def _resize(self, width, height):
+        # 1-bit or 8-bit color palette images need converting to RGB
+        # otherwise they will be scaled using nearest neighbour which
+        # looks awful
+        if self.image.mode in ["1", "P"]:
+            self.image = self.image.convert("RGB")
+        return self.image.resize((width, height), Image.ANTIALIAS)
+
     def scale(self, width, height, output_type):
         """Rescales the image to the given dimensions.
 
         Returns:
             BytesIO: the bytes of the encoded image ready to be written to disk
         """
-        scaled = self.image.resize((width, height), Image.ANTIALIAS)
+        scaled = self._resize(width, height)
         return self._encode_image(scaled, output_type)
 
     def crop(self, width, height, output_type):
@@ -107,13 +115,13 @@ class Thumbnailer(object):
         """
         if width * self.height > height * self.width:
             scaled_height = (width * self.height) // self.width
-            scaled_image = self.image.resize((width, scaled_height), Image.ANTIALIAS)
+            scaled_image = self._resize(width, scaled_height)
             crop_top = (scaled_height - height) // 2
             crop_bottom = height + crop_top
             cropped = scaled_image.crop((0, crop_top, width, crop_bottom))
         else:
             scaled_width = (height * self.width) // self.height
-            scaled_image = self.image.resize((scaled_width, height), Image.ANTIALIAS)
+            scaled_image = self._resize(scaled_width, height)
             crop_left = (scaled_width - width) // 2
             crop_right = width + crop_left
             cropped = scaled_image.crop((crop_left, 0, crop_right, height))
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 2b0f4c79ee..dc9f5a9008 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -33,7 +33,7 @@ from synapse.state import v1, v2
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import get_cache_factor_for
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.metrics import Measure
+from synapse.util.metrics import Measure, measure_func
 
 logger = logging.getLogger(__name__)
 
@@ -191,11 +191,22 @@ class StateHandler(object):
         return joined_users
 
     @defer.inlineCallbacks
-    def get_current_hosts_in_room(self, room_id, latest_event_ids=None):
-        if not latest_event_ids:
-            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
-        logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
-        entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
+    def get_current_hosts_in_room(self, room_id):
+        event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        return (yield self.get_hosts_in_room_at_events(room_id, event_ids))
+
+    @defer.inlineCallbacks
+    def get_hosts_in_room_at_events(self, room_id, event_ids):
+        """Get the hosts that were in a room at the given event ids
+
+        Args:
+            room_id (str):
+            event_ids (list[str]):
+
+        Returns:
+            Deferred[list[str]]: the hosts in the room at the given events
+        """
+        entry = yield self.resolve_state_groups_for_events(room_id, event_ids)
         joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
         return joined_hosts
 
@@ -344,6 +355,7 @@ class StateHandler(object):
 
         return context
 
+    @measure_func()
     @defer.inlineCallbacks
     def resolve_state_groups_for_events(self, room_id, event_ids):
         """ Given a list of event_ids this method fetches the state at each
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 4df8ebdacd..59a89fad60 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -32,7 +32,8 @@ from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.metrics import Measure
 from synapse.util.stringutils import to_ascii
 
 logger = logging.getLogger(__name__)
@@ -483,6 +484,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         )
         return result
 
+    @defer.inlineCallbacks
     def get_joined_users_from_state(self, room_id, state_entry):
         state_group = state_entry.state_group
         if not state_group:
@@ -492,9 +494,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # To do this we set the state_group to a new object as object() != object()
             state_group = object()
 
-        return self._get_joined_users_from_context(
-            room_id, state_group, state_entry.state, context=state_entry
-        )
+        with Measure(self._clock, "get_joined_users_from_state"):
+            return (
+                yield self._get_joined_users_from_context(
+                    room_id, state_group, state_entry.state, context=state_entry
+                )
+            )
 
     @cachedInlineCallbacks(
         num_args=2, cache_context=True, iterable=True, max_entries=100000
@@ -567,25 +572,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 missing_member_event_ids.append(event_id)
 
         if missing_member_event_ids:
-            rows = yield self._simple_select_many_batch(
-                table="room_memberships",
-                column="event_id",
-                iterable=missing_member_event_ids,
-                retcols=("user_id", "display_name", "avatar_url"),
-                keyvalues={"membership": Membership.JOIN},
-                batch_size=500,
-                desc="_get_joined_users_from_context",
-            )
-
-            users_in_room.update(
-                {
-                    to_ascii(row["user_id"]): ProfileInfo(
-                        avatar_url=to_ascii(row["avatar_url"]),
-                        display_name=to_ascii(row["display_name"]),
-                    )
-                    for row in rows
-                }
+            event_to_memberships = yield self._get_joined_profiles_from_event_ids(
+                missing_member_event_ids
             )
+            users_in_room.update((row for row in event_to_memberships.values() if row))
 
         if event is not None and event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
@@ -597,6 +587,47 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return users_in_room
 
+    @cached(max_entries=10000)
+    def _get_joined_profile_from_event_id(self, event_id):
+        raise NotImplementedError()
+
+    @cachedList(
+        cached_method_name="_get_joined_profile_from_event_id",
+        list_name="event_ids",
+        inlineCallbacks=True,
+    )
+    def _get_joined_profiles_from_event_ids(self, event_ids):
+        """For given set of member event_ids check if they point to a join
+        event and if so return the associated user and profile info.
+
+        Args:
+            event_ids (Iterable[str]): The member event IDs to lookup
+
+        Returns:
+            Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID
+            to `user_id` and ProfileInfo (or None if not join event).
+        """
+
+        rows = yield self._simple_select_many_batch(
+            table="room_memberships",
+            column="event_id",
+            iterable=event_ids,
+            retcols=("user_id", "display_name", "avatar_url", "event_id"),
+            keyvalues={"membership": Membership.JOIN},
+            batch_size=500,
+            desc="_get_membership_from_event_ids",
+        )
+
+        return {
+            row["event_id"]: (
+                row["user_id"],
+                ProfileInfo(
+                    avatar_url=row["avatar_url"], display_name=row["display_name"]
+                ),
+            )
+            for row in rows
+        }
+
     @cachedInlineCallbacks(max_entries=10000)
     def is_host_joined(self, room_id, host):
         if "%" in host or "_" in host:
@@ -669,6 +700,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return True
 
+    @defer.inlineCallbacks
     def get_joined_hosts(self, room_id, state_entry):
         state_group = state_entry.state_group
         if not state_group:
@@ -678,9 +710,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # To do this we set the state_group to a new object as object() != object()
             state_group = object()
 
-        return self._get_joined_hosts(
-            room_id, state_group, state_entry.state, state_entry=state_entry
-        )
+        with Measure(self._clock, "get_joined_hosts"):
+            return (
+                yield self._get_joined_hosts(
+                    room_id, state_group, state_entry.state, state_entry=state_entry
+                )
+            )
 
     @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
     # @defer.inlineCallbacks
diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py
new file mode 100644
index 0000000000..60031f23ca
--- /dev/null
+++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py
@@ -0,0 +1,46 @@
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        select_clause = """
+        CREATE TEMPORARY TABLE user_filters_migration AS
+            SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
+            FROM user_filters;
+        """
+    else:
+        select_clause = """
+        CREATE TEMPORARY TABLE user_filters_migration AS
+            SELECT * FROM user_filters GROUP BY user_id, filter_id;
+        """
+    sql = (
+        """
+        BEGIN;
+            %s
+            DROP INDEX user_filters_by_user_id_filter_id;
+            DELETE FROM user_filters;
+            ALTER TABLE user_filters
+               ALTER COLUMN user_id SET NOT NULL,
+               ALTER COLUMN filter_id SET NOT NULL,
+               ALTER COLUMN filter_json SET NOT NULL;
+            INSERT INTO user_filters(user_id, filter_id, filter_json)
+                SELECT * FROM user_filters_migration;
+            DROP TABLE user_filters_migration;
+            CREATE UNIQUE INDEX user_filters_by_user_id_filter_id_unique
+                ON user_filters(user_id, filter_id);
+        END;
+    """
+        % select_clause
+    )
+    if isinstance(database_engine, PostgresEngine):
+        cur.execute(sql)
+    else:
+        cur.executescript(sql)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    pass
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 09190d684e..7c224cd3d9 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -332,6 +332,9 @@ class StatsStore(StateDeltasStore):
         def _bulk_update_stats_delta_txn(txn):
             for stats_type, stats_updates in updates.items():
                 for stats_id, fields in stats_updates.items():
+                    logger.info(
+                        "Updating %s stats for %s: %s", stats_type, stats_id, fields
+                    )
                     self._update_stats_delta_txn(
                         txn,
                         ts=ts,
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 0910930c21..4b1bcdf23c 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -60,12 +60,14 @@ in_flight = InFlightGauge(
 )
 
 
-def measure_func(name):
+def measure_func(name=None):
     def wrapper(func):
+        block_name = func.__name__ if name is None else name
+
         @wraps(func)
         @defer.inlineCallbacks
         def measured_func(self, *args, **kwargs):
-            with Measure(self.clock, name):
+            with Measure(self.clock, block_name):
                 r = yield func(self, *args, **kwargs)
             return r