1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
|
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING
from synapse.events.utils import prune_event_dict
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events import encode_json
from synapse.storage.databases.main.events_worker import EventsWorkerStore
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
def _censor_redactions():
return run_as_background_process(
"_censor_redactions", self._censor_redactions
)
if self.hs.config.redaction_retention_period is not None:
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
async def _censor_redactions(self):
"""Censors all redactions older than the configured period that haven't
been censored yet.
By censor we mean update the event_json table with the redacted event.
"""
if self.hs.config.redaction_retention_period is None:
return
if not (
await self.db_pool.updates.has_completed_background_update(
"redactions_have_censored_ts_idx"
)
):
# We don't want to run this until the appropriate index has been
# created.
return
before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period
# We fetch all redactions that:
# 1. point to an event we have,
# 2. has a received_ts from before the cut off, and
# 3. we haven't yet censored.
#
# This is limited to 100 events to ensure that we don't try and do too
# much at once. We'll get called again so this should eventually catch
# up.
sql = """
SELECT redactions.event_id, redacts FROM redactions
LEFT JOIN events AS original_event ON (
redacts = original_event.event_id
)
WHERE NOT have_censored
AND redactions.received_ts <= ?
ORDER BY redactions.received_ts ASC
LIMIT ?
"""
rows = await self.db_pool.execute(
"_censor_redactions_fetch", None, sql, before_ts, 100
)
updates = []
for redaction_id, event_id in rows:
redaction_event = await self.get_event(redaction_id, allow_none=True)
original_event = await self.get_event(
event_id, allow_rejected=True, allow_none=True
)
# The SQL above ensures that we have both the redaction and
# original event, so if the `get_event` calls return None it
# means that the redaction wasn't allowed. Either way we know that
# the result won't change so we mark the fact that we've checked.
if (
redaction_event
and original_event
and original_event.internal_metadata.is_redacted()
):
# Redaction was allowed
pruned_json = encode_json(
prune_event_dict(
original_event.room_version, original_event.get_dict()
)
)
else:
# Redaction wasn't allowed
pruned_json = None
updates.append((redaction_id, event_id, pruned_json))
def _update_censor_txn(txn):
for redaction_id, event_id, pruned_json in updates:
if pruned_json:
self._censor_event_txn(txn, event_id, pruned_json)
self.db_pool.simple_update_one_txn(
txn,
table="redactions",
keyvalues={"event_id": redaction_id},
updatevalues={"have_censored": True},
)
await self.db_pool.runInteraction("_update_censor_txn", _update_censor_txn)
def _censor_event_txn(self, txn, event_id, pruned_json):
"""Censor an event by replacing its JSON in the event_json table with the
provided pruned JSON.
Args:
txn (LoggingTransaction): The database transaction.
event_id (str): The ID of the event to censor.
pruned_json (str): The pruned JSON
"""
self.db_pool.simple_update_one_txn(
txn,
table="event_json",
keyvalues={"event_id": event_id},
updatevalues={"json": pruned_json},
)
async def expire_event(self, event_id: str) -> None:
"""Retrieve and expire an event that has expired, and delete its associated
expiry timestamp. If the event can't be retrieved, delete its associated
timestamp so we don't try to expire it again in the future.
Args:
event_id: The ID of the event to delete.
"""
# Try to retrieve the event's content from the database or the event cache.
event = await self.get_event(event_id)
def delete_expired_event_txn(txn):
# Delete the expiry timestamp associated with this event from the database.
self._delete_event_expiry_txn(txn, event_id)
if not event:
# If we can't find the event, log a warning and delete the expiry date
# from the database so that we don't try to expire it again in the
# future.
logger.warning(
"Can't expire event %s because we don't have it.", event_id
)
return
# Prune the event's dict then convert it to JSON.
pruned_json = encode_json(
prune_event_dict(event.room_version, event.get_dict())
)
# Update the event_json table to replace the event's JSON with the pruned
# JSON.
self._censor_event_txn(txn, event.event_id, pruned_json)
# We need to invalidate the event cache entry for this event because we
# changed its content in the database. We can't call
# self._invalidate_cache_and_stream because self.get_event_cache isn't of the
# right type.
txn.call_after(self._get_event_cache.invalidate, (event.event_id,))
# Send that invalidation to replication so that other workers also invalidate
# the event cache.
self._send_invalidation_to_replication(
txn, "_get_event_cache", (event.event_id,)
)
await self.db_pool.runInteraction(
"delete_expired_event", delete_expired_event_txn
)
def _delete_event_expiry_txn(self, txn, event_id):
"""Delete the expiry timestamp associated with an event ID without deleting the
actual event.
Args:
txn (LoggingTransaction): The transaction to use to perform the deletion.
event_id (str): The event ID to delete the associated expiry timestamp of.
"""
return self.db_pool.simple_delete_txn(
txn=txn, table="event_expiry", keyvalues={"event_id": event_id}
)
|