summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/metrics.py31
-rw-r--r--synapse/config/server.py17
-rw-r--r--synapse/handlers/identity.py8
-rw-r--r--synapse/handlers/register.py12
-rw-r--r--synapse/handlers/stats.py4
-rw-r--r--synapse/metrics/__init__.py12
-rw-r--r--synapse/push/httppusher.py13
-rw-r--r--synapse/storage/events.py103
-rw-r--r--synapse/storage/registration.py37
-rw-r--r--synapse/storage/roommember.py59
-rw-r--r--synapse/storage/schema/delta/56/redaction_censor.sql17
-rw-r--r--synapse/storage/stats.py4
12 files changed, 304 insertions, 13 deletions
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py
index 3698441963..653b990e67 100644
--- a/synapse/config/metrics.py
+++ b/synapse/config/metrics.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import attr
+
 from ._base import Config, ConfigError
 
 MISSING_SENTRY = """Missing sentry-sdk library. This is required to enable sentry
@@ -20,6 +23,18 @@ MISSING_SENTRY = """Missing sentry-sdk library. This is required to enable sentr
     """
 
 
+@attr.s
+class MetricsFlags(object):
+    known_servers = attr.ib(default=False, validator=attr.validators.instance_of(bool))
+
+    @classmethod
+    def all_off(cls):
+        """
+        Instantiate the flags with all options set to off.
+        """
+        return cls(**{x.name: False for x in attr.fields(cls)})
+
+
 class MetricsConfig(Config):
     def read_config(self, config, **kwargs):
         self.enable_metrics = config.get("enable_metrics", False)
@@ -27,6 +42,12 @@ class MetricsConfig(Config):
         self.metrics_port = config.get("metrics_port")
         self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1")
 
+        if self.enable_metrics:
+            _metrics_config = config.get("metrics_flags") or {}
+            self.metrics_flags = MetricsFlags(**_metrics_config)
+        else:
+            self.metrics_flags = MetricsFlags.all_off()
+
         self.sentry_enabled = "sentry" in config
         if self.sentry_enabled:
             try:
@@ -58,6 +79,16 @@ class MetricsConfig(Config):
         #sentry:
         #    dsn: "..."
 
+        # Flags to enable Prometheus metrics which are not suitable to be
+        # enabled by default, either for performance reasons or limited use.
+        #
+        metrics_flags:
+            # Publish synapse_federation_known_servers, a g auge of the number of
+            # servers this homeserver knows about, including itself. May cause
+            # performance problems on large homeservers.
+            #
+            #known_servers: true
+
         # Whether or not to report anonymized homeserver usage statistics.
         """
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 2abdef0971..c8b9fe2d0f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -162,6 +162,16 @@ class ServerConfig(Config):
 
         self.mau_trial_days = config.get("mau_trial_days", 0)
 
+        # How long to keep redacted events in the database in unredacted form
+        # before redacting them.
+        redaction_retention_period = config.get("redaction_retention_period", "7d")
+        if redaction_retention_period is not None:
+            self.redaction_retention_period = self.parse_duration(
+                redaction_retention_period
+            )
+        else:
+            self.redaction_retention_period = None
+
         # Options to disable HS
         self.hs_disabled = config.get("hs_disabled", False)
         self.hs_disabled_message = config.get("hs_disabled_message", "")
@@ -718,6 +728,13 @@ class ServerConfig(Config):
         # Defaults to 'true'.
         #
         #allow_per_room_profiles: false
+
+        # How long to keep redacted events in unredacted form in the database. After
+        # this period redacted events get replaced with their redacted form in the DB.
+        #
+        # Defaults to `7d`. Set to `null` to disable.
+        #
+        redaction_retention_period: 7d
         """
             % locals()
         )
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index f6d1d1717e..f0549666c3 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -144,12 +144,18 @@ class IdentityHandler(BaseHandler):
             creds
         )
 
+        sid = creds.get("sid")
+        if not sid:
+            raise SynapseError(
+                400, "No sid in three_pid_creds", errcode=Codes.MISSING_PARAM
+            )
+
         # If an id_access_token is not supplied, force usage of v1
         if id_access_token is None:
             use_v2 = False
 
         # Decide which API endpoint URLs to use
-        bind_data = {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid}
+        bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
         if use_v2:
             bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
             bind_data["id_access_token"] = id_access_token
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 975da57ffd..06bd03b77c 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -275,16 +275,12 @@ class RegistrationHandler(BaseHandler):
         fake_requester = create_requester(user_id)
 
         # try to create the room if we're the first real user on the server. Note
-        # that an auto-generated support user is not a real user and will never be
+        # that an auto-generated support or bot user is not a real user and will never be
         # the user to create the room
         should_auto_create_rooms = False
-        is_support = yield self.store.is_support_user(user_id)
-        # There is an edge case where the first user is the support user, then
-        # the room is never created, though this seems unlikely and
-        # recoverable from given the support user being involved in the first
-        # place.
-        if self.hs.config.autocreate_auto_join_rooms and not is_support:
-            count = yield self.store.count_all_users()
+        is_real_user = yield self.store.is_real_user(user_id)
+        if self.hs.config.autocreate_auto_join_rooms and is_real_user:
+            count = yield self.store.count_real_users()
             should_auto_create_rooms = count == 1
         for r in self.hs.config.auto_join_rooms:
             logger.info("Auto-joining %s to %s", user_id, r)
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 921735edb3..3c265f3718 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -260,7 +260,9 @@ class StatsHandler(StateDeltasHandler):
                         room_stats_delta["local_users_in_room"] += delta
 
             elif typ == EventTypes.Create:
-                room_state["is_federatable"] = event_content.get("m.federate", True)
+                room_state["is_federatable"] = (
+                    event_content.get("m.federate", True) is True
+                )
                 if sender and self.is_mine_id(sender):
                     user_to_stats_deltas.setdefault(sender, Counter())[
                         "rooms_created"
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 488280b4a6..b5c9595cb9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -29,11 +29,13 @@ from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricF
 
 from twisted.internet import reactor
 
+import synapse
 from synapse.metrics._exposition import (
     MetricsResource,
     generate_latest,
     start_http_server,
 )
+from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
 
@@ -385,6 +387,16 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
 # finished being processed.
 event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
+# Build info of the running server.
+build_info = Gauge(
+    "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
+)
+build_info.labels(
+    " ".join([platform.python_implementation(), platform.python_version()]),
+    get_version_string(synapse),
+    " ".join([platform.system(), platform.release()]),
+).set(1)
+
 last_ticked = time.time()
 
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bd5d53af91..6299587808 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,6 +22,7 @@ from prometheus_client import Counter
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
+from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
 
@@ -194,7 +195,17 @@ class HttpPusher(object):
         )
 
         for push_action in unprocessed:
-            processed = yield self._process_one(push_action)
+            with opentracing.start_active_span(
+                "http-push",
+                tags={
+                    "authenticated_entity": self.user_id,
+                    "event_id": push_action["event_id"],
+                    "app_id": self.app_id,
+                    "app_display_name": self.app_display_name,
+                },
+            ):
+                processed = yield self._process_one(push_action)
+
             if processed:
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1958afe1d7..ddf7ab6479 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,7 +23,7 @@ from functools import wraps
 from six import iteritems, text_type
 from six.moves import range
 
-from canonicaljson import json
+from canonicaljson import encode_canonical_json, json
 from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.events.utils import prune_event_dict
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.utils import log_function
 from synapse.metrics import BucketCollector
@@ -262,6 +263,14 @@ class EventsStore(
 
         hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
 
+        def _censor_redactions():
+            return run_as_background_process(
+                "_censor_redactions", self._censor_redactions
+            )
+
+        if self.hs.config.redaction_retention_period is not None:
+            hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
+
     @defer.inlineCallbacks
     def _read_forward_extremities(self):
         def fetch(txn):
@@ -1549,6 +1558,98 @@ class EventsStore(
         )
 
     @defer.inlineCallbacks
+    def _censor_redactions(self):
+        """Censors all redactions older than the configured period that haven't
+        been censored yet.
+
+        By censor we mean update the event_json table with the redacted event.
+
+        Returns:
+            Deferred
+        """
+
+        if self.hs.config.redaction_retention_period is None:
+            return
+
+        max_pos = yield self.find_first_stream_ordering_after_ts(
+            self._clock.time_msec() - self.hs.config.redaction_retention_period
+        )
+
+        # We fetch all redactions that:
+        #   1. point to an event we have,
+        #   2. has a stream ordering from before the cut off, and
+        #   3. we haven't yet censored.
+        #
+        # This is limited to 100 events to ensure that we don't try and do too
+        # much at once. We'll get called again so this should eventually catch
+        # up.
+        #
+        # We use the range [-max_pos, max_pos] to handle backfilled events,
+        # which are given negative stream ordering.
+        sql = """
+            SELECT redact_event.event_id, redacts FROM redactions
+            INNER JOIN events AS redact_event USING (event_id)
+            INNER JOIN events AS original_event ON (
+                redact_event.room_id = original_event.room_id
+                AND redacts = original_event.event_id
+            )
+            WHERE NOT have_censored
+            AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
+            ORDER BY redact_event.stream_ordering ASC
+            LIMIT ?
+        """
+
+        rows = yield self._execute(
+            "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
+        )
+
+        updates = []
+
+        for redaction_id, event_id in rows:
+            redaction_event = yield self.get_event(redaction_id, allow_none=True)
+            original_event = yield self.get_event(
+                event_id, allow_rejected=True, allow_none=True
+            )
+
+            # The SQL above ensures that we have both the redaction and
+            # original event, so if the `get_event` calls return None it
+            # means that the redaction wasn't allowed. Either way we know that
+            # the result won't change so we mark the fact that we've checked.
+            if (
+                redaction_event
+                and original_event
+                and original_event.internal_metadata.is_redacted()
+            ):
+                # Redaction was allowed
+                pruned_json = encode_canonical_json(
+                    prune_event_dict(original_event.get_dict())
+                )
+            else:
+                # Redaction wasn't allowed
+                pruned_json = None
+
+            updates.append((redaction_id, event_id, pruned_json))
+
+        def _update_censor_txn(txn):
+            for redaction_id, event_id, pruned_json in updates:
+                if pruned_json:
+                    self._simple_update_one_txn(
+                        txn,
+                        table="event_json",
+                        keyvalues={"event_id": event_id},
+                        updatevalues={"json": pruned_json},
+                    )
+
+                self._simple_update_one_txn(
+                    txn,
+                    table="redactions",
+                    keyvalues={"event_id": redaction_id},
+                    updatevalues={"have_censored": True},
+                )
+
+        yield self.runInteraction("_update_censor_txn", _update_censor_txn)
+
+    @defer.inlineCallbacks
     def count_daily_messages(self):
         """
         Returns an estimate of the number of messages sent in the last day.
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 5138792a5f..109052fa41 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -323,6 +323,19 @@ class RegistrationWorkerStore(SQLBaseStore):
         return None
 
     @cachedInlineCallbacks()
+    def is_real_user(self, user_id):
+        """Determines if the user is a real user, ie does not have a 'user_type'.
+
+        Args:
+            user_id (str): user id to test
+
+        Returns:
+            Deferred[bool]: True if user 'user_type' is null or empty string
+        """
+        res = yield self.runInteraction("is_real_user", self.is_real_user_txn, user_id)
+        return res
+
+    @cachedInlineCallbacks()
     def is_support_user(self, user_id):
         """Determines if the user is of type UserTypes.SUPPORT
 
@@ -337,6 +350,16 @@ class RegistrationWorkerStore(SQLBaseStore):
         )
         return res
 
+    def is_real_user_txn(self, txn, user_id):
+        res = self._simple_select_one_onecol_txn(
+            txn=txn,
+            table="users",
+            keyvalues={"name": user_id},
+            retcol="user_type",
+            allow_none=True,
+        )
+        return res is None
+
     def is_support_user_txn(self, txn, user_id):
         res = self._simple_select_one_onecol_txn(
             txn=txn,
@@ -422,6 +445,20 @@ class RegistrationWorkerStore(SQLBaseStore):
         return ret
 
     @defer.inlineCallbacks
+    def count_real_users(self):
+        """Counts all users without a special user_type registered on the homeserver."""
+
+        def _count_users(txn):
+            txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null")
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_real_users", _count_users)
+        return ret
+
+    @defer.inlineCallbacks
     def find_next_generated_user_id_localpart(self):
         """
         Gets the localpart of the next generated user ID.
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index f8b682ebd9..4df8ebdacd 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,8 +24,10 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction
+from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -74,6 +76,63 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         self._check_safe_current_state_events_membership_updated_txn(txn)
         txn.close()
 
+        if self.hs.config.metrics_flags.known_servers:
+            self._known_servers_count = 1
+            self.hs.get_clock().looping_call(
+                run_as_background_process,
+                60 * 1000,
+                "_count_known_servers",
+                self._count_known_servers,
+            )
+            self.hs.get_clock().call_later(
+                1000,
+                run_as_background_process,
+                "_count_known_servers",
+                self._count_known_servers,
+            )
+            LaterGauge(
+                "synapse_federation_known_servers",
+                "",
+                [],
+                lambda: self._known_servers_count,
+            )
+
+    @defer.inlineCallbacks
+    def _count_known_servers(self):
+        """
+        Count the servers that this server knows about.
+
+        The statistic is stored on the class for the
+        `synapse_federation_known_servers` LaterGauge to collect.
+        """
+
+        def _transact(txn):
+            if isinstance(self.database_engine, Sqlite3Engine):
+                query = """
+                    SELECT COUNT(DISTINCT substr(out.user_id, pos+1))
+                    FROM (
+                        SELECT rm.user_id as user_id, instr(rm.user_id, ':')
+                            AS pos FROM room_memberships as rm
+                        INNER JOIN current_state_events as c ON rm.event_id = c.event_id
+                        WHERE c.type = 'm.room.member'
+                    ) as out
+                """
+            else:
+                query = """
+                    SELECT COUNT(DISTINCT split_part(state_key, ':', 2))
+                    FROM current_state_events
+                    WHERE type = 'm.room.member' AND membership = 'join';
+                """
+            txn.execute(query)
+            return list(txn)[0][0]
+
+        count = yield self.runInteraction("get_known_servers", _transact)
+
+        # We always know about ourselves, even if we have nothing in
+        # room_memberships (for example, the server is new).
+        self._known_servers_count = max([count, 1])
+        return self._known_servers_count
+
     def _check_safe_current_state_events_membership_updated_txn(self, txn):
         """Checks if it is safe to assume the new current_state_events
         membership column is up to date
diff --git a/synapse/storage/schema/delta/56/redaction_censor.sql b/synapse/storage/schema/delta/56/redaction_censor.sql
new file mode 100644
index 0000000000..fe51b02309
--- /dev/null
+++ b/synapse/storage/schema/delta/56/redaction_censor.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE redactions ADD COLUMN have_censored BOOL NOT NULL DEFAULT false;
+CREATE INDEX redactions_have_censored ON redactions(event_id) WHERE not have_censored;
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 6560173c08..09190d684e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -823,7 +823,9 @@ class StatsStore(StateDeltasStore):
             elif event.type == EventTypes.CanonicalAlias:
                 room_state["canonical_alias"] = event.content.get("alias")
             elif event.type == EventTypes.Create:
-                room_state["is_federatable"] = event.content.get("m.federate", True)
+                room_state["is_federatable"] = (
+                    event.content.get("m.federate", True) is True
+                )
 
         yield self.update_room_state(room_id, room_state)