From edbcb4152bd093ce2d77d44babb0e61de5362c5d Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Sep 2016 00:02:39 +0100 Subject: make device IDs more useful for human disambiguation --- synapse/handlers/device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 8d630c6b1a..aa68755936 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -58,7 +58,7 @@ class DeviceHandler(BaseHandler): attempts = 0 while attempts < 5: try: - device_id = stringutils.random_string_with_symbols(16) + device_id = stringutils.random_string(10).upper() yield self.store.store_device( user_id=user_id, device_id=device_id, -- cgit 1.4.1 From 18ab019a4a4f046e82abb822dcf6556613f88a03 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Sep 2016 11:35:35 +0100 Subject: Move the E2E key handling into the e2e handler --- synapse/handlers/e2e_keys.py | 105 +++++++++++++++++++++++++++- synapse/rest/client/v2_alpha/keys.py | 128 +++++------------------------------ 2 files changed, 118 insertions(+), 115 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5bfd700931..60a1acc6f4 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -13,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json +import ujson as json import logging +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException @@ -29,7 +30,9 @@ class E2eKeysHandler(object): def __init__(self, hs): self.store = hs.get_datastore() self.federation = hs.get_replication_layer() + self.device_handler = hs.get_device_handler() self.is_mine_id = hs.is_mine_id + self.clock = hs.get_clock() # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the @@ -103,9 +106,9 @@ class E2eKeysHandler(object): for destination in remote_queries ])) - defer.returnValue((200, { + defer.returnValue({ "device_keys": results, "failures": failures, - })) + }) @defer.inlineCallbacks def query_local_devices(self, query): @@ -159,3 +162,99 @@ class E2eKeysHandler(object): device_keys_query = query_body.get("device_keys", {}) res = yield self.query_local_devices(device_keys_query) defer.returnValue({"device_keys": res}) + + @defer.inlineCallbacks + def claim_one_time_keys(self, query, timeout): + local_query = [] + remote_queries = {} + + for user_id, device_keys in query.get("one_time_keys", {}).items(): + if self.is_mine_id(user_id): + for device_id, algorithm in device_keys.items(): + local_query.append((user_id, device_id, algorithm)) + else: + domain = get_domain_from_id(user_id) + remote_queries.setdefault(domain, {})[user_id] = device_keys + + results = yield self.store.claim_e2e_one_time_keys(local_query) + + json_result = {} + failures = {} + for user_id, device_keys in results.items(): + for device_id, keys in device_keys.items(): + for key_id, json_bytes in keys.items(): + json_result.setdefault(user_id, {})[device_id] = { + key_id: json.loads(json_bytes) + } + + @defer.inlineCallbacks + def claim_client_keys(destination): + device_keys = remote_queries[destination] + try: + remote_result = yield self.federation.claim_client_keys( + destination, + {"one_time_keys": device_keys}, + timeout=timeout + ) + for user_id, keys in remote_result["one_time_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys + except CodeMessageException as e: + failures[destination] = { + "status": e.code, "message": e.message + } + + yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(claim_client_keys)(destination) + for destination in remote_queries + ])) + + defer.returnValue({ + "one_time_keys": json_result, + "failures": failures + }) + + @defer.inlineCallbacks + def upload_keys_for_user(self, user_id, device_id, keys): + time_now = self.clock.time_msec() + + # TODO: Validate the JSON to make sure it has the right keys. + device_keys = keys.get("device_keys", None) + if device_keys: + logger.info( + "Updating device_keys for device %r for user %s at %d", + device_id, user_id, time_now + ) + # TODO: Sign the JSON with the server key + yield self.store.set_e2e_device_keys( + user_id, device_id, time_now, + encode_canonical_json(device_keys) + ) + + one_time_keys = keys.get("one_time_keys", None) + if one_time_keys: + logger.info( + "Adding %d one_time_keys for device %r for user %r at %d", + len(one_time_keys), device_id, user_id, time_now + ) + key_list = [] + for key_id, key_json in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append(( + algorithm, key_id, encode_canonical_json(key_json) + )) + + yield self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, key_list + ) + + # the device should have been registered already, but it may have been + # deleted due to a race with a DELETE request. Or we may be using an + # old access_token without an associated device_id. Either way, we + # need to double-check the device is registered to avoid ending up with + # keys without a corresponding device. + self.device_handler.check_device_registered(user_id, device_id) + + result = yield self.store.count_e2e_one_time_keys(user_id, device_id) + + defer.returnValue({"one_time_key_counts": result}) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 8f05727652..f185f9a774 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -15,16 +15,12 @@ import logging -import simplejson as json -from canonicaljson import encode_canonical_json from twisted.internet import defer -from synapse.api.errors import SynapseError, CodeMessageException +from synapse.api.errors import SynapseError from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, parse_integer ) -from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from ._base import client_v2_patterns logger = logging.getLogger(__name__) @@ -64,17 +60,13 @@ class KeyUploadServlet(RestServlet): hs (synapse.server.HomeServer): server """ super(KeyUploadServlet, self).__init__() - self.store = hs.get_datastore() - self.clock = hs.get_clock() self.auth = hs.get_auth() - self.device_handler = hs.get_device_handler() + self.e2e_keys_handler = hs.get_e2e_keys_handler() @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request) - user_id = requester.user.to_string() - body = parse_json_object_from_request(request) if device_id is not None: @@ -94,47 +86,10 @@ class KeyUploadServlet(RestServlet): "To upload keys, you must pass device_id when authenticating" ) - time_now = self.clock.time_msec() - - # TODO: Validate the JSON to make sure it has the right keys. - device_keys = body.get("device_keys", None) - if device_keys: - logger.info( - "Updating device_keys for device %r for user %s at %d", - device_id, user_id, time_now - ) - # TODO: Sign the JSON with the server key - yield self.store.set_e2e_device_keys( - user_id, device_id, time_now, - encode_canonical_json(device_keys) - ) - - one_time_keys = body.get("one_time_keys", None) - if one_time_keys: - logger.info( - "Adding %d one_time_keys for device %r for user %r at %d", - len(one_time_keys), device_id, user_id, time_now - ) - key_list = [] - for key_id, key_json in one_time_keys.items(): - algorithm, key_id = key_id.split(":") - key_list.append(( - algorithm, key_id, encode_canonical_json(key_json) - )) - - yield self.store.add_e2e_one_time_keys( - user_id, device_id, time_now, key_list - ) - - # the device should have been registered already, but it may have been - # deleted due to a race with a DELETE request. Or we may be using an - # old access_token without an associated device_id. Either way, we - # need to double-check the device is registered to avoid ending up with - # keys without a corresponding device. - self.device_handler.check_device_registered(user_id, device_id) - - result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - defer.returnValue((200, {"one_time_key_counts": result})) + result = yield self.e2e_keys_handler.upload_keys_for_user( + user_id, device_id, body + ) + defer.returnValue((200, result)) class KeyQueryServlet(RestServlet): @@ -199,7 +154,7 @@ class KeyQueryServlet(RestServlet): timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) result = yield self.e2e_keys_handler.query_devices(body, timeout) - defer.returnValue(result) + defer.returnValue((200, result)) @defer.inlineCallbacks def on_GET(self, request, user_id, device_id): @@ -212,7 +167,7 @@ class KeyQueryServlet(RestServlet): {"device_keys": {user_id: device_ids}}, timeout, ) - defer.returnValue(result) + defer.returnValue((200, result)) class OneTimeKeyServlet(RestServlet): @@ -244,80 +199,29 @@ class OneTimeKeyServlet(RestServlet): def __init__(self, hs): super(OneTimeKeyServlet, self).__init__() - self.store = hs.get_datastore() self.auth = hs.get_auth() - self.clock = hs.get_clock() - self.federation = hs.get_replication_layer() - self.is_mine_id = hs.is_mine_id + self.e2e_keys_handler = hs.get_e2e_keys_handler() @defer.inlineCallbacks def on_GET(self, request, user_id, device_id, algorithm): yield self.auth.get_user_by_req(request) timeout = parse_integer(request, "timeout", 10 * 1000) - result = yield self.handle_request( + result = yield self.e2e_keys_handler.claim_one_time_keys( {"one_time_keys": {user_id: {device_id: algorithm}}}, timeout, ) - defer.returnValue(result) + defer.returnValue((200, result)) @defer.inlineCallbacks def on_POST(self, request, user_id, device_id, algorithm): yield self.auth.get_user_by_req(request) timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) - result = yield self.handle_request(body, timeout) - defer.returnValue(result) - - @defer.inlineCallbacks - def handle_request(self, body, timeout): - local_query = [] - remote_queries = {} - - for user_id, device_keys in body.get("one_time_keys", {}).items(): - if self.is_mine_id(user_id): - for device_id, algorithm in device_keys.items(): - local_query.append((user_id, device_id, algorithm)) - else: - domain = get_domain_from_id(user_id) - remote_queries.setdefault(domain, {})[user_id] = device_keys - - results = yield self.store.claim_e2e_one_time_keys(local_query) - - json_result = {} - failures = {} - for user_id, device_keys in results.items(): - for device_id, keys in device_keys.items(): - for key_id, json_bytes in keys.items(): - json_result.setdefault(user_id, {})[device_id] = { - key_id: json.loads(json_bytes) - } - - @defer.inlineCallbacks - def claim_client_keys(destination): - device_keys = remote_queries[destination] - try: - remote_result = yield self.federation.claim_client_keys( - destination, - {"one_time_keys": device_keys}, - timeout=timeout - ) - for user_id, keys in remote_result["one_time_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys - except CodeMessageException as e: - failures[destination] = { - "status": e.code, "message": e.message - } - - yield preserve_context_over_deferred(defer.gatherResults([ - preserve_fn(claim_client_keys)(destination) - for destination in remote_queries - ])) - - defer.returnValue((200, { - "one_time_keys": json_result, - "failures": failures - })) + result = yield self.e2e_keys_handler.claim_one_time_keys( + body, + timeout, + ) + defer.returnValue((200, result)) def register_servlets(hs, http_server): -- cgit 1.4.1 From ad816b0add61a987c70e9264e20c3ec70b7af5d9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Sep 2016 11:53:50 +0100 Subject: Limit how often we ask for keys from dead servers --- synapse/handlers/e2e_keys.py | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 60a1acc6f4..fd11935b40 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -22,6 +22,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException from synapse.types import get_domain_from_id from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination logger = logging.getLogger(__name__) @@ -88,18 +89,28 @@ class E2eKeysHandler(object): def do_remote_query(destination): destination_query = remote_queries[destination] try: - remote_result = yield self.federation.query_client_keys( - destination, - {"device_keys": destination_query}, - timeout=timeout + limiter = yield get_retry_limiter( + destination, self.clock, self.store ) + with limiter: + remote_result = yield self.federation.query_client_keys( + destination, + {"device_keys": destination_query}, + timeout=timeout + ) + for user_id, keys in remote_result["device_keys"].items(): if user_id in destination_query: results[user_id] = keys + except CodeMessageException as e: failures[destination] = { "status": e.code, "message": e.message } + except NotRetryingDestination as e: + failures[destination] = { + "status": 503, "message": "Not ready for retry", + } yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(do_remote_query)(destination) @@ -191,18 +202,26 @@ class E2eKeysHandler(object): def claim_client_keys(destination): device_keys = remote_queries[destination] try: - remote_result = yield self.federation.claim_client_keys( - destination, - {"one_time_keys": device_keys}, - timeout=timeout + limiter = yield get_retry_limiter( + destination, self.clock, self.store ) - for user_id, keys in remote_result["one_time_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys + with limiter: + remote_result = yield self.federation.claim_client_keys( + destination, + {"one_time_keys": device_keys}, + timeout=timeout + ) + for user_id, keys in remote_result["one_time_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys except CodeMessageException as e: failures[destination] = { "status": e.code, "message": e.message } + except NotRetryingDestination as e: + failures[destination] = { + "status": 503, "message": "Not ready for retry", + } yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) -- cgit 1.4.1 From ca35e54d6b080fce04bb92536977d48504933561 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 13 Sep 2016 13:26:33 +0100 Subject: Fix typo "persiting" --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a949e39bda..b047ae2250 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -217,7 +217,7 @@ class PresenceHandler(object): is some spurious presence changes that will self-correct. """ logger.info( - "Performing _on_shutdown. Persiting %d unpersisted changes", + "Performing _on_shutdown. Persisting %d unpersisted changes", len(self.user_to_current_state) ) @@ -234,7 +234,7 @@ class PresenceHandler(object): may stack up and slow down shutdown times. """ logger.info( - "Performing _persist_unpersisted_changes. Persiting %d unpersisted changes", + "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes", len(self.unpersisted_users_changes) ) -- cgit 1.4.1 From d5ae1f129143d6436a238fd7882e39168f944846 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 10:03:48 +0100 Subject: Ensure we don't mutate state cache entries --- synapse/handlers/federation.py | 4 ++++ synapse/state.py | 6 ++++-- synapse/storage/state.py | 45 +++++++++++++++++++++++------------------- 3 files changed, 33 insertions(+), 22 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8a1038c44a..f7cb3c1bb2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1585,10 +1585,12 @@ class FederationHandler(BaseHandler): current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state + context.current_state_ids = dict(context.current_state_ids) context.current_state_ids.update({ k: a.event_id for k, a in auth_events.items() if k != event_key }) + context.prev_state_ids = dict(context.prev_state_ids) context.prev_state_ids.update({ k: a.event_id for k, a in auth_events.items() }) @@ -1670,10 +1672,12 @@ class FederationHandler(BaseHandler): # 4. Look at rejects and their proofs. # TODO. + context.current_state_ids = dict(context.current_state_ids) context.current_state_ids.update({ k: a.event_id for k, a in auth_events.items() if k != event_key }) + context.prev_state_ids = dict(context.prev_state_ids) context.prev_state_ids.update({ k: a.event_id for k, a in auth_events.items() }) diff --git a/synapse/state.py b/synapse/state.py index 4520fa0415..617db8d2e2 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -26,6 +26,7 @@ from synapse.events.snapshot import EventContext from synapse.util.async import Linearizer from collections import namedtuple +from frozendict import frozendict import logging import hashlib @@ -58,11 +59,11 @@ class _StateCacheEntry(object): __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"] def __init__(self, state, state_group, prev_group=None, delta_ids=None): - self.state = state + self.state = frozendict(state) self.state_group = state_group self.prev_group = prev_group - self.delta_ids = delta_ids + self.delta_ids = frozendict(delta_ids) if delta_ids is not None else None # The `state_id` is a unique ID we generate that can be used as ID for # this collection of state. Usually this would be the same as the @@ -255,6 +256,7 @@ class StateHandler(object): context.prev_group = entry.prev_group context.delta_ids = entry.delta_ids if context.delta_ids is not None: + context.delta_ids = dict(context.delta_ids) context.delta_ids[key] = event.event_id else: context.current_state_ids = context.prev_state_ids diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fdbdade536..ec8c62b653 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -817,27 +817,32 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def _background_index_state(self, progress, batch_size): - def reindex_txn(txn): - if isinstance(self.database_engine, PostgresEngine): - txn.execute( - "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" - " ON state_groups_state(state_group, type, state_key)" - ) - txn.execute( - "DROP INDEX IF EXISTS state_groups_state_id" - ) - else: - txn.execute( - "CREATE INDEX state_groups_state_type_idx" - " ON state_groups_state(state_group, type, state_key)" - ) - txn.execute( - "DROP INDEX IF EXISTS state_groups_state_id" - ) + def reindex_txn(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + try: + txn = conn.cursor() + if isinstance(self.database_engine, PostgresEngine): + txn.execute( + "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + else: + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + finally: + conn.set_session(autocommit=False) - yield self.runInteraction( - self.STATE_GROUP_INDEX_UPDATE_NAME, reindex_txn - ) + yield self.runWithConnection(reindex_txn) yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME) -- cgit 1.4.1 From a70a43bc51bfdd107b8eb4091fc710f2d157916f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 14:07:37 +0100 Subject: Move RoomListHandler into a separate file --- synapse/handlers/room.py | 159 +----------------------------------- synapse/handlers/room_list.py | 184 ++++++++++++++++++++++++++++++++++++++++++ synapse/server.py | 2 +- 3 files changed, 186 insertions(+), 159 deletions(-) create mode 100644 synapse/handlers/room_list.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d40ada60c1..cbd26f8f95 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -20,12 +20,10 @@ from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken from synapse.api.constants import ( - EventTypes, JoinRules, RoomCreationPreset, Membership, + EventTypes, JoinRules, RoomCreationPreset ) from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.util import stringutils -from synapse.util.async import concurrently_execute -from synapse.util.caches.response_cache import ResponseCache from synapse.visibility import filter_events_for_client from collections import OrderedDict @@ -36,8 +34,6 @@ import string logger = logging.getLogger(__name__) -REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 - id_server_scheme = "https://" @@ -348,159 +344,6 @@ class RoomCreationHandler(BaseHandler): ) -class RoomListHandler(BaseHandler): - def __init__(self, hs): - super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache(hs) - self.remote_list_request_cache = ResponseCache(hs) - self.remote_list_cache = {} - self.fetch_looping_call = hs.get_clock().looping_call( - self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL - ) - self.fetch_all_remote_lists() - - def get_local_public_room_list(self): - result = self.response_cache.get(()) - if not result: - result = self.response_cache.set((), self._get_public_room_list()) - return result - - @defer.inlineCallbacks - def _get_public_room_list(self): - room_ids = yield self.store.get_public_room_ids() - - results = [] - - @defer.inlineCallbacks - def handle_room(room_id): - current_state = yield self.state_handler.get_current_state(room_id) - - # Double check that this is actually a public room. - join_rules_event = current_state.get((EventTypes.JoinRules, "")) - if join_rules_event: - join_rule = join_rules_event.content.get("join_rule", None) - if join_rule and join_rule != JoinRules.PUBLIC: - defer.returnValue(None) - - result = {"room_id": room_id} - - num_joined_users = len([ - 1 for _, event in current_state.items() - if event.type == EventTypes.Member - and event.membership == Membership.JOIN - ]) - if num_joined_users == 0: - return - - result["num_joined_members"] = num_joined_users - - aliases = yield self.store.get_aliases_for_room(room_id) - if aliases: - result["aliases"] = aliases - - name_event = yield current_state.get((EventTypes.Name, "")) - if name_event: - name = name_event.content.get("name", None) - if name: - result["name"] = name - - topic_event = current_state.get((EventTypes.Topic, "")) - if topic_event: - topic = topic_event.content.get("topic", None) - if topic: - result["topic"] = topic - - canonical_event = current_state.get((EventTypes.CanonicalAlias, "")) - if canonical_event: - canonical_alias = canonical_event.content.get("alias", None) - if canonical_alias: - result["canonical_alias"] = canonical_alias - - visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, "")) - visibility = None - if visibility_event: - visibility = visibility_event.content.get("history_visibility", None) - result["world_readable"] = visibility == "world_readable" - - guest_event = current_state.get((EventTypes.GuestAccess, "")) - guest = None - if guest_event: - guest = guest_event.content.get("guest_access", None) - result["guest_can_join"] = guest == "can_join" - - avatar_event = current_state.get(("m.room.avatar", "")) - if avatar_event: - avatar_url = avatar_event.content.get("url", None) - if avatar_url: - result["avatar_url"] = avatar_url - - results.append(result) - - yield concurrently_execute(handle_room, room_ids, 10) - - # FIXME (erikj): START is no longer a valid value - defer.returnValue({"start": "START", "end": "END", "chunk": results}) - - @defer.inlineCallbacks - def fetch_all_remote_lists(self): - deferred = self.hs.get_replication_layer().get_public_rooms( - self.hs.config.secondary_directory_servers - ) - self.remote_list_request_cache.set((), deferred) - self.remote_list_cache = yield deferred - - @defer.inlineCallbacks - def get_remote_public_room_list(self, server_name): - res = yield self.hs.get_replication_layer().get_public_rooms( - [server_name] - ) - - if server_name not in res: - raise SynapseError(404, "Server not found") - defer.returnValue(res[server_name]) - - @defer.inlineCallbacks - def get_aggregated_public_room_list(self): - """ - Get the public room list from this server and the servers - specified in the secondary_directory_servers config option. - XXX: Pagination... - """ - # We return the results from out cache which is updated by a looping call, - # unless we're missing a cache entry, in which case wait for the result - # of the fetch if there's one in progress. If not, omit that server. - wait = False - for s in self.hs.config.secondary_directory_servers: - if s not in self.remote_list_cache: - logger.warn("No cached room list from %s: waiting for fetch", s) - wait = True - break - - if wait and self.remote_list_request_cache.get(()): - yield self.remote_list_request_cache.get(()) - - public_rooms = yield self.get_local_public_room_list() - - # keep track of which room IDs we've seen so we can de-dup - room_ids = set() - - # tag all the ones in our list with our server name. - # Also add the them to the de-deping set - for room in public_rooms['chunk']: - room["server_name"] = self.hs.hostname - room_ids.add(room["room_id"]) - - # Now add the results from federation - for server_name, server_result in self.remote_list_cache.items(): - for room in server_result["chunk"]: - if room["room_id"] not in room_ids: - room["server_name"] = server_name - public_rooms["chunk"].append(room) - room_ids.add(room["room_id"]) - - defer.returnValue(public_rooms) - - class RoomContextHandler(BaseHandler): @defer.inlineCallbacks def get_event_context(self, user, room_id, event_id, limit, is_guest): diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py new file mode 100644 index 0000000000..d72e8c99f9 --- /dev/null +++ b/synapse/handlers/room_list.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 - 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import BaseHandler + +from synapse.api.constants import ( + EventTypes, JoinRules, Membership, +) +from synapse.api.errors import SynapseError +from synapse.util.async import concurrently_execute +from synapse.util.caches.response_cache import ResponseCache + +import logging + +logger = logging.getLogger(__name__) + +REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 + + +class RoomListHandler(BaseHandler): + def __init__(self, hs): + super(RoomListHandler, self).__init__(hs) + self.response_cache = ResponseCache(hs) + self.remote_list_request_cache = ResponseCache(hs) + self.remote_list_cache = {} + self.fetch_looping_call = hs.get_clock().looping_call( + self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL + ) + self.fetch_all_remote_lists() + + def get_local_public_room_list(self): + result = self.response_cache.get(()) + if not result: + result = self.response_cache.set((), self._get_public_room_list()) + return result + + @defer.inlineCallbacks + def _get_public_room_list(self): + room_ids = yield self.store.get_public_room_ids() + + results = [] + + @defer.inlineCallbacks + def handle_room(room_id): + current_state = yield self.state_handler.get_current_state(room_id) + + # Double check that this is actually a public room. + join_rules_event = current_state.get((EventTypes.JoinRules, "")) + if join_rules_event: + join_rule = join_rules_event.content.get("join_rule", None) + if join_rule and join_rule != JoinRules.PUBLIC: + defer.returnValue(None) + + result = {"room_id": room_id} + + num_joined_users = len([ + 1 for _, event in current_state.items() + if event.type == EventTypes.Member + and event.membership == Membership.JOIN + ]) + if num_joined_users == 0: + return + + result["num_joined_members"] = num_joined_users + + aliases = yield self.store.get_aliases_for_room(room_id) + if aliases: + result["aliases"] = aliases + + name_event = yield current_state.get((EventTypes.Name, "")) + if name_event: + name = name_event.content.get("name", None) + if name: + result["name"] = name + + topic_event = current_state.get((EventTypes.Topic, "")) + if topic_event: + topic = topic_event.content.get("topic", None) + if topic: + result["topic"] = topic + + canonical_event = current_state.get((EventTypes.CanonicalAlias, "")) + if canonical_event: + canonical_alias = canonical_event.content.get("alias", None) + if canonical_alias: + result["canonical_alias"] = canonical_alias + + visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, "")) + visibility = None + if visibility_event: + visibility = visibility_event.content.get("history_visibility", None) + result["world_readable"] = visibility == "world_readable" + + guest_event = current_state.get((EventTypes.GuestAccess, "")) + guest = None + if guest_event: + guest = guest_event.content.get("guest_access", None) + result["guest_can_join"] = guest == "can_join" + + avatar_event = current_state.get(("m.room.avatar", "")) + if avatar_event: + avatar_url = avatar_event.content.get("url", None) + if avatar_url: + result["avatar_url"] = avatar_url + + results.append(result) + + yield concurrently_execute(handle_room, room_ids, 10) + + # FIXME (erikj): START is no longer a valid value + defer.returnValue({"start": "START", "end": "END", "chunk": results}) + + @defer.inlineCallbacks + def fetch_all_remote_lists(self): + deferred = self.hs.get_replication_layer().get_public_rooms( + self.hs.config.secondary_directory_servers + ) + self.remote_list_request_cache.set((), deferred) + self.remote_list_cache = yield deferred + + @defer.inlineCallbacks + def get_remote_public_room_list(self, server_name): + res = yield self.hs.get_replication_layer().get_public_rooms( + [server_name] + ) + + if server_name not in res: + raise SynapseError(404, "Server not found") + defer.returnValue(res[server_name]) + + @defer.inlineCallbacks + def get_aggregated_public_room_list(self): + """ + Get the public room list from this server and the servers + specified in the secondary_directory_servers config option. + XXX: Pagination... + """ + # We return the results from out cache which is updated by a looping call, + # unless we're missing a cache entry, in which case wait for the result + # of the fetch if there's one in progress. If not, omit that server. + wait = False + for s in self.hs.config.secondary_directory_servers: + if s not in self.remote_list_cache: + logger.warn("No cached room list from %s: waiting for fetch", s) + wait = True + break + + if wait and self.remote_list_request_cache.get(()): + yield self.remote_list_request_cache.get(()) + + public_rooms = yield self.get_local_public_room_list() + + # keep track of which room IDs we've seen so we can de-dup + room_ids = set() + + # tag all the ones in our list with our server name. + # Also add the them to the de-deping set + for room in public_rooms['chunk']: + room["server_name"] = self.hs.hostname + room_ids.add(room["room_id"]) + + # Now add the results from federation + for server_name, server_result in self.remote_list_cache.items(): + for room in server_result["chunk"]: + if room["room_id"] not in room_ids: + room["server_name"] = server_name + public_rooms["chunk"].append(room) + room_ids.add(room["room_id"]) + + defer.returnValue(public_rooms) diff --git a/synapse/server.py b/synapse/server.py index f516f08167..69860f3d82 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -39,7 +39,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler -from synapse.handlers.room import RoomListHandler +from synapse.handlers.room_list import RoomListHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler from synapse.handlers.events import EventHandler, EventStreamHandler -- cgit 1.4.1 From 772c6067a368b3f6cf9939641e0547f62ebd3322 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 17:17:07 +0100 Subject: Refactor public rooms to not pull out the full state for each room --- synapse/handlers/room_list.py | 44 +++++++++++++++++++++++++++++-------------- synapse/state.py | 5 +++-- 2 files changed, 33 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index d72e8c99f9..a3d554ff2c 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -18,7 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.constants import ( - EventTypes, JoinRules, Membership, + EventTypes, JoinRules, ) from synapse.api.errors import SynapseError from synapse.util.async import concurrently_execute @@ -56,7 +56,35 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def handle_room(room_id): - current_state = yield self.state_handler.get_current_state(room_id) + joined_users = yield self.state_handler.get_current_user_in_room(room_id) + num_joined_users = len(joined_users) + if num_joined_users == 0: + return + + result = { + "room_id": room_id, + "num_joined_members": num_joined_users, + } + + current_state_ids = yield self.state_handler.get_current_state_ids(room_id) + + event_map = yield self.store.get_events([ + event_id for key, event_id in current_state_ids.items() + if key[0] in ( + EventTypes.JoinRules, + EventTypes.Name, + EventTypes.Topic, + EventTypes.CanonicalAlias, + EventTypes.RoomHistoryVisibility, + EventTypes.GuestAccess, + "m.room.avatar", + ) + ]) + + current_state = { + (ev.type, ev.state_key): ev + for ev in event_map.values() + } # Double check that this is actually a public room. join_rules_event = current_state.get((EventTypes.JoinRules, "")) @@ -65,18 +93,6 @@ class RoomListHandler(BaseHandler): if join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) - result = {"room_id": room_id} - - num_joined_users = len([ - 1 for _, event in current_state.items() - if event.type == EventTypes.Member - and event.membership == Membership.JOIN - ]) - if num_joined_users == 0: - return - - result["num_joined_members"] = num_joined_users - aliases = yield self.store.get_aliases_for_room(room_id) if aliases: result["aliases"] = aliases diff --git a/synapse/state.py b/synapse/state.py index 4520fa0415..5ce23add5d 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -156,8 +156,9 @@ class StateHandler(object): defer.returnValue(state) @defer.inlineCallbacks - def get_current_user_in_room(self, room_id): - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + def get_current_user_in_room(self, room_id, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) entry = yield self.resolve_state_groups(room_id, latest_event_ids) joined_users = yield self.store.get_joined_users_from_state( room_id, entry.state_id, entry.state -- cgit 1.4.1 From c566f0ee17ee015e1a841e209f202c6e8aefdfcd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 17:28:52 +0100 Subject: Calculate the public room list from a stream_ordering --- synapse/handlers/room_list.py | 43 ++++++++++++++++++++++++++++++++++++++++--- synapse/storage/stream.py | 3 +++ 2 files changed, 43 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index a3d554ff2c..2b5a382052 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -52,12 +52,49 @@ class RoomListHandler(BaseHandler): def _get_public_room_list(self): room_ids = yield self.store.get_public_room_ids() + rooms_to_order_value = {} + rooms_to_num_joined = {} + rooms_to_latest_event_ids = {} + + current_stream_token = yield self.store.get_room_max_stream_ordering() + + # We want to return rooms in a particular order: the number of joined + # users. We then arbitrarily use the room_id as a tie breaker. + + @defer.inlineCallbacks + def get_order_for_room(room_id): + latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) + if not latest_event_ids: + latest_event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, current_stream_token + ) + rooms_to_latest_event_ids[room_id] = latest_event_ids + + if not latest_event_ids: + return + + joined_users = yield self.state_handler.get_current_user_in_room( + room_id, latest_event_ids, + ) + num_joined_users = len(joined_users) + rooms_to_num_joined[room_id] = num_joined_users + + if num_joined_users == 0: + return + + # We want larger rooms to be first, hence negating num_joined_users + rooms_to_order_value[room_id] = (-num_joined_users, room_id) + + yield concurrently_execute(get_order_for_room, room_ids, 10) + + sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) + sorted_rooms = [room_id for room_id, _ in sorted_entries] + results = [] @defer.inlineCallbacks def handle_room(room_id): - joined_users = yield self.state_handler.get_current_user_in_room(room_id) - num_joined_users = len(joined_users) + num_joined_users = rooms_to_num_joined[room_id] if num_joined_users == 0: return @@ -135,7 +172,7 @@ class RoomListHandler(BaseHandler): results.append(result) - yield concurrently_execute(handle_room, room_ids, 10) + yield concurrently_execute(handle_room, sorted_rooms, 10) # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": results}) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0577a0525b..07ea969d4d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -531,6 +531,9 @@ class StreamStore(SQLBaseStore): ) defer.returnValue("t%d-%d" % (topo, token)) + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + def get_stream_token_for_event(self, event_id): """The stream token for an event Args: -- cgit 1.4.1 From 4f181f361d94311971231e699f45fd00472022e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 09:08:57 +0100 Subject: Accept optional token to public room list --- synapse/handlers/room_list.py | 82 ++++++++++++++++++++++++++++++++++++++---- synapse/python_dependencies.py | 1 + 2 files changed, 76 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 2b5a382052..14e2487bbf 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -24,7 +24,11 @@ from synapse.api.errors import SynapseError from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from collections import namedtuple +from unpaddedbase64 import encode_base64, decode_base64 + import logging +import msgpack logger = logging.getLogger(__name__) @@ -42,21 +46,32 @@ class RoomListHandler(BaseHandler): ) self.fetch_all_remote_lists() - def get_local_public_room_list(self): - result = self.response_cache.get(()) + def get_local_public_room_list(self, limit=None, next_batch=None): + result = self.response_cache.get((limit, next_batch)) if not result: - result = self.response_cache.set((), self._get_public_room_list()) + result = self.response_cache.set( + (limit, next_batch), + self._get_public_room_list(limit, next_batch) + ) return result @defer.inlineCallbacks - def _get_public_room_list(self): + def _get_public_room_list(self, limit=None, next_batch=None): + if next_batch and next_batch != "END": + next_batch = RoomListNextBatch.from_token(next_batch) + else: + next_batch = None + room_ids = yield self.store.get_public_room_ids() rooms_to_order_value = {} rooms_to_num_joined = {} rooms_to_latest_event_ids = {} - current_stream_token = yield self.store.get_room_max_stream_ordering() + if next_batch: + current_stream_token = next_batch.sstream_ordering + else: + current_stream_token = yield self.store.get_room_max_stream_ordering() # We want to return rooms in a particular order: the number of joined # users. We then arbitrarily use the room_id as a tie breaker. @@ -90,6 +105,17 @@ class RoomListHandler(BaseHandler): sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) sorted_rooms = [room_id for room_id, _ in sorted_entries] + if next_batch: + sorted_rooms = sorted_rooms[next_batch.current_limit:] + + new_limit = None + if limit: + if sorted_rooms[limit:]: + new_limit = limit + if next_batch: + new_limit += next_batch.current_limit + sorted_rooms = sorted_rooms[:limit] + results = [] @defer.inlineCallbacks @@ -174,8 +200,24 @@ class RoomListHandler(BaseHandler): yield concurrently_execute(handle_room, sorted_rooms, 10) - # FIXME (erikj): START is no longer a valid value - defer.returnValue({"start": "START", "end": "END", "chunk": results}) + if new_limit: + end_token = RoomListNextBatch( + stream_ordering=current_stream_token, + current_limit=new_limit, + ).to_token() + else: + end_token = "END" + + if next_batch: + start_token = next_batch.to_token() + else: + start_token = "START" + + defer.returnValue({ + "start": start_token, + "end": end_token, + "chunk": results, + }) @defer.inlineCallbacks def fetch_all_remote_lists(self): @@ -235,3 +277,29 @@ class RoomListHandler(BaseHandler): room_ids.add(room["room_id"]) defer.returnValue(public_rooms) + + +class RoomListNextBatch(namedtuple("RoomListNextBatch", ( + "stream_ordering", # stream_ordering of the first public room list + "current_limit", # The number of previous rooms returned +))): + + KEY_DICT = { + "stream_ordering": "s", + "current_limit": "n", + } + + REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()} + + @classmethod + def from_token(cls, token): + return RoomListNextBatch(**{ + cls.REVERSE_KEY_DICT[key]: val + for key, val in msgpack.loads(decode_base64(token)).items() + }) + + def to_token(self): + return encode_base64(msgpack.dumps({ + self.KEY_DICT[key]: val + for key, val in self._asdict().items() + })) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 86e3d89154..b9e41770ee 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -36,6 +36,7 @@ REQUIREMENTS = { "blist": ["blist"], "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], + "msgpack-python>=0.3.0": ["msgpack"], } CONDITIONAL_REQUIREMENTS = { "web_client": { -- cgit 1.4.1 From 413138112379594bf9290576d44c365af612817d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 09:27:47 +0100 Subject: Remove support for aggregate room lists --- synapse/config/server.py | 9 ------- synapse/handlers/room_list.py | 55 ------------------------------------------ synapse/rest/client/v1/room.py | 2 +- 3 files changed, 1 insertion(+), 65 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/config/server.py b/synapse/config/server.py index 51eaf423ce..ed5417d0c3 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -29,7 +29,6 @@ class ServerConfig(Config): self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") - self.secondary_directory_servers = config.get("secondary_directory_servers", []) if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': @@ -142,14 +141,6 @@ class ServerConfig(Config): # The GC threshold parameters to pass to `gc.set_threshold`, if defined # gc_thresholds: [700, 10, 10] - # A list of other Home Servers to fetch the public room directory from - # and include in the public room directory of this home server - # This is a temporary stopgap solution to populate new server with a - # list of rooms until there exists a good solution of a decentralized - # room directory. - # secondary_directory_servers: - # - matrix.org - # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 14e2487bbf..e545777c51 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -39,12 +39,6 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache(hs) - self.remote_list_request_cache = ResponseCache(hs) - self.remote_list_cache = {} - self.fetch_looping_call = hs.get_clock().looping_call( - self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL - ) - self.fetch_all_remote_lists() def get_local_public_room_list(self, limit=None, next_batch=None): result = self.response_cache.get((limit, next_batch)) @@ -219,14 +213,6 @@ class RoomListHandler(BaseHandler): "chunk": results, }) - @defer.inlineCallbacks - def fetch_all_remote_lists(self): - deferred = self.hs.get_replication_layer().get_public_rooms( - self.hs.config.secondary_directory_servers - ) - self.remote_list_request_cache.set((), deferred) - self.remote_list_cache = yield deferred - @defer.inlineCallbacks def get_remote_public_room_list(self, server_name): res = yield self.hs.get_replication_layer().get_public_rooms( @@ -237,47 +223,6 @@ class RoomListHandler(BaseHandler): raise SynapseError(404, "Server not found") defer.returnValue(res[server_name]) - @defer.inlineCallbacks - def get_aggregated_public_room_list(self): - """ - Get the public room list from this server and the servers - specified in the secondary_directory_servers config option. - XXX: Pagination... - """ - # We return the results from out cache which is updated by a looping call, - # unless we're missing a cache entry, in which case wait for the result - # of the fetch if there's one in progress. If not, omit that server. - wait = False - for s in self.hs.config.secondary_directory_servers: - if s not in self.remote_list_cache: - logger.warn("No cached room list from %s: waiting for fetch", s) - wait = True - break - - if wait and self.remote_list_request_cache.get(()): - yield self.remote_list_request_cache.get(()) - - public_rooms = yield self.get_local_public_room_list() - - # keep track of which room IDs we've seen so we can de-dup - room_ids = set() - - # tag all the ones in our list with our server name. - # Also add the them to the de-deping set - for room in public_rooms['chunk']: - room["server_name"] = self.hs.hostname - room_ids.add(room["room_id"]) - - # Now add the results from federation - for server_name, server_result in self.remote_list_cache.items(): - for room in server_result["chunk"]: - if room["room_id"] not in room_ids: - room["server_name"] = server_name - public_rooms["chunk"].append(room) - room_ids.add(room["room_id"]) - - defer.returnValue(public_rooms) - class RoomListNextBatch(namedtuple("RoomListNextBatch", ( "stream_ordering", # stream_ordering of the first public room list diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 22d6a7d31e..c40913c7d1 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -321,7 +321,7 @@ class PublicRoomListRestServlet(ClientV1RestServlet): if server: data = yield handler.get_remote_public_room_list(server) else: - data = yield handler.get_aggregated_public_room_list() + data = yield handler.get_local_public_room_list() defer.returnValue((200, data)) -- cgit 1.4.1 From f3eead066044083457b3a60ee67d3d1fb2b8e4c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 10:15:37 +0100 Subject: Allow paginating both forwards and backwards --- synapse/handlers/room_list.py | 71 +++++++++++++++++++++++++++++------------- synapse/rest/client/v1/room.py | 18 +++++++++-- 2 files changed, 65 insertions(+), 24 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index e545777c51..94a5e7f51c 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -63,7 +63,7 @@ class RoomListHandler(BaseHandler): rooms_to_latest_event_ids = {} if next_batch: - current_stream_token = next_batch.sstream_ordering + current_stream_token = next_batch.stream_ordering else: current_stream_token = yield self.store.get_room_max_stream_ordering() @@ -100,17 +100,25 @@ class RoomListHandler(BaseHandler): sorted_rooms = [room_id for room_id, _ in sorted_entries] if next_batch: - sorted_rooms = sorted_rooms[next_batch.current_limit:] + if next_batch.direction_is_forward: + sorted_rooms = sorted_rooms[next_batch.current_limit:] + else: + sorted_rooms = sorted_rooms[:next_batch.current_limit] + sorted_rooms.reverse() new_limit = None if limit: if sorted_rooms[limit:]: new_limit = limit if next_batch: - new_limit += next_batch.current_limit + if next_batch.direction_is_forward: + new_limit += next_batch.current_limit + else: + new_limit = next_batch.current_limit - new_limit + new_limit = max(0, new_limit) sorted_rooms = sorted_rooms[:limit] - results = [] + chunk = [] @defer.inlineCallbacks def handle_room(room_id): @@ -190,31 +198,45 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - results.append(result) + chunk.append(result) yield concurrently_execute(handle_room, sorted_rooms, 10) - if new_limit: - end_token = RoomListNextBatch( - stream_ordering=current_stream_token, - current_limit=new_limit, - ).to_token() - else: - end_token = "END" + chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) - if next_batch: - start_token = next_batch.to_token() + results = { + "chunk": chunk, + } + + if not next_batch or next_batch.direction_is_forward: + if new_limit: + results["next_batch"] = RoomListNextBatch( + stream_ordering=current_stream_token, + current_limit=new_limit, + direction_is_forward=True, + ).to_token() + + if next_batch: + results["prev_batch"] = next_batch.copy_and_replace( + direction_is_forward=False, + ).to_token() else: - start_token = "START" + if new_limit: + results["prev_batch"] = RoomListNextBatch( + stream_ordering=current_stream_token, + current_limit=new_limit, + direction_is_forward=False, + ).to_token() - defer.returnValue({ - "start": start_token, - "end": end_token, - "chunk": results, - }) + if next_batch: + results["next_batch"] = next_batch.copy_and_replace( + direction_is_forward=True, + ).to_token() + + defer.returnValue(results) @defer.inlineCallbacks - def get_remote_public_room_list(self, server_name): + def get_remote_public_room_list(self, server_name, limit=None, next_batch=None): res = yield self.hs.get_replication_layer().get_public_rooms( [server_name] ) @@ -227,11 +249,13 @@ class RoomListHandler(BaseHandler): class RoomListNextBatch(namedtuple("RoomListNextBatch", ( "stream_ordering", # stream_ordering of the first public room list "current_limit", # The number of previous rooms returned + "direction_is_forward", # Bool if this is a next_batch, false if prev_batch ))): KEY_DICT = { "stream_ordering": "s", "current_limit": "n", + "direction_is_forward": "d", } REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()} @@ -248,3 +272,8 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", ( self.KEY_DICT[key]: val for key, val in self._asdict().items() })) + + def copy_and_replace(self, **kwds): + return self._replace( + **kwds + ) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c40913c7d1..00b7738e0b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -23,7 +23,9 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import Filter from synapse.types import UserID, RoomID, RoomAlias from synapse.events.utils import serialize_event, format_event_for_client_v2 -from synapse.http.servlet import parse_json_object_from_request, parse_string +from synapse.http.servlet import ( + parse_json_object_from_request, parse_string, parse_integer +) import logging import urllib @@ -317,11 +319,21 @@ class PublicRoomListRestServlet(ClientV1RestServlet): else: pass + limit = parse_integer(request, "limit", 0) + next_batch = parse_string(request, "since", None) + handler = self.hs.get_room_list_handler() if server: - data = yield handler.get_remote_public_room_list(server) + data = yield handler.get_remote_public_room_list( + server, + limit=limit, + next_batch=next_batch, + ) else: - data = yield handler.get_local_public_room_list() + data = yield handler.get_local_public_room_list( + limit=limit, + next_batch=next_batch, + ) defer.returnValue((200, data)) -- cgit 1.4.1 From 5810cffd335f96ac448497e7caf46c5cbf29d6a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 10:36:19 +0100 Subject: Pass since/from parameters over federation --- synapse/federation/federation_client.py | 22 +++---------- synapse/federation/transport/client.py | 9 +++++- synapse/federation/transport/server.py | 10 ++++-- synapse/handlers/room_list.py | 55 ++++++++++++++++----------------- synapse/http/servlet.py | 18 ++++++++--- synapse/rest/client/v1/room.py | 6 ++-- 6 files changed, 63 insertions(+), 57 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 91bed4746f..f0a684fc13 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,7 +24,6 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @@ -719,24 +718,11 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") - @defer.inlineCallbacks - def get_public_rooms(self, destinations): - results_by_server = {} - - @defer.inlineCallbacks - def _get_result(s): - if s == self.server_name: - defer.returnValue() - - try: - result = yield self.transport_layer.get_public_rooms(s) - results_by_server[s] = result - except: - logger.exception("Error getting room list from server %r", s) - - yield concurrently_execute(_get_result, destinations, 3) + def get_public_rooms(self, destination, limit=None, since_token=None): + if destination == self.server_name: + return - defer.returnValue(results_by_server) + return self.transport_layer.get_public_rooms(destination, limit, since_token) @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 2b138526ba..f508b70f11 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -248,12 +248,19 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def get_public_rooms(self, remote_server): + def get_public_rooms(self, remote_server, limit, since_token): path = PREFIX + "/publicRooms" + args = {} + if limit: + args["limit"] = [str(limit)] + if since_token: + args["since"] = [since_token] + response = yield self.client.get_json( destination=remote_server, path=path, + args=args, ) defer.returnValue(response) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 37c0d4fbc4..fec337be64 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import ( + parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, +) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string @@ -554,7 +556,11 @@ class PublicRoomList(BaseFederationServlet): @defer.inlineCallbacks def on_GET(self, origin, content, query): - data = yield self.room_list_handler.get_local_public_room_list() + limit = parse_integer_from_args(query, "limit", 0) + since_token = parse_string_from_args(query, "since", None) + data = yield self.room_list_handler.get_local_public_room_list( + limit, since_token + ) defer.returnValue((200, data)) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 94a5e7f51c..6a62f3c27e 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -20,7 +20,6 @@ from ._base import BaseHandler from synapse.api.constants import ( EventTypes, JoinRules, ) -from synapse.api.errors import SynapseError from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -40,21 +39,21 @@ class RoomListHandler(BaseHandler): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache(hs) - def get_local_public_room_list(self, limit=None, next_batch=None): - result = self.response_cache.get((limit, next_batch)) + def get_local_public_room_list(self, limit=None, since_token=None): + result = self.response_cache.get((limit, since_token)) if not result: result = self.response_cache.set( - (limit, next_batch), - self._get_public_room_list(limit, next_batch) + (limit, since_token), + self._get_public_room_list(limit, since_token) ) return result @defer.inlineCallbacks - def _get_public_room_list(self, limit=None, next_batch=None): - if next_batch and next_batch != "END": - next_batch = RoomListNextBatch.from_token(next_batch) + def _get_public_room_list(self, limit=None, since_token=None): + if since_token and since_token != "END": + since_token = RoomListNextBatch.from_token(since_token) else: - next_batch = None + since_token = None room_ids = yield self.store.get_public_room_ids() @@ -62,8 +61,8 @@ class RoomListHandler(BaseHandler): rooms_to_num_joined = {} rooms_to_latest_event_ids = {} - if next_batch: - current_stream_token = next_batch.stream_ordering + if since_token: + current_stream_token = since_token.stream_ordering else: current_stream_token = yield self.store.get_room_max_stream_ordering() @@ -99,22 +98,22 @@ class RoomListHandler(BaseHandler): sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) sorted_rooms = [room_id for room_id, _ in sorted_entries] - if next_batch: - if next_batch.direction_is_forward: - sorted_rooms = sorted_rooms[next_batch.current_limit:] + if since_token: + if since_token.direction_is_forward: + sorted_rooms = sorted_rooms[since_token.current_limit:] else: - sorted_rooms = sorted_rooms[:next_batch.current_limit] + sorted_rooms = sorted_rooms[:since_token.current_limit] sorted_rooms.reverse() new_limit = None if limit: if sorted_rooms[limit:]: new_limit = limit - if next_batch: - if next_batch.direction_is_forward: - new_limit += next_batch.current_limit + if since_token: + if since_token.direction_is_forward: + new_limit += since_token.current_limit else: - new_limit = next_batch.current_limit - new_limit + new_limit = since_token.current_limit - new_limit new_limit = max(0, new_limit) sorted_rooms = sorted_rooms[:limit] @@ -208,7 +207,7 @@ class RoomListHandler(BaseHandler): "chunk": chunk, } - if not next_batch or next_batch.direction_is_forward: + if not since_token or since_token.direction_is_forward: if new_limit: results["next_batch"] = RoomListNextBatch( stream_ordering=current_stream_token, @@ -216,8 +215,8 @@ class RoomListHandler(BaseHandler): direction_is_forward=True, ).to_token() - if next_batch: - results["prev_batch"] = next_batch.copy_and_replace( + if since_token: + results["prev_batch"] = since_token.copy_and_replace( direction_is_forward=False, ).to_token() else: @@ -228,22 +227,20 @@ class RoomListHandler(BaseHandler): direction_is_forward=False, ).to_token() - if next_batch: - results["next_batch"] = next_batch.copy_and_replace( + if since_token: + results["next_batch"] = since_token.copy_and_replace( direction_is_forward=True, ).to_token() defer.returnValue(results) @defer.inlineCallbacks - def get_remote_public_room_list(self, server_name, limit=None, next_batch=None): + def get_remote_public_room_list(self, server_name, limit=None, since_token=None): res = yield self.hs.get_replication_layer().get_public_rooms( - [server_name] + server_name, limit=limit, since_token=since_token, ) - if server_name not in res: - raise SynapseError(404, "Server not found") - defer.returnValue(res[server_name]) + defer.returnValue(res) class RoomListNextBatch(namedtuple("RoomListNextBatch", ( diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index e41afeab8e..9346386238 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -41,9 +41,13 @@ def parse_integer(request, name, default=None, required=False): SynapseError: if the parameter is absent and required, or if the parameter is present and not an integer. """ - if name in request.args: + return parse_integer_from_args(request.args, name, default, required) + + +def parse_integer_from_args(args, name, default=None, required=False): + if name in args: try: - return int(request.args[name][0]) + return int(args[name][0]) except: message = "Query parameter %r must be an integer" % (name,) raise SynapseError(400, message) @@ -116,9 +120,15 @@ def parse_string(request, name, default=None, required=False, parameter is present, must be one of a list of allowed values and is not one of those allowed values. """ + return parse_string_from_args( + request.args, name, default, required, allowed_values, param_type, + ) - if name in request.args: - value = request.args[name][0] + +def parse_string_from_args(args, name, default=None, required=False, + allowed_values=None, param_type="string"): + if name in args: + value = args[name][0] if allowed_values is not None and value not in allowed_values: message = "Query parameter %r must be one of [%s]" % ( name, ", ".join(repr(v) for v in allowed_values) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 00b7738e0b..db0cd4380a 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -320,19 +320,19 @@ class PublicRoomListRestServlet(ClientV1RestServlet): pass limit = parse_integer(request, "limit", 0) - next_batch = parse_string(request, "since", None) + since_token = parse_string(request, "since", None) handler = self.hs.get_room_list_handler() if server: data = yield handler.get_remote_public_room_list( server, limit=limit, - next_batch=next_batch, + since_token=since_token, ) else: data = yield handler.get_local_public_room_list( limit=limit, - next_batch=next_batch, + since_token=since_token, ) defer.returnValue((200, data)) -- cgit 1.4.1 From 4fb65a10916481e0600d506d4c7e9bcfbffb7092 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 11:27:04 +0100 Subject: Base public room list off of public_rooms stream --- synapse/handlers/room_list.py | 34 ++++++++++++++++++++++------ synapse/storage/room.py | 52 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 6a62f3c27e..28bc35f8a3 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -55,16 +55,26 @@ class RoomListHandler(BaseHandler): else: since_token = None - room_ids = yield self.store.get_public_room_ids() - rooms_to_order_value = {} rooms_to_num_joined = {} rooms_to_latest_event_ids = {} + newly_visible = [] + newly_unpublished = [] if since_token: - current_stream_token = since_token.stream_ordering + stream_token = since_token.stream_ordering + current_public_id = yield self.store.get_current_public_room_stream_id() + public_room_stream_id = since_token.public_room_stream_id + newly_visible, newly_unpublished = yield self.store.get_public_room_changes( + public_room_stream_id, current_public_id + ) else: - current_stream_token = yield self.store.get_room_max_stream_ordering() + stream_token = yield self.store.get_room_max_stream_ordering() + public_room_stream_id = yield self.store.get_current_public_room_stream_id() + + room_ids = yield self.store.get_public_room_ids_at_stream_id( + public_room_stream_id + ) # We want to return rooms in a particular order: the number of joined # users. We then arbitrarily use the room_id as a tie breaker. @@ -74,7 +84,7 @@ class RoomListHandler(BaseHandler): latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) if not latest_event_ids: latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, current_stream_token + room_id, stream_token ) rooms_to_latest_event_ids[room_id] = latest_event_ids @@ -125,6 +135,9 @@ class RoomListHandler(BaseHandler): if num_joined_users == 0: return + if room_id in newly_unpublished: + return + result = { "room_id": room_id, "num_joined_members": num_joined_users, @@ -207,10 +220,14 @@ class RoomListHandler(BaseHandler): "chunk": chunk, } + if since_token: + results["new_rooms"] = bool(newly_visible) + if not since_token or since_token.direction_is_forward: if new_limit: results["next_batch"] = RoomListNextBatch( - stream_ordering=current_stream_token, + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, current_limit=new_limit, direction_is_forward=True, ).to_token() @@ -222,7 +239,8 @@ class RoomListHandler(BaseHandler): else: if new_limit: results["prev_batch"] = RoomListNextBatch( - stream_ordering=current_stream_token, + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, current_limit=new_limit, direction_is_forward=False, ).to_token() @@ -245,12 +263,14 @@ class RoomListHandler(BaseHandler): class RoomListNextBatch(namedtuple("RoomListNextBatch", ( "stream_ordering", # stream_ordering of the first public room list + "public_room_stream_id", # public room stream id for first public room list "current_limit", # The number of previous rooms returned "direction_is_forward", # Bool if this is a next_batch, false if prev_batch ))): KEY_DICT = { "stream_ordering": "s", + "public_room_stream_id": "p", "current_limit": "n", "direction_is_forward": "d", } diff --git a/synapse/storage/room.py b/synapse/storage/room.py index ef0d79891e..8aa4545939 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -255,3 +255,55 @@ class RoomStore(SQLBaseStore): }, desc="add_event_report" ) + + def get_current_public_room_stream_id(self): + return self._public_room_id_gen.get_current_token() + + def get_public_room_ids_at_stream_id(self, stream_id): + return self.runInteraction( + "get_public_room_ids_at_stream_id", + self.get_public_room_ids_at_stream_id_txn, stream_id + ) + + def get_public_room_ids_at_stream_id_txn(self, txn, stream_id): + return { + rm + for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items() + if vis + } + + def get_published_at_stream_id_txn(self, txn, stream_id): + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream + INNER JOIN ( + SELECT room_id, max(stream_id) AS stream_id + FROM public_room_list_stream + WHERE stream_id <= ? + GROUP BY room_id + ) grouped USING (room_id, stream_id) + """) + + txn.execute(sql, (stream_id,)) + return dict(txn.fetchall()) + + def get_public_room_changes(self, prev_stream_id, new_stream_id): + def get_public_room_changes_txn(txn): + then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id) + + now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id) + + now_rooms_visible = set( + rm for rm, vis in now_rooms_dict.items() if vis + ) + now_rooms_not_visible = set( + rm for rm, vis in now_rooms_dict.items() if not vis + ) + + newly_visible = now_rooms_visible - then_rooms + newly_unpublished = now_rooms_not_visible & then_rooms + + return newly_visible, newly_unpublished + + return self.runInteraction( + "get_public_room_changes", get_public_room_changes_txn + ) -- cgit 1.4.1 From c33b25fd8d7a91652dc7f887178a47343c6c9ed2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 17:35:20 +0100 Subject: Change the way we calculate new_limit in /publicRooms and add POST API --- synapse/handlers/room_list.py | 48 +++++++++++++++++++++++++++++------------- synapse/rest/client/v1/room.py | 29 +++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 15 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 28bc35f8a3..c5ee3b3cd9 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -39,7 +39,8 @@ class RoomListHandler(BaseHandler): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache(hs) - def get_local_public_room_list(self, limit=None, since_token=None): + def get_local_public_room_list(self, limit=None, since_token=None, + search_filter=None): result = self.response_cache.get((limit, since_token)) if not result: result = self.response_cache.set( @@ -49,7 +50,8 @@ class RoomListHandler(BaseHandler): return result @defer.inlineCallbacks - def _get_public_room_list(self, limit=None, since_token=None): + def _get_public_room_list(self, limit=None, since_token=None, + search_filter=None): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -115,22 +117,18 @@ class RoomListHandler(BaseHandler): sorted_rooms = sorted_rooms[:since_token.current_limit] sorted_rooms.reverse() - new_limit = None - if limit: - if sorted_rooms[limit:]: - new_limit = limit - if since_token: - if since_token.direction_is_forward: - new_limit += since_token.current_limit - else: - new_limit = since_token.current_limit - new_limit - new_limit = max(0, new_limit) - sorted_rooms = sorted_rooms[:limit] + rooms_to_scan = sorted_rooms + if limit and not search_filter: + rooms_to_scan = sorted_rooms[:limit] chunk = [] @defer.inlineCallbacks def handle_room(room_id): + if limit and len(chunk) > limit: + # We've already got enough, so lets just drop it. + return + num_joined_users = rooms_to_num_joined[room_id] if num_joined_users == 0: return @@ -212,10 +210,29 @@ class RoomListHandler(BaseHandler): chunk.append(result) - yield concurrently_execute(handle_room, sorted_rooms, 10) + yield concurrently_execute(handle_room, rooms_to_scan, 10) chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) + new_limit = None + if chunk: + addition = 1 + if since_token: + addition += since_token.current_limit + + if not since_token or since_token.direction_is_forward: + last_room_id = chunk[-1]["room_id"] + else: + last_room_id = chunk[0]["room_id"] + addition *= -1 + + try: + new_limit = sorted_rooms.index(last_room_id) + addition + if new_limit >= len(sorted_rooms): + new_limit = None + except ValueError: + pass + results = { "chunk": chunk, } @@ -253,7 +270,8 @@ class RoomListHandler(BaseHandler): defer.returnValue(results) @defer.inlineCallbacks - def get_remote_public_room_list(self, server_name, limit=None, since_token=None): + def get_remote_public_room_list(self, server_name, limit=None, since_token=None, + search_filter=None): res = yield self.hs.get_replication_layer().get_public_rooms( server_name, limit=limit, since_token=since_token, ) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index db0cd4380a..be14ae1061 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -337,6 +337,35 @@ class PublicRoomListRestServlet(ClientV1RestServlet): defer.returnValue((200, data)) + @defer.inlineCallbacks + def on_POST(self, request): + # FIXME + # yield self.auth.get_user_by_req(request) + + server = parse_string(request, "server", default=None) + content = parse_json_object_from_request(request) + + limit = int(content.get("limit", 100)) + since_token = content.get("since", None) + search_filter = content.get("filter", None) + + handler = self.hs.get_room_list_handler() + if server: + data = yield handler.get_remote_public_room_list( + server, + limit=limit, + since_token=since_token, + search_filter=search_filter, + ) + else: + data = yield handler.get_local_public_room_list( + limit=limit, + since_token=since_token, + search_filter=search_filter, + ) + + defer.returnValue((200, data)) + # TODO: Needs unit testing class RoomMemberListRestServlet(ClientV1RestServlet): -- cgit 1.4.1 From 098db4aa521704e8c98c369f0c6d11ca9e02d899 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Sep 2016 17:50:16 +0100 Subject: Add very basic filter API to /publicRooms --- synapse/handlers/room_list.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index c5ee3b3cd9..3a75176ef4 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -45,7 +45,7 @@ class RoomListHandler(BaseHandler): if not result: result = self.response_cache.set( (limit, since_token), - self._get_public_room_list(limit, since_token) + self._get_public_room_list(limit, since_token, search_filter) ) return result @@ -119,13 +119,13 @@ class RoomListHandler(BaseHandler): rooms_to_scan = sorted_rooms if limit and not search_filter: - rooms_to_scan = sorted_rooms[:limit] + rooms_to_scan = sorted_rooms[:limit + 1] chunk = [] @defer.inlineCallbacks def handle_room(room_id): - if limit and len(chunk) > limit: + if limit and len(chunk) > limit + 1: # We've already got enough, so lets just drop it. return @@ -208,14 +208,27 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - chunk.append(result) + logger.info("search_filter: %r", search_filter) + if search_filter and search_filter.get("generic_search_term", None): + generic_search_term = search_filter["generic_search_term"] + if generic_search_term in result.get("name", ""): + chunk.append(result) + elif generic_search_term in result.get("topic", ""): + chunk.append(result) + elif generic_search_term in result.get("canonical_alias", ""): + chunk.append(result) + else: + chunk.append(result) yield concurrently_execute(handle_room, rooms_to_scan, 10) chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) new_limit = None - if chunk: + if chunk and (not limit or len(chunk) > limit): + if limit: + chunk = chunk[:limit] + addition = 1 if since_token: addition += since_token.current_limit -- cgit 1.4.1 From 74d4cdee250e4134dd14e4edfb8f5833e28c02b0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Sep 2016 09:05:11 +0100 Subject: Don't cache searched in /publicRooms --- synapse/handlers/room_list.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 3a75176ef4..37213f4bd8 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -41,11 +41,15 @@ class RoomListHandler(BaseHandler): def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None): + if search_filter: + # We explicitly don't bother caching searches. + return self._get_public_room_list(limit, since_token, search_filter) + result = self.response_cache.get((limit, since_token)) if not result: result = self.response_cache.set( (limit, since_token), - self._get_public_room_list(limit, since_token, search_filter) + self._get_public_room_list(limit, since_token) ) return result -- cgit 1.4.1 From e58a9d781c7808b66f6eda221c9ce91ccd3cd8d6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Sep 2016 10:19:32 +0100 Subject: Filter remote rooms lists locally --- synapse/handlers/room_list.py | 34 ++++++++++++++++++++++++---------- synapse/storage/event_federation.py | 2 +- 2 files changed, 25 insertions(+), 11 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 37213f4bd8..9383f2486c 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -212,16 +212,7 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - logger.info("search_filter: %r", search_filter) - if search_filter and search_filter.get("generic_search_term", None): - generic_search_term = search_filter["generic_search_term"] - if generic_search_term in result.get("name", ""): - chunk.append(result) - elif generic_search_term in result.get("topic", ""): - chunk.append(result) - elif generic_search_term in result.get("canonical_alias", ""): - chunk.append(result) - else: + if _matches_room_entry(result, search_filter): chunk.append(result) yield concurrently_execute(handle_room, rooms_to_scan, 10) @@ -291,8 +282,16 @@ class RoomListHandler(BaseHandler): search_filter=None): res = yield self.hs.get_replication_layer().get_public_rooms( server_name, limit=limit, since_token=since_token, + search_filter=search_filter, ) + if search_filter: + res["chunk"] = [ + entry + for entry in dict(res.get("chunk", [])) + if _matches_room_entry(entry, search_filter) + ] + defer.returnValue(res) @@ -329,3 +328,18 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", ( return self._replace( **kwds ) + + +def _matches_room_entry(room_entry, search_filter): + if search_filter and search_filter.get("generic_search_term", None): + generic_search_term = search_filter["generic_search_term"] + if generic_search_term in room_entry.get("name", ""): + return True + elif generic_search_term in room_entry.get("topic", ""): + return True + elif generic_search_term in room_entry.get("canonical_alias", ""): + return True + else: + return True + + return False diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 765b5a5bcb..53289f556b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -376,7 +376,7 @@ class EventFederationStore(SQLBaseStore): INNER JOIN ( SELECT room_id, MAX(stream_ordering) AS stream_ordering FROM stream_ordering_to_exterm - WHERE stream_ordering < ? GROUP BY room_id + WHERE stream_ordering <= ? GROUP BY room_id ) AS rms USING (room_id, stream_ordering) WHERE room_id = ? """) -- cgit 1.4.1 From 23b6701a2869d50fefbc949fbb449de07636b5b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Sep 2016 10:24:15 +0100 Subject: Support filtering remote room lists --- synapse/federation/federation_client.py | 7 +++++-- synapse/federation/transport/client.py | 5 ++++- synapse/handlers/room_list.py | 12 +++++++++--- 3 files changed, 18 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f0a684fc13..06d0320b1a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -718,11 +718,14 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") - def get_public_rooms(self, destination, limit=None, since_token=None): + def get_public_rooms(self, destination, limit=None, since_token=None, + search_filter=None): if destination == self.server_name: return - return self.transport_layer.get_public_rooms(destination, limit, since_token) + return self.transport_layer.get_public_rooms( + destination, limit, since_token, search_filter + ) @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f508b70f11..db45c7826c 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -248,7 +248,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def get_public_rooms(self, remote_server, limit, since_token): + def get_public_rooms(self, remote_server, limit, since_token, + search_filter=None): path = PREFIX + "/publicRooms" args = {} @@ -257,6 +258,8 @@ class TransportLayerClient(object): if since_token: args["since"] = [since_token] + # TODO(erikj): Actually send the search_filter across federation. + response = yield self.client.get_json( destination=remote_server, path=path, diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 9383f2486c..09189edb65 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -280,17 +280,23 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_remote_public_room_list(self, server_name, limit=None, since_token=None, search_filter=None): + if search_filter: + # We currently don't support searching across federation, so we have + # to do it manually without pagination + limit = None + since_token = None + res = yield self.hs.get_replication_layer().get_public_rooms( server_name, limit=limit, since_token=since_token, search_filter=search_filter, ) if search_filter: - res["chunk"] = [ + res = {"chunk": [ entry - for entry in dict(res.get("chunk", [])) + for entry in list(res.get("chunk", [])) if _matches_room_entry(entry, search_filter) - ] + ]} defer.returnValue(res) -- cgit 1.4.1 From d84319ae1048837c766eb2e56f91eb521aeed4a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Sep 2016 10:31:59 +0100 Subject: Add remote reoom cache --- synapse/handlers/room_list.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 09189edb65..ec0a293a37 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -38,6 +38,7 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache(hs) + self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None): @@ -286,9 +287,8 @@ class RoomListHandler(BaseHandler): limit = None since_token = None - res = yield self.hs.get_replication_layer().get_public_rooms( + res = yield self._get_remote_list_cached( server_name, limit=limit, since_token=since_token, - search_filter=search_filter, ) if search_filter: @@ -300,6 +300,27 @@ class RoomListHandler(BaseHandler): defer.returnValue(res) + def _get_remote_list_cached(self, server_name, limit=None, since_token=None, + search_filter=None): + repl_layer = self.hs.get_replication_layer() + if search_filter: + # We can't cache when asking for search + return repl_layer.get_public_rooms( + server_name, limit=limit, since_token=since_token, + search_filter=search_filter, + ) + + result = self.remote_response_cache.get((server_name, limit, since_token)) + if not result: + result = self.remote_response_cache.set( + (server_name, limit, since_token), + repl_layer.get_public_rooms( + server_name, limit=limit, since_token=since_token, + search_filter=search_filter, + ) + ) + return result + class RoomListNextBatch(namedtuple("RoomListNextBatch", ( "stream_ordering", # stream_ordering of the first public room list -- cgit 1.4.1 From b7b62bf9eac3f1f92a4913cfdf1f568752dcf8c0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Sep 2016 11:00:29 +0100 Subject: Comment --- synapse/handlers/room_list.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index ec0a293a37..f15987b265 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -220,6 +220,8 @@ class RoomListHandler(BaseHandler): chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) + # Work out the new limit of the batch for pagination, or None if we + # know there are no more results that would be returned. new_limit = None if chunk and (not limit or len(chunk) > limit): if limit: -- cgit 1.4.1 From 5336acd46fae8f16846970f2cddbeebc2a2b0e81 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 16 Sep 2016 19:02:42 +0100 Subject: Make public room search case insensitive --- synapse/handlers/room_list.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index f15987b265..9a6bbeb603 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -361,12 +361,12 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", ( def _matches_room_entry(room_entry, search_filter): if search_filter and search_filter.get("generic_search_term", None): - generic_search_term = search_filter["generic_search_term"] - if generic_search_term in room_entry.get("name", ""): + generic_search_term = search_filter["generic_search_term"].upper() + if generic_search_term in room_entry.get("name", "").upper(): return True - elif generic_search_term in room_entry.get("topic", ""): + elif generic_search_term in room_entry.get("topic", "").upper(): return True - elif generic_search_term in room_entry.get("canonical_alias", ""): + elif generic_search_term in room_entry.get("canonical_alias", "").upper(): return True else: return True -- cgit 1.4.1 From 71edaae9812671bed7ffb7f08347251612cef71f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 17 Sep 2016 14:40:28 +0100 Subject: Fix and clean up publicRooms pagination --- synapse/handlers/room_list.py | 233 ++++++++++++++++++++++-------------------- 1 file changed, 123 insertions(+), 110 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index f15987b265..a33b644d0f 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -115,134 +115,63 @@ class RoomListHandler(BaseHandler): sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) sorted_rooms = [room_id for room_id, _ in sorted_entries] + # `sorted_rooms` should now be a list of all public room ids that is + # stable across pagination. Therefore, we can use indices into this + # list as our pagination tokens. + + # Filter out rooms that we don't want to return + rooms_to_scan = [ + r for r in sorted_rooms + if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0 + ] + if since_token: + # Filter out rooms we've already returned previously + # `since_token.current_limit` is the index of the last room we + # sent down, so we exclude it and everything before/after it. if since_token.direction_is_forward: - sorted_rooms = sorted_rooms[since_token.current_limit:] + rooms_to_scan = rooms_to_scan[since_token.current_limit + 1:] else: - sorted_rooms = sorted_rooms[:since_token.current_limit] - sorted_rooms.reverse() + rooms_to_scan = rooms_to_scan[:since_token.current_limit] + rooms_to_scan.reverse() - rooms_to_scan = sorted_rooms + # If there's not search filter just limit the range since we'll + # return the vast majority of things. if limit and not search_filter: - rooms_to_scan = sorted_rooms[:limit + 1] + rooms_to_scan = rooms_to_scan[:limit + 1] + # Actually generate the entries. _generate_room_entry will append to + # chunk but will stop if len(chunk) > limit chunk = [] - - @defer.inlineCallbacks - def handle_room(room_id): - if limit and len(chunk) > limit + 1: - # We've already got enough, so lets just drop it. - return - - num_joined_users = rooms_to_num_joined[room_id] - if num_joined_users == 0: - return - - if room_id in newly_unpublished: - return - - result = { - "room_id": room_id, - "num_joined_members": num_joined_users, - } - - current_state_ids = yield self.state_handler.get_current_state_ids(room_id) - - event_map = yield self.store.get_events([ - event_id for key, event_id in current_state_ids.items() - if key[0] in ( - EventTypes.JoinRules, - EventTypes.Name, - EventTypes.Topic, - EventTypes.CanonicalAlias, - EventTypes.RoomHistoryVisibility, - EventTypes.GuestAccess, - "m.room.avatar", - ) - ]) - - current_state = { - (ev.type, ev.state_key): ev - for ev in event_map.values() - } - - # Double check that this is actually a public room. - join_rules_event = current_state.get((EventTypes.JoinRules, "")) - if join_rules_event: - join_rule = join_rules_event.content.get("join_rule", None) - if join_rule and join_rule != JoinRules.PUBLIC: - defer.returnValue(None) - - aliases = yield self.store.get_aliases_for_room(room_id) - if aliases: - result["aliases"] = aliases - - name_event = yield current_state.get((EventTypes.Name, "")) - if name_event: - name = name_event.content.get("name", None) - if name: - result["name"] = name - - topic_event = current_state.get((EventTypes.Topic, "")) - if topic_event: - topic = topic_event.content.get("topic", None) - if topic: - result["topic"] = topic - - canonical_event = current_state.get((EventTypes.CanonicalAlias, "")) - if canonical_event: - canonical_alias = canonical_event.content.get("alias", None) - if canonical_alias: - result["canonical_alias"] = canonical_alias - - visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, "")) - visibility = None - if visibility_event: - visibility = visibility_event.content.get("history_visibility", None) - result["world_readable"] = visibility == "world_readable" - - guest_event = current_state.get((EventTypes.GuestAccess, "")) - guest = None - if guest_event: - guest = guest_event.content.get("guest_access", None) - result["guest_can_join"] = guest == "can_join" - - avatar_event = current_state.get(("m.room.avatar", "")) - if avatar_event: - avatar_url = avatar_event.content.get("url", None) - if avatar_url: - result["avatar_url"] = avatar_url - - if _matches_room_entry(result, search_filter): - chunk.append(result) - - yield concurrently_execute(handle_room, rooms_to_scan, 10) + yield concurrently_execute( + lambda r: self._generate_room_entry( + r, rooms_to_num_joined[r], + chunk, limit, search_filter + ), + rooms_to_scan, 10 + ) chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) # Work out the new limit of the batch for pagination, or None if we # know there are no more results that would be returned. + # i.e., [since_token.current_limit..new_limit] is the batch of rooms + # we've returned (or the reverse if we paginated backwards) + # We tried to pull out limit + 1 rooms above, so if we have <= limit + # then we know there are no more results to return new_limit = None if chunk and (not limit or len(chunk) > limit): - if limit: - chunk = chunk[:limit] - - addition = 1 - if since_token: - addition += since_token.current_limit if not since_token or since_token.direction_is_forward: + if limit: + chunk = chunk[:limit] last_room_id = chunk[-1]["room_id"] else: + if limit: + chunk = chunk[-limit:] last_room_id = chunk[0]["room_id"] - addition *= -1 - try: - new_limit = sorted_rooms.index(last_room_id) + addition - if new_limit >= len(sorted_rooms): - new_limit = None - except ValueError: - pass + new_limit = sorted_rooms.index(last_room_id) results = { "chunk": chunk, @@ -252,7 +181,7 @@ class RoomListHandler(BaseHandler): results["new_rooms"] = bool(newly_visible) if not since_token or since_token.direction_is_forward: - if new_limit: + if new_limit is not None: results["next_batch"] = RoomListNextBatch( stream_ordering=stream_token, public_room_stream_id=public_room_stream_id, @@ -263,9 +192,10 @@ class RoomListHandler(BaseHandler): if since_token: results["prev_batch"] = since_token.copy_and_replace( direction_is_forward=False, + current_limit=since_token.current_limit + 1, ).to_token() else: - if new_limit: + if new_limit is not None: results["prev_batch"] = RoomListNextBatch( stream_ordering=stream_token, public_room_stream_id=public_room_stream_id, @@ -276,10 +206,93 @@ class RoomListHandler(BaseHandler): if since_token: results["next_batch"] = since_token.copy_and_replace( direction_is_forward=True, + current_limit=since_token.current_limit - 1, ).to_token() defer.returnValue(results) + @defer.inlineCallbacks + def _generate_room_entry(self, room_id, num_joined_users, chunk, limit, + search_filter): + if limit and len(chunk) > limit + 1: + # We've already got enough, so lets just drop it. + return + + result = { + "room_id": room_id, + "num_joined_members": num_joined_users, + } + + current_state_ids = yield self.state_handler.get_current_state_ids(room_id) + + event_map = yield self.store.get_events([ + event_id for key, event_id in current_state_ids.items() + if key[0] in ( + EventTypes.JoinRules, + EventTypes.Name, + EventTypes.Topic, + EventTypes.CanonicalAlias, + EventTypes.RoomHistoryVisibility, + EventTypes.GuestAccess, + "m.room.avatar", + ) + ]) + + current_state = { + (ev.type, ev.state_key): ev + for ev in event_map.values() + } + + # Double check that this is actually a public room. + join_rules_event = current_state.get((EventTypes.JoinRules, "")) + if join_rules_event: + join_rule = join_rules_event.content.get("join_rule", None) + if join_rule and join_rule != JoinRules.PUBLIC: + defer.returnValue(None) + + aliases = yield self.store.get_aliases_for_room(room_id) + if aliases: + result["aliases"] = aliases + + name_event = yield current_state.get((EventTypes.Name, "")) + if name_event: + name = name_event.content.get("name", None) + if name: + result["name"] = name + + topic_event = current_state.get((EventTypes.Topic, "")) + if topic_event: + topic = topic_event.content.get("topic", None) + if topic: + result["topic"] = topic + + canonical_event = current_state.get((EventTypes.CanonicalAlias, "")) + if canonical_event: + canonical_alias = canonical_event.content.get("alias", None) + if canonical_alias: + result["canonical_alias"] = canonical_alias + + visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, "")) + visibility = None + if visibility_event: + visibility = visibility_event.content.get("history_visibility", None) + result["world_readable"] = visibility == "world_readable" + + guest_event = current_state.get((EventTypes.GuestAccess, "")) + guest = None + if guest_event: + guest = guest_event.content.get("guest_access", None) + result["guest_can_join"] = guest == "can_join" + + avatar_event = current_state.get(("m.room.avatar", "")) + if avatar_event: + avatar_url = avatar_event.content.get("url", None) + if avatar_url: + result["avatar_url"] = avatar_url + + if _matches_room_entry(result, search_filter): + chunk.append(result) + @defer.inlineCallbacks def get_remote_public_room_list(self, server_name, limit=None, since_token=None, search_filter=None): -- cgit 1.4.1 From a298331de42fd0931e9a4d068925ac8ff1c52f46 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 17 Sep 2016 14:59:40 +0100 Subject: Spelling --- synapse/handlers/room_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index a33b644d0f..ff7a7a06b4 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -135,7 +135,7 @@ class RoomListHandler(BaseHandler): rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan.reverse() - # If there's not search filter just limit the range since we'll + # If there's no search filter just limit the range since we'll # return the vast majority of things. if limit and not search_filter: rooms_to_scan = rooms_to_scan[:limit + 1] -- cgit 1.4.1 From 81570abfb2eb90b7e5d8dfa319935150d4e78751 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 17 Sep 2016 18:01:54 +0100 Subject: Handle fact that _generate_room_entry may not return a room entry --- synapse/handlers/room_list.py | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index ff7a7a06b4..45a689ef2c 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -135,21 +135,34 @@ class RoomListHandler(BaseHandler): rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan.reverse() - # If there's no search filter just limit the range since we'll - # return the vast majority of things. - if limit and not search_filter: - rooms_to_scan = rooms_to_scan[:limit + 1] - # Actually generate the entries. _generate_room_entry will append to # chunk but will stop if len(chunk) > limit chunk = [] - yield concurrently_execute( - lambda r: self._generate_room_entry( - r, rooms_to_num_joined[r], - chunk, limit, search_filter - ), - rooms_to_scan, 10 - ) + if limit and not search_filter: + step = limit + 1 + for i in xrange(0 , len(rooms_to_scan), step): + # We iterate here because the vast majority of cases we'll stop + # at first iteration, but occaisonally _generate_room_entry + # won't append to the chunk and so we need to loop again. + # We don't want to scan over the entire range either as that + # would potentially waste a lot of work. + yield concurrently_execute( + lambda r: self._generate_room_entry( + r, rooms_to_num_joined[r], + chunk, limit, search_filter + ), + rooms_to_scan[i:i + step], 10 + ) + if len(chunk) >= limit + 1: + break + else: + yield concurrently_execute( + lambda r: self._generate_room_entry( + r, rooms_to_num_joined[r], + chunk, limit, search_filter + ), + rooms_to_scan, 5 + ) chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) -- cgit 1.4.1 From 4d49e0bdfd01fcd4f3cad580e3ca1184c432c9ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 17 Sep 2016 18:09:22 +0100 Subject: PEP8 --- synapse/handlers/room_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 45a689ef2c..a9558e6314 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -140,7 +140,7 @@ class RoomListHandler(BaseHandler): chunk = [] if limit and not search_filter: step = limit + 1 - for i in xrange(0 , len(rooms_to_scan), step): + for i in xrange(0, len(rooms_to_scan), step): # We iterate here because the vast majority of cases we'll stop # at first iteration, but occaisonally _generate_room_entry # won't append to the chunk and so we need to loop again. -- cgit 1.4.1 From 87528f07561d16dbf35aeebdfcecb111ed385b4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Sep 2016 11:46:28 +0100 Subject: Support /initialSync in synchrotron worker --- synapse/app/synchrotron.py | 6 + synapse/handlers/initial_sync.py | 443 +++++++++++++++++++++++++++++++++ synapse/handlers/message.py | 381 +--------------------------- synapse/rest/client/v1/initial_sync.py | 5 +- synapse/rest/client/v1/room.py | 4 +- synapse/server.py | 5 + 6 files changed, 461 insertions(+), 383 deletions(-) create mode 100644 synapse/handlers/initial_sync.py (limited to 'synapse/handlers') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 46d390fd0f..64b209ffe6 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -27,6 +27,8 @@ from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.rest.client.v2_alpha import sync from synapse.rest.client.v1 import events +from synapse.rest.client.v1.room import RoomInitialSyncRestServlet +from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore @@ -37,6 +39,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.room import RoomStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine @@ -74,6 +77,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, SlavedDeviceInboxStore, + RoomStore, BaseSlavedStore, ClientIpStore, # After BaseSlavedStore because the constructor is different ): @@ -296,6 +300,8 @@ class SynchrotronServer(HomeServer): resource = JsonResource(self, canonical_json=False) sync.register_servlets(self, resource) events.register_servlets(self, resource) + InitialSyncRestServlet(self).register(resource) + RoomInitialSyncRestServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py new file mode 100644 index 0000000000..fbfa5a0281 --- /dev/null +++ b/synapse/handlers/initial_sync.py @@ -0,0 +1,443 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.api.errors import AuthError, Codes +from synapse.events.utils import serialize_event +from synapse.events.validator import EventValidator +from synapse.streams.config import PaginationConfig +from synapse.types import ( + UserID, StreamToken, +) +from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute +from synapse.util.caches.snapshot_cache import SnapshotCache +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.visibility import filter_events_for_client + +from ._base import BaseHandler + +import logging + + +logger = logging.getLogger(__name__) + + +class InitialSyncHandler(BaseHandler): + def __init__(self, hs): + super(InitialSyncHandler, self).__init__(hs) + self.hs = hs + self.state = hs.get_state_handler() + self.clock = hs.get_clock() + self.validator = EventValidator() + self.snapshot_cache = SnapshotCache() + + def snapshot_all_rooms(self, user_id=None, pagin_config=None, + as_client_event=True, include_archived=False): + """Retrieve a snapshot of all rooms the user is invited or has joined. + + This snapshot may include messages for all rooms where the user is + joined, depending on the pagination config. + + Args: + user_id (str): The ID of the user making the request. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config used to determine how many messages *PER ROOM* to return. + as_client_event (bool): True to get events in client-server format. + include_archived (bool): True to get rooms that the user has left + Returns: + A list of dicts with "room_id" and "membership" keys for all rooms + the user is currently invited or joined in on. Rooms where the user + is joined on, may return a "messages" key with messages, depending + on the specified PaginationConfig. + """ + key = ( + user_id, + pagin_config.from_token, + pagin_config.to_token, + pagin_config.direction, + pagin_config.limit, + as_client_event, + include_archived, + ) + now_ms = self.clock.time_msec() + result = self.snapshot_cache.get(now_ms, key) + if result is not None: + return result + + return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms( + user_id, pagin_config, as_client_event, include_archived + )) + + @defer.inlineCallbacks + def _snapshot_all_rooms(self, user_id=None, pagin_config=None, + as_client_event=True, include_archived=False): + + memberships = [Membership.INVITE, Membership.JOIN] + if include_archived: + memberships.append(Membership.LEAVE) + + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user_id, membership_list=memberships + ) + + user = UserID.from_string(user_id) + + rooms_ret = [] + + now_token = yield self.hs.get_event_sources().get_current_token() + + presence_stream = self.hs.get_event_sources().sources["presence"] + pagination_config = PaginationConfig(from_token=now_token) + presence, _ = yield presence_stream.get_pagination_rows( + user, pagination_config.get_source_config("presence"), None + ) + + receipt_stream = self.hs.get_event_sources().sources["receipt"] + receipt, _ = yield receipt_stream.get_pagination_rows( + user, pagination_config.get_source_config("receipt"), None + ) + + tags_by_room = yield self.store.get_tags_for_user(user_id) + + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user(user_id) + ) + + public_room_ids = yield self.store.get_public_room_ids() + + limit = pagin_config.limit + if limit is None: + limit = 10 + + @defer.inlineCallbacks + def handle_room(event): + d = { + "room_id": event.room_id, + "membership": event.membership, + "visibility": ( + "public" if event.room_id in public_room_ids + else "private" + ), + } + + if event.membership == Membership.INVITE: + time_now = self.clock.time_msec() + d["inviter"] = event.sender + + invite_event = yield self.store.get_event(event.event_id) + d["invite"] = serialize_event(invite_event, time_now, as_client_event) + + rooms_ret.append(d) + + if event.membership not in (Membership.JOIN, Membership.LEAVE): + return + + try: + if event.membership == Membership.JOIN: + room_end_token = now_token.room_key + deferred_room_state = self.state_handler.get_current_state( + event.room_id + ) + elif event.membership == Membership.LEAVE: + room_end_token = "s%d" % (event.stream_ordering,) + deferred_room_state = self.store.get_state_for_events( + [event.event_id], None + ) + deferred_room_state.addCallback( + lambda states: states[event.event_id] + ) + + (messages, token), current_state = yield preserve_context_over_deferred( + defer.gatherResults( + [ + preserve_fn(self.store.get_recent_events_for_room)( + event.room_id, + limit=limit, + end_token=room_end_token, + ), + deferred_room_state, + ] + ) + ).addErrback(unwrapFirstError) + + messages = yield filter_events_for_client( + self.store, user_id, messages + ) + + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) + time_now = self.clock.time_msec() + + d["messages"] = { + "chunk": [ + serialize_event(m, time_now, as_client_event) + for m in messages + ], + "start": start_token.to_string(), + "end": end_token.to_string(), + } + + d["state"] = [ + serialize_event(c, time_now, as_client_event) + for c in current_state.values() + ] + + account_data_events = [] + tags = tags_by_room.get(event.room_id) + if tags: + account_data_events.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + + account_data = account_data_by_room.get(event.room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + d["account_data"] = account_data_events + except: + logger.exception("Failed to get snapshot") + + yield concurrently_execute(handle_room, room_list, 10) + + account_data_events = [] + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + ret = { + "rooms": rooms_ret, + "presence": presence, + "account_data": account_data_events, + "receipts": receipt, + "end": now_token.to_string(), + } + + defer.returnValue(ret) + + @defer.inlineCallbacks + def room_initial_sync(self, requester, room_id, pagin_config=None): + """Capture the a snapshot of a room. If user is currently a member of + the room this will be what is currently in the room. If the user left + the room this will be what was in the room when they left. + + Args: + requester(Requester): The user to get a snapshot for. + room_id(str): The room to get a snapshot of. + pagin_config(synapse.streams.config.PaginationConfig): + The pagination config used to determine how many messages to + return. + Raises: + AuthError if the user wasn't in the room. + Returns: + A JSON serialisable dict with the snapshot of the room. + """ + + user_id = requester.user.to_string() + + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id, + ) + is_peeking = member_event_id is None + + if membership == Membership.JOIN: + result = yield self._room_initial_sync_joined( + user_id, room_id, pagin_config, membership, is_peeking + ) + elif membership == Membership.LEAVE: + result = yield self._room_initial_sync_parted( + user_id, room_id, pagin_config, membership, member_event_id, is_peeking + ) + + account_data_events = [] + tags = yield self.store.get_tags_for_room(user_id, room_id) + if tags: + account_data_events.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + result["account_data"] = account_data_events + + defer.returnValue(result) + + @defer.inlineCallbacks + def _room_initial_sync_parted(self, user_id, room_id, pagin_config, + membership, member_event_id, is_peeking): + room_state = yield self.store.get_state_for_events( + [member_event_id], None + ) + + room_state = room_state[member_event_id] + + limit = pagin_config.limit if pagin_config else None + if limit is None: + limit = 10 + + stream_token = yield self.store.get_stream_token_for_event( + member_event_id + ) + + messages, token = yield self.store.get_recent_events_for_room( + room_id, + limit=limit, + end_token=stream_token + ) + + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking + ) + + start_token = StreamToken.START.copy_and_replace("room_key", token[0]) + end_token = StreamToken.START.copy_and_replace("room_key", token[1]) + + time_now = self.clock.time_msec() + + defer.returnValue({ + "membership": membership, + "room_id": room_id, + "messages": { + "chunk": [serialize_event(m, time_now) for m in messages], + "start": start_token.to_string(), + "end": end_token.to_string(), + }, + "state": [serialize_event(s, time_now) for s in room_state.values()], + "presence": [], + "receipts": [], + }) + + @defer.inlineCallbacks + def _room_initial_sync_joined(self, user_id, room_id, pagin_config, + membership, is_peeking): + current_state = yield self.state.get_current_state( + room_id=room_id, + ) + + # TODO: These concurrently + time_now = self.clock.time_msec() + state = [ + serialize_event(x, time_now) + for x in current_state.values() + ] + + now_token = yield self.hs.get_event_sources().get_current_token() + + limit = pagin_config.limit if pagin_config else None + if limit is None: + limit = 10 + + room_members = [ + m for m in current_state.values() + if m.type == EventTypes.Member + and m.content["membership"] == Membership.JOIN + ] + + presence_handler = self.hs.get_presence_handler() + + @defer.inlineCallbacks + def get_presence(): + states = yield presence_handler.get_states( + [m.user_id for m in room_members], + as_event=True, + ) + + defer.returnValue(states) + + @defer.inlineCallbacks + def get_receipts(): + receipts_handler = self.hs.get_handlers().receipts_handler + receipts = yield receipts_handler.get_receipts_for_room( + room_id, + now_token.receipt_key + ) + defer.returnValue(receipts) + + presence, receipts, (messages, token) = yield defer.gatherResults( + [ + preserve_fn(get_presence)(), + preserve_fn(get_receipts)(), + preserve_fn(self.store.get_recent_events_for_room)( + room_id, + limit=limit, + end_token=now_token.room_key, + ) + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking, + ) + + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) + + time_now = self.clock.time_msec() + + ret = { + "room_id": room_id, + "messages": { + "chunk": [serialize_event(m, time_now) for m in messages], + "start": start_token.to_string(), + "end": end_token.to_string(), + }, + "state": state, + "presence": presence, + "receipts": receipts, + } + if not is_peeking: + ret["membership"] = membership + + defer.returnValue(ret) + + @defer.inlineCallbacks + def _check_in_room_or_world_readable(self, room_id, user_id): + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError: + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 178209a209..30ea9630f7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -21,14 +21,11 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.push.action_generator import ActionGenerator -from synapse.streams.config import PaginationConfig from synapse.types import ( - UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id + UserID, RoomAlias, RoomStreamToken, get_domain_from_id ) -from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock -from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.async import run_on_reactor, ReadWriteLock +from synapse.util.logcontext import preserve_fn from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -49,7 +46,6 @@ class MessageHandler(BaseHandler): self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() - self.snapshot_cache = SnapshotCache() self.pagination_lock = ReadWriteLock() @@ -392,377 +388,6 @@ class MessageHandler(BaseHandler): [serialize_event(c, now) for c in room_state.values()] ) - def snapshot_all_rooms(self, user_id=None, pagin_config=None, - as_client_event=True, include_archived=False): - """Retrieve a snapshot of all rooms the user is invited or has joined. - - This snapshot may include messages for all rooms where the user is - joined, depending on the pagination config. - - Args: - user_id (str): The ID of the user making the request. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config used to determine how many messages *PER ROOM* to return. - as_client_event (bool): True to get events in client-server format. - include_archived (bool): True to get rooms that the user has left - Returns: - A list of dicts with "room_id" and "membership" keys for all rooms - the user is currently invited or joined in on. Rooms where the user - is joined on, may return a "messages" key with messages, depending - on the specified PaginationConfig. - """ - key = ( - user_id, - pagin_config.from_token, - pagin_config.to_token, - pagin_config.direction, - pagin_config.limit, - as_client_event, - include_archived, - ) - now_ms = self.clock.time_msec() - result = self.snapshot_cache.get(now_ms, key) - if result is not None: - return result - - return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms( - user_id, pagin_config, as_client_event, include_archived - )) - - @defer.inlineCallbacks - def _snapshot_all_rooms(self, user_id=None, pagin_config=None, - as_client_event=True, include_archived=False): - - memberships = [Membership.INVITE, Membership.JOIN] - if include_archived: - memberships.append(Membership.LEAVE) - - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=user_id, membership_list=memberships - ) - - user = UserID.from_string(user_id) - - rooms_ret = [] - - now_token = yield self.hs.get_event_sources().get_current_token() - - presence_stream = self.hs.get_event_sources().sources["presence"] - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( - user, pagination_config.get_source_config("presence"), None - ) - - receipt_stream = self.hs.get_event_sources().sources["receipt"] - receipt, _ = yield receipt_stream.get_pagination_rows( - user, pagination_config.get_source_config("receipt"), None - ) - - tags_by_room = yield self.store.get_tags_for_user(user_id) - - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user(user_id) - ) - - public_room_ids = yield self.store.get_public_room_ids() - - limit = pagin_config.limit - if limit is None: - limit = 10 - - @defer.inlineCallbacks - def handle_room(event): - d = { - "room_id": event.room_id, - "membership": event.membership, - "visibility": ( - "public" if event.room_id in public_room_ids - else "private" - ), - } - - if event.membership == Membership.INVITE: - time_now = self.clock.time_msec() - d["inviter"] = event.sender - - invite_event = yield self.store.get_event(event.event_id) - d["invite"] = serialize_event(invite_event, time_now, as_client_event) - - rooms_ret.append(d) - - if event.membership not in (Membership.JOIN, Membership.LEAVE): - return - - try: - if event.membership == Membership.JOIN: - room_end_token = now_token.room_key - deferred_room_state = self.state_handler.get_current_state( - event.room_id - ) - elif event.membership == Membership.LEAVE: - room_end_token = "s%d" % (event.stream_ordering,) - deferred_room_state = self.store.get_state_for_events( - [event.event_id], None - ) - deferred_room_state.addCallback( - lambda states: states[event.event_id] - ) - - (messages, token), current_state = yield preserve_context_over_deferred( - defer.gatherResults( - [ - preserve_fn(self.store.get_recent_events_for_room)( - event.room_id, - limit=limit, - end_token=room_end_token, - ), - deferred_room_state, - ] - ) - ).addErrback(unwrapFirstError) - - messages = yield filter_events_for_client( - self.store, user_id, messages - ) - - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) - time_now = self.clock.time_msec() - - d["messages"] = { - "chunk": [ - serialize_event(m, time_now, as_client_event) - for m in messages - ], - "start": start_token.to_string(), - "end": end_token.to_string(), - } - - d["state"] = [ - serialize_event(c, time_now, as_client_event) - for c in current_state.values() - ] - - account_data_events = [] - tags = tags_by_room.get(event.room_id) - if tags: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) - - account_data = account_data_by_room.get(event.room_id, {}) - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - d["account_data"] = account_data_events - except: - logger.exception("Failed to get snapshot") - - yield concurrently_execute(handle_room, room_list, 10) - - account_data_events = [] - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - ret = { - "rooms": rooms_ret, - "presence": presence, - "account_data": account_data_events, - "receipts": receipt, - "end": now_token.to_string(), - } - - defer.returnValue(ret) - - @defer.inlineCallbacks - def room_initial_sync(self, requester, room_id, pagin_config=None): - """Capture the a snapshot of a room. If user is currently a member of - the room this will be what is currently in the room. If the user left - the room this will be what was in the room when they left. - - Args: - requester(Requester): The user to get a snapshot for. - room_id(str): The room to get a snapshot of. - pagin_config(synapse.streams.config.PaginationConfig): - The pagination config used to determine how many messages to - return. - Raises: - AuthError if the user wasn't in the room. - Returns: - A JSON serialisable dict with the snapshot of the room. - """ - - user_id = requester.user.to_string() - - membership, member_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id, - ) - is_peeking = member_event_id is None - - if membership == Membership.JOIN: - result = yield self._room_initial_sync_joined( - user_id, room_id, pagin_config, membership, is_peeking - ) - elif membership == Membership.LEAVE: - result = yield self._room_initial_sync_parted( - user_id, room_id, pagin_config, membership, member_event_id, is_peeking - ) - - account_data_events = [] - tags = yield self.store.get_tags_for_room(user_id, room_id) - if tags: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) - - account_data = yield self.store.get_account_data_for_room(user_id, room_id) - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - result["account_data"] = account_data_events - - defer.returnValue(result) - - @defer.inlineCallbacks - def _room_initial_sync_parted(self, user_id, room_id, pagin_config, - membership, member_event_id, is_peeking): - room_state = yield self.store.get_state_for_events( - [member_event_id], None - ) - - room_state = room_state[member_event_id] - - limit = pagin_config.limit if pagin_config else None - if limit is None: - limit = 10 - - stream_token = yield self.store.get_stream_token_for_event( - member_event_id - ) - - messages, token = yield self.store.get_recent_events_for_room( - room_id, - limit=limit, - end_token=stream_token - ) - - messages = yield filter_events_for_client( - self.store, user_id, messages, is_peeking=is_peeking - ) - - start_token = StreamToken.START.copy_and_replace("room_key", token[0]) - end_token = StreamToken.START.copy_and_replace("room_key", token[1]) - - time_now = self.clock.time_msec() - - defer.returnValue({ - "membership": membership, - "room_id": room_id, - "messages": { - "chunk": [serialize_event(m, time_now) for m in messages], - "start": start_token.to_string(), - "end": end_token.to_string(), - }, - "state": [serialize_event(s, time_now) for s in room_state.values()], - "presence": [], - "receipts": [], - }) - - @defer.inlineCallbacks - def _room_initial_sync_joined(self, user_id, room_id, pagin_config, - membership, is_peeking): - current_state = yield self.state.get_current_state( - room_id=room_id, - ) - - # TODO: These concurrently - time_now = self.clock.time_msec() - state = [ - serialize_event(x, time_now) - for x in current_state.values() - ] - - now_token = yield self.hs.get_event_sources().get_current_token() - - limit = pagin_config.limit if pagin_config else None - if limit is None: - limit = 10 - - room_members = [ - m for m in current_state.values() - if m.type == EventTypes.Member - and m.content["membership"] == Membership.JOIN - ] - - presence_handler = self.hs.get_presence_handler() - - @defer.inlineCallbacks - def get_presence(): - states = yield presence_handler.get_states( - [m.user_id for m in room_members], - as_event=True, - ) - - defer.returnValue(states) - - @defer.inlineCallbacks - def get_receipts(): - receipts_handler = self.hs.get_handlers().receipts_handler - receipts = yield receipts_handler.get_receipts_for_room( - room_id, - now_token.receipt_key - ) - defer.returnValue(receipts) - - presence, receipts, (messages, token) = yield defer.gatherResults( - [ - preserve_fn(get_presence)(), - preserve_fn(get_receipts)(), - preserve_fn(self.store.get_recent_events_for_room)( - room_id, - limit=limit, - end_token=now_token.room_key, - ) - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) - - messages = yield filter_events_for_client( - self.store, user_id, messages, is_peeking=is_peeking, - ) - - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) - - time_now = self.clock.time_msec() - - ret = { - "room_id": room_id, - "messages": { - "chunk": [serialize_event(m, time_now) for m in messages], - "start": start_token.to_string(), - "end": end_token.to_string(), - }, - "state": state, - "presence": presence, - "receipts": receipts, - } - if not is_peeking: - ret["membership"] = membership - - defer.returnValue(ret) - @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, prev_event_ids=None): diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py index 113a49e539..478e21eea8 100644 --- a/synapse/rest/client/v1/initial_sync.py +++ b/synapse/rest/client/v1/initial_sync.py @@ -25,16 +25,15 @@ class InitialSyncRestServlet(ClientV1RestServlet): def __init__(self, hs): super(InitialSyncRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.initial_sync_handler = hs.get_initial_sync_handler() @defer.inlineCallbacks def on_GET(self, request): requester = yield self.auth.get_user_by_req(request) as_client_event = "raw" not in request.args pagination_config = PaginationConfig.from_request(request) - handler = self.handlers.message_handler include_archived = request.args.get("archived", None) == ["true"] - content = yield handler.snapshot_all_rooms( + content = yield self.initial_sync_handler.snapshot_all_rooms( user_id=requester.user.to_string(), pagin_config=pagination_config, as_client_event=as_client_event, diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 45287bf05b..20889e4af0 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -456,13 +456,13 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomInitialSyncRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.initial_sync_handler = hs.get_initial_sync_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) pagination_config = PaginationConfig.from_request(request) - content = yield self.handlers.message_handler.room_initial_sync( + content = yield self.initial_sync_handler.room_initial_sync( room_id=room_id, requester=requester, pagin_config=pagination_config, diff --git a/synapse/server.py b/synapse/server.py index 69860f3d82..374124a147 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -43,6 +43,7 @@ from synapse.handlers.room_list import RoomListHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler from synapse.handlers.events import EventHandler, EventStreamHandler +from synapse.handlers.initial_sync import InitialSyncHandler from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier @@ -98,6 +99,7 @@ class HomeServer(object): 'e2e_keys_handler', 'event_handler', 'event_stream_handler', + 'initial_sync_handler', 'application_service_api', 'application_service_scheduler', 'application_service_handler', @@ -228,6 +230,9 @@ class HomeServer(object): def build_event_stream_handler(self): return EventStreamHandler(self) + def build_initial_sync_handler(self): + return InitialSyncHandler(self) + def build_event_sources(self): return EventSources(self) -- cgit 1.4.1 From 90c070c8503a380367f02f98e56b68fe07405413 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Sep 2016 13:17:08 +0100 Subject: Add total_room_count_estimate to /publicRooms --- synapse/handlers/room_list.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 5a533682c5..b04aea0110 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -125,6 +125,8 @@ class RoomListHandler(BaseHandler): if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0 ] + total_room_count = len(rooms_to_scan) + if since_token: # Filter out rooms we've already returned previously # `since_token.current_limit` is the index of the last room we @@ -188,6 +190,7 @@ class RoomListHandler(BaseHandler): results = { "chunk": chunk, + "total_room_count_estimate": total_room_count, } if since_token: -- cgit 1.4.1 From 1168cbd54db059cefdf968077f6b14163de6c04c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Sep 2016 10:56:53 +0100 Subject: Allow invites via 3pid to bypass sender sig check When a server sends a third party invite another server may be the one that the inviting user registers with. In this case it is that remote server that will issue an actual invitation, and wants to do it "in the name of" the original invitee. However, the new proper invite will not be signed by the original server, and thus other servers would reject the invite if it was seen as coming from the original user. To fix this, a special case has been added to the auth rules whereby another server can send an invite "in the name of" another server's user, so long as that user had previously issued a third party invite that is now being accepted. --- synapse/api/auth.py | 17 ++++++++++++++++- synapse/handlers/federation.py | 12 ++++++------ 2 files changed, 22 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 98a50f0948..d60c1b15ae 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -72,7 +72,7 @@ class Auth(object): auth_events = { (e.type, e.state_key): e for e in auth_events.values() } - self.check(event, auth_events=auth_events, do_sig_check=False) + self.check(event, auth_events=auth_events, do_sig_check=do_sig_check) def check(self, event, auth_events, do_sig_check=True): """ Checks if this event is correctly authed. @@ -92,9 +92,21 @@ class Auth(object): raise AuthError(500, "Event has no room_id: %s" % event) sender_domain = get_domain_from_id(event.sender) + event_id_domain = get_domain_from_id(event.event_id) + + is_invite_via_3pid = ( + event.type == EventTypes.Member + and event.membership == Membership.INVITE + and "third_party_invite" in event.content + ) # Check the sender's domain has signed the event if do_sig_check and not event.signatures.get(sender_domain): + if not is_invite_via_3pid: + raise AuthError(403, "Event not signed by sender's server") + + # Check the event_id's domain has signed the event + if do_sig_check and not event.signatures.get(event_id_domain): raise AuthError(403, "Event not signed by sending server") if auth_events is None: @@ -491,6 +503,9 @@ class Auth(object): if not invite_event: return False + if invite_event.sender != event.sender: + return False + if event.user_id != invite_event.user_id: return False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f7cb3c1bb2..a393263e1e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1922,15 +1922,15 @@ class FederationHandler(BaseHandler): original_invite = yield self.store.get_event( original_invite_id, allow_none=True ) - if not original_invite: + if original_invite: + display_name = original_invite.content["display_name"] + event_dict["content"]["third_party_invite"]["display_name"] = display_name + else: logger.info( - "Could not find invite event for third_party_invite - " - "discarding: %s" % (event_dict,) + "Could not find invite event for third_party_invite: %r", + event_dict ) - return - display_name = original_invite.content["display_name"] - event_dict["content"]["third_party_invite"]["display_name"] = display_name builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) message_handler = self.hs.get_handlers().message_handler -- cgit 1.4.1 From 2e9ee3096907573773d3f0e4ff22dd014b8253c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Sep 2016 11:59:46 +0100 Subject: Add comments --- synapse/api/auth.py | 3 +++ synapse/handlers/federation.py | 3 +++ 2 files changed, 6 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 377bfcc482..5bd250992a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -103,6 +103,9 @@ class Auth(object): # Check the sender's domain has signed the event if not event.signatures.get(sender_domain): + # We allow invites via 3pid to have a sender from a differnt + # HS, as the sender must match the sender of the original + # 3pid invite. This is checked further down. if not is_invite_via_3pid: raise AuthError(403, "Event not signed by sender's server") diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a393263e1e..2d801bad47 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1930,6 +1930,9 @@ class FederationHandler(BaseHandler): "Could not find invite event for third_party_invite: %r", event_dict ) + # We don't discard here as this is not the appropriate place to do + # auth checks. If we need the invite and don't have it then the + # auth check code will explode appropriately. builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) -- cgit 1.4.1 From 22578545a05944284eab3ba7646e2c1c5c36e359 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Sep 2016 13:56:14 +0100 Subject: Time out typing over federation --- synapse/federation/federation_client.py | 2 - synapse/handlers/typing.py | 175 +++++++++++++++++++------------- synapse/rest/client/v1/room.py | 5 +- tests/handlers/test_typing.py | 7 +- tests/rest/client/v1/test_typing.py | 5 +- tests/utils.py | 9 +- 6 files changed, 120 insertions(+), 83 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 06d0320b1a..94e76b1978 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -136,9 +136,7 @@ class FederationClient(FederationBase): sent_edus_counter.inc() - # TODO, add errback, etc. self._transaction_queue.enqueue_edu(edu, key=key) - return defer.succeed(None) @log_function def send_device_messages(self, destination): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0548b81c34..505a68d142 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,10 +16,9 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, -) +from synapse.util.logcontext import preserve_fn from synapse.util.metrics import Measure +from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id import logging @@ -35,6 +34,13 @@ logger = logging.getLogger(__name__) RoomMember = namedtuple("RoomMember", ("room_id", "user_id")) +# How often we expect remote servers to resend us presence. +FEDERATION_TIMEOUT = 60 * 1000 + +# How often to resend typing across federation. +FEDERATION_PING_INTERVAL = 40 * 1000 + + class TypingHandler(object): def __init__(self, hs): self.store = hs.get_datastore() @@ -44,7 +50,10 @@ class TypingHandler(object): self.notifier = hs.get_notifier() self.state = hs.get_state_handler() + self.hs = hs + self.clock = hs.get_clock() + self.wheel_timer = WheelTimer() self.federation = hs.get_replication_layer() @@ -53,7 +62,7 @@ class TypingHandler(object): hs.get_distributor().observe("user_left_room", self.user_left_room) self._member_typing_until = {} # clock time we expect to stop - self._member_typing_timer = {} # deferreds to manage theabove + self._member_last_federation_poke = {} # map room IDs to serial numbers self._room_serials = {} @@ -61,12 +70,41 @@ class TypingHandler(object): # map room IDs to sets of users currently typing self._room_typing = {} - def tearDown(self): - """Cancels all the pending timers. - Normally this shouldn't be needed, but it's required from unit tests - to avoid a "Reactor was unclean" warning.""" - for t in self._member_typing_timer.values(): - self.clock.cancel_call_later(t) + self.clock.looping_call( + self._handle_timeouts, + 5000, + ) + + def _handle_timeouts(self): + logger.info("Handling typing timeout") + + now = self.clock.time_msec() + + members = set(self.wheel_timer.fetch(now)) + + for member in members: + if not self.is_typing(member): + # Nothing to do if they're no longer typing + continue + + until = self._member_typing_until.get(member, None) + if not until or until < now: + logger.info("Timing out typing for: %s", member.user_id) + preserve_fn(self._stopped_typing)(member) + continue + + # Check if we need to resend a keep alive over federation for this + # user. + if self.hs.is_mine_id(member.user_id): + last_fed_poke = self._member_last_federation_poke.get(member, None) + if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL < now: + preserve_fn(self._push_remote)( + member=member, + typing=True + ) + + def is_typing(self, member): + return member.user_id in self._room_typing.get(member.room_id, []) @defer.inlineCallbacks def started_typing(self, target_user, auth_user, room_id, timeout): @@ -85,23 +123,23 @@ class TypingHandler(object): "%s has started typing in %s", target_user_id, room_id ) - until = self.clock.time_msec() + timeout member = RoomMember(room_id=room_id, user_id=target_user_id) - was_present = member in self._member_typing_until + was_present = member.user_id in self._room_typing.get(room_id, set()) - if member in self._member_typing_timer: - self.clock.cancel_call_later(self._member_typing_timer[member]) + now = self.clock.time_msec() + self._member_typing_until[member] = now + timeout - def _cb(): - logger.debug( - "%s has timed out in %s", target_user.to_string(), room_id - ) - self._stopped_typing(member) + self.wheel_timer.insert( + now=now, + obj=member, + then=now + timeout, + ) - self._member_typing_until[member] = until - self._member_typing_timer[member] = self.clock.call_later( - timeout / 1000.0, _cb + self.wheel_timer.insert( + now=now, + obj=member, + then=now + FEDERATION_PING_INTERVAL, ) if was_present: @@ -109,8 +147,7 @@ class TypingHandler(object): defer.returnValue(None) yield self._push_update( - room_id=room_id, - user_id=target_user_id, + member=member, typing=True, ) @@ -133,10 +170,6 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=target_user_id) - if member in self._member_typing_timer: - self.clock.cancel_call_later(self._member_typing_timer[member]) - del self._member_typing_timer[member] - yield self._stopped_typing(member) @defer.inlineCallbacks @@ -148,57 +181,53 @@ class TypingHandler(object): @defer.inlineCallbacks def _stopped_typing(self, member): - if member not in self._member_typing_until: + if member.user_id not in self._room_typing.get(member.room_id, set()): # No point defer.returnValue(None) + self._member_typing_until.pop(member, None) + self._member_last_federation_poke.pop(member, None) + yield self._push_update( - room_id=member.room_id, - user_id=member.user_id, + member=member, typing=False, ) - del self._member_typing_until[member] - - if member in self._member_typing_timer: - # Don't cancel it - either it already expired, or the real - # stopped_typing() will cancel it - del self._member_typing_timer[member] - @defer.inlineCallbacks - def _push_update(self, room_id, user_id, typing): - users = yield self.state.get_current_user_in_room(room_id) - domains = set(get_domain_from_id(u) for u in users) + def _push_update(self, member, typing): + if self.hs.is_mine_id(member.user_id): + # Only send updates for changes to our own users. + yield self._push_remote(member, typing) + + self._push_update_local( + member=member, + typing=typing + ) - deferreds = [] - for domain in domains: - if domain == self.server_name: - preserve_fn(self._push_update_local)( - room_id=room_id, - user_id=user_id, - typing=typing - ) - else: - deferreds.append(preserve_fn(self.federation.send_edu)( + @defer.inlineCallbacks + def _push_remote(self, member, typing): + users = yield self.state.get_current_user_in_room(member.room_id) + self._member_last_federation_poke[member] = self.clock.time_msec() + for domain in set(get_domain_from_id(u) for u in users): + if domain != self.server_name: + self.federation.send_edu( destination=domain, edu_type="m.typing", content={ - "room_id": room_id, - "user_id": user_id, + "room_id": member.room_id, + "user_id": member.user_id, "typing": typing, }, - key=(room_id, user_id), - )) - - yield preserve_context_over_deferred( - defer.DeferredList(deferreds, consumeErrors=True) - ) + key=member, + ) @defer.inlineCallbacks def _recv_edu(self, origin, content): room_id = content["room_id"] user_id = content["user_id"] + member = RoomMember(user_id=user_id, room_id=room_id) + # Check that the string is a valid user id user = UserID.from_string(user_id) @@ -213,26 +242,32 @@ class TypingHandler(object): domains = set(get_domain_from_id(u) for u in users) if self.server_name in domains: + logger.info("Got typing update from %s: %r", user_id, content) + now = self.clock.time_msec() + self._member_typing_until[member] = now + FEDERATION_TIMEOUT + self.wheel_timer.insert( + now=now, + obj=member, + then=now + FEDERATION_TIMEOUT, + ) self._push_update_local( - room_id=room_id, - user_id=user_id, + member=member, typing=content["typing"] ) - def _push_update_local(self, room_id, user_id, typing): - room_set = self._room_typing.setdefault(room_id, set()) + def _push_update_local(self, member, typing): + room_set = self._room_typing.setdefault(member.room_id, set()) if typing: - room_set.add(user_id) + room_set.add(member.user_id) else: - room_set.discard(user_id) + room_set.discard(member.user_id) self._latest_room_serial += 1 - self._room_serials[room_id] = self._latest_room_serial + self._room_serials[member.room_id] = self._latest_room_serial - with PreserveLoggingContext(): - self.notifier.on_new_event( - "typing_key", self._latest_room_serial, rooms=[room_id] - ) + self.notifier.on_new_event( + "typing_key", self._latest_room_serial, rooms=[member.room_id] + ) def get_all_typing_updates(self, last_id, current_id): # TODO: Work out a way to do this without scanning the entire state. diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 20889e4af0..010fbc7c32 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -705,12 +705,15 @@ class RoomTypingRestServlet(ClientV1RestServlet): yield self.presence_handler.bump_presence_active_time(requester.user) + # Limit timeout to stop people from setting silly typing timeouts. + timeout = min(content.get("timeout", 30000), 120000) + if content["typing"]: yield self.typing_handler.started_typing( target_user=target_user, auth_user=requester.user, room_id=room_id, - timeout=content.get("timeout", 30000), + timeout=timeout, ) else: yield self.typing_handler.stopped_typing( diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index ea1f0f7c33..c3108f5181 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -267,10 +267,7 @@ class TypingNotificationsTestCase(unittest.TestCase): from synapse.handlers.typing import RoomMember member = RoomMember(self.room_id, self.u_apple.to_string()) self.handler._member_typing_until[member] = 1002000 - self.handler._member_typing_timer[member] = ( - self.clock.call_later(1002, lambda: 0) - ) - self.handler._room_typing[self.room_id] = set((self.u_apple.to_string(),)) + self.handler._room_typing[self.room_id] = set([self.u_apple.to_string()]) self.assertEquals(self.event_source.get_current_key(), 0) @@ -330,7 +327,7 @@ class TypingNotificationsTestCase(unittest.TestCase): }, }]) - self.clock.advance_time(11) + self.clock.advance_time(16) self.on_new_event.assert_has_calls([ call('typing_key', 2, rooms=[self.room_id]), diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 467f253ef6..a269e6f56e 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -105,9 +105,6 @@ class RoomTypingTestCase(RestTestCase): # Need another user to make notifications actually work yield self.join(self.room_id, user="@jim:red") - def tearDown(self): - self.hs.get_typing_handler().tearDown() - @defer.inlineCallbacks def test_set_typing(self): (code, _) = yield self.mock_resource.trigger( @@ -147,7 +144,7 @@ class RoomTypingTestCase(RestTestCase): self.assertEquals(self.event_source.get_current_key(), 1) - self.clock.advance_time(31) + self.clock.advance_time(36) self.assertEquals(self.event_source.get_current_key(), 2) diff --git a/tests/utils.py b/tests/utils.py index 915b934e94..92d470cb48 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -220,6 +220,7 @@ class MockClock(object): # list of lists of [absolute_time, callback, expired] in no particular # order self.timers = [] + self.loopers = [] def time(self): return self.now @@ -240,7 +241,7 @@ class MockClock(object): return t def looping_call(self, function, interval): - pass + self.loopers.append([function, interval / 1000., self.now]) def cancel_call_later(self, timer, ignore_errs=False): if timer[2]: @@ -269,6 +270,12 @@ class MockClock(object): else: self.timers.append(t) + for looped in self.loopers: + func, interval, last = looped + if last + interval < self.now: + func() + looped[2] = self.now + def advance_time_msec(self, ms): self.advance_time(ms / 1000.) -- cgit 1.4.1 From 655891d179da91206525054eca1aaec562c37e66 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Sep 2016 15:43:34 +0100 Subject: Move FEDERATION_PING_INTERVAL timer. Update log line --- synapse/handlers/typing.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 505a68d142..08313417b2 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -53,7 +53,7 @@ class TypingHandler(object): self.hs = hs self.clock = hs.get_clock() - self.wheel_timer = WheelTimer() + self.wheel_timer = WheelTimer(bucket_size=5000) self.federation = hs.get_replication_layer() @@ -76,7 +76,7 @@ class TypingHandler(object): ) def _handle_timeouts(self): - logger.info("Handling typing timeout") + logger.info("Checking for typing timeouts") now = self.clock.time_msec() @@ -136,12 +136,6 @@ class TypingHandler(object): then=now + timeout, ) - self.wheel_timer.insert( - now=now, - obj=member, - then=now + FEDERATION_PING_INTERVAL, - ) - if was_present: # No point sending another notification defer.returnValue(None) @@ -208,6 +202,14 @@ class TypingHandler(object): def _push_remote(self, member, typing): users = yield self.state.get_current_user_in_room(member.room_id) self._member_last_federation_poke[member] = self.clock.time_msec() + + now = self.clock.time_msec() + self.wheel_timer.insert( + now=now, + obj=member, + then=now + FEDERATION_PING_INTERVAL, + ) + for domain in set(get_domain_from_id(u) for u in users): if domain != self.server_name: self.federation.send_edu( -- cgit 1.4.1 From 3027ea22b066df6282bc6535319725cbfa2704e6 Mon Sep 17 00:00:00 2001 From: Martin Weinelt Date: Wed, 21 Sep 2016 03:13:34 +0200 Subject: Restructure ldap authentication - properly parse return values of ldap bind() calls - externalize authentication methods - change control flow to be more error-resilient - unbind ldap connections in many places - improve log messages and loglevels --- synapse/handlers/auth.py | 279 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 192 insertions(+), 87 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 6986930c0d..3933ce171a 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -31,6 +31,7 @@ import simplejson try: import ldap3 + import ldap3.core.exceptions except ImportError: ldap3 = None pass @@ -504,6 +505,144 @@ class AuthHandler(BaseHandler): raise LoginError(403, "", errcode=Codes.FORBIDDEN) defer.returnValue(user_id) + def _ldap_simple_bind(self, server, localpart, password): + """ Attempt a simple bind with the credentials + given by the user against the LDAP server. + + Returns True, LDAP3Connection + if the bind was successful + Returns False, None + if an error occured + """ + + try: + # bind with the the local users ldap credentials + bind_dn = "{prop}={value},{base}".format( + prop=self.ldap_attributes['uid'], + value=localpart, + base=self.ldap_base + ) + conn = ldap3.Connection(server, bind_dn, password) + logger.debug( + "Established LDAP connection in simple bind mode: %s", + conn + ) + + if self.ldap_start_tls: + conn.start_tls() + logger.debug( + "Upgraded LDAP connection in simple bind mode through StartTLS: %s", + conn + ) + + if conn.bind(): + # GOOD: bind okay + logger.debug("LDAP Bind successful in simple bind mode.") + return True, conn + + # BAD: bind failed + logger.info( + "Binding against LDAP failed for '%s' failed: %s", + localpart, conn.result['description'] + ) + conn.unbind() + return False, None + + except ldap3.core.exceptions.LDAPException as e: + logger.warn("Error during LDAP authentication: %s", e) + return False, None + + def _ldap_authenticated_search(self, server, localpart, password): + """ Attempt to login with the preconfigured bind_dn + and then continue searching and filtering within + the base_dn + + Returns (True, LDAP3Connection) + if a single matching DN within the base was found + that matched the filter expression, and with which + a successful bind was achieved + + The LDAP3Connection returned is the instance that was used to + verify the password not the one using the configured bind_dn. + Returns (False, None) + if an error occured + """ + + try: + conn = ldap3.Connection( + server, + self.ldap_bind_dn, + self.ldap_bind_password + ) + logger.debug( + "Established LDAP connection in search mode: %s", + conn + ) + + if self.ldap_start_tls: + conn.start_tls() + logger.debug( + "Upgraded LDAP connection in search mode through StartTLS: %s", + conn + ) + + if not conn.bind(): + logger.warn( + "Binding against LDAP with `bind_dn` failed: %s", + conn.result['description'] + ) + conn.unbind() + return False, None + + # construct search_filter like (uid=localpart) + query = "({prop}={value})".format( + prop=self.ldap_attributes['uid'], + value=localpart + ) + if self.ldap_filter: + # combine with the AND expression + query = "(&{query}{filter})".format( + query=query, + filter=self.ldap_filter + ) + logger.debug( + "LDAP search filter: %s", + query + ) + conn.search( + search_base=self.ldap_base, + search_filter=query + ) + + if len(conn.response) == 1: + # GOOD: found exactly one result + user_dn = conn.response[0]['dn'] + logger.debug('LDAP search found dn: %s', user_dn) + + # unbind and simple bind with user_dn to verify the password + # Note: do not use rebind(), for some reason it did not verify + # the password for me! + conn.unbind() + return self._ldap_simple_bind(server, localpart, password) + else: + # BAD: found 0 or > 1 results, abort! + if len(conn.response) == 0: + logger.info( + "LDAP search returned no results for '%s'", + localpart + ) + else: + logger.info( + "LDAP search returned too many (%s) results for '%s'", + len(conn.response), localpart + ) + conn.unbind() + return False, None + + except ldap3.core.exceptions.LDAPException as e: + logger.warn("Error during LDAP authentication: %s", e) + return False, None + @defer.inlineCallbacks def _check_ldap_password(self, user_id, password): """ Attempt to authenticate a user against an LDAP Server @@ -516,106 +655,62 @@ class AuthHandler(BaseHandler): if not ldap3 or not self.ldap_enabled: defer.returnValue(False) - if self.ldap_mode not in LDAPMode.LIST: - raise RuntimeError( - 'Invalid ldap mode specified: {mode}'.format( - mode=self.ldap_mode - ) - ) + localpart = UserID.from_string(user_id).localpart try: server = ldap3.Server(self.ldap_uri) logger.debug( - "Attempting ldap connection with %s", + "Attempting LDAP connection with %s", self.ldap_uri ) - localpart = UserID.from_string(user_id).localpart if self.ldap_mode == LDAPMode.SIMPLE: - # bind with the the local users ldap credentials - bind_dn = "{prop}={value},{base}".format( - prop=self.ldap_attributes['uid'], - value=localpart, - base=self.ldap_base + result, conn = self._ldap_simple_bind( + server=server, localpart=localpart, password=password ) - conn = ldap3.Connection(server, bind_dn, password) logger.debug( - "Established ldap connection in simple mode: %s", + 'LDAP authentication method simple bind returned: %s (conn: %s)', + result, conn ) - - if self.ldap_start_tls: - conn.start_tls() - logger.debug( - "Upgraded ldap connection in simple mode through StartTLS: %s", - conn - ) - - conn.bind() - + if not result: + defer.returnValue(False) elif self.ldap_mode == LDAPMode.SEARCH: - # connect with preconfigured credentials and search for local user - conn = ldap3.Connection( - server, - self.ldap_bind_dn, - self.ldap_bind_password + result, conn = self._ldap_authenticated_search( + server=server, localpart=localpart, password=password ) logger.debug( - "Established ldap connection in search mode: %s", + 'LDAP auth method authenticated search returned: %s (conn: %s)', + result, conn ) - - if self.ldap_start_tls: - conn.start_tls() - logger.debug( - "Upgraded ldap connection in search mode through StartTLS: %s", - conn + if not result: + defer.returnValue(False) + else: + raise RuntimeError( + 'Invalid LDAP mode specified: {mode}'.format( + mode=self.ldap_mode ) - - conn.bind() - - # find matching dn - query = "({prop}={value})".format( - prop=self.ldap_attributes['uid'], - value=localpart ) - if self.ldap_filter: - query = "(&{query}{filter})".format( - query=query, - filter=self.ldap_filter - ) - logger.debug("ldap search filter: %s", query) - result = conn.search(self.ldap_base, query) - - if result and len(conn.response) == 1: - # found exactly one result - user_dn = conn.response[0]['dn'] - logger.debug('ldap search found dn: %s', user_dn) - - # unbind and reconnect, rebind with found dn - conn.unbind() - conn = ldap3.Connection( - server, - user_dn, - password, - auto_bind=True - ) - else: - # found 0 or > 1 results, abort! - logger.warn( - "ldap search returned unexpected (%d!=1) amount of results", - len(conn.response) - ) - defer.returnValue(False) - logger.info( - "User authenticated against ldap server: %s", - conn - ) + try: + logger.info( + "User authenticated against LDAP server: %s", + conn + ) + except NameError: + logger.warn("Authentication method yielded no LDAP connection, aborting!") + defer.returnValue(False) + + # check if user with user_id exists + if (yield self.check_user_exists(user_id)): + # exists, authentication complete + conn.unbind() + defer.returnValue(True) - # check for existing account, if none exists, create one - if not (yield self.check_user_exists(user_id)): - # query user metadata for account creation + else: + # does not exist, fetch metadata for account creation from + # existing ldap connection query = "({prop}={value})".format( prop=self.ldap_attributes['uid'], value=localpart @@ -626,9 +721,12 @@ class AuthHandler(BaseHandler): filter=query, user_filter=self.ldap_filter ) - logger.debug("ldap registration filter: %s", query) + logger.debug( + "ldap registration filter: %s", + query + ) - result = conn.search( + conn.search( search_base=self.ldap_base, search_filter=query, attributes=[ @@ -651,20 +749,27 @@ class AuthHandler(BaseHandler): # TODO: bind email, set displayname with data from ldap directory logger.info( - "ldap registration successful: %d: %s (%s, %)", + "Registration based on LDAP data was successful: %d: %s (%s, %)", user_id, localpart, name, mail ) + + defer.returnValue(True) else: - logger.warn( - "ldap registration failed: unexpected (%d!=1) amount of results", - len(conn.response) - ) + if len(conn.response) == 0: + logger.warn("LDAP registration failed, no result.") + else: + logger.warn( + "LDAP registration failed, too many results (%s)", + len(conn.response) + ) + defer.returnValue(False) - defer.returnValue(True) + defer.returnValue(False) + except ldap3.core.exceptions.LDAPException as e: logger.warn("Error during ldap authentication: %s", e) defer.returnValue(False) -- cgit 1.4.1 From 850b103b36205d2c90da46a0d7413e6033de4f94 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Oct 2016 10:27:10 +0100 Subject: Implement pluggable password auth Allows delegating the password auth to an external module. This also moves the LDAP auth to using this system, allowing it to be removed from the synapse tree entirely in the future. --- synapse/config/homeserver.py | 6 +- synapse/config/ldap.py | 100 -------- synapse/config/password_auth_providers.py | 61 +++++ synapse/handlers/auth.py | 334 ++++----------------------- synapse/util/ldap_auth_provider.py | 368 ++++++++++++++++++++++++++++++ tests/storage/test_appservice.py | 17 +- tests/utils.py | 1 + 7 files changed, 486 insertions(+), 401 deletions(-) delete mode 100644 synapse/config/ldap.py create mode 100644 synapse/config/password_auth_providers.py create mode 100644 synapse/util/ldap_auth_provider.py (limited to 'synapse/handlers') diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 79b0534b3b..0f890fc04a 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -30,7 +30,7 @@ from .saml2 import SAML2Config from .cas import CasConfig from .password import PasswordConfig from .jwt import JWTConfig -from .ldap import LDAPConfig +from .password_auth_providers import PasswordAuthProviderConfig from .emailconfig import EmailConfig from .workers import WorkerConfig @@ -39,8 +39,8 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, - JWTConfig, LDAPConfig, PasswordConfig, EmailConfig, - WorkerConfig,): + JWTConfig, PasswordConfig, EmailConfig, + WorkerConfig, PasswordAuthProviderConfig,): pass diff --git a/synapse/config/ldap.py b/synapse/config/ldap.py deleted file mode 100644 index d83c2230be..0000000000 --- a/synapse/config/ldap.py +++ /dev/null @@ -1,100 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Niklas Riekenbrauck -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import Config, ConfigError - - -MISSING_LDAP3 = ( - "Missing ldap3 library. This is required for LDAP Authentication." -) - - -class LDAPMode(object): - SIMPLE = "simple", - SEARCH = "search", - - LIST = (SIMPLE, SEARCH) - - -class LDAPConfig(Config): - def read_config(self, config): - ldap_config = config.get("ldap_config", {}) - - self.ldap_enabled = ldap_config.get("enabled", False) - - if self.ldap_enabled: - # verify dependencies are available - try: - import ldap3 - ldap3 # to stop unused lint - except ImportError: - raise ConfigError(MISSING_LDAP3) - - self.ldap_mode = LDAPMode.SIMPLE - - # verify config sanity - self.require_keys(ldap_config, [ - "uri", - "base", - "attributes", - ]) - - self.ldap_uri = ldap_config["uri"] - self.ldap_start_tls = ldap_config.get("start_tls", False) - self.ldap_base = ldap_config["base"] - self.ldap_attributes = ldap_config["attributes"] - - if "bind_dn" in ldap_config: - self.ldap_mode = LDAPMode.SEARCH - self.require_keys(ldap_config, [ - "bind_dn", - "bind_password", - ]) - - self.ldap_bind_dn = ldap_config["bind_dn"] - self.ldap_bind_password = ldap_config["bind_password"] - self.ldap_filter = ldap_config.get("filter", None) - - # verify attribute lookup - self.require_keys(ldap_config['attributes'], [ - "uid", - "name", - "mail", - ]) - - def require_keys(self, config, required): - missing = [key for key in required if key not in config] - if missing: - raise ConfigError( - "LDAP enabled but missing required config values: {}".format( - ", ".join(missing) - ) - ) - - def default_config(self, **kwargs): - return """\ - # ldap_config: - # enabled: true - # uri: "ldap://ldap.example.com:389" - # start_tls: true - # base: "ou=users,dc=example,dc=com" - # attributes: - # uid: "cn" - # mail: "email" - # name: "givenName" - # #bind_dn: - # #bind_password: - # #filter: "(objectClass=posixAccount)" - """ diff --git a/synapse/config/password_auth_providers.py b/synapse/config/password_auth_providers.py new file mode 100644 index 0000000000..f6d9bb1c62 --- /dev/null +++ b/synapse/config/password_auth_providers.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 Openmarket +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + +import importlib + + +class PasswordAuthProviderConfig(Config): + def read_config(self, config): + self.password_providers = [] + + # We want to be backwards compatible with the old `ldap_config` + # param. + ldap_config = config.get("ldap_config", {}) + self.ldap_enabled = ldap_config.get("enabled", False) + if self.ldap_enabled: + from synapse.util.ldap_auth_provider import LdapAuthProvider + parsed_config = LdapAuthProvider.parse_config(ldap_config) + self.password_providers.append((LdapAuthProvider, parsed_config)) + + providers = config.get("password_providers", []) + for provider in providers: + # We need to import the module, and then pick the class out of + # that, so we split based on the last dot. + module, clz = provider['module'].rsplit(".", 1) + module = importlib.import_module(module) + provider_class = getattr(module, clz) + + provider_config = provider_class.parse_config(provider["config"]) + self.password_providers.append((provider_class, provider_config)) + + def default_config(self, **kwargs): + return """\ + # password_providers: + # - module: "synapse.util.ldap_auth_provider.LdapAuthProvider" + # config: + # enabled: true + # uri: "ldap://ldap.example.com:389" + # start_tls: true + # base: "ou=users,dc=example,dc=com" + # attributes: + # uid: "cn" + # mail: "email" + # name: "givenName" + # #bind_dn: + # #bind_password: + # #filter: "(objectClass=posixAccount)" + """ diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 3933ce171a..9583ae1e93 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -20,7 +20,6 @@ from synapse.api.constants import LoginType from synapse.types import UserID from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError from synapse.util.async import run_on_reactor -from synapse.config.ldap import LDAPMode from twisted.web.client import PartialDownloadError @@ -29,13 +28,6 @@ import bcrypt import pymacaroons import simplejson -try: - import ldap3 - import ldap3.core.exceptions -except ImportError: - ldap3 = None - pass - import synapse.util.stringutils as stringutils @@ -61,21 +53,14 @@ class AuthHandler(BaseHandler): self.sessions = {} self.INVALID_TOKEN_HTTP_STATUS = 401 - self.ldap_enabled = hs.config.ldap_enabled - if self.ldap_enabled: - if not ldap3: - raise RuntimeError( - 'Missing ldap3 library. This is required for LDAP Authentication.' - ) - self.ldap_mode = hs.config.ldap_mode - self.ldap_uri = hs.config.ldap_uri - self.ldap_start_tls = hs.config.ldap_start_tls - self.ldap_base = hs.config.ldap_base - self.ldap_attributes = hs.config.ldap_attributes - if self.ldap_mode == LDAPMode.SEARCH: - self.ldap_bind_dn = hs.config.ldap_bind_dn - self.ldap_bind_password = hs.config.ldap_bind_password - self.ldap_filter = hs.config.ldap_filter + account_handler = _AccountHandler( + hs, check_user_exists=self.check_user_exists + ) + + self.password_providers = [ + module(config=config, account_handler=account_handler) + for module, config in hs.config.password_providers + ] self.hs = hs # FIXME better possibility to access registrationHandler later? self.device_handler = hs.get_device_handler() @@ -477,9 +462,10 @@ class AuthHandler(BaseHandler): Raises: LoginError if the password was incorrect """ - valid_ldap = yield self._check_ldap_password(user_id, password) - if valid_ldap: - defer.returnValue(user_id) + for provider in self.password_providers: + is_valid = yield provider.check_password(user_id, password) + if is_valid: + defer.returnValue(user_id) result = yield self._check_local_password(user_id, password) defer.returnValue(result) @@ -505,275 +491,6 @@ class AuthHandler(BaseHandler): raise LoginError(403, "", errcode=Codes.FORBIDDEN) defer.returnValue(user_id) - def _ldap_simple_bind(self, server, localpart, password): - """ Attempt a simple bind with the credentials - given by the user against the LDAP server. - - Returns True, LDAP3Connection - if the bind was successful - Returns False, None - if an error occured - """ - - try: - # bind with the the local users ldap credentials - bind_dn = "{prop}={value},{base}".format( - prop=self.ldap_attributes['uid'], - value=localpart, - base=self.ldap_base - ) - conn = ldap3.Connection(server, bind_dn, password) - logger.debug( - "Established LDAP connection in simple bind mode: %s", - conn - ) - - if self.ldap_start_tls: - conn.start_tls() - logger.debug( - "Upgraded LDAP connection in simple bind mode through StartTLS: %s", - conn - ) - - if conn.bind(): - # GOOD: bind okay - logger.debug("LDAP Bind successful in simple bind mode.") - return True, conn - - # BAD: bind failed - logger.info( - "Binding against LDAP failed for '%s' failed: %s", - localpart, conn.result['description'] - ) - conn.unbind() - return False, None - - except ldap3.core.exceptions.LDAPException as e: - logger.warn("Error during LDAP authentication: %s", e) - return False, None - - def _ldap_authenticated_search(self, server, localpart, password): - """ Attempt to login with the preconfigured bind_dn - and then continue searching and filtering within - the base_dn - - Returns (True, LDAP3Connection) - if a single matching DN within the base was found - that matched the filter expression, and with which - a successful bind was achieved - - The LDAP3Connection returned is the instance that was used to - verify the password not the one using the configured bind_dn. - Returns (False, None) - if an error occured - """ - - try: - conn = ldap3.Connection( - server, - self.ldap_bind_dn, - self.ldap_bind_password - ) - logger.debug( - "Established LDAP connection in search mode: %s", - conn - ) - - if self.ldap_start_tls: - conn.start_tls() - logger.debug( - "Upgraded LDAP connection in search mode through StartTLS: %s", - conn - ) - - if not conn.bind(): - logger.warn( - "Binding against LDAP with `bind_dn` failed: %s", - conn.result['description'] - ) - conn.unbind() - return False, None - - # construct search_filter like (uid=localpart) - query = "({prop}={value})".format( - prop=self.ldap_attributes['uid'], - value=localpart - ) - if self.ldap_filter: - # combine with the AND expression - query = "(&{query}{filter})".format( - query=query, - filter=self.ldap_filter - ) - logger.debug( - "LDAP search filter: %s", - query - ) - conn.search( - search_base=self.ldap_base, - search_filter=query - ) - - if len(conn.response) == 1: - # GOOD: found exactly one result - user_dn = conn.response[0]['dn'] - logger.debug('LDAP search found dn: %s', user_dn) - - # unbind and simple bind with user_dn to verify the password - # Note: do not use rebind(), for some reason it did not verify - # the password for me! - conn.unbind() - return self._ldap_simple_bind(server, localpart, password) - else: - # BAD: found 0 or > 1 results, abort! - if len(conn.response) == 0: - logger.info( - "LDAP search returned no results for '%s'", - localpart - ) - else: - logger.info( - "LDAP search returned too many (%s) results for '%s'", - len(conn.response), localpart - ) - conn.unbind() - return False, None - - except ldap3.core.exceptions.LDAPException as e: - logger.warn("Error during LDAP authentication: %s", e) - return False, None - - @defer.inlineCallbacks - def _check_ldap_password(self, user_id, password): - """ Attempt to authenticate a user against an LDAP Server - and register an account if none exists. - - Returns: - True if authentication against LDAP was successful - """ - - if not ldap3 or not self.ldap_enabled: - defer.returnValue(False) - - localpart = UserID.from_string(user_id).localpart - - try: - server = ldap3.Server(self.ldap_uri) - logger.debug( - "Attempting LDAP connection with %s", - self.ldap_uri - ) - - if self.ldap_mode == LDAPMode.SIMPLE: - result, conn = self._ldap_simple_bind( - server=server, localpart=localpart, password=password - ) - logger.debug( - 'LDAP authentication method simple bind returned: %s (conn: %s)', - result, - conn - ) - if not result: - defer.returnValue(False) - elif self.ldap_mode == LDAPMode.SEARCH: - result, conn = self._ldap_authenticated_search( - server=server, localpart=localpart, password=password - ) - logger.debug( - 'LDAP auth method authenticated search returned: %s (conn: %s)', - result, - conn - ) - if not result: - defer.returnValue(False) - else: - raise RuntimeError( - 'Invalid LDAP mode specified: {mode}'.format( - mode=self.ldap_mode - ) - ) - - try: - logger.info( - "User authenticated against LDAP server: %s", - conn - ) - except NameError: - logger.warn("Authentication method yielded no LDAP connection, aborting!") - defer.returnValue(False) - - # check if user with user_id exists - if (yield self.check_user_exists(user_id)): - # exists, authentication complete - conn.unbind() - defer.returnValue(True) - - else: - # does not exist, fetch metadata for account creation from - # existing ldap connection - query = "({prop}={value})".format( - prop=self.ldap_attributes['uid'], - value=localpart - ) - - if self.ldap_mode == LDAPMode.SEARCH and self.ldap_filter: - query = "(&{filter}{user_filter})".format( - filter=query, - user_filter=self.ldap_filter - ) - logger.debug( - "ldap registration filter: %s", - query - ) - - conn.search( - search_base=self.ldap_base, - search_filter=query, - attributes=[ - self.ldap_attributes['name'], - self.ldap_attributes['mail'] - ] - ) - - if len(conn.response) == 1: - attrs = conn.response[0]['attributes'] - mail = attrs[self.ldap_attributes['mail']][0] - name = attrs[self.ldap_attributes['name']][0] - - # create account - registration_handler = self.hs.get_handlers().registration_handler - user_id, access_token = ( - yield registration_handler.register(localpart=localpart) - ) - - # TODO: bind email, set displayname with data from ldap directory - - logger.info( - "Registration based on LDAP data was successful: %d: %s (%s, %)", - user_id, - localpart, - name, - mail - ) - - defer.returnValue(True) - else: - if len(conn.response) == 0: - logger.warn("LDAP registration failed, no result.") - else: - logger.warn( - "LDAP registration failed, too many results (%s)", - len(conn.response) - ) - - defer.returnValue(False) - - defer.returnValue(False) - - except ldap3.core.exceptions.LDAPException as e: - logger.warn("Error during ldap authentication: %s", e) - defer.returnValue(False) - @defer.inlineCallbacks def issue_access_token(self, user_id, device_id=None): access_token = self.generate_access_token(user_id) @@ -911,3 +628,30 @@ class AuthHandler(BaseHandler): stored_hash.encode('utf-8')) == stored_hash else: return False + + +class _AccountHandler(object): + """A proxy object that gets passed to password auth providers so they + can register new users etc if necessary. + """ + def __init__(self, hs, check_user_exists): + self.hs = hs + + self._check_user_exists = check_user_exists + + def check_user_exists(self, user_id): + """Check if user exissts. + + Returns: + Deferred(bool) + """ + return self._check_user_exists(user_id) + + def register(self, localpart): + """Registers a new user with given localpart + + Returns: + Deferred: a 2-tuple of (user_id, access_token) + """ + reg = self.hs.get_handlers().registration_handler + return reg.register(localpart=localpart) diff --git a/synapse/util/ldap_auth_provider.py b/synapse/util/ldap_auth_provider.py new file mode 100644 index 0000000000..f852e9b037 --- /dev/null +++ b/synapse/util/ldap_auth_provider.py @@ -0,0 +1,368 @@ + +from twisted.internet import defer + +from synapse.config._base import ConfigError +from synapse.types import UserID + +import ldap3 +import ldap3.core.exceptions + +import logging + +try: + import ldap3 + import ldap3.core.exceptions +except ImportError: + ldap3 = None + pass + + +logger = logging.getLogger(__name__) + + +class LDAPMode(object): + SIMPLE = "simple", + SEARCH = "search", + + LIST = (SIMPLE, SEARCH) + + +class LdapAuthProvider(object): + __version__ = "0.1" + + def __init__(self, config, account_handler): + self.account_handler = account_handler + + if not ldap3: + raise RuntimeError( + 'Missing ldap3 library. This is required for LDAP Authentication.' + ) + + self.ldap_mode = config.mode + self.ldap_uri = config.uri + self.ldap_start_tls = config.start_tls + self.ldap_base = config.base + self.ldap_attributes = config.attributes + if self.ldap_mode == LDAPMode.SEARCH: + self.ldap_bind_dn = config.bind_dn + self.ldap_bind_password = config.bind_password + self.ldap_filter = config.filter + + @defer.inlineCallbacks + def check_password(self, user_id, password): + """ Attempt to authenticate a user against an LDAP Server + and register an account if none exists. + + Returns: + True if authentication against LDAP was successful + """ + localpart = UserID.from_string(user_id).localpart + + try: + server = ldap3.Server(self.ldap_uri) + logger.debug( + "Attempting LDAP connection with %s", + self.ldap_uri + ) + + if self.ldap_mode == LDAPMode.SIMPLE: + result, conn = self._ldap_simple_bind( + server=server, localpart=localpart, password=password + ) + logger.debug( + 'LDAP authentication method simple bind returned: %s (conn: %s)', + result, + conn + ) + if not result: + defer.returnValue(False) + elif self.ldap_mode == LDAPMode.SEARCH: + result, conn = self._ldap_authenticated_search( + server=server, localpart=localpart, password=password + ) + logger.debug( + 'LDAP auth method authenticated search returned: %s (conn: %s)', + result, + conn + ) + if not result: + defer.returnValue(False) + else: + raise RuntimeError( + 'Invalid LDAP mode specified: {mode}'.format( + mode=self.ldap_mode + ) + ) + + try: + logger.info( + "User authenticated against LDAP server: %s", + conn + ) + except NameError: + logger.warn( + "Authentication method yielded no LDAP connection, aborting!" + ) + defer.returnValue(False) + + # check if user with user_id exists + if (yield self.account_handler.check_user_exists(user_id)): + # exists, authentication complete + conn.unbind() + defer.returnValue(True) + + else: + # does not exist, fetch metadata for account creation from + # existing ldap connection + query = "({prop}={value})".format( + prop=self.ldap_attributes['uid'], + value=localpart + ) + + if self.ldap_mode == LDAPMode.SEARCH and self.ldap_filter: + query = "(&{filter}{user_filter})".format( + filter=query, + user_filter=self.ldap_filter + ) + logger.debug( + "ldap registration filter: %s", + query + ) + + conn.search( + search_base=self.ldap_base, + search_filter=query, + attributes=[ + self.ldap_attributes['name'], + self.ldap_attributes['mail'] + ] + ) + + if len(conn.response) == 1: + attrs = conn.response[0]['attributes'] + mail = attrs[self.ldap_attributes['mail']][0] + name = attrs[self.ldap_attributes['name']][0] + + # create account + user_id, access_token = ( + yield self.account_handler.register(localpart=localpart) + ) + + # TODO: bind email, set displayname with data from ldap directory + + logger.info( + "Registration based on LDAP data was successful: %d: %s (%s, %)", + user_id, + localpart, + name, + mail + ) + + defer.returnValue(True) + else: + if len(conn.response) == 0: + logger.warn("LDAP registration failed, no result.") + else: + logger.warn( + "LDAP registration failed, too many results (%s)", + len(conn.response) + ) + + defer.returnValue(False) + + defer.returnValue(False) + + except ldap3.core.exceptions.LDAPException as e: + logger.warn("Error during ldap authentication: %s", e) + defer.returnValue(False) + + @staticmethod + def parse_config(config): + class _LdapConfig(object): + pass + + ldap_config = _LdapConfig() + + ldap_config.enabled = config.get("enabled", False) + + ldap_config.mode = LDAPMode.SIMPLE + + # verify config sanity + _require_keys(config, [ + "uri", + "base", + "attributes", + ]) + + ldap_config.uri = config["uri"] + ldap_config.start_tls = config.get("start_tls", False) + ldap_config.base = config["base"] + ldap_config.attributes = config["attributes"] + + if "bind_dn" in config: + ldap_config.mode = LDAPMode.SEARCH + _require_keys(config, [ + "bind_dn", + "bind_password", + ]) + + ldap_config.bind_dn = config["bind_dn"] + ldap_config.bind_password = config["bind_password"] + ldap_config.filter = config.get("filter", None) + + # verify attribute lookup + _require_keys(config['attributes'], [ + "uid", + "name", + "mail", + ]) + + return ldap_config + + def _ldap_simple_bind(self, server, localpart, password): + """ Attempt a simple bind with the credentials + given by the user against the LDAP server. + + Returns True, LDAP3Connection + if the bind was successful + Returns False, None + if an error occured + """ + + try: + # bind with the the local users ldap credentials + bind_dn = "{prop}={value},{base}".format( + prop=self.ldap_attributes['uid'], + value=localpart, + base=self.ldap_base + ) + conn = ldap3.Connection(server, bind_dn, password) + logger.debug( + "Established LDAP connection in simple bind mode: %s", + conn + ) + + if self.ldap_start_tls: + conn.start_tls() + logger.debug( + "Upgraded LDAP connection in simple bind mode through StartTLS: %s", + conn + ) + + if conn.bind(): + # GOOD: bind okay + logger.debug("LDAP Bind successful in simple bind mode.") + return True, conn + + # BAD: bind failed + logger.info( + "Binding against LDAP failed for '%s' failed: %s", + localpart, conn.result['description'] + ) + conn.unbind() + return False, None + + except ldap3.core.exceptions.LDAPException as e: + logger.warn("Error during LDAP authentication: %s", e) + return False, None + + def _ldap_authenticated_search(self, server, localpart, password): + """ Attempt to login with the preconfigured bind_dn + and then continue searching and filtering within + the base_dn + + Returns (True, LDAP3Connection) + if a single matching DN within the base was found + that matched the filter expression, and with which + a successful bind was achieved + + The LDAP3Connection returned is the instance that was used to + verify the password not the one using the configured bind_dn. + Returns (False, None) + if an error occured + """ + + try: + conn = ldap3.Connection( + server, + self.ldap_bind_dn, + self.ldap_bind_password + ) + logger.debug( + "Established LDAP connection in search mode: %s", + conn + ) + + if self.ldap_start_tls: + conn.start_tls() + logger.debug( + "Upgraded LDAP connection in search mode through StartTLS: %s", + conn + ) + + if not conn.bind(): + logger.warn( + "Binding against LDAP with `bind_dn` failed: %s", + conn.result['description'] + ) + conn.unbind() + return False, None + + # construct search_filter like (uid=localpart) + query = "({prop}={value})".format( + prop=self.ldap_attributes['uid'], + value=localpart + ) + if self.ldap_filter: + # combine with the AND expression + query = "(&{query}{filter})".format( + query=query, + filter=self.ldap_filter + ) + logger.debug( + "LDAP search filter: %s", + query + ) + conn.search( + search_base=self.ldap_base, + search_filter=query + ) + + if len(conn.response) == 1: + # GOOD: found exactly one result + user_dn = conn.response[0]['dn'] + logger.debug('LDAP search found dn: %s', user_dn) + + # unbind and simple bind with user_dn to verify the password + # Note: do not use rebind(), for some reason it did not verify + # the password for me! + conn.unbind() + return self._ldap_simple_bind(server, localpart, password) + else: + # BAD: found 0 or > 1 results, abort! + if len(conn.response) == 0: + logger.info( + "LDAP search returned no results for '%s'", + localpart + ) + else: + logger.info( + "LDAP search returned too many (%s) results for '%s'", + len(conn.response), localpart + ) + conn.unbind() + return False, None + + except ldap3.core.exceptions.LDAPException as e: + logger.warn("Error during LDAP authentication: %s", e) + return False, None + + +def _require_keys(config, required): + missing = [key for key in required if key not in config] + if missing: + raise ConfigError( + "LDAP enabled but missing required config values: {}".format( + ", ".join(missing) + ) + ) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 3e2862daae..dfe2338554 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -37,6 +37,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): config = Mock( app_service_config_files=self.as_yaml_files, event_cache_size=1, + password_providers=[], ) hs = yield setup_test_homeserver(config=config) @@ -112,6 +113,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): config = Mock( app_service_config_files=self.as_yaml_files, event_cache_size=1, + password_providers=[], ) hs = yield setup_test_homeserver(config=config) self.db_pool = hs.get_db_pool() @@ -440,7 +442,10 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): f1 = self._write_config(suffix="1") f2 = self._write_config(suffix="2") - config = Mock(app_service_config_files=[f1, f2], event_cache_size=1) + config = Mock( + app_service_config_files=[f1, f2], event_cache_size=1, + password_providers=[] + ) hs = yield setup_test_homeserver(config=config, datastore=Mock()) ApplicationServiceStore(hs) @@ -450,7 +455,10 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): f1 = self._write_config(id="id", suffix="1") f2 = self._write_config(id="id", suffix="2") - config = Mock(app_service_config_files=[f1, f2], event_cache_size=1) + config = Mock( + app_service_config_files=[f1, f2], event_cache_size=1, + password_providers=[] + ) hs = yield setup_test_homeserver(config=config, datastore=Mock()) with self.assertRaises(ConfigError) as cm: @@ -466,7 +474,10 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): f1 = self._write_config(as_token="as_token", suffix="1") f2 = self._write_config(as_token="as_token", suffix="2") - config = Mock(app_service_config_files=[f1, f2], event_cache_size=1) + config = Mock( + app_service_config_files=[f1, f2], event_cache_size=1, + password_providers=[] + ) hs = yield setup_test_homeserver(config=config, datastore=Mock()) with self.assertRaises(ConfigError) as cm: diff --git a/tests/utils.py b/tests/utils.py index 92d470cb48..f74526b6a7 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -52,6 +52,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.server_name = name config.trusted_third_party_id_servers = [] config.room_invite_state_types = [] + config.password_providers = [] config.use_frozen_dicts = True config.database_config = {"name": "sqlite3"} -- cgit 1.4.1 From 9bfc61779111624e972939491a0b5c02190d3463 Mon Sep 17 00:00:00 2001 From: Patrik Oldsberg Date: Thu, 6 Oct 2016 10:43:32 +0200 Subject: storage/appservice: make appservice methods only relying on the cache synchronous --- synapse/api/auth.py | 7 +++---- synapse/handlers/appservice.py | 20 +++++++++----------- synapse/handlers/directory.py | 11 ++++------- synapse/handlers/register.py | 5 ++--- synapse/handlers/room.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/rest/client/v1/register.py | 2 +- synapse/storage/appservice.py | 12 ++++++------ tests/rest/client/v2_alpha/test_register.py | 2 +- tests/storage/test_appservice.py | 9 +++------ 10 files changed, 31 insertions(+), 41 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e75fd518be..27599124d2 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -653,7 +653,7 @@ class Auth(object): @defer.inlineCallbacks def _get_appservice_user_id(self, request): - app_service = yield self.store.get_app_service_by_token( + app_service = self.store.get_app_service_by_token( get_access_token_from_request( request, self.TOKEN_NOT_FOUND_HTTP_STATUS ) @@ -855,13 +855,12 @@ class Auth(object): } defer.returnValue(user_info) - @defer.inlineCallbacks def get_appservice_by_req(self, request): try: token = get_access_token_from_request( request, self.TOKEN_NOT_FOUND_HTTP_STATUS ) - service = yield self.store.get_app_service_by_token(token) + service = self.store.get_app_service_by_token(token) if not service: logger.warn("Unrecognised appservice access token: %s" % (token,)) raise AuthError( @@ -870,7 +869,7 @@ class Auth(object): errcode=Codes.UNKNOWN_TOKEN ) request.authenticated_entity = service.sender - defer.returnValue(service) + return defer.succeed(service) except KeyError: raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token." diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 88fa0bb2e4..05af54d31b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -59,7 +59,7 @@ class ApplicationServicesHandler(object): Args: current_id(int): The current maximum ID. """ - services = yield self.store.get_app_services() + services = self.store.get_app_services() if not services or not self.notify_appservices: return @@ -142,7 +142,7 @@ class ApplicationServicesHandler(object): association can be found. """ room_alias_str = room_alias.to_string() - services = yield self.store.get_app_services() + services = self.store.get_app_services() alias_query_services = [ s for s in services if ( s.is_interested_in_alias(room_alias_str) @@ -177,7 +177,7 @@ class ApplicationServicesHandler(object): @defer.inlineCallbacks def get_3pe_protocols(self, only_protocol=None): - services = yield self.store.get_app_services() + services = self.store.get_app_services() protocols = {} # Collect up all the individual protocol responses out of the ASes @@ -224,7 +224,7 @@ class ApplicationServicesHandler(object): list: A list of services interested in this event based on the service regex. """ - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_list = [ s for s in services if ( yield s.is_interested(event, self.store) @@ -232,23 +232,21 @@ class ApplicationServicesHandler(object): ] defer.returnValue(interested_list) - @defer.inlineCallbacks def _get_services_for_user(self, user_id): - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_list = [ s for s in services if ( s.is_interested_in_user(user_id) ) ] - defer.returnValue(interested_list) + return defer.succeed(interested_list) - @defer.inlineCallbacks def _get_services_for_3pn(self, protocol): - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_list = [ s for s in services if s.is_interested_in_protocol(protocol) ] - defer.returnValue(interested_list) + return defer.succeed(interested_list) @defer.inlineCallbacks def _is_unknown_user(self, user_id): @@ -264,7 +262,7 @@ class ApplicationServicesHandler(object): return # user not found; could be the AS though, so check. - services = yield self.store.get_app_services() + services = self.store.get_app_services() service_list = [s for s in services if s.sender == user_id] defer.returnValue(len(service_list) == 0) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 14352985e2..c00274afc3 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -288,13 +288,12 @@ class DirectoryHandler(BaseHandler): result = yield as_handler.query_room_alias_exists(room_alias) defer.returnValue(result) - @defer.inlineCallbacks def can_modify_alias(self, alias, user_id=None): # Any application service "interested" in an alias they are regexing on # can modify the alias. # Users can only modify the alias if ALL the interested services have # non-exclusive locks on the alias (or there are no interested services) - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_services = [ s for s in services if s.is_interested_in_alias(alias.to_string()) ] @@ -302,14 +301,12 @@ class DirectoryHandler(BaseHandler): for service in interested_services: if user_id == service.sender: # this user IS the app service so they can do whatever they like - defer.returnValue(True) - return + return defer.succeed(True) elif service.is_exclusive_alias(alias.to_string()): # another service has an exclusive lock on this alias. - defer.returnValue(False) - return + return defer.succeed(False) # either no interested services, or no service with an exclusive lock - defer.returnValue(True) + return defer.succeed(True) @defer.inlineCallbacks def _user_can_delete_alias(self, alias, user_id): diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index dd75c4fecf..19329057d5 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -194,7 +194,7 @@ class RegistrationHandler(BaseHandler): def appservice_register(self, user_localpart, as_token): user = UserID(user_localpart, self.hs.hostname) user_id = user.to_string() - service = yield self.store.get_app_service_by_token(as_token) + service = self.store.get_app_service_by_token(as_token) if not service: raise AuthError(403, "Invalid application service token.") if not service.is_interested_in_user(user_id): @@ -305,11 +305,10 @@ class RegistrationHandler(BaseHandler): # XXX: This should be a deferred list, shouldn't it? yield identity_handler.bind_threepid(c, user_id) - @defer.inlineCallbacks def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None): # valid user IDs must not clash with any user ID namespaces claimed by # application services. - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_services = [ s for s in services if s.is_interested_in_user(user_id) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cbd26f8f95..a7f533f7be 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -437,7 +437,7 @@ class RoomEventSource(object): logger.warn("Stream has topological part!!!! %r", from_key) from_key = "s%s" % (from_token.stream,) - app_service = yield self.store.get_app_service_by_user_id( + app_service = self.store.get_app_service_by_user_id( user.to_string() ) if app_service: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b5962f4f5a..1f910ff814 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -788,7 +788,7 @@ class SyncHandler(object): assert since_token - app_service = yield self.store.get_app_service_by_user_id(user_id) + app_service = self.store.get_app_service_by_user_id(user_id) if app_service: rooms = yield self.store.get_app_service_rooms(app_service) joined_room_ids = set(r.room_id for r in rooms) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 3046da7aec..fe4480b363 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -391,7 +391,7 @@ class CreateUserRestServlet(ClientV1RestServlet): user_json = parse_json_object_from_request(request) access_token = get_access_token_from_request(request) - app_service = yield self.store.get_app_service_by_token( + app_service = self.store.get_app_service_by_token( access_token ) if not app_service: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a854a87eab..3d5994a580 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): ) def get_app_services(self): - return defer.succeed(self.services_cache) + return self.services_cache def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -54,8 +54,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.sender == user_id: - return defer.succeed(service) - return defer.succeed(None) + return service + return None def get_app_service_by_token(self, token): """Get the application service with the given appservice token. @@ -67,8 +67,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.token == token: - return defer.succeed(service) - return defer.succeed(None) + return service + return None def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -163,7 +163,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore - as_list = yield self.get_app_services() + as_list = self.get_app_services() services = [] for res in results: diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 8ac56a1fb2..e9cb416e4b 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -19,7 +19,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.appservice = None self.auth = Mock(get_appservice_by_req=Mock( - side_effect=lambda x: defer.succeed(self.appservice)) + side_effect=lambda x: self.appservice) ) self.auth_result = (False, None, None, None) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 3e2862daae..f3df8302da 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -71,14 +71,12 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - @defer.inlineCallbacks def test_retrieve_unknown_service_token(self): - service = yield self.store.get_app_service_by_token("invalid_token") + service = self.store.get_app_service_by_token("invalid_token") self.assertEquals(service, None) - @defer.inlineCallbacks def test_retrieval_of_service(self): - stored_service = yield self.store.get_app_service_by_token( + stored_service = self.store.get_app_service_by_token( self.as_token ) self.assertEquals(stored_service.token, self.as_token) @@ -97,9 +95,8 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): [] ) - @defer.inlineCallbacks def test_retrieval_of_all_services(self): - services = yield self.store.get_app_services() + services = self.store.get_app_services() self.assertEquals(len(services), 3) -- cgit 1.4.1 From 2ff2d36b80dc03bda5185ff7fffbb9fdd0bf6e9a Mon Sep 17 00:00:00 2001 From: Patrik Oldsberg Date: Tue, 4 Oct 2016 21:53:35 +0200 Subject: handers: do not ratelimit app service senders Signed-off-by: Patrik Oldsberg --- synapse/handlers/_base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e58735294e..4981643166 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -55,8 +55,14 @@ class BaseHandler(object): def ratelimit(self, requester): time_now = self.clock.time() + user_id = requester.user.to_string() + + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service is not None: + return # do not ratelimit app service senders + allowed, time_allowed = self.ratelimiter.send_message( - requester.user.to_string(), time_now, + user_id, time_now, msg_rate_hz=self.hs.config.rc_messages_per_second, burst_count=self.hs.config.rc_message_burst_count, ) -- cgit 1.4.1 From 3de7c8a4d07f8baa41eb8b17ad3e49d1737552a4 Mon Sep 17 00:00:00 2001 From: Patrik Oldsberg Date: Tue, 4 Oct 2016 22:30:51 +0200 Subject: handlers/profile: added admin override for set_displayname and set_avatar_url Signed-off-by: Patrik Oldsberg --- synapse/handlers/profile.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index d9ac09078d..87f74dfb8e 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -65,13 +65,13 @@ class ProfileHandler(BaseHandler): defer.returnValue(result["displayname"]) @defer.inlineCallbacks - def set_displayname(self, target_user, requester, new_displayname): + def set_displayname(self, target_user, requester, new_displayname, by_admin=False): """target_user is the user whose displayname is to be changed; auth_user is the user attempting to make this change.""" if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this Home Server") - if target_user != requester.user: + if not by_admin and target_user != requester.user: raise AuthError(400, "Cannot set another user's displayname") if new_displayname == '': @@ -111,13 +111,13 @@ class ProfileHandler(BaseHandler): defer.returnValue(result["avatar_url"]) @defer.inlineCallbacks - def set_avatar_url(self, target_user, requester, new_avatar_url): + def set_avatar_url(self, target_user, requester, new_avatar_url, by_admin=False): """target_user is the user whose avatar_url is to be changed; auth_user is the user attempting to make this change.""" if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this Home Server") - if target_user != requester.user: + if not by_admin and target_user != requester.user: raise AuthError(400, "Cannot set another user's avatar_url") yield self.store.set_profile_avatar_url( -- cgit 1.4.1 From 7b5546d0776d8d238f4c8775ea8878d92ecc2527 Mon Sep 17 00:00:00 2001 From: Patrik Oldsberg Date: Tue, 4 Oct 2016 22:32:58 +0200 Subject: rest/client/v1/register: use the correct requester in createUser Signed-off-by: Patrik Oldsberg --- synapse/handlers/register.py | 6 ++---- synapse/rest/client/v1/register.py | 9 ++++++--- tests/handlers/test_register.py | 8 +++++--- tests/rest/client/v1/test_register.py | 30 +++++++++--------------------- 4 files changed, 22 insertions(+), 31 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 19329057d5..7e119f13b1 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -19,7 +19,6 @@ import urllib from twisted.internet import defer -import synapse.types from synapse.api.errors import ( AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError ) @@ -370,7 +369,7 @@ class RegistrationHandler(BaseHandler): defer.returnValue(data) @defer.inlineCallbacks - def get_or_create_user(self, localpart, displayname, duration_in_ms, + def get_or_create_user(self, requester, localpart, displayname, duration_in_ms, password_hash=None): """Creates a new user if the user does not exist, else revokes all previous access tokens and generates a new one. @@ -417,9 +416,8 @@ class RegistrationHandler(BaseHandler): if displayname is not None: logger.info("setting user display name: %s -> %s", user_id, displayname) profile_handler = self.hs.get_handlers().profile_handler - requester = synapse.types.create_requester(user) yield profile_handler.set_displayname( - user, requester, displayname + user, requester, displayname, by_admin=True, ) defer.returnValue((user_id, token)) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index fe4480b363..b5a76fefac 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -22,6 +22,7 @@ from synapse.api.auth import get_access_token_from_request from .base import ClientV1RestServlet, client_path_patterns import synapse.util.stringutils as stringutils from synapse.http.servlet import parse_json_object_from_request +from synapse.types import create_requester from synapse.util.async import run_on_reactor @@ -397,9 +398,10 @@ class CreateUserRestServlet(ClientV1RestServlet): if not app_service: raise SynapseError(403, "Invalid application service token.") - logger.debug("creating user: %s", user_json) + requester = create_requester(app_service.sender) - response = yield self._do_create(user_json) + logger.debug("creating user: %s", user_json) + response = yield self._do_create(requester, user_json) defer.returnValue((200, response)) @@ -407,7 +409,7 @@ class CreateUserRestServlet(ClientV1RestServlet): return 403, {} @defer.inlineCallbacks - def _do_create(self, user_json): + def _do_create(self, requester, user_json): yield run_on_reactor() if "localpart" not in user_json: @@ -433,6 +435,7 @@ class CreateUserRestServlet(ClientV1RestServlet): handler = self.handlers.registration_handler user_id, token = yield handler.get_or_create_user( + requester=requester, localpart=localpart, displayname=displayname, duration_in_ms=(duration_seconds * 1000), diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py index a7de3c7c17..9c9d144690 100644 --- a/tests/handlers/test_register.py +++ b/tests/handlers/test_register.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .. import unittest from synapse.handlers.register import RegistrationHandler -from synapse.types import UserID +from synapse.types import UserID, create_requester from tests.utils import setup_test_homeserver @@ -57,8 +57,9 @@ class RegistrationTestCase(unittest.TestCase): local_part = "someone" display_name = "someone" user_id = "@someone:test" + requester = create_requester("@as:test") result_user_id, result_token = yield self.handler.get_or_create_user( - local_part, display_name, duration_ms) + requester, local_part, display_name, duration_ms) self.assertEquals(result_user_id, user_id) self.assertEquals(result_token, 'secret') @@ -74,7 +75,8 @@ class RegistrationTestCase(unittest.TestCase): local_part = "frank" display_name = "Frank" user_id = "@frank:test" + requester = create_requester("@as:test") result_user_id, result_token = yield self.handler.get_or_create_user( - local_part, display_name, duration_ms) + requester, local_part, display_name, duration_ms) self.assertEquals(result_user_id, user_id) self.assertEquals(result_token, 'secret') diff --git a/tests/rest/client/v1/test_register.py b/tests/rest/client/v1/test_register.py index 4a898a034f..44ba9ff58f 100644 --- a/tests/rest/client/v1/test_register.py +++ b/tests/rest/client/v1/test_register.py @@ -31,33 +31,21 @@ class CreateUserServletTestCase(unittest.TestCase): ) self.request.args = {} - self.appservice = None - self.auth = Mock(get_appservice_by_req=Mock( - side_effect=lambda x: defer.succeed(self.appservice)) - ) + self.registration_handler = Mock() - self.auth_result = (False, None, None, None) - self.auth_handler = Mock( - check_auth=Mock(side_effect=lambda x, y, z: self.auth_result), - get_session_data=Mock(return_value=None) + self.appservice = Mock(sender="@as:test") + self.datastore = Mock( + get_app_service_by_token=Mock(return_value=self.appservice) ) - self.registration_handler = Mock() - self.identity_handler = Mock() - self.login_handler = Mock() - # do the dance to hook it up to the hs global - self.handlers = Mock( - auth_handler=self.auth_handler, + # do the dance to hook things up to the hs global + handlers = Mock( registration_handler=self.registration_handler, - identity_handler=self.identity_handler, - login_handler=self.login_handler ) self.hs = Mock() - self.hs.hostname = "supergbig~testing~thing.com" - self.hs.get_auth = Mock(return_value=self.auth) - self.hs.get_handlers = Mock(return_value=self.handlers) - self.hs.config.enable_registration = True - # init the thing we're testing + self.hs.hostname = "superbig~testing~thing.com" + self.hs.get_datastore = Mock(return_value=self.datastore) + self.hs.get_handlers = Mock(return_value=handlers) self.servlet = CreateUserRestServlet(self.hs) @defer.inlineCallbacks -- cgit 1.4.1 From 5d9546f9f491fc5395b4e524af9a44ffbf056dc4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 6 Oct 2016 18:20:27 +0100 Subject: Interactive Auth: Return 401 from for incorrect password This requires a bit of fettling, because I want to return a helpful error message too but we don't want to distinguish between unknown user and invalid password. To avoid hardcoding the error message into 15 places in the code, I've had to refactor a few methods to return None instead of throwing. Fixes https://matrix.org/jira/browse/SYN-744 --- synapse/handlers/auth.py | 84 ++++++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 32 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 6986930c0d..f731d01af8 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -58,7 +58,6 @@ class AuthHandler(BaseHandler): } self.bcrypt_rounds = hs.config.bcrypt_rounds self.sessions = {} - self.INVALID_TOKEN_HTTP_STATUS = 401 self.ldap_enabled = hs.config.ldap_enabled if self.ldap_enabled: @@ -148,13 +147,19 @@ class AuthHandler(BaseHandler): creds = session['creds'] # check auth type currently being presented + errordict = {} if 'type' in authdict: if authdict['type'] not in self.checkers: raise LoginError(400, "", Codes.UNRECOGNIZED) - result = yield self.checkers[authdict['type']](authdict, clientip) - if result: - creds[authdict['type']] = result - self._save_session(session) + try: + result = yield self.checkers[authdict['type']](authdict, clientip) + if result: + creds[authdict['type']] = result + self._save_session(session) + except LoginError, e: + # this step failed. Merge the error dict into the response + # so that the client can have another go. + errordict = e.error_dict() for f in flows: if len(set(f) - set(creds.keys())) == 0: @@ -163,6 +168,7 @@ class AuthHandler(BaseHandler): ret = self._auth_dict_for_flows(flows, session) ret['completed'] = creds.keys() + ret.update(errordict) defer.returnValue((False, ret, clientdict, session['id'])) @defer.inlineCallbacks @@ -430,37 +436,40 @@ class AuthHandler(BaseHandler): defer.Deferred: (str) canonical_user_id, or None if zero or multiple matches """ - try: - res = yield self._find_user_id_and_pwd_hash(user_id) + res = yield self._find_user_id_and_pwd_hash(user_id) + if res is not None: defer.returnValue(res[0]) - except LoginError: - defer.returnValue(None) + defer.returnValue(None) @defer.inlineCallbacks def _find_user_id_and_pwd_hash(self, user_id): """Checks to see if a user with the given id exists. Will check case - insensitively, but will throw if there are multiple inexact matches. + insensitively, but will return None if there are multiple inexact + matches. Returns: tuple: A 2-tuple of `(canonical_user_id, password_hash)` + None: if there is not exactly one match """ user_infos = yield self.store.get_users_by_id_case_insensitive(user_id) + + result = None if not user_infos: logger.warn("Attempted to login as %s but they do not exist", user_id) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) - - if len(user_infos) > 1: - if user_id not in user_infos: - logger.warn( - "Attempted to login as %s but it matches more than one user " - "inexactly: %r", - user_id, user_infos.keys() - ) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) - - defer.returnValue((user_id, user_infos[user_id])) + elif len(user_infos) == 1: + # a single match (possibly not exact) + result = user_infos.popitem() + elif user_id in user_infos: + # multiple matches, but one is exact + result = (user_id, user_infos[user_id]) else: - defer.returnValue(user_infos.popitem()) + # multiple matches, none of them exact + logger.warn( + "Attempted to login as %s but it matches more than one user " + "inexactly: %r", + user_id, user_infos.keys() + ) + defer.returnValue(result) @defer.inlineCallbacks def _check_password(self, user_id, password): @@ -474,34 +483,45 @@ class AuthHandler(BaseHandler): Returns: (str) the canonical_user_id Raises: - LoginError if the password was incorrect + LoginError if login fails """ valid_ldap = yield self._check_ldap_password(user_id, password) if valid_ldap: defer.returnValue(user_id) - result = yield self._check_local_password(user_id, password) - defer.returnValue(result) + canonical_user_id = yield self._check_local_password(user_id, password) + + if canonical_user_id: + defer.returnValue(canonical_user_id) + + # unknown username or invalid password. We raise a 403 here, but note + # that if we're doing user-interactive login, it turns all LoginErrors + # into a 401 anyway. + raise LoginError( + 403, "Invalid password", + errcode=Codes.FORBIDDEN + ) @defer.inlineCallbacks def _check_local_password(self, user_id, password): """Authenticate a user against the local password database. - user_id is checked case insensitively, but will throw if there are + user_id is checked case insensitively, but will return None if there are multiple inexact matches. Args: user_id (str): complete @user:id Returns: - (str) the canonical_user_id - Raises: - LoginError if the password was incorrect + (str) the canonical_user_id, or None if unknown user / bad password """ - user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) + lookupres = yield self._find_user_id_and_pwd_hash(user_id) + if not lookupres: + defer.returnValue(None) + (user_id, password_hash) = lookupres result = self.validate_hash(password, password_hash) if not result: logger.warn("Failed password login for user %s", user_id) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) + defer.returnValue(None) defer.returnValue(user_id) @defer.inlineCallbacks -- cgit 1.4.1 From fa74fcf5120998e0bdb030638ce391914198f648 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 11 Oct 2016 11:34:40 +0100 Subject: Work around email-spamming Riot bug 5d9546f9 introduced a change to synapse behaviour, in that failures in the interactive-auth process would return the flows and params data as well as an error code (as specced in https://github.com/matrix-org/matrix-doc/pull/397). That change exposed a bug in Riot which would make it request a new validation token (and send a new email) each time it got a 401 with a `flows` parameter (see https://github.com/vector-im/vector-web/issues/2447 and the fix at https://github.com/matrix-org/matrix-react-sdk/pull/510). To preserve compatibility with broken versions of Riot, grandfather in the old behaviour for the email validation stage. --- synapse/handlers/auth.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 51888d1f97..6b8de1e7cf 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -150,14 +150,25 @@ class AuthHandler(BaseHandler): # check auth type currently being presented errordict = {} if 'type' in authdict: - if authdict['type'] not in self.checkers: + login_type = authdict['type'] + if login_type not in self.checkers: raise LoginError(400, "", Codes.UNRECOGNIZED) try: - result = yield self.checkers[authdict['type']](authdict, clientip) + result = yield self.checkers[login_type](authdict, clientip) if result: - creds[authdict['type']] = result + creds[login_type] = result self._save_session(session) except LoginError, e: + if login_type == LoginType.EMAIL_IDENTITY: + # riot used to have a bug where it would request a new + # validation token (thus sending a new email) each time it + # got a 401 with a 'flows' field. + # (https://github.com/vector-im/vector-web/issues/2447). + # + # Grandfather in the old behaviour for now to avoid + # breaking old riot deployments. + raise e + # this step failed. Merge the error dict into the response # so that the client can have another go. errordict = e.error_dict() -- cgit 1.4.1