From e3e2fc3255665ac888f3e0c0c6e2be39d5fda5f3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 15 Jan 2015 16:17:21 +0000 Subject: Don't make the pushers' event streams cause people to appear online --- synapse/handlers/events.py | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index c9ade253dd..54ab27004f 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -47,11 +47,11 @@ class EventStreamHandler(BaseHandler): @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, - as_client_event=True): + as_client_event=True, affect_presence=True): auth_user = self.hs.parse_userid(auth_user_id) try: - if auth_user not in self._streams_per_user: + if affect_presence and auth_user not in self._streams_per_user: self._streams_per_user[auth_user] = 0 if auth_user in self._stop_timer_per_user: try: @@ -64,7 +64,7 @@ class EventStreamHandler(BaseHandler): yield self.distributor.fire( "started_user_eventstream", auth_user ) - self._streams_per_user[auth_user] += 1 + self._streams_per_user[auth_user] += 1 if pagin_config.from_token is None: pagin_config.from_token = None @@ -92,27 +92,28 @@ class EventStreamHandler(BaseHandler): defer.returnValue(chunk) finally: - self._streams_per_user[auth_user] -= 1 - if not self._streams_per_user[auth_user]: - del self._streams_per_user[auth_user] - - # 10 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - logger.debug( - "_later stopped_user_eventstream %s", auth_user - ) + if affect_presence: + self._streams_per_user[auth_user] -= 1 + if not self._streams_per_user[auth_user]: + del self._streams_per_user[auth_user] + + # 10 seconds of grace to allow the client to reconnect again + # before we think they're gone + def _later(): + logger.debug( + "_later stopped_user_eventstream %s", auth_user + ) - self._stop_timer_per_user.pop(auth_user, None) + self._stop_timer_per_user.pop(auth_user, None) - yield self.distributor.fire( - "stopped_user_eventstream", auth_user - ) + yield self.distributor.fire( + "stopped_user_eventstream", auth_user + ) - logger.debug("Scheduling _later: for %s", auth_user) - self._stop_timer_per_user[auth_user] = ( - self.clock.call_later(30, _later) - ) + logger.debug("Scheduling _later: for %s", auth_user) + self._stop_timer_per_user[auth_user] = ( + self.clock.call_later(30, _later) + ) class EventHandler(BaseHandler): -- cgit 1.4.1 From 0cfb4591a7754bcc08edddd17629006b5096d94d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 26 Jan 2015 15:46:31 +0000 Subject: Add handler for /sync API --- synapse/handlers/sync.py | 110 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 synapse/handlers/sync.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py new file mode 100644 index 0000000000..ec20ea4890 --- /dev/null +++ b/synapse/handlers/sync.py @@ -0,0 +1,110 @@ +import collections + + +SyncConfig = collections.namedtuple("SyncConfig", [ + "user", + "device", + "since", + "limit", + "gap", + "sort" + "backfill" + "filter", +) + + +RoomSyncResult = collections.namedtuple("RoomSyncResult", [ + "room_id", + "limited", + "published", + "prev_batch", + "events", + "state", + "event_map", +]) + + +class SyncResult(collections.namedtuple("SyncResult", [ + "next_batch", # Token for the 
next sync + "private_user_data", # List of private events for the user. + "public_user_data", # List of public events for all users. + "rooms", # RoomSyncResult for each room. +])): + __slots__ = [] + + def __nonzero__(self): + return self.private_user_data or self.public_user_data or self.rooms + + +class SyncHandler(BaseHandler): + + def __init__(self, hs): + super(SyncHandler, self).__init__(hs) + self.event_sources = hs.get_event_sources() + + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): + if timeout == 0: + return self.current_sync_for_user(sync_config, since) + else: + def current_sync_callback(since_token): + return self.current_sync_for_user( + self, since_token, sync_config + ) + return self.notifier.wait_for_events( + sync_config.filter, since_token, current_sync_callback + ) + defer.returnValue(result) + + def current_sync_for_user(self, sync_config, since_token=None): + if since_token is None: + return self.inital_sync(sync_config) + else: + return self.incremental_sync(sync_config) + + @defer.inlineCallbacks + def initial_sync(self, sync_config): + now_token = yield self.event_sources.get_current_token() + + presence_stream = self.event_sources.sources["presence"] + # TODO (markjh): This looks wrong, shouldn't we be getting the presence + # UP to the present rather than after the present? + pagination_config = PaginationConfig(from_token=now_token) + presence, _ = yield presence_stream.get_pagination_rows( + user, pagination_config.get_source_config("presence"), None + ) + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user_id, + membership_list=[Membership.INVITE, Membership.JOIN] + ) + + # TODO (markjh): Does public mean "published"? + published_rooms = yield self.store.get_rooms(is_public=True) + published_room_ids = set(r["room_id"] for r in public_rooms) + + for event in room_list: + + messages, token = yield self.store.get_recent_events_for_room( + event.room_id, + limit=sync_config.limit, + end_token=now_token.room_key, + ) + prev_batch_token = now_token.copy_and_replace("room_key", token[0]) + current_state = yield self.state_handler.get_current_state( + event.room_id + ) + + rooms.append(RoomSyncResult( + room_id=event.room_id, + published=event.room_id in published_room_ids, + + + + + + @defer.inlineCallbacks + def incremental_sync(self, sync_config): + + + + + -- cgit 1.4.1 From 436513068de73ab47d9ba9a32046420be3d86588 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 26 Jan 2015 18:53:31 +0000 Subject: Start implementing the non-incremental sync portion of the v2 /sync API --- synapse/events/utils.py | 6 +- synapse/handlers/__init__.py | 2 + synapse/handlers/sync.py | 87 ++++++++++++++++++++--------- synapse/rest/client/v2_alpha/sync.py | 105 +++++++++++++++++++++++++---------- 4 files changed, 146 insertions(+), 54 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index e391aca4cc..b7f1ad4b40 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -89,7 +89,7 @@ def prune_event(event): return type(event)(allowed_fields) -def serialize_event(e, time_now_ms, client_event=True): +def serialize_event(e, time_now_ms, client_event=True, strip_ids=False): # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e @@ -138,4 +138,8 @@ def serialize_event(e, time_now_ms, client_event=True): d.pop("unsigned", None) d.pop("origin", None) + if strip_ids: + d.pop("room_id", None) + 
d.pop("event_id", None) + return d diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index fe071a4bc2..a32eab9316 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -26,6 +26,7 @@ from .presence import PresenceHandler from .directory import DirectoryHandler from .typing import TypingNotificationHandler from .admin import AdminHandler +from .sync import SyncHandler class Handlers(object): @@ -51,3 +52,4 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + self.sync_handler = SyncHandler(hs) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ec20ea4890..bbabaf3df1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1,26 +1,49 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseHandler + +from synapse.streams.config import PaginationConfig +from synapse.api.constants import Membership + +from twisted.internet import defer + import collections +import logging + +logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", "device", - "since", "limit", "gap", - "sort" - "backfill" + "sort", + "backfill", "filter", -) +]) RoomSyncResult = collections.namedtuple("RoomSyncResult", [ "room_id", "limited", "published", - "prev_batch", - "events", + "events", # dict of event "state", - "event_map", + "prev_batch", ]) @@ -41,10 +64,11 @@ class SyncHandler(BaseHandler): def __init__(self, hs): super(SyncHandler, self).__init__(hs) self.event_sources = hs.get_event_sources() + self.clock = hs.get_clock() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): if timeout == 0: - return self.current_sync_for_user(sync_config, since) + return self.current_sync_for_user(sync_config, since_token) else: def current_sync_callback(since_token): return self.current_sync_for_user( @@ -53,58 +77,71 @@ class SyncHandler(BaseHandler): return self.notifier.wait_for_events( sync_config.filter, since_token, current_sync_callback ) - defer.returnValue(result) def current_sync_for_user(self, sync_config, since_token=None): if since_token is None: - return self.inital_sync(sync_config) + return self.initial_sync(sync_config) else: return self.incremental_sync(sync_config) @defer.inlineCallbacks def initial_sync(self, sync_config): + if sync_config.sort == "timeline,desc": + # TODO(mjark): Handle going through events in reverse order?. + # What does "most recent events" mean when applying the limits mean + # in this case? + raise NotImplementedError() + now_token = yield self.event_sources.get_current_token() presence_stream = self.event_sources.sources["presence"] - # TODO (markjh): This looks wrong, shouldn't we be getting the presence + # TODO (mjark): This looks wrong, shouldn't we be getting the presence # UP to the present rather than after the present? 
pagination_config = PaginationConfig(from_token=now_token) presence, _ = yield presence_stream.get_pagination_rows( - user, pagination_config.get_source_config("presence"), None + user=sync_config.user, + pagination_config=pagination_config.get_source_config("presence"), + key=None ) room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=user_id, + user_id=sync_config.user.to_string(), membership_list=[Membership.INVITE, Membership.JOIN] ) - # TODO (markjh): Does public mean "published"? + # TODO (mjark): Does public mean "published"? published_rooms = yield self.store.get_rooms(is_public=True) - published_room_ids = set(r["room_id"] for r in public_rooms) + published_room_ids = set(r["room_id"] for r in published_rooms) + rooms = [] for event in room_list: - - messages, token = yield self.store.get_recent_events_for_room( + #TODO (mjark): Apply the event filter in sync_config. + recent_events, token = yield self.store.get_recent_events_for_room( event.room_id, limit=sync_config.limit, end_token=now_token.room_key, ) prev_batch_token = now_token.copy_and_replace("room_key", token[0]) - current_state = yield self.state_handler.get_current_state( + current_state_events = yield self.state_handler.get_current_state( event.room_id ) rooms.append(RoomSyncResult( room_id=event.room_id, published=event.room_id in published_room_ids, + events=recent_events, + prev_batch=prev_batch_token, + state=current_state_events, + limited=True, + )) - - + defer.returnValue(SyncResult( + public_user_data=presence, + private_user_data=[], + rooms=rooms, + next_batch=now_token, + )) @defer.inlineCallbacks def incremental_sync(self, sync_config): - - - - - + pass diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 39bb5ec8e9..cc667ebafc 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -16,6 +16,9 @@ from twisted.internet import defer from synapse.http.servlet import RestServlet +from synapse.handlers.sync import SyncConfig +from synapse.types import StreamToken +from synapse.events.utils import serialize_event from ._base import client_v2_pattern import logging @@ -73,14 +76,15 @@ class SyncRestServlet(RestServlet): def __init__(self, hs): super(SyncRestServlet, self).__init__() self.auth = hs.get_auth() - #self.sync_handler = hs.get_handlers().sync_hanlder + self.sync_handler = hs.get_handlers().sync_handler + self.clock = hs.get_clock() @defer.inlineCallbacks def on_GET(self, request): user = yield self.auth.get_user_by_req(request) timeout = self.parse_integer(request, "timeout", default=0) - limit = self.parse_integer(request, "limit", default=None) + limit = self.parse_integer(request, "limit", required=True) gap = self.parse_boolean(request, "gap", default=True) sort = self.parse_string( request, "sort", default="timeline,asc", @@ -91,7 +95,7 @@ class SyncRestServlet(RestServlet): request, "set_presence", default="online", allowed_values=self.ALLOWED_PRESENCE ) - backfill = self.parse_boolean(request, "backfill", default=True) + backfill = self.parse_boolean(request, "backfill", default=False) filter_id = self.parse_string(request, "filter", default=None) logger.info( @@ -108,36 +112,81 @@ class SyncRestServlet(RestServlet): # if filter.matches(event): # # stuff - # if timeout != 0: - # register for updates from the event stream - - #rooms = [] - - if gap: - pass - # now_stream_token = get_current_stream_token - # for room_id in get_rooms_for_user(user, filter=filter): - # state, events, start, end, limited, published = updates_for_room( - # from=since, to=now_stream_token, limit=limit, - # anchor_to_start=False - # ) - # rooms[room_id] = (state, events, start, limited, published) - # next_stream_token = now. + sync_config = SyncConfig( + user=user, + device="TODO", # TODO(mjark) Get the device_id from access_token + gap=gap, + limit=limit, + sort=sort, + backfill=backfill, + filter="TODO", # TODO(mjark) Add the filter to the config. + ) + + if since is not None: + since_token = StreamToken.from_string(since) else: - pass - # now_stream_token = get_current_stream_token - # for room_id in get_rooms_for_user(user, filter=filter) - # state, events, start, end, limited, published = updates_for_room( - # from=since, to=now_stream_token, limit=limit, - # anchor_to_start=False - # ) - # next_stream_token = min(next_stream_token, end) + since_token = None + sync_result = yield self.sync_handler.wait_for_sync_for_user( + sync_config, since_token=since_token, timeout=timeout + ) - response_content = {} + time_now = self.clock.time_msec() + + response_content = { + "public_user_data": self.encode_events( + sync_result.public_user_data, filter, time_now + ), + "private_user_data": self.encode_events( + sync_result.private_user_data, filter, time_now + ), + "rooms": self.encode_rooms(sync_result.rooms, filter, time_now), + "next_batch": sync_result.next_batch.to_string(), + } defer.returnValue((200, response_content)) + def encode_events(self, events, filter, time_now): + return [self.encode_event(event, filter, time_now) for event in events] + + @staticmethod + def encode_event(event, filter, time_now): + # TODO(mjark): Respect formatting requirements in the filter. 
+ return serialize_event(event, time_now) + + def encode_rooms(self, rooms, filter, time_now): + return [self.encode_room(room, filter, time_now) for room in rooms] + + @staticmethod + def encode_room(room, filter, time_now): + event_map = {} + state_event_ids = [] + recent_event_ids = [] + for event in room.state: + # TODO(mjark): Respect formatting requirements in the filter. + event_map[event.event_id] = serialize_event( + event, time_now, strip_ids=True + ) + state_event_ids.append(event.event_id) + + for event in room.events: + # TODO(mjark): Respect formatting requirements in the filter. + event_map[event.event_id] = serialize_event( + event, time_now, strip_ids=True + ) + recent_event_ids.append(event.event_id) + return { + "room_id": room.room_id, + "event_map": event_map, + "events": { + "batch": recent_event_ids, + "prev_batch": room.prev_batch.to_string(), + }, + "state": state_event_ids, + "limited": room.limited, + "published": room.published, + } + def register_servlets(hs, http_server): SyncRestServlet(hs).register(http_server) -- cgit 1.4.1 From 1d779691248be2fcdd43e97352d13c2cef239c10 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 27 Jan 2015 15:58:27 +0000 Subject: Unbreak bad presence merge - don't add these blocks together with an and: they're different things. --- synapse/handlers/events.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 75fb941008..f1a3e4a4ad 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -51,19 +51,21 @@ class EventStreamHandler(BaseHandler): auth_user = self.hs.parse_userid(auth_user_id) try: - if affect_presence and auth_user not in self._streams_per_user: - self._streams_per_user[auth_user] = 0 - if auth_user in self._stop_timer_per_user: - try: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(auth_user) + if affect_presence: + if auth_user not in self._streams_per_user: + self._streams_per_user[auth_user] = 0 + if auth_user in self._stop_timer_per_user: + try: + print "cancel",auth_user + self.clock.cancel_call_later( + self._stop_timer_per_user.pop(auth_user) + ) + except: + logger.exception("Failed to cancel event timer") + else: + yield self.distributor.fire( + "started_user_eventstream", auth_user ) - except: - logger.exception("Failed to cancel event timer") - else: - yield self.distributor.fire( - "started_user_eventstream", auth_user - ) self._streams_per_user[auth_user] += 1 if pagin_config.from_token is None: -- cgit 1.4.1 From eba89f093f0376b8499953b53bbd85f9b12b7852 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 27 Jan 2015 16:00:07 +0000 Subject: Need a defer.inlineCallbacks here as we yield in it: otherwise nothing in the cb gets executed. 
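The root problem: a plain Python function whose body contains a `yield` is a generator, so calling it only builds a generator object; none of the body runs and the Deferred from distributor.fire() is never even created. A minimal illustrative sketch (not the handler code itself; `distributor` and `auth_user` stand in for the real objects):

    from twisted.internet import defer

    def _later_broken(distributor, auth_user):
        # Bare generator: calling _later_broken(...) returns a generator
        # object immediately, so this body never executes.
        yield distributor.fire("stopped_user_eventstream", auth_user)

    @defer.inlineCallbacks
    def _later_fixed(distributor, auth_user):
        # inlineCallbacks drives the generator and returns a Deferred that
        # fires once the yielded Deferred has completed.
        yield distributor.fire("stopped_user_eventstream", auth_user)

    def _later_return(distributor, auth_user):
        # Equivalent fix: drop the yield and hand the Deferred straight back.
        return distributor.fire("stopped_user_eventstream", auth_user)

The next commit in the series takes the last route and simply returns the deferred.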
--- synapse/handlers/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f1a3e4a4ad..851eebf600 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -99,6 +99,7 @@ class EventStreamHandler(BaseHandler): # 10 seconds of grace to allow the client to reconnect again # before we think they're gone + @defer.inlineCallbacks def _later(): logger.debug( "_later stopped_user_eventstream %s", auth_user -- cgit 1.4.1 From 5eacaeb4a73aa4c4ff450d7b23acabe79124171f Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 27 Jan 2015 16:05:23 +0000 Subject: or of course we could just return the deferred --- synapse/handlers/events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 851eebf600..48de3630e3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -99,7 +99,6 @@ class EventStreamHandler(BaseHandler): # 10 seconds of grace to allow the client to reconnect again # before we think they're gone - @defer.inlineCallbacks def _later(): logger.debug( "_later stopped_user_eventstream %s", auth_user @@ -107,7 +106,7 @@ class EventStreamHandler(BaseHandler): self._stop_timer_per_user.pop(auth_user, None) - yield self.distributor.fire( + return self.distributor.fire( "stopped_user_eventstream", auth_user ) -- cgit 1.4.1 From f7c4daa8f94e5968277ba438f6d3da1f0f27ba4c Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 27 Jan 2015 16:08:47 +0000 Subject: Oops, remove debugging --- synapse/handlers/events.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 48de3630e3..52ef07eaae 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -56,7 +56,6 @@ class EventStreamHandler(BaseHandler): self._streams_per_user[auth_user] = 0 if auth_user in self._stop_timer_per_user: try: - print "cancel",auth_user self.clock.cancel_call_later( self._stop_timer_per_user.pop(auth_user) ) -- cgit 1.4.1 From a56008842b43089433768f569f35b2d14523ac39 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 27 Jan 2015 16:24:22 +0000 Subject: Start implementing incremental initial sync --- synapse/events/utils.py | 1 + synapse/handlers/sync.py | 233 +++++++++++++++++++++++++++++++++++++++++----- synapse/storage/stream.py | 41 ++++++-- 3 files changed, 241 insertions(+), 34 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index b7f1ad4b40..42fb0371e5 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -137,6 +137,7 @@ def serialize_event(e, time_now_ms, client_event=True, strip_ids=False): d.pop("depth", None) d.pop("unsigned", None) d.pop("origin", None) + d.pop("prev_state", None) if strip_ids: d.pop("room_id", None) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bbabaf3df1..f8629a588f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -37,14 +37,18 @@ SyncConfig = collections.namedtuple("SyncConfig", [ ]) -RoomSyncResult = collections.namedtuple("RoomSyncResult", [ +class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "room_id", "limited", "published", - "events", # dict of event + "events", "state", "prev_batch", -]) +])): + __slots__ = [] + + def __nonzero__(self): + return bool(self.events or self.state) class 
SyncResult(collections.namedtuple("SyncResult", [ @@ -56,7 +60,9 @@ class SyncResult(collections.namedtuple("SyncResult", [ __slots__ = [] def __nonzero__(self): - return self.private_user_data or self.public_user_data or self.rooms + return bool( + self.private_user_data or self.public_user_data or self.rooms + ) class SyncHandler(BaseHandler): @@ -67,7 +73,13 @@ class SyncHandler(BaseHandler): self.clock = hs.get_clock() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): - if timeout == 0: + """Get the sync for a client if we have new data for it now. Otherwise + wait for new data to arrive on the server. If the timeout expires, then + return an empty sync result. + Returns: + A Deferred SyncResult. + """ + if timeout == 0 or since_token is None: return self.current_sync_for_user(sync_config, since_token) else: def current_sync_callback(since_token): @@ -79,13 +91,25 @@ class SyncHandler(BaseHandler): ) def current_sync_for_user(self, sync_config, since_token=None): + """Get the sync for client needed to match what the server has now. + Returns: + A Deferred SyncResult. + """ if since_token is None: return self.initial_sync(sync_config) else: - return self.incremental_sync(sync_config) + if sync_config.gap: + return self.incremental_sync_with_gap(sync_config, since_token) + else: + #TODO(mjark): Handle gapless sync + pass @defer.inlineCallbacks def initial_sync(self, sync_config): + """Get a sync for a client which is starting without any state + Returns: + A Deferred SyncResult. + """ if sync_config.sort == "timeline,desc": # TODO(mjark): Handle going through events in reverse order?. # What does "most recent events" mean when applying the limits mean @@ -114,25 +138,86 @@ class SyncHandler(BaseHandler): rooms = [] for event in room_list: - #TODO (mjark): Apply the event filter in sync_config. - recent_events, token = yield self.store.get_recent_events_for_room( - event.room_id, - limit=sync_config.limit, - end_token=now_token.room_key, - ) - prev_batch_token = now_token.copy_and_replace("room_key", token[0]) - current_state_events = yield self.state_handler.get_current_state( - event.room_id + room_sync = yield self.initial_sync_for_room( + event.room_id, sync_config, now_token, published_room_ids ) + rooms.append(room_sync) + + defer.returnValue(SyncResult( + public_user_data=presence, + private_user_data=[], + rooms=rooms, + next_batch=now_token, + )) + + @defer.inlineCallbacks + def intial_sync_for_room(self, room_id, sync_config, now_token, + published_room_ids): + """Sync a room for a client which is starting without any state + Returns: + A Deferred RoomSyncResult. + """ + recent_events, token = yield self.store.get_recent_events_for_room( + room_id, + limit=sync_config.limit, + end_token=now_token.room_key, + ) + prev_batch_token = now_token.copy_and_replace("room_key", token[0]) + current_state_events = yield self.state_handler.get_current_state( + room_id + ) + + defer.returnValue(RoomSyncResult( + room_id=room_id, + published=room_id in published_room_ids, + events=recent_events, + prev_batch=prev_batch_token, + state=current_state_events, + limited=True, + )) + + + @defer.inlineCallbacks + def incremental_sync_with_gap(self, sync_config, since_token): + """ Get the incremental delta needed to bring the client up to + date with the server. + Returns: + A Deferred SyncResult. + """ + if sync_config.sort == "timeline,desc": + # TODO(mjark): Handle going through events in reverse order?. 
+ # What does "most recent events" mean when applying the limits mean + # in this case? + raise NotImplementedError() + + now_token = yield self.event_sources.get_current_token() + + presence_stream = self.event_sources.sources["presence"] + pagination_config = PaginationConfig( + from_token=since_token, to_token=now_token + ) + presence, _ = yield presence_stream.get_pagination_rows( + user=sync_config.user, + pagination_config=pagination_config.get_source_config("presence"), + key=None + ) + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=sync_config.user.to_string(), + membership_list=[Membership.INVITE, Membership.JOIN] + ) + + # TODO (mjark): Does public mean "published"? + published_rooms = yield self.store.get_rooms(is_public=True) + published_room_ids = set(r["room_id"] for r in published_rooms) - rooms.append(RoomSyncResult( - room_id=event.room_id, - published=event.room_id in published_room_ids, - events=recent_events, - prev_batch=prev_batch_token, - state=current_state_events, - limited=True, - )) + rooms = [] + for event in room_list: + room_sync = yield self.incremental_sync_with_gap_for_room( + event.room_id, sync_config, since_token, now_token, + published_room_ids + ) + if room_sync: + rooms.append(room_sync) defer.returnValue(SyncResult( public_user_data=presence, @@ -143,5 +228,103 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks - def incremental_sync(self, sync_config): - pass + def incremental_sync_with_gap_for_room(self, room_id, sync_config, + since_token, now_token, + published_room_ids): + """ Get the incremental delta needed to bring the client up to date for + the room. Gives the client the most recent events and the changes to + state. + Returns: + A Deferred RoomSyncResult + """ + # TODO(mjark): Check if they have joined the room between + # the previous sync and this one. + # TODO(mjark): Apply the event filter in sync_config + # TODO(mjark): Check for redactions we might have missed. + # TODO(mjark): Typing notifications. + recents, token = yield self.store.get_recent_events_for_room( + room_id, + limit=sync_config.limit + 1, + from_token=since_token.room_key, + end_token=now_token.room_key, + ) + + logging.debug("Recents %r", recents) + + if len(recents) > sync_config.limit: + limited = True + recents = recents[1:] + else: + limited = False + + prev_batch_token = now_token.copy_and_replace("room_key", token[0]) + + # TODO(mjark): This seems racy since this isn't being passed a + # token to indicate what point in the stream this is + current_state_events = yield self.state_handler.get_current_state( + room_id + ) + + state_at_previous_sync = yield self.get_state_at_previous_sync( + room_id, since_token=since_token + ) + + state_events_delta = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=current_state_events, + ) + + room_sync = RoomSyncResult( + room_id=room_id, + published=room_id in published_room_ids, + events=recents, + prev_batch=prev_batch_token, + state=state_events_delta, + limited=limited, + ) + + logging.debug("Room sync: %r", room_sync) + + defer.returnValue(room_sync) + + @defer.inlineCallbacks + def get_state_at_previous_sync(self, room_id, since_token): + """ Get the room state at the previous sync the client made. + Returns: + A Deferred list of Events. 
+ """ + last_events, token = yield self.store.get_recent_events_for_room( + room_id, end_token=since_token.room_key, limit=1, + ) + + if last_events: + last_event = last_events[0] + last_context = yield self.state_handler.compute_event_context( + last_event + ) + if last_event.is_state(): + state = [last_event] + last_context.current_state.values() + else: + state = last_context.current_state.values() + else: + state = () + defer.returnValue(state) + + + def compute_state_delta(self, since_token, previous_state, current_state): + """ Works out the differnce in state between the current state and the + state the client got when it last performed a sync. + Returns: + A list of events. + """ + # TODO(mjark) Check if the state events were received by the server + # after the previous sync, since we need to include those state + # updates even if they occured logically before the previous event. + # TODO(mjark) Check for new redactions in the state events. + previous_dict = {event.event_id:event for event in previous_state} + state_delta = [] + for event in current_state: + if event.event_id not in previous_dict: + state_delta.append(event) + return state_delta diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8ac2adab05..06aca1a4e5 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -265,17 +265,38 @@ class StreamStore(SQLBaseStore): return self.runInteraction("paginate_room_events", f) def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False): + with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback - sql = ( - "SELECT stream_ordering, topological_ordering, event_id FROM events " - "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " - "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) + end_token = _StreamToken.parse_stream_token(end_token) - def f(txn): - txn.execute(sql, (room_id, end_token, limit,)) + if from_token is None: + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + else: + from_token = _StreamToken.parse_stream_token(from_token) + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering > ?" + " AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" 
+ ) + + + def get_recent_events_for_room_txn(txn): + if from_token is None: + txn.execute(sql, (room_id, end_token.stream, limit,)) + else: + txn.execute(sql, ( + room_id, from_token.stream, end_token.stream, limit + )) rows = self.cursor_to_dict(txn) @@ -303,7 +324,9 @@ class StreamStore(SQLBaseStore): return events, token - return self.runInteraction("get_recent_events_for_room", f) + return self.runInteraction( + "get_recent_events_for_room", get_recent_events_for_room_txn + ) def get_room_events_max_id(self): return self.runInteraction( -- cgit 1.4.1 From b19cf6a105ad87954e15cf01e60e14fec280db6d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 27 Jan 2015 20:09:52 +0000 Subject: Wait for events if the incremental sync is empty and a timeout is given --- demo/start.sh | 2 +- synapse/handlers/sync.py | 19 ++++++++++++------- synapse/notifier.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/demo/start.sh b/demo/start.sh index bb2248770d..a7502e7a8d 100755 --- a/demo/start.sh +++ b/demo/start.sh @@ -16,7 +16,7 @@ if [ $# -eq 1 ]; then fi fi -for port in 8080 8081 8082; do +for port in 8080; do echo "Starting server on port $port... " https_port=$((port + 400)) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f8629a588f..9f5f73eab6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -72,6 +72,7 @@ class SyncHandler(BaseHandler): self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() + @defer.inlineCallbacks def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then @@ -80,15 +81,19 @@ class SyncHandler(BaseHandler): A Deferred SyncResult. """ if timeout == 0 or since_token is None: - return self.current_sync_for_user(sync_config, since_token) + result = yield self.current_sync_for_user(sync_config, since_token) + defer.returnValue(result) else: - def current_sync_callback(since_token): - return self.current_sync_for_user( - self, since_token, sync_config - ) - return self.notifier.wait_for_events( - sync_config.filter, since_token, current_sync_callback + def current_sync_callback(): + return self.current_sync_for_user(sync_config, since_token) + + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(sync_config.user) + result = yield self.notifier.wait_for_events( + sync_config.user, room_ids, + sync_config.filter, timeout, current_sync_callback ) + defer.returnValue(result) def current_sync_for_user(self, sync_config, since_token=None): """Get the sync for client needed to match what the server has now. diff --git a/synapse/notifier.py b/synapse/notifier.py index 3aec1d4af2..922bf064d0 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.util.async import run_on_reactor +from synapse.types import StreamToken import logging @@ -205,6 +206,53 @@ class Notifier(object): [notify(l).addErrback(eb) for l in listeners] ) + @defer.inlineCallbacks + def wait_for_events(self, user, rooms, filter, timeout, callback): + """Wait until the callback returns a non empty response or the + timeout fires. 
+ """ + + deferred = defer.Deferred() + + from_token=StreamToken("s0","0","0") + + listener = [_NotificationListener( + user=user, + rooms=rooms, + from_token=from_token, + limit=1, + timeout=timeout, + deferred=deferred, + )] + + if timeout: + self._register_with_keys(listener[0]) + + result = yield callback() + if timeout: + timed_out = [False] + def _timeout_listener(): + timed_out[0] = True + listener[0].notify(self, [], from_token, from_token) + + self.clock.call_later(timeout/1000., _timeout_listener) + while not result and not timed_out[0]: + yield deferred + deferred = defer.Deferred() + listener[0] = _NotificationListener( + user=user, + rooms=rooms, + from_token=from_token, + limit=1, + timeout=timeout, + deferred=deferred, + ) + self._register_with_keys(listener[0]) + result = yield callback() + + defer.returnValue(result) + + def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any -- cgit 1.4.1 From e020574d65a994858ac53c45070ae5016090d2f3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 27 Jan 2015 20:19:36 +0000 Subject: Fix Formatting --- synapse/handlers/sync.py | 13 +++++-------- synapse/notifier.py | 4 ++-- synapse/rest/client/v2_alpha/sync.py | 5 ++--- synapse/storage/stream.py | 1 - 4 files changed, 9 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9f5f73eab6..82a2c6986a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -52,10 +52,10 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [ - "next_batch", # Token for the next sync - "private_user_data", # List of private events for the user. - "public_user_data", # List of public events for all users. - "rooms", # RoomSyncResult for each room. + "next_batch", # Token for the next sync + "private_user_data", # List of private events for the user. + "public_user_data", # List of public events for all users. + "rooms", # RoomSyncResult for each room. ])): __slots__ = [] @@ -181,7 +181,6 @@ class SyncHandler(BaseHandler): limited=True, )) - @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -231,7 +230,6 @@ class SyncHandler(BaseHandler): next_batch=now_token, )) - @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, @@ -316,7 +314,6 @@ class SyncHandler(BaseHandler): state = () defer.returnValue(state) - def compute_state_delta(self, since_token, previous_state, current_state): """ Works out the differnce in state between the current state and the state the client got when it last performed a sync. @@ -327,7 +324,7 @@ class SyncHandler(BaseHandler): # after the previous sync, since we need to include those state # updates even if they occured logically before the previous event. # TODO(mjark) Check for new redactions in the state events. 
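        # (Illustrative aside, not part of this patch: the delta below is
        #  keyed purely on event_id, so any state event whose id was not in
        #  the previous snapshot is included, even when it merely replaces an
        #  existing (type, state_key) pair. The same logic, standalone, with
        #  plain dicts standing in for events:
        #
        #      previous_state = [{"event_id": "$member_v1"}, {"event_id": "$topic_v1"}]
        #      current_state  = [{"event_id": "$member_v2"}, {"event_id": "$topic_v1"}]
        #      previous_ids = {e["event_id"] for e in previous_state}
        #      delta = [e for e in current_state if e["event_id"] not in previous_ids]
        #      assert delta == [{"event_id": "$member_v2"}]
        # )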
- previous_dict = {event.event_id:event for event in previous_state} + previous_dict = {event.event_id: event for event in previous_state} state_delta = [] for event in current_state: if event.event_id not in previous_dict: diff --git a/synapse/notifier.py b/synapse/notifier.py index 922bf064d0..e3b6ead620 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -214,7 +214,7 @@ class Notifier(object): deferred = defer.Deferred() - from_token=StreamToken("s0","0","0") + from_token = StreamToken("s0", "0", "0") listener = [_NotificationListener( user=user, @@ -231,6 +231,7 @@ class Notifier(object): result = yield callback() if timeout: timed_out = [False] + def _timeout_listener(): timed_out[0] = True listener[0].notify(self, [], from_token, from_token) @@ -252,7 +253,6 @@ class Notifier(object): defer.returnValue(result) - def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index cc667ebafc..0c17208cd3 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -68,7 +68,6 @@ class SyncRestServlet(RestServlet): } """ - PATTERN = client_v2_pattern("/sync$") ALLOWED_SORT = set(["timeline,asc", "timeline,desc"]) ALLOWED_PRESENCE = set(["online", "offline", "idle"]) @@ -114,12 +113,12 @@ class SyncRestServlet(RestServlet): sync_config = SyncConfig( user=user, - device="TODO", # TODO(mjark) Get the device_id from access_token + device="TODO", # TODO(mjark) Get the device_id from access_token gap=gap, limit=limit, sort=sort, backfill=backfill, - filter="TODO", # TODO(mjark) Add the filter to the config. + filter="TODO", # TODO(mjark) Add the filter to the config. ) if since is not None: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 06aca1a4e5..db1816ea84 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -289,7 +289,6 @@ class StreamStore(SQLBaseStore): " LIMIT ?" ) - def get_recent_events_for_room_txn(txn): if from_token is None: txn.execute(sql, (room_id, end_token.stream, limit,)) -- cgit 1.4.1 From e32ded7b3e290067f95497ceee0ac04842ba9cdf Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 28 Jan 2015 10:09:54 +0000 Subject: Add matrix.org as a trusted ID server because it's now passed through on ports 80/443 and the web client defaults to that now. Fixes email validation (including signing up with an email address). --- synapse/handlers/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 732652c228..66a89c10b2 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -163,7 +163,7 @@ class RegistrationHandler(BaseHandler): # each request httpCli = SimpleHttpClient(self.hs) # XXX: make this configurable! 
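        # (Illustrative aside, not part of this patch: the XXX above could be
        #  addressed by reading the list from homeserver config rather than
        #  hard-coding it, e.g.
        #
        #      trustedIdServers = self.hs.config.trusted_id_servers  # hypothetical option
        #
        #  No such config option exists at this point in the history; this
        #  commit just extends the hard-coded list below.)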
- trustedIdServers = ['matrix.org:8090'] + trustedIdServers = ['matrix.org:8090', 'matrix.org'] if not creds['idServer'] in trustedIdServers: logger.warn('%s is not a trusted ID server: rejecting 3pid ' + 'credentials', creds['idServer']) -- cgit 1.4.1 From 273b12729b99addf4474c9092f44ff300fd8153b Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 28 Jan 2015 11:55:49 +0000 Subject: Reset badge count to zero when last active time is bumped --- synapse/handlers/presence.py | 5 +++++ synapse/push/__init__.py | 19 +++++++++++++++++++ synapse/push/httppusher.py | 38 +++++++++++++++++++++++++++++++++++--- synapse/push/pusherpool.py | 17 +++++++++++++++++ 4 files changed, 76 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 8aeed99274..24d901b51f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -86,6 +86,10 @@ class PresenceHandler(BaseHandler): "changed_presencelike_data", self.changed_presencelike_data ) + # outbound signal from the presence module to advertise when a user's + # presence has changed + distributor.declare("user_presence_changed") + self.distributor = distributor self.federation = hs.get_replication_layer() @@ -603,6 +607,7 @@ class PresenceHandler(BaseHandler): room_ids=room_ids, statuscache=statuscache, ) + yield self.distributor.fire("user_presence_changed", user, statuscache) @defer.inlineCallbacks def _push_presence_remote(self, user, destination, state=None): diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index b6d01a82a0..4862d0de27 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -54,6 +54,9 @@ class Pusher(object): self.failing_since = failing_since self.alive = True + # The last value of last_active_time that we saw + self.last_last_active_time = 0 + @defer.inlineCallbacks def _actions_for_event(self, ev): """ @@ -273,6 +276,22 @@ class Pusher(object): """ pass + def reset_badge_count(self): + pass + + def presence_changed(self, state): + """ + We clear badge counts whenever a user's last_active time is bumped + This is by no means perfect but I think it's the best we can do + without read receipts. + """ + if 'last_active' in state.state: + last_active = state.state['last_active'] + if last_active > self.last_last_active_time: + logger.info("Resetting badge count for %s", self.user_name) + self.reset_badge_count() + self.last_last_active_time = last_active + def _value_for_dotted_key(dotted_key, event): parts = dotted_key.split(".") diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 22532fcc6a..d592bc2fd2 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -71,11 +71,12 @@ class HttpPusher(Pusher): # we may have to fetch this over federation and we # can't trust it anyway: is it worth it? 
#'from_display_name': 'Steve Stevington' - #'counts': { -- we don't mark messages as read yet so + 'counts': { #-- we don't mark messages as read yet so # we have no way of knowing - # 'unread': 1, + # Just set the badge to 1 until we have read receipts + 'unread': 1, # 'missed_calls': 2 - # }, + }, 'devices': [ { 'app_id': self.app_id, @@ -111,3 +112,34 @@ class HttpPusher(Pusher): if 'rejected' in resp: rejected = resp['rejected'] defer.returnValue(rejected) + + @defer.inlineCallbacks + def reset_badge_count(self): + d = { + 'notification': { + 'id': '', + 'type': None, + 'from': '', + 'counts': { + 'unread': 0, + 'missed_calls': 0 + }, + 'devices': [ + { + 'app_id': self.app_id, + 'pushkey': self.pushkey, + 'pushkey_ts': long(self.pushkey_ts / 1000), + 'data': self.data_minus_url, + } + ] + } + } + try: + resp = yield self.httpCli.post_json_get_json(self.url, d) + except: + logger.exception("Failed to push %s ", self.url) + defer.returnValue(False) + rejected = [] + if 'rejected' in resp: + rejected = resp['rejected'] + defer.returnValue(rejected) \ No newline at end of file diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 2dfecf178b..65ab4f46e1 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -18,6 +18,7 @@ from twisted.internet import defer from httppusher import HttpPusher from synapse.push import PusherConfigException +from synapse.api.constants import PresenceState import logging import json @@ -32,6 +33,22 @@ class PusherPool: self.pushers = {} self.last_pusher_started = -1 + distributor = self.hs.get_distributor() + distributor.observe( + "user_presence_changed", self.user_presence_changed + ) + + @defer.inlineCallbacks + def user_presence_changed(self, user, state): + user_name = user.to_string() + + # until we have read receipts, pushers use this to reset a user's + # badge counters to zero + for p in self.pushers.values(): + if p.user_name == user_name: + yield p.presence_changed(state) + + @defer.inlineCallbacks def start(self): pushers = yield self.store.get_all_pushers() -- cgit 1.4.1 From 0ef5bfd6a9eaaae14e199997658b3d0006abd854 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Jan 2015 16:16:53 +0000 Subject: Start implementing auth conflict res --- synapse/api/auth.py | 38 +++--- synapse/api/constants.py | 6 + synapse/federation/federation_client.py | 39 ++++++ synapse/handlers/federation.py | 211 ++++++++++++++++++++++++++------ synapse/storage/rejections.py | 10 ++ synapse/storage/schema/im.sql | 1 + 6 files changed, 253 insertions(+), 52 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index a342a0e0da..461faa8c78 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -353,9 +353,23 @@ class Auth(object): def add_auth_events(self, builder, context): yield run_on_reactor() - if builder.type == EventTypes.Create: - builder.auth_events = [] - return + auth_ids = self.compute_auth_events(builder, context) + + auth_events_entries = yield self.store.add_event_hashes( + auth_ids + ) + + builder.auth_events = auth_events_entries + + context.auth_events = { + k: v + for k, v in context.current_state.items() + if v.event_id in auth_ids + } + + def compute_auth_events(self, event, context): + if event.type == EventTypes.Create: + return [] auth_ids = [] @@ -368,7 +382,7 @@ class Auth(object): key = (EventTypes.JoinRules, "", ) join_rule_event = context.current_state.get(key) - key = (EventTypes.Member, builder.user_id, ) + key = (EventTypes.Member, 
event.user_id, ) member_event = context.current_state.get(key) key = (EventTypes.Create, "", ) @@ -382,8 +396,8 @@ class Auth(object): else: is_public = False - if builder.type == EventTypes.Member: - e_type = builder.content["membership"] + if event.type == EventTypes.Member: + e_type = event.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: if join_rule_event: auth_ids.append(join_rule_event.event_id) @@ -398,17 +412,7 @@ class Auth(object): if member_event.content["membership"] == Membership.JOIN: auth_ids.append(member_event.event_id) - auth_events_entries = yield self.store.add_event_hashes( - auth_ids - ) - - builder.auth_events = auth_events_entries - - context.auth_events = { - k: v - for k, v in context.current_state.items() - if v.event_id in auth_ids - } + return auth_ids @log_function def _can_send_event(self, event, auth_events): diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 7ee6dcc46e..0d3fc629af 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -74,3 +74,9 @@ class EventTypes(object): Message = "m.room.message" Topic = "m.room.topic" Name = "m.room.name" + + +class RejectedReason(object): + AUTH_ERROR = "auth_error" + REPLACED = "replaced" + NOT_ANCESTOR = "not_ancestor" diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 91b44cd8b3..ebcd593506 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -331,6 +331,45 @@ class FederationClient(object): defer.returnValue(pdu) + @defer.inlineCallbacks + def query_auth(self, destination, room_id, event_id, local_auth): + """ + Params: + destination (str) + event_it (str) + local_auth (list) + """ + time_now = self._clock.time_msec() + + send_content = { + "auth_chain": [e.get_pdu_json(time_now) for e in local_auth], + } + + code, content = yield self.transport_layer.send_invite( + destination=destination, + room_id=room_id, + event_id=event_id, + content=send_content, + ) + + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + missing = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content.get("missing", []) + ] + + ret = { + "auth_chain": auth_chain, + "rejects": content.get("rejects", []), + "missing": missing, + } + + defer.returnValue(ret) + def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( pdu_json diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bcdcc90a18..97e3c503b9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,19 +17,16 @@ from ._base import BaseHandler -from synapse.events.utils import prune_event from synapse.api.errors import ( - AuthError, FederationError, SynapseError, StoreError, + AuthError, FederationError, StoreError, ) -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import ( - compute_event_signature, check_event_content_hash, - add_hashes_and_signatures, + compute_event_signature, add_hashes_and_signatures, ) from synapse.types import UserID -from syutil.jsonutil import encode_canonical_json from twisted.internet import defer @@ -113,33 +110,6 @@ class FederationHandler(BaseHandler): logger.debug("Processing event: %s", event.event_id) - 
redacted_event = prune_event(event) - - redacted_pdu_json = redacted_event.get_pdu_json() - try: - yield self.keyring.verify_json_for_server( - event.origin, redacted_pdu_json - ) - except SynapseError as e: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise FederationError( - "ERROR", - e.code, - e.msg, - affected=event.event_id, - ) - - if not check_event_content_hash(event): - logger.warn( - "Event content has been tampered, redacting %s, %s", - event.event_id, encode_canonical_json(event.get_dict()) - ) - event = redacted_event - logger.debug("Event: %s", event) # FIXME (erikj): Awful hack to make the case where we are not currently @@ -180,7 +150,6 @@ class FederationHandler(BaseHandler): if state: for e in state: - logging.info("A :) %r", e) e.internal_metadata.outlier = True try: yield self._handle_new_event(e) @@ -747,7 +716,20 @@ class FederationHandler(BaseHandler): event.event_id, event.signatures, ) - self.auth.check(event, auth_events=context.auth_events) + try: + self.auth.check(event, auth_events=context.auth_events) + except AuthError: + # TODO: Store rejection. + context.rejected = RejectedReason.AUTH_ERROR + + yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, + is_new_state=False, + current_state=current_state, + ) + raise logger.debug( "_handle_new_event: Before persist_event: %s, sigs: %s", @@ -768,3 +750,162 @@ class FederationHandler(BaseHandler): ) defer.returnValue(context) + + @defer.inlineCallbacks + def do_auth(self, origin, event, context): + for e_id, _ in event.auth_events: + pass + + auth_events = set(e_id for e_id, _ in event.auth_events) + current_state = set(e.event_id for e in context.auth_events.values()) + + missing_auth = auth_events - current_state + + if missing_auth: + # Do auth conflict res. + + # 1. Get what we think is the auth chain. + auth_ids = self.auth.compute_auth_events(event, context) + local_auth_chain = yield self.store.get_auth_chain(auth_ids) + + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) + + # 3. Process any remote auth chain events we haven't seen. + for e in result.get("missing", []): + # TODO. + pass + + # 4. Look at rejects and their proofs. + # TODO. + + try: + self.auth.check(event, auth_events=context.auth_events) + except AuthError: + raise + + @defer.inlineCallbacks + def construct_auth_difference(self, local_auth, remote_auth): + """ Given a local and remote auth chain, find the differences. This + assumes that we have already processed all events in remote_auth + + Params: + local_auth (list) + remote_auth (list) + + Returns: + dict + """ + + # TODO: Make sure we are OK with local_auth or remote_auth having more + # auth events in them than strictly necessary. + + def sort_fun(ev): + return ev.depth, ev.event_id + + # We find the differences by starting at the "bottom" of each list + # and iterating up on both lists. The lists are ordered by depth and + # then event_id, we iterate up both lists until we find the event ids + # don't match. Then we look at depth/event_id to see which side is + # missing that event, and iterate only up that list. Repeat. 
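        # (Illustrative aside, not part of this patch: the walk described above
        #  is essentially a diff over two sorted lists. A standalone sketch,
        #  using (depth, event_id) tuples as stand-ins for events:
        #
        #      def chain_difference(local_chain, remote_chain):
        #          local_only, remote_only = [], []
        #          li = ri = 0
        #          while li < len(local_chain) or ri < len(remote_chain):
        #              l = local_chain[li] if li < len(local_chain) else None
        #              r = remote_chain[ri] if ri < len(remote_chain) else None
        #              if l == r:
        #                  li += 1
        #                  ri += 1
        #              elif r is None or (l is not None and l < r):
        #                  local_only.append(l)
        #                  li += 1
        #              else:
        #                  remote_only.append(r)
        #                  ri += 1
        #          return local_only, remote_only
        #
        #      chain_difference(
        #          [(1, "$create"), (2, "$power_b"), (3, "$join")],
        #          [(1, "$create"), (2, "$power_c"), (3, "$join")],
        #      )
        #      # -> ([(2, "$power_b")], [(2, "$power_c")])
        #
        #  local_only plays the role of missing_locals and remote_only the role
        #  of missing_remotes in the loop below.)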
+ + remote_list = list(remote_auth) + remote_list.sort(key=sort_fun) + + local_list = list(local_auth) + local_list.sort(key=sort_fun) + + local_iter = iter(local_list) + remote_iter = iter(remote_list) + + current_local = local_iter.next() + current_remote = remote_iter.next() + + def get_next(it, opt=None): + return it.next() if it.has_next() else opt + + missing_remotes = [] + missing_locals = [] + while current_local and current_remote: + if current_remote is None: + missing_locals.append(current_local) + current_local = get_next(local_iter) + continue + + if current_local is None: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + if current_local.event_id == current_remote.event_id: + current_local = get_next(local_iter) + current_remote = get_next(remote_iter) + continue + + if current_local.depth < current_remote.depth: + missing_locals.append(current_local) + current_local = get_next(local_iter) + continue + + if current_local.depth > current_remote.depth: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + # They have the same depth, so we fall back to the event_id order + if current_local.event_id < current_remote.event_id: + missing_locals.append(current_local) + current_local = get_next(local_iter) + + if current_local.event_id > current_remote.event_id: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + # missing locals should be sent to the server + # We should find why we are missing remotes, as they will have been + # rejected. + + # Remove events from missing_remotes if they are referencing a missing + # remote. We only care about the "root" rejected ones. + missing_remote_ids = [e.event_id for e in missing_remotes] + base_remote_rejected = list(missing_remotes) + for e in missing_remotes: + for e_id, _ in e.auth_events: + if e_id in missing_remote_ids: + base_remote_rejected.remove(e) + + reason_map = {} + + for e in base_remote_rejected: + reason = yield self.store.get_rejection_reason(e.event_id) + if reason is None: + # FIXME: ERRR?! + raise RuntimeError("") + + reason_map[e.event_id] = reason + + if reason == RejectedReason.AUTH_ERROR: + pass + elif reason == RejectedReason.REPLACED: + # TODO: Get proof + pass + elif reason == RejectedReason.NOT_ANCESTOR: + # TODO: Get proof. 
+ pass + + defer.returnValue({ + "rejects": { + e.event_id: { + "reason": reason_map[e.event_id], + "proof": None, + } + for e in base_remote_rejected + }, + "missing": missing_locals, + }) diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index 7d38b31f44..b7249700d7 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -31,3 +31,13 @@ class RejectionsStore(SQLBaseStore): "last_failure": self._clock.time_msec(), } ) + + def get_rejection_reason(self, event_id): + self._simple_select_one_onecol( + table="rejections", + retcol="reason", + keyvalues={ + "event_id": event_id, + }, + allow_none=True, + ) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index bc7c6b6ed5..5866a387f6 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -128,5 +128,6 @@ CREATE TABLE IF NOT EXISTS rejections( event_id TEXT NOT NULL, reason TEXT NOT NULL, last_check TEXT NOT NULL, + root_rejected TEXT, CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE ); -- cgit 1.4.1 From 388581e087a3658c1b70d2aa1d17a132953350ca Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 28 Jan 2015 16:58:23 +0000 Subject: Extract the id token of the token when authing users, include the token and device_id in the internal meta data for the event along with the transaction id when sending events --- synapse/api/auth.py | 8 ++-- synapse/handlers/message.py | 12 +++++- synapse/rest/client/v1/admin.py | 2 +- synapse/rest/client/v1/directory.py | 4 +- synapse/rest/client/v1/events.py | 4 +- synapse/rest/client/v1/initial_sync.py | 2 +- synapse/rest/client/v1/presence.py | 8 ++-- synapse/rest/client/v1/profile.py | 4 +- synapse/rest/client/v1/room.py | 64 +++++++++++++++++------------ synapse/rest/client/v1/voip.py | 2 +- synapse/rest/media/v0/content_repository.py | 2 +- synapse/rest/media/v1/upload_resource.py | 2 +- synapse/storage/registration.py | 3 +- synapse/types.py | 3 ++ tests/rest/client/v1/test_presence.py | 2 + tests/rest/client/v1/test_rooms.py | 7 ++++ tests/rest/client/v1/test_typing.py | 1 + tests/storage/test_registration.py | 10 ++++- 18 files changed, 92 insertions(+), 48 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 292e9e2a80..3959e06a8b 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -21,7 +21,7 @@ from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor -from synapse.types import UserID +from synapse.types import UserID, ClientID import logging @@ -292,7 +292,7 @@ class Auth(object): Returns: Tuple of UserID and device string: User ID object of the user making the request - Device ID string of the device the user is using + Client ID object of the client instance the user is using Raises: AuthError if no user by that token exists or the token is invalid. 
""" @@ -302,6 +302,7 @@ class Auth(object): user_info = yield self.get_user_by_token(access_token) user = user_info["user"] device_id = user_info["device_id"] + token_id = user_info["token_id"] ip_addr = self.hs.get_ip_from_request(request) user_agent = request.requestHeaders.getRawHeaders( @@ -317,7 +318,7 @@ class Auth(object): user_agent=user_agent ) - defer.returnValue((user, device_id)) + defer.returnValue((user, ClientID(device_id, token_id))) except KeyError: raise AuthError(403, "Missing access token.") @@ -342,6 +343,7 @@ class Auth(object): "admin": bool(ret.get("admin", False)), "device_id": ret.get("device_id"), "user": UserID.from_string(ret.get("name")), + "token_id": ret.get("token_id", None), } defer.returnValue(user_info) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9c3271fe88..6fbd2af4ab 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -114,7 +114,8 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_and_send_event(self, event_dict, ratelimit=True): + def create_and_send_event(self, event_dict, ratelimit=True, + client=None, txn_id=None): """ Given a dict from a client, create and handle a new event. Creates an FrozenEvent object, filling out auth_events, prev_events, @@ -148,6 +149,15 @@ class MessageHandler(BaseHandler): builder.content ) + if client is not None: + if client.token_id is not None: + builder.internal_metadata.token_id = client.token_id + if client.device_id is not None: + builder.internal_metadata.device_id = client.device_id + + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id + event, context = yield self._create_new_client_event( builder=builder, ) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 6cfce1a479..2ce754b028 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -31,7 +31,7 @@ class WhoisRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): target_user = UserID.from_string(user_id) - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) is_admin = yield self.auth.is_server_admin(auth_user) if not is_admin and target_user != auth_user: diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index ef853af411..8f65efec5f 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -45,7 +45,7 @@ class ClientDirectoryServer(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_alias): - user, device_id = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) content = _parse_json(request) if not "room_id" in content: @@ -85,7 +85,7 @@ class ClientDirectoryServer(ClientV1RestServlet): @defer.inlineCallbacks def on_DELETE(self, request, room_alias): - user, device_id = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) is_admin = yield self.auth.is_server_admin(user) if not is_admin: diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index e58ee46fcd..77b7c25a03 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -34,7 +34,7 @@ class EventStreamRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, 
client = yield self.auth.get_user_by_req(request) try: handler = self.handlers.event_stream_handler pagin_config = PaginationConfig.from_request(request) @@ -71,7 +71,7 @@ class EventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, event_id): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) handler = self.handlers.event_handler event = yield handler.get_event(auth_user, event_id) diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py index 78d30abbf8..4a259bba64 100644 --- a/synapse/rest/client/v1/initial_sync.py +++ b/synapse/rest/client/v1/initial_sync.py @@ -25,7 +25,7 @@ class InitialSyncRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): - user, device_id = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) with_feedback = "feedback" in request.args as_client_event = "raw" not in request.args pagination_config = PaginationConfig.from_request(request) diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 74669274a7..7feb4aadb1 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -32,7 +32,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) user = UserID.from_string(user_id) state = yield self.handlers.presence_handler.get_state( @@ -42,7 +42,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) user = UserID.from_string(user_id) state = {} @@ -77,7 +77,7 @@ class PresenceListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) user = UserID.from_string(user_id) if not self.hs.is_mine(user): @@ -97,7 +97,7 @@ class PresenceListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, user_id): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) user = UserID.from_string(user_id) if not self.hs.is_mine(user): diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index f04abb2c26..15d6f3fc6c 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -37,7 +37,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) user = UserID.from_string(user_id) try: @@ -70,7 +70,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) user = UserID.from_string(user_id) try: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c8c34b4801..410f19ccf6 100644 --- 
a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -62,7 +62,7 @@ class RoomCreateRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) room_config = self.get_room_config(request) info = yield self.make_room(room_config, auth_user, None) @@ -125,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id, event_type, state_key): - user, device_id = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) msg_handler = self.handlers.message_handler data = yield msg_handler.get_room_data( @@ -142,8 +142,8 @@ class RoomStateEventRestServlet(ClientV1RestServlet): defer.returnValue((200, data.get_dict()["content"])) @defer.inlineCallbacks - def on_PUT(self, request, room_id, event_type, state_key): - user, device_id = yield self.auth.get_user_by_req(request) + def on_PUT(self, request, room_id, event_type, state_key, txn_id=None): + user, client = yield self.auth.get_user_by_req(request) content = _parse_json(request) @@ -158,7 +158,9 @@ class RoomStateEventRestServlet(ClientV1RestServlet): event_dict["state_key"] = state_key msg_handler = self.handlers.message_handler - yield msg_handler.create_and_send_event(event_dict) + yield msg_handler.create_and_send_event( + event_dict, client=client, txn_id=txn_id, + ) defer.returnValue((200, {})) @@ -172,8 +174,8 @@ class RoomSendEventRestServlet(ClientV1RestServlet): register_txn_path(self, PATTERN, http_server, with_get=True) @defer.inlineCallbacks - def on_POST(self, request, room_id, event_type): - user, device_id = yield self.auth.get_user_by_req(request) + def on_POST(self, request, room_id, event_type, txn_id=None): + user, client = yield self.auth.get_user_by_req(request) content = _parse_json(request) msg_handler = self.handlers.message_handler @@ -183,7 +185,9 @@ class RoomSendEventRestServlet(ClientV1RestServlet): "content": content, "room_id": room_id, "sender": user.to_string(), - } + }, + client=client, + txn_id=txn_id, ) defer.returnValue((200, {"event_id": event.event_id})) @@ -200,7 +204,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): except KeyError: pass - response = yield self.on_POST(request, room_id, event_type) + response = yield self.on_POST(request, room_id, event_type, txn_id) self.txns.store_client_transaction(request, txn_id, response) defer.returnValue(response) @@ -215,8 +219,8 @@ class JoinRoomAliasServlet(ClientV1RestServlet): register_txn_path(self, PATTERN, http_server) @defer.inlineCallbacks - def on_POST(self, request, room_identifier): - user, device_id = yield self.auth.get_user_by_req(request) + def on_POST(self, request, room_identifier, txn_id=None): + user, client = yield self.auth.get_user_by_req(request) # the identifier could be a room alias or a room id. 
Try one then the # other if it fails to parse, without swallowing other valid @@ -245,7 +249,9 @@ class JoinRoomAliasServlet(ClientV1RestServlet): "room_id": identifier.to_string(), "sender": user.to_string(), "state_key": user.to_string(), - } + }, + client=client, + txn_id=txn_id, ) defer.returnValue((200, {"room_id": identifier.to_string()})) @@ -259,7 +265,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet): except KeyError: pass - response = yield self.on_POST(request, room_identifier) + response = yield self.on_POST(request, room_identifier, txn_id) self.txns.store_client_transaction(request, txn_id, response) defer.returnValue(response) @@ -283,7 +289,7 @@ class RoomMemberListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) - user, device_id = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) handler = self.handlers.room_member_handler members = yield handler.get_room_members_as_pagination_chunk( room_id=room_id, @@ -311,7 +317,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): - user, device_id = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) pagination_config = PaginationConfig.from_request( request, default_limit=10, ) @@ -335,7 +341,7 @@ class RoomStateRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): - user, device_id = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) handler = self.handlers.message_handler # Get all the current state for this room events = yield handler.get_state_events( @@ -351,7 +357,7 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): - user, device_id = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) pagination_config = PaginationConfig.from_request(request) content = yield self.handlers.message_handler.room_initial_sync( room_id=room_id, @@ -395,8 +401,8 @@ class RoomMembershipRestServlet(ClientV1RestServlet): register_txn_path(self, PATTERN, http_server) @defer.inlineCallbacks - def on_POST(self, request, room_id, membership_action): - user, device_id = yield self.auth.get_user_by_req(request) + def on_POST(self, request, room_id, membership_action, txn_id=None): + user, client = yield self.auth.get_user_by_req(request) content = _parse_json(request) @@ -418,7 +424,9 @@ class RoomMembershipRestServlet(ClientV1RestServlet): "room_id": room_id, "sender": user.to_string(), "state_key": state_key, - } + }, + client=client, + txn_id=txn_id, ) defer.returnValue((200, {})) @@ -432,7 +440,9 @@ class RoomMembershipRestServlet(ClientV1RestServlet): except KeyError: pass - response = yield self.on_POST(request, room_id, membership_action) + response = yield self.on_POST( + request, room_id, membership_action, txn_id + ) self.txns.store_client_transaction(request, txn_id, response) defer.returnValue(response) @@ -444,8 +454,8 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): register_txn_path(self, PATTERN, http_server) @defer.inlineCallbacks - def on_POST(self, request, room_id, event_id): - user, device_id = yield self.auth.get_user_by_req(request) + def on_POST(self, request, room_id, event_id, txn_id=None): + user, client = yield self.auth.get_user_by_req(request) content = 
_parse_json(request) msg_handler = self.handlers.message_handler @@ -456,7 +466,9 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): "room_id": room_id, "sender": user.to_string(), "redacts": event_id, - } + }, + client=client, + txn_id=txn_id, ) defer.returnValue((200, {"event_id": event.event_id})) @@ -470,7 +482,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): except KeyError: pass - response = yield self.on_POST(request, room_id, event_id) + response = yield self.on_POST(request, room_id, event_id, txn_id) self.txns.store_client_transaction(request, txn_id, response) defer.returnValue(response) @@ -483,7 +495,7 @@ class RoomTypingRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, user_id): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) room_id = urllib.unquote(room_id) target_user = UserID.from_string(urllib.unquote(user_id)) diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py index 42d8e30bab..11d08fbced 100644 --- a/synapse/rest/client/v1/voip.py +++ b/synapse/rest/client/v1/voip.py @@ -28,7 +28,7 @@ class VoipRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) turnUris = self.hs.config.turn_uris turnSecret = self.hs.config.turn_shared_secret diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py index 311ab89edb..22e26e3cd5 100644 --- a/synapse/rest/media/v0/content_repository.py +++ b/synapse/rest/media/v0/content_repository.py @@ -66,7 +66,7 @@ class ContentRepoResource(resource.Resource): @defer.inlineCallbacks def map_request_to_name(self, request): # auth the user - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) # namespace all file uploads on the user prefix = base64.urlsafe_b64encode( diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 6bed8a8efa..b939a30e19 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -42,7 +42,7 @@ class UploadResource(BaseMediaResource): @defer.inlineCallbacks def _async_render_POST(self, request): try: - auth_user, device_id = yield self.auth.get_user_by_req(request) + auth_user, client = yield self.auth.get_user_by_req(request) # TODO: The checks here are a bit late. The content will have # already been uploaded to a tmp file at this point content_length = request.getHeader("Content-Length") diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 75dffa4db2..029b07cc66 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -122,7 +122,8 @@ class RegistrationStore(SQLBaseStore): def _query_for_auth(self, txn, token): sql = ( - "SELECT users.name, users.admin, access_tokens.device_id" + "SELECT users.name, users.admin," + " access_tokens.device_id, access_tokens.id as token_id" " FROM users" " INNER JOIN access_tokens on users.id = access_tokens.user_id" " WHERE token = ?" 
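The room servlet changes in this commit all follow the same retry-safe PUT flow: the txn_id from the path deduplicates repeated requests and is also passed down so it can be recorded on the event. A simplified, self-contained model of that flow (TxnCache and handle_put are invented names for the example, not the servlets' real helpers):

class TxnCache(object):
    def __init__(self):
        self._responses = {}

    def fetch(self, txn_id):
        return self._responses.get(txn_id)

    def store(self, txn_id, response):
        self._responses[txn_id] = response

def handle_put(txns, txn_id, send_event):
    cached = txns.fetch(txn_id)
    if cached is not None:
        return cached                     # client retried: replay old response
    response = send_event(txn_id=txn_id)  # txn_id travels with the event
    txns.store(txn_id, response)
    return response

calls = []
txns = TxnCache()
send = lambda txn_id: calls.append(txn_id) or {"event_id": "$1"}
assert handle_put(txns, "t1", send) == handle_put(txns, "t1", send)
assert calls == ["t1"]  # the event was only sent once despite the retry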
diff --git a/synapse/types.py b/synapse/types.py index faac729ff2..46dbab5374 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -119,3 +119,6 @@ class StreamToken( d = self._asdict() d[key] = new_value return StreamToken(**d) + + +ClientID = namedtuple("ClientID", ("device_id", "token_id")) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index a4f2abf213..f849120a3e 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -75,6 +75,7 @@ class PresenceStateTestCase(unittest.TestCase): "user": UserID.from_string(myid), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token @@ -165,6 +166,7 @@ class PresenceListTestCase(unittest.TestCase): "user": UserID.from_string(myid), "admin": False, "device_id": None, + "token_id": 1, } hs.handlers.room_member_handler = Mock( diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 76ed550b75..81ead10e76 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -70,6 +70,7 @@ class RoomPermissionsTestCase(RestTestCase): "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token @@ -466,6 +467,7 @@ class RoomsMemberListTestCase(RestTestCase): "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token @@ -555,6 +557,7 @@ class RoomsCreateTestCase(RestTestCase): "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token @@ -657,6 +660,7 @@ class RoomTopicTestCase(RestTestCase): "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token @@ -773,6 +777,7 @@ class RoomMemberStateTestCase(RestTestCase): "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token @@ -909,6 +914,7 @@ class RoomMessagesTestCase(RestTestCase): "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token @@ -1013,6 +1019,7 @@ class RoomInitialSyncTestCase(RestTestCase): "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index c89b37d004..c5d5b06da3 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -73,6 +73,7 @@ class RoomTypingTestCase(RestTestCase): "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, + "token_id": 1, } hs.get_auth().get_user_by_token = _get_user_by_token diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index 84bfde7568..6f8bea2f61 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -53,7 +53,10 @@ class RegistrationStoreTestCase(unittest.TestCase): ) self.assertEquals( - {"admin": 0, "device_id": None, "name": self.user_id}, + {"admin": 0, + "device_id": None, + "name": self.user_id, + "token_id": 1}, (yield self.store.get_user_by_token(self.tokens[0])) ) @@ -63,7 
+66,10 @@ class RegistrationStoreTestCase(unittest.TestCase): yield self.store.add_access_token_to_user(self.user_id, self.tokens[1]) self.assertEquals( - {"admin": 0, "device_id": None, "name": self.user_id}, + {"admin": 0, + "device_id": None, + "name": self.user_id, + "token_id": 2}, (yield self.store.get_user_by_token(self.tokens[1])) ) -- cgit 1.4.1 From 3dbce6f4a59fde2a67e563ce338f510feda2dd1a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 03:33:51 +0000 Subject: Add typing notifications to sync --- synapse/handlers/sync.py | 30 +++++++++++++++++++++--------- synapse/rest/client/v2_alpha/sync.py | 18 ++++++++---------- 2 files changed, 29 insertions(+), 19 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 82a2c6986a..b86e783e14 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -44,11 +44,12 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "events", "state", "prev_batch", + "typing", ])): __slots__ = [] def __nonzero__(self): - return bool(self.events or self.state) + return bool(self.events or self.state or self.typing) class SyncResult(collections.namedtuple("SyncResult", [ @@ -196,15 +197,25 @@ class SyncHandler(BaseHandler): now_token = yield self.event_sources.get_current_token() - presence_stream = self.event_sources.sources["presence"] - pagination_config = PaginationConfig( - from_token=since_token, to_token=now_token + presence_source = self.event_sources.sources["presence"] + presence, presence_key = yield presence_source.get_new_events_for_user( + user=sync_config.user, + from_key=since_token.presence_key, + limit=sync_config.limit, ) - presence, _ = yield presence_stream.get_pagination_rows( + now_token = now_token.copy_and_replace("presence_key", presence_key) + + typing_source = self.event_sources.sources["typing"] + typing, typing_key = yield typing_source.get_new_events_for_user( user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None + from_key=since_token.typing_key, + limit=sync_config.limit, ) + now_token = now_token.copy_and_replace("typing_key", typing_key) + + typing_by_room = {event["room_id"]: event for event in typing} + logger.debug("Typing %r", typing_by_room) + room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), membership_list=[Membership.INVITE, Membership.JOIN] @@ -218,7 +229,7 @@ class SyncHandler(BaseHandler): for event in room_list: room_sync = yield self.incremental_sync_with_gap_for_room( event.room_id, sync_config, since_token, now_token, - published_room_ids + published_room_ids, typing_by_room ) if room_sync: rooms.append(room_sync) @@ -233,7 +244,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - published_room_ids): + published_room_ids, typing_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. 
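A toy version of the pattern used above for each event source may help: drain the new events since the last known key, advance only that component of the stream token, then bucket the typing events by room. Token and FakeSource below are stand-ins for the real StreamToken and event source classes:

import collections

Token = collections.namedtuple("Token", ["presence_key", "typing_key"])

class FakeSource(object):
    def __init__(self, events, new_key):
        self._events = events
        self._new_key = new_key

    def get_new_events_for_user(self, user, from_key, limit):
        # The real sources return (events since from_key, key to resume from).
        return self._events, self._new_key

since_token = Token(presence_key=3, typing_key=5)
typing_source = FakeSource([{"room_id": "!a:hs", "typing": ["@x:hs"]}], 7)

typing, typing_key = typing_source.get_new_events_for_user(
    user="@me:hs", from_key=since_token.typing_key, limit=20,
)
now_token = since_token._replace(typing_key=typing_key)  # ~ copy_and_replace

typing_by_room = {event["room_id"]: event for event in typing}
assert now_token.typing_key == 7
assert "!a:hs" in typing_by_room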
@@ -285,6 +296,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch_token, state=state_events_delta, limited=limited, + typing=typing_by_room.get(room_id, None) ) logging.debug("Room sync: %r", room_sync) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 4d950f9956..76489e27c8 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -135,10 +135,10 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() response_content = { - "public_user_data": self.encode_events( + "public_user_data": self.encode_user_data( sync_result.public_user_data, filter, time_now ), - "private_user_data": self.encode_events( + "private_user_data": self.encode_user_data( sync_result.private_user_data, filter, time_now ), "rooms": self.encode_rooms( @@ -149,13 +149,8 @@ class SyncRestServlet(RestServlet): defer.returnValue((200, response_content)) - def encode_events(self, events, filter, time_now): - return [self.encode_event(event, filter, time_now) for event in events] - - @staticmethod - def encode_event(event, filter, time_now): - # TODO(mjark): Respect formatting requirements in the filter. - return serialize_event(event, time_now) + def encode_user_data(self, events, filter, time_now): + return events def encode_rooms(self, rooms, filter, time_now, token_id): return [ @@ -183,7 +178,7 @@ class SyncRestServlet(RestServlet): event_format=format_event_for_client_v2_without_event_id, ) recent_event_ids.append(event.event_id) - return { + result = { "room_id": room.room_id, "event_map": event_map, "events": { @@ -194,6 +189,9 @@ class SyncRestServlet(RestServlet): "limited": room.limited, "published": room.published, } + if room.typing is not None: + result["typing"] = room.typing + return result def register_servlets(hs, http_server): -- cgit 1.4.1 From e3e72b8c5c004c403929406fc952141fd0f1b8ae Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 03:35:25 +0000 Subject: Remove typing TODO --- synapse/handlers/sync.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b86e783e14..9e1188da56 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -255,7 +255,6 @@ class SyncHandler(BaseHandler): # the previous sync and this one. # TODO(mjark): Apply the event filter in sync_config # TODO(mjark): Check for redactions we might have missed. - # TODO(mjark): Typing notifications. 
recents, token = yield self.store.get_recent_events_for_room( room_id, limit=sync_config.limit + 1, -- cgit 1.4.1 From e016f4043b81ffdedf71c4459772f66757386e44 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 14:40:28 +0000 Subject: Use get_room_events_stream to get changes to the rooms if the number of changes is small --- synapse/handlers/sync.py | 56 +++++++++++++++++++++++++++++++++++++---------- synapse/storage/stream.py | 7 ++++++ 2 files changed, 52 insertions(+), 11 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9e1188da56..e93dfe005d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -216,23 +216,57 @@ class SyncHandler(BaseHandler): typing_by_room = {event["room_id"]: event for event in typing} logger.debug("Typing %r", typing_by_room) - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=[Membership.INVITE, Membership.JOIN] - ) + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(sync_config.user) # TODO (mjark): Does public mean "published"? published_rooms = yield self.store.get_rooms(is_public=True) published_room_ids = set(r["room_id"] for r in published_rooms) + room_events, _ = yield self.store.get_room_events_stream( + sync_config.user.to_string(), + from_key=since_token.room_key, + to_key=now_token.room_key, + room_id=None, + limit=sync_config.limit + 1, + ) + rooms = [] - for event in room_list: - room_sync = yield self.incremental_sync_with_gap_for_room( - event.room_id, sync_config, since_token, now_token, - published_room_ids, typing_by_room - ) - if room_sync: - rooms.append(room_sync) + if len(room_events) <= sync_config.limit: + # There is no gap in any of the rooms. Therefore we can just + # partition the new events by room and return them. 
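The branch being introduced here rests on one observation: if a single limited stream read returned no more than the sync limit, no room can have missed anything, so the events can simply be bucketed per room (as the code that follows does); otherwise each room has to be caught up separately. In schematic form, with plain dicts standing in for events and per_room_catchup as a placeholder for the gapped path:

def sync_rooms(room_events, room_ids, limit, per_room_catchup):
    if len(room_events) <= limit:
        # No gap anywhere: bucket the freshly read events by room.
        by_room = {}
        for event in room_events:
            by_room.setdefault(event["room_id"], []).append(event)
        return dict((room_id, by_room.get(room_id, [])) for room_id in room_ids)
    # Possible gap: fall back to a separate, limited sync per room.
    return dict((room_id, per_room_catchup(room_id)) for room_id in room_ids)

events = [{"room_id": "!a:hs"}, {"room_id": "!a:hs"}, {"room_id": "!b:hs"}]
result = sync_rooms(events, ["!a:hs", "!b:hs", "!c:hs"], limit=10,
                    per_room_catchup=lambda room_id: [])
assert len(result["!a:hs"]) == 2 and result["!c:hs"] == []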
+ events_by_room_id = {} + for event in room_events: + events_by_room_id.setdefault(event.room_id, []).append(event) + + for room_id in room_ids: + recents = events_by_room_id.get(room_id, []) + state = [event for event in recents if event.is_state()] + if recents: + prev_batch = now_token.copy_and_replace( + "room_key", recents[0].internal_metadata.before + ) + else: + prev_batch = now_token + room_sync = RoomSyncResult( + room_id=room_id, + published=room_id in published_room_ids, + events=recents, + prev_batch=prev_batch, + state=state, + limited=False, + typing=typing_by_room.get(room_id, None) + ) + if room_sync is not None: + rooms.append(room_sync) + else: + for room_id in room_ids: + room_sync = yield self.incremental_sync_with_gap_for_room( + room_id, sync_config, since_token, now_token, + published_room_ids, typing_by_room + ) + if room_sync: + rooms.append(room_sync) defer.returnValue(SyncResult( public_user_data=presence, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index db1816ea84..93ccfd8c10 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -181,6 +181,13 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + for event, row in zip(ret, rows): + stream = row["stream_ordering"] + topo = event.depth + internal = event.internal_metadata + internal.before = str(_StreamToken(topo, stream - 1)) + internal.after = str(_StreamToken(topo, stream)) + if rows: key = "s%d" % max([r["stream_ordering"] for r in rows]) else: -- cgit 1.4.1 From 4d9dd9bdc02ca7b524c90ae527687e7a838acee4 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 16:23:03 +0000 Subject: Fix v2 initial sync --- synapse/handlers/sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e93dfe005d..cfe2944917 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -157,7 +157,7 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def intial_sync_for_room(self, room_id, sync_config, now_token, + def initial_sync_for_room(self, room_id, sync_config, now_token, published_room_ids): """Sync a room for a client which is starting without any state Returns: @@ -180,6 +180,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch_token, state=current_state_events, limited=True, + typing=None, )) @defer.inlineCallbacks -- cgit 1.4.1 From cc42d3f9076db1815a989bab7ef6004796b31b3c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 16:27:38 +0000 Subject: Fix check for empty room update --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cfe2944917..ec8beb4c6a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -258,7 +258,7 @@ class SyncHandler(BaseHandler): limited=False, typing=typing_by_room.get(room_id, None) ) - if room_sync is not None: + if room_sync: rooms.append(room_sync) else: for room_id in room_ids: -- cgit 1.4.1 From 722b65f46131349c5afdcc7eb48297cdd9d9cbd6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 16:41:21 +0000 Subject: Move typing notifs to an "emphermal" event list on the room object --- synapse/handlers/sync.py | 12 +++++++----- synapse/rest/client/v2_alpha/sync.py | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 
ec8beb4c6a..3860c3c95b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -44,12 +44,12 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "events", "state", "prev_batch", - "typing", + "ephemeral", ])): __slots__ = [] def __nonzero__(self): - return bool(self.events or self.state or self.typing) + return bool(self.events or self.state or self.ephemeral) class SyncResult(collections.namedtuple("SyncResult", [ @@ -180,7 +180,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch_token, state=current_state_events, limited=True, - typing=None, + ephemeral=[], )) @defer.inlineCallbacks @@ -214,7 +214,9 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("typing_key", typing_key) - typing_by_room = {event["room_id"]: event for event in typing} + typing_by_room = {event["room_id"]: [event] for event in typing} + for event in typing: + event.pop("room_id") logger.debug("Typing %r", typing_by_room) rm_handler = self.hs.get_handlers().room_member_handler @@ -256,7 +258,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch, state=state, limited=False, - typing=typing_by_room.get(room_id, None) + ephemeral=typing_by_room.get(room_id, []) ) if room_sync: rooms.append(room_sync) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 76489e27c8..2ae2eec55d 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -66,6 +66,7 @@ class SyncRestServlet(RestServlet): } "state": [] // list of EventIDs updating the current state to // be what it should be at the end of the batch. + "ephemeral": [] }] } """ @@ -188,9 +189,8 @@ class SyncRestServlet(RestServlet): "state": state_event_ids, "limited": room.limited, "published": room.published, + "ephemeral": room.ephemeral, } - if room.typing is not None: - result["typing"] = room.typing return result -- cgit 1.4.1 From 4ad45f258245344218c5ac9ffcf20052bfdb5ba2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 16:41:49 +0000 Subject: Fix indent --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3860c3c95b..8c7681a48d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -158,7 +158,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def initial_sync_for_room(self, room_id, sync_config, now_token, - published_room_ids): + published_room_ids): """Sync a room for a client which is starting without any state Returns: A Deferred RoomSyncResult. 
-- cgit 1.4.1 From 78015948a7febb18e000651f72f8f58830a55b93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Jan 2015 16:50:23 +0000 Subject: Initial implementation of auth conflict resolution --- synapse/events/utils.py | 6 +- synapse/federation/federation_client.py | 2 +- synapse/federation/federation_server.py | 33 +++++ synapse/federation/transport/client.py | 16 +++ synapse/federation/transport/server.py | 21 +++- synapse/handlers/federation.py | 207 ++++++++++++++++++++------------ synapse/storage/rejections.py | 4 +- tests/handlers/test_federation.py | 2 + 8 files changed, 210 insertions(+), 81 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index bcb5457278..10a6b9f264 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -45,12 +45,14 @@ def prune_event(event): "membership", ] + event_dict = event.get_dict() + new_content = {} def add_fields(*fields): for field in fields: if field in event.content: - new_content[field] = event.content[field] + new_content[field] = event_dict["content"][field] if event_type == EventTypes.Member: add_fields("membership") @@ -75,7 +77,7 @@ def prune_event(event): allowed_fields = { k: v - for k, v in event.get_dict().items() + for k, v in event_dict.items() if k in allowed_keys } diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ebcd593506..1173ca817b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -345,7 +345,7 @@ class FederationClient(object): "auth_chain": [e.get_pdu_json(time_now) for e in local_auth], } - code, content = yield self.transport_layer.send_invite( + code, content = yield self.transport_layer.send_query_auth( destination=destination, room_id=room_id, event_id=event_id, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fc5342afaa..8cff4e6472 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -230,6 +230,39 @@ class FederationServer(object): "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], })) + @defer.inlineCallbacks + def on_query_auth_request(self, origin, content, event_id): + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + missing = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content.get("missing", []) + ] + + ret = yield self.handler.on_query_auth( + origin, event_id, auth_chain, content.get("rejects", []), missing + ) + + time_now = self._clock.time_msec() + send_content = { + "auth_chain": [ + e.get_pdu_json(time_now) + for e in ret["auth_chain"] + ], + "rejects": content.get("rejects", []), + "missing": [ + e.get_pdu_json(time_now) + for e in ret.get("missing", []) + ], + } + + defer.returnValue( + (200, send_content) + ) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. 
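For orientation, the rough shape of the /query_auth exchange wired up in this commit is sketched below with illustrative literals; the real lists carry full PDU JSON objects, and the reason strings come from the RejectedReason constants:

query_auth_request = {
    "auth_chain": [],   # our auth chain for the event, as PDU JSON
    "missing": [],      # events we believe the remote side is missing
    "rejects": {},      # our rejections, keyed by event_id
}

query_auth_response = {
    "auth_chain": [],   # the responder's view of the auth chain
    "missing": [],      # events the caller should go and process
    "rejects": {
        "$rejected_event:example.com": {
            "reason": "auth_error",  # e.g. RejectedReason.AUTH_ERROR
            "proof": None,
        },
    },
}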
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index e634a3a213..4cb1dea2de 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -213,3 +213,19 @@ class TransportLayerClient(object): ) defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def send_query_auth(self, destination, room_id, event_id, content): + path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) + + code, content = yield self.client.post_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a380a6910b..9c9f8d525b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -42,7 +42,7 @@ class TransportLayerServer(object): content = None origin = None - if request.method == "PUT": + if request.method in ["PUT", "POST"]: # TODO: Handle other method types? other content types? try: content_bytes = request.content.read() @@ -234,6 +234,16 @@ class TransportLayerServer(object): ) ) ) + self.server.register_path( + "POST", + re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + self._on_query_auth_request( + origin, content, event_id, + ) + ) + ) @defer.inlineCallbacks @log_function @@ -325,3 +335,12 @@ class TransportLayerServer(object): ) defer.returnValue((200, content)) + + @defer.inlineCallbacks + @log_function + def _on_query_auth_request(self, origin, content, event_id): + new_content = yield self.request_handler.on_query_auth_request( + origin, content, event_id + ) + + defer.returnValue((200, new_content)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 97e3c503b9..14c26d8cea 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -126,7 +126,7 @@ class FederationHandler(BaseHandler): if not state: state, auth_chain = yield replication.get_state_for_room( - origin, context=event.room_id, event_id=event.event_id, + origin, room_id=event.room_id, event_id=event.event_id, ) if not auth_chain: @@ -139,7 +139,7 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_auth_from=origin) + yield self._handle_new_event(origin, e) except: logger.exception( "Failed to handle auth event %s", @@ -152,7 +152,7 @@ class FederationHandler(BaseHandler): for e in state: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e) + yield self._handle_new_event(origin, e) except: logger.exception( "Failed to handle state event %s", @@ -161,6 +161,7 @@ class FederationHandler(BaseHandler): try: yield self._handle_new_event( + origin, event, state=state, backfilled=backfilled, @@ -363,7 +364,14 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + target_host, e, auth_events=auth + ) except: logger.exception( "Failed to handle auth event %s", @@ -374,8 +382,13 @@ class FederationHandler(BaseHandler): # FIXME: Auth these. 
e.internal_metadata.outlier = True try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } yield self._handle_new_event( - e, fetch_auth_from=target_host + target_host, e, auth_events=auth ) except: logger.exception( @@ -384,6 +397,7 @@ class FederationHandler(BaseHandler): ) yield self._handle_new_event( + target_host, new_event, state=state, current_state=state, @@ -450,7 +464,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(event) + context = yield self._handle_new_event(origin, event) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -651,11 +665,12 @@ class FederationHandler(BaseHandler): waiters.pop().callback(None) @defer.inlineCallbacks - def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None, fetch_auth_from=None): + @log_function + def _handle_new_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): logger.debug( - "_handle_new_event: Before annotate: %s, sigs: %s", + "_handle_new_event: %s, sigs: %s", event.event_id, event.signatures, ) @@ -663,62 +678,34 @@ class FederationHandler(BaseHandler): event, old_state=state ) + if not auth_events: + auth_events = context.auth_events + logger.debug( - "_handle_new_event: Before auth fetch: %s, sigs: %s", - event.event_id, event.signatures, + "_handle_new_event: %s, auth_events: %s", + event.event_id, auth_events, ) is_new_state = not event.internal_metadata.is_outlier() - known_ids = set( - [s.event_id for s in context.auth_events.values()] - ) - - for e_id, _ in event.auth_events: - if e_id not in known_ids: - e = yield self.store.get_event(e_id, allow_none=True) - - if not e and fetch_auth_from is not None: - # Grab the auth_chain over federation if we are missing - # auth events. - auth_chain = yield self.replication_layer.get_event_auth( - fetch_auth_from, event.event_id, event.room_id - ) - for auth_event in auth_chain: - yield self._handle_new_event(auth_event) - e = yield self.store.get_event(e_id, allow_none=True) - - if not e: - # TODO: Do some conflict res to make sure that we're - # not the ones who are wrong. - logger.info( - "Rejecting %s as %s not in db or %s", - event.event_id, e_id, known_ids, - ) - # FIXME: How does raising AuthError work with federation? - raise AuthError(403, "Cannot find auth event") - - context.auth_events[(e.type, e.state_key)] = e - - logger.debug( - "_handle_new_event: Before hack: %s, sigs: %s", - event.event_id, event.signatures, - ) - + # This is a hack to fix some old rooms where the initial join event + # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: if len(event.prev_events) == 1: c = yield self.store.get_event(event.prev_events[0][0]) if c.type == EventTypes.Create: - context.auth_events[(c.type, c.state_key)] = c - - logger.debug( - "_handle_new_event: Before auth check: %s, sigs: %s", - event.event_id, event.signatures, - ) + auth_events[(c.type, c.state_key)] = c try: - self.auth.check(event, auth_events=context.auth_events) - except AuthError: + yield self.do_auth( + origin, event, context, auth_events=auth_events + ) + except AuthError as e: + logger.warn( + "Rejecting %s because %s", + event.event_id, e.msg + ) + # TODO: Store rejection. 
context.rejected = RejectedReason.AUTH_ERROR @@ -731,11 +718,6 @@ class FederationHandler(BaseHandler): ) raise - logger.debug( - "_handle_new_event: Before persist_event: %s, sigs: %s", - event.event_id, event.signatures, - ) - yield self.store.persist_event( event, context=context, @@ -744,25 +726,73 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - logger.debug( - "_handle_new_event: After persist_event: %s, sigs: %s", - event.event_id, event.signatures, + defer.returnValue(context) + + @defer.inlineCallbacks + def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, + missing): + # Just go through and process each event in `remote_auth_chain`. We + # don't want to fall into the trap of `missing` being wrong. + for e in remote_auth_chain: + try: + yield self._handle_new_event(origin, e) + except AuthError: + pass + + # Now get the current auth_chain for the event. + local_auth_chain = yield self.store.get_auth_chain([event_id]) + + # TODO: Check if we would now reject event_id. If so we need to tell + # everyone. + + ret = yield self.construct_auth_difference( + local_auth_chain, remote_auth_chain ) - defer.returnValue(context) + logger.debug("on_query_auth reutrning: %s", ret) + + defer.returnValue(ret) @defer.inlineCallbacks - def do_auth(self, origin, event, context): - for e_id, _ in event.auth_events: - pass + @log_function + def do_auth(self, origin, event, context, auth_events): + # Check if we have all the auth events. + res = yield self.store.have_events( + [e_id for e_id, _ in event.auth_events] + ) - auth_events = set(e_id for e_id, _ in event.auth_events) - current_state = set(e.event_id for e in context.auth_events.values()) + event_auth_events = set(e_id for e_id, _ in event.auth_events) + seen_events = set(res.keys()) - missing_auth = auth_events - current_state + missing_auth = event_auth_events - seen_events if missing_auth: + logger.debug("Missing auth: %s", missing_auth) + # If we don't have all the auth events, we need to get them. + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) + + for e in remote_auth_chain: + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + + current_state = set(e.event_id for e in auth_events.values()) + different_auth = event_auth_events - current_state + + if different_auth and not event.internal_metadata.is_outlier(): # Do auth conflict res. + logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. auth_ids = self.auth.compute_auth_events(event, context) @@ -778,14 +808,24 @@ class FederationHandler(BaseHandler): # 3. Process any remote auth chain events we haven't seen. for e in result.get("missing", []): - # TODO. - pass + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass # 4. Look at rejects and their proofs. # TODO. 
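The loops above repeat one idiom worth calling out: each event pulled from a remote auth chain is handed to _handle_new_event together with a {(type, state_key): event} map built only from that event's own auth_events. A stripped-down, runnable version of the idiom (Ev is a stand-in for real events):

import collections

Ev = collections.namedtuple("Ev", ["event_id", "type", "state_key", "auth_events"])

def auth_events_for(event, chain):
    auth_ids = [e_id for e_id, _ in event.auth_events]
    return {
        (e.type, e.state_key): e
        for e in chain
        if e.event_id in auth_ids
    }

create = Ev("$create", "m.room.create", "", [])
join = Ev("$join", "m.room.member", "@alice:hs", [("$create", {})])
chain = [create, join]
assert auth_events_for(join, chain) == {("m.room.create", ""): create}
assert auth_events_for(create, chain) == {}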
try: - self.auth.check(event, auth_events=context.auth_events) + self.auth.check(event, auth_events=auth_events) except AuthError: raise @@ -802,12 +842,16 @@ class FederationHandler(BaseHandler): dict """ + logger.debug("construct_auth_difference Start!") + # TODO: Make sure we are OK with local_auth or remote_auth having more # auth events in them than strictly necessary. def sort_fun(ev): return ev.depth, ev.event_id + logger.debug("construct_auth_difference after sort_fun!") + # We find the differences by starting at the "bottom" of each list # and iterating up on both lists. The lists are ordered by depth and # then event_id, we iterate up both lists until we find the event ids @@ -823,11 +867,18 @@ class FederationHandler(BaseHandler): local_iter = iter(local_list) remote_iter = iter(remote_list) - current_local = local_iter.next() - current_remote = remote_iter.next() + logger.debug("construct_auth_difference before get_next!") def get_next(it, opt=None): - return it.next() if it.has_next() else opt + try: + return it.next() + except: + return opt + + current_local = get_next(local_iter) + current_remote = get_next(remote_iter) + + logger.debug("construct_auth_difference before while") missing_remotes = [] missing_locals = [] @@ -867,6 +918,8 @@ class FederationHandler(BaseHandler): current_remote = get_next(remote_iter) continue + logger.debug("construct_auth_difference after while") + # missing locals should be sent to the server # We should find why we are missing remotes, as they will have been # rejected. @@ -886,6 +939,7 @@ class FederationHandler(BaseHandler): reason = yield self.store.get_rejection_reason(e.event_id) if reason is None: # FIXME: ERRR?! + logger.warn("Could not find reason for %s", e.event_id) raise RuntimeError("") reason_map[e.event_id] = reason @@ -899,7 +953,10 @@ class FederationHandler(BaseHandler): # TODO: Get proof. 
pass + logger.debug("construct_auth_difference returning") + defer.returnValue({ + "auth_chain": local_auth, "rejects": { e.event_id: { "reason": reason_map[e.event_id], diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index b7249700d7..4e1a9a2783 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -28,12 +28,12 @@ class RejectionsStore(SQLBaseStore): values={ "event_id": event_id, "reason": reason, - "last_failure": self._clock.time_msec(), + "last_check": self._clock.time_msec(), } ) def get_rejection_reason(self, event_id): - self._simple_select_one_onecol( + return self._simple_select_one_onecol( table="rejections", retcol="reason", keyvalues={ diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index ed21defd13..44dbce6bea 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -52,6 +52,7 @@ class FederationTestCase(unittest.TestCase): "get_room", "get_destination_retry_timings", "set_destination_retry_timings", + "have_events", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -90,6 +91,7 @@ class FederationTestCase(unittest.TestCase): self.datastore.persist_event.return_value = defer.succeed(None) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) + self.datastore.have_events.return_value = defer.succeed({}) def annotate(ev, old_state=None): context = Mock() -- cgit 1.4.1 From ece828a7b72a25502d8daa15f6429c73a620b53c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 18:15:24 +0000 Subject: Update todo for the filtering on sync --- synapse/handlers/sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8c7681a48d..5768702192 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -290,7 +290,8 @@ class SyncHandler(BaseHandler): """ # TODO(mjark): Check if they have joined the room between # the previous sync and this one. - # TODO(mjark): Apply the event filter in sync_config + # TODO(mjark): Apply the event filter in sync_config taking care to get + # enough events to reach the limit # TODO(mjark): Check for redactions we might have missed. recents, token = yield self.store.get_recent_events_for_room( room_id, -- cgit 1.4.1 From c1d860870b418b9583078022d0babe557b73760f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 10:48:47 +0000 Subject: Fix regression where we no longer correctly handled the case of gaps in our event graph --- synapse/federation/federation_server.py | 3 +++ synapse/handlers/federation.py | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8cff4e6472..845a07a3a3 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -366,10 +366,13 @@ class FederationServer(object): logger.debug("Processed pdu %s", event_id) else: logger.warn("Failed to get PDU %s", event_id) + fetch_state = True except: # TODO(erikj): Do some more intelligent retries. 
logger.exception("Failed to get PDU") fetch_state = True + else: + fetch_state = True else: fetch_state = True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14c26d8cea..de47a97e6b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -119,7 +119,7 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room and not event.internal_metadata.outlier: + if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") replication = self.replication_layer @@ -780,6 +780,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in remote_auth_chain if e.event_id in auth_ids } + e.internal_metadata.outlier = True yield self._handle_new_event( origin, e, auth_events=auth ) @@ -787,6 +788,8 @@ class FederationHandler(BaseHandler): except AuthError: pass + # FIXME: Assumes we have and stored all the state for all the + # prev_events current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state @@ -814,6 +817,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } + e.internal_metadata.outlier = True yield self._handle_new_event( origin, e, auth_events=auth ) @@ -882,7 +886,7 @@ class FederationHandler(BaseHandler): missing_remotes = [] missing_locals = [] - while current_local and current_remote: + while current_local or current_remote: if current_remote is None: missing_locals.append(current_local) current_local = get_next(local_iter) -- cgit 1.4.1 From 0c2d245fdf5f24f99138f1c60c4e6bed52cf5d5a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 11:08:52 +0000 Subject: Update the current state of an event if we update auth events. --- synapse/handlers/federation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index de47a97e6b..cc22f21cd1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -706,7 +706,6 @@ class FederationHandler(BaseHandler): event.event_id, e.msg ) - # TODO: Store rejection. context.rejected = RejectedReason.AUTH_ERROR yield self.store.persist_event( @@ -828,6 +827,9 @@ class FederationHandler(BaseHandler): # 4. Look at rejects and their proofs. # TODO. + context.current_state.update(auth_events) + context.state_group = None + try: self.auth.check(event, auth_events=auth_events) except AuthError: -- cgit 1.4.1 From 22dd1cde2d83a2448074816108b85d1957315236 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 11:32:35 +0000 Subject: Filter the recent events before applying the limit when doing an incremental sync with a gap --- synapse/api/filtering.py | 2 -- synapse/handlers/sync.py | 53 ++++++++++++++++++++++++++---------- synapse/rest/client/v2_alpha/sync.py | 2 +- synapse/storage/stream.py | 21 ++++++++++---- 4 files changed, 54 insertions(+), 24 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index b7e5d3222f..fa4de2614d 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -12,8 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from twisted.internet import defer - from synapse.api.errors import SynapseError from synapse.types import UserID, RoomID diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5768702192..0df1851b0e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -278,6 +278,40 @@ class SyncHandler(BaseHandler): next_batch=now_token, )) + @defer.inlineCallbacks + def load_filtered_recents(self, room_id, sync_config, since_token, + now_token): + limited = True + recents = [] + filtering_factor = 2 + load_limit = max(sync_config.limit * filtering_factor, 100) + max_repeat = 3 # Only try a few times per room, otherwise + room_key = now_token.room_key + + while limited and len(recents) < sync_config.limit and max_repeat: + events, room_key = yield self.store.get_recent_events_for_room( + room_id, + limit=load_limit + 1, + from_token=since_token.room_key, + end_token=room_key, + ) + loaded_recents = sync_config.filter.filter_room_events(events) + loaded_recents.extend(recents) + recents = loaded_recents + if len(events) <= load_limit: + limited = False + max_repeat -= 1 + + if len(recents) > sync_config.limit: + recents = recents[-sync_config.limit:] + room_key = recents[0].internal_metadata.before + + prev_batch_token = now_token.copy_and_replace( + "room_key", room_key + ) + + defer.returnValue((recents, prev_batch_token, limited)) + @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, @@ -288,28 +322,17 @@ class SyncHandler(BaseHandler): Returns: A Deferred RoomSyncResult """ + # TODO(mjark): Check if they have joined the room between # the previous sync and this one. - # TODO(mjark): Apply the event filter in sync_config taking care to get - # enough events to reach the limit # TODO(mjark): Check for redactions we might have missed. 
- recents, token = yield self.store.get_recent_events_for_room( - room_id, - limit=sync_config.limit + 1, - from_token=since_token.room_key, - end_token=now_token.room_key, + + recents, prev_batch_token, limited = self.load_filtered_recents( + room_id, sync_config, since_token, ) logging.debug("Recents %r", recents) - if len(recents) > sync_config.limit: - limited = True - recents = recents[1:] - else: - limited = False - - prev_batch_token = now_token.copy_and_replace("room_key", token[0]) - # TODO(mjark): This seems racy since this isn't being passed a # token to indicate what point in the stream this is current_state_events = yield self.state_handler.get_current_state( diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index c1277d2675..46ea50d118 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -116,7 +116,7 @@ class SyncRestServlet(RestServlet): user.localpart, filter_id ) except: - filter = Filter({}) + filter = Filter({}) # filter = filter.apply_overrides(http_request) #if filter.matches(event): # # stuff diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2ea5e1a021..73504c8b52 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -181,15 +181,11 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) - for event, row in zip(ret, rows): - stream = row["stream_ordering"] - topo = event.depth - internal = event.internal_metadata - internal.before = str(_StreamToken(topo, stream - 1)) - internal.after = str(_StreamToken(topo, stream)) + self._set_before_and_after(ret, rows) if rows: key = "s%d" % max([r["stream_ordering"] for r in rows]) + else: # Assume we didn't get anything because there was nothing to # get. @@ -267,6 +263,8 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(events, rows) + return events, next_token, return self.runInteraction("paginate_room_events", f) @@ -328,6 +326,8 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(events, rows) + return events, token return self.runInteraction( @@ -354,3 +354,12 @@ class StreamStore(SQLBaseStore): key = res[0]["m"] return "s%d" % (key,) + + @staticmethod + def _set_before_and_after(events, rows): + for event, row in zip(events, rows): + stream = row["stream_ordering"] + topo = event.depth + internal = event.internal_metadata + internal.before = str(_StreamToken(topo, stream - 1)) + internal.after = str(_StreamToken(topo, stream)) -- cgit 1.4.1 From e97de6d96a25912b34fa38cd3a587d798f6b676c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 11:35:20 +0000 Subject: Filter the recent events before applying the limit when doing an initial sync --- synapse/handlers/sync.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0df1851b0e..b83fcad655 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -163,12 +163,11 @@ class SyncHandler(BaseHandler): Returns: A Deferred RoomSyncResult. 
""" - recent_events, token = yield self.store.get_recent_events_for_room( - room_id, - limit=sync_config.limit, - end_token=now_token.room_key, + + recents, prev_batch_token, limited = self.load_filtered_recents( + room_id, sync_config, now_token, ) - prev_batch_token = now_token.copy_and_replace("room_key", token[0]) + current_state_events = yield self.state_handler.get_current_state( room_id ) @@ -176,10 +175,10 @@ class SyncHandler(BaseHandler): defer.returnValue(RoomSyncResult( room_id=room_id, published=room_id in published_room_ids, - events=recent_events, + events=recents, prev_batch=prev_batch_token, state=current_state_events, - limited=True, + limited=limited, ephemeral=[], )) @@ -279,8 +278,8 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def load_filtered_recents(self, room_id, sync_config, since_token, - now_token): + def load_filtered_recents(self, room_id, sync_config, now_token, + since_token=None): limited = True recents = [] filtering_factor = 2 @@ -292,7 +291,7 @@ class SyncHandler(BaseHandler): events, room_key = yield self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, - from_token=since_token.room_key, + from_token=since_token.room_key if since_token else None, end_token=room_key, ) loaded_recents = sync_config.filter.filter_room_events(events) @@ -328,7 +327,7 @@ class SyncHandler(BaseHandler): # TODO(mjark): Check for redactions we might have missed. recents, prev_batch_token, limited = self.load_filtered_recents( - room_id, sync_config, since_token, + room_id, sync_config, now_token, since_token, ) logging.debug("Recents %r", recents) -- cgit 1.4.1 From 8498d348d818aa2d2cb9bb9bb2775103840f355d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 11:42:09 +0000 Subject: Fix token formatting --- synapse/handlers/sync.py | 6 +++--- synapse/storage/stream.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b83fcad655..3c68e2a9ec 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -164,7 +164,7 @@ class SyncHandler(BaseHandler): A Deferred RoomSyncResult. """ - recents, prev_batch_token, limited = self.load_filtered_recents( + recents, prev_batch_token, limited = yield self.load_filtered_recents( room_id, sync_config, now_token, ) @@ -288,7 +288,7 @@ class SyncHandler(BaseHandler): room_key = now_token.room_key while limited and len(recents) < sync_config.limit and max_repeat: - events, room_key = yield self.store.get_recent_events_for_room( + events, (room_key,_) = yield self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, from_token=since_token.room_key if since_token else None, @@ -326,7 +326,7 @@ class SyncHandler(BaseHandler): # the previous sync and this one. # TODO(mjark): Check for redactions we might have missed. 
- recents, prev_batch_token, limited = self.load_filtered_recents( + recents, prev_batch_token, limited = yield self.load_filtered_recents( room_id, sync_config, now_token, since_token, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 73504c8b52..3ccb6f8a61 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -316,9 +316,9 @@ class StreamStore(SQLBaseStore): toke = rows[0]["stream_ordering"] - 1 start_token = str(_StreamToken(topo, toke)) - token = (start_token, end_token) + token = (start_token, str(end_token)) else: - token = (end_token, end_token) + token = (str(end_token), str(end_token)) events = self._get_events_txn( txn, -- cgit 1.4.1 From 4a67834bc84f604605c618049599f4638434c7cf Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 11:50:15 +0000 Subject: Pass client info to the sync_config --- synapse/handlers/sync.py | 5 +++-- synapse/rest/client/v2_alpha/sync.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3c68e2a9ec..1a74a4c97c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", - "device", + "client_info", "limit", "gap", "sort", @@ -288,12 +288,13 @@ class SyncHandler(BaseHandler): room_key = now_token.room_key while limited and len(recents) < sync_config.limit and max_repeat: - events, (room_key,_) = yield self.store.get_recent_events_for_room( + events, keys = yield self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, from_token=since_token.room_key if since_token else None, end_token=room_key, ) + (room_key, _) = keys loaded_recents = sync_config.filter.filter_room_events(events) loaded_recents.extend(recents) recents = loaded_recents diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 46ea50d118..81d5cf8ead 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -123,7 +123,7 @@ class SyncRestServlet(RestServlet): sync_config = SyncConfig( user=user, - device="TODO", # TODO(mjark) Get the device_id from access_token + client_info=client, gap=gap, limit=limit, sort=sort, -- cgit 1.4.1 From a70a801184814d116ed5b10a952e17c45df7bfc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 13:34:01 +0000 Subject: Fix bug where we superfluously asked for current state. Change API of /query_auth/ so that we don't duplicate events in the response. 
--- synapse/api/auth.py | 2 ++ synapse/federation/federation_client.py | 7 +---- synapse/federation/federation_server.py | 12 ++++---- synapse/handlers/federation.py | 51 ++++++++++++--------------------- synapse/state.py | 20 ++++++++++--- 5 files changed, 43 insertions(+), 49 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3471afd7e7..37e31d2b6f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -102,6 +102,8 @@ class Auth(object): def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) + logger.debug("Got curr_state %s", curr_state) + for event in curr_state: if event.type == EventTypes.Member: try: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 1173ca817b..e1539bd0e0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -357,15 +357,10 @@ class FederationClient(object): for e in content["auth_chain"] ] - missing = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) - for e in content.get("missing", []) - ] - ret = { "auth_chain": auth_chain, "rejects": content.get("rejects", []), - "missing": missing, + "missing": content.get("missing", []), } defer.returnValue(ret) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 845a07a3a3..84ed0a0ba0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -252,11 +252,8 @@ class FederationServer(object): e.get_pdu_json(time_now) for e in ret["auth_chain"] ], - "rejects": content.get("rejects", []), - "missing": [ - e.get_pdu_json(time_now) - for e in ret.get("missing", []) - ], + "rejects": ret.get("rejects", []), + "missing": ret.get("missing", []), } defer.returnValue( @@ -372,7 +369,10 @@ class FederationServer(object): logger.exception("Failed to get PDU") fetch_state = True else: - fetch_state = True + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True else: fetch_state = True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cc22f21cd1..35cad4182a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -121,38 +121,18 @@ class FederationHandler(BaseHandler): ) if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") - - replication = self.replication_layer - - if not state: - state, auth_chain = yield replication.get_state_for_room( - origin, room_id=event.room_id, event_id=event.event_id, - ) - - if not auth_chain: - auth_chain = yield replication.get_event_auth( - origin, - context=event.room_id, - event_id=event.event_id, - ) - - for e in auth_chain: - e.internal_metadata.outlier = True - try: - yield self._handle_new_event(origin, e) - except: - logger.exception( - "Failed to handle auth event %s", - e.event_id, - ) - current_state = state - if state: + if state and auth_chain is not None: for e in state: e.internal_metadata.outlier = True try: - yield self._handle_new_event(origin, e) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event(origin, e, auth_events=auth) except: logger.exception( "Failed to handle state event %s", @@ -809,18 +789,23 @@ class FederationHandler(BaseHandler): ) # 3. 
Process any remote auth chain events we haven't seen. - for e in result.get("missing", []): + for missing_id in result.get("missing", []): try: - auth_ids = [e_id for e_id, _ in e.auth_events] + for e in result["auth_chain"]: + if e.event_id == missing_id: + ev = e + break + + auth_ids = [e_id for e_id, _ in ev.auth_events] auth = { (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } - e.internal_metadata.outlier = True + ev.internal_metadata.outlier = True yield self._handle_new_event( - origin, e, auth_events=auth + origin, ev, auth_events=auth ) - auth_events[(e.type, e.state_key)] = e + auth_events[(ev.type, ev.state_key)] = ev except AuthError: pass @@ -970,5 +955,5 @@ class FederationHandler(BaseHandler): } for e in base_remote_rejected }, - "missing": missing_locals, + "missing": [e.event_id for e in missing_locals], }) diff --git a/synapse/state.py b/synapse/state.py index d9fdfb34be..e6632978b5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -166,10 +166,17 @@ class StateHandler(object): first is the name of a state group if one and only one is involved, otherwise `None`. """ + logger.debug("resolve_state_groups event_ids %s", event_ids) + state_groups = yield self.store.get_state_groups( event_ids ) + logger.debug( + "resolve_state_groups state_groups %s", + state_groups.keys() + ) + group_names = set(state_groups.keys()) if len(group_names) == 1: name, state_list = state_groups.items().pop() @@ -205,6 +212,15 @@ class StateHandler(object): if len(v.values()) > 1 } + logger.debug( + "resolve_state_groups Unconflicted state: %s", + unconflicted_state.values(), + ) + logger.debug( + "resolve_state_groups Conflicted state: %s", + conflicted_state.values(), + ) + if event_type: prev_states_events = conflicted_state.get( (event_type, state_key), [] @@ -240,10 +256,6 @@ class StateHandler(object): 1. power levels 2. memberships 3. other events. - - :param conflicted_state: - :param auth_events: - :return: """ resolved_state = {} power_key = (EventTypes.PowerLevels, "") -- cgit 1.4.1 From 8fe39a03119b3d092e84319d6d99b6ffc49a83f5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 13:33:41 +0000 Subject: Check if the user has joined the room between incremental syncs --- synapse/handlers/sync.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1a74a4c97c..9e07f30b24 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -16,7 +16,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig -from synapse.api.constants import Membership +from synapse.api.constants import Membership, EventTypes from twisted.internet import defer @@ -250,6 +250,11 @@ class SyncHandler(BaseHandler): ) else: prev_batch = now_token + + state = yield self.check_joined_room( + sync_config, room_id, state + ) + room_sync = RoomSyncResult( room_id=room_id, published=room_id in published_room_ids, @@ -323,8 +328,6 @@ class SyncHandler(BaseHandler): A Deferred RoomSyncResult """ - # TODO(mjark): Check if they have joined the room between - # the previous sync and this one. # TODO(mjark): Check for redactions we might have missed. 
recents, prev_batch_token, limited = yield self.load_filtered_recents( @@ -349,6 +352,10 @@ class SyncHandler(BaseHandler): current_state=current_state_events, ) + state_events_delta = yield self.check_joined_room( + sync_config, room_id, state_events_delta + ) + room_sync = RoomSyncResult( room_id=room_id, published=room_id in published_room_ids, @@ -356,7 +363,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch_token, state=state_events_delta, limited=limited, - typing=typing_by_room.get(room_id, None) + ephemeral=typing_by_room.get(room_id, None) ) logging.debug("Room sync: %r", room_sync) @@ -402,3 +409,19 @@ class SyncHandler(BaseHandler): if event.event_id not in previous_dict: state_delta.append(event) return state_delta + + @defer.inlineCallbacks + def check_joined_room(self, sync_config, room_id, state_delta): + joined = False + for event in state_delta: + if ( + event.type == EventTypes.Member + and event.state_key == sync_config.user.to_string() + ): + if event.content["membership"] == Membership.JOIN: + joined = True + + if joined: + state_delta = yield self.state_handler.get_current_state(room_id) + + defer.returnValue(state_delta) -- cgit 1.4.1 From 017dfaef4c0d4550acd45d27cdcea1c20766684d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 15:52:05 +0000 Subject: Add doc string for __nonzero__ overrides for sync results, raise not implemented if the client attempts to do a gapless sync --- synapse/handlers/sync.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9e07f30b24..dc69b3cfe7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -49,6 +49,9 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ __slots__ = [] def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if room needs to be part of the sync result. + """ return bool(self.events or self.state or self.ephemeral) @@ -61,6 +64,10 @@ class SyncResult(collections.namedtuple("SyncResult", [ __slots__ = [] def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if the notifier needs to wait for more events when polling for + events. 
+ """ return bool( self.private_user_data or self.public_user_data or self.rooms ) @@ -108,7 +115,7 @@ class SyncHandler(BaseHandler): return self.incremental_sync_with_gap(sync_config, since_token) else: #TODO(mjark): Handle gapless sync - pass + raise NotImplementedError() @defer.inlineCallbacks def initial_sync(self, sync_config): -- cgit 1.4.1 From b724a809c46f782231e18b70a632ce1ea9c540da Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 15:57:53 +0000 Subject: Only auth_events with event if event in event.auth_events --- synapse/handlers/federation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 35cad4182a..40836e0c51 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -805,7 +805,9 @@ class FederationHandler(BaseHandler): yield self._handle_new_event( origin, ev, auth_events=auth ) - auth_events[(ev.type, ev.state_key)] = ev + + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev except AuthError: pass -- cgit 1.4.1 From 2cd29dbdd91a8f58f988eb3c417ad6a187fc7e87 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 16:51:58 +0000 Subject: Fix bug where accepting invite over federation didn't work. Add logging. --- synapse/handlers/federation.py | 57 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 40836e0c51..623c54d584 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -343,6 +343,10 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True + + if e.event_id == event.event_id: + continue + try: auth_ids = [e_id for e_id, _ in e.auth_events] auth = { @@ -359,7 +363,9 @@ class FederationHandler(BaseHandler): ) for e in state: - # FIXME: Auth these. + if e.event_id == event.event_id: + continue + e.internal_metadata.outlier = True try: auth_ids = [e_id for e_id, _ in e.auth_events] @@ -376,11 +382,18 @@ class FederationHandler(BaseHandler): e.event_id, ) + auth_ids = [e_id for e_id, _ in event.auth_events] + auth_events = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( target_host, new_event, state=state, current_state=state, + auth_events=auth_events, ) yield self.notifier.on_new_room_event( @@ -752,7 +765,17 @@ class FederationHandler(BaseHandler): origin, event.room_id, event.event_id ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in remote_auth_chain] + ) + for e in remote_auth_chain: + if e.event_id in seen_remotes.keys(): + continue + + if e.event_id == event.event_id: + continue + try: auth_ids = [e_id for e_id, _ in e.auth_events] auth = { @@ -760,10 +783,17 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } e.internal_metadata.outlier = True + + logger.debug( + "do_auth %s missing_auth: %s", + event.event_id, e.event_id + ) yield self._handle_new_event( origin, e, auth_events=auth ) - auth_events[(e.type, e.state_key)] = e + + if e.event_id in event_auth_events: + auth_events[(e.type, e.state_key)] = e except AuthError: pass @@ -788,20 +818,31 @@ class FederationHandler(BaseHandler): local_auth_chain, ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in result["auth_chain"]] + ) + # 3. 
Process any remote auth chain events we haven't seen. - for missing_id in result.get("missing", []): - try: - for e in result["auth_chain"]: - if e.event_id == missing_id: - ev = e - break + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes.keys(): + continue + if ev.event_id == event.event_id: + continue + + try: auth_ids = [e_id for e_id, _ in ev.auth_events] auth = { (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } ev.internal_metadata.outlier = True + + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) + yield self._handle_new_event( origin, ev, auth_events=auth ) -- cgit 1.4.1 From 0b1cc7cc0bd81f93ed353a6ac67e06f91b62ab90 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 16:24:40 +0000 Subject: Return empty list rather than None when there are no emphemeral events for a room --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index dc69b3cfe7..962686f4bb 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -370,7 +370,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch_token, state=state_events_delta, limited=limited, - ephemeral=typing_by_room.get(room_id, None) + ephemeral=typing_by_room.get(room_id, []) ) logging.debug("Room sync: %r", room_sync) -- cgit 1.4.1 From 4c0da49d7ca58774adad870da98a7e168a5be18c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 22:53:13 +0000 Subject: Resign events when we return them via /query_auth/ --- synapse/handlers/federation.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 623c54d584..8bf5a4cc11 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -741,6 +741,15 @@ class FederationHandler(BaseHandler): local_auth_chain, remote_auth_chain ) + for event in ret["auth_chain"]: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + logger.debug("on_query_auth reutrning: %s", ret) defer.returnValue(ret) -- cgit 1.4.1
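The loop added in the last commit above signs each auth-chain event with this server's key before it is returned from /query_auth, presumably so the requesting server can verify events that originated locally. A minimal, self-contained sketch of that step follows; it assumes compute_event_signature is importable from synapse.crypto.event_signing (the diff itself does not show the import) and takes plain hostname/signing_key arguments in place of the homeserver config.

from synapse.crypto.event_signing import compute_event_signature

def resign_events(events, hostname, signing_key):
    # Add this server's signature to each event before sending it over
    # federation, mirroring the loop added to on_query_auth above.
    for event in events:
        event.signatures.update(
            compute_event_signature(event, hostname, signing_key)
        )
    return events

In the handler itself the equivalent call would be resign_events(ret["auth_chain"], self.hs.hostname, self.hs.config.signing_key[0]), matching the arguments used in the diff.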
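Several of the sync commits in this series revolve around load_filtered_recents: per the updated TODO, the handler has to apply the event filter while still fetching enough events to reach the requested limit, so it over-fetches by a filtering factor, filters, and repeats a bounded number of times. The sketch below models that loop in isolation and is not the Synapse code itself: fetch_batch and event_filter are invented stand-ins for store.get_recent_events_for_room and the sync filter, and the room-key/token bookkeeping is omitted.

def load_filtered_recents(fetch_batch, event_filter, limit,
                          filtering_factor=2, max_repeat=3):
    # Collect up to `limit` events that pass the filter, walking backwards
    # through history in over-sized batches. Returns (events, limited) where
    # `limited` is True if older matching events may still exist.
    limited = True
    recents = []
    load_limit = max(limit * filtering_factor, 100)

    while limited and len(recents) < limit and max_repeat:
        # fetch_batch(n) stands in for the store call: it returns up to n of
        # the next-oldest events for the room.
        events = fetch_batch(load_limit + 1)
        # Older events go in front of what we have already kept.
        recents = event_filter(events) + recents
        if len(events) <= load_limit:
            # The store ran out of history, so nothing older is left.
            limited = False
        max_repeat -= 1

    if len(recents) > limit:
        # Keep only the newest `limit` events.
        recents = recents[-limit:]
    return recents, limited

The trade-off is deliberate: an aggressive filter may cost a few extra storage queries, but the client still receives close to the requested number of events along with a usable prev_batch token.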
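Finally, the docstrings added for __nonzero__ earlier in the series capture a small but load-bearing pattern: a sync result counts as empty if it carries no updates, and the notifier uses the result's truthiness to decide whether to keep long-polling. A toy version of the pattern, under the assumption that ExampleResult is an invented stand-in rather than the real SyncResult, looks like this:

import collections

class ExampleResult(collections.namedtuple("ExampleResult",
                                           ["events", "state", "ephemeral"])):
    __slots__ = []

    def __nonzero__(self):
        # Truthy only if the result actually contains updates, so callers
        # can simply write "if result:" when deciding whether to wait longer.
        return bool(self.events or self.state or self.ephemeral)

    __bool__ = __nonzero__  # same behaviour if run under Python 3

assert not ExampleResult(events=[], state=[], ephemeral=[])
assert ExampleResult(events=["an event"], state=[], ephemeral=[])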