1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from signedjson.sign import sign_json
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.types import UserID
from ._base import client_patterns
logger = logging.getLogger(__name__)
class UserDirectorySearchRestServlet(RestServlet):
PATTERNS = client_patterns("/user_directory/search$")
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer): server
"""
super(UserDirectorySearchRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
self.user_directory_handler = hs.get_user_directory_handler()
self.http_client = hs.get_simple_http_client()
@defer.inlineCallbacks
def on_POST(self, request):
"""Searches for users in directory
Returns:
dict of the form::
{
"limited": <bool>, # whether there were more results or not
"results": [ # Ordered by best match first
{
"user_id": <user_id>,
"display_name": <display_name>,
"avatar_url": <avatar_url>
}
]
}
"""
requester = yield self.auth.get_user_by_req(request, allow_guest=False)
user_id = requester.user.to_string()
if not self.hs.config.user_directory_search_enabled:
return 200, {"limited": False, "results": []}
body = parse_json_object_from_request(request)
if self.hs.config.user_directory_defer_to_id_server:
signed_body = sign_json(
body, self.hs.hostname, self.hs.config.signing_key[0]
)
url = "%s/_matrix/identity/api/v1/user_directory/search" % (
self.hs.config.user_directory_defer_to_id_server,
)
resp = yield self.http_client.post_json_get_json(url, signed_body)
defer.returnValue((200, resp))
limit = body.get("limit", 10)
limit = min(limit, 50)
try:
search_term = body["search_term"]
except Exception:
raise SynapseError(400, "`search_term` is required field")
results = yield self.user_directory_handler.search_users(
user_id, search_term, limit
)
return 200, results
class UserInfoServlet(RestServlet):
"""
GET /user/{user_id}/info HTTP/1.1
"""
PATTERNS = client_patterns("/user/(?P<user_id>[^/]*)/info$")
def __init__(self, hs):
super(UserInfoServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.clock = hs.get_clock()
self.transport_layer = hs.get_federation_transport_client()
registry = hs.get_federation_registry()
if not registry.query_handlers.get("user_info"):
registry.register_query_handler("user_info", self._on_federation_query)
@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Ensure the user is authenticated
yield self.auth.get_user_by_req(request, allow_guest=False)
user = UserID.from_string(user_id)
if not self.hs.is_mine(user):
# Attempt to make a federation request to the server that owns this user
args = {"user_id": user_id}
res = yield self.transport_layer.make_query(
user.domain, "user_info", args, retry_on_dns_fail=True
)
defer.returnValue((200, res))
res = yield self._get_user_info(user_id)
defer.returnValue((200, res))
@defer.inlineCallbacks
def _on_federation_query(self, args):
"""Called when a request for user information appears over federation
Args:
args (dict): Dictionary of query arguments provided by the request
Returns:
Deferred[dict]: Deactivation and expiration information for a given user
"""
user_id = args.get("user_id")
if not user_id:
raise SynapseError(400, "user_id not provided")
user = UserID.from_string(user_id)
if not self.hs.is_mine(user):
raise SynapseError(400, "User is not hosted on this homeserver")
res = yield self._get_user_info(user_id)
defer.returnValue(res)
@defer.inlineCallbacks
def _get_user_info(self, user_id):
"""Retrieve information about a given user
Args:
user_id (str): The User ID of a given user on this homeserver
Returns:
Deferred[dict]: Deactivation and expiration information for a given user
"""
# Check whether user is deactivated
is_deactivated = yield self.store.get_user_deactivated_status(user_id)
# Check whether user is expired
expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
is_expired = (
expiration_ts is not None and self.clock.time_msec() >= expiration_ts
)
res = {"expired": is_expired, "deactivated": is_deactivated}
defer.returnValue(res)
def register_servlets(hs, http_server):
UserDirectorySearchRestServlet(hs).register(http_server)
UserInfoServlet(hs).register(http_server)
|