From 5130d80d79fe1f95ce03b8f1cfd4fbf0a32f5ac8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 25 Jun 2015 17:18:19 +0100 Subject: Add bulk insert events API --- synapse/handlers/federation.py | 227 ++++++++++++++++++++++------------------- 1 file changed, 121 insertions(+), 106 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b5d882fd65..079f46dffd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -138,26 +138,29 @@ class FederationHandler(BaseHandler): if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication # layer, then we should handle them (if we haven't before.) + + event_infos = [] + for e in itertools.chain(auth_chain, state): if e.event_id in seen_ids: continue - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - seen_ids.add(e.event_id) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + event_infos.append({ + "event": e, + "auth_events": auth, + }) + seen_ids.add(e.event_id) + + yield self._handle_new_events( + origin, + event_infos, + outliers=True + ) try: _, event_stream_id, max_stream_id = yield self._handle_new_event( @@ -292,38 +295,29 @@ class FederationHandler(BaseHandler): ).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results}) - yield defer.gatherResults( - [ - self._handle_new_event( - dest, a, - auth_events={ - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id, _ in a.auth_events - }, - ) - for a in auth_events.values() - if a.event_id not in seen_events - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) - - yield defer.gatherResults( - [ - self._handle_new_event( - dest, event_map[e_id], - state=events_to_state[e_id], - backfilled=True, - auth_events={ - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id, _ in event_map[e_id].auth_events - }, - ) - for e_id in events_to_state - ], - consumeErrors=True - ).addErrback(unwrapFirstError) + ev_infos = [] + for a in auth_events.values(): + if a.event_id in seen_events: + continue + ev_infos.append({ + "event": a, + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in a.auth_events + } + }) + + for e_id in events_to_state: + ev_infos.append({ + "event": event_map[e_id], + "state": events_to_state[e_id], + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in event_map[e_id].auth_events + } + }) events.sort(key=lambda e: e.depth) @@ -331,10 +325,14 @@ class FederationHandler(BaseHandler): if event in events_to_state: continue - yield self._handle_new_event( - dest, event, - backfilled=True, - ) + ev_infos.append({ + "event": event, + }) + + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) defer.returnValue(events) @@ -600,32 +598,22 @@ class FederationHandler(BaseHandler): # FIXME pass - yield self._handle_auth_events( - origin, [e for e in auth_chain if e.event_id != event.event_id] - ) - - @defer.inlineCallbacks - def handle_state(e): + ev_infos = [] + for e in itertools.chain(state, auth_chain): if e.event_id == event.event_id: - return + continue e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { + auth_ids = [e_id for e_id, _ in e.auth_events] + ev_infos.append({ + "event": e, + "auth_events": { (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + }) - yield defer.DeferredList([handle_state(e) for e in state]) + yield self._handle_new_events(origin, ev_infos, outliers=True) auth_ids = [e_id for e_id, _ in event.auth_events] auth_events = { @@ -940,11 +928,54 @@ class FederationHandler(BaseHandler): def _handle_new_event(self, origin, event, state=None, backfilled=False, current_state=None, auth_events=None): - logger.debug( - "_handle_new_event: %s, sigs: %s", - event.event_id, event.signatures, + outlier = event.internal_metadata.is_outlier() + + context = yield self._prep_event( + origin, event, + state=state, + backfilled=backfilled, + current_state=current_state, + auth_events=auth_events, ) + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, + is_new_state=(not outlier and not backfilled), + current_state=current_state, + ) + + defer.returnValue((context, event_stream_id, max_stream_id)) + + @defer.inlineCallbacks + def _handle_new_events(self, origin, event_infos, backfilled=False, + outliers=False): + contexts = yield defer.gatherResults( + [ + self._prep_event( + origin, + ev_info["event"], + state=ev_info.get("state"), + backfilled=backfilled, + auth_events=ev_info.get("auth_events"), + ) + for ev_info in event_infos + ] + ) + + yield self.store.persist_events( + [ + (ev_info["event"], context) + for ev_info, context in itertools.izip(event_infos, contexts) + ], + backfilled=backfilled, + is_new_state=(not outliers and not backfilled), + ) + + @defer.inlineCallbacks + def _prep_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() context = yield self.state_handler.compute_event_context( @@ -954,13 +985,6 @@ class FederationHandler(BaseHandler): if not auth_events: auth_events = context.current_state - logger.debug( - "_handle_new_event: %s, auth_events: %s", - event.event_id, auth_events, - ) - - is_new_state = not outlier - # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: @@ -984,26 +1008,7 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR - # FIXME: Don't store as rejected with AUTH_ERROR if we haven't - # seen all the auth events. - yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - is_new_state=False, - current_state=current_state, - ) - raise - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - is_new_state=(is_new_state and not backfilled), - current_state=current_state, - ) - - defer.returnValue((context, event_stream_id, max_stream_id)) + defer.returnValue(context) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, @@ -1066,14 +1071,24 @@ class FederationHandler(BaseHandler): @log_function def do_auth(self, origin, event, context, auth_events): # Check if we have all the auth events. - have_events = yield self.store.have_events( - [e_id for e_id, _ in event.auth_events] - ) - + current_state = set(e.event_id for e in auth_events.values()) event_auth_events = set(e_id for e_id, _ in event.auth_events) + + if event_auth_events - current_state: + have_events = yield self.store.have_events( + event_auth_events - current_state + ) + else: + have_events = {} + + have_events.update({ + e.event_id: "" + for e in auth_events.values() + }) + seen_events = set(have_events.keys()) - missing_auth = event_auth_events - seen_events + missing_auth = event_auth_events - seen_events - current_state if missing_auth: logger.info("Missing auth: %s", missing_auth) -- cgit 1.5.1 From 0862fed2a89723e8aa6a4df9f1dbad975a7fbffc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jul 2015 17:19:31 +0100 Subject: Add basic ReceiptHandler --- synapse/handlers/receipts.py | 130 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 synapse/handlers/receipts.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py new file mode 100644 index 0000000000..f3f7050633 --- /dev/null +++ b/synapse/handlers/receipts.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Contains handlers for federation events.""" + +from ._base import BaseHandler + +from twisted.internet import defer + +from synapse.util.logcontext import PreserveLoggingContext + +import logging + + +logger = logging.getLogger(__name__) + + +class ReceiptsHandler(BaseHandler): + def __init__(self, hs): + super(ReceiptsHandler, self).__init__(hs) + + self.federation.register_edu_handler( + "m.receipt", self._received_remote_receipt + ) + + self._latest_serial = 0 + + @defer.inlineCallbacks + def received_client_receipt(self, room_id, receipt_type, user_id, + event_id): + # 1. Persist. + # 2. Notify local clients + # 3. Notify remote servers + + receipt = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + } + + yield self._handle_new_receipts([receipt]) + self._push_remotes([receipt]) + + @defer.inlineCallbacks + def _received_remote_receipt(self, origin, content): + receipts = [ + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + } + for room_id, room_values in content.items() + for event_id, ev_values in room_values.items() + for receipt_type, users in ev_values.items() + for user_id in users + ] + + yield self._handle_new_receipts(receipts) + + @defer.inlineCallbacks + def _handle_new_receipts(self, receipts): + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + + stream_id, max_persisted_id = yield self.store.insert_receipt( + room_id, receipt_type, user_id, event_ids, + ) + + # TODO: Use max_persisted_id + + self._latest_serial = max(self._latest_serial, stream_id) + + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + "recei[t_key", self._latest_serial, rooms=[room_id] + ) + + localusers = set() + remotedomains = set() + + rm_handler = self.homeserver.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=localusers, remotedomains=remotedomains + ) + + receipt["remotedomains"] = remotedomains + + self.notifier.on_new_user_event( + "receipt_key", self._latest_room_serial, rooms=[room_id] + ) + + def _push_remotes(self, receipts): + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + remotedomains = receipt["remotedomains"] + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + event_id: { + receipt_type: [user_id] + } + for event_id in event_ids + }, + }, + ) -- cgit 1.5.1 From bd1236c0ee5b3703de51dc773a02da92e0960d0f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jul 2015 11:40:56 +0100 Subject: Consolidate duplicate code in notifier --- synapse/handlers/presence.py | 2 +- synapse/handlers/receipts.py | 4 ++-- synapse/handlers/typing.py | 2 +- synapse/notifier.py | 35 +++++++++++------------------------ tests/handlers/test_typing.py | 20 ++++++++++---------- 5 files changed, 25 insertions(+), 38 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7c03198313..341a516da2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -992,7 +992,7 @@ class PresenceHandler(BaseHandler): room_ids([str]): List of room_ids to notify. """ with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "presence_key", self._user_cachemap_latest_serial, users_to_push, diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f3f7050633..f0d12d35f4 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -88,7 +88,7 @@ class ReceiptsHandler(BaseHandler): self._latest_serial = max(self._latest_serial, stream_id) with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "recei[t_key", self._latest_serial, rooms=[room_id] ) @@ -102,7 +102,7 @@ class ReceiptsHandler(BaseHandler): receipt["remotedomains"] = remotedomains - self.notifier.on_new_user_event( + self.notifier.on_new_event( "receipt_key", self._latest_room_serial, rooms=[room_id] ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a9895292c2..026bd2b9d4 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -218,7 +218,7 @@ class TypingNotificationHandler(BaseHandler): self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "typing_key", self._latest_room_serial, rooms=[room_id] ) diff --git a/synapse/notifier.py b/synapse/notifier.py index f13164dbdc..85ae343135 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -221,16 +221,7 @@ class Notifier(object): event ) - room_id = event.room_id - - room_user_streams = self.room_to_user_streams.get(room_id, set()) - - user_streams = room_user_streams.copy() - - for user in extra_users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + app_streams = set() for appservice in self.appservice_to_user_streams: # TODO (kegan): Redundant appservice listener checks? @@ -242,24 +233,20 @@ class Notifier(object): app_user_streams = self.appservice_to_user_streams.get( appservice, set() ) - user_streams |= app_user_streams - - logger.debug("on_new_room_event listeners %s", user_streams) + app_streams |= app_user_streams - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify( - "room_key", "s%d" % (room_stream_id,), time_now_ms - ) - except: - logger.exception("Failed to notify listener") + self.on_new_event( + "room_key", room_stream_id, + users=extra_users, + rooms=[event.room_id], + extra_streams=app_streams, + ) @defer.inlineCallbacks @log_function - def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]): - """ Used to inform listeners that something has happend - presence/user event wise. + def on_new_event(self, stream_key, new_token, users=[], rooms=[], + extra_streams=set()): + """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. """ diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 7ccbe2ea9c..41bb08b7ca 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -66,8 +66,8 @@ class TypingNotificationsTestCase(unittest.TestCase): self.mock_federation_resource = MockHttpResource() - mock_notifier = Mock(spec=["on_new_user_event"]) - self.on_new_user_event = mock_notifier.on_new_user_event + mock_notifier = Mock(spec=["on_new_event"]) + self.on_new_event = mock_notifier.on_new_event self.auth = Mock(spec=[]) @@ -182,7 +182,7 @@ class TypingNotificationsTestCase(unittest.TestCase): timeout=20000, ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 1, rooms=[self.room_id]), ]) @@ -245,7 +245,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 1, rooms=[self.room_id]), ]) @@ -299,7 +299,7 @@ class TypingNotificationsTestCase(unittest.TestCase): room_id=self.room_id, ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 1, rooms=[self.room_id]), ]) @@ -331,10 +331,10 @@ class TypingNotificationsTestCase(unittest.TestCase): timeout=10000, ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 1, rooms=[self.room_id]), ]) - self.on_new_user_event.reset_mock() + self.on_new_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 1) events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) @@ -351,7 +351,7 @@ class TypingNotificationsTestCase(unittest.TestCase): self.clock.advance_time(11) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 2, rooms=[self.room_id]), ]) @@ -377,10 +377,10 @@ class TypingNotificationsTestCase(unittest.TestCase): timeout=10000, ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 3, rooms=[self.room_id]), ]) - self.on_new_user_event.reset_mock() + self.on_new_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 3) events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) -- cgit 1.5.1 From 1a605456260bfb46d8bb9cff2d40d19aec03daa4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jul 2015 16:20:10 +0100 Subject: Add basic impl for room history ACL on GET /messages client API --- synapse/api/constants.py | 2 ++ synapse/handlers/message.py | 33 +++++++++++++++++++++++- synapse/storage/state.py | 63 +++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 95 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index d8a18ee87b..3e15e8a9d7 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -75,6 +75,8 @@ class EventTypes(object): Redaction = "m.room.redaction" Feedback = "m.room.message.feedback" + RoomHistoryVisibility = "m.room.history_visibility" + # These are used for validation Message = "m.room.message" Topic = "m.room.topic" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e324662f18..17c75f33c9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -113,11 +113,42 @@ class MessageHandler(BaseHandler): "room_key", next_key ) + if not events: + defer.returnValue({ + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + }) + + states = yield self.store.get_state_for_events( + room_id, [e.event_id for e in events], + ) + + events_and_states = zip(events, states) + + def allowed(event_and_state): + _, state = event_and_state + + membership = state.get((EventTypes.Member, user_id), None) + if membership and membership.membership == Membership.JOIN: + return True + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history and history.content["visibility"] == "after_join": + return False + + events_and_states = filter(allowed, events_and_states) + events = [ + ev + for ev, _ in events_and_states + ] + time_now = self.clock.time_msec() chunk = { "chunk": [ - serialize_event(e, time_now, as_client_event) for e in events + serialize_event(e, time_now, as_client_event) + for e in events ], "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f2b17f29ea..d7844edee3 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -92,11 +92,11 @@ class StateStore(SQLBaseStore): defer.returnValue(dict(state_list)) @cached(num_args=1) - def _fetch_events_for_group(self, state_group, events): + def _fetch_events_for_group(self, key, events): return self._get_events( events, get_prev_content=False ).addCallback( - lambda evs: (state_group, evs) + lambda evs: (key, evs) ) def _store_state_groups_txn(self, txn, event, context): @@ -194,6 +194,65 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) + @defer.inlineCallbacks + def get_state_for_events(self, room_id, event_ids): + def f(txn): + groups = set() + event_to_group = {} + for event_id in event_ids: + # TODO: Remove this loop. + group = self._simple_select_one_onecol_txn( + txn, + table="event_to_state_groups", + keyvalues={"event_id": event_id}, + retcol="state_group", + allow_none=True, + ) + if group: + event_to_group[event_id] = group + groups.add(group) + + group_to_state_ids = {} + for group in groups: + state_ids = self._simple_select_onecol_txn( + txn, + table="state_groups_state", + keyvalues={"state_group": group}, + retcol="event_id", + ) + + group_to_state_ids[group] = state_ids + + return event_to_group, group_to_state_ids + + res = yield self.runInteraction( + "annotate_events_with_state_groups", + f, + ) + + event_to_group, group_to_state_ids = res + + state_list = yield defer.gatherResults( + [ + self._fetch_events_for_group(group, vals) + for group, vals in group_to_state_ids.items() + ], + consumeErrors=True, + ) + + state_dict = { + group: { + (ev.type, ev.state_key): ev + for ev in state + } + for group, state in state_list + } + + defer.returnValue([ + state_dict.get(event_to_group.get(event, None), None) + for event in event_ids + ]) + def _make_group_id(clock): return str(int(clock.time_msec())) + random_string(5) -- cgit 1.5.1 From 41938afed884361451ad6e91eb44e805ebbdaeb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jul 2015 17:02:10 +0100 Subject: Make v1 initial syncs respect room history ACL --- synapse/handlers/message.py | 51 ++++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 17 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 17c75f33c9..00c7dbec88 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -120,6 +120,23 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) + events = yield self._filter_events_for_client(user_id, room_id, events) + + time_now = self.clock.time_msec() + + chunk = { + "chunk": [ + serialize_event(e, time_now, as_client_event) + for e in events + ], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) + + @defer.inlineCallbacks + def _filter_events_for_client(self, user_id, room_id, events): states = yield self.store.get_state_for_events( room_id, [e.event_id for e in events], ) @@ -127,34 +144,26 @@ class MessageHandler(BaseHandler): events_and_states = zip(events, states) def allowed(event_and_state): - _, state = event_and_state + event, state = event_and_state + + if event.type == EventTypes.RoomHistoryVisibility: + return True membership = state.get((EventTypes.Member, user_id), None) if membership and membership.membership == Membership.JOIN: return True history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history and history.content["visibility"] == "after_join": + if history and history.content.get("visibility", None) == "after_join": return False + return True + events_and_states = filter(allowed, events_and_states) - events = [ + defer.returnValue([ ev for ev, _ in events_and_states - ] - - time_now = self.clock.time_msec() - - chunk = { - "chunk": [ - serialize_event(e, time_now, as_client_event) - for e in events - ], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - } - - defer.returnValue(chunk) + ]) @defer.inlineCallbacks def create_and_send_event(self, event_dict, ratelimit=True, @@ -347,6 +356,10 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) + messages = yield self._filter_events_for_client( + user_id, event.room_id, messages + ) + start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() @@ -448,6 +461,10 @@ class MessageHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) + messages = yield self._filter_events_for_client( + user_id, room_id, messages + ) + start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) -- cgit 1.5.1 From 400894616d15a01c168b2356d950972b6e746496 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jul 2015 14:51:01 +0100 Subject: Respect m.room.history_visibility in v2_alpha sync API --- synapse/handlers/sync.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bd8c603681..5078c4e45e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -292,6 +292,36 @@ class SyncHandler(BaseHandler): next_batch=now_token, )) + @defer.inlineCallbacks + def _filter_events_for_client(self, user_id, room_id, events): + states = yield self.store.get_state_for_events( + room_id, [e.event_id for e in events], + ) + + events_and_states = zip(events, states) + + def allowed(event_and_state): + event, state = event_and_state + + if event.type == EventTypes.RoomHistoryVisibility: + return True + + membership = state.get((EventTypes.Member, user_id), None) + if membership and membership.membership == Membership.JOIN: + return True + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history and history.content.get("visibility", None) == "after_join": + return False + + return True + + events_and_states = filter(allowed, events_and_states) + defer.returnValue([ + ev + for ev, _ in events_and_states + ]) + @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None): @@ -313,6 +343,9 @@ class SyncHandler(BaseHandler): (room_key, _) = keys end_key = "s" + room_key.split('-')[-1] loaded_recents = sync_config.filter.filter_room_events(events) + loaded_recents = yield self._filter_events_for_client( + sync_config.user.to_string(), room_id, loaded_recents, + ) loaded_recents.extend(recents) recents = loaded_recents if len(events) <= load_limit: -- cgit 1.5.1 From c3e2600c6727534d4ebf20dcd8219e248ca31461 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jul 2015 17:52:57 +0100 Subject: Filter and redact events that the other server doesn't have permission to see during backfill --- synapse/handlers/federation.py | 44 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b5d882fd65..663d05c633 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -31,6 +31,8 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID +from synapse.events.utils import prune_event + from synapse.util.retryutils import NotRetryingDestination from twisted.internet import defer @@ -222,6 +224,46 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) + @defer.inlineCallbacks + def _filter_events_for_server(self, server_name, room_id, events): + states = yield self.store.get_state_for_events( + room_id, [e.event_id for e in events], + ) + + events_and_states = zip(events, states) + + def redact_disallowed(event_and_state): + event, state = event_and_state + + if not state: + return event + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history and history.content.get("visibility", None) == "after_join": + for ev in state.values(): + if ev.type != EventTypes.Member: + continue + try: + domain = UserID.from_string(ev.state_key).domain + except: + continue + + if domain != server_name: + continue + + if ev.membership == Membership.JOIN: + return event + else: + return prune_event(event) + + return event + + res = map(redact_disallowed, events_and_states) + + logger.info("_filter_events_for_server %r", res) + + defer.returnValue(res) + @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): @@ -882,6 +924,8 @@ class FederationHandler(BaseHandler): limit ) + events = yield self._filter_events_for_server(origin, room_id, events) + defer.returnValue(events) @defer.inlineCallbacks -- cgit 1.5.1 From fb47c3cfbe213c01b25e5605b81c998b764e2bf8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jul 2015 13:05:52 +0100 Subject: Rename key and values for m.room.history_visibility. Support 'invited' value --- synapse/events/utils.py | 2 +- synapse/handlers/federation.py | 34 ++++++++++++++++++++-------------- synapse/handlers/message.py | 24 ++++++++++++++++++++---- synapse/handlers/sync.py | 25 ++++++++++++++++++++----- 4 files changed, 61 insertions(+), 24 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 4c82780f46..7bd78343f0 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -75,7 +75,7 @@ def prune_event(event): elif event_type == EventTypes.Aliases: add_fields("aliases") elif event_type == EventTypes.RoomHistoryVisibility: - add_fields("visibility") + add_fields("history_visibility") allowed_fields = { k: v diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 663d05c633..cd3867ed9c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -239,22 +239,28 @@ class FederationHandler(BaseHandler): return event history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history and history.content.get("visibility", None) == "after_join": - for ev in state.values(): - if ev.type != EventTypes.Member: - continue - try: - domain = UserID.from_string(ev.state_key).domain - except: - continue + if history: + visibility = history.content.get("history_visibility", "shared") + if visibility in ["invited", "joined"]: + for ev in state.values(): + if ev.type != EventTypes.Member: + continue + try: + domain = UserID.from_string(ev.state_key).domain + except: + continue - if domain != server_name: - continue + if domain != server_name: + continue - if ev.membership == Membership.JOIN: - return event - else: - return prune_event(event) + memtype = ev.membership + if memtype == Membership.JOIN: + return event + elif memtype == Membership.INVITE: + if visibility == "invited": + return event + else: + return prune_event(event) return event diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 00c7dbec88..d8b117612d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -149,13 +149,29 @@ class MessageHandler(BaseHandler): if event.type == EventTypes.RoomHistoryVisibility: return True - membership = state.get((EventTypes.Member, user_id), None) - if membership and membership.membership == Membership.JOIN: + membership_ev = state.get((EventTypes.Member, user_id), None) + if membership_ev: + membership = membership_ev.membership + else: + membership = Membership.LEAVE + + if membership == Membership.JOIN: return True history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history and history.content.get("visibility", None) == "after_join": - return False + if history: + visibility = history.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility == "public": + return True + elif visibility == "shared": + return True + elif visibility == "joined": + return membership == Membership.JOIN + elif visibility == "invited": + return membership == Membership.INVITE return True diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5078c4e45e..6cff6230c1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -306,16 +306,31 @@ class SyncHandler(BaseHandler): if event.type == EventTypes.RoomHistoryVisibility: return True - membership = state.get((EventTypes.Member, user_id), None) - if membership and membership.membership == Membership.JOIN: + membership_ev = state.get((EventTypes.Member, user_id), None) + if membership_ev: + membership = membership_ev.membership + else: + membership = Membership.LEAVE + + if membership == Membership.JOIN: return True history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history and history.content.get("visibility", None) == "after_join": - return False + if history: + visibility = history.content.get("history_visibility", "shared") + else: + visibility = "shared" - return True + if visibility == "public": + return True + elif visibility == "shared": + return True + elif visibility == "joined": + return membership == Membership.JOIN + elif visibility == "invited": + return membership == Membership.INVITE + return True events_and_states = filter(allowed, events_and_states) defer.returnValue([ ev -- cgit 1.5.1 From 1a3255b507550c76f11251c890a43947b1f4e272 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jul 2015 13:09:16 +0100 Subject: Add m.room.history_visibility to newly created rooms' m.room.power_levels --- synapse/api/auth.py | 1 + synapse/handlers/room.py | 1 + 2 files changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index deca747f72..1a25bf1086 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -575,6 +575,7 @@ class Auth(object): levels_to_check = [ ("users_default", []), ("events_default", []), + ("state_default", []), ("ban", []), ("redact", []), ("kick", []), diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4bd027d9bb..891707df44 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -213,6 +213,7 @@ class RoomCreationHandler(BaseHandler): "events": { EventTypes.Name: 100, EventTypes.PowerLevels: 100, + EventTypes.RoomHistoryVisibility: 100, }, "events_default": 0, "state_default": 50, -- cgit 1.5.1 From 716e42693354553ee2878e3a8df6811226e91130 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jul 2015 10:55:31 +0100 Subject: Fix various typos --- synapse/handlers/__init__.py | 2 ++ synapse/handlers/receipts.py | 6 ++++-- synapse/storage/__init__.py | 3 +++ synapse/storage/receipts.py | 13 +++++++------ 4 files changed, 16 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 685792dbdc..dc5b6ef79d 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -32,6 +32,7 @@ from .appservice import ApplicationServicesHandler from .sync import SyncHandler from .auth import AuthHandler from .identity import IdentityHandler +from .receipts import ReceiptsHandler class Handlers(object): @@ -57,6 +58,7 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + self.receipts_handler = ReceiptsHandler(hs) asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( hs, asapi, AppServiceScheduler( diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f0d12d35f4..fc2f38c1c0 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -31,6 +31,8 @@ class ReceiptsHandler(BaseHandler): def __init__(self, hs): super(ReceiptsHandler, self).__init__(hs) + self.hs = hs + self.federation = hs.get_replication_layer() self.federation.register_edu_handler( "m.receipt", self._received_remote_receipt ) @@ -89,13 +91,13 @@ class ReceiptsHandler(BaseHandler): with PreserveLoggingContext(): self.notifier.on_new_event( - "recei[t_key", self._latest_serial, rooms=[room_id] + "receipt_key", self._latest_serial, rooms=[room_id] ) localusers = set() remotedomains = set() - rm_handler = self.homeserver.get_handlers().room_member_handler + rm_handler = self.hs.get_handlers().room_member_handler yield rm_handler.fetch_room_distributions_into( room_id, localusers=localusers, remotedomains=remotedomains ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 275598adda..2bc88a7954 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -38,6 +38,8 @@ from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore +from .receipts import ReceiptsStore + import fnmatch import imp @@ -74,6 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, PushRuleStore, ApplicationServiceTransactionStore, EventsStore, + ReceiptsStore, ): def __init__(self, hs): diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 0168e74a0d..15c11fd410 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -18,7 +18,7 @@ from ._base import SQLBaseStore, cached from twisted.internet import defer -class ReceiptStore(SQLBaseStore): +class ReceiptsStore(SQLBaseStore): @cached @defer.inlineCallbacks @@ -77,6 +77,7 @@ class ReceiptStore(SQLBaseStore): txn, table="receipts_linearized", values={ + "stream_id": stream_id, "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, @@ -109,16 +110,16 @@ class ReceiptStore(SQLBaseStore): return None linearized_event_id = yield self.runInteraction( - graph_to_linear, desc="insert_receipt_conv" + "insert_receipt_conv", graph_to_linear ) - stream_id_manager = yield self._stream_id_gen.get_next(self) - with stream_id_manager() as stream_id: + stream_id_manager = yield self._receipts_id_gen.get_next(self) + with stream_id_manager as stream_id: yield self.runInteraction( + "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, stream_id=stream_id, - desc="insert_linearized_receipt" ) yield self.insert_graph_receipt( @@ -131,9 +132,9 @@ class ReceiptStore(SQLBaseStore): def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids): return self.runInteraction( + "insert_graph_receipt", self.insert_graph_receipt_txn, room_id, receipt_type, user_id, event_ids, - desc="insert_graph_receipt" ) def insert_graph_receipt_txn(self, txn, room_id, receipt_type, -- cgit 1.5.1 From ca041d55267740214a2cfab95c44ee6f70cc6d0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jul 2015 15:25:30 +0100 Subject: Wire together receipts and the notifer/federation --- synapse/handlers/receipts.py | 81 +++++++++++++++++++++++--------- synapse/rest/client/v2_alpha/receipts.py | 3 +- synapse/storage/receipts.py | 69 +++++++++++++++++++++++---- synapse/streams/events.py | 6 ++- 4 files changed, 126 insertions(+), 33 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index fc2f38c1c0..94f0810057 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -37,7 +37,8 @@ class ReceiptsHandler(BaseHandler): "m.receipt", self._received_remote_receipt ) - self._latest_serial = 0 + # self._earliest_cached_serial = 0 + # self._rooms_to_latest_serial = {} @defer.inlineCallbacks def received_client_receipt(self, room_id, receipt_type, user_id, @@ -53,8 +54,10 @@ class ReceiptsHandler(BaseHandler): "event_ids": [event_id], } - yield self._handle_new_receipts([receipt]) - self._push_remotes([receipt]) + is_new = yield self._handle_new_receipts([receipt]) + + if is_new: + self._push_remotes([receipt]) @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): @@ -81,33 +84,24 @@ class ReceiptsHandler(BaseHandler): user_id = receipt["user_id"] event_ids = receipt["event_ids"] - stream_id, max_persisted_id = yield self.store.insert_receipt( + res = yield self.store.insert_receipt( room_id, receipt_type, user_id, event_ids, ) - # TODO: Use max_persisted_id + if not res: + # res will be None if this read receipt is 'old' + defer.returnValue(False) - self._latest_serial = max(self._latest_serial, stream_id) + stream_id, max_persisted_id = res with PreserveLoggingContext(): self.notifier.on_new_event( - "receipt_key", self._latest_serial, rooms=[room_id] + "receipt_key", max_persisted_id, rooms=[room_id] ) - localusers = set() - remotedomains = set() - - rm_handler = self.hs.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers, remotedomains=remotedomains - ) - - receipt["remotedomains"] = remotedomains - - self.notifier.on_new_event( - "receipt_key", self._latest_room_serial, rooms=[room_id] - ) + defer.returnValue(True) + @defer.inlineCallbacks def _push_remotes(self, receipts): # TODO: Some of this stuff should be coallesced. for receipt in receipts: @@ -115,7 +109,15 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] - remotedomains = receipt["remotedomains"] + + remotedomains = set() + + rm_handler = self.hs.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=None, remotedomains=remotedomains + ) + + logger.debug("Sending receipt to: %r", remotedomains) for domain in remotedomains: self.federation.send_edu( @@ -130,3 +132,40 @@ class ReceiptsHandler(BaseHandler): }, }, ) + + +class ReceiptEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) + to_key = yield self.get_current_key() + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + content = {} + for room_id in rooms: + result = yield self.store.get_linearized_receipts_for_room( + room_id, from_key, to_key + ) + if result: + content[room_id] = result + + if not content: + defer.returnValue(([], to_key)) + + event = { + "type": "m.receipt", + "content": content, + } + + defer.returnValue(([event], to_key)) + + def get_current_key(self, direction='f'): + return self.store.get_max_receipt_stream_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + defer.returnValue(([{}], 0)) diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py index 829427b7b6..40406e2ede 100644 --- a/synapse/rest/client/v2_alpha/receipts.py +++ b/synapse/rest/client/v2_alpha/receipts.py @@ -28,7 +28,7 @@ class ReceiptRestServlet(RestServlet): PATTERN = client_v2_pattern( "/rooms/(?P[^/]*)" "/receipt/(?P[^/]*)" - "/(?P[^/])*" + "/(?P[^/]*)$" ) def __init__(self, hs): @@ -41,7 +41,6 @@ class ReceiptRestServlet(RestServlet): def on_POST(self, request, room_id, receipt_type, event_id): user, client = yield self.auth.get_user_by_req(request) - # TODO: STUFF yield self.receipts_handler.received_client_receipt( room_id, receipt_type, diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 15c11fd410..5a02c80252 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -17,17 +17,33 @@ from ._base import SQLBaseStore, cached from twisted.internet import defer +import logging + + +logger = logging.getLogger(__name__) + class ReceiptsStore(SQLBaseStore): - @cached @defer.inlineCallbacks - def get_linearized_receipts_for_room(self, room_id): - rows = yield self._simple_select_list( - table="receipts_linearized", - keyvalues={"room_id": room_id}, - retcols=["receipt_type", "user_id", "event_id"], - desc="get_linearized_receipts_for_room", + def get_linearized_receipts_for_room(self, room_id, from_key, to_key): + def f(txn): + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id > ? AND stream_id <= ?" + ) + + txn.execute( + sql, + (room_id, from_key, to_key) + ) + + rows = self.cursor_to_dict(txn) + + return rows + + rows = yield self.runInteraction( + "get_linearized_receipts_for_room", f ) result = {} @@ -40,6 +56,9 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(result) + def get_max_receipt_stream_id(self): + return self._receipts_id_gen.get_max_token(self) + @cached @defer.inlineCallbacks def get_graph_receipts_for_room(self, room_id): @@ -62,11 +81,38 @@ class ReceiptsStore(SQLBaseStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, stream_id): + + # We don't want to clobber receipts for more recent events, so we + # have to compare orderings of existing receipts + sql = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + + txn.execute(sql, (room_id, receipt_type, user_id)) + results = txn.fetchall() + + if results: + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + ) + topological_ordering = int(res["topological_ordering"]) + stream_ordering = int(res["stream_ordering"]) + + for to, so, _ in results: + if int(to) > topological_ordering: + return False + elif int(to) == topological_ordering and int(so) >= stream_ordering: + return False + self._simple_delete_txn( txn, table="receipts_linearized", keyvalues={ - "stream_id": stream_id, "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, @@ -85,6 +131,8 @@ class ReceiptsStore(SQLBaseStore): } ) + return True + @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids): if not event_ids: @@ -115,13 +163,16 @@ class ReceiptsStore(SQLBaseStore): stream_id_manager = yield self._receipts_id_gen.get_next(self) with stream_id_manager as stream_id: - yield self.runInteraction( + have_persisted = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, stream_id=stream_id, ) + if not have_persisted: + defer.returnValue(None) + yield self.insert_graph_receipt( room_id, receipt_type, user_id, event_ids ) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 0a1a3a3d03..aaa3609aa5 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -20,6 +20,7 @@ from synapse.types import StreamToken from synapse.handlers.presence import PresenceEventSource from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource +from synapse.handlers.receipts import ReceiptEventSource class NullSource(object): @@ -43,6 +44,7 @@ class EventSources(object): "room": RoomEventSource, "presence": PresenceEventSource, "typing": TypingNotificationEventSource, + "receipt": ReceiptEventSource, } def __init__(self, hs): @@ -63,7 +65,9 @@ class EventSources(object): typing_key=( yield self.sources["typing"].get_current_key() ), - receipt_key="0", + receipt_key=( + yield self.sources["receipt"].get_current_key() + ), ) defer.returnValue(token) -- cgit 1.5.1 From 87311d1b8cc648400dfce5db8a7fed46abbeb963 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jul 2015 10:54:01 +0100 Subject: Hook up receipts to v1 initialSync --- synapse/handlers/message.py | 16 +++++++++++++--- synapse/handlers/receipts.py | 45 +++++++++++++++++++++++++++++++++++++++++++- synapse/storage/receipts.py | 29 +++++++++++++++++++--------- 3 files changed, 77 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e324662f18..7c1d6b5489 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -278,6 +278,11 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("presence"), None ) + receipt_stream = self.hs.get_event_sources().sources["receipt"] + receipt, _ = yield receipt_stream.get_pagination_rows( + user, pagination_config.get_source_config("receipt"), None + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -344,7 +349,8 @@ class MessageHandler(BaseHandler): ret = { "rooms": rooms_ret, "presence": presence, - "end": now_token.to_string() + "receipts": receipt, + "end": now_token.to_string(), } defer.returnValue(ret) @@ -405,9 +411,12 @@ class MessageHandler(BaseHandler): defer.returnValue([p for success, p in presence_defs if success]) - presence, (messages, token) = yield defer.gatherResults( + receipts_handler = self.hs.get_handlers().receipts_handler + + presence, receipts, (messages, token) = yield defer.gatherResults( [ get_presence(), + receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), self.store.get_recent_events_for_room( room_id, limit=limit, @@ -431,5 +440,6 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), }, "state": state, - "presence": presence + "presence": presence, + "receipts": receipts, }) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 94f0810057..f6cde30e63 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -133,6 +133,24 @@ class ReceiptsHandler(BaseHandler): }, ) + @defer.inlineCallbacks + def get_receipts_for_room(self, room_id, to_key): + result = yield self.store.get_linearized_receipts_for_room( + room_id, None, to_key + ) + + if not result: + defer.returnValue([]) + + event = { + "type": "m.receipt", + "content": { + room_id: result, + }, + } + + defer.returnValue([event]) + class ReceiptEventSource(object): def __init__(self, hs): @@ -168,4 +186,29 @@ class ReceiptEventSource(object): @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): - defer.returnValue(([{}], 0)) + to_key = int(config.from_key) + + if config.to_key: + from_key = int(config.to_key) + else: + from_key = None + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + content = {} + for room_id in rooms: + result = yield self.store.get_linearized_receipts_for_room( + room_id, from_key, to_key + ) + if result: + content[room_id] = result + + if not content: + defer.returnValue(([], to_key)) + + event = { + "type": "m.receipt", + "content": content, + } + + defer.returnValue(([event], to_key)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 5a02c80252..07f8edaace 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -28,15 +28,26 @@ class ReceiptsStore(SQLBaseStore): @defer.inlineCallbacks def get_linearized_receipts_for_room(self, room_id, from_key, to_key): def f(txn): - sql = ( - "SELECT * FROM receipts_linearized WHERE" - " room_id = ? AND stream_id > ? AND stream_id <= ?" - ) - - txn.execute( - sql, - (room_id, from_key, to_key) - ) + if from_key: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id > ? AND stream_id <= ?" + ) + + txn.execute( + sql, + (room_id, from_key, to_key) + ) + else: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id <= ?" + ) + + txn.execute( + sql, + (room_id, to_key) + ) rows = self.cursor_to_dict(txn) -- cgit 1.5.1 From 81682d0f820a6209535267a45ee28b8f66ff7794 Mon Sep 17 00:00:00 2001 From: Muthu Subramanian Date: Tue, 7 Jul 2015 17:40:30 +0530 Subject: Integrate SAML2 basic authentication - uses pysaml2 --- synapse/config/homeserver.py | 6 ++-- synapse/config/saml2.py | 27 ++++++++++++++++++ synapse/handlers/register.py | 30 ++++++++++++++++++++ synapse/python_dependencies.py | 1 + synapse/rest/client/v1/login.py | 62 ++++++++++++++++++++++++++++++++++++++++- 5 files changed, 122 insertions(+), 4 deletions(-) create mode 100644 synapse/config/saml2.py (limited to 'synapse/handlers') diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index fe0ccb6eb7..5c655c5373 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -25,12 +25,12 @@ from .registration import RegistrationConfig from .metrics import MetricsConfig from .appservice import AppServiceConfig from .key import KeyConfig - +from .saml2 import SAML2Config class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, - VoipConfig, RegistrationConfig, - MetricsConfig, AppServiceConfig, KeyConfig,): + VoipConfig, RegistrationConfig, MetricsConfig, + AppServiceConfig, KeyConfig, SAML2Config, ): pass diff --git a/synapse/config/saml2.py b/synapse/config/saml2.py new file mode 100644 index 0000000000..4f3a724e27 --- /dev/null +++ b/synapse/config/saml2.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Ericsson +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + +class SAML2Config(Config): + def read_config(self, config): + self.saml2_config = config["saml2_config"] + + def default_config(self, config_dir_path, server_name): + return """ + saml2_config: + config_path: "%s/sp_conf.py" + idp_redirect_url: "http://%s/idp" + """%(config_dir_path, server_name) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 7b68585a17..4c6c5e2972 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -192,6 +192,36 @@ class RegistrationHandler(BaseHandler): else: logger.info("Valid captcha entered from %s", ip) + @defer.inlineCallbacks + def register_saml2(self, localpart): + """ + Registers email_id as SAML2 Based Auth. + """ + if urllib.quote(localpart) != localpart: + raise SynapseError( + 400, + "User ID must only contain characters which do not" + " require URL encoding." + ) + user = UserID(localpart, self.hs.hostname) + user_id = user.to_string() + + yield self.check_user_id_is_valid(user_id) + token = self._generate_token(user_id) + try: + yield self.store.register( + user_id=user_id, + token=token, + password_hash=None + ) + yield self.distributor.fire("registered_user", user) + except Exception, e: + yield self.store.add_access_token_to_user(user_id, token) + # Ignore Registration errors + logger.exception(e) + defer.returnValue((user_id, token)) + + @defer.inlineCallbacks def register_email(self, threepidCreds): """ diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index f9e59dd917..17587170c8 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -31,6 +31,7 @@ REQUIREMENTS = { "pillow": ["PIL"], "pydenticon": ["pydenticon"], "ujson": ["ujson"], + "pysaml2": ["saml2"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index b2257b749d..dc7615c6f3 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -20,14 +20,32 @@ from synapse.types import UserID from base import ClientV1RestServlet, client_path_pattern import simplejson as json +import cgi +import urllib + +import logging +from saml2 import BINDING_HTTP_REDIRECT +from saml2 import BINDING_HTTP_POST +from saml2.metadata import create_metadata_string +from saml2 import config +from saml2.client import Saml2Client +from saml2.httputil import ServiceError +from saml2.samlp import Extensions +from saml2.extension.pefim import SPCertEnc +from saml2.s_utils import rndstr class LoginRestServlet(ClientV1RestServlet): PATTERN = client_path_pattern("/login$") PASS_TYPE = "m.login.password" + SAML2_TYPE = "m.login.saml2" + + def __init__(self, hs): + super(LoginRestServlet, self).__init__(hs) + self.idp_redirect_url = hs.config.saml2_config['idp_redirect_url'] def on_GET(self, request): - return (200, {"flows": [{"type": LoginRestServlet.PASS_TYPE}]}) + return (200, {"flows": [{"type": LoginRestServlet.PASS_TYPE}, {"type": LoginRestServlet.SAML2_TYPE}]}) def on_OPTIONS(self, request): return (200, {}) @@ -39,6 +57,14 @@ class LoginRestServlet(ClientV1RestServlet): if login_submission["type"] == LoginRestServlet.PASS_TYPE: result = yield self.do_password_login(login_submission) defer.returnValue(result) + elif login_submission["type"] == LoginRestServlet.SAML2_TYPE: + relay_state = "" + if "relay_state" in login_submission: + relay_state = "&RelayState="+urllib.quote(login_submission["relay_state"]) + result = { + "uri": "%s%s"%(self.idp_redirect_url, relay_state) + } + defer.returnValue((200, result)) else: raise SynapseError(400, "Bad login type.") except KeyError: @@ -93,6 +119,39 @@ class PasswordResetRestServlet(ClientV1RestServlet): "Missing keys. Requires 'email' and 'user_id'." ) +class SAML2RestServlet(ClientV1RestServlet): + PATTERN = client_path_pattern("/login/saml2") + + def __init__(self, hs): + super(SAML2RestServlet, self).__init__(hs) + self.sp_config = hs.config.saml2_config['config_path'] + + @defer.inlineCallbacks + def on_POST(self, request): + saml2_auth = None + try: + conf = config.SPConfig() + conf.load_file(self.sp_config) + SP = Saml2Client(conf) + saml2_auth = SP.parse_authn_request_response(request.args['SAMLResponse'][0], BINDING_HTTP_POST) + except Exception, e: # Not authenticated + logger = logging.getLogger(__name__) + logger.exception(e) + if saml2_auth and saml2_auth.status_ok() and not saml2_auth.not_signed: + username = saml2_auth.name_id.text + handler = self.handlers.registration_handler + (user_id, token) = yield handler.register_saml2(username) + # Forward to the RelayState callback along with ava + if 'RelayState' in request.args: + request.redirect(urllib.unquote(request.args['RelayState'][0])+'?status=authenticated&access_token='+token+'&user_id='+user_id+'&ava='+urllib.quote(json.dumps(saml2_auth.ava))) + request.finish() + defer.returnValue(None) + defer.returnValue((200, {"status":"authenticated", "user_id": user_id, "token": token, "ava":saml2_auth.ava})) + elif 'RelayState' in request.args: + request.redirect(urllib.unquote(request.args['RelayState'][0])+'?status=not_authenticated') + request.finish() + defer.returnValue(None) + defer.returnValue((200, {"status":"not_authenticated"})) def _parse_json(request): try: @@ -106,4 +165,5 @@ def _parse_json(request): def register_servlets(hs, http_server): LoginRestServlet(hs).register(http_server) + SAML2RestServlet(hs).register(http_server) # TODO PasswordResetRestServlet(hs).register(http_server) -- cgit 1.5.1 From f53bae0c1948a8c0a229e0b20f237f7ff4b1d84c Mon Sep 17 00:00:00 2001 From: Muthu Subramanian Date: Wed, 8 Jul 2015 16:05:46 +0530 Subject: code beautify --- synapse/config/homeserver.py | 1 + synapse/config/saml2.py | 3 ++- synapse/handlers/register.py | 1 - 3 files changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 5c655c5373..d77f045406 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -27,6 +27,7 @@ from .appservice import AppServiceConfig from .key import KeyConfig from .saml2 import SAML2Config + class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, diff --git a/synapse/config/saml2.py b/synapse/config/saml2.py index 4f3a724e27..d18d076a89 100644 --- a/synapse/config/saml2.py +++ b/synapse/config/saml2.py @@ -15,6 +15,7 @@ from ._base import Config + class SAML2Config(Config): def read_config(self, config): self.saml2_config = config["saml2_config"] @@ -24,4 +25,4 @@ class SAML2Config(Config): saml2_config: config_path: "%s/sp_conf.py" idp_redirect_url: "http://%s/idp" - """%(config_dir_path, server_name) + """ % (config_dir_path, server_name) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 4c6c5e2972..a1288b4252 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -220,7 +220,6 @@ class RegistrationHandler(BaseHandler): # Ignore Registration errors logger.exception(e) defer.returnValue((user_id, token)) - @defer.inlineCallbacks def register_email(self, threepidCreds): -- cgit 1.5.1 From d85ce8d89bc1fb6ff6f277fb424dddca7ab2f47e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jul 2015 11:36:05 +0100 Subject: Split receipt events up into one per room --- synapse/handlers/receipts.py | 51 ++++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 30 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f6cde30e63..b7567b9ead 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -144,9 +144,8 @@ class ReceiptsHandler(BaseHandler): event = { "type": "m.receipt", - "content": { - room_id: result, - }, + "room_id": room_id, + "content": result, } defer.returnValue([event]) @@ -163,23 +162,19 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] - content = {} + events = [] for room_id in rooms: - result = yield self.store.get_linearized_receipts_for_room( + content = yield self.store.get_linearized_receipts_for_room( room_id, from_key, to_key ) - if result: - content[room_id] = result - - if not content: - defer.returnValue(([], to_key)) - - event = { - "type": "m.receipt", - "content": content, - } + if content: + events.append({ + "type": "m.receipt", + "room_id": room_id, + "content": content, + }) - defer.returnValue(([event], to_key)) + defer.returnValue((events, to_key)) def get_current_key(self, direction='f'): return self.store.get_max_receipt_stream_id() @@ -195,20 +190,16 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] - content = {} + events = [] for room_id in rooms: - result = yield self.store.get_linearized_receipts_for_room( + content = yield self.store.get_linearized_receipts_for_room( room_id, from_key, to_key ) - if result: - content[room_id] = result - - if not content: - defer.returnValue(([], to_key)) - - event = { - "type": "m.receipt", - "content": content, - } - - defer.returnValue(([event], to_key)) + if content: + events.append({ + "type": "m.receipt", + "room_id": room_id, + "content": content, + }) + + defer.returnValue((events, to_key)) -- cgit 1.5.1 From af812b68ddbd1a69a8c98c463248d000633b075f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jul 2015 15:35:00 +0100 Subject: Add a cache to fetching of receipt streams --- synapse/handlers/receipts.py | 31 ++++----------- synapse/storage/receipts.py | 92 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 96 insertions(+), 27 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index b7567b9ead..053ed84805 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -37,8 +37,7 @@ class ReceiptsHandler(BaseHandler): "m.receipt", self._received_remote_receipt ) - # self._earliest_cached_serial = 0 - # self._rooms_to_latest_serial = {} + self._receipt_cache = None @defer.inlineCallbacks def received_client_receipt(self, room_id, receipt_type, user_id, @@ -162,17 +161,9 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] - events = [] - for room_id in rooms: - content = yield self.store.get_linearized_receipts_for_room( - room_id, from_key, to_key - ) - if content: - events.append({ - "type": "m.receipt", - "room_id": room_id, - "content": content, - }) + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, from_key, to_key + ) defer.returnValue((events, to_key)) @@ -190,16 +181,8 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] - events = [] - for room_id in rooms: - content = yield self.store.get_linearized_receipts_for_room( - room_id, from_key, to_key - ) - if content: - events.append({ - "type": "m.receipt", - "room_id": room_id, - "content": content, - }) + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, from_key, to_key + ) defer.returnValue((events, to_key)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 07f8edaace..503f68f858 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -17,6 +17,9 @@ from ._base import SQLBaseStore, cached from twisted.internet import defer +from synapse.util import unwrapFirstError + +from blist import sorteddict import logging @@ -24,6 +27,29 @@ logger = logging.getLogger(__name__) class ReceiptsStore(SQLBaseStore): + def __init__(self, hs): + super(ReceiptsStore, self).__init__(hs) + + self._receipts_stream_cache = _RoomStreamChangeCache() + + @defer.inlineCallbacks + def get_linearized_receipts_for_rooms(self, room_ids, from_key, to_key): + room_ids = set(room_ids) + + if from_key: + room_ids = yield self._receipts_stream_cache.get_rooms_changed( + self, room_ids, from_key + ) + + results = yield defer.gatherResults( + [ + self.get_linearized_receipts_for_room(room_id, from_key, to_key) + for room_id in room_ids + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + defer.returnValue([ev for res in results for ev in res]) @defer.inlineCallbacks def get_linearized_receipts_for_room(self, room_id, from_key, to_key): @@ -57,15 +83,22 @@ class ReceiptsStore(SQLBaseStore): "get_linearized_receipts_for_room", f ) - result = {} + if not rows: + defer.returnValue([]) + + content = {} for row in rows: - result.setdefault( + content.setdefault( row["event_id"], {} ).setdefault( row["receipt_type"], [] ).append(row["user_id"]) - defer.returnValue(result) + defer.returnValue([{ + "type": "m.receipt", + "room_id": room_id, + "content": content, + }]) def get_max_receipt_stream_id(self): return self._receipts_id_gen.get_max_token(self) @@ -174,6 +207,9 @@ class ReceiptsStore(SQLBaseStore): stream_id_manager = yield self._receipts_id_gen.get_next(self) with stream_id_manager as stream_id: + yield self._receipts_stream_cache.room_has_changed( + self, room_id, stream_id + ) have_persisted = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, @@ -223,3 +259,53 @@ class ReceiptsStore(SQLBaseStore): for event_id in event_ids ], ) + + +class _RoomStreamChangeCache(object): + """Keeps track of the stream_id of the latest change in rooms. + + Given a list of rooms and stream key, it will give a subset of rooms that + may have changed since that key. If the key is too old then the cache + will simply return all rooms. + """ + def __init__(self, size_of_cache=1000): + self._size_of_cache = size_of_cache + self._room_to_key = {} + self._cache = sorteddict() + self._earliest_key = None + + @defer.inlineCallbacks + def get_rooms_changed(self, store, room_ids, key): + if key > (yield self._get_earliest_key(store)): + keys = self._cache.keys() + i = keys.bisect_right(key) + + result = set( + self._cache[k] for k in keys[i:] + ).intersection(room_ids) + else: + result = room_ids + + defer.returnValue(result) + + @defer.inlineCallbacks + def room_has_changed(self, store, room_id, key): + if key > (yield self._get_earliest_key(store)): + old_key = self._room_to_key.get(room_id, None) + if old_key: + key = max(key, old_key) + self._cache.pop(old_key, None) + self._cache[key] = room_id + + while len(self._cache) > self._size_of_cache: + k, r = self._cache.popitem() + self._earliest_key = max(k, self._earliest_key) + self._room_to_key.pop(r, None) + + @defer.inlineCallbacks + def _get_earliest_key(self, store): + if self._earliest_key is None: + self._earliest_key = yield store.get_max_receipt_stream_id() + self._earliest_key = int(self._earliest_key) + + defer.returnValue(self._earliest_key) -- cgit 1.5.1 From 1af188209a03567dc4b5300b9a0fc8613ad176df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jul 2015 11:39:30 +0100 Subject: Change format of receipts to allow inclusion of data --- synapse/handlers/receipts.py | 24 +++++++++++------ synapse/storage/receipts.py | 39 ++++++++++++++-------------- synapse/storage/schema/delta/21/receipts.sql | 16 +++++------- 3 files changed, 42 insertions(+), 37 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 053ed84805..8a052f071b 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -36,6 +36,7 @@ class ReceiptsHandler(BaseHandler): self.federation.register_edu_handler( "m.receipt", self._received_remote_receipt ) + self.clock = self.hs.get_clock() self._receipt_cache = None @@ -51,6 +52,9 @@ class ReceiptsHandler(BaseHandler): "receipt_type": receipt_type, "user_id": user_id, "event_ids": [event_id], + "data": { + "ts": self.clock.time_msec() + } } is_new = yield self._handle_new_receipts([receipt]) @@ -65,12 +69,12 @@ class ReceiptsHandler(BaseHandler): "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, - "event_ids": [event_id], + "event_ids": user_values["event_ids"], + "data": user_values.get("data", {}), } for room_id, room_values in content.items() - for event_id, ev_values in room_values.items() - for receipt_type, users in ev_values.items() - for user_id in users + for receipt_type, users in room_values.items() + for user_id, user_values in users.items() ] yield self._handle_new_receipts(receipts) @@ -82,9 +86,10 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] + data = receipt["data"] res = yield self.store.insert_receipt( - room_id, receipt_type, user_id, event_ids, + room_id, receipt_type, user_id, event_ids, data ) if not res: @@ -108,6 +113,7 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] + data = receipt["data"] remotedomains = set() @@ -124,10 +130,12 @@ class ReceiptsHandler(BaseHandler): edu_type="m.receipt", content={ room_id: { - event_id: { - receipt_type: [user_id] + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } } - for event_id in event_ids }, }, ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 503f68f858..c4e6b02bdf 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -21,6 +21,7 @@ from synapse.util import unwrapFirstError from blist import sorteddict import logging +import ujson as json logger = logging.getLogger(__name__) @@ -91,8 +92,8 @@ class ReceiptsStore(SQLBaseStore): content.setdefault( row["event_id"], {} ).setdefault( - row["receipt_type"], [] - ).append(row["user_id"]) + row["receipt_type"], {} + )[row["user_id"]] = json.loads(row["data"]) defer.returnValue([{ "type": "m.receipt", @@ -124,7 +125,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(result) def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, - user_id, event_id, stream_id): + user_id, event_id, data, stream_id): # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -172,13 +173,14 @@ class ReceiptsStore(SQLBaseStore): "receipt_type": receipt_type, "user_id": user_id, "event_id": event_id, + "data": json.dumps(data), } ) return True @defer.inlineCallbacks - def insert_receipt(self, room_id, receipt_type, user_id, event_ids): + def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): if not event_ids: return @@ -214,6 +216,7 @@ class ReceiptsStore(SQLBaseStore): "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, + data, stream_id=stream_id, ) @@ -221,22 +224,22 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(None) yield self.insert_graph_receipt( - room_id, receipt_type, user_id, event_ids + room_id, receipt_type, user_id, event_ids, data ) max_persisted_id = yield self._stream_id_gen.get_max_token(self) defer.returnValue((stream_id, max_persisted_id)) - def insert_graph_receipt(self, room_id, receipt_type, - user_id, event_ids): + def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, + data): return self.runInteraction( "insert_graph_receipt", self.insert_graph_receipt_txn, - room_id, receipt_type, user_id, event_ids, + room_id, receipt_type, user_id, event_ids, data ) def insert_graph_receipt_txn(self, txn, room_id, receipt_type, - user_id, event_ids): + user_id, event_ids, data): self._simple_delete_txn( txn, table="receipts_graph", @@ -246,18 +249,16 @@ class ReceiptsStore(SQLBaseStore): "user_id": user_id, } ) - self._simple_insert_many_txn( + self._simple_insert_txn( txn, table="receipts_graph", - values=[ - { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_id": event_id, - } - for event_id in event_ids - ], + values={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": json.dumps(event_ids), + "data": json.dumps(data), + } ) diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql index ac7738e371..2f64d609fc 100644 --- a/synapse/storage/schema/delta/21/receipts.sql +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -18,11 +18,9 @@ CREATE TABLE IF NOT EXISTS receipts_graph( room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL -); - -CREATE INDEX receipts_graph_room_tuple ON receipts_graph( - room_id, receipt_type, user_id + event_ids TEXT NOT NULL, + data TEXT NOT NULL, + CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id) ); CREATE TABLE IF NOT EXISTS receipts_linearized ( @@ -30,11 +28,9 @@ CREATE TABLE IF NOT EXISTS receipts_linearized ( room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL -); - -CREATE INDEX receipts_linearized_room_tuple ON receipts_linearized( - room_id, receipt_type, user_id + event_id TEXT NOT NULL, + data TEXT NOT NULL, + CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id) ); CREATE INDEX receipts_linearized_id ON receipts_linearized( -- cgit 1.5.1 From c2d08ca62aebdfa689ae7d39ff74b2a43e4b5e3a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jul 2015 13:15:34 +0100 Subject: Integer timestamps --- synapse/handlers/receipts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 8a052f071b..403c1c8491 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -53,7 +53,7 @@ class ReceiptsHandler(BaseHandler): "user_id": user_id, "event_ids": [event_id], "data": { - "ts": self.clock.time_msec() + "ts": int(self.clock.time_msec()), } } -- cgit 1.5.1 From f0979afdb0b0b8f96730a97cf0b51de6024cabda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jul 2015 16:02:07 +0100 Subject: Remove spurious comment --- synapse/handlers/receipts.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 403c1c8491..f847360d0c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Contains handlers for federation events.""" - from ._base import BaseHandler from twisted.internet import defer -- cgit 1.5.1 From b5f0d73ea3f6611c0980a03a0dfe57058071013e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jul 2015 17:09:26 +0100 Subject: Add comment --- synapse/handlers/federation.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cd3867ed9c..d7f197f247 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -242,6 +242,10 @@ class FederationHandler(BaseHandler): if history: visibility = history.content.get("history_visibility", "shared") if visibility in ["invited", "joined"]: + # We now loop through all state events looking for + # membership states for the requesting server to determine + # if the server is either in the room or has been invited + # into the room. for ev in state.values(): if ev.type != EventTypes.Member: continue -- cgit 1.5.1 From e5991af629df2e63c20c5f10e4589a9faf8305cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Jul 2015 13:30:43 +0100 Subject: Comments --- synapse/handlers/receipts.py | 16 ++++++++++++---- synapse/storage/receipts.py | 11 +++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f847360d0c..1925a48039 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -41,10 +41,9 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def received_client_receipt(self, room_id, receipt_type, user_id, event_id): - # 1. Persist. - # 2. Notify local clients - # 3. Notify remote servers - + """Called when a client tells us a local user has read up to the given + event_id in the room. + """ receipt = { "room_id": room_id, "receipt_type": receipt_type, @@ -62,6 +61,8 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): + """Called when we receive an EDU of type m.receipt from a remote HS. + """ receipts = [ { "room_id": room_id, @@ -79,6 +80,8 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_receipts(self, receipts): + """Takes a list of receipts, stores them and informs the notifier. + """ for receipt in receipts: room_id = receipt["room_id"] receipt_type = receipt["receipt_type"] @@ -105,6 +108,9 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def _push_remotes(self, receipts): + """Given a list of receipts, works out which remote servers should be + poked and pokes them. + """ # TODO: Some of this stuff should be coallesced. for receipt in receipts: room_id = receipt["room_id"] @@ -140,6 +146,8 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): + """Gets all receipts for a room, upto the given key. + """ result = yield self.store.get_linearized_receipts_for_room( room_id, None, to_key ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 593032713d..56b9fedfd8 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -35,6 +35,8 @@ class ReceiptsStore(SQLBaseStore): @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, from_key, to_key): + """Get receipts for multiple rooms for sending to clients. + """ room_ids = set(room_ids) if from_key: @@ -54,6 +56,8 @@ class ReceiptsStore(SQLBaseStore): @defer.inlineCallbacks def get_linearized_receipts_for_room(self, room_id, from_key, to_key): + """Get receipts for a single room for sending to clients. + """ def f(txn): if from_key: sql = ( @@ -107,6 +111,8 @@ class ReceiptsStore(SQLBaseStore): @cached @defer.inlineCallbacks def get_graph_receipts_for_room(self, room_id): + """Get receipts for sending to remote servers. + """ rows = yield self._simple_select_list( table="receipts_graph", keyvalues={"room_id": room_id}, @@ -181,6 +187,11 @@ class ReceiptsStore(SQLBaseStore): @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): + """Insert a receipt, either from local client or remote server. + + Automatically does conversion between linearized and graph + representations. + """ if not event_ids: return -- cgit 1.5.1 From d5cc7945985e4cbf17c7dbf4c7c45071b87d030e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Jul 2015 16:48:06 +0100 Subject: Implement presets at room creation --- synapse/api/constants.py | 5 +++ synapse/handlers/room.py | 82 +++++++++++++++++++++++++++++++++++------------- 2 files changed, 65 insertions(+), 22 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 3e15e8a9d7..885b9c3596 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -87,3 +87,8 @@ class RejectedReason(object): AUTH_ERROR = "auth_error" REPLACED = "replaced" NOT_ANCESTOR = "not_ancestor" + + +class RoomCreationPreset(object): + PrivateChat = "private_chat" + PublicChat = "public_chat" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 891707df44..c654395437 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -19,7 +19,9 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID -from synapse.api.constants import EventTypes, Membership, JoinRules +from synapse.api.constants import ( + EventTypes, Membership, JoinRules, RoomCreationPreset, +) from synapse.api.errors import StoreError, SynapseError from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor @@ -33,6 +35,19 @@ logger = logging.getLogger(__name__) class RoomCreationHandler(BaseHandler): + PRESETS_DICT = { + RoomCreationPreset.PrivateChat: { + "join_rules": JoinRules.INVITE, + "history_visibility": "invited", + "everyone_ops": False, + }, + RoomCreationPreset.PublicChat: { + "join_rules": JoinRules.PUBLIC, + "history_visibility": "shared", + "everyone_ops": False, + }, + } + @defer.inlineCallbacks def create_room(self, user_id, room_id, config): """ Creates a new room. @@ -121,9 +136,18 @@ class RoomCreationHandler(BaseHandler): servers=[self.hs.hostname], ) + preset_config = config.get( + "preset", + RoomCreationPreset.PublicChat + if is_public + else RoomCreationPreset.PrivateChat + ) + user = UserID.from_string(user_id) creation_events = self._create_events_for_new_room( - user, room_id, is_public=is_public + user, room_id, + preset_config=preset_config, + invite_list=invite_list, ) msg_handler = self.hs.get_handlers().message_handler @@ -170,7 +194,10 @@ class RoomCreationHandler(BaseHandler): defer.returnValue(result) - def _create_events_for_new_room(self, creator, room_id, is_public=False): + def _create_events_for_new_room(self, creator, room_id, preset_config, + invite_list): + config = RoomCreationHandler.PRESETS_DICT[preset_config] + creator_id = creator.to_string() event_keys = { @@ -203,37 +230,48 @@ class RoomCreationHandler(BaseHandler): }, ) + power_level_content = { + "users": { + creator.to_string(): 100, + }, + "users_default": 0, + "events": { + EventTypes.Name: 100, + EventTypes.PowerLevels: 100, + EventTypes.RoomHistoryVisibility: 100, + }, + "events_default": 0, + "state_default": 50, + "ban": 50, + "kick": 50, + "redact": 50, + "invite": 0, + } + + if config["everyone_ops"]: + for invitee in invite_list: + power_level_content["users"][invitee] = 100 + power_levels_event = create( etype=EventTypes.PowerLevels, - content={ - "users": { - creator.to_string(): 100, - }, - "users_default": 0, - "events": { - EventTypes.Name: 100, - EventTypes.PowerLevels: 100, - EventTypes.RoomHistoryVisibility: 100, - }, - "events_default": 0, - "state_default": 50, - "ban": 50, - "kick": 50, - "redact": 50, - "invite": 0, - }, + content=power_level_content, ) - join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE join_rules_event = create( etype=EventTypes.JoinRules, - content={"join_rule": join_rule}, + content={"join_rule": config["join_rules"]}, + ) + + history_event = create( + etype=EventTypes.RoomHistoryVisibility, + content={"history_visibility": config["history_visibility"]} ) return [ creation_event, join_event, power_levels_event, + history_event, join_rules_event, ] -- cgit 1.5.1 From 4624d6035e28c4ee05e38234f2aa1671b4ac701a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Jul 2015 10:19:07 +0100 Subject: Docs --- synapse/handlers/receipts.py | 11 ++++++++--- synapse/storage/receipts.py | 31 ++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 1925a48039..5b3df6932b 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -149,7 +149,8 @@ class ReceiptsHandler(BaseHandler): """Gets all receipts for a room, upto the given key. """ result = yield self.store.get_linearized_receipts_for_room( - room_id, None, to_key + room_id, + to_key=to_key, ) if not result: @@ -176,7 +177,9 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] events = yield self.store.get_linearized_receipts_for_rooms( - rooms, from_key, to_key + rooms, + from_key=from_key, + to_key=to_key, ) defer.returnValue((events, to_key)) @@ -196,7 +199,9 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] events = yield self.store.get_linearized_receipts_for_rooms( - rooms, from_key, to_key + rooms, + from_key=from_key, + to_key=to_key, ) defer.returnValue((events, to_key)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 56b9fedfd8..d515a0a15c 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -34,8 +34,17 @@ class ReceiptsStore(SQLBaseStore): self._receipts_stream_cache = _RoomStreamChangeCache() @defer.inlineCallbacks - def get_linearized_receipts_for_rooms(self, room_ids, from_key, to_key): + def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): """Get receipts for multiple rooms for sending to clients. + + Args: + room_ids (list): List of room_ids. + to_key (int): Max stream id to fetch receipts upto. + from_key (int): Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + list: A list of receipts. """ room_ids = set(room_ids) @@ -46,7 +55,9 @@ class ReceiptsStore(SQLBaseStore): results = yield defer.gatherResults( [ - self.get_linearized_receipts_for_room(room_id, from_key, to_key) + self.get_linearized_receipts_for_room( + room_id, to_key, from_key=from_key + ) for room_id in room_ids ], consumeErrors=True, @@ -55,8 +66,17 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results for ev in res]) @defer.inlineCallbacks - def get_linearized_receipts_for_room(self, room_id, from_key, to_key): + def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. + + Args: + room_ids (str): The room id. + to_key (int): Max stream id to fetch receipts upto. + from_key (int): Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + list: A list of receipts. """ def f(txn): if from_key: @@ -288,6 +308,9 @@ class _RoomStreamChangeCache(object): @defer.inlineCallbacks def get_rooms_changed(self, store, room_ids, key): + """Returns subset of room ids that have had new receipts since the + given key. If the key is too old it will just return the given list. + """ if key > (yield self._get_earliest_key(store)): keys = self._cache.keys() i = keys.bisect_right(key) @@ -302,6 +325,8 @@ class _RoomStreamChangeCache(object): @defer.inlineCallbacks def room_has_changed(self, store, room_id, key): + """Informs the cache that the room has been changed at the given key. + """ if key > (yield self._get_earliest_key(store)): old_key = self._room_to_key.get(room_id, None) if old_key: -- cgit 1.5.1 From b49a30a972470914531c89cd481252e414b22d0e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Jul 2015 10:20:31 +0100 Subject: Capitalize contants --- synapse/api/constants.py | 4 ++-- synapse/handlers/room.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 885b9c3596..7156ee4e7d 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -90,5 +90,5 @@ class RejectedReason(object): class RoomCreationPreset(object): - PrivateChat = "private_chat" - PublicChat = "public_chat" + PRIVATE_CHAT = "private_chat" + PUBLIC_CHAT = "public_chat" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index c654395437..11f531dc75 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -36,12 +36,12 @@ logger = logging.getLogger(__name__) class RoomCreationHandler(BaseHandler): PRESETS_DICT = { - RoomCreationPreset.PrivateChat: { + RoomCreationPreset.PRIVATE_CHAT: { "join_rules": JoinRules.INVITE, "history_visibility": "invited", "everyone_ops": False, }, - RoomCreationPreset.PublicChat: { + RoomCreationPreset.PUBLIC_CHAT: { "join_rules": JoinRules.PUBLIC, "history_visibility": "shared", "everyone_ops": False, @@ -138,9 +138,9 @@ class RoomCreationHandler(BaseHandler): preset_config = config.get( "preset", - RoomCreationPreset.PublicChat + RoomCreationPreset.PUBLIC_CHAT if is_public - else RoomCreationPreset.PrivateChat + else RoomCreationPreset.PRIVATE_CHAT ) user = UserID.from_string(user_id) -- cgit 1.5.1 From 002a44ac1a59f101e3df18dee5ef7c3ad87440ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Jul 2015 10:37:42 +0100 Subject: s/everyone_ops/original_invitees_have_ops/ --- synapse/handlers/room.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 11f531dc75..c081efee3a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -39,12 +39,12 @@ class RoomCreationHandler(BaseHandler): RoomCreationPreset.PRIVATE_CHAT: { "join_rules": JoinRules.INVITE, "history_visibility": "invited", - "everyone_ops": False, + "original_invitees_have_ops": False, }, RoomCreationPreset.PUBLIC_CHAT: { "join_rules": JoinRules.PUBLIC, "history_visibility": "shared", - "everyone_ops": False, + "original_invitees_have_ops": False, }, } @@ -248,7 +248,7 @@ class RoomCreationHandler(BaseHandler): "invite": 0, } - if config["everyone_ops"]: + if config["original_invitees_have_ops"]: for invitee in invite_list: power_level_content["users"][invitee] = 100 -- cgit 1.5.1 From 4da05fa0ae32425ce2755dcd479bb4c97f43b30e Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 15 Jul 2015 19:28:03 +0100 Subject: Add back in support for remembering parameters submitted to a user-interactive auth call. --- synapse/handlers/auth.py | 6 ++++-- synapse/rest/client/v2_alpha/register.py | 11 +++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 63071653a3..1ecf7fef17 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -85,8 +85,10 @@ class AuthHandler(BaseHandler): # email auth link on there). It's probably too open to abuse # because it lets unauthenticated clients store arbitrary objects # on a home server. - # sess['clientdict'] = clientdict - # self._save_session(sess) + # Revisit: Assumimg the REST APIs do sensible validation, the data + # isn't arbintrary. + sess['clientdict'] = clientdict + self._save_session(sess) pass elif 'clientdict' in sess: clientdict = sess['clientdict'] diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 72dfb876c5..fa44572b7b 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -57,10 +57,17 @@ class RegisterRestServlet(RestServlet): yield run_on_reactor() body = parse_request_allow_empty(request) - if 'password' not in body: - raise SynapseError(400, "", Codes.MISSING_PARAM) + # we do basic sanity checks here because the auth layerwill store these in sessions + if 'password' in body: + print "%r" % (body['password']) + if (not isinstance(body['password'], str) and + not isinstance(body['password'], unicode)) or len(body['password']) > 512: + raise SynapseError(400, "Invalid password") if 'username' in body: + if (not isinstance(body['username'], str) and + not isinstance(body['username'], unicode)) or len(body['username']) > 512: + raise SynapseError(400, "Invalid username") desired_username = body['username'] yield self.registration_handler.check_username(desired_username) -- cgit 1.5.1 From c456d17daf1457bb341bd3c64c616d500c6c2517 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Jul 2015 15:25:29 +0100 Subject: Implement specifying custom initial state for /createRoom --- synapse/handlers/room.py | 100 ++++++++++++++++++++++++++++------------------- 1 file changed, 60 insertions(+), 40 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index c081efee3a..7511d294f3 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -27,6 +27,7 @@ from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event +from collections import OrderedDict import logging import string @@ -143,11 +144,18 @@ class RoomCreationHandler(BaseHandler): else RoomCreationPreset.PRIVATE_CHAT ) + raw_initial_state = config.get("initial_state", []) + + initial_state = OrderedDict() + for val in raw_initial_state: + initial_state[(val["type"], val.get("state_key", ""))] = val["content"] + user = UserID.from_string(user_id) creation_events = self._create_events_for_new_room( user, room_id, preset_config=preset_config, invite_list=invite_list, + initial_state=initial_state, ) msg_handler = self.hs.get_handlers().message_handler @@ -195,7 +203,7 @@ class RoomCreationHandler(BaseHandler): defer.returnValue(result) def _create_events_for_new_room(self, creator, room_id, preset_config, - invite_list): + invite_list, initial_state): config = RoomCreationHandler.PRESETS_DICT[preset_config] creator_id = creator.to_string() @@ -230,50 +238,62 @@ class RoomCreationHandler(BaseHandler): }, ) - power_level_content = { - "users": { - creator.to_string(): 100, - }, - "users_default": 0, - "events": { - EventTypes.Name: 100, - EventTypes.PowerLevels: 100, - EventTypes.RoomHistoryVisibility: 100, - }, - "events_default": 0, - "state_default": 50, - "ban": 50, - "kick": 50, - "redact": 50, - "invite": 0, - } + returned_events = [creation_event, join_event] + + if (EventTypes.PowerLevels, '') not in initial_state: + power_level_content = { + "users": { + creator.to_string(): 100, + }, + "users_default": 0, + "events": { + EventTypes.Name: 100, + EventTypes.PowerLevels: 100, + EventTypes.RoomHistoryVisibility: 100, + }, + "events_default": 0, + "state_default": 50, + "ban": 50, + "kick": 50, + "redact": 50, + "invite": 0, + } - if config["original_invitees_have_ops"]: - for invitee in invite_list: - power_level_content["users"][invitee] = 100 + if config["original_invitees_have_ops"]: + for invitee in invite_list: + power_level_content["users"][invitee] = 100 - power_levels_event = create( - etype=EventTypes.PowerLevels, - content=power_level_content, - ) + power_levels_event = create( + etype=EventTypes.PowerLevels, + content=power_level_content, + ) - join_rules_event = create( - etype=EventTypes.JoinRules, - content={"join_rule": config["join_rules"]}, - ) + returned_events.append(power_levels_event) - history_event = create( - etype=EventTypes.RoomHistoryVisibility, - content={"history_visibility": config["history_visibility"]} - ) + if (EventTypes.JoinRules, '') not in initial_state: + join_rules_event = create( + etype=EventTypes.JoinRules, + content={"join_rule": config["join_rules"]}, + ) - return [ - creation_event, - join_event, - power_levels_event, - history_event, - join_rules_event, - ] + returned_events.append(join_rules_event) + + if (EventTypes.RoomHistoryVisibility, '') not in initial_state: + history_event = create( + etype=EventTypes.RoomHistoryVisibility, + content={"history_visibility": config["history_visibility"]} + ) + + returned_events.append(history_event) + + for (etype, state_key), content in initial_state.items(): + returned_events.append(create( + etype=etype, + state_key=state_key, + content=content, + )) + + return returned_events class RoomMemberHandler(BaseHandler): -- cgit 1.5.1 From 4cab2cfa34ceb384847d8bd5dbe15147b27dc370 Mon Sep 17 00:00:00 2001 From: Matrix Date: Sat, 18 Jul 2015 19:07:12 +0100 Subject: Don't do any database hits in receipt handling if from_key == to_key --- synapse/handlers/receipts.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 5b3df6932b..86c911c4bf 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -174,6 +174,9 @@ class ReceiptEventSource(object): from_key = int(from_key) to_key = yield self.get_current_key() + if from_key == to_key: + defer.returnValue(([], to_key)) + rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] events = yield self.store.get_linearized_receipts_for_rooms( -- cgit 1.5.1 From 4e2e67fd506899e0c64b5fdfcac91e660de4260b Mon Sep 17 00:00:00 2001 From: Matrix Date: Wed, 22 Jul 2015 16:13:46 +0100 Subject: Disable receipts for now --- synapse/handlers/receipts.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 86c911c4bf..415dd339f6 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -171,6 +171,7 @@ class ReceiptEventSource(object): @defer.inlineCallbacks def get_new_events_for_user(self, user, from_key, limit): + defer.returnValue(([], from_key)) from_key = int(from_key) to_key = yield self.get_current_key() @@ -193,6 +194,7 @@ class ReceiptEventSource(object): @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): to_key = int(config.from_key) + defer.returnValue(([], to_key)) if config.to_key: from_key = int(config.to_key) -- cgit 1.5.1 From a4d62ba36afc54d4e60f1371fe9b31e8b8e6834c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 28 Jul 2015 17:34:12 +0100 Subject: Fix v2_alpha registration. Add unit tests. V2 Registration forced everyone (including ASes) to create a password for a user, when ASes should be able to omit passwords. Also unbreak AS registration in general which checked too early if the given username was claimed by an AS; it was checked before knowing if the AS was the one doing the registration! Add unit tests for AS reg, user reg and disabled_registration flag. --- synapse/handlers/register.py | 3 +- synapse/rest/client/v2_alpha/register.py | 124 ++++++++++++++------------ tests/rest/client/v2_alpha/test_register.py | 132 ++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 55 deletions(-) create mode 100644 tests/rest/client/v2_alpha/test_register.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index a1288b4252..f81d75017d 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -73,7 +73,8 @@ class RegistrationHandler(BaseHandler): localpart : The local part of the user ID to register. If None, one will be randomly generated. password (str) : The password to assign to this user so they can - login again. + login again. This can be None which means they cannot login again + via a password (e.g. the user is an application service user). Returns: A tuple of (user_id, access_token). Raises: diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 0c737d73b8..e1c42dd51e 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -19,7 +19,7 @@ from synapse.api.constants import LoginType from synapse.api.errors import SynapseError, Codes from synapse.http.servlet import RestServlet -from ._base import client_v2_pattern, parse_request_allow_empty +from ._base import client_v2_pattern, parse_json_dict_from_request import logging import hmac @@ -55,30 +55,52 @@ class RegisterRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): yield run_on_reactor() + body = parse_json_dict_from_request(request) - body = parse_request_allow_empty(request) - # we do basic sanity checks here because the auth - # layer will store these in sessions + # we do basic sanity checks here because the auth layer will store these + # in sessions. Pull out the username/password provided to us. + desired_password = None if 'password' in body: - if ((not isinstance(body['password'], str) and - not isinstance(body['password'], unicode)) or + if (not isinstance(body['password'], basestring) or len(body['password']) > 512): raise SynapseError(400, "Invalid password") + desired_password = body["password"] + desired_username = None if 'username' in body: - if ((not isinstance(body['username'], str) and - not isinstance(body['username'], unicode)) or + if (not isinstance(body['username'], basestring) or len(body['username']) > 512): raise SynapseError(400, "Invalid username") desired_username = body['username'] - yield self.registration_handler.check_username(desired_username) - is_using_shared_secret = False - is_application_server = False - - service = None + appservice = None if 'access_token' in request.args: - service = yield self.auth.get_appservice_by_req(request) + appservice = yield self.auth.get_appservice_by_req(request) + + # fork off as soon as possible for ASes and shared secret auth which + # have completely different registration flows to normal users + + # == Application Service Registration == + if appservice: + result = yield self._do_appservice_registration(desired_username) + defer.returnValue((200, result)) # we throw for non 200 responses + return + + # == Shared Secret Registration == (e.g. create new user scripts) + if 'mac' in body: + # FIXME: Should we really be determining if this is shared secret + # auth based purely on the 'mac' key? + result = yield self._do_shared_secret_registration( + desired_username, desired_password, body["mac"] + ) + defer.returnValue((200, result)) # we throw for non 200 responses + return + + # == Normal User Registration == (everyone else) + if self.hs.config.disable_registration: + raise SynapseError(403, "Registration has been disabled") + + yield self.registration_handler.check_username(desired_username) if self.hs.config.enable_registration_captcha: flows = [ @@ -91,39 +113,20 @@ class RegisterRestServlet(RestServlet): [LoginType.EMAIL_IDENTITY] ] - result = None - if service: - is_application_server = True - params = body - elif 'mac' in body: - # Check registration-specific shared secret auth - if 'username' not in body: - raise SynapseError(400, "", Codes.MISSING_PARAM) - self._check_shared_secret_auth( - body['username'], body['mac'] - ) - is_using_shared_secret = True - params = body - else: - authed, result, params = yield self.auth_handler.check_auth( - flows, body, self.hs.get_ip_from_request(request) - ) - - if not authed: - defer.returnValue((401, result)) - - can_register = ( - not self.hs.config.disable_registration - or is_application_server - or is_using_shared_secret + authed, result, params = yield self.auth_handler.check_auth( + flows, body, self.hs.get_ip_from_request(request) ) - if not can_register: - raise SynapseError(403, "Registration has been disabled") + if not authed: + defer.returnValue((401, result)) + return + + # NB: This may be from the auth handler and NOT from the POST if 'password' not in params: - raise SynapseError(400, "", Codes.MISSING_PARAM) - desired_username = params['username'] if 'username' in params else None - new_password = params['password'] + raise SynapseError(400, "Missing password.", Codes.MISSING_PARAM) + + desired_username = params.get("username", None) + new_password = params.get("password", None) (user_id, token) = yield self.registration_handler.register( localpart=desired_username, @@ -156,18 +159,21 @@ class RegisterRestServlet(RestServlet): else: logger.info("bind_email not specified: not binding email") - result = { - "user_id": user_id, - "access_token": token, - "home_server": self.hs.hostname, - } - + result = self._create_registration_details(user_id, token) defer.returnValue((200, result)) def on_OPTIONS(self, _): return 200, {} - def _check_shared_secret_auth(self, username, mac): + @defer.inlineCallbacks + def _do_appservice_registration(self, username): + (user_id, token) = yield self.registration_handler.register( + localpart=username + ) + defer.returnValue(self._create_registration_details(user_id, token)) + + @defer.inlineCallbacks + def _do_shared_secret_registration(self, username, password, mac): if not self.hs.config.registration_shared_secret: raise SynapseError(400, "Shared secret registration is not enabled") @@ -183,13 +189,23 @@ class RegisterRestServlet(RestServlet): digestmod=sha1, ).hexdigest() - if compare_digest(want_mac, got_mac): - return True - else: + if not compare_digest(want_mac, got_mac): raise SynapseError( 403, "HMAC incorrect", ) + (user_id, token) = yield self.registration_handler.register( + localpart=username, password=password + ) + defer.returnValue(self._create_registration_details(user_id, token)) + + def _create_registration_details(self, user_id, token): + return { + "user_id": user_id, + "access_token": token, + "home_server": self.hs.hostname, + } + def register_servlets(hs, http_server): RegisterRestServlet(hs).register(http_server) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py new file mode 100644 index 0000000000..3edc2ec2e9 --- /dev/null +++ b/tests/rest/client/v2_alpha/test_register.py @@ -0,0 +1,132 @@ +from synapse.rest.client.v2_alpha.register import RegisterRestServlet +from synapse.api.errors import SynapseError +from twisted.internet import defer +from mock import Mock, MagicMock +from tests import unittest +import json + + +class RegisterRestServletTestCase(unittest.TestCase): + + def setUp(self): + # do the dance to hook up request data to self.request_data + self.request_data = "" + self.request = Mock( + content=Mock(read=Mock(side_effect=lambda: self.request_data)), + ) + self.request.args = {} + + self.appservice = None + self.auth = Mock(get_appservice_by_req=Mock( + side_effect=lambda x: defer.succeed(self.appservice)) + ) + + self.auth_result = (False, None, None) + self.auth_handler = Mock( + check_auth=Mock(side_effect=lambda x,y,z: self.auth_result) + ) + self.registration_handler = Mock() + self.identity_handler = Mock() + self.login_handler = Mock() + + # do the dance to hook it up to the hs global + self.handlers = Mock( + auth_handler=self.auth_handler, + registration_handler=self.registration_handler, + identity_handler=self.identity_handler, + login_handler=self.login_handler + ) + self.hs = Mock() + self.hs.hostname = "superbig~testing~thing.com" + self.hs.get_auth = Mock(return_value=self.auth) + self.hs.get_handlers = Mock(return_value=self.handlers) + self.hs.config.disable_registration = False + + # init the thing we're testing + self.servlet = RegisterRestServlet(self.hs) + + @defer.inlineCallbacks + def test_POST_appservice_registration_valid(self): + user_id = "@kermit:muppet" + token = "kermits_access_token" + self.request.args = { + "access_token": "i_am_an_app_service" + } + self.request_data = json.dumps({ + "username": "kermit" + }) + self.appservice = { + "id": "1234" + } + self.registration_handler.register = Mock(return_value=(user_id, token)) + result = yield self.servlet.on_POST(self.request) + self.assertEquals(result, (200, { + "user_id": user_id, + "access_token": token, + "home_server": self.hs.hostname + })) + + @defer.inlineCallbacks + def test_POST_appservice_registration_invalid(self): + self.request.args = { + "access_token": "i_am_an_app_service" + } + self.request_data = json.dumps({ + "username": "kermit" + }) + self.appservice = None # no application service exists + result = yield self.servlet.on_POST(self.request) + self.assertEquals(result, (401, None)) + + def test_POST_bad_password(self): + self.request_data = json.dumps({ + "username": "kermit", + "password": 666 + }) + d = self.servlet.on_POST(self.request) + return self.assertFailure(d, SynapseError) + + def test_POST_bad_username(self): + self.request_data = json.dumps({ + "username": 777, + "password": "monkey" + }) + d = self.servlet.on_POST(self.request) + return self.assertFailure(d, SynapseError) + + @defer.inlineCallbacks + def test_POST_user_valid(self): + user_id = "@kermit:muppet" + token = "kermits_access_token" + self.request_data = json.dumps({ + "username": "kermit", + "password": "monkey" + }) + self.registration_handler.check_username = Mock(return_value=True) + self.auth_result = (True, None, { + "username": "kermit", + "password": "monkey" + }) + self.registration_handler.register = Mock(return_value=(user_id, token)) + + result = yield self.servlet.on_POST(self.request) + self.assertEquals(result, (200, { + "user_id": user_id, + "access_token": token, + "home_server": self.hs.hostname + })) + + def test_POST_disabled_registration(self): + self.hs.config.disable_registration = True + self.request_data = json.dumps({ + "username": "kermit", + "password": "monkey" + }) + self.registration_handler.check_username = Mock(return_value=True) + self.auth_result = (True, None, { + "username": "kermit", + "password": "monkey" + }) + self.registration_handler.register = Mock(return_value=("@user:id", "t")) + d = self.servlet.on_POST(self.request) + return self.assertFailure(d, SynapseError) \ No newline at end of file -- cgit 1.5.1 From 28d07a02e42c90732b708de870af2f9b21ba13f0 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 3 Aug 2015 15:31:21 +0100 Subject: Add vector.im as trusted ID server --- synapse/handlers/identity.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 6200e10775..c1095708a0 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -44,7 +44,7 @@ class IdentityHandler(BaseHandler): http_client = SimpleHttpClient(self.hs) # XXX: make this configurable! # trustedIdServers = ['matrix.org', 'localhost:8090'] - trustedIdServers = ['matrix.org'] + trustedIdServers = ['matrix.org', 'vector.im'] if 'id_server' in creds: id_server = creds['id_server'] -- cgit 1.5.1 From 4d6cb8814e134eba644afeed7bd49df0c7951342 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Aug 2015 09:32:23 +0100 Subject: Speed up event filtering (for ACL) logic --- synapse/handlers/federation.py | 6 ++- synapse/handlers/message.py | 6 ++- synapse/handlers/sync.py | 6 ++- synapse/storage/_base.py | 10 +++- synapse/storage/state.py | 117 ++++++++++++++++++++++++++++------------- 5 files changed, 102 insertions(+), 43 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f7155fd8d3..22f534e49a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -230,7 +230,11 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, None), + ) ) events_and_states = zip(events, states) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9d6d4f0978..765b14d994 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -138,7 +138,11 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) ) events_and_states = zip(events, states) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6cff6230c1..8f58774b31 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -295,7 +295,11 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) ) events_and_states = zip(events, states) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8f812f0fd7..7b76ee3b73 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -71,6 +71,11 @@ class Cache(object): self.thread = None caches_by_name[name] = self.cache + class Sentinel(object): + __slots__ = [] + + self.sentinel = Sentinel() + def check_thread(self): expected_thread = self.thread if expected_thread is None: @@ -85,9 +90,10 @@ class Cache(object): if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) - if keyargs in self.cache: + val = self.cache.get(keyargs, self.sentinel) + if val is not self.sentinel: cache_counter.inc_hits(self.name) - return self.cache[keyargs] + return val cache_counter.inc_misses(self.name) raise KeyError() diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 47bec65497..7e9bd232cf 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached from twisted.internet import defer +from synapse.util import unwrapFirstError from synapse.util.stringutils import random_string import logging @@ -206,62 +207,102 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - @defer.inlineCallbacks - def get_state_for_events(self, room_id, event_ids): + @cached(num_args=3, lru=True) + def _get_state_groups_from_group(self, room_id, group, types): def f(txn): - groups = set() - event_to_group = {} - for event_id in event_ids: - # TODO: Remove this loop. - group = self._simple_select_one_onecol_txn( - txn, - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - retcol="state_group", - allow_none=True, - ) - if group: - event_to_group[event_id] = group - groups.add(group) - - group_to_state_ids = {} - for group in groups: - state_ids = self._simple_select_onecol_txn( - txn, - table="state_groups_state", - keyvalues={"state_group": group}, - retcol="event_id", - ) + sql = ( + "SELECT event_id FROM state_groups_state WHERE" + " room_id = ? AND state_group = ? AND (%s)" + ) % (" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),) - group_to_state_ids[group] = state_ids + args = [room_id, group] + args.extend([i for typ in types for i in typ]) + txn.execute(sql, args) - return event_to_group, group_to_state_ids + return group, [ + r[0] + for r in txn.fetchall() + ] - res = yield self.runInteraction( - "annotate_events_with_state_groups", + return self.runInteraction( + "_get_state_groups_from_group", f, ) - event_to_group, group_to_state_ids = res + @cached(num_args=3, lru=True, max_entries=100000) + def _get_state_for_event_id(self, room_id, event_id, types): + def f(txn): + type_and_state_sql = " OR ".join([ + "(type = ? AND state_key = ?)" + if typ[1] is not None + else "type = ?" + for typ in types + ]) - state_list = yield defer.gatherResults( + sql = ( + "SELECT sg.event_id FROM state_groups_state as sg" + " INNER JOIN event_to_state_groups as e" + " ON e.state_group = sg.state_group" + " WHERE e.event_id = ? AND (%s)" + ) % (type_and_state_sql,) + + args = [event_id] + for typ, state_key in types: + args.extend( + [typ, state_key] if state_key is not None else [typ] + ) + txn.execute(sql, args) + + return event_id, [ + r[0] + for r in txn.fetchall() + ] + + return self.runInteraction( + "_get_state_for_event_id", + f, + ) + + @defer.inlineCallbacks + def get_state_for_events(self, room_id, event_ids, types): + set_types = frozenset(types) + res = yield defer.gatherResults( [ - self._fetch_events_for_group(group, vals) - for group, vals in group_to_state_ids.items() + self._get_state_for_event_id( + room_id, event_id, set_types, + ) + for event_id in event_ids ], consumeErrors=True, + ).addErrback(unwrapFirstError) + + event_to_state_ids = dict(res) + + event_dict = yield self._get_events( + [ + item + for lst in event_to_state_ids.values() + for item in lst + ], + get_prev_content=False + ).addCallback( + lambda evs: {ev.event_id: ev for ev in evs} ) - state_dict = { - group: { + event_to_state = { + event_id: { (ev.type, ev.state_key): ev - for ev in state + for ev in [ + event_dict[state_id] + for state_id in state_ids + if state_id in event_dict + ] } - for group, state in state_list + for event_id, state_ids in event_to_state_ids.items() } defer.returnValue([ - state_dict.get(event_to_group.get(event, None), None) + event_to_state[event] for event in event_ids ]) -- cgit 1.5.1 From c77048e12f032842cebbb0f1a0639bb62db88418 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 4 Aug 2015 14:37:09 +0100 Subject: Add endpoint that proxies ID server request token and errors if the given email is in use on this Home Server. --- synapse/api/errors.py | 1 + synapse/handlers/identity.py | 25 ++++++++++++++++++++ synapse/rest/client/v2_alpha/register.py | 27 +++++++++++++++++++++- .../schema/delta/22/user_threepids_unique.sql | 19 +++++++++++++++ 4 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/22/user_threepids_unique.sql (limited to 'synapse/handlers') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 0b3320e62c..c3b4d971a8 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -40,6 +40,7 @@ class Codes(object): TOO_LARGE = "M_TOO_LARGE" EXCLUSIVE = "M_EXCLUSIVE" THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED" + THREEPID_IN_USE = "THREEPID_IN_USE" class CodeMessageException(RuntimeError): diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index c1095708a0..2a99921d5f 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -117,3 +117,28 @@ class IdentityHandler(BaseHandler): except CodeMessageException as e: data = json.loads(e.msg) defer.returnValue(data) + + @defer.inlineCallbacks + def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): + yield run_on_reactor() + http_client = SimpleHttpClient(self.hs) + + params = { + 'email': email, + 'client_secret': client_secret, + 'send_attempt': send_attempt, + } + params.update(kwargs) + + try: + data = yield http_client.post_urlencoded_get_json( + "https://%s%s" % ( + id_server, + "/_matrix/identity/api/v1/validate/email/requestToken" + ), + params + ) + defer.returnValue(data) + except CodeMessageException as e: + logger.info("Proxied requestToken failed: %r", e) + raise e diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index b5926f9ca6..7b97a73df6 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -41,7 +41,7 @@ logger = logging.getLogger(__name__) class RegisterRestServlet(RestServlet): - PATTERN = client_v2_pattern("/register") + PATTERN = client_v2_pattern("/register*") def __init__(self, hs): super(RegisterRestServlet, self).__init__() @@ -55,6 +55,11 @@ class RegisterRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): yield run_on_reactor() + + if '/register/email/requestToken' in request.path: + ret = yield self.onEmailTokenRequest(request) + defer.returnValue(ret) + body = parse_json_dict_from_request(request) # we do basic sanity checks here because the auth layer will store these @@ -209,6 +214,26 @@ class RegisterRestServlet(RestServlet): "home_server": self.hs.hostname, } + @defer.inlineCallbacks + def onEmailTokenRequest(self, request): + body = parse_json_dict_from_request(request) + + required = ['id_server', 'client_secret', 'email', 'send_attempt'] + absent = [] + for k in required: + if k not in body: + absent.append(k) + + existingUid = self.hs.get_datastore().get_user_id_by_threepid('email', body['email']) + if existingUid is not None: + raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE) + + if len(absent) > 0: + raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) + + ret = yield self.identity_handler.requestEmailToken(**body) + defer.returnValue((200, ret)) + def register_servlets(hs, http_server): RegisterRestServlet(hs).register(http_server) diff --git a/synapse/storage/schema/delta/22/user_threepids_unique.sql b/synapse/storage/schema/delta/22/user_threepids_unique.sql new file mode 100644 index 0000000000..87edfa454c --- /dev/null +++ b/synapse/storage/schema/delta/22/user_threepids_unique.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS user_threepids2 ( + user_id TEXT NOT NULL, + medium TEXT NOT NULL, + address TEXT NOT NULL, + validated_at BIGINT NOT NULL, + added_at BIGINT NOT NULL, + CONSTRAINT medium_address UNIQUE (medium, address) +); + +INSERT INTO user_threepids2 + SELECT * FROM user_threepids WHERE added_at IN ( + SELECT max(added_at) FROM user_threepids GROUP BY medium, address + ) +; + +DROP TABLE user_threepids; +ALTER TABLE user_threepids2 RENAME TO user_threepids; + +CREATE INDEX user_threepids_user_id ON user_threepids(user_id); -- cgit 1.5.1 From 07507643cb6a2fde1a87d229f8d77525627a0632 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Aug 2015 15:06:51 +0100 Subject: Use dictionary cache to do group -> state fetching --- synapse/handlers/federation.py | 2 +- synapse/state.py | 10 +- synapse/storage/_base.py | 39 +++++--- synapse/storage/state.py | 191 ++++++++++++++++++++++++++------------- synapse/storage/stream.py | 3 +- synapse/util/dictionary_cache.py | 58 +++++++----- tests/test_state.py | 2 +- 7 files changed, 195 insertions(+), 110 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 22f534e49a..90649af9e1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -507,7 +507,7 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups([e]) + self.state_handler.resolve_state_groups(room_id, [e]) for e in event_ids ]) states = dict(zip(event_ids, [s[1] for s in states])) diff --git a/synapse/state.py b/synapse/state.py index 80da90a72c..b5e5d7bbda 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -96,7 +96,7 @@ class StateHandler(object): cache.ts = self.clock.time_msec() state = cache.state else: - res = yield self.resolve_state_groups(event_ids) + res = yield self.resolve_state_groups(room_id, event_ids) state = res[1] if event_type: @@ -155,13 +155,13 @@ class StateHandler(object): if event.is_state(): ret = yield self.resolve_state_groups( - [e for e, _ in event.prev_events], + event.room_id, [e for e, _ in event.prev_events], event_type=event.type, state_key=event.state_key, ) else: ret = yield self.resolve_state_groups( - [e for e, _ in event.prev_events], + event.room_id, [e for e, _ in event.prev_events], ) group, curr_state, prev_state = ret @@ -180,7 +180,7 @@ class StateHandler(object): @defer.inlineCallbacks @log_function - def resolve_state_groups(self, event_ids, event_type=None, state_key=""): + def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""): """ Given a list of event_ids this method fetches the state at each event, resolves conflicts between them and returns them. @@ -205,7 +205,7 @@ class StateHandler(object): ) state_groups = yield self.store.get_state_groups( - event_ids + room_id, event_ids ) logger.debug( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7b76ee3b73..803b9d599d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,6 +18,7 @@ from synapse.api.errors import StoreError from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache +from synapse.util.dictionary_cache import DictionaryCache import synapse.metrics from util.id_generators import IdGenerator, StreamIdGenerator @@ -87,23 +88,33 @@ class Cache(object): ) def get(self, *keyargs): - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) + try: + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) - val = self.cache.get(keyargs, self.sentinel) - if val is not self.sentinel: - cache_counter.inc_hits(self.name) - return val + val = self.cache.get(keyargs, self.sentinel) + if val is not self.sentinel: + cache_counter.inc_hits(self.name) + return val - cache_counter.inc_misses(self.name) - raise KeyError() + cache_counter.inc_misses(self.name) + raise KeyError() + except KeyError: + raise + except: + logger.exception("Cache.get failed for %s" % (self.name,)) + raise def update(self, sequence, *args): - self.check_thread() - if self.sequence == sequence: - # Only update the cache if the caches sequence number matches the - # number that the cache had before the SELECT was started (SYN-369) - self.prefill(*args) + try: + self.check_thread() + if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) + self.prefill(*args) + except: + logger.exception("Cache.update failed for %s" % (self.name,)) + raise def prefill(self, *args): # because I can't *keyargs, value keyargs = args[:-1] @@ -327,6 +338,8 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self._state_group_cache = DictionaryCache("*stateGroupCache*", 100000) + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 91a5ae86a4..a967b3d44b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -45,52 +45,38 @@ class StateStore(SQLBaseStore): """ @defer.inlineCallbacks - def get_state_groups(self, event_ids): + def get_state_groups(self, room_id, event_ids): """ Get the state groups for the given list of event_ids The return value is a dict mapping group names to lists of events. """ - def f(txn): - groups = set() - for event_id in event_ids: - group = self._simple_select_one_onecol_txn( - txn, - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - retcol="state_group", - allow_none=True, - ) - if group: - groups.add(group) - - res = {} - for group in groups: - state_ids = self._simple_select_onecol_txn( - txn, - table="state_groups_state", - keyvalues={"state_group": group}, - retcol="event_id", - ) - - res[group] = state_ids + event_and_groups = yield defer.gatherResults( + [ + self._get_state_group_for_event( + room_id, event_id, + ).addCallback(lambda group, event_id: (event_id, group), event_id) + for event_id in event_ids + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) - return res + groups = set(group for _, group in event_and_groups if group) - states = yield self.runInteraction( - "get_state_groups", - f, - ) - - state_list = yield defer.gatherResults( + group_to_state = yield defer.gatherResults( [ - self._fetch_events_for_group(group, vals) - for group, vals in states.items() + self._get_state_for_group( + group, + ).addCallback(lambda state_dict, group: (group, state_dict), group) + for group in groups ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) - defer.returnValue(dict(state_list)) + defer.returnValue({ + group: state_map.values() + for group, state_map in group_to_state + }) @cached(num_args=1) def _fetch_events_for_group(self, key, events): @@ -207,16 +193,25 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - @cached(num_args=3, lru=True) - def _get_state_groups_from_group(self, room_id, group, types): + @cached(num_args=2, lru=True, max_entries=10000) + def _get_state_groups_from_group(self, group, types): def f(txn): + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + sql = ( "SELECT event_id FROM state_groups_state WHERE" - " room_id = ? AND state_group = ? AND (%s)" - ) % (" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),) + " state_group = ? %s" + ) % (where_clause,) + + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) - args = [room_id, group] - args.extend([i for typ in types for i in typ]) txn.execute(sql, args) return group, [ @@ -229,7 +224,7 @@ class StateStore(SQLBaseStore): f, ) - @cached(num_args=3, lru=True, max_entries=100000) + @cached(num_args=3, lru=True, max_entries=20000) def _get_state_for_event_id(self, room_id, event_id, types): def f(txn): type_and_state_sql = " OR ".join([ @@ -280,40 +275,33 @@ class StateStore(SQLBaseStore): deferred: A list of dicts corresponding to the event_ids given. The dicts are mappings from (type, state_key) -> state_events """ - set_types = frozenset(types) - res = yield defer.gatherResults( + event_and_groups = yield defer.gatherResults( [ - self._get_state_for_event_id( - room_id, event_id, set_types, - ) + self._get_state_group_for_event( + room_id, event_id, + ).addCallback(lambda group, event_id: (event_id, group), event_id) for event_id in event_ids ], consumeErrors=True, ).addErrback(unwrapFirstError) - event_to_state_ids = dict(res) + groups = set(group for _, group in event_and_groups) - event_dict = yield self._get_events( + res = yield defer.gatherResults( [ - item - for lst in event_to_state_ids.values() - for item in lst + self._get_state_for_group( + group, types + ).addCallback(lambda state_dict, group: (group, state_dict), group) + for group in groups ], - get_prev_content=False - ).addCallback( - lambda evs: {ev.event_id: ev for ev in evs} - ) + consumeErrors=True, + ).addErrback(unwrapFirstError) + + group_to_state = dict(res) event_to_state = { - event_id: { - (ev.type, ev.state_key): ev - for ev in [ - event_dict[state_id] - for state_id in state_ids - if state_id in event_dict - ] - } - for event_id, state_ids in event_to_state_ids.items() + event_id: group_to_state[group] + for event_id, group in event_and_groups } defer.returnValue([ @@ -321,6 +309,79 @@ class StateStore(SQLBaseStore): for event in event_ids ]) + @cached(num_args=2, lru=True, max_entries=100000) + def _get_state_group_for_event(self, room_id, event_id): + return self._simple_select_one_onecol( + table="event_to_state_groups", + keyvalues={ + "event_id": event_id, + }, + retcol="state_group", + allow_none=True, + desc="_get_state_group_for_event", + ) + + @defer.inlineCallbacks + def _get_state_for_group(self, group, types=None): + is_all, state_dict = self._state_group_cache.get(group) + + type_to_key = {} + missing_types = set() + if types is not None: + for typ, state_key in types: + if state_key is None: + type_to_key[typ] = None + missing_types.add((typ, state_key)) + else: + if type_to_key.get(typ, object()) is not None: + type_to_key.setdefault(typ, set()).add(state_key) + + if (typ, state_key) not in state_dict: + missing_types.add((typ, state_key)) + + if is_all and types is None: + defer.returnValue(state_dict) + + if is_all or (types is not None and not missing_types): + def include(typ, state_key): + sentinel = object() + valid_state_keys = type_to_key.get(typ, sentinel) + if valid_state_keys is sentinel: + return False + if valid_state_keys is None: + return True + if state_key in valid_state_keys: + return True + return False + + defer.returnValue({ + k: v + for k, v in state_dict.items() + if include(k[0], k[1]) + }) + + # Okay, so we have some missing_types, lets fetch them. + cache_seq_num = self._state_group_cache.sequence + _, state_ids = yield self._get_state_groups_from_group( + group, + frozenset(types) if types else None + ) + state_events = yield self._get_events(state_ids, get_prev_content=False) + state_dict = { + (e.type, e.state_key): e + for e in state_events + } + + # Update the cache + self._state_group_cache.update( + cache_seq_num, + key=group, + value=state_dict, + full=(types is None), + ) + + defer.returnValue(state_dict) + def _make_group_id(clock): return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index af45fc5619..9db259d5fc 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -300,8 +300,7 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False, from_token=None): + def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): # TODO (erikj): Handle compressed feedback end_token = RoomStreamToken.parse_stream_token(end_token) diff --git a/synapse/util/dictionary_cache.py b/synapse/util/dictionary_cache.py index 0877cc79f6..38b131677c 100644 --- a/synapse/util/dictionary_cache.py +++ b/synapse/util/dictionary_cache.py @@ -16,6 +16,10 @@ from synapse.util.lrucache import LruCache from collections import namedtuple import threading +import logging + + +logger = logging.getLogger(__name__) DictionaryEntry = namedtuple("DictionaryEntry", ("full", "value")) @@ -47,21 +51,25 @@ class DictionaryCache(object): ) def get(self, key, dict_keys=None): - entry = self.cache.get(key, self.sentinel) - if entry is not self.sentinel: - # cache_counter.inc_hits(self.name) - - if dict_keys is None: - return DictionaryEntry(entry.full, dict(entry.value)) - else: - return DictionaryEntry(entry.full, { - k: entry.value[k] - for k in dict_keys - if k in entry.value - }) - - # cache_counter.inc_misses(self.name) - return DictionaryEntry(False, {}) + try: + entry = self.cache.get(key, self.sentinel) + if entry is not self.sentinel: + # cache_counter.inc_hits(self.name) + + if dict_keys is None: + return DictionaryEntry(entry.full, dict(entry.value)) + else: + return DictionaryEntry(entry.full, { + k: entry.value[k] + for k in dict_keys + if k in entry.value + }) + + # cache_counter.inc_misses(self.name) + return DictionaryEntry(False, {}) + except: + logger.exception("get failed") + raise def invalidate(self, key): self.check_thread() @@ -77,14 +85,18 @@ class DictionaryCache(object): self.cache.clear() def update(self, sequence, key, value, full=False): - self.check_thread() - if self.sequence == sequence: - # Only update the cache if the caches sequence number matches the - # number that the cache had before the SELECT was started (SYN-369) - if full: - self._insert(key, value) - else: - self._update_or_insert(key, value) + try: + self.check_thread() + if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) + if full: + self._insert(key, value) + else: + self._update_or_insert(key, value) + except: + logger.exception("update failed") + raise def _update_or_insert(self, key, value): entry = self.cache.setdefault(key, DictionaryEntry(False, {})) diff --git a/tests/test_state.py b/tests/test_state.py index fea25f7021..5845358754 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -69,7 +69,7 @@ class StateGroupStore(object): self._next_group = 1 - def get_state_groups(self, event_ids): + def get_state_groups(self, room_id, event_ids): groups = {} for event_id in event_ids: group = self._event_to_state_group.get(event_id) -- cgit 1.5.1 From ffdb8c382860ce2e351614a91c2ce07a91c61455 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Aug 2015 18:13:48 +0100 Subject: Don't be too enthusiatic with defer.gatherResults --- synapse/handlers/message.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 765b14d994..11c736f727 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -405,10 +405,14 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - yield defer.gatherResults( - [handle_room(e) for e in room_list], - consumeErrors=True - ).addErrback(unwrapFirstError) + # Only do N rooms at once + n = 5 + d_list = [handle_room(e) for e in room_list] + for ds in [d_list[i:i + n] for i in range(0, len(d_list), n)]: + yield defer.gatherResults( + ds, + consumeErrors=True + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, -- cgit 1.5.1 From 1b994a97dd201d0f122a416f28dbbf1136304412 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2015 10:41:40 +0100 Subject: Fix application of ACLs --- synapse/handlers/federation.py | 11 +++++------ synapse/handlers/message.py | 16 ++++++++++++---- synapse/handlers/sync.py | 17 +++++++++++++---- synapse/storage/state.py | 6 +++--- 4 files changed, 33 insertions(+), 17 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 90649af9e1..2bfd0a40e0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -229,7 +229,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): - states = yield self.store.get_state_for_events( + event_to_state = yield self.store.get_state_for_events( room_id, frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), @@ -237,8 +237,6 @@ class FederationHandler(BaseHandler): ) ) - events_and_states = zip(events, states) - def redact_disallowed(event_and_state): event, state = event_and_state @@ -275,9 +273,10 @@ class FederationHandler(BaseHandler): return event - res = map(redact_disallowed, events_and_states) - - logger.info("_filter_events_for_server %r", res) + res = map(redact_disallowed, [ + (e, event_to_state[e.event_id]) + for e in events + ]) defer.returnValue(res) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 11c736f727..95a8f05c05 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -137,7 +137,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): - states = yield self.store.get_state_for_events( + event_id_to_state = yield self.store.get_state_for_events( room_id, frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), @@ -145,7 +145,8 @@ class MessageHandler(BaseHandler): ) ) - events_and_states = zip(events, states) + for ev, state in event_id_to_state.items(): + logger.info("event_id: %r, state: %r", ev, state) def allowed(event_and_state): event, state = event_and_state @@ -179,10 +180,17 @@ class MessageHandler(BaseHandler): return True - events_and_states = filter(allowed, events_and_states) + event_and_state = filter( + allowed, + [ + (e, event_id_to_state[e.event_id]) + for e in events + ] + ) + defer.returnValue([ ev - for ev, _ in events_and_states + for ev, _ in event_and_state ]) @defer.inlineCallbacks diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8f58774b31..9a97bff840 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -294,7 +294,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): - states = yield self.store.get_state_for_events( + event_id_to_state = yield self.store.get_state_for_events( room_id, frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), @@ -302,7 +302,8 @@ class SyncHandler(BaseHandler): ) ) - events_and_states = zip(events, states) + for ev, state in event_id_to_state.items(): + logger.info("event_id: %r, state: %r", ev, state) def allowed(event_and_state): event, state = event_and_state @@ -335,10 +336,18 @@ class SyncHandler(BaseHandler): return membership == Membership.INVITE return True - events_and_states = filter(allowed, events_and_states) + + event_and_state = filter( + allowed, + [ + (e, event_id_to_state[e.event_id]) + for e in events + ] + ) + defer.returnValue([ ev - for ev, _ in events_and_states + for ev, _ in event_and_state ]) @defer.inlineCallbacks diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 19b16ed404..a438530071 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -239,10 +239,10 @@ class StateStore(SQLBaseStore): for event_id, group in event_to_groups.items() } - defer.returnValue([ - event_to_state[event] + defer.returnValue({ + event: event_to_state[event] for event in event_ids - ]) + }) @cached(num_args=2, lru=True, max_entries=100000) def _get_state_group_for_event(self, room_id, event_id): -- cgit 1.5.1 From dc8399ee0059bb4ee93fb7c755bc36ade16230a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2015 11:30:59 +0100 Subject: Remove debug loggers --- synapse/handlers/message.py | 3 --- synapse/handlers/sync.py | 3 --- 2 files changed, 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 95a8f05c05..b941312eff 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -145,9 +145,6 @@ class MessageHandler(BaseHandler): ) ) - for ev, state in event_id_to_state.items(): - logger.info("event_id: %r, state: %r", ev, state) - def allowed(event_and_state): event, state = event_and_state diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9a97bff840..d960078e7a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -302,9 +302,6 @@ class SyncHandler(BaseHandler): ) ) - for ev, state in event_id_to_state.items(): - logger.info("event_id: %r, state: %r", ev, state) - def allowed(event_and_state): event, state = event_and_state -- cgit 1.5.1 From 415c2f05491ce65a4fc34326519754cd1edd9c54 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 12 Aug 2015 15:49:37 +0100 Subject: Simplify LoginHander and AuthHandler * Merge LoginHandler -> AuthHandler * Add a bunch of documentation * Improve some naming * Remove unused branches I will start merging the actual logic of the two handlers shortly --- synapse/handlers/__init__.py | 2 - synapse/handlers/auth.py | 90 +++++++++++++++++++++++++------- synapse/handlers/login.py | 83 ----------------------------- synapse/handlers/register.py | 10 ++-- synapse/push/pusherpool.py | 11 ++-- synapse/rest/client/v1/login.py | 5 +- synapse/rest/client/v2_alpha/account.py | 8 ++- synapse/rest/client/v2_alpha/register.py | 3 +- synapse/storage/registration.py | 12 ++--- 9 files changed, 93 insertions(+), 131 deletions(-) delete mode 100644 synapse/handlers/login.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index dc5b6ef79d..8725c3c420 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -22,7 +22,6 @@ from .room import ( from .message import MessageHandler from .events import EventStreamHandler, EventHandler from .federation import FederationHandler -from .login import LoginHandler from .profile import ProfileHandler from .presence import PresenceHandler from .directory import DirectoryHandler @@ -54,7 +53,6 @@ class Handlers(object): self.profile_handler = ProfileHandler(hs) self.presence_handler = PresenceHandler(hs) self.room_list_handler = RoomListHandler(hs) - self.login_handler = LoginHandler(hs) self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 1ecf7fef17..1504b00d7e 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -47,17 +47,24 @@ class AuthHandler(BaseHandler): self.sessions = {} @defer.inlineCallbacks - def check_auth(self, flows, clientdict, clientip=None): + def check_auth(self, flows, clientdict, clientip): """ Takes a dictionary sent by the client in the login / registration protocol and handles the login flow. + As a side effect, this function fills in the 'creds' key on the user's + session with a map, which maps each auth-type (str) to the relevant + identity authenticated by that auth-type (mostly str, but for captcha, bool). + Args: - flows: list of list of stages - authdict: The dictionary from the client root level, not the - 'auth' key: this method prompts for auth if none is sent. + flows (list): A list of login flows. Each flow is an ordered list of + strings representing auth-types. At least one full + flow must be completed in order for auth to be successful. + clientdict: The dictionary from the client root level, not the + 'auth' key: this method prompts for auth if none is sent. + clientip (str): The IP address of the client. Returns: - A tuple of authed, dict, dict where authed is true if the client + A tuple of (authed, dict, dict) where authed is true if the client has successfully completed an auth flow. If it is true, the first dict contains the authenticated credentials of each stage. @@ -75,7 +82,7 @@ class AuthHandler(BaseHandler): del clientdict['auth'] if 'session' in authdict: sid = authdict['session'] - sess = self._get_session_info(sid) + session = self._get_session_info(sid) if len(clientdict) > 0: # This was designed to allow the client to omit the parameters @@ -87,20 +94,19 @@ class AuthHandler(BaseHandler): # on a home server. # Revisit: Assumimg the REST APIs do sensible validation, the data # isn't arbintrary. - sess['clientdict'] = clientdict - self._save_session(sess) - pass - elif 'clientdict' in sess: - clientdict = sess['clientdict'] + session['clientdict'] = clientdict + self._save_session(session) + elif 'clientdict' in session: + clientdict = session['clientdict'] if not authdict: defer.returnValue( - (False, self._auth_dict_for_flows(flows, sess), clientdict) + (False, self._auth_dict_for_flows(flows, session), clientdict) ) - if 'creds' not in sess: - sess['creds'] = {} - creds = sess['creds'] + if 'creds' not in session: + session['creds'] = {} + creds = session['creds'] # check auth type currently being presented if 'type' in authdict: @@ -109,15 +115,15 @@ class AuthHandler(BaseHandler): result = yield self.checkers[authdict['type']](authdict, clientip) if result: creds[authdict['type']] = result - self._save_session(sess) + self._save_session(session) for f in flows: if len(set(f) - set(creds.keys())) == 0: logger.info("Auth completed with creds: %r", creds) - self._remove_session(sess) + self._remove_session(session) defer.returnValue((True, creds, clientdict)) - ret = self._auth_dict_for_flows(flows, sess) + ret = self._auth_dict_for_flows(flows, session) ret['completed'] = creds.keys() defer.returnValue((False, ret, clientdict)) @@ -270,6 +276,54 @@ class AuthHandler(BaseHandler): return self.sessions[session_id] + @defer.inlineCallbacks + def login_with_password(self, user_id, password): + """ + Authenticates the user with their username and password. + + Used only by the v1 login API. + + Args: + user_id (str): User ID + password (str): Password + Returns: + The access token for the user's session. + Raises: + StoreError if there was a problem storing the token. + LoginError if there was an authentication problem. + """ + user_info = yield self.store.get_user_by_id(user_id=user_id) + if not user_info: + logger.warn("Attempted to login as %s but they do not exist", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + + stored_hash = user_info["password_hash"] + if not bcrypt.checkpw(password, stored_hash): + logger.warn("Failed password login for user %s", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + + reg_handler = self.hs.get_handlers().registration_handler + access_token = reg_handler.generate_token(user_id) + logger.info("Adding token %s for user %s", access_token, user_id) + yield self.store.add_access_token_to_user(user_id, access_token) + defer.returnValue(access_token) + + @defer.inlineCallbacks + def set_password(self, user_id, newpassword): + password_hash = bcrypt.hashpw(newpassword, bcrypt.gensalt()) + + yield self.store.user_set_password_hash(user_id, password_hash) + yield self.store.user_delete_access_tokens(user_id) + yield self.hs.get_pusherpool().remove_pushers_by_user(user_id) + yield self.store.flush_user(user_id) + + @defer.inlineCallbacks + def add_threepid(self, user_id, medium, address, validated_at): + yield self.store.user_add_threepid( + user_id, medium, address, validated_at, + self.hs.get_clock().time_msec() + ) + def _save_session(self, session): # TODO: Persistent storage logger.debug("Saving session %s", session) diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py deleted file mode 100644 index 91d87d503d..0000000000 --- a/synapse/handlers/login.py +++ /dev/null @@ -1,83 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import BaseHandler -from synapse.api.errors import LoginError, Codes - -import bcrypt -import logging - -logger = logging.getLogger(__name__) - - -class LoginHandler(BaseHandler): - - def __init__(self, hs): - super(LoginHandler, self).__init__(hs) - self.hs = hs - - @defer.inlineCallbacks - def login(self, user, password): - """Login as the specified user with the specified password. - - Args: - user (str): The user ID. - password (str): The password. - Returns: - The newly allocated access token. - Raises: - StoreError if there was a problem storing the token. - LoginError if there was an authentication problem. - """ - # TODO do this better, it can't go in __init__ else it cyclic loops - if not hasattr(self, "reg_handler"): - self.reg_handler = self.hs.get_handlers().registration_handler - - # pull out the hash for this user if they exist - user_info = yield self.store.get_user_by_id(user_id=user) - if not user_info: - logger.warn("Attempted to login as %s but they do not exist", user) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) - - stored_hash = user_info["password_hash"] - if bcrypt.checkpw(password, stored_hash): - # generate an access token and store it. - token = self.reg_handler._generate_token(user) - logger.info("Adding token %s for user %s", token, user) - yield self.store.add_access_token_to_user(user, token) - defer.returnValue(token) - else: - logger.warn("Failed password login for user %s", user) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) - - @defer.inlineCallbacks - def set_password(self, user_id, newpassword, token_id=None): - password_hash = bcrypt.hashpw(newpassword, bcrypt.gensalt()) - - yield self.store.user_set_password_hash(user_id, password_hash) - yield self.store.user_delete_access_tokens_apart_from(user_id, token_id) - yield self.hs.get_pusherpool().remove_pushers_by_user_access_token( - user_id, token_id - ) - yield self.store.flush_user(user_id) - - @defer.inlineCallbacks - def add_threepid(self, user_id, medium, address, validated_at): - yield self.store.user_add_threepid( - user_id, medium, address, validated_at, - self.hs.get_clock().time_msec() - ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index f81d75017d..39392d9fdd 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -91,7 +91,7 @@ class RegistrationHandler(BaseHandler): user = UserID(localpart, self.hs.hostname) user_id = user.to_string() - token = self._generate_token(user_id) + token = self.generate_token(user_id) yield self.store.register( user_id=user_id, token=token, @@ -111,7 +111,7 @@ class RegistrationHandler(BaseHandler): user_id = user.to_string() yield self.check_user_id_is_valid(user_id) - token = self._generate_token(user_id) + token = self.generate_token(user_id) yield self.store.register( user_id=user_id, token=token, @@ -161,7 +161,7 @@ class RegistrationHandler(BaseHandler): 400, "Invalid user localpart for this application service.", errcode=Codes.EXCLUSIVE ) - token = self._generate_token(user_id) + token = self.generate_token(user_id) yield self.store.register( user_id=user_id, token=token, @@ -208,7 +208,7 @@ class RegistrationHandler(BaseHandler): user_id = user.to_string() yield self.check_user_id_is_valid(user_id) - token = self._generate_token(user_id) + token = self.generate_token(user_id) try: yield self.store.register( user_id=user_id, @@ -273,7 +273,7 @@ class RegistrationHandler(BaseHandler): errcode=Codes.EXCLUSIVE ) - def _generate_token(self, user_id): + def generate_token(self, user_id): # urlsafe variant uses _ and - so use . as the separator and replace # all =s with .s so http clients don't quote =s when it is used as # query params. diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 0ab2f65972..e012c565ee 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -94,17 +94,14 @@ class PusherPool: self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user_access_token(self, user_id, not_access_token_id): + def remove_pushers_by_user(self, user_id): all = yield self.store.get_all_pushers() logger.info( - "Removing all pushers for user %s except access token %s", - user_id, not_access_token_id + "Removing all pushers for user %s", + user_id, ) for p in all: - if ( - p['user_name'] == user_id and - p['access_token'] != not_access_token_id - ): + if p['user_name'] == user_id: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 998d4d44c6..694072693d 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -78,9 +78,8 @@ class LoginRestServlet(ClientV1RestServlet): login_submission["user"] = UserID.create( login_submission["user"], self.hs.hostname).to_string() - handler = self.handlers.login_handler - token = yield handler.login( - user=login_submission["user"], + token = yield self.handlers.auth_handler.login_with_password( + user_id=login_submission["user"], password=login_submission["password"]) result = { diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index b082140f1f..897c54b539 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -36,7 +36,6 @@ class PasswordRestServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.auth_handler = hs.get_handlers().auth_handler - self.login_handler = hs.get_handlers().login_handler @defer.inlineCallbacks def on_POST(self, request): @@ -47,7 +46,7 @@ class PasswordRestServlet(RestServlet): authed, result, params = yield self.auth_handler.check_auth([ [LoginType.PASSWORD], [LoginType.EMAIL_IDENTITY] - ], body) + ], body, self.hs.get_ip_from_request(request)) if not authed: defer.returnValue((401, result)) @@ -79,7 +78,7 @@ class PasswordRestServlet(RestServlet): raise SynapseError(400, "", Codes.MISSING_PARAM) new_password = params['new_password'] - yield self.login_handler.set_password( + yield self.auth_handler.set_password( user_id, new_password, None ) @@ -95,7 +94,6 @@ class ThreepidRestServlet(RestServlet): def __init__(self, hs): super(ThreepidRestServlet, self).__init__() self.hs = hs - self.login_handler = hs.get_handlers().login_handler self.identity_handler = hs.get_handlers().identity_handler self.auth = hs.get_auth() @@ -135,7 +133,7 @@ class ThreepidRestServlet(RestServlet): logger.warn("Couldn't add 3pid: invalid response from ID sevrer") raise SynapseError(500, "Invalid response from ID Server") - yield self.login_handler.add_threepid( + yield self.auth_handler.add_threepid( auth_user.to_string(), threepid['medium'], threepid['address'], diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index b5926f9ca6..012c447e88 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -50,7 +50,6 @@ class RegisterRestServlet(RestServlet): self.auth_handler = hs.get_handlers().auth_handler self.registration_handler = hs.get_handlers().registration_handler self.identity_handler = hs.get_handlers().identity_handler - self.login_handler = hs.get_handlers().login_handler @defer.inlineCallbacks def on_POST(self, request): @@ -143,7 +142,7 @@ class RegisterRestServlet(RestServlet): if reqd not in threepid: logger.info("Can't add incomplete 3pid") else: - yield self.login_handler.add_threepid( + yield self.auth_handler.add_threepid( user_id, threepid['medium'], threepid['address'], diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 4eaa088b36..d2d5b07cb3 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -111,16 +111,16 @@ class RegistrationStore(SQLBaseStore): }) @defer.inlineCallbacks - def user_delete_access_tokens_apart_from(self, user_id, token_id): + def user_delete_access_tokens(self, user_id): yield self.runInteraction( - "user_delete_access_tokens_apart_from", - self._user_delete_access_tokens_apart_from, user_id, token_id + "user_delete_access_tokens", + self._user_delete_access_tokens, user_id ) - def _user_delete_access_tokens_apart_from(self, txn, user_id, token_id): + def _user_delete_access_tokens(self, txn, user_id): txn.execute( - "DELETE FROM access_tokens WHERE user_id = ? AND id != ?", - (user_id, token_id) + "DELETE FROM access_tokens WHERE user_id = ?", + (user_id, ) ) @defer.inlineCallbacks -- cgit 1.5.1 From f7e2f981ea1feb8461a5bddd9378bd5084833fc0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Aug 2015 16:01:10 +0100 Subject: Use list comprehension instead of filter --- synapse/handlers/message.py | 13 +++---------- synapse/handlers/sync.py | 13 +++---------- 2 files changed, 6 insertions(+), 20 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b941312eff..2c4af8dc97 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -177,17 +177,10 @@ class MessageHandler(BaseHandler): return True - event_and_state = filter( - allowed, - [ - (e, event_id_to_state[e.event_id]) - for e in events - ] - ) - defer.returnValue([ - ev - for ev, _ in event_and_state + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) ]) @defer.inlineCallbacks diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d960078e7a..ec8d78ba8c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -334,17 +334,10 @@ class SyncHandler(BaseHandler): return True - event_and_state = filter( - allowed, - [ - (e, event_id_to_state[e.event_id]) - for e in events - ] - ) - defer.returnValue([ - ev - for ev, _ in event_and_state + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) ]) @defer.inlineCallbacks -- cgit 1.5.1 From a7eeb34c64d828539dd6799f2347371a8eabae73 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Aug 2015 16:02:05 +0100 Subject: Simplify staggered deferred lists --- synapse/handlers/message.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2c4af8dc97..8a9e6cf6ca 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -406,9 +406,9 @@ class MessageHandler(BaseHandler): # Only do N rooms at once n = 5 d_list = [handle_room(e) for e in room_list] - for ds in [d_list[i:i + n] for i in range(0, len(d_list), n)]: + for i in range(0, len(d_list), n): yield defer.gatherResults( - ds, + d_list[i:i + n], consumeErrors=True ).addErrback(unwrapFirstError) -- cgit 1.5.1 From 5ce903e2f7a4136b67ec16564e24244b00118367 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 12 Aug 2015 16:09:19 +0100 Subject: Merge password checking implementations --- synapse/handlers/auth.py | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 1504b00d7e..98d99dd0a8 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -157,22 +157,13 @@ class AuthHandler(BaseHandler): if "user" not in authdict or "password" not in authdict: raise LoginError(400, "", Codes.MISSING_PARAM) - user = authdict["user"] + user_id = authdict["user"] password = authdict["password"] - if not user.startswith('@'): - user = UserID.create(user, self.hs.hostname).to_string() + if not user_id.startswith('@'): + user_id = UserID.create(user_id, self.hs.hostname).to_string() - user_info = yield self.store.get_user_by_id(user_id=user) - if not user_info: - logger.warn("Attempted to login as %s but they do not exist", user) - raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) - - stored_hash = user_info["password_hash"] - if bcrypt.checkpw(password, stored_hash): - defer.returnValue(user) - else: - logger.warn("Failed password login for user %s", user) - raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) + self._check_password(user_id, password) + defer.returnValue(user_id) @defer.inlineCallbacks def _check_recaptcha(self, authdict, clientip): @@ -292,6 +283,16 @@ class AuthHandler(BaseHandler): StoreError if there was a problem storing the token. LoginError if there was an authentication problem. """ + self._check_password(user_id, password) + + reg_handler = self.hs.get_handlers().registration_handler + access_token = reg_handler.generate_token(user_id) + logger.info("Adding token %s for user %s", access_token, user_id) + yield self.store.add_access_token_to_user(user_id, access_token) + defer.returnValue(access_token) + + def _check_password(self, user_id, password): + """Checks that user_id has passed password, raises LoginError if not.""" user_info = yield self.store.get_user_by_id(user_id=user_id) if not user_info: logger.warn("Attempted to login as %s but they do not exist", user_id) @@ -302,12 +303,6 @@ class AuthHandler(BaseHandler): logger.warn("Failed password login for user %s", user_id) raise LoginError(403, "", errcode=Codes.FORBIDDEN) - reg_handler = self.hs.get_handlers().registration_handler - access_token = reg_handler.generate_token(user_id) - logger.info("Adding token %s for user %s", access_token, user_id) - yield self.store.add_access_token_to_user(user_id, access_token) - defer.returnValue(access_token) - @defer.inlineCallbacks def set_password(self, user_id, newpassword): password_hash = bcrypt.hashpw(newpassword, bcrypt.gensalt()) -- cgit 1.5.1 From 7b0e7970800df8cedf7966e6fb3837ee233d9ea4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Aug 2015 17:05:24 +0100 Subject: Fix _filter_events_for_client --- synapse/handlers/message.py | 4 +--- synapse/handlers/sync.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8a9e6cf6ca..29e81085d1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -145,9 +145,7 @@ class MessageHandler(BaseHandler): ) ) - def allowed(event_and_state): - event, state = event_and_state - + def allowed(event, state): if event.type == EventTypes.RoomHistoryVisibility: return True diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ec8d78ba8c..7206ae23d7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -302,9 +302,7 @@ class SyncHandler(BaseHandler): ) ) - def allowed(event_and_state): - event, state = event_and_state - + def allowed(event, state): if event.type == EventTypes.RoomHistoryVisibility: return True -- cgit 1.5.1 From 7e77a82c5f4c2dbda1d00dace74a1aece2ab5b78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Aug 2015 16:58:10 +0100 Subject: Re-enable receipts --- synapse/handlers/receipts.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 415dd339f6..86c911c4bf 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -171,7 +171,6 @@ class ReceiptEventSource(object): @defer.inlineCallbacks def get_new_events_for_user(self, user, from_key, limit): - defer.returnValue(([], from_key)) from_key = int(from_key) to_key = yield self.get_current_key() @@ -194,7 +193,6 @@ class ReceiptEventSource(object): @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): to_key = int(config.from_key) - defer.returnValue(([], to_key)) if config.to_key: from_key = int(config.to_key) -- cgit 1.5.1 From 9f7f228ec2e2948c69ca3910d27fffdd2c2fea50 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Aug 2015 17:20:59 +0100 Subject: Remove pointless map --- synapse/handlers/federation.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2bfd0a40e0..1e3dccf5a8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -237,9 +237,7 @@ class FederationHandler(BaseHandler): ) ) - def redact_disallowed(event_and_state): - event, state = event_and_state - + def redact_disallowed(event, state): if not state: return event @@ -273,13 +271,11 @@ class FederationHandler(BaseHandler): return event - res = map(redact_disallowed, [ - (e, event_to_state[e.event_id]) + defer.returnValue([ + redact_disallowed(e, event_to_state[e.event_id]) for e in events ]) - defer.returnValue(res) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): -- cgit 1.5.1 From 1a9510bb84d79f6ff78d32390195bc97ed9a439e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 17 Aug 2015 10:40:23 +0100 Subject: Implement a batched presence_handler.get_state and use it --- synapse/handlers/message.py | 18 +++++-------- synapse/handlers/presence.py | 63 ++++++++++++++++++++++++++++++++++++++++++++ synapse/storage/presence.py | 6 +++-- 3 files changed, 73 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 29e81085d1..f12465fa2c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -460,20 +460,14 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - presence_defs = yield defer.DeferredList( - [ - presence_handler.get_state( - target_user=UserID.from_string(m.user_id), - auth_user=auth_user, - as_event=True, - check_auth=False, - ) - for m in room_members - ], - consumeErrors=True, + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, ) - defer.returnValue([p for success, p in presence_defs if success]) + defer.returnValue(states.values()) receipts_handler = self.hs.get_handlers().receipts_handler diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 341a516da2..33d76efe08 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -232,6 +232,69 @@ class PresenceHandler(BaseHandler): else: defer.returnValue(state) + @defer.inlineCallbacks + def get_states(self, target_users, auth_user, as_event=False, check_auth=True): + try: + local_users, remote_users = partitionbool( + target_users, + lambda u: self.hs.is_mine(u) + ) + + if check_auth: + for u in local_users: + visible = yield self.is_presence_visible( + observer_user=auth_user, + observed_user=u + ) + + if not visible: + raise SynapseError(404, "Presence information not visible") + + results = {} + if local_users: + for u in local_users: + if u in self._user_cachemap: + results[u] = self._user_cachemap[u].get_state() + + local_to_user = {u.localpart: u for u in local_users} + + states = yield self.store.get_presence_states( + [u.localpart for u in local_users if u not in results] + ) + + for local_part, state in states.items(): + res = {"presence": state["state"]} + if "status_msg" in state and state["status_msg"]: + res["status_msg"] = state["status_msg"] + results[local_to_user[local_part]] = res + + for u in remote_users: + # TODO(paul): Have remote server send us permissions set + results[u] = self._get_or_offline_usercache(u).get_state() + + for state in results.values(): + if "last_active" in state: + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") + ) + + if as_event: + for user, state in results.items(): + content = state + content["user_id"] = user.to_string() + + if "last_active" in content: + content["last_active_ago"] = int( + self._clock.time_msec() - content.pop("last_active") + ) + + results[user] = {"type": "m.presence", "content": content} + except: + logger.exception(":(") + raise + + defer.returnValue(results) + @defer.inlineCallbacks @log_function def set_state(self, target_user, auth_user, state): diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index f351b76a70..15d98198e2 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -50,13 +50,15 @@ class PresenceStore(SQLBaseStore): def f(txn): results = {} for user_localpart in user_localparts: - results[user_localpart] = self._simple_select_one_txn( + res = self._simple_select_one_txn( txn, table="presence", keyvalues={"user_id": user_localpart}, retcols=["state", "status_msg", "mtime"], - desc="get_presence_state", + allow_none=True, ) + if res: + results[user_localpart] = res return results -- cgit 1.5.1 From f72ed6c6a353bad4a54cb695eae12d39fd41ad24 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Aug 2015 10:29:49 +0100 Subject: Remove debug try/catch --- synapse/handlers/presence.py | 90 +++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 47 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 33d76efe08..b7664e30f8 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -234,64 +234,60 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def get_states(self, target_users, auth_user, as_event=False, check_auth=True): - try: - local_users, remote_users = partitionbool( - target_users, - lambda u: self.hs.is_mine(u) - ) + local_users, remote_users = partitionbool( + target_users, + lambda u: self.hs.is_mine(u) + ) - if check_auth: - for u in local_users: - visible = yield self.is_presence_visible( - observer_user=auth_user, - observed_user=u - ) + if check_auth: + for u in local_users: + visible = yield self.is_presence_visible( + observer_user=auth_user, + observed_user=u + ) - if not visible: - raise SynapseError(404, "Presence information not visible") + if not visible: + raise SynapseError(404, "Presence information not visible") - results = {} - if local_users: - for u in local_users: - if u in self._user_cachemap: - results[u] = self._user_cachemap[u].get_state() + results = {} + if local_users: + for u in local_users: + if u in self._user_cachemap: + results[u] = self._user_cachemap[u].get_state() - local_to_user = {u.localpart: u for u in local_users} + local_to_user = {u.localpart: u for u in local_users} - states = yield self.store.get_presence_states( - [u.localpart for u in local_users if u not in results] - ) + states = yield self.store.get_presence_states( + [u.localpart for u in local_users if u not in results] + ) - for local_part, state in states.items(): - res = {"presence": state["state"]} - if "status_msg" in state and state["status_msg"]: - res["status_msg"] = state["status_msg"] - results[local_to_user[local_part]] = res + for local_part, state in states.items(): + res = {"presence": state["state"]} + if "status_msg" in state and state["status_msg"]: + res["status_msg"] = state["status_msg"] + results[local_to_user[local_part]] = res - for u in remote_users: - # TODO(paul): Have remote server send us permissions set - results[u] = self._get_or_offline_usercache(u).get_state() + for u in remote_users: + # TODO(paul): Have remote server send us permissions set + results[u] = self._get_or_offline_usercache(u).get_state() - for state in results.values(): - if "last_active" in state: - state["last_active_ago"] = int( - self.clock.time_msec() - state.pop("last_active") - ) + for state in results.values(): + if "last_active" in state: + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") + ) - if as_event: - for user, state in results.items(): - content = state - content["user_id"] = user.to_string() + if as_event: + for user, state in results.items(): + content = state + content["user_id"] = user.to_string() - if "last_active" in content: - content["last_active_ago"] = int( - self._clock.time_msec() - content.pop("last_active") - ) + if "last_active" in content: + content["last_active_ago"] = int( + self._clock.time_msec() - content.pop("last_active") + ) - results[user] = {"type": "m.presence", "content": content} - except: - logger.exception(":(") - raise + results[user] = {"type": "m.presence", "content": content} defer.returnValue(results) -- cgit 1.5.1 From 776ee6d92b8672c36723b0b8dc9ae3467f34c08f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Aug 2015 10:30:07 +0100 Subject: Doc strings --- synapse/handlers/presence.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b7664e30f8..1177cbe51b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -192,6 +192,20 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def get_state(self, target_user, auth_user, as_event=False, check_auth=True): + """Get the current presence state of the given user. + + Args: + target_user (UserID): The user whose presence we want + auth_user (UserID): The user requesting the presence, used for + checking if said user is allowed to see the persence of the + `target_user` + as_event (bool): Format the return as an event or not? + check_auth (bool): Perform the auth checks or not? + + Returns: + dict: The presence state of the `target_user`, whose format depends + on the `as_event` argument. + """ if self.hs.is_mine(target_user): if check_auth: visible = yield self.is_presence_visible( @@ -234,6 +248,20 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def get_states(self, target_users, auth_user, as_event=False, check_auth=True): + """A batched version of the `get_state` method that accepts a list of + `target_users` + + Args: + target_users (list): The list of UserID's whose presence we want + auth_user (UserID): The user requesting the presence, used for + checking if said user is allowed to see the persence of the + `target_users` + as_event (bool): Format the return as an event or not? + check_auth (bool): Perform the auth checks or not? + + Returns: + dict: A mapping from user -> presence_state + """ local_users, remote_users = partitionbool( target_users, lambda u: self.hs.is_mine(u) -- cgit 1.5.1 From 83eb627b5a50ef7cd8803f6c6fae9c5f5271bcb1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Aug 2015 10:33:11 +0100 Subject: More helpful variable names --- synapse/handlers/presence.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1177cbe51b..2b103b48bb 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -268,10 +268,10 @@ class PresenceHandler(BaseHandler): ) if check_auth: - for u in local_users: + for user in local_users: visible = yield self.is_presence_visible( observer_user=auth_user, - observed_user=u + observed_user=user ) if not visible: @@ -279,9 +279,9 @@ class PresenceHandler(BaseHandler): results = {} if local_users: - for u in local_users: - if u in self._user_cachemap: - results[u] = self._user_cachemap[u].get_state() + for user in local_users: + if user in self._user_cachemap: + results[user] = self._user_cachemap[user].get_state() local_to_user = {u.localpart: u for u in local_users} @@ -295,9 +295,9 @@ class PresenceHandler(BaseHandler): res["status_msg"] = state["status_msg"] results[local_to_user[local_part]] = res - for u in remote_users: + for user in remote_users: # TODO(paul): Have remote server send us permissions set - results[u] = self._get_or_offline_usercache(u).get_state() + results[user] = self._get_or_offline_usercache(user).get_state() for state in results.values(): if "last_active" in state: -- cgit 1.5.1 From e55291ce5ec0c65c54363ef9366c2357df2ee44f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Aug 2015 11:17:37 +0100 Subject: None check --- synapse/handlers/presence.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2b103b48bb..748432959e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -290,6 +290,8 @@ class PresenceHandler(BaseHandler): ) for local_part, state in states.items(): + if stat is None: + continue res = {"presence": state["state"]} if "status_msg" in state and state["status_msg"]: res["status_msg"] = state["status_msg"] -- cgit 1.5.1 From 0d4abf77773ca0af73422aff1a35bb73c9235e1f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Aug 2015 11:19:08 +0100 Subject: Typo --- synapse/handlers/presence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 748432959e..e91e81831e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -290,7 +290,7 @@ class PresenceHandler(BaseHandler): ) for local_part, state in states.items(): - if stat is None: + if state is None: continue res = {"presence": state["state"]} if "status_msg" in state and state["status_msg"]: -- cgit 1.5.1 From abc6986a24e7f843ffdfc1610833feb96462d5a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Aug 2015 09:30:52 +0100 Subject: Fix regression where we incorrectly responded with a 200 to /login --- synapse/handlers/auth.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 98d99dd0a8..4947c40519 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -283,7 +283,7 @@ class AuthHandler(BaseHandler): StoreError if there was a problem storing the token. LoginError if there was an authentication problem. """ - self._check_password(user_id, password) + yield self._check_password(user_id, password) reg_handler = self.hs.get_handlers().registration_handler access_token = reg_handler.generate_token(user_id) @@ -291,6 +291,7 @@ class AuthHandler(BaseHandler): yield self.store.add_access_token_to_user(user_id, access_token) defer.returnValue(access_token) + @defer.inlineCallbacks def _check_password(self, user_id, password): """Checks that user_id has passed password, raises LoginError if not.""" user_info = yield self.store.get_user_by_id(user_id=user_id) -- cgit 1.5.1 From 40da1f200d6a7778acc76562fcecbbcd3f97eee3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Aug 2015 09:41:07 +0100 Subject: Remove an access token log line --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 4947c40519..be2baeaece 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -287,7 +287,7 @@ class AuthHandler(BaseHandler): reg_handler = self.hs.get_handlers().registration_handler access_token = reg_handler.generate_token(user_id) - logger.info("Adding token %s for user %s", access_token, user_id) + logger.info("Logging in user %s", user_id) yield self.store.add_access_token_to_user(user_id, access_token) defer.returnValue(access_token) -- cgit 1.5.1 From d7272f8d9d0ce3ac9a4095969453efef5aecce40 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Aug 2015 12:03:09 +0100 Subject: Add canonical alias to the default power levels --- synapse/api/constants.py | 1 + synapse/handlers/room.py | 1 + 2 files changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 7156ee4e7d..60a0d336da 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -76,6 +76,7 @@ class EventTypes(object): Feedback = "m.room.message.feedback" RoomHistoryVisibility = "m.room.history_visibility" + CanonicalAlias = "m.room.canonical_alias" # These are used for validation Message = "m.room.message" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7511d294f3..c56112a92a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -250,6 +250,7 @@ class RoomCreationHandler(BaseHandler): EventTypes.Name: 100, EventTypes.PowerLevels: 100, EventTypes.RoomHistoryVisibility: 100, + EventTypes.CanonicalAlias: 100, }, "events_default": 0, "state_default": 50, -- cgit 1.5.1 From daa01842f889a8d93a33d7e11cddc1b72700810e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Aug 2015 13:46:03 +0100 Subject: Don't get apservice interested rooms in RoomHandler.get_joined_rooms_for_users --- synapse/handlers/events.py | 10 +++++++++- synapse/handlers/room.py | 10 ++-------- synapse/handlers/sync.py | 24 +++++++++++++++++++++--- 3 files changed, 32 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 993d33ba47..f9ca2f8634 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -70,7 +70,15 @@ class EventStreamHandler(BaseHandler): self._streams_per_user[auth_user] += 1 rm_handler = self.hs.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user) + + app_service = yield self.store.get_app_service_by_user_id( + auth_user.to_string() + ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + room_ids = set(r.room_id for r in rooms) + else: + room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user) if timeout: # If they've set a timeout set a minimum limit. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7511d294f3..82c16013a3 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -557,15 +557,9 @@ class RoomMemberHandler(BaseHandler): """Returns a list of roomids that the user has any of the given membership states in.""" - app_service = yield self.store.get_app_service_by_user_id( - user.to_string() + rooms = yield self.store.get_rooms_for_user( + user.to_string(), ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - else: - rooms = yield self.store.get_rooms_for_user( - user.to_string(), - ) # For some reason the list of events contains duplicates # TODO(paul): work out why because I really don't think it should diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7206ae23d7..353a416054 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -96,9 +96,18 @@ class SyncHandler(BaseHandler): return self.current_sync_for_user(sync_config, since_token) rm_handler = self.hs.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user( - sync_config.user + + app_service = yield self.store.get_app_service_by_user_id( + sync_config.user.to_string() ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + room_ids = set(r.room_id for r in rooms) + else: + room_ids = yield rm_handler.get_joined_rooms_for_user( + sync_config.user + ) + result = yield self.notifier.wait_for_events( sync_config.user, room_ids, sync_config.filter, timeout, current_sync_callback @@ -229,7 +238,16 @@ class SyncHandler(BaseHandler): logger.debug("Typing %r", typing_by_room) rm_handler = self.hs.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(sync_config.user) + app_service = yield self.store.get_app_service_by_user_id( + sync_config.user.to_string() + ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + room_ids = set(r.room_id for r in rooms) + else: + room_ids = yield rm_handler.get_joined_rooms_for_user( + sync_config.user + ) # TODO (mjark): Does public mean "published"? published_rooms = yield self.store.get_rooms(is_public=True) -- cgit 1.5.1 From aadb2238c9647186711933666851def5e37a8dbf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Aug 2015 09:55:04 +0100 Subject: Check that the canonical room alias actually points to the room --- synapse/handlers/_base.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index d6c064b398..e91f1129db 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID +from synapse.types import UserID, RoomAlias from synapse.util.logcontext import PreserveLoggingContext @@ -130,6 +130,22 @@ class BaseHandler(object): returned_invite.signatures ) + if event.type == EventTypes.CanonicalAlias: + # Check the alias is acually valid (at this time at least) + room_alias_str = event.content.get("alias", None) + if room_alias_str: + room_alias = RoomAlias.from_string(room_alias_str) + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if mapping["room_id"] != event.room_id: + raise SynapseError( + 400, + "Room alias %s does not point to the room" % ( + room_alias_str, + ) + ) + destinations = set(extra_destinations) for k, s in context.current_state.items(): try: -- cgit 1.5.1 From 9b63def3887779c7c9a1aeadd2d16df506155953 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Aug 2015 14:35:40 +0100 Subject: Add m.room.avatar to default power levels. Change default required power levels of such events to 50 --- synapse/api/constants.py | 1 + synapse/handlers/room.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 60a0d336da..1423986c1e 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -77,6 +77,7 @@ class EventTypes(object): RoomHistoryVisibility = "m.room.history_visibility" CanonicalAlias = "m.room.canonical_alias" + RoomAvatar = "m.room.avatar" # These are used for validation Message = "m.room.message" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 8108c2763d..c5d1001b50 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -247,10 +247,11 @@ class RoomCreationHandler(BaseHandler): }, "users_default": 0, "events": { - EventTypes.Name: 100, + EventTypes.Name: 50, EventTypes.PowerLevels: 100, EventTypes.RoomHistoryVisibility: 100, - EventTypes.CanonicalAlias: 100, + EventTypes.CanonicalAlias: 50, + EventTypes.RoomAvatar: 50, }, "events_default": 0, "state_default": 50, -- cgit 1.5.1 From ca0d28ef34022874ebc9168146df53a10bcb925e Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 20 Aug 2015 15:35:14 +0100 Subject: Another use of check_password that got missed in the yield fix --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index be2baeaece..ff2c66f442 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -162,7 +162,7 @@ class AuthHandler(BaseHandler): if not user_id.startswith('@'): user_id = UserID.create(user_id, self.hs.hostname).to_string() - self._check_password(user_id, password) + yield self._check_password(user_id, password) defer.returnValue(user_id) @defer.inlineCallbacks -- cgit 1.5.1 From 3e9ee62db0bfb3991254f7152b70491cbda385b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Aug 2015 09:15:13 +0100 Subject: Add missing param in store.get_state_groups invocation --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1e3dccf5a8..4ff20599d6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -874,7 +874,7 @@ class FederationHandler(BaseHandler): raise AuthError(403, "Host not in room.") state_groups = yield self.store.get_state_groups( - [event_id] + room_id, [event_id] ) if state_groups: -- cgit 1.5.1 From aa3c9c7bd0736bca1b3626c87535192b89431583 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Aug 2015 10:57:47 +0100 Subject: Don't allow people to register user ids which only differ by case to an existing one --- synapse/handlers/register.py | 4 ++-- synapse/storage/registration.py | 11 +++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 39392d9fdd..86390a3671 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -57,8 +57,8 @@ class RegistrationHandler(BaseHandler): yield self.check_user_id_is_valid(user_id) - u = yield self.store.get_user_by_id(user_id) - if u: + users = yield self.store.get_users_by_id_case_insensitive(user_id) + if users: raise SynapseError( 400, "User ID already taken.", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index bf803f2c6e..25adecaf6d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -98,6 +98,17 @@ class RegistrationStore(SQLBaseStore): allow_none=True, ) + def get_users_by_id_case_insensitive(self, user_id): + def f(txn): + sql = ( + "SELECT name, password_hash FROM users" + " WHERE name = lower(?)" + ) + txn.execute(sql, (user_id,)) + return self.cursor_to_dict(txn) + + return self.runInteraction("get_users_by_id_case_insensitive", f) + @defer.inlineCallbacks def user_set_password_hash(self, user_id, password_hash): """ -- cgit 1.5.1 From 42f12ad92f5bc372569f15ffc81e9cf8146d2ac6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Aug 2015 11:34:43 +0100 Subject: When logging in fetch user by user_id case insensitively, *unless* there are multiple case insensitive matches, in which case require the exact user_id --- synapse/handlers/auth.py | 31 +++++++++++++++++++++++-------- synapse/rest/client/v1/login.py | 5 +++-- synapse/storage/registration.py | 7 +++++-- 3 files changed, 31 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index ff2c66f442..058a0f416d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -162,7 +162,8 @@ class AuthHandler(BaseHandler): if not user_id.startswith('@'): user_id = UserID.create(user_id, self.hs.hostname).to_string() - yield self._check_password(user_id, password) + user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) + self._check_password(user_id, password, password_hash) defer.returnValue(user_id) @defer.inlineCallbacks @@ -283,23 +284,37 @@ class AuthHandler(BaseHandler): StoreError if there was a problem storing the token. LoginError if there was an authentication problem. """ - yield self._check_password(user_id, password) + user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) + self._check_password(user_id, password, password_hash) reg_handler = self.hs.get_handlers().registration_handler access_token = reg_handler.generate_token(user_id) logger.info("Logging in user %s", user_id) yield self.store.add_access_token_to_user(user_id, access_token) - defer.returnValue(access_token) + defer.returnValue((user_id, access_token)) @defer.inlineCallbacks - def _check_password(self, user_id, password): - """Checks that user_id has passed password, raises LoginError if not.""" - user_info = yield self.store.get_user_by_id(user_id=user_id) - if not user_info: + def _find_user_id_and_pwd_hash(self, user_id): + user_infos = yield self.store.get_users_by_id_case_insensitive(user_id) + if not user_infos: logger.warn("Attempted to login as %s but they do not exist", user_id) raise LoginError(403, "", errcode=Codes.FORBIDDEN) - stored_hash = user_info["password_hash"] + if len(user_infos) > 1: + if user_id not in user_infos: + logger.warn( + "Attempted to login as %s but it matches more than one user " + "inexactly: %r", + user_id, user_infos.keys() + ) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + + defer.returnValue((user_id, user_infos[user_id])) + else: + defer.returnValue(user_infos.popitem()) + + def _check_password(self, user_id, password, stored_hash): + """Checks that user_id has passed password, raises LoginError if not.""" if not bcrypt.checkpw(password, stored_hash): logger.warn("Failed password login for user %s", user_id) raise LoginError(403, "", errcode=Codes.FORBIDDEN) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 0d5eafd0fa..2444f27366 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -83,9 +83,10 @@ class LoginRestServlet(ClientV1RestServlet): if not user_id.startswith('@'): user_id = UserID.create( - user_id, self.hs.hostname).to_string() + user_id, self.hs.hostname + ).to_string() - token = yield self.handlers.auth_handler.login_with_password( + user_id, token = yield self.handlers.auth_handler.login_with_password( user_id=user_id, password=login_submission["password"]) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 25adecaf6d..586628579d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -99,13 +99,16 @@ class RegistrationStore(SQLBaseStore): ) def get_users_by_id_case_insensitive(self, user_id): + """Gets users that match user_id case insensitively. + Returns a mapping of user_id -> password_hash. + """ def f(txn): sql = ( "SELECT name, password_hash FROM users" - " WHERE name = lower(?)" + " WHERE lower(name) = lower(?)" ) txn.execute(sql, (user_id,)) - return self.cursor_to_dict(txn) + return dict(txn.fetchall()) return self.runInteraction("get_users_by_id_case_insensitive", f) -- cgit 1.5.1 From fd5ad0f00ec963e9722d9f5bbe526dc84038e408 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Aug 2015 11:45:43 +0100 Subject: Doc string --- synapse/handlers/auth.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 058a0f416d..602c5bcd89 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -295,6 +295,12 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def _find_user_id_and_pwd_hash(self, user_id): + """Checks to see if a user with the given id exists. Will check case + insensitively, but will throw if there are multiple inexact matches. + + Returns: + tuple: A 2-tuple of `(canonical_user_id, password_hash)` + """ user_infos = yield self.store.get_users_by_id_case_insensitive(user_id) if not user_infos: logger.warn("Attempted to login as %s but they do not exist", user_id) -- cgit 1.5.1 From f8f3d72e2b6e5b38bb6ab8e057ede454d77d114f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Aug 2015 16:19:43 +0100 Subject: Don't make pushers handle presence/typing events --- synapse/handlers/events.py | 10 ++++++++-- synapse/notifier.py | 7 ++++++- synapse/push/__init__.py | 8 +++++--- 3 files changed, 19 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f9ca2f8634..891502c04f 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -49,7 +49,12 @@ class EventStreamHandler(BaseHandler): @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, - as_client_event=True, affect_presence=True): + as_client_event=True, affect_presence=True, + only_room_events=False): + """Fetches the events stream for a given user. + + If `only_room_events` is `True` only room events will be returned. + """ auth_user = UserID.from_string(auth_user_id) try: @@ -89,7 +94,8 @@ class EventStreamHandler(BaseHandler): timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout + auth_user, room_ids, pagin_config, timeout, + only_room_events=only_room_events ) time_now = self.clock.time_msec() diff --git a/synapse/notifier.py b/synapse/notifier.py index dbd8efe9fb..f998fc83bf 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -328,10 +328,13 @@ class Notifier(object): defer.returnValue(result) @defer.inlineCallbacks - def get_events_for(self, user, rooms, pagination_config, timeout): + def get_events_for(self, user, rooms, pagination_config, timeout, + only_room_events=False): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. + + If `only_room_events` is `True` only room events will be returned. """ from_token = pagination_config.from_token if not from_token: @@ -352,6 +355,8 @@ class Notifier(object): after_id = getattr(after_token, keyname) if before_id == after_id: continue + if only_room_events and name != "room": + continue new_events, new_key = yield source.get_new_events_for_user( user, getattr(from_token, keyname), limit, ) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 13002e0db4..f1952b5a0f 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -249,7 +249,9 @@ class Pusher(object): # we fail to dispatch the push) config = PaginationConfig(from_token=None, limit='1') chunk = yield self.evStreamHandler.get_stream( - self.user_name, config, timeout=0) + self.user_name, config, timeout=0, affect_presence=False, + only_room_events=True + ) self.last_token = chunk['end'] self.store.update_pusher_last_token( self.app_id, self.pushkey, self.user_name, self.last_token @@ -280,8 +282,8 @@ class Pusher(object): config = PaginationConfig(from_token=from_tok, limit='1') timeout = (300 + random.randint(-60, 60)) * 1000 chunk = yield self.evStreamHandler.get_stream( - self.user_name, config, - timeout=timeout, affect_presence=False + self.user_name, config, timeout=timeout, affect_presence=False, + only_room_events=True ) # limiting to 1 may get 1 event plus 1 presence event, so -- cgit 1.5.1 From 51c53369a318262ecc3adc3777be8b838509d19d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Aug 2015 16:38:20 +0100 Subject: Do auth checks *before* persisting the event --- synapse/handlers/_base.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e91f1129db..cb992143f5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -107,6 +107,22 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) + if event.type == EventTypes.CanonicalAlias: + # Check the alias is acually valid (at this time at least) + room_alias_str = event.content.get("alias", None) + if room_alias_str: + room_alias = RoomAlias.from_string(room_alias_str) + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if mapping["room_id"] != event.room_id: + raise SynapseError( + 400, + "Room alias %s does not point to the room" % ( + room_alias_str, + ) + ) + (event_stream_id, max_stream_id) = yield self.store.persist_event( event, context=context ) @@ -130,22 +146,6 @@ class BaseHandler(object): returned_invite.signatures ) - if event.type == EventTypes.CanonicalAlias: - # Check the alias is acually valid (at this time at least) - room_alias_str = event.content.get("alias", None) - if room_alias_str: - room_alias = RoomAlias.from_string(room_alias_str) - directory_handler = self.hs.get_handlers().directory_handler - mapping = yield directory_handler.get_association(room_alias) - - if mapping["room_id"] != event.room_id: - raise SynapseError( - 400, - "Room alias %s does not point to the room" % ( - room_alias_str, - ) - ) - destinations = set(extra_destinations) for k, s in context.current_state.items(): try: -- cgit 1.5.1 From f4d552589e3cb815144dea646140db66d845a237 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Aug 2015 10:51:08 +0100 Subject: Don't loop over all rooms ever in typing.get_new_events_for_user --- synapse/handlers/typing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 026bd2b9d4..1ed220d871 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -260,8 +260,8 @@ class TypingNotificationEventSource(object): ) events = [] - for room_id in handler._room_serials: - if room_id not in joined_room_ids: + for room_id in joined_room_ids: + if room_id not in handler._room_serials: continue if handler._room_serials[room_id] <= from_key: continue -- cgit 1.5.1 From da51acf0e752badb73c036f4b1cf0ec943b6dcb1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Aug 2015 11:08:23 +0100 Subject: Remove needless existence checks --- synapse/handlers/typing.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 1ed220d871..d7096aab8c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -204,15 +204,11 @@ class TypingNotificationHandler(BaseHandler): ) def _push_update_local(self, room_id, user, typing): - if room_id not in self._room_serials: - self._room_serials[room_id] = 0 - self._room_typing[room_id] = set() - - room_set = self._room_typing[room_id] + room_set = self._room_typing.setdefault(room_id, set()) if typing: room_set.add(user) - elif user in room_set: - room_set.remove(user) + else: + room_set.discard(user) self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial -- cgit 1.5.1