summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/5934.feature1
-rw-r--r--changelog.d/6003.misc1
-rw-r--r--changelog.d/6005.feature1
-rw-r--r--docs/sample_config.yaml7
-rw-r--r--synapse/config/server.py17
-rw-r--r--synapse/metrics/__init__.py12
-rw-r--r--synapse/push/httppusher.py13
-rw-r--r--synapse/storage/events.py103
-rw-r--r--synapse/storage/schema/delta/56/redaction_censor.sql17
-rw-r--r--tests/storage/test_redaction.py77
-rw-r--r--tests/test_metrics.py22
11 files changed, 266 insertions, 5 deletions
diff --git a/changelog.d/5934.feature b/changelog.d/5934.feature
new file mode 100644
index 0000000000..eae969a52a
--- /dev/null
+++ b/changelog.d/5934.feature
@@ -0,0 +1 @@
+Redact events in the database that have been redacted for a month.
diff --git a/changelog.d/6003.misc b/changelog.d/6003.misc
new file mode 100644
index 0000000000..4152d05f87
--- /dev/null
+++ b/changelog.d/6003.misc
@@ -0,0 +1 @@
+Add opentracing span over HTTP push processing.
diff --git a/changelog.d/6005.feature b/changelog.d/6005.feature
new file mode 100644
index 0000000000..ed6491d3e4
--- /dev/null
+++ b/changelog.d/6005.feature
@@ -0,0 +1 @@
+The new Prometheus metric `synapse_build_info` exposes the Python version, OS version, and Synapse version of the running server.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 93c0edd8ce..9b1ae58a27 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -306,6 +306,13 @@ listeners:
 #
 #allow_per_room_profiles: false
 
+# How long to keep redacted events in unredacted form in the database. After
+# this period redacted events get replaced with their redacted form in the DB.
+#
+# Defaults to `7d`. Set to `null` to disable.
+#
+redaction_retention_period: 7d
+
 
 ## TLS ##
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 2abdef0971..c8b9fe2d0f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -162,6 +162,16 @@ class ServerConfig(Config):
 
         self.mau_trial_days = config.get("mau_trial_days", 0)
 
+        # How long to keep redacted events in the database in unredacted form
+        # before redacting them.
+        redaction_retention_period = config.get("redaction_retention_period", "7d")
+        if redaction_retention_period is not None:
+            self.redaction_retention_period = self.parse_duration(
+                redaction_retention_period
+            )
+        else:
+            self.redaction_retention_period = None
+
         # Options to disable HS
         self.hs_disabled = config.get("hs_disabled", False)
         self.hs_disabled_message = config.get("hs_disabled_message", "")
@@ -718,6 +728,13 @@ class ServerConfig(Config):
         # Defaults to 'true'.
         #
         #allow_per_room_profiles: false
+
+        # How long to keep redacted events in unredacted form in the database. After
+        # this period redacted events get replaced with their redacted form in the DB.
+        #
+        # Defaults to `7d`. Set to `null` to disable.
+        #
+        redaction_retention_period: 7d
         """
             % locals()
         )
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 488280b4a6..b5c9595cb9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -29,11 +29,13 @@ from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricF
 
 from twisted.internet import reactor
 
+import synapse
 from synapse.metrics._exposition import (
     MetricsResource,
     generate_latest,
     start_http_server,
 )
+from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
 
@@ -385,6 +387,16 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
 # finished being processed.
 event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
+# Build info of the running server.
+build_info = Gauge(
+    "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
+)
+build_info.labels(
+    " ".join([platform.python_implementation(), platform.python_version()]),
+    get_version_string(synapse),
+    " ".join([platform.system(), platform.release()]),
+).set(1)
+
 last_ticked = time.time()
 
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bd5d53af91..6299587808 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,6 +22,7 @@ from prometheus_client import Counter
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
+from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
 
@@ -194,7 +195,17 @@ class HttpPusher(object):
         )
 
         for push_action in unprocessed:
-            processed = yield self._process_one(push_action)
+            with opentracing.start_active_span(
+                "http-push",
+                tags={
+                    "authenticated_entity": self.user_id,
+                    "event_id": push_action["event_id"],
+                    "app_id": self.app_id,
+                    "app_display_name": self.app_display_name,
+                },
+            ):
+                processed = yield self._process_one(push_action)
+
             if processed:
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1958afe1d7..ddf7ab6479 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,7 +23,7 @@ from functools import wraps
 from six import iteritems, text_type
 from six.moves import range
 
-from canonicaljson import json
+from canonicaljson import encode_canonical_json, json
 from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.events.utils import prune_event_dict
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.utils import log_function
 from synapse.metrics import BucketCollector
@@ -262,6 +263,14 @@ class EventsStore(
 
         hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
 
+        def _censor_redactions():
+            return run_as_background_process(
+                "_censor_redactions", self._censor_redactions
+            )
+
+        if self.hs.config.redaction_retention_period is not None:
+            hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
+
     @defer.inlineCallbacks
     def _read_forward_extremities(self):
         def fetch(txn):
@@ -1549,6 +1558,98 @@ class EventsStore(
         )
 
     @defer.inlineCallbacks
+    def _censor_redactions(self):
+        """Censors all redactions older than the configured period that haven't
+        been censored yet.
+
+        By censor we mean update the event_json table with the redacted event.
+
+        Returns:
+            Deferred
+        """
+
+        if self.hs.config.redaction_retention_period is None:
+            return
+
+        max_pos = yield self.find_first_stream_ordering_after_ts(
+            self._clock.time_msec() - self.hs.config.redaction_retention_period
+        )
+
+        # We fetch all redactions that:
+        #   1. point to an event we have,
+        #   2. has a stream ordering from before the cut off, and
+        #   3. we haven't yet censored.
+        #
+        # This is limited to 100 events to ensure that we don't try and do too
+        # much at once. We'll get called again so this should eventually catch
+        # up.
+        #
+        # We use the range [-max_pos, max_pos] to handle backfilled events,
+        # which are given negative stream ordering.
+        sql = """
+            SELECT redact_event.event_id, redacts FROM redactions
+            INNER JOIN events AS redact_event USING (event_id)
+            INNER JOIN events AS original_event ON (
+                redact_event.room_id = original_event.room_id
+                AND redacts = original_event.event_id
+            )
+            WHERE NOT have_censored
+            AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
+            ORDER BY redact_event.stream_ordering ASC
+            LIMIT ?
+        """
+
+        rows = yield self._execute(
+            "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
+        )
+
+        updates = []
+
+        for redaction_id, event_id in rows:
+            redaction_event = yield self.get_event(redaction_id, allow_none=True)
+            original_event = yield self.get_event(
+                event_id, allow_rejected=True, allow_none=True
+            )
+
+            # The SQL above ensures that we have both the redaction and
+            # original event, so if the `get_event` calls return None it
+            # means that the redaction wasn't allowed. Either way we know that
+            # the result won't change so we mark the fact that we've checked.
+            if (
+                redaction_event
+                and original_event
+                and original_event.internal_metadata.is_redacted()
+            ):
+                # Redaction was allowed
+                pruned_json = encode_canonical_json(
+                    prune_event_dict(original_event.get_dict())
+                )
+            else:
+                # Redaction wasn't allowed
+                pruned_json = None
+
+            updates.append((redaction_id, event_id, pruned_json))
+
+        def _update_censor_txn(txn):
+            for redaction_id, event_id, pruned_json in updates:
+                if pruned_json:
+                    self._simple_update_one_txn(
+                        txn,
+                        table="event_json",
+                        keyvalues={"event_id": event_id},
+                        updatevalues={"json": pruned_json},
+                    )
+
+                self._simple_update_one_txn(
+                    txn,
+                    table="redactions",
+                    keyvalues={"event_id": redaction_id},
+                    updatevalues={"have_censored": True},
+                )
+
+        yield self.runInteraction("_update_censor_txn", _update_censor_txn)
+
+    @defer.inlineCallbacks
     def count_daily_messages(self):
         """
         Returns an estimate of the number of messages sent in the last day.
diff --git a/synapse/storage/schema/delta/56/redaction_censor.sql b/synapse/storage/schema/delta/56/redaction_censor.sql
new file mode 100644
index 0000000000..fe51b02309
--- /dev/null
+++ b/synapse/storage/schema/delta/56/redaction_censor.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE redactions ADD COLUMN have_censored BOOL NOT NULL DEFAULT false;
+CREATE INDEX redactions_have_censored ON redactions(event_id) WHERE not have_censored;
diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py
index d961b81d48..deecfad9fb 100644
--- a/tests/storage/test_redaction.py
+++ b/tests/storage/test_redaction.py
@@ -17,6 +17,8 @@
 
 from mock import Mock
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -29,8 +31,10 @@ from tests.utils import create_room
 
 class RedactionTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
+        config = self.default_config()
+        config["redaction_retention_period"] = "30d"
         return self.setup_test_homeserver(
-            resource_for_federation=Mock(), http_client=None
+            resource_for_federation=Mock(), http_client=None, config=config
         )
 
     def prepare(self, reactor, clock, hs):
@@ -286,3 +290,74 @@ class RedactionTestCase(unittest.HomeserverTestCase):
         self.assertEqual(
             fetched.unsigned["redacted_because"].event_id, redaction_event_id2
         )
+
+    def test_redact_censor(self):
+        """Test that a redacted event gets censored in the DB after a month
+        """
+
+        self.get_success(
+            self.inject_room_member(self.room1, self.u_alice, Membership.JOIN)
+        )
+
+        msg_event = self.get_success(self.inject_message(self.room1, self.u_alice, "t"))
+
+        # Check event has not been redacted:
+        event = self.get_success(self.store.get_event(msg_event.event_id))
+
+        self.assertObjectHasAttributes(
+            {
+                "type": EventTypes.Message,
+                "user_id": self.u_alice.to_string(),
+                "content": {"body": "t", "msgtype": "message"},
+            },
+            event,
+        )
+
+        self.assertFalse("redacted_because" in event.unsigned)
+
+        # Redact event
+        reason = "Because I said so"
+        self.get_success(
+            self.inject_redaction(self.room1, msg_event.event_id, self.u_alice, reason)
+        )
+
+        event = self.get_success(self.store.get_event(msg_event.event_id))
+
+        self.assertTrue("redacted_because" in event.unsigned)
+
+        self.assertObjectHasAttributes(
+            {
+                "type": EventTypes.Message,
+                "user_id": self.u_alice.to_string(),
+                "content": {},
+            },
+            event,
+        )
+
+        event_json = self.get_success(
+            self.store._simple_select_one_onecol(
+                table="event_json",
+                keyvalues={"event_id": msg_event.event_id},
+                retcol="json",
+            )
+        )
+
+        self.assert_dict(
+            {"content": {"body": "t", "msgtype": "message"}}, json.loads(event_json)
+        )
+
+        # Advance by 30 days, then advance again to ensure that the looping call
+        # for updating the stream position gets called and then the looping call
+        # for the censoring gets called.
+        self.reactor.advance(60 * 60 * 24 * 31)
+        self.reactor.advance(60 * 60 * 2)
+
+        event_json = self.get_success(
+            self.store._simple_select_one_onecol(
+                table="event_json",
+                keyvalues={"event_id": msg_event.event_id},
+                retcol="json",
+            )
+        )
+
+        self.assert_dict({"content": {}}, json.loads(event_json))
diff --git a/tests/test_metrics.py b/tests/test_metrics.py
index 2edbae5c6d..270f853d60 100644
--- a/tests/test_metrics.py
+++ b/tests/test_metrics.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,8 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from synapse.metrics import InFlightGauge
+from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
 
 from tests import unittest
 
@@ -111,3 +111,21 @@ class TestMauLimit(unittest.TestCase):
             }
 
         return results
+
+
+class BuildInfoTests(unittest.TestCase):
+    def test_get_build(self):
+        """
+        The synapse_build_info metric reports the OS version, Python version,
+        and Synapse version.
+        """
+        items = list(
+            filter(
+                lambda x: b"synapse_build_info{" in x,
+                generate_latest(REGISTRY).split(b"\n"),
+            )
+        )
+        self.assertEqual(len(items), 1)
+        self.assertTrue(b"osversion=" in items[0])
+        self.assertTrue(b"pythonversion=" in items[0])
+        self.assertTrue(b"version=" in items[0])