From 975903ae1735f690a387c10b1ffd1a9384353213 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Jan 2016 10:41:30 +0000 Subject: Sanitize filters --- synapse/api/filtering.py | 48 +++++++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 21 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index c7f021d1ff..5530b8c48f 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -28,14 +28,14 @@ class Filtering(object): return result def add_user_filter(self, user_localpart, user_filter): - self._check_valid_filter(user_filter) + self.check_valid_filter(user_filter) return self.store.add_user_filter(user_localpart, user_filter) # TODO(paul): surely we should probably add a delete_user_filter or # replace_user_filter at some point? There's no REST API specified for # them however - def _check_valid_filter(self, user_filter_json): + def check_valid_filter(self, user_filter_json): """Check if the provided filter is valid. This inspects all definitions contained within the filter. @@ -129,52 +129,55 @@ class Filtering(object): class FilterCollection(object): def __init__(self, filter_json): - self.filter_json = filter_json + self._filter_json = filter_json - room_filter_json = self.filter_json.get("room", {}) + room_filter_json = self._filter_json.get("room", {}) - self.room_filter = Filter({ + self._room_filter = Filter({ k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms") }) - self.room_timeline_filter = Filter(room_filter_json.get("timeline", {})) - self.room_state_filter = Filter(room_filter_json.get("state", {})) - self.room_ephemeral_filter = Filter(room_filter_json.get("ephemeral", {})) - self.room_account_data = Filter(room_filter_json.get("account_data", {})) - self.presence_filter = Filter(self.filter_json.get("presence", {})) - self.account_data = Filter(self.filter_json.get("account_data", {})) + self._room_timeline_filter = Filter(room_filter_json.get("timeline", {})) + self._room_state_filter = Filter(room_filter_json.get("state", {})) + self._room_ephemeral_filter = Filter(room_filter_json.get("ephemeral", {})) + self._room_account_data = Filter(room_filter_json.get("account_data", {})) + self._presence_filter = Filter(filter_json.get("presence", {})) + self._account_data = Filter(filter_json.get("account_data", {})) - self.include_leave = self.filter_json.get("room", {}).get( + self.include_leave = filter_json.get("room", {}).get( "include_leave", False ) + def get_filter_json(self): + return self._filter_json + def timeline_limit(self): - return self.room_timeline_filter.limit() + return self._room_timeline_filter.limit() def presence_limit(self): - return self.presence_filter.limit() + return self._presence_filter.limit() def ephemeral_limit(self): - return self.room_ephemeral_filter.limit() + return self._room_ephemeral_filter.limit() def filter_presence(self, events): - return self.presence_filter.filter(events) + return self._presence_filter.filter(events) def filter_account_data(self, events): - return self.account_data.filter(events) + return self._account_data.filter(events) def filter_room_state(self, events): - return self.room_state_filter.filter(self.room_filter.filter(events)) + return self._room_state_filter.filter(self._room_filter.filter(events)) def filter_room_timeline(self, events): - return self.room_timeline_filter.filter(self.room_filter.filter(events)) + return self._room_timeline_filter.filter(self._room_filter.filter(events)) def filter_room_ephemeral(self, events): - return self.room_ephemeral_filter.filter(self.room_filter.filter(events)) + return self._room_ephemeral_filter.filter(self._room_filter.filter(events)) def filter_room_account_data(self, events): - return self.room_account_data.filter(self.room_filter.filter(events)) + return self._room_account_data.filter(self._room_filter.filter(events)) class Filter(object): @@ -258,3 +261,6 @@ def _matches_wildcard(actual_value, filter_value): return actual_value.startswith(type_prefix) else: return actual_value == filter_value + + +DEFAULT_FILTER_COLLECTION = FilterCollection({}) -- cgit 1.5.1 From 4021f95261ebdcca0ec2c3c91e8dd442a85c5ed4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jan 2016 10:10:44 +0000 Subject: Move logic from rest/ to handlers/ --- synapse/api/filtering.py | 22 ++-- synapse/handlers/sync.py | 189 +++++++++++++++++++++++++++-------- synapse/rest/client/v2_alpha/sync.py | 57 +++++------ 3 files changed, 181 insertions(+), 87 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 5530b8c48f..116060ee7f 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -190,18 +190,16 @@ class Filter(object): Returns: bool: True if the event matches """ - if isinstance(event, dict): - return self.check_fields( - event.get("room_id", None), - event.get("sender", None), - event.get("type", None), - ) - else: - return self.check_fields( - getattr(event, "room_id", None), - getattr(event, "sender", None), - event.type, - ) + sender = event.get("sender", None) + if not sender: + # Presence events have their 'sender' in content.user_id + sender = event.get("conntent", {}).get("user_id", None) + + return self.check_fields( + event.get("room_id", None), + sender, + event.get("type", None), + ) def check_fields(self, room_id, sender, event_type): """Checks whether the filter matches the given event fields. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 53e1eb0508..9b5b4d2c9f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -17,6 +17,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes +from synapse.api.filtering import DEFAULT_FILTER_COLLECTION from synapse.util import unwrapFirstError from twisted.internet import defer @@ -29,7 +30,7 @@ logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", - "filter", + "filter_collection", "is_guest", ]) @@ -129,6 +130,11 @@ class SyncHandler(BaseHandler): self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() + @defer.inlineCallbacks + def get_sync_for_user(self, sync_config, since_token=None, timeout=0, + filter_collection=DEFAULT_FILTER_COLLECTION): + pass + @defer.inlineCallbacks def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): @@ -142,8 +148,9 @@ class SyncHandler(BaseHandler): if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. - result = yield self.current_sync_for_user(sync_config, since_token, - full_state=full_state) + result = yield self.current_sync_for_user( + sync_config, since_token, full_state=full_state, + ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): @@ -151,7 +158,7 @@ class SyncHandler(BaseHandler): result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, - from_token=since_token + from_token=since_token, ) defer.returnValue(result) @@ -205,7 +212,7 @@ class SyncHandler(BaseHandler): ) membership_list = (Membership.INVITE, Membership.JOIN) - if sync_config.filter.include_leave: + if sync_config.filter_collection.include_leave: membership_list += (Membership.LEAVE, Membership.BAN) room_list = yield self.store.get_rooms_for_user_where_membership_is( @@ -266,9 +273,17 @@ class SyncHandler(BaseHandler): deferreds, consumeErrors=True ).addErrback(unwrapFirstError) + account_data_for_user = sync_config.filter_collection.filter_account_data( + self.account_data_for_user(account_data) + ) + + presence = sync_config.filter_collection.filter_presence( + presence + ) + defer.returnValue(SyncResult( presence=presence, - account_data=self.account_data_for_user(account_data), + account_data=account_data_for_user, joined=joined, invited=invited, archived=archived, @@ -302,14 +317,31 @@ class SyncHandler(BaseHandler): current_state = yield self.get_state_at(room_id, now_token) + current_state = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + current_state.values() + ) + } + + account_data = self.account_data_for_room( + room_id, tags_by_room, account_data_by_room + ) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral( + ephemeral_by_room.get(room_id, []) + ) + defer.returnValue(JoinedSyncResult( room_id=room_id, timeline=batch, state=current_state, - ephemeral=ephemeral_by_room.get(room_id, []), - account_data=self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ), + ephemeral=ephemeral, + account_data=account_data, unread_notifications=unread_notifications, )) @@ -365,7 +397,7 @@ class SyncHandler(BaseHandler): typing, typing_key = yield typing_source.get_new_events( user=sync_config.user, from_key=typing_key, - limit=sync_config.filter.ephemeral_limit(), + limit=sync_config.filter_collection.ephemeral_limit(), room_ids=room_ids, is_guest=sync_config.is_guest, ) @@ -388,7 +420,7 @@ class SyncHandler(BaseHandler): receipts, receipt_key = yield receipt_source.get_new_events( user=sync_config.user, from_key=receipt_key, - limit=sync_config.filter.ephemeral_limit(), + limit=sync_config.filter_collection.ephemeral_limit(), room_ids=room_ids, is_guest=sync_config.is_guest, ) @@ -419,13 +451,26 @@ class SyncHandler(BaseHandler): leave_state = yield self.store.get_state_for_event(leave_event_id) + leave_state = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + leave_state.values() + ) + } + + account_data = self.account_data_for_room( + room_id, tags_by_room, account_data_by_room + ) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data + ) + defer.returnValue(ArchivedSyncResult( room_id=room_id, timeline=batch, state=leave_state, - account_data=self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ), + account_data=account_data, )) @defer.inlineCallbacks @@ -444,7 +489,7 @@ class SyncHandler(BaseHandler): presence, presence_key = yield presence_source.get_new_events( user=sync_config.user, from_key=since_token.presence_key, - limit=sync_config.filter.presence_limit(), + limit=sync_config.filter_collection.presence_limit(), room_ids=room_ids, is_guest=sync_config.is_guest, ) @@ -473,7 +518,7 @@ class SyncHandler(BaseHandler): sync_config.user ) - timeline_limit = sync_config.filter.timeline_limit() + timeline_limit = sync_config.filter_collection.timeline_limit() room_events, _ = yield self.store.get_room_events_stream( sync_config.user.to_string(), @@ -538,6 +583,27 @@ class SyncHandler(BaseHandler): # the timeline is inherently limited if we've just joined limited = True + recents = sync_config.filter_collection.filter_room_timeline(recents) + + state = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + state.values() + ) + } + + acc_data = self.account_data_for_room( + room_id, tags_by_room, account_data_by_room + ) + + acc_data = sync_config.filter_collection.filter_room_account_data( + acc_data + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral( + ephemeral_by_room.get(room_id, []) + ) + room_sync = JoinedSyncResult( room_id=room_id, timeline=TimelineBatch( @@ -546,10 +612,8 @@ class SyncHandler(BaseHandler): limited=limited, ), state=state, - ephemeral=ephemeral_by_room.get(room_id, []), - account_data=self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ), + ephemeral=ephemeral, + account_data=acc_data, unread_notifications={}, ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -603,9 +667,17 @@ class SyncHandler(BaseHandler): for event in invite_events ] + account_data_for_user = sync_config.filter_collection.filter_account_data( + self.account_data_for_user(account_data) + ) + + presence = sync_config.filter_collection.filter_presence( + presence + ) + defer.returnValue(SyncResult( presence=presence, - account_data=self.account_data_for_user(account_data), + account_data=account_data_for_user, joined=joined, invited=invited, archived=archived, @@ -621,7 +693,7 @@ class SyncHandler(BaseHandler): limited = True recents = [] filtering_factor = 2 - timeline_limit = sync_config.filter.timeline_limit() + timeline_limit = sync_config.filter_collection.timeline_limit() load_limit = max(timeline_limit * filtering_factor, 100) max_repeat = 3 # Only try a few times per room, otherwise room_key = now_token.room_key @@ -634,9 +706,9 @@ class SyncHandler(BaseHandler): from_token=since_token.room_key if since_token else None, end_token=end_key, ) - (room_key, _) = keys + room_key, _ = keys end_key = "s" + room_key.split('-')[-1] - loaded_recents = sync_config.filter.filter_room_timeline(events) + loaded_recents = sync_config.filter_collection.filter_room_timeline(events) loaded_recents = yield self._filter_events_for_client( sync_config.user.to_string(), loaded_recents, @@ -684,21 +756,28 @@ class SyncHandler(BaseHandler): logger.debug("Recents %r", batch) - current_state = yield self.get_state_at(room_id, now_token) + if batch.limited: + current_state = yield self.get_state_at(room_id, now_token) - state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token - ) + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token + ) - state = yield self.compute_state_delta( - since_token=since_token, - previous_state=state_at_previous_sync, - current_state=current_state, - ) + state = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=current_state, + ) + else: + state = { + (event.type, event.state_key): event + for event in batch.events if event.is_state() + } just_joined = yield self.check_joined_room(sync_config, state) if just_joined: state = yield self.get_state_at(room_id, now_token) + # batch.limited = True notifs = yield self.unread_notifs_for_room_id( room_id, sync_config, all_ephemeral_by_room @@ -711,14 +790,29 @@ class SyncHandler(BaseHandler): 1 for notif in notifs if _action_has_highlight(notif["actions"]) ]) + state = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state(state.values()) + } + + account_data = self.account_data_for_room( + room_id, tags_by_room, account_data_by_room + ) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral( + ephemeral_by_room.get(room_id, []) + ) + room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, state=state, - ephemeral=ephemeral_by_room.get(room_id, []), - account_data=self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ), + ephemeral=ephemeral, + account_data=account_data, unread_notifications=unread_notifications, ) @@ -765,13 +859,26 @@ class SyncHandler(BaseHandler): current_state=state_events_at_leave, ) + state_events_delta = { + (e.type, e.state_key): e + for e in sync_config.filter_collection.filter_room_state( + state_events_delta.values() + ) + } + + account_data = self.account_data_for_room( + leave_event.room_id, tags_by_room, account_data_by_room + ) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data + ) + room_sync = ArchivedSyncResult( room_id=leave_event.room_id, timeline=batch, state=state_events_delta, - account_data=self.account_data_for_room( - leave_event.room_id, tags_by_room, account_data_by_room - ), + account_data=account_data, ) logger.debug("Room sync: %r", room_sync) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index ab924ad9e0..07b5b5dfd5 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -130,7 +130,7 @@ class SyncRestServlet(RestServlet): sync_config = SyncConfig( user=user, - filter=filter, + filter_collection=filter, is_guest=requester.is_guest, ) @@ -154,23 +154,21 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() joined = self.encode_joined( - sync_result.joined, filter, time_now, requester.access_token_id + sync_result.joined, time_now, requester.access_token_id ) invited = self.encode_invited( - sync_result.invited, filter, time_now, requester.access_token_id + sync_result.invited, time_now, requester.access_token_id ) archived = self.encode_archived( - sync_result.archived, filter, time_now, requester.access_token_id + sync_result.archived, time_now, requester.access_token_id ) response_content = { - "account_data": self.encode_account_data( - sync_result.account_data, filter, time_now - ), + "account_data": {"events": sync_result.account_data}, "presence": self.encode_presence( - sync_result.presence, filter, time_now + sync_result.presence, time_now ), "rooms": { "join": joined, @@ -182,24 +180,20 @@ class SyncRestServlet(RestServlet): defer.returnValue((200, response_content)) - def encode_presence(self, events, filter, time_now): + def encode_presence(self, events, time_now): formatted = [] for event in events: event = copy.deepcopy(event) event['sender'] = event['content'].pop('user_id') formatted.append(event) - return {"events": filter.filter_presence(formatted)} - - def encode_account_data(self, events, filter, time_now): - return {"events": filter.filter_account_data(events)} + return {"events": formatted} - def encode_joined(self, rooms, filter, time_now, token_id): + def encode_joined(self, rooms, time_now, token_id): """ Encode the joined rooms in a sync result :param list[synapse.handlers.sync.JoinedSyncResult] rooms: list of sync results for rooms this user is joined to - :param FilterCollection filter: filters to apply to the results :param int time_now: current time - used as a baseline for age calculations :param int token_id: ID of the user's auth token - used for namespacing @@ -211,18 +205,17 @@ class SyncRestServlet(RestServlet): joined = {} for room in rooms: joined[room.room_id] = self.encode_room( - room, filter, time_now, token_id + room, time_now, token_id ) return joined - def encode_invited(self, rooms, filter, time_now, token_id): + def encode_invited(self, rooms, time_now, token_id): """ Encode the invited rooms in a sync result :param list[synapse.handlers.sync.InvitedSyncResult] rooms: list of sync results for rooms this user is joined to - :param FilterCollection filter: filters to apply to the results :param int time_now: current time - used as a baseline for age calculations :param int token_id: ID of the user's auth token - used for namespacing @@ -237,7 +230,9 @@ class SyncRestServlet(RestServlet): room.invite, time_now, token_id=token_id, event_format=format_event_for_client_v2_without_room_id, ) - invited_state = invite.get("unsigned", {}).pop("invite_room_state", []) + unsigned = dict(invite.get("unsigned", {})) + invite["unsigned"] = unsigned + invited_state = list(unsigned.pop("invite_room_state", [])) invited_state.append(invite) invited[room.room_id] = { "invite_state": {"events": invited_state} @@ -245,13 +240,12 @@ class SyncRestServlet(RestServlet): return invited - def encode_archived(self, rooms, filter, time_now, token_id): + def encode_archived(self, rooms, time_now, token_id): """ Encode the archived rooms in a sync result :param list[synapse.handlers.sync.ArchivedSyncResult] rooms: list of sync results for rooms this user is joined to - :param FilterCollection filter: filters to apply to the results :param int time_now: current time - used as a baseline for age calculations :param int token_id: ID of the user's auth token - used for namespacing @@ -263,17 +257,16 @@ class SyncRestServlet(RestServlet): joined = {} for room in rooms: joined[room.room_id] = self.encode_room( - room, filter, time_now, token_id, joined=False + room, time_now, token_id, joined=False ) return joined @staticmethod - def encode_room(room, filter, time_now, token_id, joined=True): + def encode_room(room, time_now, token_id, joined=True): """ :param JoinedSyncResult|ArchivedSyncResult room: sync result for a single room - :param FilterCollection filter: filters to apply to the results :param int time_now: current time - used as a baseline for age calculations :param int token_id: ID of the user's auth token - used for namespacing @@ -292,19 +285,17 @@ class SyncRestServlet(RestServlet): ) state_dict = room.state - timeline_events = filter.filter_room_timeline(room.timeline.events) + timeline_events = room.timeline.events state_dict = SyncRestServlet._rollback_state_for_timeline( state_dict, timeline_events) - state_events = filter.filter_room_state(state_dict.values()) + state_events = state_dict.values() serialized_state = [serialize(e) for e in state_events] serialized_timeline = [serialize(e) for e in timeline_events] - account_data = filter.filter_room_account_data( - room.account_data - ) + account_data = room.account_data result = { "timeline": { @@ -317,7 +308,7 @@ class SyncRestServlet(RestServlet): } if joined: - ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) + ephemeral_events = room.ephemeral result["ephemeral"] = {"events": ephemeral_events} result["unread_notifications"] = room.unread_notifications @@ -334,8 +325,6 @@ class SyncRestServlet(RestServlet): :param list[synapse.events.EventBase] timeline: the event timeline :return: updated state dictionary """ - logger.debug("Processing state dict %r; timeline %r", state, - [e.get_dict() for e in timeline]) result = state.copy() @@ -357,8 +346,8 @@ class SyncRestServlet(RestServlet): # the event graph, and the state is no longer valid. Really, # the event shouldn't be in the timeline. We're going to ignore # it for now, however. - logger.warn("Found state event %r in timeline which doesn't " - "match state dictionary", timeline_event) + logger.debug("Found state event %r in timeline which doesn't " + "match state dictionary", timeline_event) continue prev_event_id = timeline_event.unsigned.get("replaces_state", None) -- cgit 1.5.1 From 8c6012a4af4973b0a53af65a31cbdb92a3dec5a2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jan 2016 13:12:35 +0000 Subject: Fix tests --- synapse/api/filtering.py | 2 +- tests/api/test_filtering.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 116060ee7f..6c13ada5df 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -193,7 +193,7 @@ class Filter(object): sender = event.get("sender", None) if not sender: # Presence events have their 'sender' in content.user_id - sender = event.get("conntent", {}).get("user_id", None) + sender = event.get("content", {}).get("user_id", None) return self.check_fields( event.get("room_id", None), diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 16ee6bbe6a..1a4e439d30 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -13,26 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import namedtuple from tests import unittest from twisted.internet import defer -from mock import Mock, NonCallableMock +from mock import Mock from tests.utils import ( MockHttpResource, DeferredMockCallable, setup_test_homeserver ) from synapse.types import UserID -from synapse.api.filtering import FilterCollection, Filter +from synapse.api.filtering import Filter +from synapse.events import FrozenEvent user_localpart = "test_user" # MockEvent = namedtuple("MockEvent", "sender type room_id") def MockEvent(**kwargs): - ev = NonCallableMock(spec_set=kwargs.keys()) - ev.configure_mock(**kwargs) - return ev + return FrozenEvent(kwargs) class FilteringTestCase(unittest.TestCase): -- cgit 1.5.1 From 35981c8b71a2ce675f3b8414ca0e7920a5d1658e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jan 2016 17:19:51 +0000 Subject: Fix test --- synapse/api/filtering.py | 5 +++++ tests/api/test_filtering.py | 7 ++++--- 2 files changed, 9 insertions(+), 3 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 6c13ada5df..6eff83e5f8 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -15,6 +15,8 @@ from synapse.api.errors import SynapseError from synapse.types import UserID, RoomID +import ujson as json + class Filtering(object): @@ -149,6 +151,9 @@ class FilterCollection(object): "include_leave", False ) + def __repr__(self): + return "" % (json.dumps(self._filter_json),) + def get_filter_json(self): return self._filter_json diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 1a4e439d30..ceb0089268 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -382,19 +382,20 @@ class FilteringTestCase(unittest.TestCase): "types": ["m.*"] } } - user = UserID.from_string("@" + user_localpart + ":test") + filter_id = yield self.datastore.add_user_filter( - user_localpart=user_localpart, + user_localpart=user_localpart + "2", user_filter=user_filter_json, ) event = MockEvent( + event_id="$asdasd:localhost", sender="@foo:bar", type="custom.avatar.3d.crazy", ) events = [event] user_filter = yield self.filtering.get_user_filter( - user_localpart=user_localpart, + user_localpart=user_localpart + "2", filter_id=filter_id, ) -- cgit 1.5.1 From d83d004ccdb7ace1dcb51b8acf7645bc176b10a5 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 2 Feb 2016 17:18:50 +0000 Subject: Fix flake8 warnings for new flake8 --- setup.cfg | 1 + synapse/api/auth.py | 2 +- synapse/app/__init__.py | 19 ++++++++++++++++ synapse/app/homeserver.py | 38 +++++++++----------------------- synapse/appservice/api.py | 2 +- synapse/federation/federation_client.py | 2 +- synapse/handlers/_base.py | 2 +- synapse/handlers/directory.py | 4 ++-- synapse/handlers/events.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 2 +- synapse/http/matrixfederationclient.py | 2 +- synapse/notifier.py | 2 +- synapse/push/push_rule_evaluator.py | 2 +- synapse/rest/client/v1/login.py | 2 +- synapse/rest/client/v1/pusher.py | 4 ++-- synapse/rest/client/v1/register.py | 3 ++- synapse/rest/client/v2_alpha/register.py | 3 ++- synapse/rest/client/versions.py | 4 +--- synapse/server.py | 2 +- synapse/state.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 7 ++++-- synapse/storage/engines/sqlite3.py | 2 +- synapse/storage/event_federation.py | 2 +- synapse/storage/events.py | 6 ++--- synapse/storage/stream.py | 2 +- synapse/util/__init__.py | 2 +- synapse/util/caches/descriptors.py | 4 ++-- synapse/util/caches/expiringcache.py | 2 +- synapse/util/caches/treecache.py | 2 +- synapse/util/logutils.py | 2 +- synapse/util/ratelimitutils.py | 2 +- 34 files changed, 73 insertions(+), 66 deletions(-) (limited to 'synapse/api') diff --git a/setup.cfg b/setup.cfg index ba027c7d13..e7fc5ffe78 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,3 +16,4 @@ ignore = [flake8] max-line-length = 90 +ignore = W503 diff --git a/synapse/api/auth.py b/synapse/api/auth.py index b5536e8565..c5a2865e26 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -574,7 +574,7 @@ class Auth(object): raise AuthError( 403, "Application service has not registered this user" - ) + ) defer.returnValue(user_id) @defer.inlineCallbacks diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index bfebb0f644..1bc4279807 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -12,3 +12,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import sys +sys.dont_write_bytecode = True + +from synapse.python_dependencies import ( + check_requirements, MissingRequirementError +) # NOQA + +try: + check_requirements() +except MissingRequirementError as e: + message = "\n".join([ + "Missing Requirement: %s" % (e.message,), + "To install run:", + " pip install --upgrade --force \"%s\"" % (e.dependency,), + "", + ]) + sys.stderr.writelines(message) + sys.exit(1) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e5066c48ef..c3066d6a0d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -14,27 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import synapse + +import contextlib +import logging +import os +import re +import resource +import subprocess import sys -from synapse.rest import ClientRestResource +import time -sys.dont_write_bytecode = True from synapse.python_dependencies import ( - check_requirements, DEPENDENCY_LINKS, MissingRequirementError + check_requirements, DEPENDENCY_LINKS ) -if __name__ == '__main__': - try: - check_requirements() - except MissingRequirementError as e: - message = "\n".join([ - "Missing Requirement: %s" % (e.message,), - "To install run:", - " pip install --upgrade --force \"%s\"" % (e.dependency,), - "", - ]) - sys.stderr.writelines(message) - sys.exit(1) - +from synapse.rest import ClientRestResource from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain from synapse.storage.prepare_database import UpgradeDatabaseException @@ -73,17 +68,6 @@ from synapse import events from daemonize import Daemonize -import synapse - -import contextlib -import logging -import os -import re -import resource -import subprocess -import time - - logger = logging.getLogger("synapse.app.homeserver") diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index e1c07028e8..bc90605324 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -29,7 +29,7 @@ class ApplicationServiceApi(SimpleHttpClient): pushing. """ - def __init__(self, hs): + def __init__(self, hs): super(ApplicationServiceApi, self).__init__(hs) self.clock = hs.get_clock() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c6259f9dc8..e30e2da58d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -57,7 +57,7 @@ class FederationClient(FederationBase): cache_name="get_pdu_cache", clock=self._clock, max_len=1000, - expiry_ms=120*1000, + expiry_ms=120 * 1000, reset_expiry_on_get=False, ) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 744a9ee507..1423df6cf3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -147,7 +147,7 @@ class BaseHandler(object): ) if not allowed: raise LimitExceededError( - retry_after_ms=int(1000*(time_allowed - time_now)), + retry_after_ms=int(1000 * (time_allowed - time_now)), ) @defer.inlineCallbacks diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 691564c651..4efecb1ffd 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -175,8 +175,8 @@ class DirectoryHandler(BaseHandler): # If this server is in the list of servers, return it first. if self.server_name in servers: servers = ( - [self.server_name] - + [s for s in servers if s != self.server_name] + [self.server_name] + + [s for s in servers if s != self.server_name] ) else: servers = list(servers) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 254b483da6..5ad8f3779a 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -130,7 +130,7 @@ class EventStreamHandler(BaseHandler): # Add some randomness to this value to try and mitigate against # thundering herds on restart. - timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) + timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1)) events, tokens = yield self.notifier.get_events_for( auth_user, pagin_config, timeout, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d36eb3b8d7..d0c21ff5c9 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -34,7 +34,7 @@ metrics = synapse.metrics.get_metrics_for(__name__) # Don't bother bumping "last active" time if it differs by less than 60 seconds -LAST_ACTIVE_GRANULARITY = 60*1000 +LAST_ACTIVE_GRANULARITY = 60 * 1000 # Keep no more than this number of offline serial revisions MAX_OFFLINE_SERIALS = 1000 diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index abd1a16a41..b8fbcf9233 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -213,7 +213,7 @@ class RegistrationHandler(BaseHandler): 400, "User ID must only contain characters which do not" " require URL encoding." - ) + ) user = UserID(localpart, self.hs.hostname) user_id = user.to_string() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 799221c198..088b76d237 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -927,7 +927,7 @@ class RoomContextHandler(BaseHandler): Returns: dict, or None if the event isn't found """ - before_limit = math.floor(limit/2.) + before_limit = math.floor(limit / 2.) after_limit = limit - before_limit now_token = yield self.hs.get_event_sources().get_current_token() diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index da13e32e78..c3589534f8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -152,7 +152,7 @@ class MatrixFederationHttpClient(object): return self.clock.time_bound_deferred( request_deferred, - time_out=timeout/1000. if timeout else 60, + time_out=timeout / 1000. if timeout else 60, ) response = yield preserve_context_over_fn( diff --git a/synapse/notifier.py b/synapse/notifier.py index 29965a9ab5..1a90bd55cd 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -308,7 +308,7 @@ class Notifier(object): def timed_out(): if listener: listener.deferred.cancel() - timer = self.clock.call_later(timeout/1000., timed_out) + timer = self.clock.call_later(timeout / 1000., timed_out) prev_token = from_token while not result: diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index dca018af95..2a2b4437dc 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -304,7 +304,7 @@ def _flatten_dict(d, prefix=[], result={}): if isinstance(value, basestring): result[".".join(prefix + [key])] = value.lower() elif hasattr(value, "items"): - _flatten_dict(value, prefix=(prefix+[key]), result=result) + _flatten_dict(value, prefix=(prefix + [key]), result=result) return result diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 07836709fb..7199113dac 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -89,7 +89,7 @@ class LoginRestServlet(ClientV1RestServlet): LoginRestServlet.SAML2_TYPE): relay_state = "" if "relay_state" in login_submission: - relay_state = "&RelayState="+urllib.quote( + relay_state = "&RelayState=" + urllib.quote( login_submission["relay_state"]) result = { "uri": "%s%s" % (self.idp_redirect_url, relay_state) diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index e218ed215c..5547f1b112 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -52,7 +52,7 @@ class PusherRestServlet(ClientV1RestServlet): if i not in content: missing.append(i) if len(missing): - raise SynapseError(400, "Missing parameters: "+','.join(missing), + raise SynapseError(400, "Missing parameters: " + ','.join(missing), errcode=Codes.MISSING_PARAM) logger.debug("set pushkey %s to kind %s", content['pushkey'], content['kind']) @@ -83,7 +83,7 @@ class PusherRestServlet(ClientV1RestServlet): data=content['data'] ) except PusherConfigException as pce: - raise SynapseError(400, "Config Error: "+pce.message, + raise SynapseError(400, "Config Error: " + pce.message, errcode=Codes.MISSING_PARAM) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 5378a9a938..2bfd4d96bf 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -38,7 +38,8 @@ logger = logging.getLogger(__name__) if hasattr(hmac, "compare_digest"): compare_digest = hmac.compare_digest else: - compare_digest = lambda a, b: a == b + def compare_digest(a, b): + return a == b class RegisterRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 5d50dd9e3d..56a5bbec30 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -34,7 +34,8 @@ from synapse.util.async import run_on_reactor if hasattr(hmac, "compare_digest"): compare_digest = hmac.compare_digest else: - compare_digest = lambda a, b: a == b + def compare_digest(a, b): + return a == b logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 349ef6b396..ca5468c402 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -26,9 +26,7 @@ class VersionsRestServlet(RestServlet): def on_GET(self, request): return (200, { - "versions": [ - "r0.0.1", - ] + "versions": ["r0.0.1"] }) diff --git a/synapse/server.py b/synapse/server.py index 5fee7fe130..368d615576 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -23,7 +23,7 @@ from twisted.web.client import BrowserLikePolicyForHTTPS from twisted.enterprise import adbapi from synapse.federation import initialize_http_replication -from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory +from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.notifier import Notifier from synapse.api.auth import Auth from synapse.handlers import Handlers diff --git a/synapse/state.py b/synapse/state.py index 0acf309fe0..b9a1387520 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -63,7 +63,7 @@ class StateHandler(object): cache_name="state_cache", clock=self.clock, max_len=SIZE_OF_CACHE, - expiry_ms=EVICTION_TIMEOUT_SECONDS*1000, + expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000, reset_expiry_on_get=True, ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c91c7a3729..5a9e7720d9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -59,7 +59,7 @@ logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits # 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120*1000 +LAST_SEEN_GRANULARITY = 120 * 1000 class DataStore(RoomMemberStore, RoomStore, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5e77320540..cfb87d9328 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -185,7 +185,7 @@ class SQLBaseStore(object): time_then = self._previous_loop_ts self._previous_loop_ts = time_now - ratio = (curr - prev)/(time_now - time_then) + ratio = (curr - prev) / (time_now - time_then) top_three_counters = self._txn_perf_counters.interval( time_now - time_then, limit=3 @@ -643,7 +643,10 @@ class SQLBaseStore(object): if not iterable: defer.returnValue(results) - chunks = [iterable[i:i+batch_size] for i in xrange(0, len(iterable), batch_size)] + chunks = [ + iterable[i:i + batch_size] + for i in xrange(0, len(iterable), batch_size) + ] for chunk in chunks: rows = yield self.runInteraction( desc, diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index 400c10103c..91fac33b8b 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -54,7 +54,7 @@ class Sqlite3Engine(object): def _parse_match_info(buf): bufsize = len(buf) - return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)] + return [struct.unpack('@I', buf[i:i + 4])[0] for i in range(0, bufsize, 4)] def _rank(raw_match_info): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 5f32eec6f8..ce2c794025 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -58,7 +58,7 @@ class EventFederationStore(SQLBaseStore): new_front = set() front_list = list(front) chunks = [ - front_list[x:x+100] + front_list[x:x + 100] for x in xrange(0, len(front), 100) ] for chunk in chunks: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5e85552029..4d7cdd00d0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -84,7 +84,7 @@ class EventsStore(SQLBaseStore): event.internal_metadata.stream_ordering = stream chunks = [ - events_and_contexts[x:x+100] + events_and_contexts[x:x + 100] for x in xrange(0, len(events_and_contexts), 100) ] @@ -740,7 +740,7 @@ class EventsStore(SQLBaseStore): rows = [] N = 200 for i in range(1 + len(events) / N): - evs = events[i*N:(i + 1)*N] + evs = events[i * N:(i + 1) * N] if not evs: break @@ -755,7 +755,7 @@ class EventsStore(SQLBaseStore): " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(evs)),) + ) % (",".join(["?"] * len(evs)),) txn.execute(sql, evs) rows.extend(self.cursor_to_dict(txn)) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 338a9d40d5..2c49a5e499 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) - for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): + for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ self.get_room_events_stream_for_room( room_id, from_key, to_key, limit diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index f1fe963adf..7566d9eb33 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -46,7 +46,7 @@ class Clock(object): def looping_call(self, f, msec): l = task.LoopingCall(f) - l.start(msec/1000.0, now=False) + l.start(msec / 1000.0, now=False) return l def stop_looping_call(self, loop): diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 88e56e3302..e27917c63a 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -149,7 +149,7 @@ class CacheDescriptor(object): self.lru = lru self.tree = tree - self.arg_names = inspect.getargspec(orig).args[1:num_args+1] + self.arg_names = inspect.getargspec(orig).args[1:num_args + 1] if len(self.arg_names) < self.num_args: raise Exception( @@ -250,7 +250,7 @@ class CacheListDescriptor(object): self.num_args = num_args self.list_name = list_name - self.arg_names = inspect.getargspec(orig).args[1:num_args+1] + self.arg_names = inspect.getargspec(orig).args[1:num_args + 1] self.list_pos = self.arg_names.index(self.list_name) self.cache = cache diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 494226f5ea..62cae99649 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -55,7 +55,7 @@ class ExpiringCache(object): def f(): self._prune_cache() - self._clock.looping_call(f, self._expiry_ms/2) + self._clock.looping_call(f, self._expiry_ms / 2) def __setitem__(self, key, value): now = self._clock.time_msec() diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 29d02f7e95..03bc1401b7 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -58,7 +58,7 @@ class TreeCache(object): if n: break - node_and_keys[i+1][0].pop(k) + node_and_keys[i + 1][0].pop(k) popped, cnt = _strip_and_count_entires(popped) self.size -= cnt diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index d5b1a37eff..c37a157787 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -111,7 +111,7 @@ def time_function(f): _log_debug_as_f( f, "[FUNC END] {%s-%d} %f", - (func_name, id, end-start,), + (func_name, id, end - start,), ) return r diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index c37d6f12e3..ea321bc6a9 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -163,7 +163,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = sleep(self.sleep_msec/1000.0) + ret_defer = sleep(self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) -- cgit 1.5.1 From 2df6114bc449194fa99aae3f7c41b37e1ea0dbcf Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 2 Feb 2016 19:21:49 +0000 Subject: Log more diagnostics for unrecognised access tokens --- synapse/api/auth.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/api') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index c5a2865e26..5bba9343f6 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -696,6 +696,7 @@ class Auth(object): def _look_up_user_by_access_token(self, token): ret = yield self.store.get_user_by_access_token(token) if not ret: + logger.warn("Unrecognised access token - not in store: %s" % (token,)) raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.", errcode=Codes.UNKNOWN_TOKEN @@ -713,6 +714,7 @@ class Auth(object): token = request.args["access_token"][0] service = yield self.store.get_app_service_by_token(token) if not service: + logger.warn("Unrecognised appservice access token: %s" % (token,)) raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.", -- cgit 1.5.1 From 737c4223ef25fd2856d0ff6cc111d14b19f1adec Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 5 Feb 2016 10:47:46 +0000 Subject: Host /media/r0 as well as /media/v1 --- synapse/api/urls.py | 3 ++- synapse/app/homeserver.py | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/urls.py b/synapse/api/urls.py index 19824f9a02..0fd9b7f244 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -23,5 +23,6 @@ WEB_CLIENT_PREFIX = "/_matrix/client" CONTENT_REPO_PREFIX = "/_matrix/content" SERVER_KEY_PREFIX = "/_matrix/key/v1" SERVER_KEY_V2_PREFIX = "/_matrix/key/v2" -MEDIA_PREFIX = "/_matrix/media/v1" +MEDIA_PREFIX = "/_matrix/media/r0" +LEGACY_MEDIA_PREFIX = "/_matrix/media/v1" APP_SERVICE_PREFIX = "/_matrix/appservice/v1" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 89238cb7e3..e5c7e39cf9 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -56,7 +56,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey from synapse.rest.key.v2 import KeyApiV2Resource from synapse.api.urls import ( FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, MEDIA_PREFIX, STATIC_PREFIX, + SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX, SERVER_KEY_V2_PREFIX, ) from synapse.config.homeserver import HomeServerConfig @@ -148,8 +148,10 @@ class SynapseHomeServer(HomeServer): }) if name in ["media", "federation", "client"]: + media_repo = MediaRepositoryResource(self) resources.update({ - MEDIA_PREFIX: MediaRepositoryResource(self), + MEDIA_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, CONTENT_REPO_PREFIX: ContentRepoResource( self, self.config.uploads_path, self.auth, self.content_addr ), -- cgit 1.5.1 From 2c1fbea5319db2c64fa486adb32b5e66680b6daf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Feb 2016 10:22:44 +0000 Subject: Fix up logcontexts --- synapse/api/auth.py | 4 +- synapse/app/homeserver.py | 2 + synapse/crypto/keyring.py | 83 ++++++++++++----------- synapse/federation/federation_server.py | 4 +- synapse/federation/transaction_queue.py | 3 - synapse/handlers/_base.py | 10 +-- synapse/handlers/events.py | 11 +++- synapse/handlers/federation.py | 50 ++------------ synapse/handlers/presence.py | 20 +++--- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 11 +++- synapse/handlers/sync.py | 40 ++++++------ synapse/http/server.py | 5 +- synapse/notifier.py | 58 ++++++++-------- synapse/push/__init__.py | 2 +- synapse/push/pusherpool.py | 9 +-- synapse/rest/client/v2_alpha/account_data.py | 4 +- synapse/rest/client/v2_alpha/tags.py | 4 +- synapse/storage/_base.py | 18 ++--- synapse/storage/events.py | 34 ++++++---- synapse/storage/presence.py | 5 +- synapse/storage/stream.py | 9 +-- synapse/util/__init__.py | 6 +- synapse/util/async.py | 11 +++- synapse/util/caches/descriptors.py | 16 +++-- synapse/util/caches/snapshot_cache.py | 3 +- synapse/util/distributor.py | 15 +++-- synapse/util/logcontext.py | 98 ++++++++++++++++++++++++++-- synapse/util/logutils.py | 35 ++++++++++ synapse/util/metrics.py | 10 +-- synapse/util/ratelimitutils.py | 3 +- 31 files changed, 356 insertions(+), 229 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 5bba9343f6..e2f84c4d57 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError from synapse.types import Requester, RoomID, UserID, EventID from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_context_over_fn from unpaddedbase64 import decode_base64 import logging @@ -529,7 +530,8 @@ class Auth(object): default=[""] )[0] if user and access_token and ip_addr: - self.store.insert_client_ip( + preserve_context_over_fn( + self.store.insert_client_ip, user=user, access_token=access_token, ip=ip_addr, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e5c7e39cf9..2b4be7bdd0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -709,6 +709,8 @@ def run(hs): phone_home_task.start(60 * 60 * 24, now=False) def in_thread(): + # Uncomment to enable tracing of log context changes. + # sys.settrace(logcontext_tracer) with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) reactor.run() diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index cddec0b2bc..d08ee0aa91 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -18,6 +18,10 @@ from synapse.api.errors import SynapseError, Codes from synapse.util.retryutils import get_retry_limiter from synapse.util import unwrapFirstError from synapse.util.async import ObservableDeferred +from synapse.util.logcontext import ( + preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext, + preserve_fn +) from twisted.internet import defer @@ -142,40 +146,43 @@ class Keyring(object): for server_name, _ in server_and_json } - # We want to wait for any previous lookups to complete before - # proceeding. - wait_on_deferred = self.wait_for_previous_lookups( - [server_name for server_name, _ in server_and_json], - server_to_deferred, - ) + with PreserveLoggingContext(): - # Actually start fetching keys. - wait_on_deferred.addBoth( - lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) - ) + # We want to wait for any previous lookups to complete before + # proceeding. + wait_on_deferred = self.wait_for_previous_lookups( + [server_name for server_name, _ in server_and_json], + server_to_deferred, + ) - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed. - server_to_gids = {} + # Actually start fetching keys. + wait_on_deferred.addBoth( + lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) + ) + + # When we've finished fetching all the keys for a given server_name, + # resolve the deferred passed to `wait_for_previous_lookups` so that + # any lookups waiting will proceed. + server_to_gids = {} - def remove_deferreds(res, server_name, group_id): - server_to_gids[server_name].discard(group_id) - if not server_to_gids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) - return res + def remove_deferreds(res, server_name, group_id): + server_to_gids[server_name].discard(group_id) + if not server_to_gids[server_name]: + d = server_to_deferred.pop(server_name, None) + if d: + d.callback(None) + return res - for g_id, deferred in deferreds.items(): - server_name = group_id_to_group[g_id].server_name - server_to_gids.setdefault(server_name, set()).add(g_id) - deferred.addBoth(remove_deferreds, server_name, g_id) + for g_id, deferred in deferreds.items(): + server_name = group_id_to_group[g_id].server_name + server_to_gids.setdefault(server_name, set()).add(g_id) + deferred.addBoth(remove_deferreds, server_name, g_id) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified return [ - handle_key_deferred( + preserve_context_over_fn( + handle_key_deferred, group_id_to_group[g_id], deferreds[g_id], ) @@ -198,12 +205,13 @@ class Keyring(object): if server_name in self.key_downloads ] if wait_on: - yield defer.DeferredList(wait_on) + with PreserveLoggingContext(): + yield defer.DeferredList(wait_on) else: break for server_name, deferred in server_to_deferred.items(): - d = ObservableDeferred(deferred) + d = ObservableDeferred(preserve_context_over_deferred(deferred)) self.key_downloads[server_name] = d def rm(r, server_name): @@ -244,12 +252,13 @@ class Keyring(object): for group in group_id_to_group.values(): for key_id in group.key_ids: if key_id in merged_results[group.server_name]: - group_id_to_deferred[group.group_id].callback(( - group.group_id, - group.server_name, - key_id, - merged_results[group.server_name][key_id], - )) + with PreserveLoggingContext(): + group_id_to_deferred[group.group_id].callback(( + group.group_id, + group.server_name, + key_id, + merged_results[group.server_name][key_id], + )) break else: missing_groups.setdefault( @@ -504,7 +513,7 @@ class Keyring(object): yield defer.gatherResults( [ - self.store_keys( + preserve_fn(self.store_keys)( server_name=key_server_name, from_server=server_name, verify_keys=verify_keys, @@ -573,7 +582,7 @@ class Keyring(object): yield defer.gatherResults( [ - self.store.store_server_keys_json( + preserve_fn(self.store.store_server_keys_json)( server_name=server_name, key_id=key_id, from_server=server_name, @@ -675,7 +684,7 @@ class Keyring(object): # TODO(markjh): Store whether the keys have expired. yield defer.gatherResults( [ - self.store.store_server_verify_key( + preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key ) for key_id, key in verify_keys.items() diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a97aa0c94a..90718192dd 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -126,10 +126,8 @@ class FederationServer(FederationBase): results = [] for pdu in pdu_list: - d = self._handle_new_pdu(transaction.origin, pdu) - try: - yield d + yield self._handle_new_pdu(transaction.origin, pdu) results.append({}) except FederationError as e: self.send_failure(e, transaction.origin) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 622adad3ae..1928da03b3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -103,7 +103,6 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") - @defer.inlineCallbacks def enqueue_pdu(self, pdu, destinations, order): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus @@ -141,8 +140,6 @@ class TransactionQueue(object): deferreds.append(deferred) - yield defer.DeferredList(deferreds, consumeErrors=True) - # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 1423df6cf3..fa83d3e464 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -293,19 +293,11 @@ class BaseHandler(object): with PreserveLoggingContext(): # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - notify_d.addErrback(log_failure) - # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 5ad8f3779a..4933c31c19 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event +from synapse.util.logcontext import preserve_context_over_fn from ._base import BaseHandler @@ -29,11 +30,17 @@ logger = logging.getLogger(__name__) def started_user_eventstream(distributor, user): - return distributor.fire("started_user_eventstream", user) + return preserve_context_over_fn( + distributor.fire, + "started_user_eventstream", user + ) def stopped_user_eventstream(distributor, user): - return distributor.fire("stopped_user_eventstream", user) + return preserve_context_over_fn( + distributor.fire, + "stopped_user_eventstream", user + ) class EventStreamHandler(BaseHandler): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ce1e9d6c7..b78b0502d9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -221,19 +221,11 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.membership == Membership.JOIN: prev_state = context.current_state.get((event.type, event.state_key)) @@ -643,19 +635,11 @@ class FederationHandler(BaseHandler): ) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[joinee] ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -730,18 +714,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -811,19 +787,11 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[target_user], ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - defer.returnValue(event) @defer.inlineCallbacks @@ -948,18 +916,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - new_pdu = event destinations = set() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c21ff5c9..b61394f2b5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -378,9 +378,9 @@ class PresenceHandler(BaseHandler): was_polling = target_user in self._user_cachemap if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) + yield self.start_polling_presence(target_user, state=state) elif not now_online and was_polling: - self.stop_polling_presence(target_user) + yield self.stop_polling_presence(target_user) # TODO(paul): perform a presence push as part of start/stop poll so # we don't have to do this all the time @@ -394,7 +394,8 @@ class PresenceHandler(BaseHandler): if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY: return - self.changed_presencelike_data(user, {"last_active": now}) + with PreserveLoggingContext(): + self.changed_presencelike_data(user, {"last_active": now}) def get_joined_rooms_for_user(self, user): """Get the list of rooms a user is joined to. @@ -466,11 +467,12 @@ class PresenceHandler(BaseHandler): local_user, room_ids=[room_id], add_to_cache=False ) - self.push_update_to_local_and_remote( - observed_user=local_user, - users_to_push=[user], - statuscache=statuscache, - ) + with PreserveLoggingContext(): + self.push_update_to_local_and_remote( + observed_user=local_user, + users_to_push=[user], + statuscache=statuscache, + ) @defer.inlineCallbacks def send_presence_invite(self, observer_user, observed_user): @@ -556,7 +558,7 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - self.start_polling_presence( + yield self.start_polling_presence( observer_user, target_user=observed_user ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 2660fd21a2..24c850ae9b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -186,7 +186,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="" ) - registered_user(self.distributor, user) + yield registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bfd7e44e9f..a8e3a9029c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,6 +25,7 @@ from synapse.api.constants import ( from synapse.api.errors import AuthError, StoreError, SynapseError, Codes from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor +from synapse.util.logcontext import preserve_context_over_fn from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -46,11 +47,17 @@ def collect_presencelike_data(distributor, user, content): def user_left_room(distributor, user, room_id): - return distributor.fire("user_left_room", user=user, room_id=room_id) + return preserve_context_over_fn( + distributor.fire, + "user_left_room", user=user, room_id=room_id + ) def user_joined_room(distributor, user, room_id): - return distributor.fire("user_joined_room", user=user, room_id=room_id) + return preserve_context_over_fn( + distributor.fire, + "user_joined_room", user=user, room_id=room_id + ) class RoomCreationHandler(BaseHandler): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 72271f2626..3f1cda5b0b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,7 +18,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from twisted.internet import defer @@ -241,15 +241,16 @@ class SyncHandler(BaseHandler): deferreds = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync_deferred = self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(joined.append) deferreds.append(room_sync_deferred) elif event.membership == Membership.INVITE: @@ -262,15 +263,16 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync_deferred = self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(archived.append) deferreds.append(room_sync_deferred) diff --git a/synapse/http/server.py b/synapse/http/server.py index 06935783ca..a90e2e1125 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -99,9 +99,8 @@ def request_handler(request_handler): request_context.request = request_id with request.processing(): try: - d = request_handler(self, request) - with PreserveLoggingContext(): - yield d + with PreserveLoggingContext(request_context): + yield request_handler(self, request) except CodeMessageException as e: code = e.code if isinstance(e, SynapseError): diff --git a/synapse/notifier.py b/synapse/notifier.py index 1a90bd55cd..560866b26e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -18,7 +18,8 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor, ObservableDeferred +from synapse.util.async import ObservableDeferred +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import StreamToken import synapse.metrics @@ -73,7 +74,8 @@ class _NotifierUserStream(object): self.current_token = current_token self.last_notified_ms = time_now_ms - self.notify_deferred = ObservableDeferred(defer.Deferred()) + with PreserveLoggingContext(): + self.notify_deferred = ObservableDeferred(defer.Deferred()) def notify(self, stream_key, stream_id, time_now_ms): """Notify any listeners for this user of a new event from an @@ -88,8 +90,10 @@ class _NotifierUserStream(object): ) self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred - self.notify_deferred = ObservableDeferred(defer.Deferred()) - noify_deferred.callback(self.current_token) + + with PreserveLoggingContext(): + self.notify_deferred = ObservableDeferred(defer.Deferred()) + noify_deferred.callback(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier @@ -184,8 +188,6 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) - @log_function - @defer.inlineCallbacks def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): """ Used by handlers to inform the notifier something has happened @@ -199,12 +201,11 @@ class Notifier(object): until all previous events have been persisted before notifying the client streams. """ - yield run_on_reactor() - - self.pending_new_room_events.append(( - room_stream_id, event, extra_users - )) - self._notify_pending_new_room_events(max_room_stream_id) + with PreserveLoggingContext(): + self.pending_new_room_events.append(( + room_stream_id, event, extra_users + )) + self._notify_pending_new_room_events(max_room_stream_id) def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous @@ -251,31 +252,29 @@ class Notifier(object): extra_streams=app_streams, ) - @defer.inlineCallbacks - @log_function def on_new_event(self, stream_key, new_token, users=[], rooms=[], extra_streams=set()): """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. """ - yield run_on_reactor() - user_streams = set() + with PreserveLoggingContext(): + user_streams = set() - for user in users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + for user in users: + user_stream = self.user_to_user_stream.get(str(user)) + if user_stream is not None: + user_streams.add(user_stream) - for room in rooms: - user_streams |= self.room_to_user_streams.get(room, set()) + for room in rooms: + user_streams |= self.room_to_user_streams.get(room, set()) - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify(stream_key, new_token, time_now_ms) - except: - logger.exception("Failed to notify listener") + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify(stream_key, new_token, time_now_ms) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, @@ -325,7 +324,8 @@ class Notifier(object): # that we don't miss any current_token updates. prev_token = current_token listener = user_stream.new_listener(prev_token) - yield listener.deferred + with PreserveLoggingContext(): + yield listener.deferred except defer.CancelledError: break diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 64e581b8ba..8da2d8716c 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -111,7 +111,7 @@ class Pusher(object): self.user_id, config, timeout=0, affect_presence=False ) self.last_token = chunk['end'] - self.store.update_pusher_last_token( + yield self.store.update_pusher_last_token( self.app_id, self.pushkey, self.user_id, self.last_token ) logger.info("New pusher %s for user %s starting from token %s", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index d1b7c0802f..d7dcb2de4b 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -18,6 +18,7 @@ from twisted.internet import defer from httppusher import HttpPusher from synapse.push import PusherConfigException +from synapse.util.logcontext import preserve_fn import logging @@ -76,7 +77,7 @@ class PusherPool: "Removing pusher for app id %s, pushkey %s, user %s", app_id, pushkey, p['user_name'] ) - self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks def remove_pushers_by_user(self, user_id): @@ -91,7 +92,7 @@ class PusherPool: "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] ) - self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind, @@ -110,7 +111,7 @@ class PusherPool: lang=lang, data=data, ) - self._refresh_pusher(app_id, pushkey, user_id) + yield self._refresh_pusher(app_id, pushkey, user_id) def _create_pusher(self, pusherdict): if pusherdict['kind'] == 'http': @@ -166,7 +167,7 @@ class PusherPool: if fullid in self.pushers: self.pushers[fullid].stop() self.pushers[fullid] = p - p.start() + preserve_fn(p.start)() logger.info("Started pushers") diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index 985efe2a62..1456881c1a 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -57,7 +57,7 @@ class AccountDataServlet(RestServlet): user_id, account_data_type, body ) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) @@ -99,7 +99,7 @@ class RoomAccountDataServlet(RestServlet): user_id, room_id, account_data_type, body ) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 42f2203f3d..79c436a8cf 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -80,7 +80,7 @@ class TagServlet(RestServlet): max_id = yield self.store.add_tag_to_room(user_id, room_id, tag, body) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) @@ -94,7 +94,7 @@ class TagServlet(RestServlet): max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index cfb87d9328..2e97ac84a8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,7 +15,7 @@ import logging from synapse.api.errors import StoreError -from synapse.util.logcontext import preserve_context_over_fn, LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache import synapse.metrics @@ -298,10 +298,10 @@ class SQLBaseStore(object): func, *args, **kwargs ) - result = yield preserve_context_over_fn( - self._db_pool.runWithConnection, - inner_func, *args, **kwargs - ) + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) for after_callback, after_args in after_callbacks: after_callback(*after_args) @@ -326,10 +326,10 @@ class SQLBaseStore(object): return func(conn, *args, **kwargs) - result = yield preserve_context_over_fn( - self._db_pool.runWithConnection, - inner_func, *args, **kwargs - ) + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) defer.returnValue(result) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4d7cdd00d0..c6ed54721c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event -from synapse.util.logcontext import preserve_context_over_deferred +from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes @@ -664,14 +664,16 @@ class EventsStore(SQLBaseStore): for ids, d in lst: if not d.called: try: - d.callback([ - res[i] - for i in ids - if i in res - ]) + with PreserveLoggingContext(): + d.callback([ + res[i] + for i in ids + if i in res + ]) except: logger.exception("Failed to callback") - reactor.callFromThread(fire, event_list, row_dict) + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") @@ -679,10 +681,12 @@ class EventsStore(SQLBaseStore): def fire(evs): for _, d in evs: if not d.called: - d.errback(e) + with PreserveLoggingContext(): + d.errback(e) if event_list: - reactor.callFromThread(fire, event_list) + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, @@ -709,18 +713,20 @@ class EventsStore(SQLBaseStore): should_start = False if should_start: - self.runWithConnection( - self._do_fetch - ) + with PreserveLoggingContext(): + self.runWithConnection( + self._do_fetch + ) - rows = yield preserve_context_over_deferred(events_d) + with PreserveLoggingContext(): + rows = yield events_d if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] res = yield defer.gatherResults( [ - self._get_event_from_row( + preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 9b3aecaf8c..ef525f34c5 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -68,8 +68,9 @@ class PresenceStore(SQLBaseStore): for row in rows }) + @defer.inlineCallbacks def set_presence_state(self, user_localpart, new_state): - res = self._simple_update_one( + res = yield self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -79,7 +80,7 @@ class PresenceStore(SQLBaseStore): ) self.get_presence_state.invalidate((user_localpart,)) - return res + defer.returnValue(res) def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 50436cb2d2..367ffc9543 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,6 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken +from synapse.util.logcontext import preserve_fn import logging @@ -170,12 +171,12 @@ class StreamStore(SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ - self.get_room_events_stream_for_room( - room_id, from_key, to_key, limit - ).addCallback(lambda r, rm: (rm, r), room_id) + preserve_fn(self.get_room_events_stream_for_room)( + room_id, from_key, to_key, limit, + ) for room_id in room_ids ]) - results.update(dict(res)) + results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 7566d9eb33..133671e238 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -61,10 +61,8 @@ class Clock(object): *args: Postional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - current_context = LoggingContext.current_context() - def wrapped_callback(*args, **kwargs): - with PreserveLoggingContext(current_context): + with PreserveLoggingContext(): callback(*args, **kwargs) with PreserveLoggingContext(): diff --git a/synapse/util/async.py b/synapse/util/async.py index 200edd404c..640fae3890 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,13 +16,16 @@ from twisted.internet import defer, reactor -from .logcontext import preserve_context_over_deferred +from .logcontext import PreserveLoggingContext +@defer.inlineCallbacks def sleep(seconds): d = defer.Deferred() - reactor.callLater(seconds, d.callback, seconds) - return preserve_context_over_deferred(d) + with PreserveLoggingContext(): + reactor.callLater(seconds, d.callback, seconds) + res = yield d + defer.returnValue(res) def run_on_reactor(): @@ -54,6 +57,7 @@ class ObservableDeferred(object): object.__setattr__(self, "_result", (True, r)) while self._observers: try: + # TODO: Handle errors here. self._observers.pop().callback(r) except: pass @@ -63,6 +67,7 @@ class ObservableDeferred(object): object.__setattr__(self, "_result", (False, f)) while self._observers: try: + # TODO: Handle errors here. self._observers.pop().errback(f) except: pass diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index e27917c63a..277854ccbc 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -18,6 +18,9 @@ from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn +) from . import caches_by_name, DEBUG_CACHES, cache_counter @@ -190,7 +193,7 @@ class CacheDescriptor(object): defer.returnValue(cached_result) observer.addCallback(check_result) - return observer + return preserve_context_over_deferred(observer) except KeyError: # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated @@ -198,6 +201,7 @@ class CacheDescriptor(object): sequence = self.cache.sequence ret = defer.maybeDeferred( + preserve_context_over_fn, self.function_to_call, obj, *args, **kwargs ) @@ -211,7 +215,7 @@ class CacheDescriptor(object): ret = ObservableDeferred(ret, consumeErrors=True) self.cache.update(sequence, cache_key, ret) - return ret.observe() + return preserve_context_over_deferred(ret.observe()) wrapped.invalidate = self.cache.invalidate wrapped.invalidate_all = self.cache.invalidate_all @@ -299,6 +303,7 @@ class CacheListDescriptor(object): args_to_call[self.list_name] = missing ret_d = defer.maybeDeferred( + preserve_context_over_fn, self.function_to_call, **args_to_call ) @@ -308,7 +313,8 @@ class CacheListDescriptor(object): # We need to create deferreds for each arg in the list so that # we can insert the new deferred into the cache. for arg in missing: - observer = ret_d.observe() + with PreserveLoggingContext(): + observer = ret_d.observe() observer.addCallback(lambda r, arg: r.get(arg, None), arg) observer = ObservableDeferred(observer) @@ -327,10 +333,10 @@ class CacheListDescriptor(object): cached[arg] = res - return defer.gatherResults( + return preserve_context_over_deferred(defer.gatherResults( cached.values(), consumeErrors=True, - ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res)) + ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res))) obj.__dict__[self.orig.__name__] = wrapped diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py index b1e40417fd..d03678b8c8 100644 --- a/synapse/util/caches/snapshot_cache.py +++ b/synapse/util/caches/snapshot_cache.py @@ -87,7 +87,8 @@ class SnapshotCache(object): # expire from the rotation of that cache. self.next_result_cache[key] = result self.pending_result_cache.pop(key, None) + return r - result.observe().addBoth(shuffle_along) + result.addBoth(shuffle_along) return result.observe() diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 4ebfebf701..8875813de4 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -15,9 +15,7 @@ from twisted.internet import defer -from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_context_over_deferred, -) +from synapse.util.logcontext import PreserveLoggingContext from synapse.util import unwrapFirstError @@ -97,6 +95,7 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) + @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -116,6 +115,7 @@ class Signal(object): failure.getTracebackObject())) if not self.suppress_failures: return failure + return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb) with PreserveLoggingContext(): @@ -124,8 +124,11 @@ class Signal(object): for observer in self.observers ] - d = defer.gatherResults(deferreds, consumeErrors=True) + res = yield defer.gatherResults( + deferreds, consumeErrors=True + ).addErrback(unwrapFirstError) - d.addErrback(unwrapFirstError) + defer.returnValue(res) - return preserve_context_over_deferred(d) + def __repr__(self): + return "" % (self.name,) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index e701092cd8..9134e67908 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -48,7 +48,7 @@ class LoggingContext(object): __slots__ = [ "parent_context", "name", "usage_start", "usage_end", "main_thread", - "__dict__", "tag", + "__dict__", "tag", "alive", ] thread_local = threading.local() @@ -88,6 +88,7 @@ class LoggingContext(object): self.usage_start = None self.main_thread = threading.current_thread() self.tag = "" + self.alive = True def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -106,6 +107,7 @@ class LoggingContext(object): The context that was previously active """ current = cls.current_context() + if current is not context: current.stop() cls.thread_local.current_context = context @@ -117,6 +119,7 @@ class LoggingContext(object): if self.parent_context is not None: raise Exception("Attempt to enter logging context multiple times") self.parent_context = self.set_current_context(self) + self.alive = True return self def __exit__(self, type, value, traceback): @@ -136,6 +139,7 @@ class LoggingContext(object): self ) self.parent_context = None + self.alive = False def __getattr__(self, name): """Delegate member lookup to parent context""" @@ -213,7 +217,7 @@ class PreserveLoggingContext(object): exited. Used to restore the context after a function using @defer.inlineCallbacks is resumed by a callback from the reactor.""" - __slots__ = ["current_context", "new_context"] + __slots__ = ["current_context", "new_context", "has_parent"] def __init__(self, new_context=LoggingContext.sentinel): self.new_context = new_context @@ -224,11 +228,26 @@ class PreserveLoggingContext(object): self.new_context ) + if self.current_context: + self.has_parent = self.current_context.parent_context is not None + if not self.current_context.alive: + logger.warn( + "Entering dead context: %s", + self.current_context, + ) + def __exit__(self, type, value, traceback): """Restores the current logging context""" - LoggingContext.set_current_context(self.current_context) + context = LoggingContext.set_current_context(self.current_context) + + if context != self.new_context: + logger.warn( + "Unexpected logging context: %s is not %s", + context, self.new_context, + ) + if self.current_context is not LoggingContext.sentinel: - if self.current_context.parent_context is None: + if not self.current_context.alive: logger.warn( "Restoring dead context: %s", self.current_context, @@ -289,3 +308,74 @@ def preserve_context_over_deferred(deferred): d = _PreservingContextDeferred(current_context) deferred.chainDeferred(d) return d + + +def preserve_fn(f): + """Ensures that function is called with correct context and that context is + restored after return. Useful for wrapping functions that return a deferred + which you don't yield on. + """ + current = LoggingContext.current_context() + + def g(*args, **kwargs): + with PreserveLoggingContext(current): + return f(*args, **kwargs) + + return g + + +# modules to ignore in `logcontext_tracer` +_to_ignore = [ + "synapse.util.logcontext", + "synapse.http.server", + "synapse.storage._base", + "synapse.util.async", +] + + +def logcontext_tracer(frame, event, arg): + """A tracer that logs whenever a logcontext "unexpectedly" changes within + a function. Probably inaccurate. + + Use by calling `sys.settrace(logcontext_tracer)` in the main thread. + """ + if event == 'call': + name = frame.f_globals["__name__"] + if name.startswith("synapse"): + if name == "synapse.util.logcontext": + if frame.f_code.co_name in ["__enter__", "__exit__"]: + tracer = frame.f_back.f_trace + if tracer: + tracer.just_changed = True + + tracer = frame.f_trace + if tracer: + return tracer + + if not any(name.startswith(ig) for ig in _to_ignore): + return LineTracer() + + +class LineTracer(object): + __slots__ = ["context", "just_changed"] + + def __init__(self): + self.context = LoggingContext.current_context() + self.just_changed = False + + def __call__(self, frame, event, arg): + if event in 'line': + if self.just_changed: + self.context = LoggingContext.current_context() + self.just_changed = False + else: + c = LoggingContext.current_context() + if c != self.context: + logger.info( + "Context changed! %s -> %s, %s, %s", + self.context, c, + frame.f_code.co_filename, frame.f_lineno + ) + self.context = c + + return self diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index c37a157787..3a83828d25 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -168,3 +168,38 @@ def trace_function(f): wrapped.__name__ = func_name return wrapped + + +def get_previous_frames(): + s = inspect.currentframe().f_back.f_back + to_return = [] + while s: + if s.f_globals["__name__"].startswith("synapse"): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + to_return.append("{{ %s:%d %s - Args: %s }}" % ( + filename, lineno, function, args_string + )) + + s = s.f_back + + return ", ". join(to_return) + + +def get_previous_frame(ignore=[]): + s = inspect.currentframe().f_back.f_back + + while s: + if s.f_globals["__name__"].startswith("synapse"): + if not any(s.f_globals["__name__"].startswith(ig) for ig in ignore): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + return "{{ %s:%d %s - Args: %s }}" % ( + filename, lineno, function, args_string + ) + + s = s.f_back + + return None diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index daf6087fe0..ca48007218 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -68,16 +68,18 @@ class Measure(object): block_timer.inc_by(duration, self.name) context = LoggingContext.current_context() - if not context: - return if context != self.start_context: logger.warn( - "Context have unexpectedly changed %r, %r", - context, self.start_context + "Context have unexpectedly changed from '%s' to '%s'. (%r)", + context, self.start_context, self.name ) return + if not context: + logger.warn("Expected context. (%r)", self.name) + return + ru_utime, ru_stime = context.get_resource_usage() block_ru_utime.inc_by(ru_utime, self.name) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index ea321bc6a9..4076eed269 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import sleep +from synapse.util.logcontext import preserve_fn import collections import contextlib @@ -163,7 +164,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = sleep(self.sleep_msec / 1000.0) + ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) -- cgit 1.5.1 From cf81375b94c4763766440471e632fc4b103450ab Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 12 Feb 2016 15:11:49 +0000 Subject: Merge two of the room join codepaths There's at least one more to merge in. Side-effects: * Stop reporting None as displayname and avatar_url in some cases * Joining a room by alias populates guest-ness in join event * Remove unspec'd PUT version of /join/ which has not been called on matrix.org according to logs * Stop recording access_token_id on /join/room_id - currently we don't record it on /join/room_alias; I can try to thread it through at some point. --- synapse/api/errors.py | 5 ++++ synapse/handlers/profile.py | 11 +++++-- synapse/handlers/room.py | 44 +++++++++++++++++++++------ synapse/rest/client/v1/room.py | 68 ++++++++---------------------------------- synapse/types.py | 14 +++++++-- 5 files changed, 73 insertions(+), 69 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index b106fbed6d..0c7858f78d 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -84,6 +84,11 @@ class RegistrationError(SynapseError): pass +class BadIdentifierError(SynapseError): + """An error indicating an identifier couldn't be parsed.""" + pass + + class UnrecognizedRequestError(SynapseError): """An error indicating we don't understand the request you're trying to make""" def __init__(self, *args, **kwargs): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 629e6e3594..32af622733 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -169,8 +169,15 @@ class ProfileHandler(BaseHandler): consumeErrors=True ).addErrback(unwrapFirstError) - state["displayname"] = displayname - state["avatar_url"] = avatar_url + if displayname is None: + del state["displayname"] + else: + state["displayname"] = displayname + + if avatar_url is None: + del state["avatar_url"] + else: + state["avatar_url"] = avatar_url defer.returnValue(None) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b2de2cd0c0..2950ed14e4 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -527,7 +527,17 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def join_room_alias(self, joinee, room_alias, content={}): + def lookup_room_alias(self, room_alias): + """ + Gets the room ID for an alias. + + Args: + room_alias (str): The room alias to look up. + Returns: + A tuple of the room ID (str) and the hosts hosting the room ([str]) + Raises: + SynapseError if the room couldn't be looked up. + """ directory_handler = self.hs.get_handlers().directory_handler mapping = yield directory_handler.get_association(room_alias) @@ -539,24 +549,40 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - # If event doesn't include a display name, add one. - yield collect_presencelike_data(self.distributor, joinee, content) + defer.returnValue((room_id, hosts)) + + @defer.inlineCallbacks + def do_join(self, requester, room_id, hosts=None): + """ + Joins requester to room_id. + + Args: + requester (Requester): The user joining the room. + room_id (str): The room ID (not alias) being joined. + hosts ([str]): A list of hosts which are hopefully in the room. + Raises: + SynapseError if the room couldn't be joined. + """ + hosts = hosts or [] + + content = {"membership": Membership.JOIN} + if requester.is_guest: + content["kind"] = "guest" + + yield collect_presencelike_data(self.distributor, requester.user, content) - content.update({"membership": Membership.JOIN}) builder = self.event_builder_factory.new({ "type": EventTypes.Member, - "state_key": joinee.to_string(), + "state_key": requester.user.to_string(), "room_id": room_id, - "sender": joinee.to_string(), - "membership": Membership.JOIN, + "sender": requester.user.to_string(), + "membership": Membership.JOIN, # For backwards compatibility "content": content, }) event, context = yield self._create_new_client_event(builder) yield self._do_join(event, context, room_hosts=hosts) - defer.returnValue({"room_id": room_id}) - @defer.inlineCallbacks def _do_join(self, event, context, room_hosts=None): room_id = event.room_id diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 81bfe377bd..1dd33b0a56 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -216,11 +216,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): # TODO: Needs unit testing for room ID + alias joins class JoinRoomAliasServlet(ClientV1RestServlet): - - def register(self, http_server): - # /join/$room_identifier[/$txn_id] - PATTERNS = ("/join/(?P[^/]*)") - register_txn_path(self, PATTERNS, http_server) + PATTERNS = client_path_patterns("/join/(?P[^/]*)$") @defer.inlineCallbacks def on_POST(self, request, room_identifier, txn_id=None): @@ -229,60 +225,22 @@ class JoinRoomAliasServlet(ClientV1RestServlet): allow_guest=True, ) - # the identifier could be a room alias or a room id. Try one then the - # other if it fails to parse, without swallowing other valid - # SynapseErrors. + handler = self.handlers.room_member_handler - identifier = None - is_room_alias = False - try: - identifier = RoomAlias.from_string(room_identifier) - is_room_alias = True - except SynapseError: - identifier = RoomID.from_string(room_identifier) + room_id = None + hosts = [] + if RoomAlias.is_valid(room_identifier): + room_alias = RoomAlias.from_string(room_identifier) + room_id, hosts = yield handler.lookup_room_alias(room_alias) + else: + room_id = RoomID.from_string(room_identifier).to_string() # TODO: Support for specifying the home server to join with? - if is_room_alias: - handler = self.handlers.room_member_handler - ret_dict = yield handler.join_room_alias( - requester.user, - identifier, - ) - defer.returnValue((200, ret_dict)) - else: # room id - msg_handler = self.handlers.message_handler - content = {"membership": Membership.JOIN} - if requester.is_guest: - content["kind"] = "guest" - yield msg_handler.create_and_send_event( - { - "type": EventTypes.Member, - "content": content, - "room_id": identifier.to_string(), - "sender": requester.user.to_string(), - "state_key": requester.user.to_string(), - }, - token_id=requester.access_token_id, - txn_id=txn_id, - is_guest=requester.is_guest, - ) - - defer.returnValue((200, {"room_id": identifier.to_string()})) - - @defer.inlineCallbacks - def on_PUT(self, request, room_identifier, txn_id): - try: - defer.returnValue( - self.txns.get_client_transaction(request, txn_id) - ) - except KeyError: - pass - - response = yield self.on_POST(request, room_identifier, txn_id) - - self.txns.store_client_transaction(request, txn_id, response) - defer.returnValue(response) + yield handler.do_join( + requester, room_id, hosts=hosts + ) + defer.returnValue((200, {"room_id": room_id})) # TODO: Needs unit testing diff --git a/synapse/types.py b/synapse/types.py index 2095837ba6..0be8384e18 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError +from synapse.api.errors import SynapseError, BadIdentifierError from collections import namedtuple @@ -51,13 +51,13 @@ class DomainSpecificString( def from_string(cls, s): """Parse the string given by 's' into a structure object.""" if len(s) < 1 or s[0] != cls.SIGIL: - raise SynapseError(400, "Expected %s string to start with '%s'" % ( + raise BadIdentifierError(400, "Expected %s string to start with '%s'" % ( cls.__name__, cls.SIGIL, )) parts = s[1:].split(':', 1) if len(parts) != 2: - raise SynapseError( + raise BadIdentifierError( 400, "Expected %s of the form '%slocalname:domain'" % ( cls.__name__, cls.SIGIL, ) @@ -69,6 +69,14 @@ class DomainSpecificString( # names on one HS return cls(localpart=parts[0], domain=domain) + @classmethod + def is_valid(cls, s): + try: + cls.from_string(s) + return True + except: + return False + def to_string(self): """Return a string encoding the fields of the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) -- cgit 1.5.1 From 4de08a4672c62eebda2ad3ee89643c2c32242cbf Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 12 Feb 2016 16:17:24 +0000 Subject: Revert "Merge two of the room join codepaths" This reverts commit cf81375b94c4763766440471e632fc4b103450ab. It subtly violates a guest joining auth check --- synapse/api/errors.py | 5 ---- synapse/handlers/profile.py | 11 ++----- synapse/handlers/room.py | 44 ++++++--------------------- synapse/rest/client/v1/room.py | 68 ++++++++++++++++++++++++++++++++++-------- synapse/types.py | 14 ++------- 5 files changed, 69 insertions(+), 73 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 0c7858f78d..b106fbed6d 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -84,11 +84,6 @@ class RegistrationError(SynapseError): pass -class BadIdentifierError(SynapseError): - """An error indicating an identifier couldn't be parsed.""" - pass - - class UnrecognizedRequestError(SynapseError): """An error indicating we don't understand the request you're trying to make""" def __init__(self, *args, **kwargs): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 32af622733..629e6e3594 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -169,15 +169,8 @@ class ProfileHandler(BaseHandler): consumeErrors=True ).addErrback(unwrapFirstError) - if displayname is None: - del state["displayname"] - else: - state["displayname"] = displayname - - if avatar_url is None: - del state["avatar_url"] - else: - state["avatar_url"] = avatar_url + state["displayname"] = displayname + state["avatar_url"] = avatar_url defer.returnValue(None) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 2950ed14e4..b2de2cd0c0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -527,17 +527,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def lookup_room_alias(self, room_alias): - """ - Gets the room ID for an alias. - - Args: - room_alias (str): The room alias to look up. - Returns: - A tuple of the room ID (str) and the hosts hosting the room ([str]) - Raises: - SynapseError if the room couldn't be looked up. - """ + def join_room_alias(self, joinee, room_alias, content={}): directory_handler = self.hs.get_handlers().directory_handler mapping = yield directory_handler.get_association(room_alias) @@ -549,40 +539,24 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - defer.returnValue((room_id, hosts)) - - @defer.inlineCallbacks - def do_join(self, requester, room_id, hosts=None): - """ - Joins requester to room_id. - - Args: - requester (Requester): The user joining the room. - room_id (str): The room ID (not alias) being joined. - hosts ([str]): A list of hosts which are hopefully in the room. - Raises: - SynapseError if the room couldn't be joined. - """ - hosts = hosts or [] - - content = {"membership": Membership.JOIN} - if requester.is_guest: - content["kind"] = "guest" - - yield collect_presencelike_data(self.distributor, requester.user, content) + # If event doesn't include a display name, add one. + yield collect_presencelike_data(self.distributor, joinee, content) + content.update({"membership": Membership.JOIN}) builder = self.event_builder_factory.new({ "type": EventTypes.Member, - "state_key": requester.user.to_string(), + "state_key": joinee.to_string(), "room_id": room_id, - "sender": requester.user.to_string(), - "membership": Membership.JOIN, # For backwards compatibility + "sender": joinee.to_string(), + "membership": Membership.JOIN, "content": content, }) event, context = yield self._create_new_client_event(builder) yield self._do_join(event, context, room_hosts=hosts) + defer.returnValue({"room_id": room_id}) + @defer.inlineCallbacks def _do_join(self, event, context, room_hosts=None): room_id = event.room_id diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 1dd33b0a56..81bfe377bd 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -216,7 +216,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet): # TODO: Needs unit testing for room ID + alias joins class JoinRoomAliasServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/join/(?P[^/]*)$") + + def register(self, http_server): + # /join/$room_identifier[/$txn_id] + PATTERNS = ("/join/(?P[^/]*)") + register_txn_path(self, PATTERNS, http_server) @defer.inlineCallbacks def on_POST(self, request, room_identifier, txn_id=None): @@ -225,22 +229,60 @@ class JoinRoomAliasServlet(ClientV1RestServlet): allow_guest=True, ) - handler = self.handlers.room_member_handler + # the identifier could be a room alias or a room id. Try one then the + # other if it fails to parse, without swallowing other valid + # SynapseErrors. - room_id = None - hosts = [] - if RoomAlias.is_valid(room_identifier): - room_alias = RoomAlias.from_string(room_identifier) - room_id, hosts = yield handler.lookup_room_alias(room_alias) - else: - room_id = RoomID.from_string(room_identifier).to_string() + identifier = None + is_room_alias = False + try: + identifier = RoomAlias.from_string(room_identifier) + is_room_alias = True + except SynapseError: + identifier = RoomID.from_string(room_identifier) # TODO: Support for specifying the home server to join with? - yield handler.do_join( - requester, room_id, hosts=hosts - ) - defer.returnValue((200, {"room_id": room_id})) + if is_room_alias: + handler = self.handlers.room_member_handler + ret_dict = yield handler.join_room_alias( + requester.user, + identifier, + ) + defer.returnValue((200, ret_dict)) + else: # room id + msg_handler = self.handlers.message_handler + content = {"membership": Membership.JOIN} + if requester.is_guest: + content["kind"] = "guest" + yield msg_handler.create_and_send_event( + { + "type": EventTypes.Member, + "content": content, + "room_id": identifier.to_string(), + "sender": requester.user.to_string(), + "state_key": requester.user.to_string(), + }, + token_id=requester.access_token_id, + txn_id=txn_id, + is_guest=requester.is_guest, + ) + + defer.returnValue((200, {"room_id": identifier.to_string()})) + + @defer.inlineCallbacks + def on_PUT(self, request, room_identifier, txn_id): + try: + defer.returnValue( + self.txns.get_client_transaction(request, txn_id) + ) + except KeyError: + pass + + response = yield self.on_POST(request, room_identifier, txn_id) + + self.txns.store_client_transaction(request, txn_id, response) + defer.returnValue(response) # TODO: Needs unit testing diff --git a/synapse/types.py b/synapse/types.py index 0be8384e18..2095837ba6 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError, BadIdentifierError +from synapse.api.errors import SynapseError from collections import namedtuple @@ -51,13 +51,13 @@ class DomainSpecificString( def from_string(cls, s): """Parse the string given by 's' into a structure object.""" if len(s) < 1 or s[0] != cls.SIGIL: - raise BadIdentifierError(400, "Expected %s string to start with '%s'" % ( + raise SynapseError(400, "Expected %s string to start with '%s'" % ( cls.__name__, cls.SIGIL, )) parts = s[1:].split(':', 1) if len(parts) != 2: - raise BadIdentifierError( + raise SynapseError( 400, "Expected %s of the form '%slocalname:domain'" % ( cls.__name__, cls.SIGIL, ) @@ -69,14 +69,6 @@ class DomainSpecificString( # names on one HS return cls(localpart=parts[0], domain=domain) - @classmethod - def is_valid(cls, s): - try: - cls.from_string(s) - return True - except: - return False - def to_string(self): """Return a string encoding the fields of the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) -- cgit 1.5.1 From 536f949a1a0e531314e023436c22859b5114376d Mon Sep 17 00:00:00 2001 From: Patrik Oldsberg Date: Wed, 17 Feb 2016 10:53:06 +0100 Subject: api/filtering: don't assume that event content will always be a dict Signed-off-by: Patrik Oldsberg --- synapse/api/filtering.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/api') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 6eff83e5f8..cd699ef27f 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -198,7 +198,10 @@ class Filter(object): sender = event.get("sender", None) if not sender: # Presence events have their 'sender' in content.user_id - sender = event.get("content", {}).get("user_id", None) + content = event.get("content") + # account_data has been allowed to have non-dict content, so check type first + if isinstance(content, dict): + sender = content.get("user_id") return self.check_fields( event.get("room_id", None), -- cgit 1.5.1 From 114b929f8bdd3dcc165fd488c087aaca3dc8bd91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Feb 2016 09:16:32 +0000 Subject: Check presence state is a valid one --- synapse/api/constants.py | 1 - synapse/handlers/presence.py | 6 ++++++ 2 files changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/api') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 84cbe710b3..8cf4d6169c 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -32,7 +32,6 @@ class PresenceState(object): OFFLINE = u"offline" UNAVAILABLE = u"unavailable" ONLINE = u"online" - FREE_FOR_CHAT = u"free_for_chat" class JoinRules(object): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1942e2ad30..439bfe5919 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -606,6 +606,12 @@ class PresenceHandler(BaseHandler): status_msg = state.get("status_msg", None) presence = state["presence"] + valid_presence = ( + PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE + ) + if presence not in valid_presence: + raise SynapseError(400, "Invalid presence state") + user_id = target_user.to_string() prev_state = yield self.current_state_for_user(user_id) -- cgit 1.5.1 From 577951b0324f67308f50c14fad703d2103621bc5 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 23 Feb 2016 15:11:25 +0000 Subject: Allow third_party_signed to be specified on /join --- synapse/api/auth.py | 57 ++++++++++++-------- synapse/federation/federation_server.py | 15 +++++- synapse/federation/transport/server.py | 12 ++++- synapse/handlers/federation.py | 93 +++++++++++++++++++++++++-------- synapse/handlers/room.py | 67 +++++++++++++++++++++--- synapse/python_dependencies.py | 2 +- synapse/rest/client/v1/room.py | 4 ++ 7 files changed, 196 insertions(+), 54 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e2f84c4d57..183245443c 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -434,31 +434,46 @@ class Auth(object): if event.user_id != invite_event.user_id: return False - try: - public_key = invite_event.content["public_key"] - if signed["mxid"] != event.state_key: - return False - if signed["token"] != token: - return False - for server, signature_block in signed["signatures"].items(): - for key_name, encoded_signature in signature_block.items(): - if not key_name.startswith("ed25519:"): - return False - verify_key = decode_verify_key_bytes( - key_name, - decode_base64(public_key) - ) - verify_signed_json(signed, server, verify_key) - # We got the public key from the invite, so we know that the - # correct server signed the signed bundle. - # The caller is responsible for checking that the signing - # server has not revoked that public key. - return True + if signed["mxid"] != event.state_key: return False - except (KeyError, SignatureVerifyException,): + if signed["token"] != token: return False + for public_key_object in self.get_public_keys(invite_event): + public_key = public_key_object["public_key"] + try: + for server, signature_block in signed["signatures"].items(): + for key_name, encoded_signature in signature_block.items(): + if not key_name.startswith("ed25519:"): + continue + verify_key = decode_verify_key_bytes( + key_name, + decode_base64(public_key) + ) + verify_signed_json(signed, server, verify_key) + + # We got the public key from the invite, so we know that the + # correct server signed the signed bundle. + # The caller is responsible for checking that the signing + # server has not revoked that public key. + return True + except (KeyError, SignatureVerifyException,): + continue + return False + + def get_public_keys(self, invite_event): + public_keys = [] + if "public_key" in invite_event.content: + o = { + "public_key": invite_event.content["public_key"], + } + if "key_validity_url" in invite_event.content: + o["key_validity_url"] = invite_event.content["key_validity_url"] + public_keys.append(o) + public_keys.extend(invite_event.content.get("public_keys", [])) + return public_keys + def _get_power_level_event(self, auth_events): key = (EventTypes.PowerLevels, "", ) return auth_events.get(key) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 90718192dd..e8bfbe7cb5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -543,8 +543,19 @@ class FederationServer(FederationBase): return event @defer.inlineCallbacks - def exchange_third_party_invite(self, invite): - ret = yield self.handler.exchange_third_party_invite(invite) + def exchange_third_party_invite( + self, + sender_user_id, + target_user_id, + room_id, + signed, + ): + ret = yield self.handler.exchange_third_party_invite( + sender_user_id, + target_user_id, + room_id, + signed, + ) defer.returnValue(ret) @defer.inlineCallbacks diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 65e054f7dd..6e92e2f8f4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -425,7 +425,17 @@ class On3pidBindServlet(BaseFederationServlet): last_exception = None for invite in content["invites"]: try: - yield self.handler.exchange_third_party_invite(invite) + if "signed" not in invite or "token" not in invite["signed"]: + message = ("Rejecting received notification of third-" + "party invite without signed: %s" % (invite,)) + logger.info(message) + raise SynapseError(400, message) + yield self.handler.exchange_third_party_invite( + invite["sender"], + invite["mxid"], + invite["room_id"], + invite["signed"], + ) except Exception as e: last_exception = e if last_exception: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ac15f9e5dd..3655b9e5e2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,6 +14,9 @@ # limitations under the License. """Contains handlers for federation events.""" +from signedjson.key import decode_verify_key_bytes +from signedjson.sign import verify_signed_json +from unpaddedbase64 import decode_base64 from ._base import BaseHandler @@ -1620,19 +1623,15 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def exchange_third_party_invite(self, invite): - sender = invite["sender"] - room_id = invite["room_id"] - - if "signed" not in invite or "token" not in invite["signed"]: - logger.info( - "Discarding received notification of third party invite " - "without signed: %s" % (invite,) - ) - return - + def exchange_third_party_invite( + self, + sender_user_id, + target_user_id, + room_id, + signed, + ): third_party_invite = { - "signed": invite["signed"], + "signed": signed, } event_dict = { @@ -1642,8 +1641,8 @@ class FederationHandler(BaseHandler): "third_party_invite": third_party_invite, }, "room_id": room_id, - "sender": sender, - "state_key": invite["mxid"], + "sender": sender_user_id, + "state_key": target_user_id, } if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): @@ -1656,11 +1655,11 @@ class FederationHandler(BaseHandler): ) self.auth.check(event, context.current_state) - yield self._validate_keyserver(event, auth_events=context.current_state) + yield self._check_signature(event, auth_events=context.current_state) member_handler = self.hs.get_handlers().room_member_handler yield member_handler.send_membership_event(event, context, from_client=False) else: - destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)]) + destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) yield self.replication_layer.forward_third_party_invite( destinations, room_id, @@ -1681,7 +1680,7 @@ class FederationHandler(BaseHandler): ) self.auth.check(event, auth_events=context.current_state) - yield self._validate_keyserver(event, auth_events=context.current_state) + yield self._check_signature(event, auth_events=context.current_state) returned_invite = yield self.send_invite(origin, event) # TODO: Make sure the signatures actually are correct. @@ -1711,17 +1710,69 @@ class FederationHandler(BaseHandler): defer.returnValue((event, context)) @defer.inlineCallbacks - def _validate_keyserver(self, event, auth_events): - token = event.content["third_party_invite"]["signed"]["token"] + def _check_signature(self, event, auth_events): + """ + Checks that the signature in the event is consistent with its invite. + :param event (Event): The m.room.member event to check + :param auth_events (dict<(event type, state_key), event>) + + :raises + AuthError if signature didn't match any keys, or key has been + revoked, + SynapseError if a transient error meant a key couldn't be checked + for revocation. + """ + signed = event.content["third_party_invite"]["signed"] + token = signed["token"] invite_event = auth_events.get( (EventTypes.ThirdPartyInvite, token,) ) + if not invite_event: + raise AuthError(403, "Could not find invite") + + last_exception = None + for public_key_object in self.hs.get_auth().get_public_keys(invite_event): + try: + for server, signature_block in signed["signatures"].items(): + for key_name, encoded_signature in signature_block.items(): + if not key_name.startswith("ed25519:"): + continue + + public_key = public_key_object["public_key"] + verify_key = decode_verify_key_bytes( + key_name, + decode_base64(public_key) + ) + verify_signed_json(signed, server, verify_key) + if "key_validity_url" in public_key_object: + yield self._check_key_revocation( + public_key, + public_key_object["key_validity_url"] + ) + return + except Exception as e: + last_exception = e + raise last_exception + + @defer.inlineCallbacks + def _check_key_revocation(self, public_key, url): + """ + Checks whether public_key has been revoked. + + :param public_key (str): base-64 encoded public key. + :param url (str): Key revocation URL. + + :raises + AuthError if they key has been revoked. + SynapseError if a transient error meant a key couldn't be checked + for revocation. + """ try: response = yield self.hs.get_simple_http_client().get_json( - invite_event.content["key_validity_url"], - {"public_key": invite_event.content["public_key"]} + url, + {"public_key": public_key} ) except Exception: raise SynapseError( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b00cac4bd4..eb9700a35b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -398,6 +398,7 @@ class RoomMemberHandler(BaseHandler): action, txn_id=None, remote_room_hosts=None, + third_party_signed=None, ratelimit=True, ): effective_membership_state = action @@ -406,6 +407,15 @@ class RoomMemberHandler(BaseHandler): elif action == "forget": effective_membership_state = "leave" + if third_party_signed is not None: + replication = self.hs.get_replication_layer() + yield replication.exchange_third_party_invite( + third_party_signed["sender"], + target.to_string(), + room_id, + third_party_signed, + ) + msg_handler = self.hs.get_handlers().message_handler content = {"membership": effective_membership_state} @@ -759,7 +769,7 @@ class RoomMemberHandler(BaseHandler): if room_avatar_event: room_avatar_url = room_avatar_event.content.get("url", "") - token, public_key, key_validity_url, display_name = ( + token, public_keys, fallback_public_key, display_name = ( yield self._ask_id_server_for_third_party_invite( id_server=id_server, medium=medium, @@ -774,14 +784,18 @@ class RoomMemberHandler(BaseHandler): inviter_avatar_url=inviter_avatar_url ) ) + msg_handler = self.hs.get_handlers().message_handler yield msg_handler.create_and_send_nonmember_event( { "type": EventTypes.ThirdPartyInvite, "content": { "display_name": display_name, - "key_validity_url": key_validity_url, - "public_key": public_key, + "public_keys": public_keys, + + # For backwards compatibility: + "key_validity_url": fallback_public_key["key_validity_url"], + "public_key": fallback_public_key["public_key"], }, "room_id": room_id, "sender": user.to_string(), @@ -806,6 +820,34 @@ class RoomMemberHandler(BaseHandler): inviter_display_name, inviter_avatar_url ): + """ + Asks an identity server for a third party invite. + + :param id_server (str): hostname + optional port for the identity server. + :param medium (str): The literal string "email". + :param address (str): The third party address being invited. + :param room_id (str): The ID of the room to which the user is invited. + :param inviter_user_id (str): The user ID of the inviter. + :param room_alias (str): An alias for the room, for cosmetic + notifications. + :param room_avatar_url (str): The URL of the room's avatar, for cosmetic + notifications. + :param room_join_rules (str): The join rules of the email + (e.g. "public"). + :param room_name (str): The m.room.name of the room. + :param inviter_display_name (str): The current display name of the + inviter. + :param inviter_avatar_url (str): The URL of the inviter's avatar. + + :return: A deferred tuple containing: + token (str): The token which must be signed to prove authenticity. + public_keys ([{"public_key": str, "key_validity_url": str}]): + public_key is a base64-encoded ed25519 public key. + fallback_public_key: One element from public_keys. + display_name (str): A user-friendly name to represent the invited + user. + """ + is_url = "%s%s/_matrix/identity/api/v1/store-invite" % ( id_server_scheme, id_server, ) @@ -826,12 +868,21 @@ class RoomMemberHandler(BaseHandler): ) # TODO: Check for success token = data["token"] - public_key = data["public_key"] + public_keys = data.get("public_keys", []) + if "public_key" in data: + fallback_public_key = { + "public_key": data["public_key"], + "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % ( + id_server_scheme, id_server, + ), + } + else: + fallback_public_key = public_keys[0] + + if not public_keys: + public_keys.append(fallback_public_key) display_name = data["display_name"] - key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % ( - id_server_scheme, id_server, - ) - defer.returnValue((token, public_key, key_validity_url, display_name)) + defer.returnValue((token, public_keys, fallback_public_key, display_name)) def forget(self, user, room_id): return self.store.forget(user.to_string(), room_id) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 75bf3d13aa..35933324a4 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -19,7 +19,7 @@ logger = logging.getLogger(__name__) REQUIREMENTS = { "frozendict>=0.4": ["frozendict"], - "unpaddedbase64>=1.0.1": ["unpaddedbase64>=1.0.1"], + "unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"], "canonicaljson>=1.0.0": ["canonicaljson>=1.0.0"], "signedjson>=1.0.0": ["signedjson>=1.0.0"], "pynacl==0.3.0": ["nacl==0.3.0", "nacl.bindings"], diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index e6f5c5614a..07a2a5dd82 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -228,6 +228,8 @@ class JoinRoomAliasServlet(ClientV1RestServlet): allow_guest=True, ) + content = _parse_json(request) + if RoomID.is_valid(room_identifier): room_id = room_identifier remote_room_hosts = None @@ -248,6 +250,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet): action="join", txn_id=txn_id, remote_room_hosts=remote_room_hosts, + third_party_signed=content.get("third_party_signed", None), ) defer.returnValue((200, {"room_id": room_id})) @@ -451,6 +454,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): room_id=room_id, action=membership_action, txn_id=txn_id, + third_party_signed=content.get("third_party_signed", None), ) defer.returnValue((200, {})) -- cgit 1.5.1 From 874fd432575044c6cd93ee4f311412f677e6b7fe Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 7 Mar 2016 17:13:56 +0000 Subject: Send the user ID matching the guest access token, since there is no Matrix API to discover what user ID an access token is for. --- synapse/api/auth.py | 4 ++-- synapse/handlers/room.py | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) (limited to 'synapse/api') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 183245443c..3038df4ab8 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -534,7 +534,7 @@ class Auth(object): ) access_token = request.args["access_token"][0] - user_info = yield self._get_user_by_access_token(access_token) + user_info = yield self.get_user_by_access_token(access_token) user = user_info["user"] token_id = user_info["token_id"] is_guest = user_info["is_guest"] @@ -595,7 +595,7 @@ class Auth(object): defer.returnValue(user_id) @defer.inlineCallbacks - def _get_user_by_access_token(self, token): + def get_user_by_access_token(self, token): """ Get a registered user's ID. Args: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0cb6c521c4..57113ae4a5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -884,6 +884,10 @@ class RoomMemberHandler(BaseHandler): inviter_user_id=inviter_user_id, ) + guest_user_info = yield self.hs.get_auth().get_user_by_access_token( + guest_access_token + ) + is_url = "%s%s/_matrix/identity/api/v1/store-invite" % ( id_server_scheme, id_server, ) @@ -900,6 +904,7 @@ class RoomMemberHandler(BaseHandler): "sender": inviter_user_id, "sender_display_name": inviter_display_name, "sender_avatar_url": inviter_avatar_url, + "guest_user_id": guest_user_info["user"].to_string(), "guest_access_token": guest_access_token, } ) -- cgit 1.5.1