diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/auth.py | 16 | ||||
-rw-r--r-- | synapse/federation/handler.py | 157 | ||||
-rw-r--r-- | synapse/federation/persistence.py | 45 | ||||
-rw-r--r-- | synapse/federation/replication.py | 4 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 21 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 142 | ||||
-rw-r--r-- | synapse/handlers/room.py | 118 | ||||
-rw-r--r-- | synapse/server.py | 5 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 157 | ||||
-rw-r--r-- | synapse/storage/_base.py | 12 | ||||
-rw-r--r-- | synapse/storage/feedback.py | 4 | ||||
-rw-r--r-- | synapse/storage/pdu.py | 22 | ||||
-rw-r--r-- | synapse/storage/room.py | 10 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 10 | ||||
-rw-r--r-- | synapse/storage/stream.py | 13 |
15 files changed, 373 insertions, 363 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 886e132e10..ef3604077d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -33,7 +33,7 @@ class Auth(object): self.store = hs.get_datastore() @defer.inlineCallbacks - def check(self, event, raises=False): + def check(self, event, snapshot, raises=False): """ Checks if this event is correctly authed. Returns: @@ -48,7 +48,11 @@ class Auth(object): allowed = yield self.is_membership_change_allowed(event) defer.returnValue(allowed) else: - yield self.check_joined_room(event.room_id, event.user_id) + self._check_joined_room( + member=snapshot.membership_state, + user_id=snapshot.user_id, + room_id=snapshot.room_id, + ) defer.returnValue(True) else: raise AuthError(500, "Unknown event: %s" % event) @@ -66,14 +70,16 @@ class Auth(object): room_id=room_id, user_id=user_id ) - if not member or member.membership != Membership.JOIN: - raise AuthError(403, "User %s not in room %s" % - (user_id, room_id)) + self._check_joined_room(member, user_id, room_id) defer.returnValue(member) except AttributeError: pass defer.returnValue(None) + def _check_joined_room(self, member, user_id, room_id): + if not member or member.membership != Membership.JOIN: + raise AuthError(403, "User %s not in room %s" % (user_id, room_id)) + @defer.inlineCallbacks def is_membership_change_allowed(self, event): target_user_id = event.state_key diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py deleted file mode 100644 index 984c1558e9..0000000000 --- a/synapse/federation/handler.py +++ /dev/null @@ -1,157 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -from .pdu_codec import PduCodec - -from synapse.api.errors import AuthError -from synapse.util.logutils import log_function - -import logging - - -logger = logging.getLogger(__name__) - - -class FederationEventHandler(object): - """ Responsible for: - a) handling received Pdus before handing them on as Events to the rest - of the home server (including auth and state conflict resoultion) - b) converting events that were produced by local clients that may need - to be sent to remote home servers. - """ - - def __init__(self, hs): - self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_layer() - self.state_handler = hs.get_state_handler() - # self.auth_handler = gs.get_auth_handler() - self.event_handler = hs.get_handlers().federation_handler - self.server_name = hs.hostname - - self.lock_manager = hs.get_room_lock_manager() - - self.replication_layer.set_handler(self) - - self.pdu_codec = PduCodec(hs) - - @log_function - @defer.inlineCallbacks - def handle_new_event(self, event): - """ Takes in an event from the client to server side, that has already - been authed and handled by the state module, and sends it to any - remote home servers that may be interested. - - Args: - event - - Returns: - Deferred: Resolved when it has successfully been queued for - processing. - """ - yield self.fill_out_prev_events(event) - - pdu = self.pdu_codec.pdu_from_event(event) - - if not hasattr(pdu, "destinations") or not pdu.destinations: - pdu.destinations = [] - - yield self.replication_layer.send_pdu(pdu) - - @log_function - @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): - pdus = yield self.replication_layer.backfill(dest, room_id, limit) - - if not pdus: - defer.returnValue([]) - - events = [ - self.pdu_codec.event_from_pdu(pdu) - for pdu in pdus - ] - - defer.returnValue(events) - - @log_function - def get_state_for_room(self, destination, room_id): - return self.replication_layer.get_state_for_context( - destination, room_id - ) - - @log_function - @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled): - """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it throught the StateHandler. - """ - event = self.pdu_codec.event_from_pdu(pdu) - - try: - with (yield self.lock_manager.lock(pdu.context)): - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - if not is_new_state: - return - else: - is_new_state = False - - yield self.event_handler.on_receive(event, is_new_state, backfilled) - - except AuthError: - # TODO: Implement something in federation that allows us to - # respond to PDU. - raise - - return - - @defer.inlineCallbacks - def _on_new_state(self, pdu, new_state_event): - # TODO: Do any store stuff here. Notifiy C2S about this new - # state. - - yield self.store.update_current_state( - pdu_id=pdu.pdu_id, - origin=pdu.origin, - context=pdu.context, - pdu_type=pdu.pdu_type, - state_key=pdu.state_key - ) - - yield self.event_handler.on_receive(new_state_event) - - @defer.inlineCallbacks - def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): - return - - results = yield self.store.get_latest_pdus_in_context( - event.room_id - ) - - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in results - ] - - event.prev_events = [e for e in es if e != event.event_id] - - if results: - event.depth = max([int(v) for _, _, v in results]) + 1 - else: - event.depth = 0 diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index e0e4de4e8c..2286fca821 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -25,7 +25,6 @@ from .units import Pdu from synapse.util.logutils import log_function -import copy import json import logging @@ -41,28 +40,6 @@ class PduActions(object): self.store = datastore @log_function - def persist_received(self, pdu): - """ Persists the given `Pdu` that was received from a remote home - server. - - Returns: - Deferred - """ - return self._persist(pdu) - - @defer.inlineCallbacks - @log_function - def persist_outgoing(self, pdu): - """ Persists the given `Pdu` that this home server created. - - Returns: - Deferred - """ - ret = yield self._persist(pdu) - - defer.returnValue(ret) - - @log_function def mark_as_processed(self, pdu): """ Persist the fact that we have fully processed the given `Pdu` @@ -143,28 +120,6 @@ class PduActions(object): depth=pdu.depth ) - @defer.inlineCallbacks - @log_function - def _persist(self, pdu): - kwargs = copy.copy(pdu.__dict__) - unrec_keys = copy.copy(pdu.unrecognized_keys) - del kwargs["content"] - kwargs["content_json"] = json.dumps(pdu.content) - kwargs["unrecognized_keys"] = json.dumps(unrec_keys) - - logger.debug("Persisting: %s", repr(kwargs)) - - if pdu.is_state: - ret = yield self.store.persist_state(**kwargs) - else: - ret = yield self.store.persist_pdu(**kwargs) - - yield self.store.update_min_depth_for_context( - pdu.context, pdu.depth - ) - - defer.returnValue(ret) - class TransactionActions(object): """ Defines persistence actions that relate to handling Transactions. diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index cf634a64b2..731cb74dd2 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -137,7 +137,7 @@ class ReplicationLayer(object): #yield self.pdu_actions.populate_previous_pdus(pdu) # Save *before* trying to send - yield self.pdu_actions.persist_outgoing(pdu) + yield self.store.persist_event(pdu=pdu) logger.debug("[%s] Persisted PDU", pdu.pdu_id) logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id) @@ -450,7 +450,7 @@ class ReplicationLayer(object): logger.exception("Failed to get PDU") # Persist the Pdu, but don't mark it as processed yet. - yield self.pdu_actions.persist_received(pdu) + yield self.store.persist_event(pdu=pdu) if not backfilled: ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 3f07b5aa4a..00da47bb5d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from twisted.internet import defer class BaseHandler(object): @@ -26,3 +26,22 @@ class BaseHandler(object): self.state_handler = hs.get_state_handler() self.distributor = hs.get_distributor() self.hs = hs + + +class BaseRoomHandler(BaseHandler): + + @defer.inlineCallbacks + def _on_new_room_event(self, event, snapshot, extra_destinations=[]): + store_id = yield self.store.persist_event(event) + + destinations = set(extra_destinations) + # Send a PDU to all hosts who have joined the room. + destinations.update((yield self.store.get_joined_hosts_for_room( + event.room_id + ))) + event.destinations = list(destinations) + + self.notifier.on_new_room_event(event, store_id) + + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bfc1ab86f2..a2e935add0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,6 +20,9 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function +from synapse.federation.pdu_codec import PduCodec + +from synapse.api.errors import AuthError from twisted.internet import defer @@ -30,8 +33,14 @@ logger = logging.getLogger(__name__) class FederationHandler(BaseHandler): + """Handles events that originated from federation. + Responsible for: + a) handling received Pdus before handing them on as Events to the rest + of the home server (including auth and state conflict resoultion) + b) converting events that were produced by local clients that may need + to be sent to remote home servers. + """ - """Handles events that originated from federation.""" def __init__(self, hs): super(FederationHandler, self).__init__(hs) @@ -42,6 +51,112 @@ class FederationHandler(BaseHandler): self.waiting_for_join_list = {} + self.store = hs.get_datastore() + self.replication_layer = hs.get_replication_layer() + self.state_handler = hs.get_state_handler() + # self.auth_handler = gs.get_auth_handler() + self.server_name = hs.hostname + + self.lock_manager = hs.get_room_lock_manager() + + self.replication_layer.set_handler(self) + + self.pdu_codec = PduCodec(hs) + + @log_function + @defer.inlineCallbacks + def handle_new_event(self, event, snapshot): + """ Takes in an event from the client to server side, that has already + been authed and handled by the state module, and sends it to any + remote home servers that may be interested. + + Args: + event + snapshot (.storage.Snapshot): THe snapshot the event happened after + + Returns: + Deferred: Resolved when it has successfully been queued for + processing. + """ + yield self.fill_out_prev_events(event, snapshot) + + pdu = self.pdu_codec.pdu_from_event(event) + + if not hasattr(pdu, "destinations") or not pdu.destinations: + pdu.destinations = [] + + yield self.replication_layer.send_pdu(pdu) + + + @log_function + def get_state_for_room(self, destination, room_id): + return self.replication_layer.get_state_for_context( + destination, room_id + ) + + @log_function + @defer.inlineCallbacks + def on_receive_pdu(self, pdu, backfilled): + """ Called by the ReplicationLayer when we have a new pdu. We need to + do auth checks and put it throught the StateHandler. + """ + event = self.pdu_codec.event_from_pdu(pdu) + + try: + with (yield self.lock_manager.lock(pdu.context)): + if event.is_state and not backfilled: + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + if not is_new_state: + return + else: + is_new_state = False + + yield self.on_receive(event, is_new_state, backfilled) + + except AuthError: + # TODO: Implement something in federation that allows us to + # respond to PDU. + raise + + return + + @defer.inlineCallbacks + def _on_new_state(self, pdu, new_state_event): + # TODO: Do any store stuff here. Notifiy C2S about this new + # state. + + yield self.store.update_current_state( + pdu_id=pdu.pdu_id, + origin=pdu.origin, + context=pdu.context, + pdu_type=pdu.pdu_type, + state_key=pdu.state_key + ) + + yield self.on_receive(new_state_event) + + @defer.inlineCallbacks + def fill_out_prev_events(self, event, snapshot): + if hasattr(event, "prev_events"): + return + + results = snapshot.prev_pdus + + es = [ + "%s@%s" % (p_id, origin) for p_id, origin, _ in results + ] + + event.prev_events = [e for e in es if e != event.event_id] + + if results: + event.depth = max([int(v) for _, _, v in results]) + 1 + else: + event.depth = 0 + + + @log_function @defer.inlineCallbacks def on_receive(self, event, is_new_state, backfilled): @@ -86,8 +201,7 @@ class FederationHandler(BaseHandler): if not room: # Huh, let's try and get the current state try: - federation = self.hs.get_federation() - yield federation.get_state_for_room( + yield self.get_state_for_room( event.origin, event.room_id ) @@ -119,11 +233,10 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit): - events = yield self.hs.get_federation().backfill(dest, room_id, limit) + events = yield self._backfill(dest, room_id, limit) for event in events: try: @@ -133,10 +246,23 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def _backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) + @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content): - federation = self.hs.get_federation() hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -146,7 +272,7 @@ class FederationHandler(BaseHandler): # First get current state to see if we are already joined. try: - yield federation.get_state_for_room(target_host, room_id) + yield self.get_state_for_room(target_host, room_id) hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -166,7 +292,7 @@ class FederationHandler(BaseHandler): new_event.destinations = [target_host] - yield federation.handle_new_event(new_event) + yield self.handle_new_event(new_event) # TODO (erikj): Time out here. d = defer.Deferred() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac95..9b02ce0dfd 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,14 +25,14 @@ from synapse.api.events.room import ( from synapse.api.streams.event import EventStream, EventsStreamData from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils -from ._base import BaseHandler +from ._base import BaseRoomHandler import logging logger = logging.getLogger(__name__) -class MessageHandler(BaseHandler): +class MessageHandler(BaseRoomHandler): def __init__(self, hs): super(MessageHandler, self).__init__(hs) @@ -84,20 +84,12 @@ class MessageHandler(BaseHandler): if stamp_event: event.content["hsob_ts"] = int(self.clock.time_msec()) - with (yield self.room_lock.lock(event.room_id)): - if not suppress_auth: - yield self.auth.check(event, raises=True) + snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) - # store message in db - store_id = yield self.store.persist_event(event) + if not suppress_auth: + yield self.auth.check(event, snapshot, raises=True) - event.destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - - self.notifier.on_new_room_event(event, store_id) - - yield self.hs.get_federation().handle_new_event(event) + yield self._on_new_room_event(event, snapshot) @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, @@ -134,23 +126,16 @@ class MessageHandler(BaseHandler): SynapseError if something went wrong. """ - with (yield self.room_lock.lock(event.room_id)): - yield self.auth.check(event, raises=True) + snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) - if stamp_event: - event.content["hsob_ts"] = int(self.clock.time_msec()) + yield self.auth.check(event, snapshot, raises=True) - yield self.state_handler.handle_new_event(event) - - # store in db - store_id = yield self.store.persist_event(event) + if stamp_event: + event.content["hsob_ts"] = int(self.clock.time_msec()) - event.destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - self.notifier.on_new_room_event(event, store_id) + yield self.state_handler.handle_new_event(event) - yield self.hs.get_federation().handle_new_event(event) + yield self._on_new_room_event(event, snapshot) @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, @@ -219,18 +204,12 @@ class MessageHandler(BaseHandler): if stamp_event: event.content["hsob_ts"] = int(self.clock.time_msec()) - with (yield self.room_lock.lock(event.room_id)): - yield self.auth.check(event, raises=True) + snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) - # store message in db - store_id = yield self.store.persist_event(event) + yield self.auth.check(event, snapshot, raises=True) - event.destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - yield self.hs.get_federation().handle_new_event(event) - - self.notifier.on_new_room_event(event, store_id) + # store message in db + yield self._on_new_room_event(event, snapshot) @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, @@ -312,7 +291,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) -class RoomCreationHandler(BaseHandler): +class RoomCreationHandler(BaseRoomHandler): @defer.inlineCallbacks def create_room(self, user_id, room_id, config): @@ -393,7 +372,8 @@ class RoomCreationHandler(BaseHandler): yield self.state_handler.handle_new_event(config_event) # store_id = persist... - yield self.hs.get_federation().handle_new_event(config_event) + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.handle_new_event(config_event) # self.notifier.on_new_room_event(event, store_id) content = {"membership": Membership.JOIN} @@ -418,7 +398,7 @@ class RoomCreationHandler(BaseHandler): defer.returnValue(result) -class RoomMemberHandler(BaseHandler): +class RoomMemberHandler(BaseRoomHandler): # TODO(paul): This handler currently contains a messy conflation of # low-level API that works on UserID objects and so on, and REST-level # API that takes ID strings and returns pagination chunks. These concerns @@ -529,6 +509,11 @@ class RoomMemberHandler(BaseHandler): """ target_user_id = event.state_key + snapshot = yield self.store.snapshot_room( + event.room_id, event.user_id, + RoomMemberEvent.TYPE, target_user_id + ) + ## TODO(markjh): get prev state from snapshot. prev_state = yield self.store.get_room_member( target_user_id, event.room_id ) @@ -549,24 +534,22 @@ class RoomMemberHandler(BaseHandler): # if this HS is not currently in the room, i.e. we have to do the # invite/join dance. if event.membership == Membership.JOIN: - yield self._do_join(event, do_auth=do_auth) + yield self._do_join(event, snapshot, do_auth=do_auth) else: # This is not a JOIN, so we can handle it normally. if do_auth: - yield self.auth.check(event, raises=True) + yield self.auth.check(event, snapshot, raises=True) - prev_state = yield self.store.get_room_member( - target_user_id, event.room_id - ) if prev_state and prev_state.membership == event.membership: # double same action, treat this event as a NOOP. defer.returnValue({}) return - yield self.state_handler.handle_new_event(event) + yield self.state_handler.handle_new_event(event, snapshot) yield self._do_local_membership_update( event, membership=event.content["membership"], + snapshot=snapshot, ) defer.returnValue({"room_id": room_id}) @@ -596,12 +579,16 @@ class RoomMemberHandler(BaseHandler): content=content, ) - yield self._do_join(new_event, room_host=host, do_auth=True) + snapshot = yield self.store.snapshot_room( + room_id, joinee, RoomMemberEvent.TYPE, joinee + ) + + yield self._do_join(new_event, snapshot, room_host=host, do_auth=True) defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True): + def _do_join(self, event, snapshot, room_host=None, do_auth=True): joinee = self.hs.parse_userid(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -623,6 +610,7 @@ class RoomMemberHandler(BaseHandler): elif room_host: should_do_dance = True else: + # TODO(markjh): get prev_state from snapshot prev_state = yield self.store.get_room_member( joinee.to_string(), room_id ) @@ -650,12 +638,13 @@ class RoomMemberHandler(BaseHandler): logger.debug("Doing normal join") if do_auth: - yield self.auth.check(event, raises=True) + yield self.auth.check(event, snapshot, raises=True) - yield self.state_handler.handle_new_event(event) + yield self.state_handler.handle_new_event(event, snapshot) yield self._do_local_membership_update( event, membership=event.content["membership"], + snapshot=snapshot, ) user = self.hs.parse_userid(event.user_id) @@ -699,39 +688,26 @@ class RoomMemberHandler(BaseHandler): defer.returnValue([r.room_id for r in rooms]) - @defer.inlineCallbacks - def _do_local_membership_update(self, event, membership): - # store membership - store_id = yield self.store.persist_event(event) - - # Send a PDU to all hosts who have joined the room. - destinations = yield self.store.get_joined_hosts_for_room( - event.room_id - ) + def _do_local_membership_update(self, event, membership, snapshot): + destinations = [] # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key if membership == Membership.INVITE: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = UserID.from_string(target_user_id, self.hs).domain destinations.append(host) # If we are joining a remote HS, include that. if membership == Membership.JOIN: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = UserID.from_string(target_user_id, self.hs).domain destinations.append(host) - event.destinations = list(set(destinations)) - - yield self.hs.get_federation().handle_new_event(event) - self.notifier.on_new_room_event(event, store_id) - + return self._on_new_room_event( + event, snapshot, extra_destinations=destinations + ) -class RoomListHandler(BaseHandler): +class RoomListHandler(BaseRoomHandler): @defer.inlineCallbacks def get_public_room_list(self): diff --git a/synapse/server.py b/synapse/server.py index 439b9d628e..9de88b6642 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -20,7 +20,6 @@ # Imports required for the default HomeServer() implementation from synapse.federation import initialize_http_replication -from synapse.federation.handler import FederationEventHandler from synapse.api.events.factory import EventFactory from synapse.api.notifier import Notifier from synapse.api.auth import Auth @@ -58,7 +57,6 @@ class BaseHomeServer(object): 'http_client', 'db_pool', 'persistence_service', - 'federation', 'replication_layer', 'datastore', 'event_factory', @@ -160,9 +158,6 @@ class HomeServer(BaseHomeServer): def build_replication_layer(self): return initialize_http_replication(self) - def build_federation(self): - return FederationEventHandler(self) - def build_datastore(self): return DataStore(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 38ab03c45c..08505f05ff 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,20 +57,28 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks @log_function - def persist_event(self, event, backfilled=False): - if event.type == RoomMemberEvent.TYPE: - yield self._store_room_member(event) - elif event.type == FeedbackEvent.TYPE: - yield self._store_feedback(event) -# elif event.type == RoomConfigEvent.TYPE: -# yield self._store_room_config(event) - elif event.type == RoomNameEvent.TYPE: - yield self._store_room_name(event) - elif event.type == RoomTopicEvent.TYPE: - yield self._store_room_topic(event) + def persist_event(self, event=None, backfilled=False, pdu=None): + # FIXME (erikj): This should be removed when we start amalgamating + # event and pdu storage + if event is not None: + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.fill_out_prev_events(event) - ret = yield self._store_event(event, backfilled) - defer.returnValue(ret) + stream_ordering = None + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + stream_ordering = self.min_token + + latest = yield self._db_pool.runInteraction( + self._persist_pdu_event_txn, + pdu=pdu, + event=event, + backfilled=backfilled, + stream_ordering=stream_ordering, + ) + defer.returnValue(latest) @defer.inlineCallbacks def get_event(self, event_id): @@ -89,12 +97,42 @@ class DataStore(RoomMemberStore, RoomStore, event = self._parse_event_from_row(events_dict) defer.returnValue(event) - @defer.inlineCallbacks + def _persist_pdu_event_txn(self, txn, pdu=None, event=None, + backfilled=False, stream_ordering=None): + if pdu is not None: + self._persist_pdu_txn(txn, pdu) + if event is not None: + self._persist_event_txn(txn, event, backfilled, stream_ordering) + + def _persist_pdu_txn(self, txn, pdu): + cols = dict(pdu.__dict__) + unrec_keys = dict(pdu.unrecognized_keys) + del cols["content"] + del cols["prev_pdus"] + cols["content_json"] = json.dumps(pdu.content) + cols["unrecognized_keys"] = json.dumps(unrec_keys) + + logger.debug("Persisting: %s", repr(cols)) + + if pdu.is_state: + self._persist_state_txn(txn, pdu.prev_pdus, cols) + else: + self._persist_pdu_txn(txn, pdu.prev_pdus, cols) + + self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth) + @log_function - def _store_event(self, event, backfilled): - # FIXME (erikj): This should be removed when we start amalgamating - # event and pdu storage - yield self.hs.get_federation().fill_out_prev_events(event) + def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None): + if event.type == RoomMemberEvent.TYPE: + self._store_room_member_txn(txn, event) + elif event.type == FeedbackEvent.TYPE: + self._store_feedback_txn(txn,event) +# elif event.type == RoomConfigEvent.TYPE: +# self._store_room_config_txn(txn, event) + elif event.type == RoomNameEvent.TYPE: + self._store_room_name_txn(txn, event) + elif event.type == RoomTopicEvent.TYPE: + self._store_room_topic_txn(txn, event) vals = { "topological_ordering": event.depth, @@ -105,17 +143,14 @@ class DataStore(RoomMemberStore, RoomStore, "processed": True, } + if stream_ordering is not None: + vals["stream_ordering"] = stream_ordering + if hasattr(event, "outlier"): vals["outlier"] = event.outlier else: vals["outlier"] = False - if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - self.min_token -= 1 - vals["stream_ordering"] = self.min_token - unrec = { k: v for k, v in event.get_full_dict().items() @@ -124,7 +159,7 @@ class DataStore(RoomMemberStore, RoomStore, vals["unrecognized_keys"] = json.dumps(unrec) try: - yield self._simple_insert("events", vals) + self._simple_insert_txn(txn, "events", vals) except: logger.exception( "Failed to persist, probably duplicate: %s", @@ -143,9 +178,10 @@ class DataStore(RoomMemberStore, RoomStore, if hasattr(event, "prev_state"): vals["prev_state"] = event.prev_state - yield self._simple_insert("state_events", vals) + self._simple_insert_txn(txn, "state_events", vals) - yield self._simple_insert( + self._simple_insert_txn( + txn, "current_state_events", { "event_id": event.event_id, @@ -155,8 +191,7 @@ class DataStore(RoomMemberStore, RoomStore, } ) - latest = yield self.get_room_events_max_id() - defer.returnValue(latest) + return self._get_room_events_max_id_(txn) @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): @@ -192,6 +227,70 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(self.min_token) + def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): + """Snapshot the room for an update by a user + Args: + room_id (synapse.types.RoomId): The room to snapshot. + user_id (synapse.types.UserId): The user to snapshot the room for. + state_type (str): Optional state type to snapshot. + state_key (str): Optional state key to snapshot. + Returns: + synapse.storage.Snapshot: A snapshot of the state of the room. + """ + def _snapshot(txn): + membership_state = self._get_room_member(txn, user_id) + prev_pdus = self._get_latest_pdus_in_context( + txn, room_id + ) + if state_type is not None and state_key is not None: + prev_state_pdu = self._get_current_state_pdu( + txn, room_id, state_type, state_key + ) + else: + prev_state_pdu = None + + return Snapshot( + store=self, + room_id=room_id, + user_id=user_id, + prev_pdus=prev_pdus, + membership_state=membership_state, + state_type=state_type, + state_key=state_key, + prev_state_pdu=prev_state_pdu, + ) + + return self._db_pool.runInteraction(_snapshot) + + +class Snapshot(object): + """Snapshot of the state of a room + Args: + store (DataStore): The datastore. + room_id (RoomId): The room of the snapshot. + user_id (UserId): The user this snapshot is for. + prev_pdus (list): The list of PDU ids this snapshot is after. + membership_state (RoomMemberEvent): The current state of the user in + the room. + state_type (str, optional): State type captured by the snapshot + state_key (str, optional): State key captured by the snapshot + prev_state_pdu (PduEntry, optional): pdu id of + the previous value of the state type and key in the room. + """ + + def __init__(self, store, room_id, user_id, prev_pdus, + membership_state, state_type=None, state_key=None, + prev_state_pdu=None): + self.store = store + self.room_id = room_id + self.user_id = user_id + self.prev_pdus = prev_pdus + self.membership_state + self.state_type = state_type + self.state_key = state_key + self.prev_state_pdu = prev_state_pdu + + def schema_path(schema): """ Get a filesystem path for the named database schema diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 75aab2d3b9..33d56f47ce 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,16 +86,18 @@ class SQLBaseStore(object): table : string giving the table name values : dict of new column names and values for them """ + return self._db_pool.runInteraction( + self._simple_insert_txn, table, values, + ) + + def _simple_insert_txn(self, txn, table, values): sql = "INSERT INTO %s (%s) VALUES(%s)" % ( table, ", ".join(k for k in values), ", ".join("?" for k in values) ) - - def func(txn): - txn.execute(sql, values.values()) - return txn.lastrowid - return self._db_pool.runInteraction(func) + txn.execute(sql, values.values()) + return txn.lastrowid def _simple_select_one(self, table, keyvalues, retcols, allow_none=False): diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index 513b72d279..bac3dea955 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -20,8 +20,8 @@ from ._base import SQLBaseStore class FeedbackStore(SQLBaseStore): - def _store_feedback(self, event): - return self._simple_insert("feedback", { + def _store_feedback_txn(self, txn, event): + self._simple_insert_txn(txn, "feedback", { "event_id": event.event_id, "feedback_type": event.content["type"], "room_id": event.room_id, diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 7655f43ede..657295b1d7 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -114,7 +114,7 @@ class PduStore(SQLBaseStore): return self._get_pdu_tuples(txn, res) - def persist_pdu(self, prev_pdus, **cols): + def _persist_pdu_txn(self, txn, prev_pdus, cols): """Inserts a (non-state) PDU into the database. Args: @@ -122,11 +122,6 @@ class PduStore(SQLBaseStore): prev_pdus (list) **cols: The columns to insert into the PdusTable. """ - return self._db_pool.runInteraction( - self._persist_pdu, prev_pdus, cols - ) - - def _persist_pdu(self, txn, prev_pdus, cols): entry = PdusTable.EntryType( **{k: cols.get(k, None) for k in PdusTable.fields} ) @@ -262,7 +257,7 @@ class PduStore(SQLBaseStore): return row[0] if row else None - def update_min_depth_for_context(self, context, depth): + def _update_min_depth_for_context_txn(self, txn, context, depth): """Update the minimum `depth` of the given context, which is the line on which we stop backfilling backwards. @@ -270,11 +265,6 @@ class PduStore(SQLBaseStore): context (str) depth (int) """ - return self._db_pool.runInteraction( - self._update_min_depth_for_context, context, depth - ) - - def _update_min_depth_for_context(self, txn, context, depth): min_depth = self._get_min_depth_interaction(txn, context) do_insert = depth < min_depth if min_depth else True @@ -485,7 +475,7 @@ class StatePduStore(SQLBaseStore): """A collection of queries for handling state PDUs. """ - def persist_state(self, prev_pdus, **cols): + def _persist_state_txn(self, txn, prev_pdus, cols): """Inserts a state PDU into the database Args: @@ -493,12 +483,6 @@ class StatePduStore(SQLBaseStore): prev_pdus (list) **cols: The columns to insert into the PdusTable and StatePdusTable """ - - return self._db_pool.runInteraction( - self._persist_state, prev_pdus, cols - ) - - def _persist_state(self, txn, prev_pdus, cols): pdu_entry = PdusTable.EntryType( **{k: cols.get(k, None) for k in PdusTable.fields} ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a5751005ef..d1f1a232f8 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -129,8 +129,9 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) - def _store_room_topic(self, event): - return self._simple_insert( + def _store_room_topic_txn(self, txn, event): + self._simple_insert_txn( + txn, "topics", { "event_id": event.event_id, @@ -139,8 +140,9 @@ class RoomStore(SQLBaseStore): } ) - def _store_room_name(self, event): - return self._simple_insert( + def _store_room_name_txn(self, txn, event): + self._simple_insert_txn( + txn, "room_names", { "event_id": event.event_id, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4ad37af0f3..5038aeea03 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -26,14 +26,14 @@ logger = logging.getLogger(__name__) class RoomMemberStore(SQLBaseStore): - @defer.inlineCallbacks - def _store_room_member(self, event): + def _store_room_member_txn(self, txn, event): """Store a room member in the database. """ target_user_id = event.state_key domain = self.hs.parse_userid(target_user_id).domain - yield self._simple_insert( + self._simple_insert_txn( + txn, "room_memberships", { "event_id": event.event_id, @@ -50,13 +50,13 @@ class RoomMemberStore(SQLBaseStore): "INSERT OR IGNORE INTO room_hosts (room_id, host) " "VALUES (?, ?)" ) - yield self._execute(None, sql, event.room_id, domain) + txn.execute(sql, event.room_id, domain) else: sql = ( "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" ) - yield self._execute(None, sql, event.room_id, domain) + txn.execute(sql, event.room_id, domain) @defer.inlineCallbacks def get_room_member(self, user_id, room_id): diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index cae80563b4..ac887e2957 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -281,17 +281,20 @@ class StreamStore(SQLBaseStore): ) ) - @defer.inlineCallbacks def get_room_events_max_id(self): - res = yield self._execute_and_decode( + return self._db_pool.runInteraction(self._get_room_events_max_id_txn) + + def _get_room_events_max_id_txn(self, txn): + txn.execute( "SELECT MAX(stream_ordering) as m FROM events" ) + res = self.cursor_to_dict(txn) + logger.debug("get_room_events_max_id: %s", res) if not res or not res[0] or not res[0]["m"]: - defer.returnValue("s1") - return + return "s1" key = res[0]["m"] + 1 - defer.returnValue("s%d" % (key,)) + return "s%d" % (key,) |