diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 23 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 99 | ||||
-rw-r--r-- | synapse/handlers/room.py | 129 |
3 files changed, 167 insertions, 84 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 3f07b5aa4a..78df9ac53e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from twisted.internet import defer class BaseHandler(object): @@ -26,3 +26,24 @@ class BaseHandler(object): self.state_handler = hs.get_state_handler() self.distributor = hs.get_distributor() self.hs = hs + + +class BaseRoomHandler(BaseHandler): + + @defer.inlineCallbacks + def _on_new_room_event(self, event, snapshot, extra_destinations=[]): + snapshot.fill_out_prev_events(event) + + store_id = yield self.store.persist_event(event) + + destinations = set(extra_destinations) + # Send a PDU to all hosts who have joined the room. + destinations.update((yield self.store.get_joined_hosts_for_room( + event.room_id + ))) + event.destinations = list(destinations) + + self.notifier.on_new_room_event(event, store_id) + + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bfc1ab86f2..7253f56322 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,6 +20,9 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function +from synapse.federation.pdu_codec import PduCodec + +from synapse.api.errors import AuthError from twisted.internet import defer @@ -30,8 +33,14 @@ logger = logging.getLogger(__name__) class FederationHandler(BaseHandler): + """Handles events that originated from federation. + Responsible for: + a) handling received Pdus before handing them on as Events to the rest + of the home server (including auth and state conflict resoultion) + b) converting events that were produced by local clients that may need + to be sent to remote home servers. + """ - """Handles events that originated from federation.""" def __init__(self, hs): super(FederationHandler, self).__init__(hs) @@ -42,9 +51,67 @@ class FederationHandler(BaseHandler): self.waiting_for_join_list = {} + self.store = hs.get_datastore() + self.replication_layer = hs.get_replication_layer() + self.state_handler = hs.get_state_handler() + # self.auth_handler = gs.get_auth_handler() + self.server_name = hs.hostname + + self.lock_manager = hs.get_room_lock_manager() + + self.replication_layer.set_handler(self) + + self.pdu_codec = PduCodec(hs) + @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state, backfilled): + def handle_new_event(self, event, snapshot): + """ Takes in an event from the client to server side, that has already + been authed and handled by the state module, and sends it to any + remote home servers that may be interested. + + Args: + event + snapshot (.storage.Snapshot): THe snapshot the event happened after + + Returns: + Deferred: Resolved when it has successfully been queued for + processing. + """ + + pdu = self.pdu_codec.pdu_from_event(event) + + if not hasattr(pdu, "destinations") or not pdu.destinations: + pdu.destinations = [] + + yield self.replication_layer.send_pdu(pdu) + + @log_function + def get_state_for_room(self, destination, room_id): + return self.replication_layer.get_state_for_context( + destination, room_id + ) + + @log_function + @defer.inlineCallbacks + def on_receive_pdu(self, pdu, backfilled): + """ Called by the ReplicationLayer when we have a new pdu. We need to + do auth checks and put it throught the StateHandler. + """ + event = self.pdu_codec.event_from_pdu(pdu) + + with (yield self.lock_manager.lock(pdu.context)): + if event.is_state and not backfilled: + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + if not is_new_state: + return + else: + is_new_state = False + # TODO: Implement something in federation that allows us to + # respond to PDU. + if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -86,8 +153,7 @@ class FederationHandler(BaseHandler): if not room: # Huh, let's try and get the current state try: - federation = self.hs.get_federation() - yield federation.get_state_for_room( + yield self.get_state_for_room( event.origin, event.room_id ) @@ -119,11 +185,10 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit): - events = yield self.hs.get_federation().backfill(dest, room_id, limit) + events = yield self._backfill(dest, room_id, limit) for event in events: try: @@ -133,10 +198,23 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def _backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) + @log_function @defer.inlineCallbacks - def do_invite_join(self, target_host, room_id, joinee, content): - federation = self.hs.get_federation() + def do_invite_join(self, target_host, room_id, joinee, content, snapshot): hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -146,7 +224,7 @@ class FederationHandler(BaseHandler): # First get current state to see if we are already joined. try: - yield federation.get_state_for_room(target_host, room_id) + yield self.get_state_for_room(target_host, room_id) hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -166,7 +244,8 @@ class FederationHandler(BaseHandler): new_event.destinations = [target_host] - yield federation.handle_new_event(new_event) + snapshot.fill_out_prev_events(new_event) + yield self.handle_new_event(new_event, snapshot) # TODO (erikj): Time out here. d = defer.Deferred() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac95..7b4b051888 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,14 +25,14 @@ from synapse.api.events.room import ( from synapse.api.streams.event import EventStream, EventsStreamData from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils -from ._base import BaseHandler +from ._base import BaseRoomHandler import logging logger = logging.getLogger(__name__) -class MessageHandler(BaseHandler): +class MessageHandler(BaseRoomHandler): def __init__(self, hs): super(MessageHandler, self).__init__(hs) @@ -84,20 +84,12 @@ class MessageHandler(BaseHandler): if stamp_event: event.content["hsob_ts"] = int(self.clock.time_msec()) - with (yield self.room_lock.lock(event.room_id)): - if not suppress_auth: - yield self.auth.check(event, raises=True) + snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) - # store message in db - store_id = yield self.store.persist_event(event) + if not suppress_auth: + yield self.auth.check(event, snapshot, raises=True) - event.destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - - self.notifier.on_new_room_event(event, store_id) - - yield self.hs.get_federation().handle_new_event(event) + yield self._on_new_room_event(event, snapshot) @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, @@ -134,23 +126,16 @@ class MessageHandler(BaseHandler): SynapseError if something went wrong. """ - with (yield self.room_lock.lock(event.room_id)): - yield self.auth.check(event, raises=True) + snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) - if stamp_event: - event.content["hsob_ts"] = int(self.clock.time_msec()) + yield self.auth.check(event, snapshot, raises=True) - yield self.state_handler.handle_new_event(event) - - # store in db - store_id = yield self.store.persist_event(event) + if stamp_event: + event.content["hsob_ts"] = int(self.clock.time_msec()) - event.destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - self.notifier.on_new_room_event(event, store_id) + yield self.state_handler.handle_new_event(event, snapshot) - yield self.hs.get_federation().handle_new_event(event) + yield self._on_new_room_event(event, snapshot) @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, @@ -219,18 +204,12 @@ class MessageHandler(BaseHandler): if stamp_event: event.content["hsob_ts"] = int(self.clock.time_msec()) - with (yield self.room_lock.lock(event.room_id)): - yield self.auth.check(event, raises=True) - - # store message in db - store_id = yield self.store.persist_event(event) + snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) - event.destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - yield self.hs.get_federation().handle_new_event(event) + yield self.auth.check(event, snapshot, raises=True) - self.notifier.on_new_room_event(event, store_id) + # store message in db + yield self._on_new_room_event(event, snapshot) @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, @@ -312,7 +291,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) -class RoomCreationHandler(BaseHandler): +class RoomCreationHandler(BaseRoomHandler): @defer.inlineCallbacks def create_room(self, user_id, room_id, config): @@ -383,6 +362,13 @@ class RoomCreationHandler(BaseHandler): content=config, ) + snapshot = yield self.store.snapshot_room( + room_id=room_id, + user_id=user_id, + state_type=RoomConfigEvent.TYPE, + state_key="", + ) + if room_alias: yield self.store.create_room_alias_association( room_id=room_id, @@ -390,10 +376,11 @@ class RoomCreationHandler(BaseHandler): servers=[self.hs.hostname], ) - yield self.state_handler.handle_new_event(config_event) + yield self.state_handler.handle_new_event(config_event, snapshot) # store_id = persist... - yield self.hs.get_federation().handle_new_event(config_event) + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.handle_new_event(config_event, snapshot) # self.notifier.on_new_room_event(event, store_id) content = {"membership": Membership.JOIN} @@ -418,7 +405,7 @@ class RoomCreationHandler(BaseHandler): defer.returnValue(result) -class RoomMemberHandler(BaseHandler): +class RoomMemberHandler(BaseRoomHandler): # TODO(paul): This handler currently contains a messy conflation of # low-level API that works on UserID objects and so on, and REST-level # API that takes ID strings and returns pagination chunks. These concerns @@ -529,6 +516,11 @@ class RoomMemberHandler(BaseHandler): """ target_user_id = event.state_key + snapshot = yield self.store.snapshot_room( + event.room_id, event.user_id, + RoomMemberEvent.TYPE, target_user_id + ) + ## TODO(markjh): get prev state from snapshot. prev_state = yield self.store.get_room_member( target_user_id, event.room_id ) @@ -549,24 +541,22 @@ class RoomMemberHandler(BaseHandler): # if this HS is not currently in the room, i.e. we have to do the # invite/join dance. if event.membership == Membership.JOIN: - yield self._do_join(event, do_auth=do_auth) + yield self._do_join(event, snapshot, do_auth=do_auth) else: # This is not a JOIN, so we can handle it normally. if do_auth: - yield self.auth.check(event, raises=True) + yield self.auth.check(event, snapshot, raises=True) - prev_state = yield self.store.get_room_member( - target_user_id, event.room_id - ) if prev_state and prev_state.membership == event.membership: # double same action, treat this event as a NOOP. defer.returnValue({}) return - yield self.state_handler.handle_new_event(event) + yield self.state_handler.handle_new_event(event, snapshot) yield self._do_local_membership_update( event, membership=event.content["membership"], + snapshot=snapshot, ) defer.returnValue({"room_id": room_id}) @@ -596,12 +586,16 @@ class RoomMemberHandler(BaseHandler): content=content, ) - yield self._do_join(new_event, room_host=host, do_auth=True) + snapshot = yield self.store.snapshot_room( + room_id, joinee, RoomMemberEvent.TYPE, joinee + ) + + yield self._do_join(new_event, snapshot, room_host=host, do_auth=True) defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True): + def _do_join(self, event, snapshot, room_host=None, do_auth=True): joinee = self.hs.parse_userid(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -623,6 +617,7 @@ class RoomMemberHandler(BaseHandler): elif room_host: should_do_dance = True else: + # TODO(markjh): get prev_state from snapshot prev_state = yield self.store.get_room_member( joinee.to_string(), room_id ) @@ -642,7 +637,7 @@ class RoomMemberHandler(BaseHandler): if should_do_dance: handler = self.hs.get_handlers().federation_handler have_joined = yield handler.do_invite_join( - room_host, room_id, event.user_id, event.content + room_host, room_id, event.user_id, event.content, snapshot ) # We want to do the _do_update inside the room lock. @@ -650,12 +645,13 @@ class RoomMemberHandler(BaseHandler): logger.debug("Doing normal join") if do_auth: - yield self.auth.check(event, raises=True) + yield self.auth.check(event, snapshot, raises=True) - yield self.state_handler.handle_new_event(event) + yield self.state_handler.handle_new_event(event, snapshot) yield self._do_local_membership_update( event, membership=event.content["membership"], + snapshot=snapshot, ) user = self.hs.parse_userid(event.user_id) @@ -699,39 +695,26 @@ class RoomMemberHandler(BaseHandler): defer.returnValue([r.room_id for r in rooms]) - @defer.inlineCallbacks - def _do_local_membership_update(self, event, membership): - # store membership - store_id = yield self.store.persist_event(event) - - # Send a PDU to all hosts who have joined the room. - destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) + def _do_local_membership_update(self, event, membership, snapshot): + destinations = [] # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key if membership == Membership.INVITE: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = UserID.from_string(target_user_id, self.hs).domain destinations.append(host) # If we are joining a remote HS, include that. if membership == Membership.JOIN: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = UserID.from_string(target_user_id, self.hs).domain destinations.append(host) - event.destinations = list(set(destinations)) - - yield self.hs.get_federation().handle_new_event(event) - self.notifier.on_new_room_event(event, store_id) - + return self._on_new_room_event( + event, snapshot, extra_destinations=destinations + ) -class RoomListHandler(BaseHandler): +class RoomListHandler(BaseRoomHandler): @defer.inlineCallbacks def get_public_room_list(self): |