1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import TYPE_CHECKING, Any, Set
from synapse.api.errors import SynapseError
from synapse.api.urls import ConsentURIBuilder
from synapse.config import ConfigError
from synapse.types import get_localpart_from_id
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ConsentServerNotices:
"""Keeps track of whether we need to send users server_notices about
privacy policy consent, and sends one if we do.
"""
def __init__(self, hs: "HomeServer"):
self._server_notices_manager = hs.get_server_notices_manager()
self._store = hs.get_datastores().main
self._users_in_progress: Set[str] = set()
self._current_consent_version = hs.config.consent.user_consent_version
self._server_notice_content = (
hs.config.consent.user_consent_server_notice_content
)
self._send_to_guests = hs.config.consent.user_consent_server_notice_to_guests
if self._server_notice_content is not None:
if not self._server_notices_manager.is_enabled():
raise ConfigError(
"user_consent configuration requires server notices, but "
"server notices are not enabled."
)
if "body" not in self._server_notice_content:
raise ConfigError(
"user_consent server_notice_consent must contain a 'body' key."
)
self._consent_uri_builder = ConsentURIBuilder(hs.config)
async def maybe_send_server_notice_to_user(self, user_id: str) -> None:
"""Check if we need to send a notice to this user, and does so if so
Args:
user_id: user to check
"""
if self._server_notice_content is None:
# not enabled
return
# A consent version must be given.
assert self._current_consent_version is not None
# make sure we don't send two messages to the same user at once
if user_id in self._users_in_progress:
return
self._users_in_progress.add(user_id)
try:
u = await self._store.get_user_by_id(user_id)
# The user doesn't exist.
if u is None:
return
if u.is_guest and not self._send_to_guests:
# don't send to guests
return
if u.consent_version == self._current_consent_version:
# user has already consented
return
if u.consent_server_notice_sent == self._current_consent_version:
# we've already sent a notice to the user
return
# need to send a message.
try:
consent_uri = self._consent_uri_builder.build_user_consent_uri(
get_localpart_from_id(user_id)
)
content = copy_with_str_subst(
self._server_notice_content, {"consent_uri": consent_uri}
)
await self._server_notices_manager.send_notice(user_id, content)
await self._store.user_set_consent_server_notice_sent(
user_id, self._current_consent_version
)
except SynapseError as e:
logger.error("Error sending server notice about user consent: %s", e)
finally:
self._users_in_progress.remove(user_id)
def copy_with_str_subst(x: Any, substitutions: Any) -> Any:
"""Deep-copy a structure, carrying out string substitutions on any strings
Args:
x: structure to be copied
substitutions: substitutions to be made - passed into the string '%' operator
Returns:
copy of x
"""
if isinstance(x, str):
return x % substitutions
if isinstance(x, dict):
return {k: copy_with_str_subst(v, substitutions) for (k, v) in x.items()}
if isinstance(x, (list, tuple)):
return [copy_with_str_subst(y, substitutions) for y in x]
# assume it's uninterested and can be shallow-copied.
return x
|