From e7bc1291a079224315cea5c756061ad711241be1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Oct 2014 16:06:59 +0100 Subject: Begin making auth use event.old_state_events --- synapse/handlers/_base.py | 10 +++++++++- synapse/handlers/directory.py | 5 +++-- synapse/handlers/federation.py | 11 +++++++++-- synapse/handlers/message.py | 13 +++---------- synapse/handlers/profile.py | 5 +++-- synapse/handlers/room.py | 19 +++++++++---------- 6 files changed, 36 insertions(+), 27 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index de4d23bbb3..cd6c35f194 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -44,9 +44,17 @@ class BaseHandler(object): @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], - extra_users=[]): + extra_users=[], suppress_auth=False): snapshot.fill_out_prev_events(event) + yield self.state_handler.annotate_state_groups(event) + + if not suppress_auth: + yield self.auth.check(event, snapshot, raises=True) + + if hasattr(event, "state_key"): + yield self.state_handler.handle_new_event(event, snapshot) + yield self.store.persist_event(event) destinations = set(extra_destinations) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index a56830d520..6e897e915d 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -152,5 +152,6 @@ class DirectoryHandler(BaseHandler): user_id=user_id, ) - yield self.state_handler.handle_new_event(event, snapshot) - yield self._on_new_room_event(event, snapshot, extra_users=[user_id]) + yield self._on_new_room_event( + event, snapshot, extra_users=[user_id], suppress_auth=True + ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f52591d2a3..44bf7def2e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -95,6 +95,8 @@ class FederationHandler(BaseHandler): logger.debug("Got event: %s", event.event_id) + yield self.state_handler.annotate_state_groups(event) + with (yield self.lock_manager.lock(pdu.context)): if event.is_state and not backfilled: is_new_state = yield self.state_handler.handle_new_state( @@ -195,7 +197,12 @@ class FederationHandler(BaseHandler): for pdu in pdus: event = self.pdu_codec.event_from_pdu(pdu) + + # FIXME (erikj): Not sure this actually works :/ + yield self.state_handler.annotate_state_groups(event) + events.append(event) + yield self.store.persist_event(event, backfilled=True) defer.returnValue(events) @@ -235,6 +242,7 @@ class FederationHandler(BaseHandler): new_event.destinations = [target_host] snapshot.fill_out_prev_events(new_event) + yield self.state_handler.annotate_state_groups(new_event) yield self.handle_new_event(new_event, snapshot) # TODO (erikj): Time out here. @@ -254,12 +262,11 @@ class FederationHandler(BaseHandler): is_public=False ) except: + # FIXME pass - defer.returnValue(True) - @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 317ef2c80c..1c2cbce151 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -87,10 +87,9 @@ class MessageHandler(BaseHandler): snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) - if not suppress_auth: - yield self.auth.check(event, snapshot, raises=True) - - yield self._on_new_room_event(event, snapshot) + yield self._on_new_room_event( + event, snapshot, suppress_auth=suppress_auth + ) self.hs.get_handlers().presence_handler.bump_presence_active_time( user @@ -149,13 +148,9 @@ class MessageHandler(BaseHandler): state_key=event.state_key, ) - yield self.auth.check(event, snapshot, raises=True) - if stamp_event: event.content["hsob_ts"] = int(self.clock.time_msec()) - yield self.state_handler.handle_new_event(event, snapshot) - yield self._on_new_room_event(event, snapshot) @defer.inlineCallbacks @@ -227,8 +222,6 @@ class MessageHandler(BaseHandler): snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) - yield self.auth.check(event, snapshot, raises=True) - # store message in db yield self._on_new_room_event(event, snapshot) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index dab9b03f04..4cd0a06093 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -218,5 +218,6 @@ class ProfileHandler(BaseHandler): user_id=j.state_key, ) - yield self.state_handler.handle_new_event(new_event, snapshot) - yield self._on_new_room_event(new_event, snapshot) + yield self._on_new_room_event( + new_event, snapshot, suppress_auth=True + ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index c0f9a7c807..cb5bd17d2b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -129,8 +129,9 @@ class RoomCreationHandler(BaseHandler): logger.debug("Event: %s", event) - yield self.state_handler.handle_new_event(event, snapshot) - yield self._on_new_room_event(event, snapshot, extra_users=[user]) + yield self._on_new_room_event( + event, snapshot, extra_users=[user], suppress_auth=True + ) for event in creation_events: yield handle_event(event) @@ -396,8 +397,6 @@ class RoomMemberHandler(BaseHandler): yield self._do_join(event, snapshot, do_auth=do_auth) else: # This is not a JOIN, so we can handle it normally. - if do_auth: - yield self.auth.check(event, snapshot, raises=True) # If we're banning someone, set a req power level if event.membership == Membership.BAN: @@ -419,6 +418,7 @@ class RoomMemberHandler(BaseHandler): event, membership=event.content["membership"], snapshot=snapshot, + do_auth=do_auth, ) defer.returnValue({"room_id": room_id}) @@ -507,14 +507,11 @@ class RoomMemberHandler(BaseHandler): if not have_joined: logger.debug("Doing normal join") - if do_auth: - yield self.auth.check(event, snapshot, raises=True) - - yield self.state_handler.handle_new_event(event, snapshot) yield self._do_local_membership_update( event, membership=event.content["membership"], snapshot=snapshot, + do_auth=do_auth, ) user = self.hs.parse_userid(event.user_id) @@ -558,7 +555,8 @@ class RoomMemberHandler(BaseHandler): defer.returnValue([r.room_id for r in rooms]) - def _do_local_membership_update(self, event, membership, snapshot): + def _do_local_membership_update(self, event, membership, snapshot, + do_auth): destinations = [] # If we're inviting someone, then we should also send it to that @@ -575,9 +573,10 @@ class RoomMemberHandler(BaseHandler): return self._on_new_room_event( event, snapshot, extra_destinations=destinations, - extra_users=[target_user] + extra_users=[target_user], suppress_auth=(not do_auth), ) + class RoomListHandler(BaseHandler): @defer.inlineCallbacks -- cgit 1.5.1 From 1116f5330ec80533954026f67018e0db190cbae0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Oct 2014 16:56:51 +0100 Subject: Start implementing the invite/join dance. Continue moving auth to use event.state_events --- synapse/api/auth.py | 16 +++----- synapse/federation/replication.py | 22 +++++++++-- synapse/federation/transport.py | 34 +++++++++++++++- synapse/handlers/federation.py | 83 +++++++++++++++++++++++++++++++++++---- 4 files changed, 133 insertions(+), 22 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d951cb265b..12ddef1b00 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -21,7 +21,7 @@ from synapse.api.constants import Membership, JoinRules from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, - RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, + RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, InviteJoinEvent, ) from synapse.util.logutils import log_function @@ -56,7 +56,8 @@ class Auth(object): defer.returnValue(allowed) return - self.check_event_sender_in_room(event) + if not event.type == InviteJoinEvent.TYPE: + self.check_event_sender_in_room(event) if is_state: # TODO (erikj): This really only should be called for *new* @@ -115,11 +116,6 @@ class Auth(object): def is_membership_change_allowed(self, event): target_user_id = event.state_key - # does this room even exist - room = yield self.store.get_room(event.room_id) - if not room: - raise AuthError(403, "Room does not exist") - # get info about the caller key = (RoomMemberEvent.TYPE, event.user_id, ) caller = event.old_state_events.get(key) @@ -170,7 +166,7 @@ class Auth(object): # joined: It's a NOOP if event.user_id != target_user_id: raise AuthError(403, "Cannot force another user to join.") - elif join_rule == JoinRules.PUBLIC or room.is_public: + elif join_rule == JoinRules.PUBLIC: pass elif join_rule == JoinRules.INVITE: if ( @@ -215,9 +211,9 @@ class Auth(object): power_level_event = event.old_state_events.get(key) level = None if power_level_event: - level = power_level_event.content[user_id] + level = power_level_event.content.get(user_id) if not level: - level = power_level_event.content["default"] + level = power_level_event.content.get("default", 0) return level diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 2346d55045..08c29dece5 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -393,9 +393,25 @@ class ReplicationLayer(object): response = yield self.query_handlers[query_type](args) defer.returnValue((200, response)) else: - defer.returnValue((404, "No handler for Query type '%s'" - % (query_type) - )) + defer.returnValue( + (404, "No handler for Query type '%s'" % (query_type, )) + ) + + def on_make_join_request(self, context, user_id): + return self.handler.on_make_join_request(context, user_id) + + @defer.inlineCallbacks + def on_send_join_request(self, origin, content): + pdu = Pdu(**content) + state = yield self.handler.on_send_join_request(origin, pdu) + defer.returnValue((200, self._transaction_from_pdus(state).get_dict())) + + def make_join(self, destination, context, user_id): + return self.transport_layer.make_join( + destination=destination, + context=context, + user_id=user_id, + ) @defer.inlineCallbacks @log_function diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 755eee8cf6..4f552272e6 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -197,6 +197,19 @@ class TransportLayer(object): defer.returnValue(response) + @defer.inlineCallbacks + @log_function + def make_join(self, destination, context, user_id, retry_on_dns_fail=True): + path = PREFIX + "/make_join/%s/%s" % (context, user_id,) + + response = yield self.client.get_json( + destination=destination, + path=path, + retry_on_dns_fail=retry_on_dns_fail, + ) + + defer.returnValue(response) + @defer.inlineCallbacks def _authenticate_request(self, request): json_request = { @@ -353,6 +366,12 @@ class TransportLayer(object): ) ) + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"), + self._on_make_join_request + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, origin, content, query, transaction_id): @@ -438,7 +457,20 @@ class TransportLayer(object): versions = [v.split(",", 1) for v in v_list] return self.request_handler.on_backfill_request( - context, versions, limit) + context, versions, limit + ) + + @log_function + def _on_make_join_request(self, origin, content, query, context, user_id): + return self.request_handler.on_make_join_request( + context, user_id, + ) + + @log_function + def _on_send_join_request(self, origin, content, query): + return self.request_handler.on_send_join_request( + origin, content, + ) class TransportReceivedHandler(object): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 44bf7def2e..a4f6c739c3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -89,7 +89,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_receive_pdu(self, pdu, backfilled): """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it throught the StateHandler. + do auth checks and put it through the StateHandler. """ event = self.pdu_codec.event_from_pdu(pdu) @@ -97,13 +97,17 @@ class FederationHandler(BaseHandler): yield self.state_handler.annotate_state_groups(event) - with (yield self.lock_manager.lock(pdu.context)): - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - else: - is_new_state = False + logger.debug("Event: %s", event) + + if not backfilled: + yield self.auth.check(event, None, raises=True) + + if event.is_state and not backfilled: + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + else: + is_new_state = False # TODO: Implement something in federation that allows us to # respond to PDU. @@ -267,6 +271,69 @@ class FederationHandler(BaseHandler): defer.returnValue(True) + @defer.inlineCallbacks + def on_make_join_request(self, context, user_id): + event = self.event_factory.create_event( + etype=RoomMemberEvent.TYPE, + content={"membership": Membership.JOIN}, + room_id=context, + user_id=user_id, + state_key=user_id, + ) + + snapshot = yield self.store.snapshot_room( + event.room_id, event.user_id, + ) + snapshot.fill_out_prev_events(event) + + pdu = self.pdu_codec.pdu_from_event(event) + + defer.returnValue(pdu) + + @defer.inlineCallbacks + def on_send_join_request(self, origin, pdu): + event = self.pdu_codec.event_from_pdu(pdu) + + yield self.state_handler.annotate_state_groups(event) + yield self.auth.check(event, None, raises=True) + + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + + # FIXME (erikj): All this is duplicated above :( + + yield self.store.persist_event( + event, + backfilled=False, + is_new_state=is_new_state + ) + + extra_users = [] + if event.type == RoomMemberEvent.TYPE: + target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) + extra_users.append(target_user) + + yield self.notifier.on_new_room_event( + event, extra_users=extra_users + ) + + if event.type == RoomMemberEvent.TYPE: + if event.membership == Membership.JOIN: + user = self.hs.parse_userid(event.state_key) + self.distributor.fire( + "user_joined_room", user=user, room_id=event.room_id + ) + + pdu.destinations = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + + yield self.replication_layer.send_pdu(pdu) + + defer.returnValue(event.state_events.values()) + @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) -- cgit 1.5.1 From f71627567b4aa58c5aba7e79c6d972b8ac26b449 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Oct 2014 15:04:17 +0100 Subject: Finish implementing the new join dance. --- synapse/api/auth.py | 9 ++ synapse/api/events/factory.py | 14 ++- synapse/federation/replication.py | 66 ++++++++++---- synapse/federation/transport.py | 68 ++++++++++++-- synapse/handlers/federation.py | 181 ++++++++++++++++++-------------------- synapse/state.py | 10 ++- 6 files changed, 222 insertions(+), 126 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 12ddef1b00..d1eca791ab 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -48,6 +48,15 @@ class Auth(object): """ try: if hasattr(event, "room_id"): + if not event.old_state_events: + # Oh, we don't know what the state of the room was, so we + # are trusting that this is allowed (at least for now) + defer.returnValue(True) + + if hasattr(event, "outlier") and event.outlier: + # TODO (erikj): Auth for outliers is done differently. + defer.returnValue(True) + is_state = hasattr(event, "state_key") if event.type == RoomMemberEvent.TYPE: diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 0d94850cec..c6d1151cac 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -51,12 +51,20 @@ class EventFactory(object): self.clock = hs.get_clock() self.hs = hs + self.event_id_count = 0 + + def create_event_id(self): + i = str(self.event_id_count) + self.event_id_count += 1 + + local_part = str(int(self.clock.time())) + i + random_string(5) + + return "%s@%s" % (local_part, self.hs.hostname) + def create_event(self, etype=None, **kwargs): kwargs["type"] = etype if "event_id" not in kwargs: - kwargs["event_id"] = "%s@%s" % ( - random_string(10), self.hs.hostname - ) + kwargs["event_id"] = self.create_event_id() if "ts" not in kwargs: kwargs["ts"] = int(self.clock.time_msec()) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 08c29dece5..d482193851 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -244,13 +244,14 @@ class ReplicationLayer(object): pdu = None if pdu_list: pdu = pdu_list[0] - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(destination, pdu) defer.returnValue(pdu) @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context): + def get_state_for_context(self, destination, context, pdu_id=None, + pdu_origin=None): """Requests all of the `current` state PDUs for a given context from a remote home server. @@ -263,13 +264,14 @@ class ReplicationLayer(object): """ transaction_data = yield self.transport_layer.get_context_state( - destination, context) + destination, context, pdu_id=pdu_id, pdu_origin=pdu_origin, + ) transaction = Transaction(**transaction_data) pdus = [Pdu(outlier=True, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(destination, pdu) defer.returnValue(pdus) @@ -315,7 +317,7 @@ class ReplicationLayer(object): dl = [] for pdu in pdu_list: - dl.append(self._handle_new_pdu(pdu)) + dl.append(self._handle_new_pdu(transaction.origin, pdu)) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -347,14 +349,19 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, context): - results = yield self.store.get_current_state_for_context( - context - ) + def on_context_state_request(self, context, pdu_id, pdu_origin): + if pdu_id and pdu_origin: + pdus = yield self.handler.get_state_for_pdu( + pdu_id, pdu_origin + ) + else: + results = yield self.store.get_current_state_for_context( + context + ) + pdus = [Pdu.from_pdu_tuple(p) for p in results] - logger.debug("Context returning %d results", len(results)) + logger.debug("Context returning %d results", len(pdus)) - pdus = [Pdu.from_pdu_tuple(p) for p in results] defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @defer.inlineCallbacks @@ -396,9 +403,10 @@ class ReplicationLayer(object): defer.returnValue( (404, "No handler for Query type '%s'" % (query_type, )) ) - + @defer.inlineCallbacks def on_make_join_request(self, context, user_id): - return self.handler.on_make_join_request(context, user_id) + pdu = yield self.handler.on_make_join_request(context, user_id) + defer.returnValue(pdu.get_dict()) @defer.inlineCallbacks def on_send_join_request(self, origin, content): @@ -406,13 +414,27 @@ class ReplicationLayer(object): state = yield self.handler.on_send_join_request(origin, pdu) defer.returnValue((200, self._transaction_from_pdus(state).get_dict())) + @defer.inlineCallbacks def make_join(self, destination, context, user_id): - return self.transport_layer.make_join( + pdu_dict = yield self.transport_layer.make_join( destination=destination, context=context, user_id=user_id, ) + logger.debug("Got response to make_join: %s", pdu_dict) + + defer.returnValue(Pdu(**pdu_dict)) + + def send_join(self, destination, pdu): + return self.transport_layer.send_join( + destination, + pdu.context, + pdu.pdu_id, + pdu.origin, + pdu.get_dict(), + ) + @defer.inlineCallbacks @log_function def _get_persisted_pdu(self, pdu_id, pdu_origin): @@ -443,7 +465,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, pdu, backfilled=False): + def _handle_new_pdu(self, origin, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) @@ -452,6 +474,8 @@ class ReplicationLayer(object): defer.returnValue({}) return + state = None + # Get missing pdus if necessary. is_new = yield self.pdu_actions.is_new(pdu) if is_new and not pdu.outlier: @@ -475,12 +499,22 @@ class ReplicationLayer(object): except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") + else: + # We need to get the state at this event, since we have reached + # a backward extremity edge. + state = yield self.get_state_for_context( + origin, pdu.context, pdu.pdu_id, pdu.origin, + ) # Persist the Pdu, but don't mark it as processed yet. yield self.store.persist_event(pdu=pdu) if not backfilled: - ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) + ret = yield self.handler.on_receive_pdu( + pdu, + backfilled=backfilled, + state=state, + ) else: ret = None diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 4f552272e6..a0d34fd24d 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -72,7 +72,8 @@ class TransportLayer(object): self.received_handler = None @log_function - def get_context_state(self, destination, context): + def get_context_state(self, destination, context, pdu_id=None, + pdu_origin=None): """ Requests all state for a given context (i.e. room) from the given server. @@ -89,7 +90,14 @@ class TransportLayer(object): subpath = "/state/%s/" % context - return self._do_request_for_transaction(destination, subpath) + args = {} + if pdu_id and pdu_origin: + args["pdu_id"] = pdu_id + args["pdu_origin"] = pdu_origin + + return self._do_request_for_transaction( + destination, subpath, args=args + ) @log_function def get_pdu(self, destination, pdu_origin, pdu_id): @@ -135,8 +143,10 @@ class TransportLayer(object): subpath = "/backfill/%s/" % context - args = {"v": ["%s,%s" % (i, o) for i, o in pdu_tuples]} - args["limit"] = limit + args = { + "v": ["%s,%s" % (i, o) for i, o in pdu_tuples], + "limit": limit, + } return self._do_request_for_transaction( dest, @@ -210,6 +220,23 @@ class TransportLayer(object): defer.returnValue(response) + @defer.inlineCallbacks + @log_function + def send_join(self, destination, context, pdu_id, origin, content): + path = PREFIX + "/send_join/%s/%s/%s" % ( + context, + origin, + pdu_id, + ) + + response = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + defer.returnValue(response) + @defer.inlineCallbacks def _authenticate_request(self, request): json_request = { @@ -330,7 +357,11 @@ class TransportLayer(object): re.compile("^" + PREFIX + "/state/([^/]*)/$"), self._with_authentication( lambda origin, content, query, context: - handler.on_context_state_request(context) + handler.on_context_state_request( + context, + query.get("pdu_id", [None])[0], + query.get("pdu_origin", [None])[0] + ) ) ) @@ -369,7 +400,23 @@ class TransportLayer(object): self.server.register_path( "GET", re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"), - self._on_make_join_request + self._with_authentication( + lambda origin, content, query, context, user_id: + self._on_make_join_request( + origin, content, query, context, user_id + ) + ) + ) + + self.server.register_path( + "PUT", + re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, pdu_origin, pdu_id: + self._on_send_join_request( + origin, content, query, + ) + ) ) @defer.inlineCallbacks @@ -460,18 +507,23 @@ class TransportLayer(object): context, versions, limit ) + @defer.inlineCallbacks @log_function def _on_make_join_request(self, origin, content, query, context, user_id): - return self.request_handler.on_make_join_request( + content = yield self.request_handler.on_make_join_request( context, user_id, ) + defer.returnValue((200, content)) + @defer.inlineCallbacks @log_function def _on_send_join_request(self, origin, content, query): - return self.request_handler.on_send_join_request( + content = yield self.request_handler.on_send_join_request( origin, content, ) + defer.returnValue((200, content)) + class TransportReceivedHandler(object): """ Callbacks used when we receive a transaction diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a4f6c739c3..0ae0541bd3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,7 +20,7 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function -from synapse.federation.pdu_codec import PduCodec +from synapse.federation.pdu_codec import PduCodec, encode_event_id from synapse.api.errors import SynapseError from twisted.internet import defer, reactor @@ -87,7 +87,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled): + def on_receive_pdu(self, pdu, backfilled, state=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -95,7 +95,10 @@ class FederationHandler(BaseHandler): logger.debug("Got event: %s", event.event_id) - yield self.state_handler.annotate_state_groups(event) + if state: + state = [self.pdu_codec.event_from_pdu(p) for p in state] + state = {(e.type, e.state_key): e for e in state} + yield self.state_handler.annotate_state_groups(event, state=state) logger.debug("Event: %s", event) @@ -108,83 +111,55 @@ class FederationHandler(BaseHandler): ) else: is_new_state = False + # TODO: Implement something in federation that allows us to # respond to PDU. - target_is_mine = False - if hasattr(event, "target_host"): - target_is_mine = event.target_host == self.hs.hostname - - if event.type == InviteJoinEvent.TYPE: - if not target_is_mine: - logger.debug("Ignoring invite/join event %s", event) - return - - # If we receive an invite/join event then we need to join the - # sender to the given room. - # TODO: We should probably auth this or some such - content = event.content - content.update({"membership": Membership.JOIN}) - new_event = self.event_factory.create_event( - etype=RoomMemberEvent.TYPE, - state_key=event.user_id, - room_id=event.room_id, - user_id=event.user_id, - membership=Membership.JOIN, - content=content + with (yield self.room_lock.lock(event.room_id)): + yield self.store.persist_event( + event, + backfilled, + is_new_state=is_new_state ) - yield self.hs.get_handlers().room_member_handler.change_membership( - new_event, - do_auth=False, - ) + room = yield self.store.get_room(event.room_id) - else: - with (yield self.room_lock.lock(event.room_id)): - yield self.store.persist_event( - event, - backfilled, - is_new_state=is_new_state + if not room: + # Huh, let's try and get the current state + try: + yield self.replication_layer.get_state_for_context( + event.origin, event.room_id, pdu.pdu_id, pdu.origin, ) - room = yield self.store.get_room(event.room_id) - - if not room: - # Huh, let's try and get the current state - try: - yield self.replication_layer.get_state_for_context( - event.origin, event.room_id - ) - - hosts = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - if self.hs.hostname in hosts: - try: - yield self.store.store_room( - room_id=event.room_id, - room_creator_user_id="", - is_public=False, - ) - except: - pass - except: - logger.exception( - "Failed to get current state for room %s", - event.room_id - ) - - if not backfilled: - extra_users = [] - if event.type == RoomMemberEvent.TYPE: - target_user_id = event.state_key - target_user = self.hs.parse_userid(target_user_id) - extra_users.append(target_user) - - yield self.notifier.on_new_room_event( - event, extra_users=extra_users + hosts = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + if self.hs.hostname in hosts: + try: + yield self.store.store_room( + room_id=event.room_id, + room_creator_user_id="", + is_public=False, + ) + except: + pass + except: + logger.exception( + "Failed to get current state for room %s", + event.room_id ) + if not backfilled: + extra_users = [] + if event.type == RoomMemberEvent.TYPE: + target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) + extra_users.append(target_user) + + yield self.notifier.on_new_room_event( + event, extra_users=extra_users + ) + if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: user = self.hs.parse_userid(event.state_key) @@ -214,40 +189,35 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content, snapshot): - hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: # We are already in the room. logger.debug("We're already in the room apparently") defer.returnValue(False) - # First get current state to see if we are already joined. - try: - yield self.replication_layer.get_state_for_context( - target_host, room_id - ) - - hosts = yield self.store.get_joined_hosts_for_room(room_id) - if self.hs.hostname in hosts: - # Oh, we were actually in the room already. - logger.debug("We're already in the room apparently") - defer.returnValue(False) - except Exception: - logger.exception("Failed to get current state") - - new_event = self.event_factory.create_event( - etype=InviteJoinEvent.TYPE, - target_host=target_host, - room_id=room_id, - user_id=joinee, - content=content + pdu = yield self.replication_layer.make_join( + target_host, + room_id, + joinee ) - new_event.destinations = [target_host] + logger.debug("Got response to make_join: %s", pdu) - snapshot.fill_out_prev_events(new_event) - yield self.state_handler.annotate_state_groups(new_event) - yield self.handle_new_event(new_event, snapshot) + event = self.pdu_codec.event_from_pdu(pdu) + + # We should assert some things. + assert(event.type == RoomMemberEvent.TYPE) + assert(event.user_id == joinee) + assert(event.state_key == joinee) + assert(event.room_id == room_id) + + event.event_id = self.event_factory.create_event_id() + event.content = content + + state = yield self.replication_layer.send_join( + target_host, + self.pdu_codec.pdu_from_event(event) + ) # TODO (erikj): Time out here. d = defer.Deferred() @@ -326,14 +296,31 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - pdu.destinations = yield self.store.get_joined_hosts_for_room( + new_pdu = self.pdu_codec.pdu_from_event(event); + new_pdu.destinations = yield self.store.get_joined_hosts_for_room( event.room_id ) - yield self.replication_layer.send_pdu(pdu) + yield self.replication_layer.send_pdu(new_pdu) defer.returnValue(event.state_events.values()) + @defer.inlineCallbacks + def get_state_for_pdu(self, pdu_id, pdu_origin): + state_groups = yield self.store.get_state_groups( + [encode_event_id(pdu_id, pdu_origin)] + ) + + if state_groups: + defer.returnValue( + [ + self.pdu_codec.pdu_from_event(s) + for s in state_groups[0].state + ] + ) + else: + defer.returnValue([]) + @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) diff --git a/synapse/state.py b/synapse/state.py index 9be6b716e2..8c4eeb8924 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -130,7 +130,13 @@ class StateHandler(object): defer.returnValue(is_new) @defer.inlineCallbacks - def annotate_state_groups(self, event): + def annotate_state_groups(self, event, state=None): + if state: + event.state_group = None + event.old_state_events = None + event.state_events = state + return + state_groups = yield self.store.get_state_groups( event.prev_events ) @@ -177,7 +183,7 @@ class StateHandler(object): new_powers_deferreds = [] for e in curr_events: new_powers_deferreds.append( - self.store.get_power_level(e.context, e.user_id) + self.store.get_power_level(e.room_id, e.user_id) ) new_powers = yield defer.gatherResults( -- cgit 1.5.1 From 5ffe5ab43fa090111a0141b04ce6342172f60724 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Oct 2014 18:56:42 +0100 Subject: Use state groups to get current state. Make join dance actually work. --- synapse/api/auth.py | 5 +++ synapse/federation/replication.py | 17 +++++++- synapse/federation/transport.py | 57 +++++++++++++++++++++++--- synapse/handlers/federation.py | 74 +++++++++++++++++++++++---------- synapse/handlers/message.py | 6 +-- synapse/rest/base.py | 5 +++ synapse/rest/events.py | 34 ++++++++++------ synapse/state.py | 86 +++++++++++++++++++++++++++------------ synapse/storage/pdu.py | 6 +++ synapse/storage/state.py | 3 ++ 10 files changed, 226 insertions(+), 67 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d1eca791ab..50ce7eb4cd 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -22,6 +22,7 @@ from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, InviteJoinEvent, + RoomCreateEvent, ) from synapse.util.logutils import log_function @@ -59,6 +60,10 @@ class Auth(object): is_state = hasattr(event, "state_key") + if event.type == RoomCreateEvent.TYPE: + # FIXME + defer.returnValue(True) + if event.type == RoomMemberEvent.TYPE: yield self._can_replace_state(event) allowed = yield self.is_membership_change_allowed(event) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index d482193851..8c7d510ef6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -403,11 +403,18 @@ class ReplicationLayer(object): defer.returnValue( (404, "No handler for Query type '%s'" % (query_type, )) ) + @defer.inlineCallbacks def on_make_join_request(self, context, user_id): pdu = yield self.handler.on_make_join_request(context, user_id) defer.returnValue(pdu.get_dict()) + @defer.inlineCallbacks + def on_invite_request(self, origin, content): + pdu = Pdu(**content) + ret_pdu = yield self.handler.on_send_join_request(origin, pdu) + defer.returnValue((200, ret_pdu.get_dict())) + @defer.inlineCallbacks def on_send_join_request(self, origin, content): pdu = Pdu(**content) @@ -426,8 +433,9 @@ class ReplicationLayer(object): defer.returnValue(Pdu(**pdu_dict)) + @defer.inlineCallbacks def send_join(self, destination, pdu): - return self.transport_layer.send_join( + _, content = yield self.transport_layer.send_join( destination, pdu.context, pdu.pdu_id, @@ -435,6 +443,13 @@ class ReplicationLayer(object): pdu.get_dict(), ) + logger.debug("Got content: %s", content) + pdus = [Pdu(outlier=True, **p) for p in content.get("pdus", [])] + for pdu in pdus: + yield self._handle_new_pdu(destination, pdu) + + defer.returnValue(pdus) + @defer.inlineCallbacks @log_function def _get_persisted_pdu(self, pdu_id, pdu_origin): diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index a0d34fd24d..de64702e2f 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -229,13 +229,36 @@ class TransportLayer(object): pdu_id, ) - response = yield self.client.put_json( + code, content = yield self.client.put_json( destination=destination, path=path, data=content, ) - defer.returnValue(response) + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_join", code) + + defer.returnValue(json.loads(content)) + + @defer.inlineCallbacks + @log_function + def send_invite(self, destination, context, pdu_id, origin, content): + path = PREFIX + "/invite/%s/%s/%s" % ( + context, + origin, + pdu_id, + ) + + code, content = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) @defer.inlineCallbacks def _authenticate_request(self, request): @@ -297,9 +320,13 @@ class TransportLayer(object): @defer.inlineCallbacks def new_handler(request, *args, **kwargs): (origin, content) = yield self._authenticate_request(request) - response = yield handler( - origin, content, request.args, *args, **kwargs - ) + try: + response = yield handler( + origin, content, request.args, *args, **kwargs + ) + except: + logger.exception("Callback failed") + raise defer.returnValue(response) return new_handler @@ -419,6 +446,17 @@ class TransportLayer(object): ) ) + self.server.register_path( + "PUT", + re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, pdu_origin, pdu_id: + self._on_invite_request( + origin, content, query, + ) + ) + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, origin, content, query, transaction_id): @@ -524,6 +562,15 @@ class TransportLayer(object): defer.returnValue((200, content)) + @defer.inlineCallbacks + @log_function + def _on_invite_request(self, origin, content, query): + content = yield self.request_handler.on_invite_request( + origin, content, + ) + + defer.returnValue((200, content)) + class TransportReceivedHandler(object): """ Callbacks used when we receive a transaction diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0ae0541bd3..70790aaa72 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -62,6 +62,9 @@ class FederationHandler(BaseHandler): self.pdu_codec = PduCodec(hs) + # When joining a room we need to queue any events for that room up + self.room_queues = {} + @log_function @defer.inlineCallbacks def handle_new_event(self, event, snapshot): @@ -95,22 +98,25 @@ class FederationHandler(BaseHandler): logger.debug("Got event: %s", event.event_id) + if event.room_id in self.room_queues: + self.room_queues[event.room_id].append(pdu) + return + if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] state = {(e.type, e.state_key): e for e in state} - yield self.state_handler.annotate_state_groups(event, state=state) + + is_new_state = yield self.state_handler.annotate_state_groups( + event, + state=state + ) logger.debug("Event: %s", event) if not backfilled: yield self.auth.check(event, None, raises=True) - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - else: - is_new_state = False + is_new_state = is_new_state and not backfilled # TODO: Implement something in federation that allows us to # respond to PDU. @@ -211,6 +217,8 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) + self.room_queues[room_id] = [] + event.event_id = self.event_factory.create_event_id() event.content = content @@ -219,15 +227,14 @@ class FederationHandler(BaseHandler): self.pdu_codec.pdu_from_event(event) ) - # TODO (erikj): Time out here. - d = defer.Deferred() - self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d) - reactor.callLater(10, d.cancel) + state = [self.pdu_codec.event_from_pdu(p) for p in state] - try: - yield d - except defer.CancelledError: - raise SynapseError(500, "Unable to join remote room") + logger.debug("do_invite_join state: %s", state) + + is_new_state = yield self.state_handler.annotate_state_groups( + event, + state=state + ) try: yield self.store.store_room( @@ -239,6 +246,32 @@ class FederationHandler(BaseHandler): # FIXME pass + for e in state: + # FIXME: Auth these. + is_new_state = yield self.state_handler.annotate_state_groups( + e, + state=state + ) + + yield self.store.persist_event( + e, + backfilled=False, + is_new_state=False + ) + + yield self.store.persist_event( + event, + backfilled=False, + is_new_state=is_new_state + ) + + room_queue = self.room_queues[room_id] + del self.room_queues[room_id] + + for p in room_queue: + p.outlier = True + yield self.on_receive_pdu(p, backfilled=False) + defer.returnValue(True) @defer.inlineCallbacks @@ -264,13 +297,9 @@ class FederationHandler(BaseHandler): def on_send_join_request(self, origin, pdu): event = self.pdu_codec.event_from_pdu(pdu) - yield self.state_handler.annotate_state_groups(event) + is_new_state= yield self.state_handler.annotate_state_groups(event) yield self.auth.check(event, None, raises=True) - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - # FIXME (erikj): All this is duplicated above :( yield self.store.persist_event( @@ -303,7 +332,10 @@ class FederationHandler(BaseHandler): yield self.replication_layer.send_pdu(new_pdu) - defer.returnValue(event.state_events.values()) + defer.returnValue([ + self.pdu_codec.pdu_from_event(e) + for e in event.state_events.values() + ]) @defer.inlineCallbacks def get_state_for_pdu(self, pdu_id, pdu_origin): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c2cbce151..4aaf97a83e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -199,7 +199,7 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_current_state( + data = yield self.state_handler.get_current_state( room_id, event_type, state_key ) defer.returnValue(data) @@ -238,7 +238,7 @@ class MessageHandler(BaseHandler): yield self.auth.check_joined_room(room_id, user_id) # TODO: This is duplicating logic from snapshot_all_rooms - current_state = yield self.store.get_current_state(room_id) + current_state = yield self.state_handler.get_current_state(room_id) defer.returnValue([self.hs.serialize_event(c) for c in current_state]) @defer.inlineCallbacks @@ -315,7 +315,7 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), } - current_state = yield self.store.get_current_state( + current_state = yield self.state_handler.get_current_state( event.room_id ) d["state"] = [self.hs.serialize_event(c) for c in current_state] diff --git a/synapse/rest/base.py b/synapse/rest/base.py index 2e8e3fa7d4..dc784c1527 100644 --- a/synapse/rest/base.py +++ b/synapse/rest/base.py @@ -18,6 +18,11 @@ from synapse.api.urls import CLIENT_PREFIX from synapse.rest.transactions import HttpTransactionStore import re +import logging + + +logger = logging.getLogger(__name__) + def client_path_pattern(path_regex): """Creates a regex compiled client path with the correct client path diff --git a/synapse/rest/events.py b/synapse/rest/events.py index 097195d7cc..92ff5e5ca7 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -20,6 +20,12 @@ from synapse.api.errors import SynapseError from synapse.streams.config import PaginationConfig from synapse.rest.base import RestServlet, client_path_pattern +import logging + + +logger = logging.getLogger(__name__) + + class EventStreamRestServlet(RestServlet): PATTERN = client_path_pattern("/events$") @@ -29,18 +35,22 @@ class EventStreamRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request): auth_user = yield self.auth.get_user_by_req(request) - - handler = self.handlers.event_stream_handler - pagin_config = PaginationConfig.from_request(request) - timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS - if "timeout" in request.args: - try: - timeout = int(request.args["timeout"][0]) - except ValueError: - raise SynapseError(400, "timeout must be in milliseconds.") - - chunk = yield handler.get_stream(auth_user.to_string(), pagin_config, - timeout=timeout) + try: + handler = self.handlers.event_stream_handler + pagin_config = PaginationConfig.from_request(request) + timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS + if "timeout" in request.args: + try: + timeout = int(request.args["timeout"][0]) + except ValueError: + raise SynapseError(400, "timeout must be in milliseconds.") + + chunk = yield handler.get_stream( + auth_user.to_string(), pagin_config, timeout=timeout + ) + except: + logger.exception("Event stream failed") + raise defer.returnValue((200, chunk)) diff --git a/synapse/state.py b/synapse/state.py index 8c4eeb8924..24685c6fb4 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.federation.pdu_codec import encode_event_id, decode_event_id from synapse.util.logutils import log_function +from synapse.federation.pdu_codec import encode_event_id from collections import namedtuple @@ -130,54 +131,89 @@ class StateHandler(object): defer.returnValue(is_new) @defer.inlineCallbacks + @log_function def annotate_state_groups(self, event, state=None): if state: event.state_group = None event.old_state_events = None - event.state_events = state + event.state_events = {(s.type, s.state_key): s for s in state} + defer.returnValue(False) + return + + if hasattr(event, "outlier") and event.outlier: + event.state_group = None + event.old_state_events = None + event.state_events = None + defer.returnValue(False) return + new_state = yield self.resolve_state_groups(event.prev_events) + + event.old_state_events = new_state + + if hasattr(event, "state_key"): + new_state[(event.type, event.state_key)] = event + + event.state_group = None + event.state_events = new_state + + defer.returnValue(hasattr(event, "state_key")) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + # FIXME: HACK! + pdus = yield self.store.get_latest_pdus_in_context(room_id) + + event_ids = [encode_event_id(p.pdu_id, p.origin) for p in pdus] + + res = self.resolve_state_groups(event_ids) + + if event_type: + defer.returnValue(res.get((event_type, state_key))) + return + + defer.returnValue(res.values()) + + @defer.inlineCallbacks + @log_function + def resolve_state_groups(self, event_ids): state_groups = yield self.store.get_state_groups( - event.prev_events + event_ids ) state = {} - state_sets = {} for group in state_groups: for s in group.state: - state.setdefault((s.type, s.state_key), []).append(s) - - state_sets.setdefault( + state.setdefault( (s.type, s.state_key), - set() - ).add(s.event_id) + {} + )[s.event_id] = s unconflicted_state = { - k: state[k].pop() for k, v in state_sets.items() - if len(v) == 1 + k: v.values()[0] for k, v in state.items() + if len(v.values()) == 1 } conflicted_state = { - k: state[k] - for k, v in state_sets.items() - if len(v) > 1 + k: v.values() + for k, v in state.items() + if len(v.values()) > 1 } - new_state = {} - new_state.update(unconflicted_state) - for key, events in conflicted_state.items(): - new_state[key] = yield self.resolve(events) + try: + new_state = {} + new_state.update(unconflicted_state) + for key, events in conflicted_state.items(): + new_state[key] = yield self._resolve_state_events(events) + except: + logger.exception("Failed to resolve state") + raise - event.old_state_events = new_state - - if hasattr(event, "state_key"): - new_state[(event.type, event.state_key)] = event - - event.state_group = None - event.state_events = new_state + defer.returnValue(new_state) @defer.inlineCallbacks - def resolve(self, events): + @log_function + def _resolve_state_events(self, events): curr_events = events new_powers_deferreds = [] diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index d70467dcd6..b1cb0185a6 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -277,6 +277,12 @@ class PduStore(SQLBaseStore): (context, depth) ) + def get_latest_pdus_in_context(self, context): + return self.runInteraction( + self._get_latest_pdus_in_context, + context + ) + def _get_latest_pdus_in_context(self, txn, context): """Get's a list of the most current pdus for a given context. This is used when we are sending a Pdu and need to fill out the `prev_pdus` diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 9496c935a7..0aa979c9f0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -63,6 +63,9 @@ class StateStore(SQLBaseStore): ) def _store_state_groups_txn(self, txn, event): + if not event.state_events: + return + state_group = event.state_group if not state_group: state_group = self._simple_insert_txn( -- cgit 1.5.1 From b3b19614962d78e7851299ff1f7b41706ced3d00 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Oct 2014 19:37:41 +0100 Subject: Fix bug where people could join private rooms --- synapse/api/auth.py | 86 +++++++++++++++++++++++------------------- synapse/handlers/federation.py | 10 ++++- synapse/state.py | 12 ++++-- 3 files changed, 63 insertions(+), 45 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 50ce7eb4cd..93a3533304 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -49,12 +49,12 @@ class Auth(object): """ try: if hasattr(event, "room_id"): - if not event.old_state_events: + if event.old_state_events is None: # Oh, we don't know what the state of the room was, so we # are trusting that this is allowed (at least for now) defer.returnValue(True) - if hasattr(event, "outlier") and event.outlier: + if hasattr(event, "outlier") and event.outlier is True: # TODO (erikj): Auth for outliers is done differently. defer.returnValue(True) @@ -65,8 +65,12 @@ class Auth(object): defer.returnValue(True) if event.type == RoomMemberEvent.TYPE: - yield self._can_replace_state(event) - allowed = yield self.is_membership_change_allowed(event) + self._can_replace_state(event) + allowed = self.is_membership_change_allowed(event) + if allowed: + logger.debug("Allowing! %s", event) + else: + logger.debug("Denying! %s", event) defer.returnValue(allowed) return @@ -77,24 +81,28 @@ class Auth(object): # TODO (erikj): This really only should be called for *new* # state yield self._can_add_state(event) - yield self._can_replace_state(event) + self._can_replace_state(event) else: yield self._can_send_event(event) if event.type == RoomPowerLevelsEvent.TYPE: - yield self._check_power_levels(event) + self._check_power_levels(event) if event.type == RoomRedactionEvent.TYPE: - yield self._check_redaction(event) + self._check_redaction(event) + + logger.debug("Allowing! %s", event) defer.returnValue(True) else: raise AuthError(500, "Unknown event: %s" % event) except AuthError as e: logger.info("Event auth check failed on event %s with msg: %s", event, e.msg) + logger.info("Denying! %s", event) if raises: raise e + defer.returnValue(False) @defer.inlineCallbacks @@ -126,7 +134,7 @@ class Auth(object): user_id, room_id, repr(member) )) - @defer.inlineCallbacks + @log_function def is_membership_change_allowed(self, event): target_user_id = event.state_key @@ -159,11 +167,23 @@ class Auth(object): ) ban_level, kick_level, redact_level = ( - yield self._get_ops_level_from_event_state( + self._get_ops_level_from_event_state( event ) ) + logger.debug( + "is_membership_change_allowed: %s", + { + "caller_in_room": caller_in_room, + "target_in_room": target_in_room, + "membership": membership, + "join_rule": join_rule, + "target_user_id": target_user_id, + "event.user_id": event.user_id, + } + ) + if Membership.INVITE == membership: # TODO (erikj): We should probably handle this more intelligently # PRIVATE join rules. @@ -183,10 +203,7 @@ class Auth(object): elif join_rule == JoinRules.PUBLIC: pass elif join_rule == JoinRules.INVITE: - if ( - not caller or caller.membership not in - [Membership.INVITE, Membership.JOIN] - ): + if not caller_in_room: raise AuthError(403, "You are not invited to this room.") else: # TODO (erikj): may_join list @@ -218,7 +235,7 @@ class Auth(object): else: raise AuthError(500, "Unknown membership %s" % membership) - defer.returnValue(True) + return True def _get_power_level_from_event_state(self, event, user_id): key = (RoomPowerLevelsEvent.TYPE, "", ) @@ -359,17 +376,7 @@ class Auth(object): defer.returnValue(True) - @defer.inlineCallbacks def _can_replace_state(self, event): - current_state = yield self.store.get_current_state( - event.room_id, - event.type, - event.state_key, - ) - - if current_state: - current_state = current_state[0] - user_level = self._get_power_level_from_event_state( event, event.user_id, @@ -383,6 +390,10 @@ class Auth(object): logger.debug( "Checking power level for %s, %s", event.user_id, user_level ) + + key = (event.type, event.state_key, ) + current_state = event.old_state_events.get(key) + if current_state and hasattr(current_state, "required_power_level"): req = current_state.required_power_level @@ -393,19 +404,20 @@ class Auth(object): "You don't have permission to change that state" ) - @defer.inlineCallbacks def _check_redaction(self, event): - user_level = yield self.store.get_power_level( - event.room_id, - event.user_id, - ) - user_level = self._get_power_level_from_event_state( event, event.user_id, ) - _, _, redact_level = yield self.store.get_ops_levels(event.room_id) + if user_level: + user_level = int(user_level) + else: + user_level = 0 + + _, _, redact_level = self.store._get_ops_level_from_event_state( + event.room_id + ) if not redact_level: redact_level = 50 @@ -416,7 +428,6 @@ class Auth(object): "You don't have permission to redact events" ) - @defer.inlineCallbacks def _check_power_levels(self, event): for k, v in event.content.items(): if k == "default": @@ -436,19 +447,16 @@ class Auth(object): except: raise SynapseError(400, "Not a valid power level: %s" % (v,)) - current_state = yield self.store.get_current_state( - event.room_id, - event.type, - event.state_key, - ) + key = (event.type, event.state_key, ) + current_state = event.old_state_events.get(key) if not current_state: return else: current_state = current_state[0] - user_level = yield self.store.get_power_level( - event.room_id, + user_level = self._get_power_level_from_event_state( + event, event.user_id, ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 70790aaa72..8c80a37164 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -269,12 +269,12 @@ class FederationHandler(BaseHandler): del self.room_queues[room_id] for p in room_queue: - p.outlier = True yield self.on_receive_pdu(p, backfilled=False) defer.returnValue(True) @defer.inlineCallbacks + @log_function def on_make_join_request(self, context, user_id): event = self.event_factory.create_event( etype=RoomMemberEvent.TYPE, @@ -289,15 +289,21 @@ class FederationHandler(BaseHandler): ) snapshot.fill_out_prev_events(event) + yield self.state_handler.annotate_state_groups(event) + yield self.auth.check(event, None, raises=True) + pdu = self.pdu_codec.pdu_from_event(event) defer.returnValue(pdu) @defer.inlineCallbacks + @log_function def on_send_join_request(self, origin, pdu): event = self.pdu_codec.event_from_pdu(pdu) - is_new_state= yield self.state_handler.annotate_state_groups(event) + event.outlier = False + + is_new_state = yield self.state_handler.annotate_state_groups(event) yield self.auth.check(event, None, raises=True) # FIXME (erikj): All this is duplicated above :( diff --git a/synapse/state.py b/synapse/state.py index 24685c6fb4..c062cef757 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -22,6 +22,7 @@ from synapse.federation.pdu_codec import encode_event_id from collections import namedtuple +import copy import logging import hashlib @@ -143,13 +144,13 @@ class StateHandler(object): if hasattr(event, "outlier") and event.outlier: event.state_group = None event.old_state_events = None - event.state_events = None + event.state_events = {} defer.returnValue(False) return new_state = yield self.resolve_state_groups(event.prev_events) - event.old_state_events = new_state + event.old_state_events = copy.deepcopy(new_state) if hasattr(event, "state_key"): new_state[(event.type, event.state_key)] = event @@ -164,9 +165,12 @@ class StateHandler(object): # FIXME: HACK! pdus = yield self.store.get_latest_pdus_in_context(room_id) - event_ids = [encode_event_id(p.pdu_id, p.origin) for p in pdus] + event_ids = [ + encode_event_id(pdu_id, origin) + for pdu_id, origin, _ in pdus + ] - res = self.resolve_state_groups(event_ids) + res = yield self.resolve_state_groups(event_ids) if event_type: defer.returnValue(res.get((event_type, state_key))) -- cgit 1.5.1 From bf8cdda2f598acce5a8694fe6482fc929c53715b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Oct 2014 20:10:34 +0100 Subject: It doesn't want a dict --- synapse/handlers/federation.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8c80a37164..b575986fc3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -104,7 +104,6 @@ class FederationHandler(BaseHandler): if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] - state = {(e.type, e.state_key): e for e in state} is_new_state = yield self.state_handler.annotate_state_groups( event, @@ -250,7 +249,6 @@ class FederationHandler(BaseHandler): # FIXME: Auth these. is_new_state = yield self.state_handler.annotate_state_groups( e, - state=state ) yield self.store.persist_event( -- cgit 1.5.1 From e7858b6d7ef37849a3d2d5004743cdd21ec330a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Oct 2014 16:59:24 +0000 Subject: Start filling out and using new events tables --- synapse/federation/pdu_codec.py | 12 +++-- synapse/handlers/_base.py | 4 ++ synapse/handlers/federation.py | 90 +++++++++++++++++++--------------- synapse/state.py | 11 +++-- synapse/storage/__init__.py | 45 ++++++++++------- synapse/storage/_base.py | 33 ++++++++++--- synapse/storage/event_federation.py | 49 ++++++++++++------ synapse/storage/schema/event_edges.sql | 8 ++- 8 files changed, 159 insertions(+), 93 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 2cd591410b..dccbccb85b 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -48,8 +48,8 @@ class PduCodec(object): kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type kwargs["prev_events"] = [ - encode_event_id(i, o) - for i, o in pdu.prev_pdus + (encode_event_id(i, o), s) + for i, o, s in pdu.prev_pdus ] if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): @@ -82,7 +82,13 @@ class PduCodec(object): d["pdu_type"] = event.type if hasattr(event, "prev_events"): - d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events] + def f(e, s): + i, o = decode_event_id(e, self.server_name) + return i, o, s + d["prev_pdus"] = [ + f(e, s) + for e, s in event.prev_events + ] if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index cd6c35f194..787a01efc5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -16,6 +16,8 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError +from synapse.util.async import run_on_reactor + class BaseHandler(object): def __init__(self, hs): @@ -45,6 +47,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], extra_users=[], suppress_auth=False): + yield run_on_reactor() + snapshot.fill_out_prev_events(event) yield self.state_handler.annotate_state_groups(event) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b575986fc3..5f86ed03fa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,6 +22,7 @@ from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec, encode_event_id from synapse.api.errors import SynapseError +from synapse.util.async import run_on_reactor from twisted.internet import defer, reactor @@ -81,6 +82,8 @@ class FederationHandler(BaseHandler): processing. """ + yield run_on_reactor() + pdu = self.pdu_codec.pdu_from_event(event) if not hasattr(pdu, "destinations") or not pdu.destinations: @@ -102,6 +105,8 @@ class FederationHandler(BaseHandler): self.room_queues[event.room_id].append(pdu) return + logger.debug("Processing event: %s", event.event_id) + if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] @@ -216,58 +221,65 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) - self.room_queues[room_id] = [] - - event.event_id = self.event_factory.create_event_id() - event.content = content + event.outlier = False - state = yield self.replication_layer.send_join( - target_host, - self.pdu_codec.pdu_from_event(event) - ) + self.room_queues[room_id] = [] - state = [self.pdu_codec.event_from_pdu(p) for p in state] + try: + event.event_id = self.event_factory.create_event_id() + event.content = content - logger.debug("do_invite_join state: %s", state) + state = yield self.replication_layer.send_join( + target_host, + self.pdu_codec.pdu_from_event(event) + ) - is_new_state = yield self.state_handler.annotate_state_groups( - event, - state=state - ) + state = [self.pdu_codec.event_from_pdu(p) for p in state] - try: - yield self.store.store_room( - room_id=room_id, - room_creator_user_id="", - is_public=False - ) - except: - # FIXME - pass + logger.debug("do_invite_join state: %s", state) - for e in state: - # FIXME: Auth these. is_new_state = yield self.state_handler.annotate_state_groups( - e, + event, + state=state ) + logger.debug("do_invite_join event: %s", event) + + try: + yield self.store.store_room( + room_id=room_id, + room_creator_user_id="", + is_public=False + ) + except: + # FIXME + pass + + for e in state: + # FIXME: Auth these. + e.outlier = True + + yield self.state_handler.annotate_state_groups( + e, + ) + + yield self.store.persist_event( + e, + backfilled=False, + is_new_state=False + ) + yield self.store.persist_event( - e, + event, backfilled=False, - is_new_state=False + is_new_state=is_new_state ) + finally: + room_queue = self.room_queues[room_id] + del self.room_queues[room_id] - yield self.store.persist_event( - event, - backfilled=False, - is_new_state=is_new_state - ) - - room_queue = self.room_queues[room_id] - del self.room_queues[room_id] - - for p in room_queue: - yield self.on_receive_pdu(p, backfilled=False) + for p in room_queue: + yield self.on_receive_pdu(p, backfilled=False) defer.returnValue(True) diff --git a/synapse/state.py b/synapse/state.py index cc6a7db96b..993c4f18d3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -143,7 +143,9 @@ class StateHandler(object): defer.returnValue(False) return - new_state = yield self.resolve_state_groups(event.prev_events) + new_state = yield self.resolve_state_groups( + [e for e, _ in event.prev_events] + ) event.old_state_events = copy.deepcopy(new_state) @@ -157,12 +159,11 @@ class StateHandler(object): @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): - # FIXME: HACK! - pdus = yield self.store.get_latest_pdus_in_context(room_id) + events = yield self.store.get_latest_events_in_room(room_id) event_ids = [ - encode_event_id(pdu_id, origin) - for pdu_id, origin, _ in pdus + e_id + for e_id, _ in events ] res = yield self.resolve_state_groups(event_ids) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f89e518690..d75c366834 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -71,6 +71,7 @@ SCHEMAS = [ "state", "signatures", "event_edges", + "event_signatures", ] @@ -134,7 +135,8 @@ class DataStore(RoomMemberStore, RoomStore, "type", "room_id", "content", - "unrecognized_keys" + "unrecognized_keys", + "depth", ], allow_none=allow_none, ) @@ -263,7 +265,12 @@ class DataStore(RoomMemberStore, RoomStore, vals["unrecognized_keys"] = json.dumps(unrec) try: - self._simple_insert_txn(txn, "events", vals) + self._simple_insert_txn( + txn, + "events", + vals, + or_replace=(not outlier), + ) except: logger.warn( "Failed to persist, probably duplicate: %s", @@ -307,13 +314,14 @@ class DataStore(RoomMemberStore, RoomStore, } ) - signatures = event.signatures.get(event.origin, {}) + if hasattr(event, "signatures"): + signatures = event.signatures.get(event.origin, {}) - for key_id, signature_base64 in signatures.items(): - signature_bytes = decode_base64(signature_base64) - self._store_event_origin_signature_txn( - txn, event.event_id, key_id, signature_bytes, - ) + for key_id, signature_base64 in signatures.items(): + signature_bytes = decode_base64(signature_base64) + self._store_event_origin_signature_txn( + txn, event.event_id, event.origin, key_id, signature_bytes, + ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): @@ -323,10 +331,10 @@ class DataStore(RoomMemberStore, RoomStore, ) # TODO - (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) + # (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) + # self._store_event_reference_hash_txn( + # txn, event.event_id, ref_alg, ref_hash_bytes + # ) self._update_min_depth_for_room_txn(txn, event.room_id, event.depth) @@ -412,9 +420,7 @@ class DataStore(RoomMemberStore, RoomStore, """ def _snapshot(txn): membership_state = self._get_room_member(txn, user_id, room_id) - prev_events = self._get_latest_events_in_room( - txn, room_id - ) + prev_events = self._get_latest_events_in_room(txn, room_id) if state_type is not None and state_key is not None: prev_state_pdu = self._get_current_state_pdu( @@ -469,12 +475,12 @@ class Snapshot(object): return event.prev_events = [ - (p_id, origin, hashes) - for p_id, origin, hashes, _ in self.prev_events + (event_id, hashes) + for event_id, hashes, _ in self.prev_events ] if self.prev_events: - event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1 + event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 else: event.depth = 0 @@ -533,9 +539,10 @@ def prepare_database(db_conn): db_conn.commit() else: - sql_script = "BEGIN TRANSACTION;" + sql_script = "BEGIN TRANSACTION;\n" for sql_loc in SCHEMAS: sql_script += read_schema(sql_loc) + sql_script += "\n" sql_script += "COMMIT TRANSACTION;" c.executescript(sql_script) db_conn.commit() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 30732caa83..464b12f032 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,10 +19,12 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function +from syutil.base64util import encode_base64 import collections import copy import json +import sys import time @@ -67,6 +69,9 @@ class LoggingTransaction(object): return self.txn.execute( sql, *args, **kwargs ) + except: + logger.exception("[SQL FAIL] {%s}", self.name) + raise finally: end = time.clock() * 1000 sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) @@ -85,14 +90,20 @@ class SQLBaseStore(object): """Wraps the .runInteraction() method on the underlying db_pool.""" def inner_func(txn, *args, **kwargs): start = time.clock() * 1000 - txn_id = str(SQLBaseStore._TXN_ID) - SQLBaseStore._TXN_ID += 1 + txn_id = SQLBaseStore._TXN_ID + + # We don't really need these to be unique, so lets stop it from + # growing really large. + self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) - name = "%s-%s" % (desc, txn_id, ) + name = "%s-%x" % (desc, txn_id, ) transaction_logger.debug("[TXN START] {%s}", name) try: return func(LoggingTransaction(txn, name), *args, **kwargs) + except: + logger.exception("[TXN FAIL] {%s}", name) + raise finally: end = time.clock() * 1000 transaction_logger.debug( @@ -189,7 +200,6 @@ class SQLBaseStore(object): statement returns no rows """ return self._simple_selectupdate_one( - "_simple_select_one", table, keyvalues, retcols=retcols, allow_none=allow_none ) @@ -215,11 +225,11 @@ class SQLBaseStore(object): txn, table=table, keyvalues=keyvalues, - retcols=retcol, + retcol=retcol, ) if ret: - return ret[retcol] + return ret[0] else: if allow_none: return None @@ -434,6 +444,17 @@ class SQLBaseStore(object): sql = "SELECT * FROM events WHERE event_id = ?" for ev in events: + signatures = self._get_event_origin_signatures_txn( + txn, ev.event_id, + ) + + ev.signatures = { + k: encode_base64(v) for k, v in signatures.items() + } + + prev_events = self._get_latest_events_in_room(txn, ev.room_id) + ev.prev_events = [(e_id, s,) for e_id, s, _ in prev_events] + if hasattr(ev, "prev_state"): # Load previous state_content. # TODO: Should we be pulling this out above? diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 7688fc550f..5f94c31818 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -24,6 +24,13 @@ logger = logging.getLogger(__name__) class EventFederationStore(SQLBaseStore): + def get_latest_events_in_room(self, room_id): + return self.runInteraction( + "get_latest_events_in_room", + self._get_latest_events_in_room, + room_id, + ) + def _get_latest_events_in_room(self, txn, room_id): self._simple_select_onecol_txn( txn, @@ -34,12 +41,25 @@ class EventFederationStore(SQLBaseStore): retcol="event_id", ) + sql = ( + "SELECT e.event_id, e.depth FROM events as e " + "INNER JOIN event_forward_extremities as f " + "ON e.event_id = f.event_id " + "WHERE f.room_id = ?" + ) + + txn.execute(sql, (room_id, )) + results = [] - for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_prev_event_hashes_txn(txn, pdu_id, origin) - sha256_bytes = hashes["sha256"] - prev_hashes = {"sha256": encode_base64(sha256_bytes)} - results.append((pdu_id, origin, prev_hashes, depth)) + for event_id, depth in txn.fetchall(): + hashes = self._get_prev_event_hashes_txn(txn, event_id) + prev_hashes = { + k: encode_base64(v) for k, v in hashes.items() + if k == "sha256" + } + results.append((event_id, prev_hashes, depth)) + + return results def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( @@ -70,21 +90,21 @@ class EventFederationStore(SQLBaseStore): def _handle_prev_events(self, txn, outlier, event_id, prev_events, room_id): - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_insert_txn( txn, table="event_edges", values={ "event_id": event_id, - "prev_event": e_id, + "prev_event_id": e_id, "room_id": room_id, } ) # Update the extremities table if this is not an outlier. if not outlier: - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_delete_txn( txn, @@ -116,7 +136,7 @@ class EventFederationStore(SQLBaseStore): # Insert all the prev_pdus as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_insert_txn( txn, @@ -130,14 +150,11 @@ class EventFederationStore(SQLBaseStore): # Also delete from the backwards extremities table all ones that # reference pdus that we have already seen query = ( - "DELETE FROM %(event_back)s as b WHERE EXISTS (" - "SELECT 1 FROM %(events)s AS events " + "DELETE FROM event_backward_extremities WHERE EXISTS (" + "SELECT 1 FROM events " "WHERE " - "b.event_id = events.event_id " + "event_backward_extremities.event_id = events.event_id " "AND not events.outlier " ")" - ) % { - "event_back": "event_backward_extremities", - "events": "events", - } + ) txn.execute(query) \ No newline at end of file diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql index 6a28314ece..e5f768c705 100644 --- a/synapse/storage/schema/event_edges.sql +++ b/synapse/storage/schema/event_edges.sql @@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); --- + CREATE TABLE IF NOT EXISTS event_backward_extremities( event_id TEXT, @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities( CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); --- + CREATE TABLE IF NOT EXISTS event_edges( event_id TEXT, @@ -28,7 +28,6 @@ CREATE TABLE IF NOT EXISTS event_edges( CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); --- CREATE TABLE IF NOT EXISTS room_depth( @@ -38,7 +37,7 @@ CREATE TABLE IF NOT EXISTS room_depth( ); CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); --- + create TABLE IF NOT EXISTS event_destinations( event_id TEXT, @@ -48,4 +47,3 @@ create TABLE IF NOT EXISTS event_destinations( ); CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); --- \ No newline at end of file -- cgit 1.5.1 From da511334d26a3e2e6e3b8e3d57259896bcd4bb4c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Oct 2014 11:53:35 +0000 Subject: Make federation return the old current state, so that we can use it to do auth --- synapse/handlers/federation.py | 28 +++++++++++++++++++++++----- synapse/state.py | 14 +++++++++----- 2 files changed, 32 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5f86ed03fa..da99a4b449 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -112,7 +112,7 @@ class FederationHandler(BaseHandler): is_new_state = yield self.state_handler.annotate_state_groups( event, - state=state + old_state=state ) logger.debug("Event: %s", event) @@ -240,7 +240,7 @@ class FederationHandler(BaseHandler): is_new_state = yield self.state_handler.annotate_state_groups( event, - state=state + old_state=state ) logger.debug("do_invite_join event: %s", event) @@ -279,7 +279,10 @@ class FederationHandler(BaseHandler): del self.room_queues[room_id] for p in room_queue: - yield self.on_receive_pdu(p, backfilled=False) + try: + yield self.on_receive_pdu(p, backfilled=False) + except: + pass defer.returnValue(True) @@ -355,15 +358,30 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def get_state_for_pdu(self, pdu_id, pdu_origin): + event_id = encode_event_id(pdu_id, pdu_origin) + state_groups = yield self.store.get_state_groups( - [encode_event_id(pdu_id, pdu_origin)] + [event_id] ) if state_groups: + results = { + (e.type, e.state_key): e for e in state_groups[0].state + } + + event = yield self.store.get_event(event_id) + if hasattr(event, "state_key"): + # Get previous state + if hasattr(event, "prev_state") and event.prev_state: + prev_event = yield self.store.get_event(event.prev_state) + results[(event.type, event.state_key)] = prev_event + else: + del results[(event.type, event.state_key)] + defer.returnValue( [ self.pdu_codec.pdu_from_event(s) - for s in state_groups[0].state + for s in results.values() ] ) else: diff --git a/synapse/state.py b/synapse/state.py index 993c4f18d3..a59688e3b4 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -128,11 +128,15 @@ class StateHandler(object): @defer.inlineCallbacks @log_function - def annotate_state_groups(self, event, state=None): - if state: + def annotate_state_groups(self, event, old_state=None): + if old_state: event.state_group = None - event.old_state_events = None - event.state_events = {(s.type, s.state_key): s for s in state} + event.old_state_events = old_state + event.state_events = {(s.type, s.state_key): s for s in old_state} + + if hasattr(event, "state_key"): + event.state_events[(event.type, event.state_key)] = event + defer.returnValue(False) return @@ -163,7 +167,7 @@ class StateHandler(object): event_ids = [ e_id - for e_id, _ in events + for e_id, _, _ in events ] res = yield self.resolve_state_groups(event_ids) -- cgit 1.5.1 From 12ce441e67a40bb73ac5aca0283c2fe4afac4021 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Oct 2014 17:00:11 +0000 Subject: Convert event ids to be of the form :example.com --- synapse/api/events/factory.py | 6 +++++- synapse/federation/pdu_codec.py | 35 ++++++++++++++++------------------- synapse/handlers/federation.py | 7 +++++-- synapse/server.py | 7 ++++++- synapse/state.py | 17 ++++++++++++----- synapse/types.py | 10 ++++++++++ 6 files changed, 54 insertions(+), 28 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 06f3bf232b..750096c618 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -21,6 +21,8 @@ from synapse.api.events.room import ( RoomRedactionEvent, ) +from synapse.types import EventID + from synapse.util.stringutils import random_string @@ -59,7 +61,9 @@ class EventFactory(object): local_part = str(int(self.clock.time())) + i + random_string(5) - return "%s@%s" % (local_part, self.hs.hostname) + e_id = EventID.create_local(local_part, self.hs) + + return e_id.to_string() def create_event(self, etype=None, **kwargs): kwargs["type"] = etype diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index dccbccb85b..6d31286290 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -17,22 +17,11 @@ from .units import Pdu from synapse.crypto.event_signing import ( add_event_pdu_content_hash, sign_event_pdu ) +from synapse.types import EventID import copy -def decode_event_id(event_id, server_name): - parts = event_id.split("@") - if len(parts) < 2: - return (event_id, server_name) - else: - return (parts[0], "".join(parts[1:])) - - -def encode_event_id(pdu_id, origin): - return "%s@%s" % (pdu_id, origin) - - class PduCodec(object): def __init__(self, hs): @@ -40,20 +29,28 @@ class PduCodec(object): self.server_name = hs.hostname self.event_factory = hs.get_event_factory() self.clock = hs.get_clock() + self.hs = hs + + def encode_event_id(self, local, domain): + return EventID.create(local, domain, self.hs).to_string() + + def decode_event_id(self, event_id): + e_id = self.hs.parse_eventid(event_id) + return e_id.localpart, e_id.domain def event_from_pdu(self, pdu): kwargs = {} - kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin) + kwargs["event_id"] = self.encode_event_id(pdu.pdu_id, pdu.origin) kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type kwargs["prev_events"] = [ - (encode_event_id(i, o), s) + (self.encode_event_id(i, o), s) for i, o, s in pdu.prev_pdus ] if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): - kwargs["prev_state"] = encode_event_id( + kwargs["prev_state"] = self.encode_event_id( pdu.prev_state_id, pdu.prev_state_origin ) @@ -75,15 +72,15 @@ class PduCodec(object): def pdu_from_event(self, event): d = event.get_full_dict() - d["pdu_id"], d["origin"] = decode_event_id( - event.event_id, self.server_name + d["pdu_id"], d["origin"] = self.decode_event_id( + event.event_id ) d["context"] = event.room_id d["pdu_type"] = event.type if hasattr(event, "prev_events"): def f(e, s): - i, o = decode_event_id(e, self.server_name) + i, o = self.decode_event_id(e) return i, o, s d["prev_pdus"] = [ f(e, s) @@ -92,7 +89,7 @@ class PduCodec(object): if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( - decode_event_id(event.prev_state, self.server_name) + self.decode_event_id(event.prev_state) ) if hasattr(event, "state_key"): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index da99a4b449..1daeee833b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,9 +20,10 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function -from synapse.federation.pdu_codec import PduCodec, encode_event_id +from synapse.federation.pdu_codec import PduCodec from synapse.api.errors import SynapseError from synapse.util.async import run_on_reactor +from synapse.types import EventID from twisted.internet import defer, reactor @@ -358,7 +359,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def get_state_for_pdu(self, pdu_id, pdu_origin): - event_id = encode_event_id(pdu_id, pdu_origin) + yield run_on_reactor() + + event_id = EventID.create(pdu_id, pdu_origin, self.hs).to_string() state_groups = yield self.store.get_state_groups( [event_id] diff --git a/synapse/server.py b/synapse/server.py index a4d2d4aba5..d770b20b19 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -28,7 +28,7 @@ from synapse.handlers import Handlers from synapse.rest import RestServletFactory from synapse.state import StateHandler from synapse.storage import DataStore -from synapse.types import UserID, RoomAlias, RoomID +from synapse.types import UserID, RoomAlias, RoomID, EventID from synapse.util import Clock from synapse.util.distributor import Distributor from synapse.util.lockutils import LockManager @@ -143,6 +143,11 @@ class BaseHomeServer(object): object.""" return RoomID.from_string(s, hs=self) + def parse_eventid(self, s): + """Parse the string given by 's' as a Event ID and return a EventID + object.""" + return EventID.from_string(s, hs=self) + def serialize_event(self, e): return serialize_event(self, e) diff --git a/synapse/state.py b/synapse/state.py index a59688e3b4..414701b272 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -16,8 +16,10 @@ from twisted.internet import defer -from synapse.federation.pdu_codec import encode_event_id, decode_event_id from synapse.util.logutils import log_function +from synapse.util.async import run_on_reactor + +from synapse.types import EventID from collections import namedtuple @@ -43,6 +45,7 @@ class StateHandler(object): self.store = hs.get_datastore() self._replication = hs.get_replication_layer() self.server_name = hs.hostname + self.hs = hs @defer.inlineCallbacks @log_function @@ -77,15 +80,17 @@ class StateHandler(object): current_state = snapshot.prev_state_pdu if current_state: - event.prev_state = encode_event_id( - current_state.pdu_id, current_state.origin - ) + event.prev_state = EventID.create( + current_state.pdu_id, current_state.origin, self.hs + ).to_string() # TODO check current_state to see if the min power level is less # than the power level of the user # power_level = self._get_power_level_for_event(event) - pdu_id, origin = decode_event_id(event.event_id, self.server_name) + e_id = self.hs.parse_eventid(event.event_id) + pdu_id = e_id.localpart + origin = e_id.domain yield self.store.update_current_state( pdu_id=pdu_id, @@ -129,6 +134,8 @@ class StateHandler(object): @defer.inlineCallbacks @log_function def annotate_state_groups(self, event, old_state=None): + yield run_on_reactor() + if old_state: event.state_group = None event.old_state_events = old_state diff --git a/synapse/types.py b/synapse/types.py index c51bc8e4f2..649ff2f7d7 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -78,6 +78,11 @@ class DomainSpecificString( """Create a structure on the local domain""" return cls(localpart=localpart, domain=hs.hostname, is_mine=True) + @classmethod + def create(cls, localpart, domain, hs): + is_mine = domain == hs.hostname + return cls(localpart=localpart, domain=domain, is_mine=is_mine) + class UserID(DomainSpecificString): """Structure representing a user ID.""" @@ -94,6 +99,11 @@ class RoomID(DomainSpecificString): SIGIL = "!" +class EventID(DomainSpecificString): + """Structure representing an event id. """ + SIGIL = "$" + + class StreamToken( namedtuple( "Token", -- cgit 1.5.1 From f2de2d644af80557baebf43f64f3968b8ab46d0b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 09:59:02 +0000 Subject: Move the impl of backfill to use events. --- synapse/federation/replication.py | 6 +-- synapse/handlers/federation.py | 27 +++++++++++- synapse/storage/event_federation.py | 86 ++++++++++++++++++++++++++++++++++++- 3 files changed, 114 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 000a3081c2..1628a56294 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -181,7 +181,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def backfill(self, dest, context, limit): + def backfill(self, dest, context, limit, extremities): """Requests some more historic PDUs for the given context from the given destination server. @@ -189,12 +189,12 @@ class ReplicationLayer(object): dest (str): The remote home server to ask. context (str): The context to backfill. limit (int): The maximum number of PDUs to return. + extremities (list): List of PDU id and origins of the first pdus + we have seen from the context Returns: Deferred: Results in the received PDUs. """ - extremities = yield self.store.get_oldest_pdus_in_context(context) - logger.debug("backfill extrem=%s", extremities) # If there are no extremeties then we've (probably) reached the start. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1daeee833b..9f457ce292 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -181,7 +181,17 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit): - pdus = yield self.replication_layer.backfill(dest, room_id, limit) + extremities = yield self.store.get_oldest_events_in_room(room_id) + + pdus = yield self.replication_layer.backfill( + dest, + room_id, + limit, + extremities=[ + self.pdu_codec.decode_event_id(e) + for e in extremities + ] + ) events = [] @@ -390,6 +400,21 @@ class FederationHandler(BaseHandler): else: defer.returnValue([]) + @defer.inlineCallbacks + @log_function + def on_backfill_request(self, context, pdu_list, limit): + + events = yield self.store.get_backfill_events( + context, + [self.pdu_codec.encode_event_id(i, o) for i, o in pdu_list], + limit + ) + + defer.returnValue([ + self.pdu_codec.pdu_from_event(e) + for e in events + ]) + @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 88d09d9ba8..438b42c1da 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -24,6 +24,23 @@ logger = logging.getLogger(__name__) class EventFederationStore(SQLBaseStore): + def get_oldest_events_in_room(self, room_id): + return self.runInteraction( + "get_oldest_events_in_room", + self._get_oldest_events_in_room_txn, + room_id, + ) + + def _get_oldest_events_in_room_txn(self, txn, room_id): + return self._simple_select_onecol_txn( + txn, + table="event_backward_extremities", + keyvalues={ + "room_id": room_id, + }, + retcol="event_id", + ) + def get_latest_events_in_room(self, room_id): return self.runInteraction( "get_latest_events_in_room", @@ -159,4 +176,71 @@ class EventFederationStore(SQLBaseStore): "AND not events.outlier " ")" ) - txn.execute(query) \ No newline at end of file + txn.execute(query) + + + def get_backfill_events(self, room_id, event_list, limit): + """Get a list of Events for a given topic that occured before (and + including) the pdus in pdu_list. Return a list of max size `limit`. + + Args: + txn + room_id (str) + event_list (list) + limit (int) + + Return: + list: A list of PduTuples + """ + return self.runInteraction( + "get_backfill_events", + self._get_backfill_events, room_id, event_list, limit + ) + + def _get_backfill_events(self, txn, room_id, event_list, limit): + logger.debug( + "_get_backfill_events: %s, %s, %s", + room_id, repr(event_list), limit + ) + + # We seed the pdu_results with the things from the pdu_list. + event_results = event_list + + front = event_list + + query = ( + "SELECT prev_event_id FROM event_edges " + "WHERE room_id = ? AND event_id = ? " + "LIMIT ?" + ) + + # We iterate through all event_ids in `front` to select their previous + # events. These are dumped in `new_front`. + # We continue until we reach the limit *or* new_front is empty (i.e., + # we've run out of things to select + while front and len(event_results) < limit: + + new_front = [] + for event_id in front: + logger.debug( + "_backfill_interaction: id=%s", + event_id + ) + + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) + + for row in txn.fetchall(): + logger.debug( + "_backfill_interaction: got id=%s", + *row + ) + new_front.append(row) + + front = new_front + event_results += new_front + + # We also want to update the `prev_pdus` attributes before returning. + return self._get_pdu_tuples(txn, event_results) -- cgit 1.5.1 From 21fe249d62deafceca05cc114d5d6bec3e815b8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 10:47:34 +0000 Subject: Actually don't store any PDUs --- synapse/federation/replication.py | 27 +++++++++++++-------------- synapse/handlers/federation.py | 22 ++++++++++++++++++++++ synapse/storage/event_federation.py | 7 +++++++ 3 files changed, 42 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 89dbf3e2e9..a0bd2e0572 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -106,7 +106,6 @@ class ReplicationLayer(object): self.query_handlers[query_type] = handler - @defer.inlineCallbacks @log_function def send_pdu(self, pdu): """Informs the replication layer about a new PDU generated within the @@ -135,7 +134,7 @@ class ReplicationLayer(object): logger.debug("[%s] Persisting PDU", pdu.pdu_id) # Save *before* trying to send - yield self.store.persist_event(pdu=pdu) + # yield self.store.persist_event(pdu=pdu) logger.debug("[%s] Persisted PDU", pdu.pdu_id) logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id) @@ -359,12 +358,13 @@ class ReplicationLayer(object): pdu_id, pdu_origin ) else: - results = yield self.store.get_current_state_for_context( - context - ) - pdus = [Pdu.from_pdu_tuple(p) for p in results] - - logger.debug("Context returning %d results", len(pdus)) + raise NotImplementedError("Specify an event") + # results = yield self.store.get_current_state_for_context( + # context + # ) + # pdus = [Pdu.from_pdu_tuple(p) for p in results] + # + # logger.debug("Context returning %d results", len(pdus)) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -456,7 +456,6 @@ class ReplicationLayer(object): defer.returnValue(pdus) - @defer.inlineCallbacks @log_function def _get_persisted_pdu(self, pdu_id, pdu_origin): """ Get a PDU from the database with given origin and id. @@ -464,9 +463,7 @@ class ReplicationLayer(object): Returns: Deferred: Results in a `Pdu`. """ - pdu_tuple = yield self.store.get_pdu(pdu_id, pdu_origin) - - defer.returnValue(Pdu.from_pdu_tuple(pdu_tuple)) + return self.handler.get_persisted_pdu(pdu_id, pdu_origin) def _transaction_from_pdus(self, pdu_list): """Returns a new Transaction containing the given PDUs suitable for @@ -502,7 +499,9 @@ class ReplicationLayer(object): # Get missing pdus if necessary. if not pdu.outlier: # We only backfill backwards to the min depth. - min_depth = yield self.store.get_min_depth_for_context(pdu.context) + min_depth = yield self.handler.get_min_depth_for_context( + pdu.context + ) if min_depth and pdu.depth > min_depth: for pdu_id, origin, hashes in pdu.prev_pdus: @@ -529,7 +528,7 @@ class ReplicationLayer(object): ) # Persist the Pdu, but don't mark it as processed yet. - yield self.store.persist_event(pdu=pdu) + # yield self.store.persist_event(pdu=pdu) if not backfilled: ret = yield self.handler.on_receive_pdu( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9f457ce292..18cb1d4e97 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -415,6 +415,28 @@ class FederationHandler(BaseHandler): for e in events ]) + @defer.inlineCallbacks + @log_function + def get_persisted_pdu(self, pdu_id, origin): + """ Get a PDU from the database with given origin and id. + + Returns: + Deferred: Results in a `Pdu`. + """ + event = yield self.store.get_event( + self.pdu_codec.encode_event_id(pdu_id, origin), + allow_none=True, + ) + + if event: + defer.returnValue(self.pdu_codec.pdu_from_event(event)) + else: + defer.returnValue(None) + + @log_function + def get_min_depth_for_context(self, context): + return self.store.get_min_depth(context) + @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 438b42c1da..8357071db6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -78,6 +78,13 @@ class EventFederationStore(SQLBaseStore): return results + def get_min_depth(self, room_id): + return self.runInteraction( + "get_min_depth", + self._get_min_depth_interaction, + room_id, + ) + def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( txn, -- cgit 1.5.1 From 7249785bcbe86a4e1c7f6b4f4f5b0601f8a8c47a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Nov 2014 11:33:28 +0000 Subject: Sign events we create. --- synapse/handlers/_base.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 787a01efc5..28b64565ae 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -18,6 +18,8 @@ from synapse.api.errors import LimitExceededError from synapse.util.async import run_on_reactor +from synapse.crypto.event_signing import add_hashes_and_signatures + class BaseHandler(object): def __init__(self, hs): @@ -32,6 +34,9 @@ class BaseHandler(object): self.clock = hs.get_clock() self.hs = hs + self.signing_key = hs.config.signing_key[0] + self.server_name = hs.hostname + def ratelimit(self, user_id): time_now = self.clock.time() allowed, time_allowed = self.ratelimiter.send_message( @@ -53,6 +58,10 @@ class BaseHandler(object): yield self.state_handler.annotate_state_groups(event) + yield add_hashes_and_signatures( + event, self.server_name, self.signing_key + ) + if not suppress_auth: yield self.auth.check(event, snapshot, raises=True) -- cgit 1.5.1 From ad6eacb3e9424902da9f83c8f106a4f0169c3108 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Nov 2014 13:06:58 +0000 Subject: Rename PDU fields to match that of events. --- synapse/api/events/utils.py | 2 +- synapse/federation/pdu_codec.py | 48 +--------- synapse/federation/replication.py | 72 ++++++--------- synapse/federation/transport.py | 184 +++++++------------------------------- synapse/federation/units.py | 78 +++------------- synapse/handlers/federation.py | 12 ++- 6 files changed, 80 insertions(+), 316 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/events/utils.py b/synapse/api/events/utils.py index 7fdf45a264..31601fd3a9 100644 --- a/synapse/api/events/utils.py +++ b/synapse/api/events/utils.py @@ -32,7 +32,7 @@ def prune_event(event): def prune_pdu(pdu): """Removes keys that contain unrestricted and non-essential data from a PDU """ - return _prune_event_or_pdu(pdu.pdu_type, pdu) + return _prune_event_or_pdu(pdu.type, pdu) def _prune_event_or_pdu(event_type, event): # Remove all extraneous fields. diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index d4c896e163..5ec97a698e 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -31,39 +31,16 @@ class PduCodec(object): self.clock = hs.get_clock() self.hs = hs - def encode_event_id(self, local, domain): - return local - - def decode_event_id(self, event_id): - e_id = self.hs.parse_eventid(event_id) - return event_id, e_id.domain - def event_from_pdu(self, pdu): kwargs = {} - kwargs["event_id"] = self.encode_event_id(pdu.pdu_id, pdu.origin) - kwargs["room_id"] = pdu.context - kwargs["etype"] = pdu.pdu_type - kwargs["prev_events"] = [ - (self.encode_event_id(i, o), s) - for i, o, s in pdu.prev_pdus - ] - - if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): - kwargs["prev_state"] = self.encode_event_id( - pdu.prev_state_id, pdu.prev_state_origin - ) + kwargs["etype"] = pdu.type kwargs.update({ k: v for k, v in pdu.get_full_dict().items() if k not in [ - "pdu_id", - "context", - "pdu_type", - "prev_pdus", - "prev_state_id", - "prev_state_origin", + "type", ] }) @@ -72,33 +49,12 @@ class PduCodec(object): def pdu_from_event(self, event): d = event.get_full_dict() - d["pdu_id"], d["origin"] = self.decode_event_id( - event.event_id - ) - d["context"] = event.room_id - d["pdu_type"] = event.type - - if hasattr(event, "prev_events"): - def f(e, s): - i, o = self.decode_event_id(e) - return i, o, s - d["prev_pdus"] = [ - f(e, s) - for e, s in event.prev_events - ] - - if hasattr(event, "prev_state"): - d["prev_state_id"], d["prev_state_origin"] = ( - self.decode_event_id(event.prev_state) - ) - if hasattr(event, "state_key"): d["is_state"] = True kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type", "prev_events"] }) if "origin_server_ts" not in kwargs: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 159af4eed7..838e660a46 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -111,14 +111,6 @@ class ReplicationLayer(object): """Informs the replication layer about a new PDU generated within the home server that should be transmitted to others. - This will fill out various attributes on the PDU object, e.g. the - `prev_pdus` key. - - *Note:* The home server should always call `send_pdu` even if it knows - that it does not need to be replicated to other home servers. This is - in case e.g. someone else joins via a remote home server and then - backfills. - TODO: Figure out when we should actually resolve the deferred. Args: @@ -131,18 +123,12 @@ class ReplicationLayer(object): order = self._order self._order += 1 - logger.debug("[%s] Persisting PDU", pdu.pdu_id) - - # Save *before* trying to send - # yield self.store.persist_event(pdu=pdu) - - logger.debug("[%s] Persisted PDU", pdu.pdu_id) - logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id) + logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) # TODO, add errback, etc. self._transaction_queue.enqueue_pdu(pdu, order) - logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.pdu_id) + logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.event_id) @log_function def send_edu(self, destination, edu_type, content): @@ -215,7 +201,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_pdu(self, destination, pdu_origin, pdu_id, outlier=False): + def get_pdu(self, destination, event_id, outlier=False): """Requests the PDU with given origin and ID from the remote home server. @@ -224,7 +210,7 @@ class ReplicationLayer(object): Args: destination (str): Which home server to query pdu_origin (str): The home server that originally sent the pdu. - pdu_id (str) + event_id (str) outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` @@ -233,8 +219,9 @@ class ReplicationLayer(object): Deferred: Results in the requested PDU. """ - transaction_data = yield self.transport_layer.get_pdu( - destination, pdu_origin, pdu_id) + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) transaction = Transaction(**transaction_data) @@ -249,8 +236,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context, pdu_id=None, - pdu_origin=None): + def get_state_for_context(self, destination, context, event_id=None): """Requests all of the `current` state PDUs for a given context from a remote home server. @@ -263,7 +249,9 @@ class ReplicationLayer(object): """ transaction_data = yield self.transport_layer.get_context_state( - destination, context, pdu_id=pdu_id, pdu_origin=pdu_origin, + destination, + context, + event_id=event_id, ) transaction = Transaction(**transaction_data) @@ -352,10 +340,10 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, context, pdu_id, pdu_origin): - if pdu_id and pdu_origin: + def on_context_state_request(self, context, event_id): + if event_id: pdus = yield self.handler.get_state_for_pdu( - pdu_id, pdu_origin + event_id ) else: raise NotImplementedError("Specify an event") @@ -370,8 +358,8 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_pdu_request(self, pdu_origin, pdu_id): - pdu = yield self._get_persisted_pdu(pdu_id, pdu_origin) + def on_pdu_request(self, event_id): + pdu = yield self._get_persisted_pdu(event_id) if pdu: defer.returnValue( @@ -443,9 +431,8 @@ class ReplicationLayer(object): def send_join(self, destination, pdu): _, content = yield self.transport_layer.send_join( destination, - pdu.context, - pdu.pdu_id, - pdu.origin, + pdu.room_id, + pdu.event_id, pdu.get_dict(), ) @@ -457,13 +444,13 @@ class ReplicationLayer(object): defer.returnValue(pdus) @log_function - def _get_persisted_pdu(self, pdu_id, pdu_origin): + def _get_persisted_pdu(self, event_id): """ Get a PDU from the database with given origin and id. Returns: Deferred: Results in a `Pdu`. """ - return self.handler.get_persisted_pdu(pdu_id, pdu_origin) + return self.handler.get_persisted_pdu(event_id) def _transaction_from_pdus(self, pdu_list): """Returns a new Transaction containing the given PDUs suitable for @@ -487,10 +474,10 @@ class ReplicationLayer(object): @log_function def _handle_new_pdu(self, origin, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers - existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) + existing = yield self._get_persisted_pdu(pdu.event_id) if existing and (not existing.outlier or pdu.outlier): - logger.debug("Already seen pdu %s %s", pdu.pdu_id, pdu.origin) + logger.debug("Already seen pdu %s", pdu.event_id) defer.returnValue({}) return @@ -500,23 +487,22 @@ class ReplicationLayer(object): if not pdu.outlier: # We only backfill backwards to the min depth. min_depth = yield self.handler.get_min_depth_for_context( - pdu.context + pdu.room_id ) if min_depth and pdu.depth > min_depth: - for pdu_id, origin, hashes in pdu.prev_pdus: - exists = yield self._get_persisted_pdu(pdu_id, origin) + for event_id, hashes in pdu.prev_events: + exists = yield self._get_persisted_pdu(event_id) if not exists: - logger.debug("Requesting pdu %s %s", pdu_id, origin) + logger.debug("Requesting pdu %s", event_id) try: yield self.get_pdu( pdu.origin, - pdu_id=pdu_id, - pdu_origin=origin + event_id=event_id, ) - logger.debug("Processed pdu %s %s", pdu_id, origin) + logger.debug("Processed pdu %s", event_id) except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") @@ -524,7 +510,7 @@ class ReplicationLayer(object): # We need to get the state at this event, since we have reached # a backward extremity edge. state = yield self.get_state_for_context( - origin, pdu.context, pdu.pdu_id, pdu.origin, + origin, pdu.room_id, pdu.event_id, ) # Persist the Pdu, but don't mark it as processed yet. diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 7f01b4faaf..04ad7e63ae 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -72,8 +72,7 @@ class TransportLayer(object): self.received_handler = None @log_function - def get_context_state(self, destination, context, pdu_id=None, - pdu_origin=None): + def get_context_state(self, destination, context, event_id=None): """ Requests all state for a given context (i.e. room) from the given server. @@ -91,60 +90,59 @@ class TransportLayer(object): subpath = "/state/%s/" % context args = {} - if pdu_id and pdu_origin: - args["pdu_id"] = pdu_id - args["pdu_origin"] = pdu_origin + if event_id: + args["event_id"] = event_id return self._do_request_for_transaction( destination, subpath, args=args ) @log_function - def get_pdu(self, destination, pdu_origin, pdu_id): + def get_event(self, destination, event_id): """ Requests the pdu with give id and origin from the given server. Args: destination (str): The host name of the remote home server we want to get the state from. - pdu_origin (str): The home server which created the PDU. - pdu_id (str): The id of the PDU being requested. + event_id (str): The id of the event being requested. Returns: Deferred: Results in a dict received from the remote homeserver. """ - logger.debug("get_pdu dest=%s, pdu_origin=%s, pdu_id=%s", - destination, pdu_origin, pdu_id) + logger.debug("get_pdu dest=%s, event_id=%s", + destination, event_id) - subpath = "/pdu/%s/%s/" % (pdu_origin, pdu_id) + subpath = "/event/%s/" % (event_id, ) return self._do_request_for_transaction(destination, subpath) @log_function - def backfill(self, dest, context, pdu_tuples, limit): + def backfill(self, dest, context, event_tuples, limit): """ Requests `limit` previous PDUs in a given context before list of PDUs. Args: dest (str) context (str) - pdu_tuples (list) + event_tuples (list) limt (int) Returns: Deferred: Results in a dict received from the remote homeserver. """ logger.debug( - "backfill dest=%s, context=%s, pdu_tuples=%s, limit=%s", - dest, context, repr(pdu_tuples), str(limit) + "backfill dest=%s, context=%s, event_tuples=%s, limit=%s", + dest, context, repr(event_tuples), str(limit) ) - if not pdu_tuples: + if not event_tuples: + # TODO: raise? return - subpath = "/backfill/%s/" % context + subpath = "/backfill/%s/" % (context,) args = { - "v": ["%s,%s" % (i, o) for i, o in pdu_tuples], + "v": event_tuples, "limit": limit, } @@ -222,11 +220,10 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function - def send_join(self, destination, context, pdu_id, origin, content): - path = PREFIX + "/send_join/%s/%s/%s" % ( + def send_join(self, destination, context, event_id, content): + path = PREFIX + "/send_join/%s/%s" % ( context, - origin, - pdu_id, + event_id, ) code, content = yield self.client.put_json( @@ -242,11 +239,10 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function - def send_invite(self, destination, context, pdu_id, origin, content): - path = PREFIX + "/invite/%s/%s/%s" % ( + def send_invite(self, destination, context, event_id, content): + path = PREFIX + "/invite/%s/%s" % ( context, - origin, - pdu_id, + event_id, ) code, content = yield self.client.put_json( @@ -376,10 +372,10 @@ class TransportLayer(object): # data_id pair. self.server.register_path( "GET", - re.compile("^" + PREFIX + "/pdu/([^/]*)/([^/]*)/$"), + re.compile("^" + PREFIX + "/event/([^/]*)/$"), self._with_authentication( - lambda origin, content, query, pdu_origin, pdu_id: - handler.on_pdu_request(pdu_origin, pdu_id) + lambda origin, content, query, event_id: + handler.on_pdu_request(event_id) ) ) @@ -391,8 +387,7 @@ class TransportLayer(object): lambda origin, content, query, context: handler.on_context_state_request( context, - query.get("pdu_id", [None])[0], - query.get("pdu_origin", [None])[0] + query.get("event_id", [None])[0], ) ) ) @@ -442,9 +437,9 @@ class TransportLayer(object): self.server.register_path( "PUT", - re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)/([^/]*)$"), + re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"), self._with_authentication( - lambda origin, content, query, context, pdu_origin, pdu_id: + lambda origin, content, query, context, event_id: self._on_send_join_request( origin, content, query, ) @@ -453,9 +448,9 @@ class TransportLayer(object): self.server.register_path( "PUT", - re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)/([^/]*)$"), + re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)$"), self._with_authentication( - lambda origin, content, query, context, pdu_origin, pdu_id: + lambda origin, content, query, context, event_id: self._on_invite_request( origin, content, query, ) @@ -548,7 +543,7 @@ class TransportLayer(object): limit = int(limits[-1]) - versions = [v.split(",", 1) for v in v_list] + versions = v_list return self.request_handler.on_backfill_request( context, versions, limit @@ -579,120 +574,3 @@ class TransportLayer(object): ) defer.returnValue((200, content)) - - -class TransportReceivedHandler(object): - """ Callbacks used when we receive a transaction - """ - def on_incoming_transaction(self, transaction): - """ Called on PUT /send/, or on response to a request - that we sent (e.g. a backfill request) - - Args: - transaction (synapse.transaction.Transaction): The transaction that - was sent to us. - - Returns: - twisted.internet.defer.Deferred: A deferred that gets fired when - the transaction has finished being processed. - - The result should be a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - -class TransportRequestHandler(object): - """ Handlers used when someone want's data from us - """ - def on_pull_request(self, versions): - """ Called on GET /pull/?v=... - - This is hit when a remote home server wants to get all data - after a given transaction. Mainly used when a home server comes back - online and wants to get everything it has missed. - - Args: - versions (list): A list of transaction_ids that should be used to - determine what PDUs the remote side have not yet seen. - - Returns: - Deferred: Resultsin a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_pdu_request(self, pdu_origin, pdu_id): - """ Called on GET /pdu/// - - Someone wants a particular PDU. This PDU may or may not have originated - from us. - - Args: - pdu_origin (str) - pdu_id (str) - - Returns: - Deferred: Resultsin a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_context_state_request(self, context): - """ Called on GET /state// - - Gets hit when someone wants all the *current* state for a given - contexts. - - Args: - context (str): The name of the context that we're interested in. - - Returns: - twisted.internet.defer.Deferred: A deferred that gets fired when - the transaction has finished being processed. - - The result should be a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_backfill_request(self, context, versions, limit): - """ Called on GET /backfill//?v=...&limit=... - - Gets hit when we want to backfill backwards on a given context from - the given point. - - Args: - context (str): The context to backfill - versions (list): A list of 2-tuples representing where to backfill - from, in the form `(pdu_id, origin)` - limit (int): How many pdus to return. - - Returns: - Deferred: Results in a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_query_request(self): - """ Called on a GET /query/ request. """ diff --git a/synapse/federation/units.py b/synapse/federation/units.py index adc3385644..c94dcf64cf 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -34,13 +34,13 @@ class Pdu(JsonEncodedObject): A Pdu can be classified as "state". For a given context, we can efficiently retrieve all state pdu's that haven't been clobbered. Clobbering is done - via a unique constraint on the tuple (context, pdu_type, state_key). A pdu + via a unique constraint on the tuple (context, type, state_key). A pdu is a state pdu if `is_state` is True. Example pdu:: { - "pdu_id": "78c", + "event_id": "$78c:example.com", "origin_server_ts": 1404835423000, "origin": "bar", "prev_ids": [ @@ -53,14 +53,14 @@ class Pdu(JsonEncodedObject): """ valid_keys = [ - "pdu_id", - "context", + "event_id", + "room_id", "origin", "origin_server_ts", - "pdu_type", + "type", "destinations", "transaction_id", - "prev_pdus", + "prev_events", "depth", "content", "outlier", @@ -68,8 +68,7 @@ class Pdu(JsonEncodedObject): "signatures", "is_state", # Below this are keys valid only for State Pdus. "state_key", - "prev_state_id", - "prev_state_origin", + "prev_state", "required_power_level", "user_id", ] @@ -81,18 +80,18 @@ class Pdu(JsonEncodedObject): ] required_keys = [ - "pdu_id", - "context", + "event_id", + "room_id", "origin", "origin_server_ts", - "pdu_type", + "type", "content", ] # TODO: We need to make this properly load content rather than # just leaving it as a dict. (OR DO WE?!) - def __init__(self, destinations=[], is_state=False, prev_pdus=[], + def __init__(self, destinations=[], is_state=False, prev_events=[], outlier=False, hashes={}, signatures={}, **kwargs): if is_state: for required_key in ["state_key"]: @@ -102,66 +101,13 @@ class Pdu(JsonEncodedObject): super(Pdu, self).__init__( destinations=destinations, is_state=bool(is_state), - prev_pdus=prev_pdus, + prev_events=prev_events, outlier=outlier, hashes=hashes, signatures=signatures, **kwargs ) - @classmethod - def from_pdu_tuple(cls, pdu_tuple): - """ Converts a PduTuple to a Pdu - - Args: - pdu_tuple (synapse.persistence.transactions.PduTuple): The tuple to - convert - - Returns: - Pdu - """ - if pdu_tuple: - d = copy.copy(pdu_tuple.pdu_entry._asdict()) - d["origin_server_ts"] = d.pop("ts") - - for k in d.keys(): - if d[k] is None: - del d[k] - - d["content"] = json.loads(d["content_json"]) - del d["content_json"] - - args = {f: d[f] for f in cls.valid_keys if f in d} - if "unrecognized_keys" in d and d["unrecognized_keys"]: - args.update(json.loads(d["unrecognized_keys"])) - - hashes = { - alg: encode_base64(hsh) - for alg, hsh in pdu_tuple.hashes.items() - } - - signatures = { - kid: encode_base64(sig) - for kid, sig in pdu_tuple.signatures.items() - } - - prev_pdus = [] - for prev_pdu in pdu_tuple.prev_pdu_list: - prev_hashes = pdu_tuple.edge_hashes.get(prev_pdu, {}) - prev_hashes = { - alg: encode_base64(hsh) for alg, hsh in prev_hashes.items() - } - prev_pdus.append((prev_pdu[0], prev_pdu[1], prev_hashes)) - - return Pdu( - prev_pdus=prev_pdus, - hashes=hashes, - signatures=signatures, - **args - ) - else: - return None - def __str__(self): return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 18cb1d4e97..bdd28f04bb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -139,7 +139,7 @@ class FederationHandler(BaseHandler): # Huh, let's try and get the current state try: yield self.replication_layer.get_state_for_context( - event.origin, event.room_id, pdu.pdu_id, pdu.origin, + event.origin, event.room_id, event.event_id, ) hosts = yield self.store.get_joined_hosts_for_room( @@ -368,11 +368,9 @@ class FederationHandler(BaseHandler): ]) @defer.inlineCallbacks - def get_state_for_pdu(self, pdu_id, pdu_origin): + def get_state_for_pdu(self, event_id): yield run_on_reactor() - event_id = EventID.create(pdu_id, pdu_origin, self.hs).to_string() - state_groups = yield self.store.get_state_groups( [event_id] ) @@ -406,7 +404,7 @@ class FederationHandler(BaseHandler): events = yield self.store.get_backfill_events( context, - [self.pdu_codec.encode_event_id(i, o) for i, o in pdu_list], + pdu_list, limit ) @@ -417,14 +415,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, pdu_id, origin): + def get_persisted_pdu(self, event_id): """ Get a PDU from the database with given origin and id. Returns: Deferred: Results in a `Pdu`. """ event = yield self.store.get_event( - self.pdu_codec.encode_event_id(pdu_id, origin), + event_id, allow_none=True, ) -- cgit 1.5.1 From aa76bf39aba2eadf506f57952a1dffce629f2637 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 14:14:02 +0000 Subject: Remove unused imports --- synapse/federation/persistence.py | 2 -- synapse/federation/units.py | 3 --- synapse/handlers/federation.py | 6 ++---- synapse/state.py | 2 -- synapse/storage/_base.py | 2 -- 5 files changed, 2 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index b04fbb4177..73dc844d59 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -21,8 +21,6 @@ These actions are mostly only used by the :py:mod:`.replication` module. from twisted.internet import defer -from .units import Pdu - from synapse.util.logutils import log_function import json diff --git a/synapse/federation/units.py b/synapse/federation/units.py index c2d8dca8f3..9b25556707 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -18,11 +18,8 @@ server protocol. """ from synapse.util.jsonobject import JsonEncodedObject -from syutil.base64util import encode_base64 import logging -import json -import copy logger = logging.getLogger(__name__) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bdd28f04bb..49bfff88a4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,15 +17,13 @@ from ._base import BaseHandler -from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent +from synapse.api.events.room import RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec -from synapse.api.errors import SynapseError from synapse.util.async import run_on_reactor -from synapse.types import EventID -from twisted.internet import defer, reactor +from twisted.internet import defer import logging diff --git a/synapse/state.py b/synapse/state.py index f4efc287c9..9771883bc3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -19,8 +19,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor -from synapse.types import EventID - from collections import namedtuple import copy diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 464b12f032..5e00c23fd1 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -14,8 +14,6 @@ # limitations under the License. import logging -from twisted.internet import defer - from synapse.api.errors import StoreError from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function -- cgit 1.5.1 From 2a49f177fe39ee15c7fc2915db9aadb6152bc547 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 15:10:43 +0000 Subject: On AuthError, raise a FederationError --- synapse/handlers/federation.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 49bfff88a4..8848656b1d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,6 +17,7 @@ from ._base import BaseHandler +from synapse.api.errors import AuthError, FederationError from synapse.api.events.room import RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function @@ -116,8 +117,15 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) - if not backfilled: + try: yield self.auth.check(event, None, raises=True) + except AuthError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=event.event_id, + ) is_new_state = is_new_state and not backfilled -- cgit 1.5.1 From da4a09f977181ae222438dc75f5717cbebc6fe86 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 16:51:23 +0000 Subject: Don't bother locking --- synapse/handlers/federation.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8848656b1d..06a2dabae2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -132,12 +132,11 @@ class FederationHandler(BaseHandler): # TODO: Implement something in federation that allows us to # respond to PDU. - with (yield self.room_lock.lock(event.room_id)): - yield self.store.persist_event( - event, - backfilled, - is_new_state=is_new_state - ) + yield self.store.persist_event( + event, + backfilled, + is_new_state=is_new_state + ) room = yield self.store.get_room(event.room_id) -- cgit 1.5.1 From dfb3d21a6d2560668a637bba149bf34c7d413125 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 17:12:39 +0000 Subject: Fix room handler tests --- synapse/handlers/room.py | 1 - tests/handlers/test_room.py | 146 ++++++-------------------------------------- 2 files changed, 19 insertions(+), 128 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ffc0892f1a..f176ad39bf 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -408,7 +408,6 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({}) return - yield self.state_handler.handle_new_event(event, snapshot) yield self._do_local_membership_update( event, membership=event.content["membership"], diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index c88d1c8840..57a8a1b13d 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -18,7 +18,7 @@ from twisted.internet import defer from tests import unittest from synapse.api.events.room import ( - InviteJoinEvent, RoomMemberEvent, RoomConfigEvent + RoomMemberEvent, ) from synapse.api.constants import Membership from synapse.handlers.room import RoomMemberHandler, RoomCreationHandler @@ -34,6 +34,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): def setUp(self): self.mock_config = NonCallableMock() self.mock_config.signing_key = [MockKey()] + self.hostname = "red" hs = HomeServer( self.hostname, @@ -58,7 +59,10 @@ class RoomMemberHandlerTestCase(unittest.TestCase): "federation_handler", ]), auth=NonCallableMock(spec_set=["check"]), - state_handler=NonCallableMock(spec_set=["handle_new_event"]), + state_handler=NonCallableMock(spec_set=[ + "handle_new_event", + "annotate_state_groups", + ]), config=self.mock_config, ) @@ -114,6 +118,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase): store_id = "store_id_fooo" self.datastore.persist_event.return_value = defer.succeed(store_id) + self.datastore.get_room_member.return_value = defer.succeed(None) + # Actual invocation yield self.room_member_handler.change_membership(event) @@ -202,133 +208,16 @@ class RoomMemberHandlerTestCase(unittest.TestCase): join_signal_observer.assert_called_with( user=user, room_id=room_id) - @defer.inlineCallbacks - def STALE_test_invite_join(self): - room_id = "foo" - user_id = "@bob:red" - target_user_id = "@bob:red" - content = {"membership": Membership.JOIN} - - event = self.hs.get_event_factory().create_event( - etype=RoomMemberEvent.TYPE, - user_id=user_id, - target_user_id=target_user_id, - room_id=room_id, - membership=Membership.JOIN, - content=content, - ) - - joined = ["red", "blue", "green"] - - self.state_handler.handle_new_event.return_value = defer.succeed(True) - self.datastore.get_joined_hosts_for_room.return_value = ( - defer.succeed(joined) - ) - - store_id = "store_id_fooo" - self.datastore.store_room_member.return_value = defer.succeed(store_id) - self.datastore.get_room.return_value = defer.succeed(None) - - prev_state = NonCallableMock(name="prev_state") - prev_state.membership = Membership.INVITE - prev_state.sender = "@foo:blue" - self.datastore.get_room_member.return_value = defer.succeed(prev_state) - - # Actual invocation - yield self.room_member_handler.change_membership(event) - - self.datastore.get_room_member.assert_called_once_with( - target_user_id, room_id - ) - - self.assertTrue(self.federation.handle_new_event.called) - args = self.federation.handle_new_event.call_args[0] - invite_join_event = args[0] - - self.assertTrue(InviteJoinEvent.TYPE, invite_join_event.TYPE) - self.assertTrue("blue", invite_join_event.target_host) - self.assertTrue(room_id, invite_join_event.room_id) - self.assertTrue(user_id, invite_join_event.user_id) - self.assertFalse(hasattr(invite_join_event, "state_key")) - - self.assertEquals( - set(["blue"]), - set(invite_join_event.destinations) - ) - - self.federation.get_state_for_room.assert_called_once_with( - "blue", room_id - ) - - self.assertFalse(self.datastore.store_room_member.called) - - self.assertFalse(self.notifier.on_new_room_event.called) - self.assertFalse(self.state_handler.handle_new_event.called) - - @defer.inlineCallbacks - def STALE_test_invite_join_public(self): - room_id = "#foo:blue" - user_id = "@bob:red" - target_user_id = "@bob:red" - content = {"membership": Membership.JOIN} - - event = self.hs.get_event_factory().create_event( - etype=RoomMemberEvent.TYPE, - user_id=user_id, - target_user_id=target_user_id, - room_id=room_id, - membership=Membership.JOIN, - content=content, - ) - - joined = ["red", "blue", "green"] - - self.state_handler.handle_new_event.return_value = defer.succeed(True) - self.datastore.get_joined_hosts_for_room.return_value = ( - defer.succeed(joined) - ) - - store_id = "store_id_fooo" - self.datastore.store_room_member.return_value = defer.succeed(store_id) - self.datastore.get_room.return_value = defer.succeed(None) - - prev_state = NonCallableMock(name="prev_state") - prev_state.membership = Membership.INVITE - prev_state.sender = "@foo:blue" - self.datastore.get_room_member.return_value = defer.succeed(prev_state) - - # Actual invocation - yield self.room_member_handler.change_membership(event) - - self.assertTrue(self.federation.handle_new_event.called) - args = self.federation.handle_new_event.call_args[0] - invite_join_event = args[0] - - self.assertTrue(InviteJoinEvent.TYPE, invite_join_event.TYPE) - self.assertTrue("blue", invite_join_event.target_host) - self.assertTrue("foo", invite_join_event.room_id) - self.assertTrue(user_id, invite_join_event.user_id) - self.assertFalse(hasattr(invite_join_event, "state_key")) - - self.assertEquals( - set(["blue"]), - set(invite_join_event.destinations) - ) - - self.federation.get_state_for_room.assert_called_once_with( - "blue", "foo" - ) - - self.assertFalse(self.datastore.store_room_member.called) - - self.assertFalse(self.notifier.on_new_room_event.called) - self.assertFalse(self.state_handler.handle_new_event.called) - class RoomCreationTest(unittest.TestCase): def setUp(self): self.hostname = "red" + + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + + hs = HomeServer( self.hostname, db_pool=None, @@ -346,11 +235,14 @@ class RoomCreationTest(unittest.TestCase): "federation_handler", ]), auth=NonCallableMock(spec_set=["check"]), - state_handler=NonCallableMock(spec_set=["handle_new_event"]), + state_handler=NonCallableMock(spec_set=[ + "handle_new_event", + "annotate_state_groups", + ]), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.federation = NonCallableMock(spec_set=[ @@ -400,6 +292,6 @@ class RoomCreationTest(unittest.TestCase): self.assertEquals(user_id, join_event.user_id) self.assertEquals(user_id, join_event.state_key) - self.assertTrue(self.state_handler.handle_new_event.called) + self.assertTrue(self.state_handler.annotate_state_groups.called) self.assertTrue(self.federation.handle_new_event.called) -- cgit 1.5.1 From 96c001e6688617cc365f548a3152a32c647ebc59 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Nov 2014 11:07:54 +0000 Subject: Fix auth checks to all use the given old_event_state --- synapse/api/auth.py | 55 ++++++++++++++++++++++++------------------ synapse/handlers/_base.py | 19 +++++++++++---- synapse/handlers/federation.py | 6 ++--- synapse/state.py | 8 ++++-- 4 files changed, 54 insertions(+), 34 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index c684265101..9eb0491c97 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -22,7 +22,7 @@ from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, InviteJoinEvent, - RoomCreateEvent, + RoomCreateEvent, RoomSendEventLevelEvent, RoomAddStateLevelEvent, ) from synapse.util.logutils import log_function @@ -37,8 +37,7 @@ class Auth(object): self.hs = hs self.store = hs.get_datastore() - @defer.inlineCallbacks - def check(self, event, snapshot, raises=False): + def check(self, event, raises=False): """ Checks if this event is correctly authed. Returns: @@ -52,17 +51,17 @@ class Auth(object): if event.old_state_events is None: # Oh, we don't know what the state of the room was, so we # are trusting that this is allowed (at least for now) - defer.returnValue(True) + return True if hasattr(event, "outlier") and event.outlier is True: # TODO (erikj): Auth for outliers is done differently. - defer.returnValue(True) + return True is_state = hasattr(event, "state_key") if event.type == RoomCreateEvent.TYPE: # FIXME - defer.returnValue(True) + return True if event.type == RoomMemberEvent.TYPE: self._can_replace_state(event) @@ -71,8 +70,7 @@ class Auth(object): logger.debug("Allowing! %s", event) else: logger.debug("Denying! %s", event) - defer.returnValue(allowed) - return + return allowed if not event.type == InviteJoinEvent.TYPE: self.check_event_sender_in_room(event) @@ -80,10 +78,10 @@ class Auth(object): if is_state: # TODO (erikj): This really only should be called for *new* # state - yield self._can_add_state(event) + self._can_add_state(event) self._can_replace_state(event) else: - yield self._can_send_event(event) + self._can_send_event(event) if event.type == RoomPowerLevelsEvent.TYPE: self._check_power_levels(event) @@ -91,9 +89,8 @@ class Auth(object): if event.type == RoomRedactionEvent.TYPE: self._check_redaction(event) - logger.debug("Allowing! %s", event) - defer.returnValue(True) + return True else: raise AuthError(500, "Unknown event: %s" % event) except AuthError as e: @@ -103,7 +100,7 @@ class Auth(object): if raises: raise e - defer.returnValue(False) + return False @defer.inlineCallbacks def check_joined_room(self, room_id, user_id): @@ -326,10 +323,15 @@ class Auth(object): def is_server_admin(self, user): return self.store.is_server_admin(user) - @defer.inlineCallbacks @log_function def _can_send_event(self, event): - send_level = yield self.store.get_send_event_level(event.room_id) + key = (RoomSendEventLevelEvent.TYPE, "", ) + send_level_event = event.old_state_events.get(key) + send_level = None + if send_level_event: + send_level = send_level_event.content.get(event.user_id) + if not send_level: + send_level = send_level_event.content.get("level", 0) if send_level: send_level = int(send_level) @@ -351,16 +353,21 @@ class Auth(object): 403, "You don't have permission to post to the room" ) - defer.returnValue(True) + return True - @defer.inlineCallbacks def _can_add_state(self, event): - add_level = yield self.store.get_add_state_level(event.room_id) - - if not add_level: - defer.returnValue(True) - - add_level = int(add_level) + key = (RoomAddStateLevelEvent.TYPE, "", ) + add_level_event = event.old_state_events.get(key) + add_level = None + if add_level_event: + add_level = add_level_event.content.get(event.user_id) + if not add_level: + add_level = add_level_event.content.get("level", 0) + + if add_level: + add_level = int(add_level) + else: + add_level = 0 user_level = self._get_power_level_from_event_state( event, @@ -374,7 +381,7 @@ class Auth(object): 403, "You don't have permission to add state to the room" ) - defer.returnValue(True) + return True def _can_replace_state(self, event): user_level = self._get_power_level_from_event_state( diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 28b64565ae..509f7b550c 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -20,6 +20,12 @@ from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import add_hashes_and_signatures +import logging + + +logger = logging.getLogger(__name__) + + class BaseHandler(object): def __init__(self, hs): @@ -58,15 +64,18 @@ class BaseHandler(object): yield self.state_handler.annotate_state_groups(event) - yield add_hashes_and_signatures( + logger.debug("Signing event...") + + add_hashes_and_signatures( event, self.server_name, self.signing_key ) - if not suppress_auth: - yield self.auth.check(event, snapshot, raises=True) + logger.debug("Signed event.") - if hasattr(event, "state_key"): - yield self.state_handler.handle_new_event(event, snapshot) + if not suppress_auth: + logger.debug("Authing...") + self.auth.check(event, raises=True) + logger.debug("Authed") yield self.store.persist_event(event) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 06a2dabae2..1464a60937 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -118,7 +118,7 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) try: - yield self.auth.check(event, None, raises=True) + self.auth.check(event, raises=True) except AuthError as e: raise FederationError( "ERROR", @@ -319,7 +319,7 @@ class FederationHandler(BaseHandler): snapshot.fill_out_prev_events(event) yield self.state_handler.annotate_state_groups(event) - yield self.auth.check(event, None, raises=True) + self.auth.check(event, raises=True) pdu = self.pdu_codec.pdu_from_event(event) @@ -333,7 +333,7 @@ class FederationHandler(BaseHandler): event.outlier = False is_new_state = yield self.state_handler.annotate_state_groups(event) - yield self.auth.check(event, None, raises=True) + self.auth.check(event, raises=True) # FIXME (erikj): All this is duplicated above :( diff --git a/synapse/state.py b/synapse/state.py index 9771883bc3..32744e047c 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -188,11 +188,15 @@ class StateHandler(object): consumeErrors=True ) - max_power = max([int(p) for p in new_powers]) + new_powers = [ + int(p) if p else 0 for p in new_powers + ] + + max_power = max(new_powers) curr_events = [ z[0] for z in zip(curr_events, new_powers) - if int(z[1]) == max_power + if z[1] == max_power ] if not curr_events: -- cgit 1.5.1 From 4317c8e5835f0c15bf882f737d3e3c2a5b85f73f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Nov 2014 15:10:55 +0000 Subject: Implement new replace_state and changed prev_state `prev_state` is now a list of previous state ids, similiar to prev_events. `replace_state` now points to what we think was replaced. --- synapse/api/events/__init__.py | 1 + synapse/handlers/directory.py | 5 +- synapse/handlers/federation.py | 4 +- synapse/handlers/message.py | 11 ++-- synapse/handlers/profile.py | 6 +-- synapse/handlers/room.py | 16 ++---- synapse/rest/room.py | 2 +- synapse/state.py | 39 ++------------ synapse/storage/__init__.py | 92 +++++++++++++++++++++++++--------- synapse/storage/_base.py | 66 +++++++++++++++++------- synapse/storage/event_federation.py | 64 ++++++++++++++++++++--- synapse/storage/schema/event_edges.sql | 40 ++++++++++----- synapse/util/jsonobject.py | 2 +- 13 files changed, 220 insertions(+), 128 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 168b812311..fc3f350570 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -60,6 +60,7 @@ class SynapseEvent(JsonEncodedObject): "age_ts", "prev_content", "prev_state", + "replaces_state", "redacted_because", "origin_server_ts", ] diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 6e897e915d..164363cdc5 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -147,10 +147,7 @@ class DirectoryHandler(BaseHandler): content={"aliases": aliases}, ) - snapshot = yield self.store.snapshot_room( - room_id=room_id, - user_id=user_id, - ) + snapshot = yield self.store.snapshot_room(event) yield self._on_new_room_event( event, snapshot, extra_users=[user_id], suppress_auth=True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1464a60937..513ec9a5e3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -313,9 +313,7 @@ class FederationHandler(BaseHandler): state_key=user_id, ) - snapshot = yield self.store.snapshot_room( - event.room_id, event.user_id, - ) + snapshot = yield self.store.snapshot_room(event) snapshot.fill_out_prev_events(event) yield self.state_handler.annotate_state_groups(event) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c6f6ab14d1..8394013df3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -81,7 +81,7 @@ class MessageHandler(BaseHandler): user = self.hs.parse_userid(event.user_id) assert user.is_mine, "User must be our own: %s" % (user,) - snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) + snapshot = yield self.store.snapshot_room(event) yield self._on_new_room_event( event, snapshot, suppress_auth=suppress_auth @@ -141,12 +141,7 @@ class MessageHandler(BaseHandler): SynapseError if something went wrong. """ - snapshot = yield self.store.snapshot_room( - event.room_id, - event.user_id, - state_type=event.type, - state_key=event.state_key, - ) + snapshot = yield self.store.snapshot_room(event) yield self._on_new_room_event(event, snapshot) @@ -214,7 +209,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def send_feedback(self, event): - snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) + snapshot = yield self.store.snapshot_room(event) # store message in db yield self._on_new_room_event(event, snapshot) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 4cd0a06093..e47814483a 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import Membership -from synapse.api.events.room import RoomMemberEvent from ._base import BaseHandler @@ -196,10 +195,7 @@ class ProfileHandler(BaseHandler): ) for j in joins: - snapshot = yield self.store.snapshot_room( - j.room_id, j.state_key, RoomMemberEvent.TYPE, - j.state_key - ) + snapshot = yield self.store.snapshot_room(j) content = { "membership": j.content["membership"], diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f176ad39bf..55c893eb58 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -122,10 +122,7 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def handle_event(event): - snapshot = yield self.store.snapshot_room( - room_id=room_id, - user_id=user_id, - ) + snapshot = yield self.store.snapshot_room(event) logger.debug("Event: %s", event) @@ -364,10 +361,8 @@ class RoomMemberHandler(BaseHandler): """ target_user_id = event.state_key - snapshot = yield self.store.snapshot_room( - event.room_id, event.user_id, - RoomMemberEvent.TYPE, target_user_id - ) + snapshot = yield self.store.snapshot_room(event) + ## TODO(markjh): get prev state from snapshot. prev_state = yield self.store.get_room_member( target_user_id, event.room_id @@ -442,10 +437,7 @@ class RoomMemberHandler(BaseHandler): content=content, ) - snapshot = yield self.store.snapshot_room( - room_id, joinee.to_string(), RoomMemberEvent.TYPE, - joinee.to_string() - ) + snapshot = yield self.store.snapshot_room(new_event) yield self._do_join(new_event, snapshot, room_host=host, do_auth=True) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index ec0ce78fda..997895dab0 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -138,7 +138,7 @@ class RoomStateEventRestServlet(RestServlet): raise SynapseError( 404, "Event not found.", errcode=Codes.NOT_FOUND ) - defer.returnValue((200, data[0].get_dict()["content"])) + defer.returnValue((200, data.get_dict()["content"])) @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, state_key): diff --git a/synapse/state.py b/synapse/state.py index 32744e047c..97a8160a33 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -45,40 +45,6 @@ class StateHandler(object): self.server_name = hs.hostname self.hs = hs - @defer.inlineCallbacks - @log_function - def handle_new_event(self, event, snapshot): - """ Given an event this works out if a) we have sufficient power level - to update the state and b) works out what the prev_state should be. - - Returns: - Deferred: Resolved with a boolean indicating if we successfully - updated the state. - - Raised: - AuthError - """ - # This needs to be done in a transaction. - - if not hasattr(event, "state_key"): - return - - # Now I need to fill out the prev state and work out if it has auth - # (w.r.t. to power levels) - - snapshot.fill_out_prev_events(event) - yield self.annotate_state_groups(event) - - if event.old_state_events: - current_state = event.old_state_events.get( - (event.type, event.state_key) - ) - - if current_state: - event.prev_state = current_state.event_id - - defer.returnValue(True) - @defer.inlineCallbacks @log_function def annotate_state_groups(self, event, old_state=None): @@ -111,7 +77,10 @@ class StateHandler(object): event.old_state_events = copy.deepcopy(new_state) if hasattr(event, "state_key"): - new_state[(event.type, event.state_key)] = event + key = (event.type, event.state_key) + if key in new_state: + event.replaces_state = new_state[key].event_id + new_state[key] = event event.state_group = None event.state_events = new_state diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6b8fed4502..2d62fc2ed0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -242,8 +242,8 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } - if hasattr(event, "prev_state"): - vals["prev_state"] = event.prev_state + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state self._simple_insert_txn(txn, "state_events", vals) @@ -258,6 +258,40 @@ class DataStore(RoomMemberStore, RoomStore, } ) + for e_id, h in event.prev_state: + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": 1, + }, + or_ignore=True, + ) + + if not backfilled: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( @@ -357,7 +391,7 @@ class DataStore(RoomMemberStore, RoomStore, ], ) - def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): + def snapshot_room(self, event): """Snapshot the room for an update by a user Args: room_id (synapse.types.RoomId): The room to snapshot. @@ -368,16 +402,29 @@ class DataStore(RoomMemberStore, RoomStore, synapse.storage.Snapshot: A snapshot of the state of the room. """ def _snapshot(txn): - membership_state = self._get_room_member(txn, user_id, room_id) - prev_events = self._get_latest_events_in_room(txn, room_id) + prev_events = self._get_latest_events_in_room( + txn, + event.room_id + ) + + prev_state = None + state_key = None + if hasattr(event, "state_key"): + state_key = event.state_key + prev_state = self._get_latest_state_in_room( + txn, + event.room_id, + type=event.type, + state_key=state_key, + ) return Snapshot( store=self, - room_id=room_id, - user_id=user_id, + room_id=event.room_id, + user_id=event.user_id, prev_events=prev_events, - membership_state=membership_state, - state_type=state_type, + prev_state=prev_state, + state_type=event.type, state_key=state_key, ) @@ -400,30 +447,29 @@ class Snapshot(object): """ def __init__(self, store, room_id, user_id, prev_events, - membership_state, state_type=None, state_key=None, - prev_state_pdu=None): + prev_state, state_type=None, state_key=None): self.store = store self.room_id = room_id self.user_id = user_id self.prev_events = prev_events - self.membership_state = membership_state + self.prev_state = prev_state self.state_type = state_type self.state_key = state_key - self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): - return + if not hasattr(event, "prev_events"): + event.prev_events = [ + (event_id, hashes) + for event_id, hashes, _ in self.prev_events + ] - event.prev_events = [ - (event_id, hashes) - for event_id, hashes, _ in self.prev_events - ] + if self.prev_events: + event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 + else: + event.depth = 0 - if self.prev_events: - event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 - else: - event.depth = 0 + if not hasattr(event, "prev_state") and self.prev_state is not None: + event.prev_state = self.prev_state def schema_path(schema): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7d445b4633..7821fc4726 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -245,7 +245,6 @@ class SQLBaseStore(object): return [r[0] for r in txn.fetchall()] - def _simple_select_onecol(self, table, keyvalues, retcol): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -273,17 +272,30 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the rows with retcols : list of strings giving the names of the columns to return """ + return self.runInteraction( + "_simple_select_list", + self._simple_select_list_txn, + table, keyvalues, retcols + ) + + def _simple_select_list_txn(self, txn, table, keyvalues, retcols): + """Executes a SELECT query on the named table, which may return zero or + more rows, returning the result as a list of dicts. + + Args: + txn : Transaction object + table : string giving the table name + keyvalues : dict of column names and values to select the rows with + retcols : list of strings giving the names of the columns to return + """ sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, - " AND ".join("%s = ?" % (k) for k in keyvalues) + " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - def func(txn): - txn.execute(sql, keyvalues.values()) - return self.cursor_to_dict(txn) - - return self.runInteraction("_simple_select_list", func) + txn.execute(sql, keyvalues.values()) + return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, retcols=None): @@ -417,6 +429,10 @@ class SQLBaseStore(object): d.pop("topological_ordering", None) d.pop("processed", None) d["origin_server_ts"] = d.pop("ts", 0) + replaces_state = d.pop("prev_state", None) + + if replaces_state: + d["replaces_state"] = replaces_state d.update(json.loads(row_dict["unrecognized_keys"])) d["content"] = json.loads(d["content"]) @@ -450,16 +466,32 @@ class SQLBaseStore(object): k: encode_base64(v) for k, v in signatures.items() } - ev.prev_events = self._get_prev_events(txn, ev.event_id) - - if hasattr(ev, "prev_state"): - # Load previous state_content. - # TODO: Should we be pulling this out above? - cursor = txn.execute(select_event_sql, (ev.prev_state,)) - prevs = self.cursor_to_dict(cursor) - if prevs: - prev = self._parse_event_from_row(prevs[0]) - ev.prev_content = prev.content + prevs = self._get_prev_events_and_state(txn, ev.event_id) + + ev.prev_events = [ + (e_id, h) + for e_id, h, is_state in prevs + if is_state == 0 + ] + + if hasattr(ev, "state_key"): + ev.prev_state = [ + (e_id, h) + for e_id, h, is_state in prevs + if is_state == 1 + ] + + if hasattr(ev, "replaces_state"): + # Load previous state_content. + # FIXME (erikj): Handle multiple prev_states. + cursor = txn.execute( + select_event_sql, + (ev.replaces_state,) + ) + prevs = self.cursor_to_dict(cursor) + if prevs: + prev = self._parse_event_from_row(prevs[0]) + ev.prev_content = prev.content if not hasattr(ev, "redacted"): logger.debug("Doesn't have redacted key: %s", ev) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index f427aba879..180a764134 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -69,19 +69,21 @@ class EventFederationStore(SQLBaseStore): return results - def _get_prev_events(self, txn, event_id): - prev_ids = self._simple_select_onecol_txn( + def _get_latest_state_in_room(self, txn, room_id, type, state_key): + event_ids = self._simple_select_onecol_txn( txn, - table="event_edges", + table="state_forward_extremities", keyvalues={ - "event_id": event_id, + "room_id": room_id, + "type": type, + "state_key": state_key, }, - retcol="prev_event_id", + retcol="event_id", ) results = [] - for prev_event_id in prev_ids: - hashes = self._get_event_reference_hashes_txn(txn, prev_event_id) + for event_id in event_ids: + hashes = self._get_event_reference_hashes_txn(txn, event_id) prev_hashes = { k: encode_base64(v) for k, v in hashes.items() if k == "sha256" @@ -90,6 +92,53 @@ class EventFederationStore(SQLBaseStore): return results + def _get_prev_events(self, txn, event_id): + results = self._get_prev_events_and_state( + txn, + event_id, + is_state=0, + ) + + return [(e_id, h, ) for e_id, h, _ in results] + + def _get_prev_state(self, txn, event_id): + results = self._get_prev_events_and_state( + txn, + event_id, + is_state=1, + ) + + return [(e_id, h, ) for e_id, h, _ in results] + + def _get_prev_events_and_state(self, txn, event_id, is_state=None): + keyvalues = { + "event_id": event_id, + } + + if is_state is not None: + keyvalues["is_state"] = is_state + + res = self._simple_select_list_txn( + txn, + table="event_edges", + keyvalues=keyvalues, + retcols=["prev_event_id", "is_state"], + ) + + results = [] + for d in res: + hashes = self._get_event_reference_hashes_txn( + txn, + d["prev_event_id"] + ) + prev_hashes = { + k: encode_base64(v) for k, v in hashes.items() + if k == "sha256" + } + results.append((d["prev_event_id"], prev_hashes, d["is_state"])) + + return results + def get_min_depth(self, room_id): return self.runInteraction( "get_min_depth", @@ -135,6 +184,7 @@ class EventFederationStore(SQLBaseStore): "event_id": event_id, "prev_event_id": e_id, "room_id": room_id, + "is_state": 0, }, or_ignore=True, ) diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql index e5f768c705..51695826a8 100644 --- a/synapse/storage/schema/event_edges.sql +++ b/synapse/storage/schema/event_edges.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( - event_id TEXT, - room_id TEXT, + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE ); @@ -10,8 +10,8 @@ CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_backward_extremities( - event_id TEXT, - room_id TEXT, + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE ); @@ -20,10 +20,11 @@ CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id CREATE TABLE IF NOT EXISTS event_edges( - event_id TEXT, - prev_event_id TEXT, - room_id TEXT, - CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id) + event_id TEXT NOT NULL, + prev_event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + is_state INTEGER NOT NULL, + CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state) ); CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); @@ -31,8 +32,8 @@ CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); CREATE TABLE IF NOT EXISTS room_depth( - room_id TEXT, - min_depth INTEGER, + room_id TEXT NOT NULL, + min_depth INTEGER NOT NULL, CONSTRAINT uniqueness UNIQUE (room_id) ); @@ -40,10 +41,25 @@ CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); create TABLE IF NOT EXISTS event_destinations( - event_id TEXT, - destination TEXT, + event_id TEXT NOT NULL, + destination TEXT NOT NULL, delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE ); CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); + + +CREATE TABLE IF NOT EXISTS state_forward_extremities( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( + room_id, type, state_key +); +CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); + diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py index c91eb897a8..e79b68f661 100644 --- a/synapse/util/jsonobject.py +++ b/synapse/util/jsonobject.py @@ -80,7 +80,7 @@ class JsonEncodedObject(object): def get_full_dict(self): d = { - k: v for (k, v) in self.__dict__.items() + k: _encode(v) for (k, v) in self.__dict__.items() if k in self.valid_keys or k in self.internal_keys } d.update(self.unrecognized_keys) -- cgit 1.5.1 From 233969bb586f2a5b0231c8e0675c8c8c3097ef03 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Nov 2014 15:25:03 +0000 Subject: Update to use replaces_state rather than prev_state --- synapse/handlers/federation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 513ec9a5e3..f0448a05d8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -386,8 +386,10 @@ class FederationHandler(BaseHandler): event = yield self.store.get_event(event_id) if hasattr(event, "state_key"): # Get previous state - if hasattr(event, "prev_state") and event.prev_state: - prev_event = yield self.store.get_event(event.prev_state) + if hasattr(event, "replaces_state") and event.replaces_state: + prev_event = yield self.store.get_event( + event.replaces_state + ) results[(event.type, event.state_key)] = prev_event else: del results[(event.type, event.state_key)] -- cgit 1.5.1 From 351c64e99e5677096f9a2ae2cd7e84dbc1887878 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Nov 2014 16:59:13 +0000 Subject: Amalgamate all power levels. Remove concept of reqired power levels, something similiar can be done using the new power level event. --- synapse/api/auth.py | 221 ++++++++++++++--------------------------- synapse/api/events/__init__.py | 3 +- synapse/api/events/factory.py | 7 +- synapse/api/events/room.py | 22 ---- synapse/api/events/utils.py | 23 ++--- synapse/handlers/room.py | 52 +++------- synapse/storage/__init__.py | 12 --- synapse/storage/room.py | 157 ----------------------------- 8 files changed, 102 insertions(+), 395 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 9eb0491c97..462e97bd90 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -21,8 +21,8 @@ from synapse.api.constants import Membership, JoinRules from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, - RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, InviteJoinEvent, - RoomCreateEvent, RoomSendEventLevelEvent, RoomAddStateLevelEvent, + RoomJoinRulesEvent, InviteJoinEvent, + RoomCreateEvent, ) from synapse.util.logutils import log_function @@ -51,6 +51,7 @@ class Auth(object): if event.old_state_events is None: # Oh, we don't know what the state of the room was, so we # are trusting that this is allowed (at least for now) + logger.warn("Trusting event: %s", event.event_id) return True if hasattr(event, "outlier") and event.outlier is True: @@ -64,7 +65,7 @@ class Auth(object): return True if event.type == RoomMemberEvent.TYPE: - self._can_replace_state(event) + self._can_send_event(event) allowed = self.is_membership_change_allowed(event) if allowed: logger.debug("Allowing! %s", event) @@ -72,16 +73,7 @@ class Auth(object): logger.debug("Denying! %s", event) return allowed - if not event.type == InviteJoinEvent.TYPE: - self.check_event_sender_in_room(event) - - if is_state: - # TODO (erikj): This really only should be called for *new* - # state - self._can_add_state(event) - self._can_replace_state(event) - else: - self._can_send_event(event) + self._can_send_event(event) if event.type == RoomPowerLevelsEvent.TYPE: self._check_power_levels(event) @@ -239,21 +231,21 @@ class Auth(object): power_level_event = event.old_state_events.get(key) level = None if power_level_event: - level = power_level_event.content.get(user_id) + level = power_level_event.content.get("users", {}).get(user_id) if not level: - level = power_level_event.content.get("default", 0) + level = power_level_event.content.get("users_default", 0) return level def _get_ops_level_from_event_state(self, event): - key = (RoomOpsPowerLevelsEvent.TYPE, "", ) - ops_event = event.old_state_events.get(key) + key = (RoomPowerLevelsEvent.TYPE, "", ) + power_level_event = event.old_state_events.get(key) - if ops_event: + if power_level_event: return ( - ops_event.content.get("ban_level"), - ops_event.content.get("kick_level"), - ops_event.content.get("redact_level"), + power_level_event.content.get("ban", 50), + power_level_event.content.get("kick", 50), + power_level_event.content.get("redact", 50), ) return None, None, None, @@ -325,13 +317,22 @@ class Auth(object): @log_function def _can_send_event(self, event): - key = (RoomSendEventLevelEvent.TYPE, "", ) + key = (RoomPowerLevelsEvent.TYPE, "", ) send_level_event = event.old_state_events.get(key) send_level = None if send_level_event: - send_level = send_level_event.content.get(event.user_id) + send_level = send_level_event.content.get("events", {}).get( + event.type + ) if not send_level: - send_level = send_level_event.content.get("level", 0) + if hasattr(event, "state_key"): + send_level = send_level_event.content.get( + "state_default", 50 + ) + else: + send_level = send_level_event.content.get( + "events_default", 0 + ) if send_level: send_level = int(send_level) @@ -350,85 +351,21 @@ class Auth(object): if user_level < send_level: raise AuthError( - 403, "You don't have permission to post to the room" - ) - - return True - - def _can_add_state(self, event): - key = (RoomAddStateLevelEvent.TYPE, "", ) - add_level_event = event.old_state_events.get(key) - add_level = None - if add_level_event: - add_level = add_level_event.content.get(event.user_id) - if not add_level: - add_level = add_level_event.content.get("level", 0) - - if add_level: - add_level = int(add_level) - else: - add_level = 0 - - user_level = self._get_power_level_from_event_state( - event, - event.user_id, - ) - - user_level = int(user_level) - - if user_level < add_level: - raise AuthError( - 403, "You don't have permission to add state to the room" + 403, "You don't have permission to post that to the room" ) return True - def _can_replace_state(self, event): - user_level = self._get_power_level_from_event_state( - event, - event.user_id, - ) - - if user_level: - user_level = int(user_level) - else: - user_level = 0 - - logger.debug( - "Checking power level for %s, %s", event.user_id, user_level - ) - - key = (event.type, event.state_key, ) - current_state = event.old_state_events.get(key) - - if current_state and hasattr(current_state, "required_power_level"): - req = current_state.required_power_level - - logger.debug("Checked power level for %s, %s", event.user_id, req) - if user_level < req: - raise AuthError( - 403, - "You don't have permission to change that state" - ) - def _check_redaction(self, event): user_level = self._get_power_level_from_event_state( event, event.user_id, ) - if user_level: - user_level = int(user_level) - else: - user_level = 0 - _, _, redact_level = self._get_ops_level_from_event_state( event ) - if not redact_level: - redact_level = 50 - if user_level < redact_level: raise AuthError( 403, @@ -436,14 +373,9 @@ class Auth(object): ) def _check_power_levels(self, event): - for k, v in event.content.items(): - if k == "default": - continue - - # FIXME (erikj): We don't want hsob_Ts in content. - if k == "hsob_ts": - continue - + user_list = event.content.get("users", {}) + # Validate users + for k, v in user_list.items(): try: self.hs.parse_userid(k) except: @@ -459,72 +391,63 @@ class Auth(object): if not current_state: return - else: - current_state = current_state[0] user_level = self._get_power_level_from_event_state( event, event.user_id, ) - if user_level: - user_level = int(user_level) - else: - user_level = 0 + # Check other levels: + levels_to_check = [ + ("users_default", []), + ("events_default", []), + ("ban", []), + ("redact", []), + ("kick", []), + ] + + old_list = current_state.content.get("users") + for user in set(old_list.keys() + user_list.keys()): + levels_to_check.append( + (user, ["users"]) + ) - old_list = current_state.content + old_list = current_state.content.get("events") + new_list = event.content.get("events") + for ev_id in set(old_list.keys() + new_list.keys()): + levels_to_check.append( + (ev_id, ["events"]) + ) - # FIXME (erikj) - old_people = {k: v for k, v in old_list.items() if k.startswith("@")} - new_people = { - k: v for k, v in event.content.items() - if k.startswith("@") - } + old_state = current_state.content + new_state = event.content - removed = set(old_people.keys()) - set(new_people.keys()) - added = set(new_people.keys()) - set(old_people.keys()) - same = set(old_people.keys()) & set(new_people.keys()) + for level_to_check, dir in levels_to_check: + old_loc = old_state + for d in dir: + old_loc = old_loc.get(d, {}) - for r in removed: - if int(old_list[r]) > user_level: - raise AuthError( - 403, - "You don't have permission to remove user: %s" % (r, ) - ) + new_loc = new_state + for d in dir: + new_loc = new_loc.get(d, {}) - for n in added: - if int(event.content[n]) > user_level: - raise AuthError( - 403, - "You don't have permission to add ops level greater " - "than your own" - ) + if level_to_check in old_loc: + old_level = int(old_loc[level_to_check]) + else: + old_level = None - for s in same: - if int(event.content[s]) != int(old_list[s]): - if int(event.content[s]) > user_level: - raise AuthError( - 403, - "You don't have permission to add ops level greater " - "than your own" - ) + if level_to_check in new_loc: + new_level = int(new_loc[level_to_check]) + else: + new_level = None - if "default" in old_list: - old_default = int(old_list["default"]) + if new_level is not None and old_level is not None: + if new_level == old_level: + continue - if old_default > user_level: + if old_level > user_level or new_level > user_level: raise AuthError( 403, - "You don't have permission to add ops level greater than " - "your own" + "You don't have permission to add ops level greater " + "than your own" ) - - if "default" in event.content: - new_default = int(event.content["default"]) - - if new_default > user_level: - raise AuthError( - 403, - "You don't have permission to add ops level greater " - "than your own" - ) diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 98a66144e7..84d3a98365 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -56,12 +56,12 @@ class SynapseEvent(JsonEncodedObject): "user_id", # sender/initiator "content", # HTTP body, JSON "state_key", - "required_power_level", "age_ts", "prev_content", "replaces_state", "redacted_because", "origin_server_ts", + "auth_chains", ] internal_keys = [ @@ -70,7 +70,6 @@ class SynapseEvent(JsonEncodedObject): "destinations", "origin", "outlier", - "power_level", "redacted", "prev_events", "hashes", diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 9134c82eff..a1ec708a81 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -16,8 +16,8 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent, - RoomPowerLevelsEvent, RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, - RoomCreateEvent, RoomAddStateLevelEvent, RoomSendEventLevelEvent, + RoomPowerLevelsEvent, RoomJoinRulesEvent, + RoomCreateEvent, RoomRedactionEvent, ) @@ -39,9 +39,6 @@ class EventFactory(object): RoomPowerLevelsEvent, RoomJoinRulesEvent, RoomCreateEvent, - RoomAddStateLevelEvent, - RoomSendEventLevelEvent, - RoomOpsPowerLevelsEvent, RoomRedactionEvent, ] diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index cd936074fc..25bc883706 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -153,28 +153,6 @@ class RoomPowerLevelsEvent(SynapseStateEvent): def get_content_template(self): return {} - -class RoomAddStateLevelEvent(SynapseStateEvent): - TYPE = "m.room.add_state_level" - - def get_content_template(self): - return {} - - -class RoomSendEventLevelEvent(SynapseStateEvent): - TYPE = "m.room.send_event_level" - - def get_content_template(self): - return {} - - -class RoomOpsPowerLevelsEvent(SynapseStateEvent): - TYPE = "m.room.ops_levels" - - def get_content_template(self): - return {} - - class RoomAliasesEvent(SynapseStateEvent): TYPE = "m.room.aliases" diff --git a/synapse/api/events/utils.py b/synapse/api/events/utils.py index 31601fd3a9..5fc79105b5 100644 --- a/synapse/api/events/utils.py +++ b/synapse/api/events/utils.py @@ -15,7 +15,6 @@ from .room import ( RoomMemberEvent, RoomJoinRulesEvent, RoomPowerLevelsEvent, - RoomAddStateLevelEvent, RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomAliasesEvent, RoomCreateEvent, ) @@ -52,17 +51,17 @@ def _prune_event_or_pdu(event_type, event): elif event_type == RoomJoinRulesEvent.TYPE: add_fields("join_rule") elif event_type == RoomPowerLevelsEvent.TYPE: - # TODO: Actually check these are valid user_ids etc. - add_fields("default") - for k, v in event.content.items(): - if k.startswith("@") and isinstance(v, (int, long)): - new_content[k] = v - elif event_type == RoomAddStateLevelEvent.TYPE: - add_fields("level") - elif event_type == RoomSendEventLevelEvent.TYPE: - add_fields("level") - elif event_type == RoomOpsPowerLevelsEvent.TYPE: - add_fields("kick_level", "ban_level", "redact_level") + add_fields( + "users", + "users_default", + "events", + "events_default", + "events_default", + "state_default", + "ban", + "kick", + "redact", + ) elif event_type == RoomAliasesEvent.TYPE: add_fields("aliases") diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 55c893eb58..42a6c9f9bf 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -21,8 +21,7 @@ from synapse.api.constants import Membership, JoinRules from synapse.api.errors import StoreError, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomCreateEvent, RoomPowerLevelsEvent, - RoomJoinRulesEvent, RoomAddStateLevelEvent, RoomTopicEvent, - RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomNameEvent, + RoomTopicEvent, RoomNameEvent, RoomJoinRulesEvent, ) from synapse.util import stringutils from ._base import BaseHandler @@ -139,7 +138,6 @@ class RoomCreationHandler(BaseHandler): etype=RoomNameEvent.TYPE, room_id=room_id, user_id=user_id, - required_power_level=50, content={"name": name}, ) @@ -151,7 +149,6 @@ class RoomCreationHandler(BaseHandler): etype=RoomTopicEvent.TYPE, room_id=room_id, user_id=user_id, - required_power_level=50, content={"topic": topic}, ) @@ -196,7 +193,6 @@ class RoomCreationHandler(BaseHandler): event_keys = { "room_id": room_id, "user_id": creator.to_string(), - "required_power_level": 100, } def create(etype, **content): @@ -213,7 +209,21 @@ class RoomCreationHandler(BaseHandler): power_levels_event = self.event_factory.create_event( etype=RoomPowerLevelsEvent.TYPE, - content={creator.to_string(): 100, "default": 0}, + content={ + "users": { + creator.to_string(): 100, + }, + "users_default": 0, + "events": { + RoomNameEvent.TYPE: 100, + RoomPowerLevelsEvent.TYPE: 100, + }, + "events_default": 0, + "state_default": 50, + "ban": 50, + "kick": 50, + "redact": 50 + }, **event_keys ) @@ -223,30 +233,10 @@ class RoomCreationHandler(BaseHandler): join_rule=join_rule, ) - add_state_event = create( - etype=RoomAddStateLevelEvent.TYPE, - level=100, - ) - - send_event = create( - etype=RoomSendEventLevelEvent.TYPE, - level=0, - ) - - ops = create( - etype=RoomOpsPowerLevelsEvent.TYPE, - ban_level=50, - kick_level=50, - redact_level=50, - ) - return [ creation_event, power_levels_event, join_rules_event, - add_state_event, - send_event, - ops, ] @@ -388,16 +378,6 @@ class RoomMemberHandler(BaseHandler): else: # This is not a JOIN, so we can handle it normally. - # If we're banning someone, set a req power level - if event.membership == Membership.BAN: - if not hasattr(event, "required_power_level") or event.required_power_level is None: - # Add some default required_power_level - user_level = yield self.store.get_power_level( - event.room_id, - event.user_id, - ) - event.required_power_level = user_level - if prev_state and prev_state.membership == event.membership: # double same action, treat this event as a NOOP. defer.returnValue({}) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 2d62fc2ed0..2a1970914f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -17,13 +17,9 @@ from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, RoomTopicEvent, FeedbackEvent, -# RoomConfigEvent, RoomNameEvent, RoomJoinRulesEvent, RoomPowerLevelsEvent, - RoomAddStateLevelEvent, - RoomSendEventLevelEvent, - RoomOpsPowerLevelsEvent, RoomRedactionEvent, ) @@ -166,14 +162,6 @@ class DataStore(RoomMemberStore, RoomStore, self._store_room_topic_txn(txn, event) elif event.type == RoomJoinRulesEvent.TYPE: self._store_join_rule(txn, event) - elif event.type == RoomPowerLevelsEvent.TYPE: - self._store_power_levels(txn, event) - elif event.type == RoomAddStateLevelEvent.TYPE: - self._store_add_state_level(txn, event) - elif event.type == RoomSendEventLevelEvent.TYPE: - self._store_send_event_level(txn, event) - elif event.type == RoomOpsPowerLevelsEvent.TYPE: - self._store_ops_level(txn, event) elif event.type == RoomRedactionEvent.TYPE: self._store_redaction(txn, event) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 7e48ce9cc3..0c83c11ad3 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -148,85 +148,6 @@ class RoomStore(SQLBaseStore): else: defer.returnValue(None) - def get_power_level(self, room_id, user_id): - return self.runInteraction( - "get_power_level", - self._get_power_level, - room_id, user_id, - ) - - def _get_power_level(self, txn, room_id, user_id): - sql = ( - "SELECT level FROM room_power_levels as r " - "INNER JOIN current_state_events as c " - "ON r.event_id = c.event_id " - "WHERE c.room_id = ? AND r.user_id = ? " - ) - - rows = txn.execute(sql, (room_id, user_id,)).fetchall() - - if len(rows) == 1: - return rows[0][0] - - sql = ( - "SELECT level FROM room_default_levels as r " - "INNER JOIN current_state_events as c " - "ON r.event_id = c.event_id " - "WHERE c.room_id = ? " - ) - - rows = txn.execute(sql, (room_id,)).fetchall() - - if len(rows) == 1: - return rows[0][0] - else: - return None - - def get_ops_levels(self, room_id): - return self.runInteraction( - "get_ops_levels", - self._get_ops_levels, - room_id, - ) - - def _get_ops_levels(self, txn, room_id): - sql = ( - "SELECT ban_level, kick_level, redact_level " - "FROM room_ops_levels as r " - "INNER JOIN current_state_events as c " - "ON r.event_id = c.event_id " - "WHERE c.room_id = ? " - ) - - rows = txn.execute(sql, (room_id,)).fetchall() - - if len(rows) == 1: - return OpsLevel(rows[0][0], rows[0][1], rows[0][2]) - else: - return OpsLevel(None, None) - - def get_add_state_level(self, room_id): - return self._get_level_from_table("room_add_state_levels", room_id) - - def get_send_event_level(self, room_id): - return self._get_level_from_table("room_send_event_levels", room_id) - - @defer.inlineCallbacks - def _get_level_from_table(self, table, room_id): - sql = ( - "SELECT level FROM %(table)s as r " - "INNER JOIN current_state_events as c " - "ON r.event_id = c.event_id " - "WHERE c.room_id = ? " - ) % {"table": table} - - rows = yield self._execute(None, sql, room_id) - - if len(rows) == 1: - defer.returnValue(rows[0][0]) - else: - defer.returnValue(None) - def _store_room_topic_txn(self, txn, event): self._simple_insert_txn( txn, @@ -260,84 +181,6 @@ class RoomStore(SQLBaseStore): }, ) - def _store_power_levels(self, txn, event): - for user_id, level in event.content.items(): - if user_id == "default": - self._simple_insert_txn( - txn, - "room_default_levels", - { - "event_id": event.event_id, - "room_id": event.room_id, - "level": level, - }, - ) - else: - self._simple_insert_txn( - txn, - "room_power_levels", - { - "event_id": event.event_id, - "room_id": event.room_id, - "user_id": user_id, - "level": level - }, - ) - - def _store_default_level(self, txn, event): - self._simple_insert_txn( - txn, - "room_default_levels", - { - "event_id": event.event_id, - "room_id": event.room_id, - "level": event.content["default_level"], - }, - ) - - def _store_add_state_level(self, txn, event): - self._simple_insert_txn( - txn, - "room_add_state_levels", - { - "event_id": event.event_id, - "room_id": event.room_id, - "level": event.content["level"], - }, - ) - - def _store_send_event_level(self, txn, event): - self._simple_insert_txn( - txn, - "room_send_event_levels", - { - "event_id": event.event_id, - "room_id": event.room_id, - "level": event.content["level"], - }, - ) - - def _store_ops_level(self, txn, event): - content = { - "event_id": event.event_id, - "room_id": event.room_id, - } - - if "kick_level" in event.content: - content["kick_level"] = event.content["kick_level"] - - if "ban_level" in event.content: - content["ban_level"] = event.content["ban_level"] - - if "redact_level" in event.content: - content["redact_level"] = event.content["redact_level"] - - self._simple_insert_txn( - txn, - "room_ops_levels", - content, - ) - class RoomsTable(Table): table_name = "rooms" -- cgit 1.5.1 From bf6b72eb558cca94e209a541188079750bfefea0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Nov 2014 18:42:18 +0000 Subject: Start implementing auth chains --- synapse/api/auth.py | 3 +- synapse/api/events/__init__.py | 2 +- synapse/handlers/_base.py | 59 ++++++++++++++++++++++++++++++++-- synapse/storage/__init__.py | 12 ++++++- synapse/storage/_base.py | 2 ++ synapse/storage/event_federation.py | 21 ++++++++++++ synapse/storage/schema/event_edges.sql | 10 ++++++ synapse/storage/signatures.py | 12 +++++++ 8 files changed, 115 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index bb25c4ec55..e1302553d7 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -21,8 +21,7 @@ from synapse.api.constants import Membership, JoinRules from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, - RoomJoinRulesEvent, InviteJoinEvent, - RoomCreateEvent, + RoomJoinRulesEvent, RoomCreateEvent, ) from synapse.util.logutils import log_function diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 84d3a98365..513a48f568 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -61,7 +61,7 @@ class SynapseEvent(JsonEncodedObject): "replaces_state", "redacted_because", "origin_server_ts", - "auth_chains", + "auth_events", ] internal_keys = [ diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 509f7b550c..2613fa7fce 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -14,11 +14,15 @@ # limitations under the License. from twisted.internet import defer -from synapse.api.errors import LimitExceededError +from synapse.api.errors import LimitExceededError from synapse.util.async import run_on_reactor - from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.api.events.room import ( + RoomCreateEvent, RoomMemberEvent, RoomPowerLevelsEvent, RoomJoinRulesEvent, +) +from synapse.api.constants import Membership, JoinRules +from syutil.base64util import encode_base64 import logging @@ -55,6 +59,53 @@ class BaseHandler(object): retry_after_ms=int(1000*(time_allowed - time_now)), ) + @defer.inlineCallbacks + def _add_auth(self, event): + if event.type == RoomCreateEvent.TYPE: + event.auth_events = [] + return + + auth_events = [] + + key = (RoomPowerLevelsEvent.TYPE, "", ) + power_level_event = event.old_state_events.get(key) + + if power_level_event: + auth_events.append(power_level_event.event_id) + + key = (RoomJoinRulesEvent.TYPE, "", ) + join_rule_event = event.old_state_events.get(key) + + key = (RoomMemberEvent.TYPE, event.user_id, ) + member_event = event.old_state_events.get(key) + + if join_rule_event: + join_rule = join_rule_event.content.get("join_rule") + is_public = join_rule == JoinRules.PUBLIC if join_rule else False + + if event.type == RoomMemberEvent.TYPE: + if event.content["membership"] == Membership.JOIN: + if is_public: + auth_events.append(join_rule_event.event_id) + elif member_event: + auth_events.append(member_event.event_id) + + if member_event: + if member_event.content["membership"] == Membership.JOIN: + auth_events.append(member_event.event_id) + + hashes = yield self.store.get_event_reference_hashes( + auth_events + ) + hashes = [ + { + k: encode_base64(v) for k, v in h.items() + if k == "sha256" + } + for h in hashes + ] + event.auth_events = zip(auth_events, hashes) + @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], extra_users=[], suppress_auth=False): @@ -64,6 +115,8 @@ class BaseHandler(object): yield self.state_handler.annotate_state_groups(event) + yield self._add_auth(event) + logger.debug("Signing event...") add_hashes_and_signatures( @@ -76,6 +129,8 @@ class BaseHandler(object): logger.debug("Authing...") self.auth.check(event, raises=True) logger.debug("Authed") + else: + logger.debug("Suppressed auth.") yield self.store.persist_event(event) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 2a1970914f..48ad4d864f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -19,7 +19,6 @@ from synapse.api.events.room import ( RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent, RoomJoinRulesEvent, - RoomPowerLevelsEvent, RoomRedactionEvent, ) @@ -302,6 +301,17 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, prev_event_id, alg, hash_bytes ) + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + ) + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( txn, event.event_id, ref_alg, ref_hash_bytes diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7821fc4726..9aa404695d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -474,6 +474,8 @@ class SQLBaseStore(object): if is_state == 0 ] + ev.auth_events = self._get_auth_events(txn, ev.event_id) + if hasattr(ev, "state_key"): ev.prev_state = [ (e_id, h) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 180a764134..86c68ebf87 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -139,6 +139,27 @@ class EventFederationStore(SQLBaseStore): return results + def _get_auth_events(self, txn, event_id): + auth_ids = self._simple_select_onecol_txn( + txn, + table="event_auth", + keyvalues={ + "event_id": event_id, + }, + retcol="auth_id", + ) + + results = [] + for auth_id in auth_ids: + hashes = self._get_event_reference_hashes_txn(txn, auth_id) + prev_hashes = { + k: encode_base64(v) for k, v in hashes.items() + if k == "sha256" + } + results.append((auth_id, prev_hashes)) + + return results + def get_min_depth(self, room_id): return self.runInteraction( "get_min_depth", diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql index 51695826a8..be1c72a775 100644 --- a/synapse/storage/schema/event_edges.sql +++ b/synapse/storage/schema/event_edges.sql @@ -63,3 +63,13 @@ CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( ); CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); + +CREATE TABLE IF NOT EXISTS event_auth( + event_id TEXT NOT NULL, + auth_id TEXT NOT NULL, + room_id TEXT NOT NULL, + CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id) +); + +CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); +CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); \ No newline at end of file diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index b4b3d5d7ea..84a49088a2 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -55,6 +55,18 @@ class SignatureStore(SQLBaseStore): or_ignore=True, ) + def get_event_reference_hashes(self, event_ids): + def f(txn): + return [ + self._get_event_reference_hashes_txn(txn, ev) + for ev in event_ids + ] + + return self.runInteraction( + "get_event_reference_hashes", + f + ) + def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. Args: -- cgit 1.5.1 From 49948d72f3c76f0ca32b844955d3add7922180ea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 10:42:44 +0000 Subject: Fix joining over federation --- synapse/api/auth.py | 53 +++++++++++++++++++++++++++++++++++++++-- synapse/api/events/__init__.py | 2 +- synapse/handlers/_base.py | 54 +----------------------------------------- synapse/handlers/federation.py | 1 + synapse/storage/__init__.py | 1 + 5 files changed, 55 insertions(+), 56 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e1302553d7..d4f284bd60 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -24,6 +24,7 @@ from synapse.api.events.room import ( RoomJoinRulesEvent, RoomCreateEvent, ) from synapse.util.logutils import log_function +from syutil.base64util import encode_base64 import logging @@ -61,8 +62,6 @@ class Auth(object): # FIXME return True - self._can_send_event(event) - if event.type == RoomMemberEvent.TYPE: allowed = self.is_membership_change_allowed(event) if allowed: @@ -71,6 +70,8 @@ class Auth(object): logger.debug("Denying! %s", event) return allowed + self._can_send_event(event) + if event.type == RoomPowerLevelsEvent.TYPE: self._check_power_levels(event) @@ -311,6 +312,54 @@ class Auth(object): def is_server_admin(self, user): return self.store.is_server_admin(user) + @defer.inlineCallbacks + def add_auth_events(self, event): + if event.type == RoomCreateEvent.TYPE: + event.auth_events = [] + return + + auth_events = [] + + key = (RoomPowerLevelsEvent.TYPE, "", ) + power_level_event = event.old_state_events.get(key) + + if power_level_event: + auth_events.append(power_level_event.event_id) + + key = (RoomJoinRulesEvent.TYPE, "", ) + join_rule_event = event.old_state_events.get(key) + + key = (RoomMemberEvent.TYPE, event.user_id, ) + member_event = event.old_state_events.get(key) + + if join_rule_event: + join_rule = join_rule_event.content.get("join_rule") + is_public = join_rule == JoinRules.PUBLIC if join_rule else False + + if event.type == RoomMemberEvent.TYPE: + if event.content["membership"] == Membership.JOIN: + if is_public: + auth_events.append(join_rule_event.event_id) + elif member_event: + auth_events.append(member_event.event_id) + + if member_event: + if member_event.content["membership"] == Membership.JOIN: + auth_events.append(member_event.event_id) + + hashes = yield self.store.get_event_reference_hashes( + auth_events + ) + hashes = [ + { + k: encode_base64(v) for k, v in h.items() + if k == "sha256" + } + for h in hashes + ] + event.auth_events = zip(auth_events, hashes) + + @log_function def _can_send_event(self, event): key = (RoomPowerLevelsEvent.TYPE, "", ) diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 513a48f568..e5980c4be3 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -61,7 +61,6 @@ class SynapseEvent(JsonEncodedObject): "replaces_state", "redacted_because", "origin_server_ts", - "auth_events", ] internal_keys = [ @@ -75,6 +74,7 @@ class SynapseEvent(JsonEncodedObject): "hashes", "signatures", "prev_state", + "auth_events", ] required_keys = [ diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 2613fa7fce..f630280031 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -18,11 +18,6 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import add_hashes_and_signatures -from synapse.api.events.room import ( - RoomCreateEvent, RoomMemberEvent, RoomPowerLevelsEvent, RoomJoinRulesEvent, -) -from synapse.api.constants import Membership, JoinRules -from syutil.base64util import encode_base64 import logging @@ -59,53 +54,6 @@ class BaseHandler(object): retry_after_ms=int(1000*(time_allowed - time_now)), ) - @defer.inlineCallbacks - def _add_auth(self, event): - if event.type == RoomCreateEvent.TYPE: - event.auth_events = [] - return - - auth_events = [] - - key = (RoomPowerLevelsEvent.TYPE, "", ) - power_level_event = event.old_state_events.get(key) - - if power_level_event: - auth_events.append(power_level_event.event_id) - - key = (RoomJoinRulesEvent.TYPE, "", ) - join_rule_event = event.old_state_events.get(key) - - key = (RoomMemberEvent.TYPE, event.user_id, ) - member_event = event.old_state_events.get(key) - - if join_rule_event: - join_rule = join_rule_event.content.get("join_rule") - is_public = join_rule == JoinRules.PUBLIC if join_rule else False - - if event.type == RoomMemberEvent.TYPE: - if event.content["membership"] == Membership.JOIN: - if is_public: - auth_events.append(join_rule_event.event_id) - elif member_event: - auth_events.append(member_event.event_id) - - if member_event: - if member_event.content["membership"] == Membership.JOIN: - auth_events.append(member_event.event_id) - - hashes = yield self.store.get_event_reference_hashes( - auth_events - ) - hashes = [ - { - k: encode_base64(v) for k, v in h.items() - if k == "sha256" - } - for h in hashes - ] - event.auth_events = zip(auth_events, hashes) - @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], extra_users=[], suppress_auth=False): @@ -115,7 +63,7 @@ class BaseHandler(object): yield self.state_handler.annotate_state_groups(event) - yield self._add_auth(event) + yield self.auth.add_auth_events(event) logger.debug("Signing event...") diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f0448a05d8..09593303a4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -317,6 +317,7 @@ class FederationHandler(BaseHandler): snapshot.fill_out_prev_events(event) yield self.state_handler.annotate_state_groups(event) + yield self.auth.add_auth_events(event) self.auth.check(event, raises=True) pdu = self.pdu_codec.pdu_from_event(event) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 48ad4d864f..96adf20c89 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -310,6 +310,7 @@ class DataStore(RoomMemberStore, RoomStore, "room_id": event.room_id, "auth_id": auth_id, }, + or_ignore=True, ) (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) -- cgit 1.5.1 From 3b4dec442da51c6c999dd946db6ea6ce5f07ff0c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 11:22:12 +0000 Subject: Return auth chain when handling send_join --- synapse/federation/replication.py | 20 +++++++++++++++----- synapse/handlers/federation.py | 15 ++++++++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 92a9678e2c..d1eddf249d 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -426,8 +426,12 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_send_join_request(self, origin, content): pdu = Pdu(**content) - state = yield self.handler.on_send_join_request(origin, pdu) - defer.returnValue((200, self._transaction_from_pdus(state).get_dict())) + res_pdus = yield self.handler.on_send_join_request(origin, pdu) + + defer.returnValue((200, { + "state": [p.get_dict() for p in res_pdus["state"]], + "auth_chain": [p.get_dict() for p in res_pdus["auth_chain"]], + })) @defer.inlineCallbacks def make_join(self, destination, context, user_id): @@ -451,11 +455,17 @@ class ReplicationLayer(object): ) logger.debug("Got content: %s", content) - pdus = [Pdu(outlier=True, **p) for p in content.get("pdus", [])] - for pdu in pdus: + state = [Pdu(outlier=True, **p) for p in content.get("state", [])] + for pdu in state: yield self._handle_new_pdu(destination, pdu) - defer.returnValue(pdus) + auth_chain = [ + Pdu(outlier=True, **p) for p in content.get("auth_chain", []) + ] + for pdu in auth_chain: + yield self._handle_new_pdu(destination, pdu) + + defer.returnValue(state) @log_function def _get_persisted_pdu(self, event_id): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 09593303a4..c193da12b7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -366,10 +366,19 @@ class FederationHandler(BaseHandler): yield self.replication_layer.send_pdu(new_pdu) - defer.returnValue([ + auth_chain = yield self.store.get_auth_chain(event.event_id) + pdu_auth_chain = [ self.pdu_codec.pdu_from_event(e) - for e in event.state_events.values() - ]) + for e in auth_chain + ] + + defer.returnValue({ + "state": [ + self.pdu_codec.pdu_from_event(e) + for e in event.state_events.values() + ], + "auth_chain": pdu_auth_chain, + }) @defer.inlineCallbacks def get_state_for_pdu(self, event_id): -- cgit 1.5.1 From d2fb2b8095ec7f5d00b51418e84b05a1b23b79b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 13:41:00 +0000 Subject: Implement invite part of invite join dance --- synapse/federation/replication.py | 15 ++++++++++++++- synapse/handlers/_base.py | 13 ++++++++++++- synapse/handlers/federation.py | 37 +++++++++++++++++++++++++++++++++++++ synapse/handlers/room.py | 32 ++++++++++++-------------------- 4 files changed, 75 insertions(+), 22 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 37e7db0536..e358de942e 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -413,7 +413,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_invite_request(self, origin, content): pdu = Pdu(**content) - ret_pdu = yield self.handler.on_send_join_request(origin, pdu) + ret_pdu = yield self.handler.on_invite_request(origin, pdu) defer.returnValue((200, ret_pdu.get_dict())) @defer.inlineCallbacks @@ -460,6 +460,19 @@ class ReplicationLayer(object): defer.returnValue(state) + @defer.inlineCallbacks + def send_invite(self, destination, context, event_id, pdu): + code, pdu_dict = yield self.transport_layer.send_invite( + destination=destination, + context=context, + event_id=event_id, + content=pdu.get_dict(), + ) + + logger.debug("Got response to send_invite: %s", pdu_dict) + + defer.returnValue(Pdu(**pdu_dict)) + @log_function def _get_persisted_pdu(self, event_id): """ Get a PDU from the database with given origin and id. diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index f630280031..9dc2fc2e0f 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -56,7 +56,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], - extra_users=[], suppress_auth=False): + extra_users=[], suppress_auth=False, + do_invite_host=None): yield run_on_reactor() snapshot.fill_out_prev_events(event) @@ -80,6 +81,16 @@ class BaseHandler(object): else: logger.debug("Suppressed auth.") + if do_invite_host: + federation_handler = self.hs.get_handlers().federation_handler + invite_event = yield federation_handler.send_invite( + do_invite_host, + event + ) + + # FIXME: We need to check if the remote changed anything else + event.signatures = invite_event.signatures + yield self.store.persist_event(event) destinations = set(extra_destinations) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c193da12b7..e6afd95a58 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -23,6 +23,7 @@ from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec from synapse.util.async import run_on_reactor +from synapse.crypto.event_signing import compute_event_signature from twisted.internet import defer @@ -212,6 +213,17 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def send_invite(self, target_host, event): + pdu = yield self.replication_layer.send_invite( + destination=target_host, + context=event.room_id, + event_id=event.event_id, + pdu=self.pdu_codec.pdu_from_event(event) + ) + + defer.returnValue(self.pdu_codec.event_from_pdu(pdu)) + @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content, snapshot): @@ -380,6 +392,31 @@ class FederationHandler(BaseHandler): "auth_chain": pdu_auth_chain, }) + @defer.inlineCallbacks + def on_invite_request(self, origin, pdu): + event = self.pdu_codec.event_from_pdu(pdu) + + event.outlier = True + + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + + yield self.state_handler.annotate_state_groups(event) + + yield self.store.persist_event( + event, + backfilled=False, + ) + + yield self.notifier.on_new_room_event(event) + + defer.returnValue(self.pdu_codec.pdu_from_event(event)) + @defer.inlineCallbacks def get_state_for_pdu(self, event_id): yield run_on_reactor() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 42a6c9f9bf..3642fcfc6d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -361,13 +361,6 @@ class RoomMemberHandler(BaseHandler): if prev_state: event.content["prev"] = prev_state.membership -# if prev_state and prev_state.membership == event.membership: -# # treat this event as a NOOP. -# if do_auth: # This is mainly to fix a unit test. -# yield self.auth.check(event, raises=True) -# defer.returnValue({}) -# return - room_id = event.room_id # If we're trying to join a room then we have to do this differently @@ -521,25 +514,24 @@ class RoomMemberHandler(BaseHandler): defer.returnValue([r.room_id for r in rooms]) + @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, snapshot, do_auth): - destinations = [] - # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key target_user = self.hs.parse_userid(target_user_id) - if membership == Membership.INVITE: - host = target_user.domain - destinations.append(host) - - # Always include target domain - host = target_user.domain - destinations.append(host) - - return self._on_new_room_event( - event, snapshot, extra_destinations=destinations, - extra_users=[target_user], suppress_auth=(not do_auth), + if membership == Membership.INVITE and not target_user.is_mine: + do_invite_host = target_user.domain + else: + do_invite_host = None + + yield self._on_new_room_event( + event, + snapshot, + extra_users=[target_user], + suppress_auth=(not do_auth), + do_invite_host=do_invite_host, ) -- cgit 1.5.1 From 02c3b1c9e2afa27753e9ce898e5455b6489541b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 15:35:53 +0000 Subject: Add '/event_auth/' federation api --- synapse/federation/replication.py | 5 +++++ synapse/federation/transport.py | 26 ++++++++++++++++++++++++++ synapse/handlers/federation.py | 5 +++++ synapse/storage/event_federation.py | 26 +++++++++++++++++++------- 4 files changed, 55 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e358de942e..719bfcc42c 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -426,6 +426,11 @@ class ReplicationLayer(object): "auth_chain": [p.get_dict() for p in res_pdus["auth_chain"]], })) + @defer.inlineCallbacks + def on_event_auth(self, origin, context, event_id): + auth_pdus = yield self.handler.on_event_auth(event_id) + defer.returnValue((200, [a.get_dict() for a in auth_pdus])) + @defer.inlineCallbacks def make_join(self, destination, context, user_id): pdu_dict = yield self.transport_layer.make_join( diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index b9f7d54c71..babe8447eb 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -256,6 +256,21 @@ class TransportLayer(object): defer.returnValue(json.loads(content)) + @defer.inlineCallbacks + @log_function + def get_event_auth(self, destination, context, event_id): + path = PREFIX + "/event_auth/%s/%s" % ( + context, + event_id, + ) + + response = yield self.client.get_json( + destination=destination, + path=path, + ) + + defer.returnValue(response) + @defer.inlineCallbacks def _authenticate_request(self, request): json_request = { @@ -426,6 +441,17 @@ class TransportLayer(object): ) ) + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/event_auth/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + handler.on_event_auth( + origin, context, event_id, + ) + ) + ) + self.server.register_path( "PUT", re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"), diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e6afd95a58..ce65bbcd62 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -224,6 +224,11 @@ class FederationHandler(BaseHandler): defer.returnValue(self.pdu_codec.event_from_pdu(pdu)) + @defer.inlineCallbacks + def on_event_auth(self, event_id): + auth = yield self.store.get_auth_chain(event_id) + defer.returnValue([self.pdu_codec.pdu_from_event(e) for e in auth]) + @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content, snapshot): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index d66a49e9f2..06e32d592d 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -32,6 +32,24 @@ class EventFederationStore(SQLBaseStore): ) def _get_auth_chain_txn(self, txn, event_id): + results = self._get_auth_chain_ids_txn(txn, event_id) + + sql = "SELECT * FROM events WHERE event_id = ?" + rows = [] + for ev_id in results: + c = txn.execute(sql, (ev_id,)) + rows.extend(self.cursor_to_dict(c)) + + return self._parse_events_txn(txn, rows) + + def get_auth_chain_ids(self, event_id): + return self.runInteraction( + "get_auth_chain_ids", + self._get_auth_chain_ids_txn, + event_id + ) + + def _get_auth_chain_ids_txn(self, txn, event_id): results = set() base_sql = ( @@ -48,13 +66,7 @@ class EventFederationStore(SQLBaseStore): front = [r[0] for r in txn.fetchall()] results.update(front) - sql = "SELECT * FROM events WHERE event_id = ?" - rows = [] - for ev_id in results: - c = txn.execute(sql, (ev_id,)) - rows.extend(self.cursor_to_dict(c)) - - return self._parse_events_txn(txn, rows) + return list(results) def get_oldest_events_in_room(self, room_id): return self.runInteraction( -- cgit 1.5.1 From 07286a73b179c7b3888f7cbf15dcf7c4f5a0c3a3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 16:03:31 +0000 Subject: Use current state to get room hosts, rather than querying the database --- synapse/handlers/_base.py | 18 +++++++++++++++--- synapse/handlers/federation.py | 21 +++++++++++++++++---- 2 files changed, 32 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 9dc2fc2e0f..07a8464107 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -18,6 +18,8 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.api.events.room import RoomMemberEvent +from synapse.api.constants import Membership import logging @@ -95,9 +97,19 @@ class BaseHandler(object): destinations = set(extra_destinations) # Send a PDU to all hosts who have joined the room. - destinations.update((yield self.store.get_joined_hosts_for_room( - event.room_id - ))) + + for k, s in event.state_events.items(): + try: + if k[0] == RoomMemberEvent.TYPE: + if s.content["membership"] == Membership.JOIN: + destinations.add( + self.hs.parse_userid(s.state_key).domain + ) + except: + logger.warn( + "Failed to get destination from event %s", s.event_id + ) + event.destinations = list(destinations) self.notifier.on_new_room_event(event, extra_users=extra_users) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ce65bbcd62..7e10583902 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -376,10 +376,23 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - new_pdu = self.pdu_codec.pdu_from_event(event); - new_pdu.destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) + new_pdu = self.pdu_codec.pdu_from_event(event) + + destinations = set() + + for k, s in event.state_events.items(): + try: + if k[0] == RoomMemberEvent.TYPE: + if s.content["membership"] == Membership.JOIN: + destinations.add( + self.hs.parse_userid(s.state_key).domain + ) + except: + logger.warn( + "Failed to get destination from event %s", s.event_id + ) + + new_pdu.destinations = list(destinations) yield self.replication_layer.send_pdu(new_pdu) -- cgit 1.5.1 From 65f846ade09feba129a14b2b9714c2ad51d00f0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 11:15:02 +0000 Subject: Notify users about invites. --- synapse/api/auth.py | 16 +++++++++------- synapse/handlers/federation.py | 5 ++++- synapse/storage/stream.py | 3 +-- 3 files changed, 14 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 077d1ab0bf..3e5d878eed 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -337,15 +337,17 @@ class Auth(object): if join_rule_event: join_rule = join_rule_event.content.get("join_rule") is_public = join_rule == JoinRules.PUBLIC if join_rule else False + else: + is_public = False - if event.type == RoomMemberEvent.TYPE: - if event.content["membership"] == Membership.JOIN: - if is_public: - auth_events.append(join_rule_event.event_id) - elif member_event: - auth_events.append(member_event.event_id) + if event.type == RoomMemberEvent.TYPE: + e_type = event.content["membership"] + if e_type in [Membership.JOIN, Membership.INVITE]: + auth_events.append(join_rule_event.event_id) - if member_event: + if member_event and not is_public: + auth_events.append(member_event.event_id) + elif member_event: if member_event.content["membership"] == Membership.JOIN: auth_events.append(member_event.event_id) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7e10583902..9a59fe94d2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -431,7 +431,10 @@ class FederationHandler(BaseHandler): backfilled=False, ) - yield self.notifier.on_new_room_event(event) + target_user = self.hs.parse_userid(event.state_key) + yield self.notifier.on_new_room_event( + event, extra_users=[target_user], + ) defer.returnValue(self.pdu_codec.pdu_from_event(event)) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8f7f61d29d..475e7f20a1 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -177,10 +177,9 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT *, (%(redacted)s) AS redacted FROM events AS e WHERE " - "((room_id IN (%(current)s)) OR " + "(e.outlier = 0 AND (room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? " - "AND e.outlier = 0 " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { "redacted": del_sql, -- cgit 1.5.1 From 6447db063a0d01135582bdfb3392b419f16a19e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 11:59:51 +0000 Subject: Fix backfill to work. Add auth to backfill request --- synapse/api/auth.py | 6 ++++++ synapse/federation/replication.py | 36 ++++++++++++++++++++++++++++-------- synapse/federation/transport.py | 6 +++--- synapse/handlers/federation.py | 10 +++++----- synapse/storage/_base.py | 12 ++++++++++++ synapse/storage/event_federation.py | 4 ++-- 6 files changed, 56 insertions(+), 18 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3e5d878eed..48f9d460a3 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -104,6 +104,12 @@ class Auth(object): pass defer.returnValue(None) + @defer.inlineCallbacks + def check_host_in_room(self, room_id, host): + joined_hosts = yield self.store.get_joined_hosts_for_room(room_id) + + defer.returnValue(host in joined_hosts) + def check_event_sender_in_room(self, event): key = (RoomMemberEvent.TYPE, event.user_id, ) member_event = event.state_events.get(key) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 719bfcc42c..7837f1c252 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -205,7 +205,7 @@ class ReplicationLayer(object): pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu, backfilled=True) + yield self._handle_new_pdu(dest, pdu, backfilled=True) defer.returnValue(pdus) @@ -274,9 +274,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_backfill_request(self, context, versions, limit): + def on_backfill_request(self, origin, context, versions, limit): pdus = yield self.handler.on_backfill_request( - context, versions, limit + origin, context, versions, limit ) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -408,13 +408,22 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_make_join_request(self, context, user_id): pdu = yield self.handler.on_make_join_request(context, user_id) - defer.returnValue(pdu.get_dict()) + defer.returnValue({ + "event": pdu.get_dict(), + }) @defer.inlineCallbacks def on_invite_request(self, origin, content): pdu = Pdu(**content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) - defer.returnValue((200, ret_pdu.get_dict())) + defer.returnValue( + ( + 200, + { + "event": ret_pdu.get_dict(), + } + ) + ) @defer.inlineCallbacks def on_send_join_request(self, origin, content): @@ -429,16 +438,25 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_event_auth(self, origin, context, event_id): auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue((200, [a.get_dict() for a in auth_pdus])) + defer.returnValue( + ( + 200, + { + "auth_chain": [a.get_dict() for a in auth_pdus], + } + ) + ) @defer.inlineCallbacks def make_join(self, destination, context, user_id): - pdu_dict = yield self.transport_layer.make_join( + ret = yield self.transport_layer.make_join( destination=destination, context=context, user_id=user_id, ) + pdu_dict = ret["event"] + logger.debug("Got response to make_join: %s", pdu_dict) defer.returnValue(Pdu(**pdu_dict)) @@ -467,13 +485,15 @@ class ReplicationLayer(object): @defer.inlineCallbacks def send_invite(self, destination, context, event_id, pdu): - code, pdu_dict = yield self.transport_layer.send_invite( + code, content = yield self.transport_layer.send_invite( destination=destination, context=context, event_id=event_id, content=pdu.get_dict(), ) + pdu_dict = content["event"] + logger.debug("Got response to send_invite: %s", pdu_dict) defer.returnValue(Pdu(**pdu_dict)) diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index babe8447eb..92a1f4ce17 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -413,7 +413,7 @@ class TransportLayer(object): self._with_authentication( lambda origin, content, query, context: self._on_backfill_request( - context, query["v"], query["limit"] + origin, context, query["v"], query["limit"] ) ) ) @@ -552,7 +552,7 @@ class TransportLayer(object): defer.returnValue(data) @log_function - def _on_backfill_request(self, context, v_list, limits): + def _on_backfill_request(self, origin, context, v_list, limits): if not limits: return defer.succeed( (400, {"error": "Did not include limit param"}) @@ -563,7 +563,7 @@ class TransportLayer(object): versions = v_list return self.request_handler.on_backfill_request( - context, versions, limit + origin, context, versions, limit ) @defer.inlineCallbacks diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9a59fe94d2..00d10609b8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -193,10 +193,7 @@ class FederationHandler(BaseHandler): dest, room_id, limit, - extremities=[ - self.pdu_codec.decode_event_id(e) - for e in extremities - ] + extremities=extremities, ) events = [] @@ -473,7 +470,10 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_backfill_request(self, context, pdu_list, limit): + def on_backfill_request(self, origin, context, pdu_list, limit): + in_room = yield self.auth.check_host_in_room(context, origin) + if not in_room: + raise AuthError(403, "Host not in room.") events = yield self.store.get_backfill_events( context, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 3ab81a78d5..a23f2b941b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -447,6 +447,18 @@ class SQLBaseStore(object): **d ) + def _get_events_txn(self, txn, event_ids): + # FIXME (erikj): This should be batched? + + sql = "SELECT * FROM events WHERE event_id = ?" + + event_rows = [] + for e_id in event_ids: + c = txn.execute(sql, (e_id,)) + event_rows.extend(self.cursor_to_dict(c)) + + return self._parse_events_txn(txn, event_rows) + def _parse_events(self, rows): return self.runInteraction( "_parse_events", self._parse_events_txn, rows diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 06e32d592d..a707030145 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -371,10 +371,10 @@ class EventFederationStore(SQLBaseStore): "_backfill_interaction: got id=%s", *row ) - new_front.append(row) + new_front.append(row[0]) front = new_front event_results += new_front # We also want to update the `prev_pdus` attributes before returning. - return self._get_pdu_tuples(txn, event_results) + return self._get_events_txn(txn, event_results) -- cgit 1.5.1 From 003668cfaadc2e96fab0127d7c563eb4b17e0619 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 13:37:24 +0000 Subject: Add auth to the various server-server APIs --- synapse/federation/replication.py | 14 ++++++++------ synapse/federation/transport.py | 3 ++- synapse/handlers/federation.py | 15 +++++++++++++-- 3 files changed, 23 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 7837f1c252..dd8124dbb9 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -347,10 +347,12 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, context, event_id): + def on_context_state_request(self, origin, context, event_id): if event_id: pdus = yield self.handler.get_state_for_pdu( - event_id + origin, + context, + event_id, ) else: raise NotImplementedError("Specify an event") @@ -365,8 +367,8 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_pdu_request(self, event_id): - pdu = yield self._get_persisted_pdu(event_id) + def on_pdu_request(self, origin, event_id): + pdu = yield self._get_persisted_pdu(origin, event_id) if pdu: defer.returnValue( @@ -499,13 +501,13 @@ class ReplicationLayer(object): defer.returnValue(Pdu(**pdu_dict)) @log_function - def _get_persisted_pdu(self, event_id): + def _get_persisted_pdu(self, origin, event_id): """ Get a PDU from the database with given origin and id. Returns: Deferred: Results in a `Pdu`. """ - return self.handler.get_persisted_pdu(event_id) + return self.handler.get_persisted_pdu(origin, event_id) def _transaction_from_pdus(self, pdu_list): """Returns a new Transaction containing the given PDUs suitable for diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 92a1f4ce17..d84a44c211 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -390,7 +390,7 @@ class TransportLayer(object): re.compile("^" + PREFIX + "/event/([^/]*)/$"), self._with_authentication( lambda origin, content, query, event_id: - handler.on_pdu_request(event_id) + handler.on_pdu_request(origin, event_id) ) ) @@ -401,6 +401,7 @@ class TransportLayer(object): self._with_authentication( lambda origin, content, query, context: handler.on_context_state_request( + origin, context, query.get("event_id", [None])[0], ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 00d10609b8..587fa308c8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -436,9 +436,13 @@ class FederationHandler(BaseHandler): defer.returnValue(self.pdu_codec.pdu_from_event(event)) @defer.inlineCallbacks - def get_state_for_pdu(self, event_id): + def get_state_for_pdu(self, origin, room_id, event_id): yield run_on_reactor() + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + state_groups = yield self.store.get_state_groups( [event_id] ) @@ -488,7 +492,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, event_id): + def get_persisted_pdu(self, origin, event_id): """ Get a PDU from the database with given origin and id. Returns: @@ -500,6 +504,13 @@ class FederationHandler(BaseHandler): ) if event: + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + defer.returnValue(self.pdu_codec.pdu_from_event(event)) else: defer.returnValue(None) -- cgit 1.5.1 From 5d439b127ba34b951dfd09a7d3c684c2d50df702 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 13:46:44 +0000 Subject: PEP8 --- synapse/api/auth.py | 3 +-- synapse/api/events/room.py | 1 + synapse/federation/replication.py | 1 - synapse/federation/transport.py | 9 ++++++--- synapse/federation/units.py | 7 +++---- synapse/handlers/federation.py | 5 ++++- synapse/storage/__init__.py | 7 ++++--- synapse/storage/event_federation.py | 9 +++------ 8 files changed, 22 insertions(+), 20 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 48f9d460a3..a5c6964707 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -369,7 +369,6 @@ class Auth(object): ] event.auth_events = zip(auth_events, hashes) - @log_function def _can_send_event(self, event): key = (RoomPowerLevelsEvent.TYPE, "", ) @@ -452,7 +451,7 @@ class Auth(object): event.user_id, ) - # Check other levels: + # Check other levels: levels_to_check = [ ("users_default", []), ("events_default", []), diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index 25bc883706..8c4ac45d02 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -153,6 +153,7 @@ class RoomPowerLevelsEvent(SynapseStateEvent): def get_content_template(self): return {} + class RoomAliasesEvent(SynapseStateEvent): TYPE = "m.room.aliases" diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e798304353..bacba36755 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -549,7 +549,6 @@ class ReplicationLayer(object): origin, pdu.room_id, pdu.event_id, ) - if not backfilled: ret = yield self.handler.on_receive_pdu( pdu, diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index d84a44c211..95c40c6c1b 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -284,7 +284,7 @@ class TransportLayer(object): origin = None if request.method == "PUT": - #TODO: Handle other method types? other content types? + # TODO: Handle other method types? other content types? try: content_bytes = request.content.read() content = json.loads(content_bytes) @@ -296,11 +296,13 @@ class TransportLayer(object): try: params = auth.split(" ")[1].split(",") param_dict = dict(kv.split("=") for kv in params) + def strip_quotes(value): if value.startswith("\""): return value[1:-1] else: return value + origin = strip_quotes(param_dict["origin"]) key = strip_quotes(param_dict["key"]) sig = strip_quotes(param_dict["sig"]) @@ -321,7 +323,7 @@ class TransportLayer(object): if auth.startswith("X-Matrix"): (origin, key, sig) = parse_auth_header(auth) json_request["origin"] = origin - json_request["signatures"].setdefault(origin,{})[key] = sig + json_request["signatures"].setdefault(origin, {})[key] = sig if not json_request["signatures"]: raise SynapseError( @@ -515,7 +517,8 @@ class TransportLayer(object): return try: - code, response = yield self.received_handler.on_incoming_transaction( + handler = self.received_handler + code, response = yield handler.on_incoming_transaction( transaction_data ) except: diff --git a/synapse/federation/units.py b/synapse/federation/units.py index d98014cac7..f4e7b62bd9 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -192,7 +192,9 @@ class Transaction(JsonEncodedObject): transaction_id and origin_server_ts keys. """ if "origin_server_ts" not in kwargs: - raise KeyError("Require 'origin_server_ts' to construct a Transaction") + raise KeyError( + "Require 'origin_server_ts' to construct a Transaction" + ) if "transaction_id" not in kwargs: raise KeyError( "Require 'transaction_id' to construct a Transaction" @@ -204,6 +206,3 @@ class Transaction(JsonEncodedObject): kwargs["pdus"] = [p.get_dict() for p in pdus] return Transaction(**kwargs) - - - diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 587fa308c8..e909af6bd8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -521,6 +521,9 @@ class FederationHandler(BaseHandler): @log_function def _on_user_joined(self, user, room_id): - waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) + waiters = self.waiting_for_join_list.get( + (user.to_string(), room_id), + [] + ) while waiters: waiters.pop().callback(None) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7d810e6a62..4034437f6b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -494,11 +494,13 @@ def prepare_database(db_conn): user_version = row[0] if user_version > SCHEMA_VERSION: - raise ValueError("Cannot use this database as it is too " + + raise ValueError( + "Cannot use this database as it is too " + "new for the server to understand" ) elif user_version < SCHEMA_VERSION: - logging.info("Upgrading database from version %d", + logging.info( + "Upgrading database from version %d", user_version ) @@ -520,4 +522,3 @@ def prepare_database(db_conn): c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) c.close() - diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a707030145..a027db3868 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -215,7 +215,7 @@ class EventFederationStore(SQLBaseStore): min_depth = self._simple_select_one_onecol_txn( txn, table="room_depth", - keyvalues={"room_id": room_id,}, + keyvalues={"room_id": room_id}, retcol="min_depth", allow_none=True, ) @@ -267,10 +267,8 @@ class EventFederationStore(SQLBaseStore): } ) - - - # We only insert as a forward extremity the new pdu if there are no - # other pdus that reference it as a prev pdu + # We only insert as a forward extremity the new pdu if there are + # no other pdus that reference it as a prev pdu query = ( "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " "SELECT ?, ? WHERE NOT EXISTS (" @@ -312,7 +310,6 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occured before (and including) the pdus in pdu_list. Return a list of max size `limit`. -- cgit 1.5.1 From a8e565eca8cbfcedbdfd812c98a6545c2fc31afd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 18:24:43 +0000 Subject: Add an EventValidator. Fix bugs in auth ++ storage --- synapse/api/auth.py | 16 +++++++---- synapse/api/events/__init__.py | 61 ----------------------------------------- synapse/handlers/profile.py | 11 +++++--- synapse/rest/base.py | 2 ++ synapse/rest/room.py | 13 +++++++++ synapse/server.py | 5 ++++ synapse/storage/_base.py | 2 +- synapse/storage/registration.py | 6 +++- synapse/storage/room.py | 38 +++++++++++++------------ 9 files changed, 64 insertions(+), 90 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index a5c6964707..6c2d3db26e 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -70,6 +70,7 @@ class Auth(object): logger.debug("Denying! %s", event) return allowed + self.check_event_sender_in_room(event) self._can_send_event(event) if event.type == RoomPowerLevelsEvent.TYPE: @@ -83,8 +84,10 @@ class Auth(object): else: raise AuthError(500, "Unknown event: %s" % event) except AuthError as e: - logger.info("Event auth check failed on event %s with msg: %s", - event, e.msg) + logger.info( + "Event auth check failed on event %s with msg: %s", + event, e.msg + ) logger.info("Denying! %s", event) if raises: raise e @@ -277,7 +280,7 @@ class Auth(object): default=[""] )[0] if user and access_token and ip_addr: - self.store.insert_client_ip( + yield self.store.insert_client_ip( user=user, access_token=access_token, device_id=user_info["device_id"], @@ -349,7 +352,8 @@ class Auth(object): if event.type == RoomMemberEvent.TYPE: e_type = event.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: - auth_events.append(join_rule_event.event_id) + if join_rule_event: + auth_events.append(join_rule_event.event_id) if member_event and not is_public: auth_events.append(member_event.event_id) @@ -405,7 +409,9 @@ class Auth(object): if user_level < send_level: raise AuthError( - 403, "You don't have permission to post that to the room" + 403, + "You don't have permission to post that to the room. " + + "user_level (%d) < send_level (%d)" % (user_level, send_level) ) return True diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index f1e53f23ab..1d8bed2906 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError, Codes from synapse.util.jsonobject import JsonEncodedObject @@ -118,66 +117,6 @@ class SynapseEvent(JsonEncodedObject): """ raise NotImplementedError("get_content_template not implemented.") - def check_json(self, content, raises=True): - """Checks the given JSON content abides by the rules of the template. - - Args: - content : A JSON object to check. - raises: True to raise a SynapseError if the check fails. - Returns: - True if the content passes the template. Returns False if the check - fails and raises=False. - Raises: - SynapseError if the check fails and raises=True. - """ - # recursively call to inspect each layer - err_msg = self._check_json(content, self.get_content_template()) - if err_msg: - if raises: - raise SynapseError(400, err_msg, Codes.BAD_JSON) - else: - return False - else: - return True - - def _check_json(self, content, template): - """Check content and template matches. - - If the template is a dict, each key in the dict will be validated with - the content, else it will just compare the types of content and - template. This basic type check is required because this function will - be recursively called and could be called with just strs or ints. - - Args: - content: The content to validate. - template: The validation template. - Returns: - str: An error message if the validation fails, else None. - """ - if type(content) != type(template): - return "Mismatched types: %s" % template - - if type(template) == dict: - for key in template: - if key not in content: - return "Missing %s key" % key - - if type(content[key]) != type(template[key]): - return "Key %s is of the wrong type (got %s, want %s)" % ( - key, type(content[key]), type(template[key])) - - if type(content[key]) == dict: - # we must go deeper - msg = self._check_json(content[key], template[key]) - if msg: - return msg - elif type(content[key]) == list: - # make sure each item type in content matches the template - for entry in content[key]: - msg = self._check_json(entry, template[key][0]) - if msg: - return msg - class SynapseStateEvent(SynapseEvent): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index e47814483a..834b37f5f3 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -152,10 +152,13 @@ class ProfileHandler(BaseHandler): if not user.is_mine: defer.returnValue(None) - (displayname, avatar_url) = yield defer.gatherResults([ - self.store.get_profile_displayname(user.localpart), - self.store.get_profile_avatar_url(user.localpart), - ]) + (displayname, avatar_url) = yield defer.gatherResults( + [ + self.store.get_profile_displayname(user.localpart), + self.store.get_profile_avatar_url(user.localpart), + ], + consumeErrors=True + ) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/rest/base.py b/synapse/rest/base.py index dc784c1527..79fc4dfb84 100644 --- a/synapse/rest/base.py +++ b/synapse/rest/base.py @@ -67,6 +67,8 @@ class RestServlet(object): self.auth = hs.get_auth() self.txns = HttpTransactionStore() + self.validator = hs.get_event_validator() + def register(self, http_server): """ Register this servlet with the given HTTP server. """ if hasattr(self, "PATTERN"): diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 5c9c9d3af4..05da0be090 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -154,6 +154,9 @@ class RoomStateEventRestServlet(RestServlet): user_id=user.to_string(), state_key=urllib.unquote(state_key) ) + + self.validator.validate(event) + if event_type == RoomMemberEvent.TYPE: # membership events are special handler = self.handlers.room_member_handler @@ -188,6 +191,8 @@ class RoomSendEventRestServlet(RestServlet): content=content ) + self.validator.validate(event) + msg_handler = self.handlers.message_handler yield msg_handler.send_message(event) @@ -253,6 +258,9 @@ class JoinRoomAliasServlet(RestServlet): user_id=user.to_string(), state_key=user.to_string() ) + + self.validator.validate(event) + handler = self.handlers.room_member_handler yield handler.change_membership(event) defer.returnValue((200, {})) @@ -424,6 +432,9 @@ class RoomMembershipRestServlet(RestServlet): user_id=user.to_string(), state_key=state_key ) + + self.validator.validate(event) + handler = self.handlers.room_member_handler yield handler.change_membership(event) defer.returnValue((200, {})) @@ -461,6 +472,8 @@ class RoomRedactEventRestServlet(RestServlet): redacts=urllib.unquote(event_id), ) + self.validator.validate(event) + msg_handler = self.handlers.message_handler yield msg_handler.send_message(event) diff --git a/synapse/server.py b/synapse/server.py index d770b20b19..da0a44433a 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -22,6 +22,7 @@ from synapse.federation import initialize_http_replication from synapse.api.events import serialize_event from synapse.api.events.factory import EventFactory +from synapse.api.events.validator import EventValidator from synapse.notifier import Notifier from synapse.api.auth import Auth from synapse.handlers import Handlers @@ -80,6 +81,7 @@ class BaseHomeServer(object): 'event_sources', 'ratelimiter', 'keyring', + 'event_validator', ] def __init__(self, hostname, **kwargs): @@ -223,6 +225,9 @@ class HomeServer(BaseHomeServer): def build_keyring(self): return Keyring(self) + def build_event_validator(self): + return EventValidator(self) + def register_servlets(self): """ Register all servlets associated with this HomeServer. """ diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2df64bdfeb..a1ee0318f6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -212,7 +212,7 @@ class SQLBaseStore(object): retcol : string giving the name of the column to return """ return self.runInteraction( - "_simple_select_one_onecol_txn", + "_simple_select_one_onecol", self._simple_select_one_onecol_txn, table, keyvalues, retcol, allow_none=allow_none, ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a2ca6f9a69..1f89d77344 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -107,13 +107,17 @@ class RegistrationStore(SQLBaseStore): token ) + @defer.inlineCallbacks def is_server_admin(self, user): - return self._simple_select_one_onecol( + res = yield self._simple_select_one_onecol( table="users", keyvalues={"name": user.to_string()}, retcol="admin", + allow_none=True, ) + defer.returnValue(res if res else False) + def _query_for_auth(self, txn, token): sql = ( "SELECT users.name, users.admin, access_tokens.device_id " diff --git a/synapse/storage/room.py b/synapse/storage/room.py index ca70506d28..cc0513b8d2 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -133,26 +133,28 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) def _store_room_topic_txn(self, txn, event): - self._simple_insert_txn( - txn, - "topics", - { - "event_id": event.event_id, - "room_id": event.room_id, - "topic": event.topic, - } - ) + if hasattr(event, "topic"): + self._simple_insert_txn( + txn, + "topics", + { + "event_id": event.event_id, + "room_id": event.room_id, + "topic": event.topic, + } + ) def _store_room_name_txn(self, txn, event): - self._simple_insert_txn( - txn, - "room_names", - { - "event_id": event.event_id, - "room_id": event.room_id, - "name": event.name, - } - ) + if hasattr(event, "name"): + self._simple_insert_txn( + txn, + "room_names", + { + "event_id": event.event_id, + "room_id": event.room_id, + "name": event.name, + } + ) class RoomsTable(Table): -- cgit 1.5.1 From 5ff0bfb81dfbf27d5c889113d08edb609a2d145a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Nov 2014 14:16:41 +0000 Subject: Fix bug where we /always/ created a new state group --- synapse/handlers/federation.py | 3 ++- synapse/state.py | 61 ++++++++++++++++++++++++++---------------- synapse/storage/state.py | 9 ++----- 3 files changed, 42 insertions(+), 31 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e909af6bd8..c2cd91bb39 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -448,8 +448,9 @@ class FederationHandler(BaseHandler): ) if state_groups: + _, state = state_groups.items().pop() results = { - (e.type, e.state_key): e for e in state_groups[0].state + (e.type, e.state_key): e for e in state } event = yield self.store.get_event(event_id) diff --git a/synapse/state.py b/synapse/state.py index e2fd48bdae..a58acb3c1e 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -42,9 +42,6 @@ class StateHandler(object): def __init__(self, hs): self.store = hs.get_datastore() - self._replication = hs.get_replication_layer() - self.server_name = hs.hostname - self.hs = hs @defer.inlineCallbacks @log_function @@ -71,9 +68,10 @@ class StateHandler(object): defer.returnValue(False) return - new_state = yield self.resolve_state_groups( - [e for e, _ in event.prev_events] - ) + ids = [e for e, _ in event.prev_events] + + ret = yield self.resolve_state_groups(ids) + state_group, new_state = ret event.old_state_events = copy.deepcopy(new_state) @@ -82,6 +80,10 @@ class StateHandler(object): if key in new_state: event.replaces_state = new_state[key].event_id new_state[key] = event + elif state_group: + event.state_group = state_group + event.state_events = new_state + defer.returnValue(False) event.state_group = None event.state_events = new_state @@ -100,7 +102,7 @@ class StateHandler(object): res = yield self.resolve_state_groups(event_ids) if event_type: - defer.returnValue(res.get((event_type, state_key))) + defer.returnValue(res[1].get((event_type, state_key))) return defer.returnValue(res.values()) @@ -112,9 +114,18 @@ class StateHandler(object): event_ids ) + group_names = set(state_groups.keys()) + if len(group_names) == 1: + name, state_list = state_groups.items().pop() + state = { + (e.type, e.state_key): e + for e in state_list + } + defer.returnValue((name, state)) + state = {} - for group in state_groups: - for s in group.state: + for group, g_state in state_groups.items(): + for s in g_state: state.setdefault( (s.type, s.state_key), {} @@ -135,25 +146,29 @@ class StateHandler(object): new_state = {} new_state.update(unconflicted_state) for key, events in conflicted_state.items(): - new_state[key] = yield self._resolve_state_events(events) + new_state[key] = self._resolve_state_events(events) except: logger.exception("Failed to resolve state") raise - defer.returnValue(new_state) + defer.returnValue((None, new_state)) def _get_power_level_from_event_state(self, event, user_id): - key = (RoomPowerLevelsEvent.TYPE, "", ) - power_level_event = event.old_state_events.get(key) - level = None - if power_level_event: - level = power_level_event.content.get("users", {}).get(user_id) - if not level: - level = power_level_event.content.get("users_default", 0) - - return level + if hasattr(event, "old_state_events") and event.old_state_events: + key = (RoomPowerLevelsEvent.TYPE, "", ) + power_level_event = event.old_state_events.get(key) + level = None + if power_level_event: + level = power_level_event.content.get("users", {}).get( + user_id + ) + if not level: + level = power_level_event.content.get("users_default", 0) + + return level + else: + return 0 - @defer.inlineCallbacks @log_function def _resolve_state_events(self, events): curr_events = events @@ -177,10 +192,10 @@ class StateHandler(object): if not curr_events: raise RuntimeError("Max didn't get a max?") elif len(curr_events) == 1: - defer.returnValue(curr_events[0]) + return curr_events[0] # TODO: For now, just choose the one with the largest event_id. - defer.returnValue( + return ( sorted( curr_events, key=lambda e: hashlib.sha1( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e08acd6404..68975969f5 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,11 +16,6 @@ from ._base import SQLBaseStore from twisted.internet import defer -from collections import namedtuple - - -StateGroup = namedtuple("StateGroup", ("group", "state")) - class StateStore(SQLBaseStore): @@ -37,7 +32,7 @@ class StateStore(SQLBaseStore): if group: groups.add(group) - res = [] + res = {} for group in groups: state_ids = yield self._simple_select_onecol( table="state_groups_state", @@ -53,7 +48,7 @@ class StateStore(SQLBaseStore): if s: state.append(s) - res.append(StateGroup(group, state)) + res[group] = state defer.returnValue(res) -- cgit 1.5.1