diff --git a/changelog.d/5934.feature b/changelog.d/5934.feature
new file mode 100644
index 0000000000..eae969a52a
--- /dev/null
+++ b/changelog.d/5934.feature
@@ -0,0 +1 @@
+Redact events in the database that have been redacted for a month.
diff --git a/changelog.d/6003.misc b/changelog.d/6003.misc
new file mode 100644
index 0000000000..4152d05f87
--- /dev/null
+++ b/changelog.d/6003.misc
@@ -0,0 +1 @@
+Add opentracing span over HTTP push processing.
diff --git a/changelog.d/6005.feature b/changelog.d/6005.feature
new file mode 100644
index 0000000000..ed6491d3e4
--- /dev/null
+++ b/changelog.d/6005.feature
@@ -0,0 +1 @@
+The new Prometheus metric `synapse_build_info` exposes the Python version, OS version, and Synapse version of the running server.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 93c0edd8ce..9b1ae58a27 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -306,6 +306,13 @@ listeners:
#
#allow_per_room_profiles: false
+# How long to keep redacted events in unredacted form in the database. After
+# this period redacted events get replaced with their redacted form in the DB.
+#
+# Defaults to `7d`. Set to `null` to disable.
+#
+redaction_retention_period: 7d
+
## TLS ##
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 2abdef0971..c8b9fe2d0f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -162,6 +162,16 @@ class ServerConfig(Config):
self.mau_trial_days = config.get("mau_trial_days", 0)
+ # How long to keep redacted events in the database in unredacted form
+ # before redacting them.
+ redaction_retention_period = config.get("redaction_retention_period", "7d")
+ if redaction_retention_period is not None:
+ self.redaction_retention_period = self.parse_duration(
+ redaction_retention_period
+ )
+ else:
+ self.redaction_retention_period = None
+
# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
@@ -718,6 +728,13 @@ class ServerConfig(Config):
# Defaults to 'true'.
#
#allow_per_room_profiles: false
+
+ # How long to keep redacted events in unredacted form in the database. After
+ # this period redacted events get replaced with their redacted form in the DB.
+ #
+ # Defaults to `7d`. Set to `null` to disable.
+ #
+ redaction_retention_period: 7d
"""
% locals()
)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 488280b4a6..b5c9595cb9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -29,11 +29,13 @@ from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricF
from twisted.internet import reactor
+import synapse
from synapse.metrics._exposition import (
MetricsResource,
generate_latest,
start_http_server,
)
+from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
@@ -385,6 +387,16 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
+# Build info of the running server.
+build_info = Gauge(
+ "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
+)
+build_info.labels(
+ " ".join([platform.python_implementation(), platform.python_version()]),
+ get_version_string(synapse),
+ " ".join([platform.system(), platform.release()]),
+).set(1)
+
last_ticked = time.time()
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bd5d53af91..6299587808 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,6 +22,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
+from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
@@ -194,7 +195,17 @@ class HttpPusher(object):
)
for push_action in unprocessed:
- processed = yield self._process_one(push_action)
+ with opentracing.start_active_span(
+ "http-push",
+ tags={
+ "authenticated_entity": self.user_id,
+ "event_id": push_action["event_id"],
+ "app_id": self.app_id,
+ "app_display_name": self.app_display_name,
+ },
+ ):
+ processed = yield self._process_one(push_action)
+
if processed:
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1958afe1d7..ddf7ab6479 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,7 +23,7 @@ from functools import wraps
from six import iteritems, text_type
from six.moves import range
-from canonicaljson import json
+from canonicaljson import encode_canonical_json, json
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.events.utils import prune_event_dict
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.utils import log_function
from synapse.metrics import BucketCollector
@@ -262,6 +263,14 @@ class EventsStore(
hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
+ def _censor_redactions():
+ return run_as_background_process(
+ "_censor_redactions", self._censor_redactions
+ )
+
+ if self.hs.config.redaction_retention_period is not None:
+ hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
+
@defer.inlineCallbacks
def _read_forward_extremities(self):
def fetch(txn):
@@ -1549,6 +1558,98 @@ class EventsStore(
)
@defer.inlineCallbacks
+ def _censor_redactions(self):
+ """Censors all redactions older than the configured period that haven't
+ been censored yet.
+
+ By censor we mean update the event_json table with the redacted event.
+
+ Returns:
+ Deferred
+ """
+
+ if self.hs.config.redaction_retention_period is None:
+ return
+
+ max_pos = yield self.find_first_stream_ordering_after_ts(
+ self._clock.time_msec() - self.hs.config.redaction_retention_period
+ )
+
+ # We fetch all redactions that:
+ # 1. point to an event we have,
+ # 2. has a stream ordering from before the cut off, and
+ # 3. we haven't yet censored.
+ #
+ # This is limited to 100 events to ensure that we don't try and do too
+ # much at once. We'll get called again so this should eventually catch
+ # up.
+ #
+ # We use the range [-max_pos, max_pos] to handle backfilled events,
+ # which are given negative stream ordering.
+ sql = """
+ SELECT redact_event.event_id, redacts FROM redactions
+ INNER JOIN events AS redact_event USING (event_id)
+ INNER JOIN events AS original_event ON (
+ redact_event.room_id = original_event.room_id
+ AND redacts = original_event.event_id
+ )
+ WHERE NOT have_censored
+ AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
+ ORDER BY redact_event.stream_ordering ASC
+ LIMIT ?
+ """
+
+ rows = yield self._execute(
+ "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
+ )
+
+ updates = []
+
+ for redaction_id, event_id in rows:
+ redaction_event = yield self.get_event(redaction_id, allow_none=True)
+ original_event = yield self.get_event(
+ event_id, allow_rejected=True, allow_none=True
+ )
+
+ # The SQL above ensures that we have both the redaction and
+ # original event, so if the `get_event` calls return None it
+ # means that the redaction wasn't allowed. Either way we know that
+ # the result won't change so we mark the fact that we've checked.
+ if (
+ redaction_event
+ and original_event
+ and original_event.internal_metadata.is_redacted()
+ ):
+ # Redaction was allowed
+ pruned_json = encode_canonical_json(
+ prune_event_dict(original_event.get_dict())
+ )
+ else:
+ # Redaction wasn't allowed
+ pruned_json = None
+
+ updates.append((redaction_id, event_id, pruned_json))
+
+ def _update_censor_txn(txn):
+ for redaction_id, event_id, pruned_json in updates:
+ if pruned_json:
+ self._simple_update_one_txn(
+ txn,
+ table="event_json",
+ keyvalues={"event_id": event_id},
+ updatevalues={"json": pruned_json},
+ )
+
+ self._simple_update_one_txn(
+ txn,
+ table="redactions",
+ keyvalues={"event_id": redaction_id},
+ updatevalues={"have_censored": True},
+ )
+
+ yield self.runInteraction("_update_censor_txn", _update_censor_txn)
+
+ @defer.inlineCallbacks
def count_daily_messages(self):
"""
Returns an estimate of the number of messages sent in the last day.
diff --git a/synapse/storage/schema/delta/56/redaction_censor.sql b/synapse/storage/schema/delta/56/redaction_censor.sql
new file mode 100644
index 0000000000..fe51b02309
--- /dev/null
+++ b/synapse/storage/schema/delta/56/redaction_censor.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE redactions ADD COLUMN have_censored BOOL NOT NULL DEFAULT false;
+CREATE INDEX redactions_have_censored ON redactions(event_id) WHERE not have_censored;
diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py
index d961b81d48..deecfad9fb 100644
--- a/tests/storage/test_redaction.py
+++ b/tests/storage/test_redaction.py
@@ -17,6 +17,8 @@
from mock import Mock
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
@@ -29,8 +31,10 @@ from tests.utils import create_room
class RedactionTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):
+ config = self.default_config()
+ config["redaction_retention_period"] = "30d"
return self.setup_test_homeserver(
- resource_for_federation=Mock(), http_client=None
+ resource_for_federation=Mock(), http_client=None, config=config
)
def prepare(self, reactor, clock, hs):
@@ -286,3 +290,74 @@ class RedactionTestCase(unittest.HomeserverTestCase):
self.assertEqual(
fetched.unsigned["redacted_because"].event_id, redaction_event_id2
)
+
+ def test_redact_censor(self):
+ """Test that a redacted event gets censored in the DB after a month
+ """
+
+ self.get_success(
+ self.inject_room_member(self.room1, self.u_alice, Membership.JOIN)
+ )
+
+ msg_event = self.get_success(self.inject_message(self.room1, self.u_alice, "t"))
+
+ # Check event has not been redacted:
+ event = self.get_success(self.store.get_event(msg_event.event_id))
+
+ self.assertObjectHasAttributes(
+ {
+ "type": EventTypes.Message,
+ "user_id": self.u_alice.to_string(),
+ "content": {"body": "t", "msgtype": "message"},
+ },
+ event,
+ )
+
+ self.assertFalse("redacted_because" in event.unsigned)
+
+ # Redact event
+ reason = "Because I said so"
+ self.get_success(
+ self.inject_redaction(self.room1, msg_event.event_id, self.u_alice, reason)
+ )
+
+ event = self.get_success(self.store.get_event(msg_event.event_id))
+
+ self.assertTrue("redacted_because" in event.unsigned)
+
+ self.assertObjectHasAttributes(
+ {
+ "type": EventTypes.Message,
+ "user_id": self.u_alice.to_string(),
+ "content": {},
+ },
+ event,
+ )
+
+ event_json = self.get_success(
+ self.store._simple_select_one_onecol(
+ table="event_json",
+ keyvalues={"event_id": msg_event.event_id},
+ retcol="json",
+ )
+ )
+
+ self.assert_dict(
+ {"content": {"body": "t", "msgtype": "message"}}, json.loads(event_json)
+ )
+
+ # Advance by 30 days, then advance again to ensure that the looping call
+ # for updating the stream position gets called and then the looping call
+ # for the censoring gets called.
+ self.reactor.advance(60 * 60 * 24 * 31)
+ self.reactor.advance(60 * 60 * 2)
+
+ event_json = self.get_success(
+ self.store._simple_select_one_onecol(
+ table="event_json",
+ keyvalues={"event_id": msg_event.event_id},
+ retcol="json",
+ )
+ )
+
+ self.assert_dict({"content": {}}, json.loads(event_json))
diff --git a/tests/test_metrics.py b/tests/test_metrics.py
index 2edbae5c6d..270f853d60 100644
--- a/tests/test_metrics.py
+++ b/tests/test_metrics.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,8 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from synapse.metrics import InFlightGauge
+from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
from tests import unittest
@@ -111,3 +111,21 @@ class TestMauLimit(unittest.TestCase):
}
return results
+
+
+class BuildInfoTests(unittest.TestCase):
+ def test_get_build(self):
+ """
+ The synapse_build_info metric reports the OS version, Python version,
+ and Synapse version.
+ """
+ items = list(
+ filter(
+ lambda x: b"synapse_build_info{" in x,
+ generate_latest(REGISTRY).split(b"\n"),
+ )
+ )
+ self.assertEqual(len(items), 1)
+ self.assertTrue(b"osversion=" in items[0])
+ self.assertTrue(b"pythonversion=" in items[0])
+ self.assertTrue(b"version=" in items[0])
|