From 9cb4f75d53d99634e79e791de22cb7de718248d6 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 7 Jan 2015 15:16:31 +0000 Subject: SYN-154: Better error messages when joining an unknown room by ID. The simple fix doesn't work here because room creation also involves unknown room IDs. The check relies on the presence of m.room.create for rooms being created, whereas bogus room IDs have no state events at all. --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d26975a88a..d0de6fd04d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -617,8 +617,8 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_backfill_request(self, origin, context, pdu_list, limit): - in_room = yield self.auth.check_host_in_room(context, origin) + def on_backfill_request(self, origin, room_id, pdu_list, limit): + in_room = yield self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") -- cgit 1.4.1 From 333836ff9205a53934cf0c412b75916740e407b5 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 7 Jan 2015 16:18:12 +0000 Subject: PEP8 and pyflakes warnings --- synapse/handlers/federation.py | 2 +- synapse/handlers/room.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d0de6fd04d..195f7c618a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -623,7 +623,7 @@ class FederationHandler(BaseHandler): raise AuthError(403, "Host not in room.") events = yield self.store.get_backfill_events( - context, + room_id, pdu_list, limit ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 16c6628292..6d0db18e51 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -428,7 +428,9 @@ class RoomMemberHandler(BaseHandler): if not is_host_in_room: # is *anyone* in the room? room_member_keys = [ - v for (k,v) in context.current_state.keys() if k == "m.room.member" + v for (k, v) in context.current_state.keys() if ( + k == "m.room.member" + ) ] if len(room_member_keys) == 0: # has the room been created so we can join it? -- cgit 1.4.1 From 5fed04264056263e10b920a917a3a40f88e7e820 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 16 Jan 2015 18:59:04 +0000 Subject: Finish renaming "context" to "room_id" in federation codebase --- synapse/federation/replication.py | 94 +++++++++++++--------------------- synapse/federation/transport/client.py | 47 +++++++---------- synapse/federation/transport/server.py | 1 - synapse/handlers/_base.py | 4 +- synapse/handlers/federation.py | 10 ++-- synapse/http/matrixfederationclient.py | 1 - tests/handlers/test_room.py | 4 +- 7 files changed, 62 insertions(+), 99 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a4c29b484b..6620532a60 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -256,23 +256,21 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context, event_id): - """Requests all of the `current` state PDUs for a given context from + def get_state_for_room(self, destination, room_id, event_id): + """Requests all of the `current` state PDUs for a given room from a remote home server. Args: destination (str): The remote homeserver to query for the state. - context (str): The context we're interested in. + room_id (str): The id of the room we're interested in. event_id (str): The id of the event we want the state at. Returns: Deferred: Results in a list of PDUs. """ - result = yield self.transport_layer.get_context_state( - destination, - context, - event_id=event_id, + result = yield self.transport_layer.get_room_state( + destination, room_id, event_id=event_id, ) pdus = [ @@ -288,9 +286,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_event_auth(self, destination, context, event_id): + def get_event_auth(self, destination, room_id, event_id): res = yield self.transport_layer.get_event_auth( - destination, context, event_id, + destination, room_id, event_id, ) auth_chain = [ @@ -304,9 +302,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_backfill_request(self, origin, context, versions, limit): + def on_backfill_request(self, origin, room_id, versions, limit): pdus = yield self.handler.on_backfill_request( - origin, context, versions, limit + origin, room_id, versions, limit ) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -380,12 +378,10 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, origin, context, event_id): + def on_context_state_request(self, origin, room_id, event_id): if event_id: pdus = yield self.handler.get_state_for_pdu( - origin, - context, - event_id, + origin, room_id, event_id, ) auth_chain = yield self.store.get_auth_chain( [pdu.event_id for pdu in pdus] @@ -413,7 +409,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function def on_pull_request(self, origin, versions): - raise NotImplementedError("Pull transacions not implemented") + raise NotImplementedError("Pull transactions not implemented") @defer.inlineCallbacks def on_query_request(self, query_type, args): @@ -422,30 +418,21 @@ class ReplicationLayer(object): defer.returnValue((200, response)) else: defer.returnValue( - (404, "No handler for Query type '%s'" % (query_type, )) + (404, "No handler for Query type '%s'" % (query_type,)) ) @defer.inlineCallbacks - def on_make_join_request(self, context, user_id): - pdu = yield self.handler.on_make_join_request(context, user_id) + def on_make_join_request(self, room_id, user_id): + pdu = yield self.handler.on_make_join_request(room_id, user_id) time_now = self._clock.time_msec() - defer.returnValue({ - "event": pdu.get_pdu_json(time_now), - }) + defer.returnValue({"event": pdu.get_pdu_json(time_now)}) @defer.inlineCallbacks def on_invite_request(self, origin, content): pdu = self.event_from_pdu_json(content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) time_now = self._clock.time_msec() - defer.returnValue( - ( - 200, - { - "event": ret_pdu.get_pdu_json(time_now), - } - ) - ) + defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) @defer.inlineCallbacks def on_send_join_request(self, origin, content): @@ -462,26 +449,17 @@ class ReplicationLayer(object): })) @defer.inlineCallbacks - def on_event_auth(self, origin, context, event_id): + def on_event_auth(self, origin, room_id, event_id): time_now = self._clock.time_msec() auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue( - ( - 200, - { - "auth_chain": [ - a.get_pdu_json(time_now) for a in auth_pdus - ], - } - ) - ) + defer.returnValue((200, { + "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + })) @defer.inlineCallbacks - def make_join(self, destination, context, user_id): + def make_join(self, destination, room_id, user_id): ret = yield self.transport_layer.make_join( - destination=destination, - context=context, - user_id=user_id, + destination, room_id, user_id ) pdu_dict = ret["event"] @@ -494,10 +472,10 @@ class ReplicationLayer(object): def send_join(self, destination, pdu): time_now = self._clock.time_msec() _, content = yield self.transport_layer.send_join( - destination, - pdu.room_id, - pdu.event_id, - pdu.get_pdu_json(time_now), + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), ) logger.debug("Got content: %s", content) @@ -507,9 +485,6 @@ class ReplicationLayer(object): for p in content.get("state", []) ] - # FIXME: We probably want to do something with the auth_chain given - # to us - auth_chain = [ self.event_from_pdu_json(p, outlier=True) for p in content.get("auth_chain", []) @@ -523,11 +498,11 @@ class ReplicationLayer(object): }) @defer.inlineCallbacks - def send_invite(self, destination, context, event_id, pdu): + def send_invite(self, destination, room_id, event_id, pdu): time_now = self._clock.time_msec() code, content = yield self.transport_layer.send_invite( destination=destination, - context=context, + room_id=room_id, event_id=event_id, content=pdu.get_pdu_json(time_now), ) @@ -657,7 +632,7 @@ class ReplicationLayer(object): "_handle_new_pdu getting state for %s", pdu.room_id ) - state, auth_chain = yield self.get_state_for_context( + state, auth_chain = yield self.get_state_for_room( origin, pdu.room_id, pdu.event_id, ) @@ -816,7 +791,7 @@ class _TransactionQueue(object): logger.info("TX [%s] is ready for retry", destination) logger.info("TX [%s] _attempt_new_transaction", destination) - + if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -830,14 +805,15 @@ class _TransactionQueue(object): pending_failures = self.pending_failures_by_dest.pop(destination, []) if pending_pdus: - logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) + logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) if not pending_pdus and not pending_edus and not pending_failures: return logger.debug( - "TX [%s] Attempting new transaction " - "(pdus: %d, edus: %d, failures: %d)", + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", destination, len(pending_pdus), len(pending_edus), diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d61ff192eb..e634a3a213 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -29,9 +29,9 @@ class TransportLayerClient(object): """Sends federation HTTP requests to other servers""" @log_function - def get_context_state(self, destination, context, event_id): - """ Requests all state for a given context (i.e. room) from the - given server. + def get_room_state(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. Args: destination (str): The host name of the remote home server we want @@ -42,10 +42,10 @@ class TransportLayerClient(object): Returns: Deferred: Results in a dict received from the remote homeserver. """ - logger.debug("get_context_state dest=%s, context=%s", - destination, context) + logger.debug("get_room_state dest=%s, room=%s", + destination, room_id) - path = PREFIX + "/state/%s/" % context + path = PREFIX + "/state/%s/" % room_id return self.client.get_json( destination, path=path, args={"event_id": event_id}, ) @@ -69,13 +69,13 @@ class TransportLayerClient(object): return self.client.get_json(destination, path=path) @log_function - def backfill(self, dest, context, event_tuples, limit): + def backfill(self, destination, room_id, event_tuples, limit): """ Requests `limit` previous PDUs in a given context before list of PDUs. Args: dest (str) - context (str) + room_id (str) event_tuples (list) limt (int) @@ -83,15 +83,15 @@ class TransportLayerClient(object): Deferred: Results in a dict received from the remote homeserver. """ logger.debug( - "backfill dest=%s, context=%s, event_tuples=%s, limit=%s", - dest, context, repr(event_tuples), str(limit) + "backfill dest=%s, room_id=%s, event_tuples=%s, limit=%s", + destination, room_id, repr(event_tuples), str(limit) ) if not event_tuples: # TODO: raise? return - path = PREFIX + "/backfill/%s/" % (context,) + path = PREFIX + "/backfill/%s/" % (room_id,) args = { "v": event_tuples, @@ -159,8 +159,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def make_join(self, destination, context, user_id, retry_on_dns_fail=True): - path = PREFIX + "/make_join/%s/%s" % (context, user_id,) + def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True): + path = PREFIX + "/make_join/%s/%s" % (room_id, user_id) response = yield self.client.get_json( destination=destination, @@ -172,11 +172,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_join(self, destination, context, event_id, content): - path = PREFIX + "/send_join/%s/%s" % ( - context, - event_id, - ) + def send_join(self, destination, room_id, event_id, content): + path = PREFIX + "/send_join/%s/%s" % (room_id, event_id) code, content = yield self.client.put_json( destination=destination, @@ -191,11 +188,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_invite(self, destination, context, event_id, content): - path = PREFIX + "/invite/%s/%s" % ( - context, - event_id, - ) + def send_invite(self, destination, room_id, event_id, content): + path = PREFIX + "/invite/%s/%s" % (room_id, event_id) code, content = yield self.client.put_json( destination=destination, @@ -210,11 +204,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def get_event_auth(self, destination, context, event_id): - path = PREFIX + "/event_auth/%s/%s" % ( - context, - event_id, - ) + def get_event_auth(self, destination, room_id, event_id): + path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id) response = yield self.client.get_json( destination=destination, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 34b50def7d..a380a6910b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -285,7 +285,6 @@ class TransportLayerServer(object): defer.returnValue((code, response)) - @log_function def _on_backfill_request(self, origin, context, v_list, limits): if not limits: diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 38af034b4d..f33d17a31e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -144,7 +144,5 @@ class BaseHandler(object): yield self.notifier.on_new_room_event(event, extra_users=extra_users) yield federation_handler.handle_new_event( - event, - None, - destinations=destinations, + event, destinations=destinations, ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 195f7c618a..81203bf1a3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -75,14 +75,14 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def handle_new_event(self, event, snapshot, destinations): + def handle_new_event(self, event, destinations): """ Takes in an event from the client to server side, that has already been authed and handled by the state module, and sends it to any remote home servers that may be interested. Args: - event - snapshot (.storage.Snapshot): THe snapshot the event happened after + event: The event to send + destinations: A list of destinations to send it to Returns: Deferred: Resolved when it has successfully been queued for @@ -154,7 +154,7 @@ class FederationHandler(BaseHandler): replication = self.replication_layer if not state: - state, auth_chain = yield replication.get_state_for_context( + state, auth_chain = yield replication.get_state_for_room( origin, context=event.room_id, event_id=event.event_id, ) @@ -281,7 +281,7 @@ class FederationHandler(BaseHandler): """ pdu = yield self.replication_layer.send_invite( destination=target_host, - context=event.room_id, + room_id=event.room_id, event_id=event.event_id, pdu=event ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index aa14782b0f..1dda3ba2c7 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -72,7 +72,6 @@ class MatrixFederationHttpClient(object): requests. """ - def __init__(self, hs): self.hs = hs self.signing_key = hs.config.signing_key[0] diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 0cb8aa4fbc..d3253b48b8 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -223,7 +223,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): yield room_handler.change_membership(event, context) self.federation.handle_new_event.assert_called_once_with( - event, None, destinations=set() + event, destinations=set() ) self.datastore.persist_event.assert_called_once_with( @@ -301,7 +301,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): yield room_handler.change_membership(event, context) self.federation.handle_new_event.assert_called_once_with( - event, None, destinations=set(['red']) + event, destinations=set(['red']) ) self.datastore.persist_event.assert_called_once_with( -- cgit 1.4.1 From 5759bec43cb52862a8d455afb8cd9d1c5660bc3d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 23 Jan 2015 11:47:15 +0000 Subject: Replace hs.parse_userid with UserID.from_string --- synapse/api/auth.py | 9 +++++---- synapse/handlers/_base.py | 5 +++-- synapse/handlers/events.py | 3 ++- synapse/handlers/federation.py | 13 +++++++------ synapse/handlers/message.py | 13 +++++++------ synapse/handlers/presence.py | 23 ++++++++++++----------- synapse/handlers/profile.py | 3 ++- synapse/handlers/room.py | 14 +++++++------- synapse/handlers/typing.py | 3 ++- synapse/rest/client/v1/admin.py | 4 +++- synapse/rest/client/v1/presence.py | 15 ++++++++------- synapse/rest/client/v1/profile.py | 13 +++++++------ synapse/rest/client/v1/room.py | 5 +++-- synapse/server.py | 6 ------ synapse/storage/roommember.py | 5 +++-- tests/handlers/test_presence.py | 29 +++++++++++++++-------------- tests/handlers/test_presencelike.py | 9 +++++---- tests/handlers/test_profile.py | 8 ++++---- tests/handlers/test_room.py | 9 +++++---- tests/handlers/test_typing.py | 7 ++++--- tests/rest/client/v1/test_presence.py | 19 ++++++++++--------- tests/rest/client/v1/test_profile.py | 3 ++- tests/rest/client/v1/test_rooms.py | 21 +++++++++------------ tests/rest/client/v1/test_typing.py | 5 +++-- tests/storage/test_presence.py | 5 +++-- tests/storage/test_profile.py | 3 ++- tests/storage/test_redaction.py | 5 +++-- tests/storage/test_room.py | 3 ++- tests/storage/test_roommember.py | 7 ++++--- tests/storage/test_stream.py | 5 +++-- tests/test_types.py | 6 ------ 31 files changed, 145 insertions(+), 133 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e31482cfaa..a342a0e0da 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor +from synapse.types import UserID import logging @@ -104,7 +105,7 @@ class Auth(object): for event in curr_state: if event.type == EventTypes.Member: try: - if self.hs.parse_userid(event.state_key).domain != host: + if UserID.from_string(event.state_key).domain != host: continue except: logger.warn("state_key not user_id: %s", event.state_key) @@ -337,7 +338,7 @@ class Auth(object): user_info = { "admin": bool(ret.get("admin", False)), "device_id": ret.get("device_id"), - "user": self.hs.parse_userid(ret.get("name")), + "user": UserID.from_string(ret.get("name")), } defer.returnValue(user_info) @@ -461,7 +462,7 @@ class Auth(object): "You are not allowed to set others state" ) else: - sender_domain = self.hs.parse_userid( + sender_domain = UserID.from_string( event.user_id ).domain @@ -496,7 +497,7 @@ class Auth(object): # Validate users for k, v in user_list.items(): try: - self.hs.parse_userid(k) + UserID.from_string(k) except: raise SynapseError(400, "Not a valid user_id: %s" % (k,)) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index f33d17a31e..1773fa20aa 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -19,6 +19,7 @@ from synapse.api.errors import LimitExceededError, SynapseError from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes +from synapse.types import UserID import logging @@ -113,7 +114,7 @@ class BaseHandler(object): if event.type == EventTypes.Member: if event.content["membership"] == Membership.INVITE: - invitee = self.hs.parse_userid(event.state_key) + invitee = UserID.from_string(event.state_key) if not self.hs.is_mine(invitee): # TODO: Can we add signature from remote server in a nicer # way? If we have been invited by a remote server, we need @@ -134,7 +135,7 @@ class BaseHandler(object): if k[0] == EventTypes.Member: if s.content["membership"] == Membership.JOIN: destinations.add( - self.hs.parse_userid(s.state_key).domain + UserID.from_string(s.state_key).domain ) except SynapseError: logger.warn( diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 103bc67c42..01e67b0818 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function +from synapse.types import UserID from ._base import BaseHandler @@ -48,7 +49,7 @@ class EventStreamHandler(BaseHandler): @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, as_client_event=True): - auth_user = self.hs.parse_userid(auth_user_id) + auth_user = UserID.from_string(auth_user_id) try: if auth_user not in self._streams_per_user: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 81203bf1a3..bcdcc90a18 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -28,6 +28,7 @@ from synapse.crypto.event_signing import ( compute_event_signature, check_event_content_hash, add_hashes_and_signatures, ) +from synapse.types import UserID from syutil.jsonutil import encode_canonical_json from twisted.internet import defer @@ -227,7 +228,7 @@ class FederationHandler(BaseHandler): extra_users = [] if event.type == EventTypes.Member: target_user_id = event.state_key - target_user = self.hs.parse_userid(target_user_id) + target_user = UserID.from_string(target_user_id) extra_users.append(target_user) yield self.notifier.on_new_room_event( @@ -236,7 +237,7 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.membership == Membership.JOIN: - user = self.hs.parse_userid(event.state_key) + user = UserID.from_string(event.state_key) yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id ) @@ -491,7 +492,7 @@ class FederationHandler(BaseHandler): extra_users = [] if event.type == EventTypes.Member: target_user_id = event.state_key - target_user = self.hs.parse_userid(target_user_id) + target_user = UserID.from_string(target_user_id) extra_users.append(target_user) yield self.notifier.on_new_room_event( @@ -500,7 +501,7 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: - user = self.hs.parse_userid(event.state_key) + user = UserID.from_string(event.state_key) yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id ) @@ -514,7 +515,7 @@ class FederationHandler(BaseHandler): if k[0] == EventTypes.Member: if s.content["membership"] == Membership.JOIN: destinations.add( - self.hs.parse_userid(s.state_key).domain + UserID.from_string(s.state_key).domain ) except: logger.warn( @@ -565,7 +566,7 @@ class FederationHandler(BaseHandler): backfilled=False, ) - target_user = self.hs.parse_userid(event.state_key) + target_user = UserID.from_string(event.state_key) yield self.notifier.on_new_room_event( event, extra_users=[target_user], ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f2a2f16933..6a1104a890 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,6 +20,7 @@ from synapse.api.errors import RoomError from synapse.streams.config import PaginationConfig from synapse.events.validator import EventValidator from synapse.util.logcontext import PreserveLoggingContext +from synapse.types import UserID from ._base import BaseHandler @@ -89,7 +90,7 @@ class MessageHandler(BaseHandler): yield self.hs.get_event_sources().get_current_token() ) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) events, next_key = yield data_source.get_pagination_rows( user, pagin_config.get_source_config("room"), room_id @@ -130,13 +131,13 @@ class MessageHandler(BaseHandler): if ratelimit: self.ratelimit(builder.user_id) # TODO(paul): Why does 'event' not have a 'user' object? - user = self.hs.parse_userid(builder.user_id) + user = UserID.from_string(builder.user_id) assert self.hs.is_mine(user), "User must be our own: %s" % (user,) if builder.type == EventTypes.Member: membership = builder.content.get("membership", None) if membership == Membership.JOIN: - joinee = self.hs.parse_userid(builder.state_key) + joinee = UserID.from_string(builder.state_key) # If event doesn't include a display name, add one. yield self.distributor.fire( "collect_presencelike_data", @@ -237,7 +238,7 @@ class MessageHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) rooms_ret = [] @@ -316,7 +317,7 @@ class MessageHandler(BaseHandler): # TODO(paul): I wish I was called with user objects not user_id # strings... - auth_user = self.hs.parse_userid(user_id) + auth_user = UserID.from_string(user_id) # TODO: These concurrently state_tuples = yield self.state_handler.get_current_state(room_id) @@ -349,7 +350,7 @@ class MessageHandler(BaseHandler): for m in room_members: try: member_presence = yield presence_handler.get_state( - target_user=self.hs.parse_userid(m.user_id), + target_user=UserID.from_string(m.user_id), auth_user=auth_user, as_event=True, ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 8aeed99274..d66bfea7b1 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -20,6 +20,7 @@ from synapse.api.constants import PresenceState from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.types import UserID from ._base import BaseHandler @@ -96,22 +97,22 @@ class PresenceHandler(BaseHandler): self.federation.register_edu_handler( "m.presence_invite", lambda origin, content: self.invite_presence( - observed_user=hs.parse_userid(content["observed_user"]), - observer_user=hs.parse_userid(content["observer_user"]), + observed_user=UserID.from_string(content["observed_user"]), + observer_user=UserID.from_string(content["observer_user"]), ) ) self.federation.register_edu_handler( "m.presence_accept", lambda origin, content: self.accept_presence( - observed_user=hs.parse_userid(content["observed_user"]), - observer_user=hs.parse_userid(content["observer_user"]), + observed_user=UserID.from_string(content["observed_user"]), + observer_user=UserID.from_string(content["observer_user"]), ) ) self.federation.register_edu_handler( "m.presence_deny", lambda origin, content: self.deny_presence( - observed_user=hs.parse_userid(content["observed_user"]), - observer_user=hs.parse_userid(content["observer_user"]), + observed_user=UserID.from_string(content["observed_user"]), + observer_user=UserID.from_string(content["observer_user"]), ) ) @@ -418,7 +419,7 @@ class PresenceHandler(BaseHandler): ) for p in presence: - observed_user = self.hs.parse_userid(p.pop("observed_user_id")) + observed_user = UserID.from_string(p.pop("observed_user_id")) p["observed_user"] = observed_user p.update(self._get_or_offline_usercache(observed_user).get_state()) if "last_active" in p: @@ -441,7 +442,7 @@ class PresenceHandler(BaseHandler): user.localpart, accepted=True ) target_users = set([ - self.hs.parse_userid(x["observed_user_id"]) for x in presence + UserID.from_string(x["observed_user_id"]) for x in presence ]) # Also include people in all my rooms @@ -646,7 +647,7 @@ class PresenceHandler(BaseHandler): deferreds = [] for push in content.get("push", []): - user = self.hs.parse_userid(push["user_id"]) + user = UserID.from_string(push["user_id"]) logger.debug("Incoming presence update from %s", user) @@ -694,7 +695,7 @@ class PresenceHandler(BaseHandler): del self._user_cachemap[user] for poll in content.get("poll", []): - user = self.hs.parse_userid(poll) + user = UserID.from_string(poll) if not self.hs.is_mine(user): continue @@ -709,7 +710,7 @@ class PresenceHandler(BaseHandler): deferreds.append(self._push_presence_remote(user, origin)) for unpoll in content.get("unpoll", []): - user = self.hs.parse_userid(unpoll) + user = UserID.from_string(unpoll) if not self.hs.is_mine(user): continue diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 7777d3cc94..03b2159c53 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import EventTypes, Membership from synapse.util.logcontext import PreserveLoggingContext +from synapse.types import UserID from ._base import BaseHandler @@ -169,7 +170,7 @@ class ProfileHandler(BaseHandler): @defer.inlineCallbacks def on_profile_query(self, args): - user = self.hs.parse_userid(args["user_id"]) + user = UserID.from_string(args["user_id"]) if not self.hs.is_mine(user): raise SynapseError(400, "User is not hosted on this Home Server") diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6d0db18e51..0242288c4e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -64,7 +64,7 @@ class RoomCreationHandler(BaseHandler): invite_list = config.get("invite", []) for i in invite_list: try: - self.hs.parse_userid(i) + UserID.from_string(i) except: raise SynapseError(400, "Invalid user_id: %s" % (i,)) @@ -114,7 +114,7 @@ class RoomCreationHandler(BaseHandler): servers=[self.hs.hostname], ) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) creation_events = self._create_events_for_new_room( user, room_id, is_public=is_public ) @@ -250,7 +250,7 @@ class RoomMemberHandler(BaseHandler): users = yield self.store.get_users_in_room(room_id) - defer.returnValue([hs.parse_userid(u) for u in users]) + defer.returnValue([UserID.from_string(u) for u in users]) @defer.inlineCallbacks def fetch_room_distributions_into(self, room_id, localusers=None, @@ -368,7 +368,7 @@ class RoomMemberHandler(BaseHandler): ) if prev_state and prev_state.membership == Membership.JOIN: - user = self.hs.parse_userid(event.user_id) + user = UserID.from_string(event.user_id) self.distributor.fire( "user_left_room", user=user, room_id=event.room_id ) @@ -412,7 +412,7 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _do_join(self, event, context, room_host=None, do_auth=True): - joinee = self.hs.parse_userid(event.state_key) + joinee = UserID.from_string(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -476,7 +476,7 @@ class RoomMemberHandler(BaseHandler): do_auth=do_auth, ) - user = self.hs.parse_userid(event.user_id) + user = UserID.from_string(event.user_id) yield self.distributor.fire( "user_joined_room", user=user, room_id=room_id ) @@ -526,7 +526,7 @@ class RoomMemberHandler(BaseHandler): do_auth): yield run_on_reactor() - target_user = self.hs.parse_userid(event.state_key) + target_user = UserID.from_string(event.state_key) yield self.handle_new_client_event( event, diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index cd9638dd04..c69787005f 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -18,6 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.errors import SynapseError, AuthError +from synapse.types import UserID import logging @@ -185,7 +186,7 @@ class TypingNotificationHandler(BaseHandler): @defer.inlineCallbacks def _recv_edu(self, origin, content): room_id = content["room_id"] - user = self.homeserver.parse_userid(content["user_id"]) + user = UserID.from_string(content["user_id"]) localusers = set() diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 0aa83514c8..4aefb94053 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -16,6 +16,8 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError +from synapse.types import UserID + from base import RestServlet, client_path_pattern import logging @@ -28,7 +30,7 @@ class WhoisRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - target_user = self.hs.parse_userid(user_id) + target_user = UserID.from_string(user_id) auth_user = yield self.auth.get_user_by_req(request) is_admin = yield self.auth.is_server_admin(auth_user) diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index ca4d2d21f0..22fcb7d7d0 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -18,7 +18,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError -from base import RestServlet, client_path_pattern +from synapse.types import UserID +from .base import RestServlet, client_path_pattern import json import logging @@ -32,7 +33,7 @@ class PresenceStatusRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) state = yield self.handlers.presence_handler.get_state( target_user=user, auth_user=auth_user) @@ -42,7 +43,7 @@ class PresenceStatusRestServlet(RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) state = {} try: @@ -77,7 +78,7 @@ class PresenceListRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) if not self.hs.is_mine(user): raise SynapseError(400, "User not hosted on this Home Server") @@ -97,7 +98,7 @@ class PresenceListRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) if not self.hs.is_mine(user): raise SynapseError(400, "User not hosted on this Home Server") @@ -118,7 +119,7 @@ class PresenceListRestServlet(RestServlet): raise SynapseError(400, "Bad invite value.") if len(u) == 0: continue - invited_user = self.hs.parse_userid(u) + invited_user = UserID.from_string(u) yield self.handlers.presence_handler.send_invite( observer_user=user, observed_user=invited_user ) @@ -129,7 +130,7 @@ class PresenceListRestServlet(RestServlet): raise SynapseError(400, "Bad drop value.") if len(u) == 0: continue - dropped_user = self.hs.parse_userid(u) + dropped_user = UserID.from_string(u) yield self.handlers.presence_handler.drop( observer_user=user, observed_user=dropped_user ) diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index dc6eb424b0..39297930c8 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -16,7 +16,8 @@ """ This module contains REST servlets to do with profile: /profile/ """ from twisted.internet import defer -from base import RestServlet, client_path_pattern +from .base import RestServlet, client_path_pattern +from synapse.types import UserID import json @@ -26,7 +27,7 @@ class ProfileDisplaynameRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) displayname = yield self.handlers.profile_handler.get_displayname( user, @@ -37,7 +38,7 @@ class ProfileDisplaynameRestServlet(RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) try: content = json.loads(request.content.read()) @@ -59,7 +60,7 @@ class ProfileAvatarURLRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) avatar_url = yield self.handlers.profile_handler.get_avatar_url( user, @@ -70,7 +71,7 @@ class ProfileAvatarURLRestServlet(RestServlet): @defer.inlineCallbacks def on_PUT(self, request, user_id): auth_user = yield self.auth.get_user_by_req(request) - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) try: content = json.loads(request.content.read()) @@ -92,7 +93,7 @@ class ProfileRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, user_id): - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) displayname = yield self.handlers.profile_handler.get_displayname( user, diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 48bba2a5f3..c5837b3403 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -20,6 +20,7 @@ from base import RestServlet, client_path_pattern from synapse.api.errors import SynapseError, Codes from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership +from synapse.types import UserID import json import logging @@ -289,7 +290,7 @@ class RoomMemberListRestServlet(RestServlet): for event in members["chunk"]: # FIXME: should probably be state_key here, not user_id - target_user = self.hs.parse_userid(event["user_id"]) + target_user = UserID.from_string(event["user_id"]) # Presence is an optional cache; don't fail if we can't fetch it try: presence_handler = self.handlers.presence_handler @@ -478,7 +479,7 @@ class RoomTypingRestServlet(RestServlet): auth_user = yield self.auth.get_user_by_req(request) room_id = urllib.unquote(room_id) - target_user = self.hs.parse_userid(urllib.unquote(user_id)) + target_user = UserID.from_string(urllib.unquote(user_id)) content = _parse_json(request) diff --git a/synapse/server.py b/synapse/server.py index 476d809374..52a21aaf78 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -127,12 +127,6 @@ class BaseHomeServer(object): # TODO: Why are these parse_ methods so high up along with other globals? # Surely these should be in a util package or in the api package? - # Other utility methods - def parse_userid(self, s): - """Parse the string given by 's' as a User ID and return a UserID - object.""" - return UserID.from_string(s) - def parse_roomalias(self, s): """Parse the string given by 's' as a Room Alias and return a RoomAlias object.""" diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index e59e65529b..c69dd995ce 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -20,6 +20,7 @@ from collections import namedtuple from ._base import SQLBaseStore from synapse.api.constants import Membership +from synapse.types import UserID import logging @@ -39,7 +40,7 @@ class RoomMemberStore(SQLBaseStore): """ try: target_user_id = event.state_key - domain = self.hs.parse_userid(target_user_id).domain + domain = UserID.from_string(target_user_id).domain except: logger.exception( "Failed to parse target_user_id=%s", target_user_id @@ -84,7 +85,7 @@ class RoomMemberStore(SQLBaseStore): for e in member_events: try: joined_domains.add( - self.hs.parse_userid(e.state_key).domain + UserID.from_string(e.state_key).domain ) except: # FIXME: How do we deal with invalid user ids in the db? diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 56e90177f1..5621a8afaf 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -17,7 +17,7 @@ from tests import unittest from twisted.internet import defer, reactor -from mock import Mock, call, ANY, NonCallableMock, patch +from mock import Mock, call, ANY, NonCallableMock import json from tests.utils import ( @@ -31,6 +31,7 @@ from synapse.api.errors import SynapseError from synapse.handlers.presence import PresenceHandler, UserPresenceCache from synapse.streams.config import SourcePaginationConfig from synapse.storage.transactions import DestinationsTable +from synapse.types import UserID OFFLINE = PresenceState.OFFLINE UNAVAILABLE = PresenceState.UNAVAILABLE @@ -170,9 +171,9 @@ class PresenceTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp_users(self, hs): # Some local users to test with - self.u_apple = hs.parse_userid("@apple:test") - self.u_banana = hs.parse_userid("@banana:test") - self.u_clementine = hs.parse_userid("@clementine:test") + self.u_apple = UserID.from_string("@apple:test") + self.u_banana = UserID.from_string("@banana:test") + self.u_clementine = UserID.from_string("@clementine:test") for u in self.u_apple, self.u_banana, self.u_clementine: yield self.datastore.create_presence(u.localpart) @@ -182,10 +183,10 @@ class PresenceTestCase(unittest.TestCase): ) # ID of a local user that does not exist - self.u_durian = hs.parse_userid("@durian:test") + self.u_durian = UserID.from_string("@durian:test") # A remote user - self.u_cabbage = hs.parse_userid("@cabbage:elsewhere") + self.u_cabbage = UserID.from_string("@cabbage:elsewhere") class MockedDatastorePresenceTestCase(PresenceTestCase): @@ -250,16 +251,16 @@ class MockedDatastorePresenceTestCase(PresenceTestCase): @defer.inlineCallbacks def setUp_users(self, hs): # Some local users to test with - self.u_apple = hs.parse_userid("@apple:test") - self.u_banana = hs.parse_userid("@banana:test") - self.u_clementine = hs.parse_userid("@clementine:test") - self.u_durian = hs.parse_userid("@durian:test") - self.u_elderberry = hs.parse_userid("@elderberry:test") - self.u_fig = hs.parse_userid("@fig:test") + self.u_apple = UserID.from_string("@apple:test") + self.u_banana = UserID.from_string("@banana:test") + self.u_clementine = UserID.from_string("@clementine:test") + self.u_durian = UserID.from_string("@durian:test") + self.u_elderberry = UserID.from_string("@elderberry:test") + self.u_fig = UserID.from_string("@fig:test") # Remote user - self.u_onion = hs.parse_userid("@onion:farm") - self.u_potato = hs.parse_userid("@potato:remote") + self.u_onion = UserID.from_string("@onion:farm") + self.u_potato = UserID.from_string("@potato:remote") yield diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py index 0584e4c8b9..3cdbb186ae 100644 --- a/tests/handlers/test_presencelike.py +++ b/tests/handlers/test_presencelike.py @@ -27,6 +27,7 @@ from synapse.server import HomeServer from synapse.api.constants import PresenceState from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import ProfileHandler +from synapse.types import UserID OFFLINE = PresenceState.OFFLINE @@ -136,12 +137,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): lambda u: defer.succeed([])) # Some local users to test with - self.u_apple = hs.parse_userid("@apple:test") - self.u_banana = hs.parse_userid("@banana:test") - self.u_clementine = hs.parse_userid("@clementine:test") + self.u_apple = UserID.from_string("@apple:test") + self.u_banana = UserID.from_string("@banana:test") + self.u_clementine = UserID.from_string("@clementine:test") # Remote user - self.u_potato = hs.parse_userid("@potato:remote") + self.u_potato = UserID.from_string("@potato:remote") self.mock_get_joined = ( self.datastore.get_rooms_for_user_where_membership_is diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 25b172aa5e..7b9590c110 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -22,7 +22,7 @@ from mock import Mock, NonCallableMock from synapse.api.errors import AuthError from synapse.server import HomeServer from synapse.handlers.profile import ProfileHandler -from synapse.api.constants import Membership +from synapse.types import UserID from tests.utils import SQLiteMemoryDbPool, MockKey @@ -71,9 +71,9 @@ class ProfileTestCase(unittest.TestCase): self.store = hs.get_datastore() - self.frank = hs.parse_userid("@1234ABCD:test") - self.bob = hs.parse_userid("@4567:test") - self.alice = hs.parse_userid("@alice:remote") + self.frank = UserID.from_string("@1234ABCD:test") + self.bob = UserID.from_string("@4567:test") + self.alice = UserID.from_string("@alice:remote") yield self.store.create_profile(self.frank.localpart) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index d3253b48b8..9a23b3812d 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -15,12 +15,13 @@ from twisted.internet import defer -from tests import unittest +from .. import unittest from synapse.api.constants import EventTypes, Membership from synapse.handlers.room import RoomMemberHandler, RoomCreationHandler from synapse.handlers.profile import ProfileHandler from synapse.server import HomeServer +from synapse.types import UserID from ..utils import MockKey from mock import Mock, NonCallableMock @@ -164,7 +165,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context, ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[self.hs.parse_userid(target_user_id)] + event, extra_users=[UserID.from_string(target_user_id)] ) self.assertFalse(self.datastore.get_room.called) self.assertFalse(self.datastore.store_room.called) @@ -174,7 +175,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): def test_simple_join(self): room_id = "!foo:red" user_id = "@bob:red" - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) join_signal_observer = Mock() self.distributor.observe("user_joined_room", join_signal_observer) @@ -252,7 +253,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): def test_simple_leave(self): room_id = "!foo:red" user_id = "@bob:red" - user = self.hs.parse_userid(user_id) + user = UserID.from_string(user_id) builder = self.hs.get_event_builder_factory().new({ "type": EventTypes.Member, diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 6a498b23a4..8a7fc028d1 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -27,6 +27,7 @@ from synapse.server import HomeServer from synapse.handlers.typing import TypingNotificationHandler from synapse.storage.transactions import DestinationsTable +from synapse.types import UserID def _expect_edu(destination, edu_type, content, origin="test"): @@ -153,11 +154,11 @@ class TypingNotificationsTestCase(unittest.TestCase): self.auth.check_joined_room = check_joined_room # Some local users to test with - self.u_apple = hs.parse_userid("@apple:test") - self.u_banana = hs.parse_userid("@banana:test") + self.u_apple = UserID.from_string("@apple:test") + self.u_banana = UserID.from_string("@banana:test") # Remote user - self.u_onion = hs.parse_userid("@onion:farm") + self.u_onion = UserID.from_string("@onion:farm") @defer.inlineCallbacks def test_started_typing_local(self): diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 783720ac29..65d5cc4916 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -27,6 +27,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.server import HomeServer from synapse.rest.client.v1 import presence from synapse.rest.client.v1 import events +from synapse.types import UserID OFFLINE = PresenceState.OFFLINE @@ -71,7 +72,7 @@ class PresenceStateTestCase(unittest.TestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(myid), + "user": UserID.from_string(myid), "admin": False, "device_id": None, } @@ -90,7 +91,7 @@ class PresenceStateTestCase(unittest.TestCase): presence.register_servlets(hs, self.mock_resource) - self.u_apple = hs.parse_userid(myid) + self.u_apple = UserID.from_string(myid) @defer.inlineCallbacks def test_get_my_status(self): @@ -161,12 +162,12 @@ class PresenceListTestCase(unittest.TestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(myid), + "user": UserID.from_string(myid), "admin": False, "device_id": None, } - room_member_handler = hs.handlers.room_member_handler = Mock( + hs.handlers.room_member_handler = Mock( spec=[ "get_rooms_for_user", ] @@ -176,8 +177,8 @@ class PresenceListTestCase(unittest.TestCase): presence.register_servlets(hs, self.mock_resource) - self.u_apple = hs.parse_userid("@apple:test") - self.u_banana = hs.parse_userid("@banana:test") + self.u_apple = UserID.from_string("@apple:test") + self.u_banana = UserID.from_string("@banana:test") @defer.inlineCallbacks def test_get_my_list(self): @@ -281,7 +282,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): hs.get_clock().time_msec.return_value = 1000000 def _get_user_by_req(req=None): - return hs.parse_userid(myid) + return UserID.from_string(myid) hs.get_auth().get_user_by_req = _get_user_by_req @@ -322,8 +323,8 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.presence = hs.get_handlers().presence_handler - self.u_apple = hs.parse_userid("@apple:test") - self.u_banana = hs.parse_userid("@banana:test") + self.u_apple = UserID.from_string("@apple:test") + self.u_banana = UserID.from_string("@banana:test") @defer.inlineCallbacks def test_shortpoll(self): diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index 5b5c3edc22..39cd68d829 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -24,6 +24,7 @@ from ....utils import MockHttpResource, MockKey from synapse.api.errors import SynapseError, AuthError from synapse.server import HomeServer +from synapse.types import UserID from synapse.rest.client.v1 import profile @@ -57,7 +58,7 @@ class ProfileTestCase(unittest.TestCase): ) def _get_user_by_req(request=None): - return hs.parse_userid(myid) + return UserID.from_string(myid) hs.get_auth().get_user_by_req = _get_user_by_req diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 12f8040541..76ed550b75 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -22,13 +22,10 @@ import synapse.rest.client.v1.room from synapse.api.constants import Membership from synapse.server import HomeServer +from synapse.types import UserID -from tests import unittest - -# python imports import json import urllib -import types from ....utils import MockHttpResource, SQLiteMemoryDbPool, MockKey from .utils import RestTestCase @@ -70,7 +67,7 @@ class RoomPermissionsTestCase(RestTestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(self.auth_user_id), + "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, } @@ -466,7 +463,7 @@ class RoomsMemberListTestCase(RestTestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(self.auth_user_id), + "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, } @@ -555,7 +552,7 @@ class RoomsCreateTestCase(RestTestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(self.auth_user_id), + "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, } @@ -657,7 +654,7 @@ class RoomTopicTestCase(RestTestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(self.auth_user_id), + "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, } @@ -773,7 +770,7 @@ class RoomMemberStateTestCase(RestTestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(self.auth_user_id), + "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, } @@ -909,7 +906,7 @@ class RoomMessagesTestCase(RestTestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(self.auth_user_id), + "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, } @@ -1013,7 +1010,7 @@ class RoomInitialSyncTestCase(RestTestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(self.auth_user_id), + "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, } @@ -1028,7 +1025,7 @@ class RoomInitialSyncTestCase(RestTestCase): # Since I'm getting my own presence I need to exist as far as presence # is concerned. hs.get_handlers().presence_handler.registered_user( - hs.parse_userid(self.user_id) + UserID.from_string(self.user_id) ) # create the room diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 647bcebfd8..c89b37d004 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -20,6 +20,7 @@ from twisted.internet import defer import synapse.rest.client.v1.room from synapse.server import HomeServer +from synapse.types import UserID from ....utils import MockHttpResource, MockClock, SQLiteMemoryDbPool, MockKey from .utils import RestTestCase @@ -69,7 +70,7 @@ class RoomTypingTestCase(RestTestCase): def _get_user_by_token(token=None): return { - "user": hs.parse_userid(self.auth_user_id), + "user": UserID.from_string(self.auth_user_id), "admin": False, "device_id": None, } @@ -82,7 +83,7 @@ class RoomTypingTestCase(RestTestCase): def get_room_members(room_id): if room_id == self.room_id: - return defer.succeed([hs.parse_userid(self.user_id)]) + return defer.succeed([UserID.from_string(self.user_id)]) else: return defer.succeed([]) diff --git a/tests/storage/test_presence.py b/tests/storage/test_presence.py index 9655d3cf42..1ab193736b 100644 --- a/tests/storage/test_presence.py +++ b/tests/storage/test_presence.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.server import HomeServer from synapse.storage.presence import PresenceStore +from synapse.types import UserID from tests.utils import SQLiteMemoryDbPool, MockClock @@ -37,8 +38,8 @@ class PresenceStoreTestCase(unittest.TestCase): self.store = PresenceStore(hs) - self.u_apple = hs.parse_userid("@apple:test") - self.u_banana = hs.parse_userid("@banana:test") + self.u_apple = UserID.from_string("@apple:test") + self.u_banana = UserID.from_string("@banana:test") @defer.inlineCallbacks def test_state(self): diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py index 5d36723c28..84381241bc 100644 --- a/tests/storage/test_profile.py +++ b/tests/storage/test_profile.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.server import HomeServer from synapse.storage.profile import ProfileStore +from synapse.types import UserID from tests.utils import SQLiteMemoryDbPool @@ -36,7 +37,7 @@ class ProfileStoreTestCase(unittest.TestCase): self.store = ProfileStore(hs) - self.u_frank = hs.parse_userid("@frank:test") + self.u_frank = UserID.from_string("@frank:test") @defer.inlineCallbacks def test_displayname(self): diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index 9806fbc69b..a16ccad881 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.server import HomeServer from synapse.api.constants import EventTypes, Membership +from synapse.types import UserID from tests.utils import SQLiteMemoryDbPool, MockKey @@ -48,8 +49,8 @@ class RedactionTestCase(unittest.TestCase): self.handlers = hs.get_handlers() self.message_handler = self.handlers.message_handler - self.u_alice = hs.parse_userid("@alice:test") - self.u_bob = hs.parse_userid("@bob:test") + self.u_alice = UserID.from_string("@alice:test") + self.u_bob = UserID.from_string("@bob:test") self.room1 = hs.parse_roomid("!abc123:test") diff --git a/tests/storage/test_room.py b/tests/storage/test_room.py index e7739776ec..c6bfde069a 100644 --- a/tests/storage/test_room.py +++ b/tests/storage/test_room.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.server import HomeServer from synapse.api.constants import EventTypes +from synapse.types import UserID from tests.utils import SQLiteMemoryDbPool @@ -40,7 +41,7 @@ class RoomStoreTestCase(unittest.TestCase): self.room = hs.parse_roomid("!abcde:test") self.alias = hs.parse_roomalias("#a-room-name:test") - self.u_creator = hs.parse_userid("@creator:test") + self.u_creator = UserID.from_string("@creator:test") yield self.store.store_room(self.room.to_string(), room_creator_user_id=self.u_creator.to_string(), diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index a23a8189df..6b7930b1d8 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.server import HomeServer from synapse.api.constants import EventTypes, Membership +from synapse.types import UserID from tests.utils import SQLiteMemoryDbPool, MockKey @@ -49,11 +50,11 @@ class RoomMemberStoreTestCase(unittest.TestCase): self.handlers = hs.get_handlers() self.message_handler = self.handlers.message_handler - self.u_alice = hs.parse_userid("@alice:test") - self.u_bob = hs.parse_userid("@bob:test") + self.u_alice = UserID.from_string("@alice:test") + self.u_bob = UserID.from_string("@bob:test") # User elsewhere on another host - self.u_charlie = hs.parse_userid("@charlie:elsewhere") + self.u_charlie = UserID.from_string("@charlie:elsewhere") self.room = hs.parse_roomid("!abc123:test") diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 9247fc579e..d7c7f64d5e 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.server import HomeServer from synapse.api.constants import EventTypes, Membership +from synapse.types import UserID from tests.utils import SQLiteMemoryDbPool, MockKey @@ -48,8 +49,8 @@ class StreamStoreTestCase(unittest.TestCase): self.handlers = hs.get_handlers() self.message_handler = self.handlers.message_handler - self.u_alice = hs.parse_userid("@alice:test") - self.u_bob = hs.parse_userid("@bob:test") + self.u_alice = UserID.from_string("@alice:test") + self.u_bob = UserID.from_string("@bob:test") self.room1 = hs.parse_roomid("!abc123:test") self.room2 = hs.parse_roomid("!xyx987:test") diff --git a/tests/test_types.py b/tests/test_types.py index bfb9e6f548..2de7f22ab0 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -42,12 +42,6 @@ class UserIDTestCase(unittest.TestCase): self.assertTrue(userA == userAagain) self.assertTrue(userA != userB) - def test_via_homeserver(self): - user = mock_homeserver.parse_userid("@3456ijkl:my.domain") - - self.assertEquals("3456ijkl", user.localpart) - self.assertEquals("my.domain", user.domain) - class RoomAliasTestCase(unittest.TestCase): -- cgit 1.4.1 From 0ef5bfd6a9eaaae14e199997658b3d0006abd854 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Jan 2015 16:16:53 +0000 Subject: Start implementing auth conflict res --- synapse/api/auth.py | 38 +++--- synapse/api/constants.py | 6 + synapse/federation/federation_client.py | 39 ++++++ synapse/handlers/federation.py | 211 ++++++++++++++++++++++++++------ synapse/storage/rejections.py | 10 ++ synapse/storage/schema/im.sql | 1 + 6 files changed, 253 insertions(+), 52 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index a342a0e0da..461faa8c78 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -353,9 +353,23 @@ class Auth(object): def add_auth_events(self, builder, context): yield run_on_reactor() - if builder.type == EventTypes.Create: - builder.auth_events = [] - return + auth_ids = self.compute_auth_events(builder, context) + + auth_events_entries = yield self.store.add_event_hashes( + auth_ids + ) + + builder.auth_events = auth_events_entries + + context.auth_events = { + k: v + for k, v in context.current_state.items() + if v.event_id in auth_ids + } + + def compute_auth_events(self, event, context): + if event.type == EventTypes.Create: + return [] auth_ids = [] @@ -368,7 +382,7 @@ class Auth(object): key = (EventTypes.JoinRules, "", ) join_rule_event = context.current_state.get(key) - key = (EventTypes.Member, builder.user_id, ) + key = (EventTypes.Member, event.user_id, ) member_event = context.current_state.get(key) key = (EventTypes.Create, "", ) @@ -382,8 +396,8 @@ class Auth(object): else: is_public = False - if builder.type == EventTypes.Member: - e_type = builder.content["membership"] + if event.type == EventTypes.Member: + e_type = event.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: if join_rule_event: auth_ids.append(join_rule_event.event_id) @@ -398,17 +412,7 @@ class Auth(object): if member_event.content["membership"] == Membership.JOIN: auth_ids.append(member_event.event_id) - auth_events_entries = yield self.store.add_event_hashes( - auth_ids - ) - - builder.auth_events = auth_events_entries - - context.auth_events = { - k: v - for k, v in context.current_state.items() - if v.event_id in auth_ids - } + return auth_ids @log_function def _can_send_event(self, event, auth_events): diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 7ee6dcc46e..0d3fc629af 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -74,3 +74,9 @@ class EventTypes(object): Message = "m.room.message" Topic = "m.room.topic" Name = "m.room.name" + + +class RejectedReason(object): + AUTH_ERROR = "auth_error" + REPLACED = "replaced" + NOT_ANCESTOR = "not_ancestor" diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 91b44cd8b3..ebcd593506 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -331,6 +331,45 @@ class FederationClient(object): defer.returnValue(pdu) + @defer.inlineCallbacks + def query_auth(self, destination, room_id, event_id, local_auth): + """ + Params: + destination (str) + event_it (str) + local_auth (list) + """ + time_now = self._clock.time_msec() + + send_content = { + "auth_chain": [e.get_pdu_json(time_now) for e in local_auth], + } + + code, content = yield self.transport_layer.send_invite( + destination=destination, + room_id=room_id, + event_id=event_id, + content=send_content, + ) + + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + missing = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content.get("missing", []) + ] + + ret = { + "auth_chain": auth_chain, + "rejects": content.get("rejects", []), + "missing": missing, + } + + defer.returnValue(ret) + def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( pdu_json diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bcdcc90a18..97e3c503b9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,19 +17,16 @@ from ._base import BaseHandler -from synapse.events.utils import prune_event from synapse.api.errors import ( - AuthError, FederationError, SynapseError, StoreError, + AuthError, FederationError, StoreError, ) -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import ( - compute_event_signature, check_event_content_hash, - add_hashes_and_signatures, + compute_event_signature, add_hashes_and_signatures, ) from synapse.types import UserID -from syutil.jsonutil import encode_canonical_json from twisted.internet import defer @@ -113,33 +110,6 @@ class FederationHandler(BaseHandler): logger.debug("Processing event: %s", event.event_id) - redacted_event = prune_event(event) - - redacted_pdu_json = redacted_event.get_pdu_json() - try: - yield self.keyring.verify_json_for_server( - event.origin, redacted_pdu_json - ) - except SynapseError as e: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise FederationError( - "ERROR", - e.code, - e.msg, - affected=event.event_id, - ) - - if not check_event_content_hash(event): - logger.warn( - "Event content has been tampered, redacting %s, %s", - event.event_id, encode_canonical_json(event.get_dict()) - ) - event = redacted_event - logger.debug("Event: %s", event) # FIXME (erikj): Awful hack to make the case where we are not currently @@ -180,7 +150,6 @@ class FederationHandler(BaseHandler): if state: for e in state: - logging.info("A :) %r", e) e.internal_metadata.outlier = True try: yield self._handle_new_event(e) @@ -747,7 +716,20 @@ class FederationHandler(BaseHandler): event.event_id, event.signatures, ) - self.auth.check(event, auth_events=context.auth_events) + try: + self.auth.check(event, auth_events=context.auth_events) + except AuthError: + # TODO: Store rejection. + context.rejected = RejectedReason.AUTH_ERROR + + yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, + is_new_state=False, + current_state=current_state, + ) + raise logger.debug( "_handle_new_event: Before persist_event: %s, sigs: %s", @@ -768,3 +750,162 @@ class FederationHandler(BaseHandler): ) defer.returnValue(context) + + @defer.inlineCallbacks + def do_auth(self, origin, event, context): + for e_id, _ in event.auth_events: + pass + + auth_events = set(e_id for e_id, _ in event.auth_events) + current_state = set(e.event_id for e in context.auth_events.values()) + + missing_auth = auth_events - current_state + + if missing_auth: + # Do auth conflict res. + + # 1. Get what we think is the auth chain. + auth_ids = self.auth.compute_auth_events(event, context) + local_auth_chain = yield self.store.get_auth_chain(auth_ids) + + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) + + # 3. Process any remote auth chain events we haven't seen. + for e in result.get("missing", []): + # TODO. + pass + + # 4. Look at rejects and their proofs. + # TODO. + + try: + self.auth.check(event, auth_events=context.auth_events) + except AuthError: + raise + + @defer.inlineCallbacks + def construct_auth_difference(self, local_auth, remote_auth): + """ Given a local and remote auth chain, find the differences. This + assumes that we have already processed all events in remote_auth + + Params: + local_auth (list) + remote_auth (list) + + Returns: + dict + """ + + # TODO: Make sure we are OK with local_auth or remote_auth having more + # auth events in them than strictly necessary. + + def sort_fun(ev): + return ev.depth, ev.event_id + + # We find the differences by starting at the "bottom" of each list + # and iterating up on both lists. The lists are ordered by depth and + # then event_id, we iterate up both lists until we find the event ids + # don't match. Then we look at depth/event_id to see which side is + # missing that event, and iterate only up that list. Repeat. + + remote_list = list(remote_auth) + remote_list.sort(key=sort_fun) + + local_list = list(local_auth) + local_list.sort(key=sort_fun) + + local_iter = iter(local_list) + remote_iter = iter(remote_list) + + current_local = local_iter.next() + current_remote = remote_iter.next() + + def get_next(it, opt=None): + return it.next() if it.has_next() else opt + + missing_remotes = [] + missing_locals = [] + while current_local and current_remote: + if current_remote is None: + missing_locals.append(current_local) + current_local = get_next(local_iter) + continue + + if current_local is None: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + if current_local.event_id == current_remote.event_id: + current_local = get_next(local_iter) + current_remote = get_next(remote_iter) + continue + + if current_local.depth < current_remote.depth: + missing_locals.append(current_local) + current_local = get_next(local_iter) + continue + + if current_local.depth > current_remote.depth: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + # They have the same depth, so we fall back to the event_id order + if current_local.event_id < current_remote.event_id: + missing_locals.append(current_local) + current_local = get_next(local_iter) + + if current_local.event_id > current_remote.event_id: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + # missing locals should be sent to the server + # We should find why we are missing remotes, as they will have been + # rejected. + + # Remove events from missing_remotes if they are referencing a missing + # remote. We only care about the "root" rejected ones. + missing_remote_ids = [e.event_id for e in missing_remotes] + base_remote_rejected = list(missing_remotes) + for e in missing_remotes: + for e_id, _ in e.auth_events: + if e_id in missing_remote_ids: + base_remote_rejected.remove(e) + + reason_map = {} + + for e in base_remote_rejected: + reason = yield self.store.get_rejection_reason(e.event_id) + if reason is None: + # FIXME: ERRR?! + raise RuntimeError("") + + reason_map[e.event_id] = reason + + if reason == RejectedReason.AUTH_ERROR: + pass + elif reason == RejectedReason.REPLACED: + # TODO: Get proof + pass + elif reason == RejectedReason.NOT_ANCESTOR: + # TODO: Get proof. + pass + + defer.returnValue({ + "rejects": { + e.event_id: { + "reason": reason_map[e.event_id], + "proof": None, + } + for e in base_remote_rejected + }, + "missing": missing_locals, + }) diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index 7d38b31f44..b7249700d7 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -31,3 +31,13 @@ class RejectionsStore(SQLBaseStore): "last_failure": self._clock.time_msec(), } ) + + def get_rejection_reason(self, event_id): + self._simple_select_one_onecol( + table="rejections", + retcol="reason", + keyvalues={ + "event_id": event_id, + }, + allow_none=True, + ) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index bc7c6b6ed5..5866a387f6 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -128,5 +128,6 @@ CREATE TABLE IF NOT EXISTS rejections( event_id TEXT NOT NULL, reason TEXT NOT NULL, last_check TEXT NOT NULL, + root_rejected TEXT, CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE ); -- cgit 1.4.1 From 78015948a7febb18e000651f72f8f58830a55b93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Jan 2015 16:50:23 +0000 Subject: Initial implementation of auth conflict resolution --- synapse/events/utils.py | 6 +- synapse/federation/federation_client.py | 2 +- synapse/federation/federation_server.py | 33 +++++ synapse/federation/transport/client.py | 16 +++ synapse/federation/transport/server.py | 21 +++- synapse/handlers/federation.py | 207 ++++++++++++++++++++------------ synapse/storage/rejections.py | 4 +- tests/handlers/test_federation.py | 2 + 8 files changed, 210 insertions(+), 81 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index bcb5457278..10a6b9f264 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -45,12 +45,14 @@ def prune_event(event): "membership", ] + event_dict = event.get_dict() + new_content = {} def add_fields(*fields): for field in fields: if field in event.content: - new_content[field] = event.content[field] + new_content[field] = event_dict["content"][field] if event_type == EventTypes.Member: add_fields("membership") @@ -75,7 +77,7 @@ def prune_event(event): allowed_fields = { k: v - for k, v in event.get_dict().items() + for k, v in event_dict.items() if k in allowed_keys } diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ebcd593506..1173ca817b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -345,7 +345,7 @@ class FederationClient(object): "auth_chain": [e.get_pdu_json(time_now) for e in local_auth], } - code, content = yield self.transport_layer.send_invite( + code, content = yield self.transport_layer.send_query_auth( destination=destination, room_id=room_id, event_id=event_id, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fc5342afaa..8cff4e6472 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -230,6 +230,39 @@ class FederationServer(object): "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], })) + @defer.inlineCallbacks + def on_query_auth_request(self, origin, content, event_id): + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + missing = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content.get("missing", []) + ] + + ret = yield self.handler.on_query_auth( + origin, event_id, auth_chain, content.get("rejects", []), missing + ) + + time_now = self._clock.time_msec() + send_content = { + "auth_chain": [ + e.get_pdu_json(time_now) + for e in ret["auth_chain"] + ], + "rejects": content.get("rejects", []), + "missing": [ + e.get_pdu_json(time_now) + for e in ret.get("missing", []) + ], + } + + defer.returnValue( + (200, send_content) + ) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index e634a3a213..4cb1dea2de 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -213,3 +213,19 @@ class TransportLayerClient(object): ) defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def send_query_auth(self, destination, room_id, event_id, content): + path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) + + code, content = yield self.client.post_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a380a6910b..9c9f8d525b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -42,7 +42,7 @@ class TransportLayerServer(object): content = None origin = None - if request.method == "PUT": + if request.method in ["PUT", "POST"]: # TODO: Handle other method types? other content types? try: content_bytes = request.content.read() @@ -234,6 +234,16 @@ class TransportLayerServer(object): ) ) ) + self.server.register_path( + "POST", + re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + self._on_query_auth_request( + origin, content, event_id, + ) + ) + ) @defer.inlineCallbacks @log_function @@ -325,3 +335,12 @@ class TransportLayerServer(object): ) defer.returnValue((200, content)) + + @defer.inlineCallbacks + @log_function + def _on_query_auth_request(self, origin, content, event_id): + new_content = yield self.request_handler.on_query_auth_request( + origin, content, event_id + ) + + defer.returnValue((200, new_content)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 97e3c503b9..14c26d8cea 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -126,7 +126,7 @@ class FederationHandler(BaseHandler): if not state: state, auth_chain = yield replication.get_state_for_room( - origin, context=event.room_id, event_id=event.event_id, + origin, room_id=event.room_id, event_id=event.event_id, ) if not auth_chain: @@ -139,7 +139,7 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_auth_from=origin) + yield self._handle_new_event(origin, e) except: logger.exception( "Failed to handle auth event %s", @@ -152,7 +152,7 @@ class FederationHandler(BaseHandler): for e in state: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e) + yield self._handle_new_event(origin, e) except: logger.exception( "Failed to handle state event %s", @@ -161,6 +161,7 @@ class FederationHandler(BaseHandler): try: yield self._handle_new_event( + origin, event, state=state, backfilled=backfilled, @@ -363,7 +364,14 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + target_host, e, auth_events=auth + ) except: logger.exception( "Failed to handle auth event %s", @@ -374,8 +382,13 @@ class FederationHandler(BaseHandler): # FIXME: Auth these. e.internal_metadata.outlier = True try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } yield self._handle_new_event( - e, fetch_auth_from=target_host + target_host, e, auth_events=auth ) except: logger.exception( @@ -384,6 +397,7 @@ class FederationHandler(BaseHandler): ) yield self._handle_new_event( + target_host, new_event, state=state, current_state=state, @@ -450,7 +464,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(event) + context = yield self._handle_new_event(origin, event) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -651,11 +665,12 @@ class FederationHandler(BaseHandler): waiters.pop().callback(None) @defer.inlineCallbacks - def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None, fetch_auth_from=None): + @log_function + def _handle_new_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): logger.debug( - "_handle_new_event: Before annotate: %s, sigs: %s", + "_handle_new_event: %s, sigs: %s", event.event_id, event.signatures, ) @@ -663,62 +678,34 @@ class FederationHandler(BaseHandler): event, old_state=state ) + if not auth_events: + auth_events = context.auth_events + logger.debug( - "_handle_new_event: Before auth fetch: %s, sigs: %s", - event.event_id, event.signatures, + "_handle_new_event: %s, auth_events: %s", + event.event_id, auth_events, ) is_new_state = not event.internal_metadata.is_outlier() - known_ids = set( - [s.event_id for s in context.auth_events.values()] - ) - - for e_id, _ in event.auth_events: - if e_id not in known_ids: - e = yield self.store.get_event(e_id, allow_none=True) - - if not e and fetch_auth_from is not None: - # Grab the auth_chain over federation if we are missing - # auth events. - auth_chain = yield self.replication_layer.get_event_auth( - fetch_auth_from, event.event_id, event.room_id - ) - for auth_event in auth_chain: - yield self._handle_new_event(auth_event) - e = yield self.store.get_event(e_id, allow_none=True) - - if not e: - # TODO: Do some conflict res to make sure that we're - # not the ones who are wrong. - logger.info( - "Rejecting %s as %s not in db or %s", - event.event_id, e_id, known_ids, - ) - # FIXME: How does raising AuthError work with federation? - raise AuthError(403, "Cannot find auth event") - - context.auth_events[(e.type, e.state_key)] = e - - logger.debug( - "_handle_new_event: Before hack: %s, sigs: %s", - event.event_id, event.signatures, - ) - + # This is a hack to fix some old rooms where the initial join event + # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: if len(event.prev_events) == 1: c = yield self.store.get_event(event.prev_events[0][0]) if c.type == EventTypes.Create: - context.auth_events[(c.type, c.state_key)] = c - - logger.debug( - "_handle_new_event: Before auth check: %s, sigs: %s", - event.event_id, event.signatures, - ) + auth_events[(c.type, c.state_key)] = c try: - self.auth.check(event, auth_events=context.auth_events) - except AuthError: + yield self.do_auth( + origin, event, context, auth_events=auth_events + ) + except AuthError as e: + logger.warn( + "Rejecting %s because %s", + event.event_id, e.msg + ) + # TODO: Store rejection. context.rejected = RejectedReason.AUTH_ERROR @@ -731,11 +718,6 @@ class FederationHandler(BaseHandler): ) raise - logger.debug( - "_handle_new_event: Before persist_event: %s, sigs: %s", - event.event_id, event.signatures, - ) - yield self.store.persist_event( event, context=context, @@ -744,25 +726,73 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - logger.debug( - "_handle_new_event: After persist_event: %s, sigs: %s", - event.event_id, event.signatures, + defer.returnValue(context) + + @defer.inlineCallbacks + def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, + missing): + # Just go through and process each event in `remote_auth_chain`. We + # don't want to fall into the trap of `missing` being wrong. + for e in remote_auth_chain: + try: + yield self._handle_new_event(origin, e) + except AuthError: + pass + + # Now get the current auth_chain for the event. + local_auth_chain = yield self.store.get_auth_chain([event_id]) + + # TODO: Check if we would now reject event_id. If so we need to tell + # everyone. + + ret = yield self.construct_auth_difference( + local_auth_chain, remote_auth_chain ) - defer.returnValue(context) + logger.debug("on_query_auth reutrning: %s", ret) + + defer.returnValue(ret) @defer.inlineCallbacks - def do_auth(self, origin, event, context): - for e_id, _ in event.auth_events: - pass + @log_function + def do_auth(self, origin, event, context, auth_events): + # Check if we have all the auth events. + res = yield self.store.have_events( + [e_id for e_id, _ in event.auth_events] + ) - auth_events = set(e_id for e_id, _ in event.auth_events) - current_state = set(e.event_id for e in context.auth_events.values()) + event_auth_events = set(e_id for e_id, _ in event.auth_events) + seen_events = set(res.keys()) - missing_auth = auth_events - current_state + missing_auth = event_auth_events - seen_events if missing_auth: + logger.debug("Missing auth: %s", missing_auth) + # If we don't have all the auth events, we need to get them. + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) + + for e in remote_auth_chain: + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + + current_state = set(e.event_id for e in auth_events.values()) + different_auth = event_auth_events - current_state + + if different_auth and not event.internal_metadata.is_outlier(): # Do auth conflict res. + logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. auth_ids = self.auth.compute_auth_events(event, context) @@ -778,14 +808,24 @@ class FederationHandler(BaseHandler): # 3. Process any remote auth chain events we haven't seen. for e in result.get("missing", []): - # TODO. - pass + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass # 4. Look at rejects and their proofs. # TODO. try: - self.auth.check(event, auth_events=context.auth_events) + self.auth.check(event, auth_events=auth_events) except AuthError: raise @@ -802,12 +842,16 @@ class FederationHandler(BaseHandler): dict """ + logger.debug("construct_auth_difference Start!") + # TODO: Make sure we are OK with local_auth or remote_auth having more # auth events in them than strictly necessary. def sort_fun(ev): return ev.depth, ev.event_id + logger.debug("construct_auth_difference after sort_fun!") + # We find the differences by starting at the "bottom" of each list # and iterating up on both lists. The lists are ordered by depth and # then event_id, we iterate up both lists until we find the event ids @@ -823,11 +867,18 @@ class FederationHandler(BaseHandler): local_iter = iter(local_list) remote_iter = iter(remote_list) - current_local = local_iter.next() - current_remote = remote_iter.next() + logger.debug("construct_auth_difference before get_next!") def get_next(it, opt=None): - return it.next() if it.has_next() else opt + try: + return it.next() + except: + return opt + + current_local = get_next(local_iter) + current_remote = get_next(remote_iter) + + logger.debug("construct_auth_difference before while") missing_remotes = [] missing_locals = [] @@ -867,6 +918,8 @@ class FederationHandler(BaseHandler): current_remote = get_next(remote_iter) continue + logger.debug("construct_auth_difference after while") + # missing locals should be sent to the server # We should find why we are missing remotes, as they will have been # rejected. @@ -886,6 +939,7 @@ class FederationHandler(BaseHandler): reason = yield self.store.get_rejection_reason(e.event_id) if reason is None: # FIXME: ERRR?! + logger.warn("Could not find reason for %s", e.event_id) raise RuntimeError("") reason_map[e.event_id] = reason @@ -899,7 +953,10 @@ class FederationHandler(BaseHandler): # TODO: Get proof. pass + logger.debug("construct_auth_difference returning") + defer.returnValue({ + "auth_chain": local_auth, "rejects": { e.event_id: { "reason": reason_map[e.event_id], diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index b7249700d7..4e1a9a2783 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -28,12 +28,12 @@ class RejectionsStore(SQLBaseStore): values={ "event_id": event_id, "reason": reason, - "last_failure": self._clock.time_msec(), + "last_check": self._clock.time_msec(), } ) def get_rejection_reason(self, event_id): - self._simple_select_one_onecol( + return self._simple_select_one_onecol( table="rejections", retcol="reason", keyvalues={ diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index ed21defd13..44dbce6bea 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -52,6 +52,7 @@ class FederationTestCase(unittest.TestCase): "get_room", "get_destination_retry_timings", "set_destination_retry_timings", + "have_events", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -90,6 +91,7 @@ class FederationTestCase(unittest.TestCase): self.datastore.persist_event.return_value = defer.succeed(None) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) + self.datastore.have_events.return_value = defer.succeed({}) def annotate(ev, old_state=None): context = Mock() -- cgit 1.4.1 From c1d860870b418b9583078022d0babe557b73760f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 10:48:47 +0000 Subject: Fix regression where we no longer correctly handled the case of gaps in our event graph --- synapse/federation/federation_server.py | 3 +++ synapse/handlers/federation.py | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8cff4e6472..845a07a3a3 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -366,10 +366,13 @@ class FederationServer(object): logger.debug("Processed pdu %s", event_id) else: logger.warn("Failed to get PDU %s", event_id) + fetch_state = True except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") fetch_state = True + else: + fetch_state = True else: fetch_state = True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14c26d8cea..de47a97e6b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -119,7 +119,7 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room and not event.internal_metadata.outlier: + if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") replication = self.replication_layer @@ -780,6 +780,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in remote_auth_chain if e.event_id in auth_ids } + e.internal_metadata.outlier = True yield self._handle_new_event( origin, e, auth_events=auth ) @@ -787,6 +788,8 @@ class FederationHandler(BaseHandler): except AuthError: pass + # FIXME: Assumes we have and stored all the state for all the + # prev_events current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state @@ -814,6 +817,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } + e.internal_metadata.outlier = True yield self._handle_new_event( origin, e, auth_events=auth ) @@ -882,7 +886,7 @@ class FederationHandler(BaseHandler): missing_remotes = [] missing_locals = [] - while current_local and current_remote: + while current_local or current_remote: if current_remote is None: missing_locals.append(current_local) current_local = get_next(local_iter) -- cgit 1.4.1 From 0c2d245fdf5f24f99138f1c60c4e6bed52cf5d5a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 11:08:52 +0000 Subject: Update the current state of an event if we update auth events. --- synapse/handlers/federation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index de47a97e6b..cc22f21cd1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -706,7 +706,6 @@ class FederationHandler(BaseHandler): event.event_id, e.msg ) - # TODO: Store rejection. context.rejected = RejectedReason.AUTH_ERROR yield self.store.persist_event( @@ -828,6 +827,9 @@ class FederationHandler(BaseHandler): # 4. Look at rejects and their proofs. # TODO. + context.current_state.update(auth_events) + context.state_group = None + try: self.auth.check(event, auth_events=auth_events) except AuthError: -- cgit 1.4.1 From a70a801184814d116ed5b10a952e17c45df7bfc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 13:34:01 +0000 Subject: Fix bug where we superfluously asked for current state. Change API of /query_auth/ so that we don't duplicate events in the response. --- synapse/api/auth.py | 2 ++ synapse/federation/federation_client.py | 7 +---- synapse/federation/federation_server.py | 12 ++++---- synapse/handlers/federation.py | 51 ++++++++++++--------------------- synapse/state.py | 20 ++++++++++--- 5 files changed, 43 insertions(+), 49 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3471afd7e7..37e31d2b6f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -102,6 +102,8 @@ class Auth(object): def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) + logger.debug("Got curr_state %s", curr_state) + for event in curr_state: if event.type == EventTypes.Member: try: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 1173ca817b..e1539bd0e0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -357,15 +357,10 @@ class FederationClient(object): for e in content["auth_chain"] ] - missing = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) - for e in content.get("missing", []) - ] - ret = { "auth_chain": auth_chain, "rejects": content.get("rejects", []), - "missing": missing, + "missing": content.get("missing", []), } defer.returnValue(ret) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 845a07a3a3..84ed0a0ba0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -252,11 +252,8 @@ class FederationServer(object): e.get_pdu_json(time_now) for e in ret["auth_chain"] ], - "rejects": content.get("rejects", []), - "missing": [ - e.get_pdu_json(time_now) - for e in ret.get("missing", []) - ], + "rejects": ret.get("rejects", []), + "missing": ret.get("missing", []), } defer.returnValue( @@ -372,7 +369,10 @@ class FederationServer(object): logger.exception("Failed to get PDU") fetch_state = True else: - fetch_state = True + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True else: fetch_state = True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cc22f21cd1..35cad4182a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -121,38 +121,18 @@ class FederationHandler(BaseHandler): ) if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") - - replication = self.replication_layer - - if not state: - state, auth_chain = yield replication.get_state_for_room( - origin, room_id=event.room_id, event_id=event.event_id, - ) - - if not auth_chain: - auth_chain = yield replication.get_event_auth( - origin, - context=event.room_id, - event_id=event.event_id, - ) - - for e in auth_chain: - e.internal_metadata.outlier = True - try: - yield self._handle_new_event(origin, e) - except: - logger.exception( - "Failed to handle auth event %s", - e.event_id, - ) - current_state = state - if state: + if state and auth_chain is not None: for e in state: e.internal_metadata.outlier = True try: - yield self._handle_new_event(origin, e) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event(origin, e, auth_events=auth) except: logger.exception( "Failed to handle state event %s", @@ -809,18 +789,23 @@ class FederationHandler(BaseHandler): ) # 3. Process any remote auth chain events we haven't seen. - for e in result.get("missing", []): + for missing_id in result.get("missing", []): try: - auth_ids = [e_id for e_id, _ in e.auth_events] + for e in result["auth_chain"]: + if e.event_id == missing_id: + ev = e + break + + auth_ids = [e_id for e_id, _ in ev.auth_events] auth = { (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } - e.internal_metadata.outlier = True + ev.internal_metadata.outlier = True yield self._handle_new_event( - origin, e, auth_events=auth + origin, ev, auth_events=auth ) - auth_events[(e.type, e.state_key)] = e + auth_events[(ev.type, ev.state_key)] = ev except AuthError: pass @@ -970,5 +955,5 @@ class FederationHandler(BaseHandler): } for e in base_remote_rejected }, - "missing": missing_locals, + "missing": [e.event_id for e in missing_locals], }) diff --git a/synapse/state.py b/synapse/state.py index d9fdfb34be..e6632978b5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -166,10 +166,17 @@ class StateHandler(object): first is the name of a state group if one and only one is involved, otherwise `None`. """ + logger.debug("resolve_state_groups event_ids %s", event_ids) + state_groups = yield self.store.get_state_groups( event_ids ) + logger.debug( + "resolve_state_groups state_groups %s", + state_groups.keys() + ) + group_names = set(state_groups.keys()) if len(group_names) == 1: name, state_list = state_groups.items().pop() @@ -205,6 +212,15 @@ class StateHandler(object): if len(v.values()) > 1 } + logger.debug( + "resolve_state_groups Unconflicted state: %s", + unconflicted_state.values(), + ) + logger.debug( + "resolve_state_groups Conflicted state: %s", + conflicted_state.values(), + ) + if event_type: prev_states_events = conflicted_state.get( (event_type, state_key), [] @@ -240,10 +256,6 @@ class StateHandler(object): 1. power levels 2. memberships 3. other events. - - :param conflicted_state: - :param auth_events: - :return: """ resolved_state = {} power_key = (EventTypes.PowerLevels, "") -- cgit 1.4.1 From b724a809c46f782231e18b70a632ce1ea9c540da Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 15:57:53 +0000 Subject: Only auth_events with event if event in event.auth_events --- synapse/handlers/federation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 35cad4182a..40836e0c51 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -805,7 +805,9 @@ class FederationHandler(BaseHandler): yield self._handle_new_event( origin, ev, auth_events=auth ) - auth_events[(ev.type, ev.state_key)] = ev + + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev except AuthError: pass -- cgit 1.4.1 From 2cd29dbdd91a8f58f988eb3c417ad6a187fc7e87 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 16:51:58 +0000 Subject: Fix bug where accepting invite over federation didn't work. Add logging. --- synapse/handlers/federation.py | 57 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 8 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 40836e0c51..623c54d584 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -343,6 +343,10 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True + + if e.event_id == event.event_id: + continue + try: auth_ids = [e_id for e_id, _ in e.auth_events] auth = { @@ -359,7 +363,9 @@ class FederationHandler(BaseHandler): ) for e in state: - # FIXME: Auth these. + if e.event_id == event.event_id: + continue + e.internal_metadata.outlier = True try: auth_ids = [e_id for e_id, _ in e.auth_events] @@ -376,11 +382,18 @@ class FederationHandler(BaseHandler): e.event_id, ) + auth_ids = [e_id for e_id, _ in event.auth_events] + auth_events = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( target_host, new_event, state=state, current_state=state, + auth_events=auth_events, ) yield self.notifier.on_new_room_event( @@ -752,7 +765,17 @@ class FederationHandler(BaseHandler): origin, event.room_id, event.event_id ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in remote_auth_chain] + ) + for e in remote_auth_chain: + if e.event_id in seen_remotes.keys(): + continue + + if e.event_id == event.event_id: + continue + try: auth_ids = [e_id for e_id, _ in e.auth_events] auth = { @@ -760,10 +783,17 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } e.internal_metadata.outlier = True + + logger.debug( + "do_auth %s missing_auth: %s", + event.event_id, e.event_id + ) yield self._handle_new_event( origin, e, auth_events=auth ) - auth_events[(e.type, e.state_key)] = e + + if e.event_id in event_auth_events: + auth_events[(e.type, e.state_key)] = e except AuthError: pass @@ -788,20 +818,31 @@ class FederationHandler(BaseHandler): local_auth_chain, ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in result["auth_chain"]] + ) + # 3. Process any remote auth chain events we haven't seen. - for missing_id in result.get("missing", []): - try: - for e in result["auth_chain"]: - if e.event_id == missing_id: - ev = e - break + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes.keys(): + continue + if ev.event_id == event.event_id: + continue + + try: auth_ids = [e_id for e_id, _ in ev.auth_events] auth = { (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } ev.internal_metadata.outlier = True + + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) + yield self._handle_new_event( origin, ev, auth_events=auth ) -- cgit 1.4.1 From 4c0da49d7ca58774adad870da98a7e168a5be18c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 22:53:13 +0000 Subject: Resign events when we return them via /query_auth/ --- synapse/handlers/federation.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 623c54d584..8bf5a4cc11 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -741,6 +741,15 @@ class FederationHandler(BaseHandler): local_auth_chain, remote_auth_chain ) + for event in ret["auth_chain"]: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + logger.debug("on_query_auth reutrning: %s", ret) defer.returnValue(ret) -- cgit 1.4.1 From e7ca813dd476c83497d4130ad8efa9424d86e921 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:38:14 +0000 Subject: Try to ensure we don't persist an event we have already persisted. In persist_event check if we already have the event, if so then update instead of replacing so that we don't cause a bump of the stream_ordering. --- synapse/handlers/federation.py | 42 ++++++++++++++++++++++++++------------- synapse/storage/__init__.py | 40 +++++++++++++++++++++++++++++++++---- tests/handlers/test_federation.py | 5 ++++- 3 files changed, 68 insertions(+), 19 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8bf5a4cc11..c384789c2f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -112,6 +112,14 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) + event_ids = set() + if state: + event_ids += {e.event_id for e in state} + if auth_chain: + event_ids += {e.event_id for e in auth_chain} + + seen_ids = (yield self.store.have_events(event_ids)).keys() + # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work current_state = None @@ -124,20 +132,26 @@ class FederationHandler(BaseHandler): current_state = state if state and auth_chain is not None: - for e in state: - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event(origin, e, auth_events=auth) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + for list_of_pdus in [auth_chain, state]: + for e in list_of_pdus: + if e.event_id in seen_ids: + continue + + e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + except: + logger.exception( + "Failed to handle state event %s", + e.event_id, + ) try: yield self._handle_new_event( diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b4a7a3f068..93aefe0c48 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -161,6 +161,39 @@ class DataStore(RoomMemberStore, RoomStore, outlier = event.internal_metadata.is_outlier() + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + event_dict = { k: v for k, v in event.get_dict().items() @@ -170,10 +203,6 @@ class DataStore(RoomMemberStore, RoomStore, ] } - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - self._simple_insert_txn( txn, table="event_json", @@ -482,6 +511,9 @@ class DataStore(RoomMemberStore, RoomStore, the rejected reason string if we rejected the event, else maps to None. """ + if not event_ids: + return defer.succeed({}) + def f(txn): sql = ( "SELECT e.event_id, reason FROM events as e " diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 44dbce6bea..4270481139 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -91,7 +91,10 @@ class FederationTestCase(unittest.TestCase): self.datastore.persist_event.return_value = defer.succeed(None) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) - self.datastore.have_events.return_value = defer.succeed({}) + + def have_events(event_ids): + return defer.succeed({}) + self.datastore.have_events.side_effect = have_events def annotate(ev, old_state=None): context = Mock() -- cgit 1.4.1 From 51969f9e5f4774e4d0ddcc2e8ebcf96803cbf295 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:40:14 +0000 Subject: Return rejected events if asked for it over federation. --- synapse/handlers/federation.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c384789c2f..0161fbe49c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -632,6 +632,7 @@ class FederationHandler(BaseHandler): event = yield self.store.get_event( event_id, allow_none=True, + allow_rejected=True, ) if event: -- cgit 1.4.1 From 4ff2273b300c0a40fcf054a613aac24c51af22f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 11:23:26 +0000 Subject: Add FIXME note. --- synapse/handlers/federation.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0161fbe49c..322c1b407d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -716,6 +716,8 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + # FIXME: Don't store as rejected with AUTH_ERROR if we haven't + # seen all the auth events. yield self.store.persist_event( event, context=context, -- cgit 1.4.1 From 06c34bfbaee553b851e0f02d8e17d5bff7360376 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 11:23:44 +0000 Subject: Give exception better message --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 322c1b407d..9d7557f578 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1000,7 +1000,7 @@ class FederationHandler(BaseHandler): if reason is None: # FIXME: ERRR?! logger.warn("Could not find reason for %s", e.event_id) - raise RuntimeError("") + raise RuntimeError("Could not find reason for %s" % e.event_id) reason_map[e.event_id] = reason -- cgit 1.4.1 From fed29251d70b050ea5799708b6b13be9617d1f6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:23:58 +0000 Subject: Spelling --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9d7557f578..b9b2e25d1f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -767,7 +767,7 @@ class FederationHandler(BaseHandler): ) ) - logger.debug("on_query_auth reutrning: %s", ret) + logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) -- cgit 1.4.1 From 77a076bd25fc355c49251bcf489c0511c0f0f0af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:35:17 +0000 Subject: Set combinations is | and not + --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b9b2e25d1f..653ab0dbfa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -114,9 +114,9 @@ class FederationHandler(BaseHandler): event_ids = set() if state: - event_ids += {e.event_id for e in state} + event_ids |= {e.event_id for e in state} if auth_chain: - event_ids += {e.event_id for e in auth_chain} + event_ids |= {e.event_id for e in auth_chain} seen_ids = (yield self.store.have_events(event_ids)).keys() -- cgit 1.4.1 From 6efd4d1649a539ba4bf1c884ebb90a48c2d1c8df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:57:54 +0000 Subject: Don't completely die if get auth_chain or querying auth_chain requests fail --- synapse/handlers/federation.py | 135 ++++++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 63 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 653ab0dbfa..6727155c38 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -787,41 +787,45 @@ class FederationHandler(BaseHandler): if missing_auth: logger.debug("Missing auth: %s", missing_auth) # If we don't have all the auth events, we need to get them. - remote_auth_chain = yield self.replication_layer.get_event_auth( - origin, event.room_id, event.event_id - ) + try: + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) - seen_remotes = yield self.store.have_events( - [e.event_id for e in remote_auth_chain] - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in remote_auth_chain] + ) - for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): - continue + for e in remote_auth_chain: + if e.event_id in seen_remotes.keys(): + continue - if e.event_id == event.event_id: - continue + if e.event_id == event.event_id: + continue - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in remote_auth_chain - if e.event_id in auth_ids - } - e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + e.internal_metadata.outlier = True - logger.debug( - "do_auth %s missing_auth: %s", - event.event_id, e.event_id - ) - yield self._handle_new_event( - origin, e, auth_events=auth - ) + logger.debug( + "do_auth %s missing_auth: %s", + event.event_id, e.event_id + ) + yield self._handle_new_event( + origin, e, auth_events=auth + ) - if e.event_id in event_auth_events: - auth_events[(e.type, e.state_key)] = e - except AuthError: - pass + if e.event_id in event_auth_events: + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + except: + # FIXME: + logger.exception("Failed to get auth chain") # FIXME: Assumes we have and stored all the state for all the # prev_events @@ -836,47 +840,52 @@ class FederationHandler(BaseHandler): auth_ids = self.auth.compute_auth_events(event, context) local_auth_chain = yield self.store.get_auth_chain(auth_ids) - # 2. Get remote difference. - result = yield self.replication_layer.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, - ) + try: + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) - seen_remotes = yield self.store.have_events( - [e.event_id for e in result["auth_chain"]] - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in result["auth_chain"]] + ) - # 3. Process any remote auth chain events we haven't seen. - for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): - continue + # 3. Process any remote auth chain events we haven't seen. + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes.keys(): + continue - if ev.event_id == event.event_id: - continue + if ev.event_id == event.event_id: + continue - try: - auth_ids = [e_id for e_id, _ in ev.auth_events] - auth = { - (e.type, e.state_key): e for e in result["auth_chain"] - if e.event_id in auth_ids - } - ev.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in ev.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + ev.internal_metadata.outlier = True - logger.debug( - "do_auth %s different_auth: %s", - event.event_id, e.event_id - ) + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) - yield self._handle_new_event( - origin, ev, auth_events=auth - ) + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev + except AuthError: + pass - if ev.event_id in event_auth_events: - auth_events[(ev.type, ev.state_key)] = ev - except AuthError: - pass + except: + # FIXME: + logger.exception("Failed to query auth chain") # 4. Look at rejects and their proofs. # TODO. -- cgit 1.4.1 From 7dd1c5c542d58464085b11ababe746c6a60515ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 16:12:04 +0000 Subject: Neaten the handling of state and auth_chain up a bit --- synapse/handlers/federation.py | 57 ++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 27 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6727155c38..86953bf8c8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -30,6 +30,7 @@ from synapse.types import UserID from twisted.internet import defer +import itertools import logging @@ -112,14 +113,6 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) - event_ids = set() - if state: - event_ids |= {e.event_id for e in state} - if auth_chain: - event_ids |= {e.event_id for e in auth_chain} - - seen_ids = (yield self.store.have_events(event_ids)).keys() - # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work current_state = None @@ -131,27 +124,37 @@ class FederationHandler(BaseHandler): logger.debug("Got event for room we're not in.") current_state = state + event_ids = set() + if state: + event_ids |= {e.event_id for e in state} + if auth_chain: + event_ids |= {e.event_id for e in auth_chain} + + seen_ids = (yield self.store.have_events(event_ids)).keys() + if state and auth_chain is not None: - for list_of_pdus in [auth_chain, state]: - for e in list_of_pdus: - if e.event_id in seen_ids: - continue + # If we have any state or auth_chain given to us by the replication + # layer, then we should handle them (if we haven't before.) + for e in itertools.chain(auth_chain, state): + if e.event_id in seen_ids: + continue - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + seen_ids.add(e.event_id) + except: + logger.exception( + "Failed to handle state event %s", + e.event_id, + ) try: yield self._handle_new_event( -- cgit 1.4.1 From c0462dbf1533f285f632dcb0a74c0ef0c3e2475b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 10:16:51 +0000 Subject: Rearrange persist_event so that do all the queries that need to be done before returning early if we have already persisted that event. --- synapse/events/__init__.py | 2 +- synapse/handlers/federation.py | 2 + synapse/storage/__init__.py | 145 +++++++++++++++++++++-------------------- 3 files changed, 77 insertions(+), 72 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bf07951027..8f0c6e959f 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -77,7 +77,7 @@ class EventBase(object): return self.content["membership"] def is_state(self): - return hasattr(self, "state_key") + return hasattr(self, "state_key") and self.state_key is not None def get_dict(self): d = dict(self._event_dict) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 86953bf8c8..0876589e31 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -515,6 +515,8 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) + destinations.remove(origin) + logger.debug( "on_send_join_request: Sending event: %s, signatures: %s", event.event_id, diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 93ab26fcd1..30ce378900 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -163,19 +163,70 @@ class DataStore(RoomMemberStore, RoomStore, def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) outlier = event.internal_metadata.is_outlier() + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + have_persisted = self._simple_select_one_onecol_txn( txn, table="event_json", @@ -209,6 +260,17 @@ class DataStore(RoomMemberStore, RoomStore, ) return + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Feedback: + self._store_feedback_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + event_dict = { k: v for k, v in event.get_dict().items() @@ -273,41 +335,10 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - if not outlier: - self._store_state_groups_txn(txn, event, context) - if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - is_state = hasattr(event, "state_key") and event.state_key is not None - if is_state: + if event.is_state(): vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -315,6 +346,7 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } + # TODO: How does this work with backfilling? if hasattr(event, "replaces_state"): vals["prev_state"] = event.replaces_state @@ -351,28 +383,6 @@ class DataStore(RoomMemberStore, RoomStore, or_ignore=True, ) - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( @@ -403,13 +413,6 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, ref_alg, ref_hash_bytes ) - if not outlier: - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - def _store_redaction(self, txn, event): txn.execute( "INSERT OR IGNORE INTO redactions " -- cgit 1.4.1 From 650e32d45580ddd13826364291bda6760c014df9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 14:06:42 +0000 Subject: Change context.auth_events to what the auth_events would be bases on context.current_state, rather than based on the auth_events from the event. --- synapse/api/auth.py | 12 ++++++------ synapse/handlers/federation.py | 4 +++- synapse/state.py | 8 ++++++-- 3 files changed, 15 insertions(+), 9 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3471afd7e7..7105ee21dc 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -358,7 +358,7 @@ class Auth(object): def add_auth_events(self, builder, context): yield run_on_reactor() - auth_ids = self.compute_auth_events(builder, context) + auth_ids = self.compute_auth_events(builder, context.current_state) auth_events_entries = yield self.store.add_event_hashes( auth_ids @@ -372,26 +372,26 @@ class Auth(object): if v.event_id in auth_ids } - def compute_auth_events(self, event, context): + def compute_auth_events(self, event, current_state): if event.type == EventTypes.Create: return [] auth_ids = [] key = (EventTypes.PowerLevels, "", ) - power_level_event = context.current_state.get(key) + power_level_event = current_state.get(key) if power_level_event: auth_ids.append(power_level_event.event_id) key = (EventTypes.JoinRules, "", ) - join_rule_event = context.current_state.get(key) + join_rule_event = current_state.get(key) key = (EventTypes.Member, event.user_id, ) - member_event = context.current_state.get(key) + member_event = current_state.get(key) key = (EventTypes.Create, "", ) - create_event = context.current_state.get(key) + create_event = current_state.get(key) if create_event: auth_ids.append(create_event.event_id) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0876589e31..2e2c23ef63 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -842,7 +842,9 @@ class FederationHandler(BaseHandler): logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. - auth_ids = self.auth.compute_auth_events(event, context) + auth_ids = self.auth.compute_auth_events( + event, context.current_state + ) local_auth_chain = yield self.store.get_auth_chain(auth_ids) try: diff --git a/synapse/state.py b/synapse/state.py index 6a6fb8aea0..695a5e7ac4 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -103,7 +103,9 @@ class StateHandler(object): context.state_group = None if hasattr(event, "auth_events") and event.auth_events: - auth_ids = zip(*event.auth_events)[0] + auth_ids = self.hs.get_auth().compute_auth_events( + event, context.current_state + ) context.auth_events = { k: v for k, v in context.current_state.items() @@ -149,7 +151,9 @@ class StateHandler(object): event.unsigned["replaces_state"] = replaces.event_id if hasattr(event, "auth_events") and event.auth_events: - auth_ids = zip(*event.auth_events)[0] + auth_ids = self.hs.get_auth().compute_auth_events( + event, context.current_state + ) context.auth_events = { k: v for k, v in context.current_state.items() -- cgit 1.4.1 From ae46f10fc5dba0f81518e3144ab8d9ed7a7d03bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 16:28:12 +0000 Subject: Apply sanity to the transport client interface. Convert 'make_join' and 'send_join' to accept iterables of destinations --- synapse/api/errors.py | 8 +++- synapse/federation/federation_client.py | 82 ++++++++++++++++++++------------- synapse/federation/transaction_queue.py | 23 +++++++-- synapse/federation/transport/client.py | 42 +++++++---------- synapse/handlers/federation.py | 4 +- synapse/http/matrixfederationclient.py | 42 ++++++++++++++--- 6 files changed, 130 insertions(+), 71 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index ad478aa6b7..5041828f18 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -39,7 +39,7 @@ class Codes(object): TOO_LARGE = "M_TOO_LARGE" -class CodeMessageException(Exception): +class CodeMessageException(RuntimeError): """An exception with integer code and message string attributes.""" def __init__(self, code, msg): @@ -227,3 +227,9 @@ class FederationError(RuntimeError): "affected": self.affected, "source": self.source if self.source else self.affected, } + + +class HttpResponseException(CodeMessageException): + def __init__(self, code, msg, response): + self.response = response + super(HttpResponseException, self).__init__(code, msg) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d6b8c43916..eb36ec040b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu +from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -180,7 +181,8 @@ class FederationClient(FederationBase): pdu = yield self._check_sigs_and_hash(pdu) break - + except CodeMessageException: + raise except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -264,45 +266,63 @@ class FederationClient(FederationBase): defer.returnValue(self.event_from_pdu_json(pdu_dict)) break - except Exception as e: - logger.warn("Failed to make_join via %s", destination) + except CodeMessageException: + raise + except RuntimeError as e: + logger.warn( + "Failed to make_join via %s: %s", + destination, e.message + ) + + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks - def send_join(self, destination, pdu): - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) + def send_join(self, destinations, pdu): + for destination in destinations: + try: + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_join( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) - logger.debug("Got content: %s", content) + logger.debug("Got content: %s", content) - state = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("state", []) - ] + state = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("auth_chain", []) - ] + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] - signed_state = yield self._check_sigs_and_hash_and_fetch( - destination, state, outlier=True - ) + signed_state = yield self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ) - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True - ) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - auth_chain.sort(key=lambda e: e.depth) + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue({ + "state": signed_state, + "auth_chain": signed_auth, + }) + except CodeMessageException: + raise + except RuntimeError as e: + logger.warn( + "Failed to send_join via %s: %s", + destination, e.message + ) - defer.returnValue({ - "state": signed_state, - "auth_chain": signed_auth, - }) + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9d4f2c09a2..f38aeba7cc 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction +from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext @@ -238,9 +239,14 @@ class TransactionQueue(object): del p["age_ts"] return data - code, response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response logger.info("TX [%s] got %d response", destination, code) @@ -274,8 +280,7 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) - - except Exception as e: + except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. logger.warn( @@ -283,6 +288,14 @@ class TransactionQueue(object): destination, e, ) + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.exception( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) self.set_retrying(destination, retry_interval) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 4cb1dea2de..8b137e7128 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.util.logutils import log_function import logging -import json logger = logging.getLogger(__name__) @@ -129,7 +128,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - code, response = yield self.client.put_json( + response = yield self.client.put_json( transaction.destination, path=PREFIX + "/send/%s/" % transaction.transaction_id, data=json_data, @@ -137,95 +136,86 @@ class TransportLayerClient(object): ) logger.debug( - "send_data dest=%s, txid=%s, got response: %d", - transaction.destination, transaction.transaction_id, code + "send_data dest=%s, txid=%s, got response: 200", + transaction.destination, transaction.transaction_id, ) - defer.returnValue((code, response)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail): path = PREFIX + "/query/%s" % query_type - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, args=args, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True): path = PREFIX + "/make_join/%s/%s" % (room_id, user_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_join(self, destination, room_id, event_id, content): path = PREFIX + "/send_join/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_join", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def send_invite(self, destination, room_id, event_id, content): path = PREFIX + "/invite/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_query_auth(self, destination, room_id, event_id, content): path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) - code, content = yield self.client.post_json( + content = yield self.client.post_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(content) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0876589e31..a968a87360 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -288,7 +288,7 @@ class FederationHandler(BaseHandler): logger.debug("Joining %s to %s", joinee, room_id) pdu = yield self.replication_layer.make_join( - target_host, + [target_host], room_id, joinee ) @@ -331,7 +331,7 @@ class FederationHandler(BaseHandler): new_event = builder.build() ret = yield self.replication_layer.send_join( - target_host, + [target_host], new_event ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c7bf1b47b8..8559d06b7f 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -27,7 +27,9 @@ from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json -from synapse.api.errors import CodeMessageException, SynapseError, Codes +from synapse.api.errors import ( + SynapseError, Codes, HttpResponseException, +) from syutil.crypto.jsonsign import sign_json @@ -163,13 +165,12 @@ class MatrixFederationHttpClient(object): ) if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - raise CodeMessageException( - response.code, response.phrase + raise HttpResponseException( + response.code, response.phrase, response ) defer.returnValue(response) @@ -238,11 +239,20 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def post_json(self, destination, path, data={}): @@ -275,11 +285,20 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True): @@ -321,7 +340,18 @@ class MatrixFederationHttpClient(object): retry_on_dns_fail=retry_on_dns_fail ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + + logger.debug("Getting resp body") body = yield readBody(response) + logger.debug("Got resp body") defer.returnValue(json.loads(body)) -- cgit 1.4.1 From e1515c3e91f2117adc3976b5e606728560ce9e96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Feb 2015 13:43:28 +0000 Subject: Pass through list of room hosts from room alias query to federation so that it can retry against different room hosts --- synapse/federation/federation_client.py | 5 ++++- synapse/handlers/federation.py | 20 +++++++++++++------- synapse/handlers/room.py | 12 +++++------- 3 files changed, 22 insertions(+), 15 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index eb36ec040b..9923b3fc0d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -264,7 +264,9 @@ class FederationClient(FederationBase): logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + defer.returnValue( + (destination, self.event_from_pdu_json(pdu_dict)) + ) break except CodeMessageException: raise @@ -313,6 +315,7 @@ class FederationClient(FederationBase): defer.returnValue({ "state": signed_state, "auth_chain": signed_auth, + "origin": destination, }) except CodeMessageException: raise diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 04a4689483..aba266c2bc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -273,7 +273,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def do_invite_join(self, target_host, room_id, joinee, content, snapshot): + def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot): """ Attempts to join the `joinee` to the room `room_id` via the server `target_host`. @@ -287,8 +287,8 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - pdu = yield self.replication_layer.make_join( - [target_host], + origin, pdu = yield self.replication_layer.make_join( + target_hosts, room_id, joinee ) @@ -330,11 +330,17 @@ class FederationHandler(BaseHandler): new_event = builder.build() + # Try the host we successfully got a response to /make_join/ + # request first. + target_hosts.remove(origin) + target_hosts.insert(0, origin) + ret = yield self.replication_layer.send_join( - [target_host], + target_hosts, new_event ) + origin = ret["origin"] state = ret["state"] auth_chain = ret["auth_chain"] auth_chain.sort(key=lambda e: e.depth) @@ -371,7 +377,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -391,7 +397,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -406,7 +412,7 @@ class FederationHandler(BaseHandler): } yield self._handle_new_event( - target_host, + origin, new_event, state=state, current_state=state, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 23821d321f..0369b907a5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - host = hosts[0] - # If event doesn't include a display name, add one. yield self.distributor.fire( "collect_presencelike_data", joinee, content @@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler): }) event, context = yield self._create_new_client_event(builder) - yield self._do_join(event, context, room_host=host, do_auth=True) + yield self._do_join(event, context, room_hosts=hosts, do_auth=True) defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, context, room_host=None, do_auth=True): + def _do_join(self, event, context, room_hosts=None, do_auth=True): joinee = UserID.from_string(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler): if is_host_in_room: should_do_dance = False - elif room_host: # TODO: Shouldn't this be remote_room_host? + elif room_hosts: # TODO: Shouldn't this be remote_room_host? should_do_dance = True else: # TODO(markjh): get prev_state from snapshot @@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler): inviter = UserID.from_string(prev_state.user_id) should_do_dance = not self.hs.is_mine(inviter) - room_host = inviter.domain + room_hosts = [inviter.domain] else: # return the same error as join_room_alias does raise SynapseError(404, "No known servers") @@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler): if should_do_dance: handler = self.hs.get_handlers().federation_handler yield handler.do_invite_join( - room_host, + room_hosts, room_id, event.user_id, event.get_dict()["content"], # FIXME To get a non-frozen dict -- cgit 1.4.1 From 3737329d9b40b54bc4205c5f5e7e0946c5e51614 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Feb 2015 10:53:18 +0000 Subject: Handle the fact the list.remove raises if element doesn't exist --- synapse/handlers/federation.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index aba266c2bc..9d0ce9aa50 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -332,8 +332,11 @@ class FederationHandler(BaseHandler): # Try the host we successfully got a response to /make_join/ # request first. - target_hosts.remove(origin) - target_hosts.insert(0, origin) + try: + target_hosts.remove(origin) + target_hosts.insert(0, origin) + except ValueError: + pass ret = yield self.replication_layer.send_join( target_hosts, @@ -521,7 +524,7 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) - destinations.remove(origin) + destinations.discard(origin) logger.debug( "on_send_join_request: Sending event: %s, signatures: %s", @@ -1013,7 +1016,10 @@ class FederationHandler(BaseHandler): for e in missing_remotes: for e_id, _ in e.auth_events: if e_id in missing_remote_ids: - base_remote_rejected.remove(e) + try: + base_remote_rejected.remove(e) + except ValueError: + pass reason_map = {} -- cgit 1.4.1 From c78b5fb1f16957f5398517fa5a99dcb370992b26 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Feb 2015 13:52:16 +0000 Subject: Make seen_ids a set --- synapse/handlers/federation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9d0ce9aa50..1b709e5e24 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -130,7 +130,9 @@ class FederationHandler(BaseHandler): if auth_chain: event_ids |= {e.event_id for e in auth_chain} - seen_ids = (yield self.store.have_events(event_ids)).keys() + seen_ids = set( + (yield self.store.have_events(event_ids)).keys() + ) if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication -- cgit 1.4.1 From e890ce223c0c7798efc9961ca1e4c4870cfc7e2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Feb 2015 14:16:50 +0000 Subject: Don't query auth if the only difference is events that were rejected due to auth. --- synapse/handlers/federation.py | 128 +++++++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 56 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1b709e5e24..b13b7c7701 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -791,12 +791,12 @@ class FederationHandler(BaseHandler): @log_function def do_auth(self, origin, event, context, auth_events): # Check if we have all the auth events. - res = yield self.store.have_events( + have_events = yield self.store.have_events( [e_id for e_id, _ in event.auth_events] ) event_auth_events = set(e_id for e_id, _ in event.auth_events) - seen_events = set(res.keys()) + seen_events = set(have_events.keys()) missing_auth = event_auth_events - seen_events @@ -839,6 +839,11 @@ class FederationHandler(BaseHandler): auth_events[(e.type, e.state_key)] = e except AuthError: pass + + have_events = yield self.store.have_events( + [e_id for e_id, _ in event.auth_events] + ) + seen_events = set(have_events.keys()) except: # FIXME: logger.exception("Failed to get auth chain") @@ -852,64 +857,75 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.debug("Different auth: %s", different_auth) - # 1. Get what we think is the auth chain. - auth_ids = self.auth.compute_auth_events( - event, context.current_state - ) - local_auth_chain = yield self.store.get_auth_chain(auth_ids) - - try: - # 2. Get remote difference. - result = yield self.replication_layer.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, - ) - - seen_remotes = yield self.store.have_events( - [e.event_id for e in result["auth_chain"]] + # Only do auth resolution if we have something new to say. + # We can't rove an auth failure. + do_resolution = False + for e_id in different_auth: + if e_id in have_events: + if have_events[e_id] != RejectedReason.AUTH_ERROR: + do_resolution = True + break + + if do_resolution: + # 1. Get what we think is the auth chain. + auth_ids = self.auth.compute_auth_events( + event, context.current_state ) + local_auth_chain = yield self.store.get_auth_chain(auth_ids) - # 3. Process any remote auth chain events we haven't seen. - for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): - continue - - if ev.event_id == event.event_id: - continue - - try: - auth_ids = [e_id for e_id, _ in ev.auth_events] - auth = { - (e.type, e.state_key): e for e in result["auth_chain"] - if e.event_id in auth_ids - } - ev.internal_metadata.outlier = True - - logger.debug( - "do_auth %s different_auth: %s", - event.event_id, e.event_id - ) + try: + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) - yield self._handle_new_event( - origin, ev, auth_events=auth - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in result["auth_chain"]] + ) - if ev.event_id in event_auth_events: - auth_events[(ev.type, ev.state_key)] = ev - except AuthError: - pass + # 3. Process any remote auth chain events we haven't seen. + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes.keys(): + continue + + if ev.event_id == event.event_id: + continue + + try: + auth_ids = [e_id for e_id, _ in ev.auth_events] + auth = { + (e.type, e.state_key): e + for e in result["auth_chain"] + if e.event_id in auth_ids + } + ev.internal_metadata.outlier = True + + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) + + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev + except AuthError: + pass - except: - # FIXME: - logger.exception("Failed to query auth chain") + except: + # FIXME: + logger.exception("Failed to query auth chain") - # 4. Look at rejects and their proofs. - # TODO. + # 4. Look at rejects and their proofs. + # TODO. - context.current_state.update(auth_events) - context.state_group = None + context.current_state.update(auth_events) + context.state_group = None try: self.auth.check(event, auth_events=auth_events) @@ -1028,9 +1044,9 @@ class FederationHandler(BaseHandler): for e in base_remote_rejected: reason = yield self.store.get_rejection_reason(e.event_id) if reason is None: - # FIXME: ERRR?! - logger.warn("Could not find reason for %s", e.event_id) - raise RuntimeError("Could not find reason for %s" % e.event_id) + # TODO: e is not in the current state, so we should + # construct some proof of that. + continue reason_map[e.event_id] = reason -- cgit 1.4.1 From cc0532a4bf328478d508242d830433a33c4f22b2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Feb 2015 15:16:26 +0000 Subject: Explicitly list the RejectedReasons that we can prove --- synapse/handlers/federation.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b13b7c7701..0f9c82fd06 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -860,9 +860,14 @@ class FederationHandler(BaseHandler): # Only do auth resolution if we have something new to say. # We can't rove an auth failure. do_resolution = False + + provable = [ + RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR, + ] + for e_id in different_auth: if e_id in have_events: - if have_events[e_id] != RejectedReason.AUTH_ERROR: + if have_events[e_id] in provable: do_resolution = True break -- cgit 1.4.1 From ddb816cf60ca1b0c0f9bfab5df233a010ac309a3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 15:44:28 +0000 Subject: Don't unfreeze when using FreezeEvent.get_dict, as we are using a JSONEncoder that understands FrozenDict --- synapse/events/__init__.py | 4 ---- synapse/federation/persistence.py | 7 ++++--- synapse/handlers/federation.py | 15 ++++++++++++++- synapse/handlers/room.py | 4 +++- synapse/storage/__init__.py | 11 +++++++++-- synapse/util/frozenutils.py | 8 ++++++-- 6 files changed, 36 insertions(+), 13 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 8f0c6e959f..f4ec8cd18c 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -140,10 +140,6 @@ class FrozenEvent(EventBase): return e - def get_dict(self): - # We need to unfreeze what we return - return unfreeze(super(FrozenEvent, self).get_dict()) - def __str__(self): return self.__repr__() diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 8a1afc0ca5..76a9dcd777 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,7 +23,8 @@ from twisted.internet import defer from synapse.util.logutils import log_function -import simplejson as json +from syutil.jsonutil import encode_canonical_json + import logging @@ -70,7 +71,7 @@ class TransactionActions(object): transaction.transaction_id, transaction.origin, code, - json.dumps(response) + encode_canonical_json(response) ) @defer.inlineCallbacks @@ -100,5 +101,5 @@ class TransactionActions(object): transaction.transaction_id, transaction.destination, response_code, - json.dumps(response_dict) + encode_canonical_json(response_dict) ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0f9c82fd06..e36f0945ef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -23,6 +23,7 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor +from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, ) @@ -311,9 +312,12 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] builder = self.event_builder_factory.new( - event.get_pdu_json() + unfreeze(event.get_pdu_json()) ) + logger.info("Builder: %s", builder.get_pdu_json()) + logger.info("Content: %s", content) + handled_events = set() try: @@ -324,14 +328,21 @@ class FederationHandler(BaseHandler): if not hasattr(event, "signatures"): builder.signatures = {} + logger.info("Content befhahs: %s", builder.content) + add_hashes_and_signatures( builder, self.hs.hostname, self.hs.config.signing_key[0], ) + logger.info("Content aftet hah: %s", builder.content) + logger.info("Content pdu json: %s", builder.get_pdu_json()) + new_event = builder.build() + logger.info("Content after build: %s", new_event.content) + # Try the host we successfully got a response to /make_join/ # request first. try: @@ -340,6 +351,7 @@ class FederationHandler(BaseHandler): except ValueError: pass + logger.info(new_event.content) ret = yield self.replication_layer.send_join( target_hosts, new_event @@ -485,6 +497,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False + logger.info(event.content) context = yield self._handle_new_event(origin, event) logger.debug( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0369b907a5..c685991a9f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -403,7 +403,9 @@ class RoomMemberHandler(BaseHandler): "membership": Membership.JOIN, "content": content, }) + logger.info("Before build: %s", builder.get_pdu_json()) event, context = yield self._create_new_client_event(builder) + logger.info("After build: %s", event.get_dict()) yield self._do_join(event, context, room_hosts=hosts, do_auth=True) @@ -462,7 +464,7 @@ class RoomMemberHandler(BaseHandler): room_hosts, room_id, event.user_id, - event.get_dict()["content"], # FIXME To get a non-frozen dict + event.content, # FIXME To get a non-frozen dict context ) else: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 924ea89035..94603ef826 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -298,12 +298,16 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) + content = encode_canonical_json( + event.content + ).decode("UTF-8") + vals = { "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.get_dict()["content"]), + "content": content, "processed": True, "outlier": outlier, "depth": event.depth, @@ -323,7 +327,10 @@ class DataStore(RoomMemberStore, RoomStore, "prev_events", ] } - vals["unrecognized_keys"] = json.dumps(unrec) + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") try: self._simple_insert_txn( diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index a13a2015e4..9e10d37aec 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -21,6 +21,9 @@ def freeze(o): if t is dict: return frozendict({k: freeze(v) for k, v in o.items()}) + if t is frozendict: + return o + if t is str or t is unicode: return o @@ -33,10 +36,11 @@ def freeze(o): def unfreeze(o): - if isinstance(o, frozendict) or isinstance(o, dict): + t = type(o) + if t is dict or t is frozendict: return dict({k: unfreeze(v) for k, v in o.items()}) - if isinstance(o, basestring): + if t is str or t is unicode: return o try: -- cgit 1.4.1 From ed877d658500b736e7f7192fbeb64aac444ef4d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 16:50:46 +0000 Subject: Remove debug logging --- synapse/handlers/federation.py | 10 ---------- synapse/handlers/room.py | 2 -- 2 files changed, 12 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e36f0945ef..0c35317bb8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -315,9 +315,6 @@ class FederationHandler(BaseHandler): unfreeze(event.get_pdu_json()) ) - logger.info("Builder: %s", builder.get_pdu_json()) - logger.info("Content: %s", content) - handled_events = set() try: @@ -328,21 +325,14 @@ class FederationHandler(BaseHandler): if not hasattr(event, "signatures"): builder.signatures = {} - logger.info("Content befhahs: %s", builder.content) - add_hashes_and_signatures( builder, self.hs.hostname, self.hs.config.signing_key[0], ) - logger.info("Content aftet hah: %s", builder.content) - logger.info("Content pdu json: %s", builder.get_pdu_json()) - new_event = builder.build() - logger.info("Content after build: %s", new_event.content) - # Try the host we successfully got a response to /make_join/ # request first. try: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index c685991a9f..914742d913 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -403,9 +403,7 @@ class RoomMemberHandler(BaseHandler): "membership": Membership.JOIN, "content": content, }) - logger.info("Before build: %s", builder.get_pdu_json()) event, context = yield self._create_new_client_event(builder) - logger.info("After build: %s", event.get_dict()) yield self._do_join(event, context, room_hosts=hosts, do_auth=True) -- cgit 1.4.1 From 76935078d1983aec9966f2eb245e4186dcee0386 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 16:51:22 +0000 Subject: Remove more debug logging --- synapse/handlers/federation.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0c35317bb8..77c81fe2da 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -341,7 +341,6 @@ class FederationHandler(BaseHandler): except ValueError: pass - logger.info(new_event.content) ret = yield self.replication_layer.send_join( target_hosts, new_event @@ -487,7 +486,6 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - logger.info(event.content) context = yield self._handle_new_event(origin, event) logger.debug( -- cgit 1.4.1