1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, Union
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.types import Requester, StateMap
class ThirdPartyEventRules:
    """Allows server admins to provide a Python module implementing an extra
    set of rules to apply when processing events.

    This is designed to help admins of closed federations with enforcing custom
    behaviours.
    """

    def __init__(self, hs):
        """Instantiate the configured third-party rules module, if there is one.

        Args:
            hs: The homeserver object. Its datastore is always fetched; its
                module API is only fetched when a rules module is configured
                in ``hs.config.third_party_event_rules``.
        """
        # Stays None when no module is configured; every check below treats
        # None as "allow".
        self.third_party_rules = None

        self.store = hs.get_datastore()

        module = None
        config = None
        if hs.config.third_party_event_rules:
            module, config = hs.config.third_party_event_rules

        if module is not None:
            self.third_party_rules = module(
                config=config,
                module_api=hs.get_module_api(),
            )

    async def check_event_allowed(
        self, event: EventBase, context: EventContext
    ) -> Union[bool, dict]:
        """Check if a provided event should be allowed in the given context.

        The module can return:
            * True: the event is allowed.
            * False: the event is not allowed, and should be rejected with M_FORBIDDEN.
            * a dict: replacement event data.

        Args:
            event: The event to be checked.
            context: The context of the event.

        Returns:
            The result from the ThirdPartyRules module, as above. True if no
            module is configured.
        """
        if self.third_party_rules is None:
            return True

        prev_state_ids = await context.get_prev_state_ids()

        # Retrieve the state events from the database.
        events = await self.store.get_events(prev_state_ids.values())
        state_events = {(ev.type, ev.state_key): ev for ev in events.values()}

        # Ensure that the event is frozen, to make sure that the module is not tempted
        # to try to modify it. Any attempt to modify it at this point will invalidate
        # the hashes and signatures.
        event.freeze()

        return await self.third_party_rules.check_event_allowed(event, state_events)

    async def on_create_room(
        self, requester: Requester, config: dict, is_requester_admin: bool
    ) -> bool:
        """Intercept requests to create room to allow, deny or update the
        request config.

        Args:
            requester: The user requesting the room creation.
            config: The creation config from the client.
            is_requester_admin: If the requester is an admin.

        Returns:
            Whether room creation is allowed or denied. True if no module is
            configured.
        """
        if self.third_party_rules is None:
            return True

        return await self.third_party_rules.on_create_room(
            requester, config, is_requester_admin
        )

    async def check_threepid_can_be_invited(
        self, medium: str, address: str, room_id: str
    ) -> bool:
        """Check if a provided 3PID can be invited in the given room.

        Args:
            medium: The 3PID's medium.
            address: The 3PID's address.
            room_id: The room we want to invite the threepid to.

        Returns:
            True if the 3PID can be invited, False if not. True if no module
            is configured.
        """
        if self.third_party_rules is None:
            return True

        state_events = await self._get_state_map_for_room(room_id)

        return await self.third_party_rules.check_threepid_can_be_invited(
            medium, address, state_events
        )

    async def check_visibility_can_be_modified(
        self, room_id: str, new_visibility: str
    ) -> bool:
        """Check if a room is allowed to be published to, or removed from, the public room
        list.

        Args:
            room_id: The ID of the room.
            new_visibility: The new visibility state. Either "public" or "private".

        Returns:
            True if the room's visibility can be modified, False if not. True
            if no module is configured or the module does not provide a
            callable ``check_visibility_can_be_modified``.
        """
        if self.third_party_rules is None:
            return True

        # The module is not required to implement this callback, so look it up
        # defensively rather than assuming it exists.
        check_func = getattr(
            self.third_party_rules, "check_visibility_can_be_modified", None
        )
        if not check_func or not callable(check_func):
            return True

        state_events = await self._get_state_map_for_room(room_id)

        return await check_func(room_id, state_events, new_visibility)

    async def _get_state_map_for_room(self, room_id: str) -> StateMap[EventBase]:
        """Given a room ID, return the state events of that room.

        Args:
            room_id: The ID of the room.

        Returns:
            A dict mapping (event type, state key) to state event.
        """
        state_ids = await self.store.get_filtered_current_state_ids(room_id)
        room_state_events = await self.store.get_events(state_ids.values())

        # Swap each event ID in the state map for the event fetched for it.
        return {
            key: room_state_events[event_id] for key, event_id in state_ids.items()
        }
|