diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/metrics.py | 31 | ||||
-rw-r--r-- | synapse/config/server.py | 17 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 8 | ||||
-rw-r--r-- | synapse/handlers/register.py | 12 | ||||
-rw-r--r-- | synapse/handlers/stats.py | 4 | ||||
-rw-r--r-- | synapse/metrics/__init__.py | 12 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 13 | ||||
-rw-r--r-- | synapse/storage/events.py | 103 | ||||
-rw-r--r-- | synapse/storage/registration.py | 37 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 59 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/redaction_censor.sql | 17 | ||||
-rw-r--r-- | synapse/storage/stats.py | 4 |
12 files changed, 304 insertions, 13 deletions
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py index 3698441963..653b990e67 100644 --- a/synapse/config/metrics.py +++ b/synapse/config/metrics.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import attr + from ._base import Config, ConfigError MISSING_SENTRY = """Missing sentry-sdk library. This is required to enable sentry @@ -20,6 +23,18 @@ MISSING_SENTRY = """Missing sentry-sdk library. This is required to enable sentr """ +@attr.s +class MetricsFlags(object): + known_servers = attr.ib(default=False, validator=attr.validators.instance_of(bool)) + + @classmethod + def all_off(cls): + """ + Instantiate the flags with all options set to off. + """ + return cls(**{x.name: False for x in attr.fields(cls)}) + + class MetricsConfig(Config): def read_config(self, config, **kwargs): self.enable_metrics = config.get("enable_metrics", False) @@ -27,6 +42,12 @@ class MetricsConfig(Config): self.metrics_port = config.get("metrics_port") self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1") + if self.enable_metrics: + _metrics_config = config.get("metrics_flags") or {} + self.metrics_flags = MetricsFlags(**_metrics_config) + else: + self.metrics_flags = MetricsFlags.all_off() + self.sentry_enabled = "sentry" in config if self.sentry_enabled: try: @@ -58,6 +79,16 @@ class MetricsConfig(Config): #sentry: # dsn: "..." + # Flags to enable Prometheus metrics which are not suitable to be + # enabled by default, either for performance reasons or limited use. + # + metrics_flags: + # Publish synapse_federation_known_servers, a g auge of the number of + # servers this homeserver knows about, including itself. May cause + # performance problems on large homeservers. + # + #known_servers: true + # Whether or not to report anonymized homeserver usage statistics. """ diff --git a/synapse/config/server.py b/synapse/config/server.py index 2abdef0971..c8b9fe2d0f 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -162,6 +162,16 @@ class ServerConfig(Config): self.mau_trial_days = config.get("mau_trial_days", 0) + # How long to keep redacted events in the database in unredacted form + # before redacting them. + redaction_retention_period = config.get("redaction_retention_period", "7d") + if redaction_retention_period is not None: + self.redaction_retention_period = self.parse_duration( + redaction_retention_period + ) + else: + self.redaction_retention_period = None + # Options to disable HS self.hs_disabled = config.get("hs_disabled", False) self.hs_disabled_message = config.get("hs_disabled_message", "") @@ -718,6 +728,13 @@ class ServerConfig(Config): # Defaults to 'true'. # #allow_per_room_profiles: false + + # How long to keep redacted events in unredacted form in the database. After + # this period redacted events get replaced with their redacted form in the DB. + # + # Defaults to `7d`. Set to `null` to disable. + # + redaction_retention_period: 7d """ % locals() ) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index f6d1d1717e..f0549666c3 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -144,12 +144,18 @@ class IdentityHandler(BaseHandler): creds ) + sid = creds.get("sid") + if not sid: + raise SynapseError( + 400, "No sid in three_pid_creds", errcode=Codes.MISSING_PARAM + ) + # If an id_access_token is not supplied, force usage of v1 if id_access_token is None: use_v2 = False # Decide which API endpoint URLs to use - bind_data = {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid} + bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid} if use_v2: bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,) bind_data["id_access_token"] = id_access_token diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 975da57ffd..06bd03b77c 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -275,16 +275,12 @@ class RegistrationHandler(BaseHandler): fake_requester = create_requester(user_id) # try to create the room if we're the first real user on the server. Note - # that an auto-generated support user is not a real user and will never be + # that an auto-generated support or bot user is not a real user and will never be # the user to create the room should_auto_create_rooms = False - is_support = yield self.store.is_support_user(user_id) - # There is an edge case where the first user is the support user, then - # the room is never created, though this seems unlikely and - # recoverable from given the support user being involved in the first - # place. - if self.hs.config.autocreate_auto_join_rooms and not is_support: - count = yield self.store.count_all_users() + is_real_user = yield self.store.is_real_user(user_id) + if self.hs.config.autocreate_auto_join_rooms and is_real_user: + count = yield self.store.count_real_users() should_auto_create_rooms = count == 1 for r in self.hs.config.auto_join_rooms: logger.info("Auto-joining %s to %s", user_id, r) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 921735edb3..3c265f3718 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -260,7 +260,9 @@ class StatsHandler(StateDeltasHandler): room_stats_delta["local_users_in_room"] += delta elif typ == EventTypes.Create: - room_state["is_federatable"] = event_content.get("m.federate", True) + room_state["is_federatable"] = ( + event_content.get("m.federate", True) is True + ) if sender and self.is_mine_id(sender): user_to_stats_deltas.setdefault(sender, Counter())[ "rooms_created" diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 488280b4a6..b5c9595cb9 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -29,11 +29,13 @@ from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricF from twisted.internet import reactor +import synapse from synapse.metrics._exposition import ( MetricsResource, generate_latest, start_http_server, ) +from synapse.util.versionstring import get_version_string logger = logging.getLogger(__name__) @@ -385,6 +387,16 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name" # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) +# Build info of the running server. +build_info = Gauge( + "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"] +) +build_info.labels( + " ".join([platform.python_implementation(), platform.python_version()]), + get_version_string(synapse), + " ".join([platform.system(), platform.release()]), +).set(1) + last_ticked = time.time() diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index bd5d53af91..6299587808 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -22,6 +22,7 @@ from prometheus_client import Counter from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled +from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException @@ -194,7 +195,17 @@ class HttpPusher(object): ) for push_action in unprocessed: - processed = yield self._process_one(push_action) + with opentracing.start_active_span( + "http-push", + tags={ + "authenticated_entity": self.user_id, + "event_id": push_action["event_id"], + "app_id": self.app_id, + "app_display_name": self.app_display_name, + }, + ): + processed = yield self._process_one(push_action) + if processed: http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1958afe1d7..ddf7ab6479 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,7 +23,7 @@ from functools import wraps from six import iteritems, text_type from six.moves import range -from canonicaljson import json +from canonicaljson import encode_canonical_json, json from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.events.utils import prune_event_dict from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.utils import log_function from synapse.metrics import BucketCollector @@ -262,6 +263,14 @@ class EventsStore( hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + def _censor_redactions(): + return run_as_background_process( + "_censor_redactions", self._censor_redactions + ) + + if self.hs.config.redaction_retention_period is not None: + hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -1549,6 +1558,98 @@ class EventsStore( ) @defer.inlineCallbacks + def _censor_redactions(self): + """Censors all redactions older than the configured period that haven't + been censored yet. + + By censor we mean update the event_json table with the redacted event. + + Returns: + Deferred + """ + + if self.hs.config.redaction_retention_period is None: + return + + max_pos = yield self.find_first_stream_ordering_after_ts( + self._clock.time_msec() - self.hs.config.redaction_retention_period + ) + + # We fetch all redactions that: + # 1. point to an event we have, + # 2. has a stream ordering from before the cut off, and + # 3. we haven't yet censored. + # + # This is limited to 100 events to ensure that we don't try and do too + # much at once. We'll get called again so this should eventually catch + # up. + # + # We use the range [-max_pos, max_pos] to handle backfilled events, + # which are given negative stream ordering. + sql = """ + SELECT redact_event.event_id, redacts FROM redactions + INNER JOIN events AS redact_event USING (event_id) + INNER JOIN events AS original_event ON ( + redact_event.room_id = original_event.room_id + AND redacts = original_event.event_id + ) + WHERE NOT have_censored + AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ? + ORDER BY redact_event.stream_ordering ASC + LIMIT ? + """ + + rows = yield self._execute( + "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100 + ) + + updates = [] + + for redaction_id, event_id in rows: + redaction_event = yield self.get_event(redaction_id, allow_none=True) + original_event = yield self.get_event( + event_id, allow_rejected=True, allow_none=True + ) + + # The SQL above ensures that we have both the redaction and + # original event, so if the `get_event` calls return None it + # means that the redaction wasn't allowed. Either way we know that + # the result won't change so we mark the fact that we've checked. + if ( + redaction_event + and original_event + and original_event.internal_metadata.is_redacted() + ): + # Redaction was allowed + pruned_json = encode_canonical_json( + prune_event_dict(original_event.get_dict()) + ) + else: + # Redaction wasn't allowed + pruned_json = None + + updates.append((redaction_id, event_id, pruned_json)) + + def _update_censor_txn(txn): + for redaction_id, event_id, pruned_json in updates: + if pruned_json: + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + + self._simple_update_one_txn( + txn, + table="redactions", + keyvalues={"event_id": redaction_id}, + updatevalues={"have_censored": True}, + ) + + yield self.runInteraction("_update_censor_txn", _update_censor_txn) + + @defer.inlineCallbacks def count_daily_messages(self): """ Returns an estimate of the number of messages sent in the last day. diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 5138792a5f..109052fa41 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -323,6 +323,19 @@ class RegistrationWorkerStore(SQLBaseStore): return None @cachedInlineCallbacks() + def is_real_user(self, user_id): + """Determines if the user is a real user, ie does not have a 'user_type'. + + Args: + user_id (str): user id to test + + Returns: + Deferred[bool]: True if user 'user_type' is null or empty string + """ + res = yield self.runInteraction("is_real_user", self.is_real_user_txn, user_id) + return res + + @cachedInlineCallbacks() def is_support_user(self, user_id): """Determines if the user is of type UserTypes.SUPPORT @@ -337,6 +350,16 @@ class RegistrationWorkerStore(SQLBaseStore): ) return res + def is_real_user_txn(self, txn, user_id): + res = self._simple_select_one_onecol_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + retcol="user_type", + allow_none=True, + ) + return res is None + def is_support_user_txn(self, txn, user_id): res = self._simple_select_one_onecol_txn( txn=txn, @@ -422,6 +445,20 @@ class RegistrationWorkerStore(SQLBaseStore): return ret @defer.inlineCallbacks + def count_real_users(self): + """Counts all users without a special user_type registered on the homeserver.""" + + def _count_users(txn): + txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null") + rows = self.cursor_to_dict(txn) + if rows: + return rows[0]["users"] + return 0 + + ret = yield self.runInteraction("count_real_users", _count_users) + return ret + + @defer.inlineCallbacks def find_next_generated_user_id_localpart(self): """ Gets the localpart of the next generated user ID. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index f8b682ebd9..4df8ebdacd 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -24,8 +24,10 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction +from synapse.storage.engines import Sqlite3Engine from synapse.storage.events_worker import EventsWorkerStore from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer @@ -74,6 +76,63 @@ class RoomMemberWorkerStore(EventsWorkerStore): self._check_safe_current_state_events_membership_updated_txn(txn) txn.close() + if self.hs.config.metrics_flags.known_servers: + self._known_servers_count = 1 + self.hs.get_clock().looping_call( + run_as_background_process, + 60 * 1000, + "_count_known_servers", + self._count_known_servers, + ) + self.hs.get_clock().call_later( + 1000, + run_as_background_process, + "_count_known_servers", + self._count_known_servers, + ) + LaterGauge( + "synapse_federation_known_servers", + "", + [], + lambda: self._known_servers_count, + ) + + @defer.inlineCallbacks + def _count_known_servers(self): + """ + Count the servers that this server knows about. + + The statistic is stored on the class for the + `synapse_federation_known_servers` LaterGauge to collect. + """ + + def _transact(txn): + if isinstance(self.database_engine, Sqlite3Engine): + query = """ + SELECT COUNT(DISTINCT substr(out.user_id, pos+1)) + FROM ( + SELECT rm.user_id as user_id, instr(rm.user_id, ':') + AS pos FROM room_memberships as rm + INNER JOIN current_state_events as c ON rm.event_id = c.event_id + WHERE c.type = 'm.room.member' + ) as out + """ + else: + query = """ + SELECT COUNT(DISTINCT split_part(state_key, ':', 2)) + FROM current_state_events + WHERE type = 'm.room.member' AND membership = 'join'; + """ + txn.execute(query) + return list(txn)[0][0] + + count = yield self.runInteraction("get_known_servers", _transact) + + # We always know about ourselves, even if we have nothing in + # room_memberships (for example, the server is new). + self._known_servers_count = max([count, 1]) + return self._known_servers_count + def _check_safe_current_state_events_membership_updated_txn(self, txn): """Checks if it is safe to assume the new current_state_events membership column is up to date diff --git a/synapse/storage/schema/delta/56/redaction_censor.sql b/synapse/storage/schema/delta/56/redaction_censor.sql new file mode 100644 index 0000000000..fe51b02309 --- /dev/null +++ b/synapse/storage/schema/delta/56/redaction_censor.sql @@ -0,0 +1,17 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE redactions ADD COLUMN have_censored BOOL NOT NULL DEFAULT false; +CREATE INDEX redactions_have_censored ON redactions(event_id) WHERE not have_censored; diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 6560173c08..09190d684e 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -823,7 +823,9 @@ class StatsStore(StateDeltasStore): elif event.type == EventTypes.CanonicalAlias: room_state["canonical_alias"] = event.content.get("alias") elif event.type == EventTypes.Create: - room_state["is_federatable"] = event.content.get("m.federate", True) + room_state["is_federatable"] = ( + event.content.get("m.federate", True) is True + ) yield self.update_room_state(room_id, room_state) |