From 75b4329aaad400abcee4b23e9c88ff845e0d73c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Dec 2014 16:07:21 +0000 Subject: WIP for new way of managing events. --- synapse/events/utils.py | 82 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 synapse/events/utils.py (limited to 'synapse/events/utils.py') diff --git a/synapse/events/utils.py b/synapse/events/utils.py new file mode 100644 index 0000000000..412f690f08 --- /dev/null +++ b/synapse/events/utils.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.api.constants import EventTypes + + +def prune_event(event): + """ Returns a pruned version of the given event, which removes all keys we + don't know about or think could potentially be dodgy. + + This is used when we "redact" an event. We want to remove all fields that + the user has specified, but we do want to keep necessary information like + type, state_key etc. + """ + event_type = event.type + + allowed_keys = [ + "event_id", + "sender", + "room_id", + "hashes", + "signatures", + "content", + "type", + "state_key", + "depth", + "prev_events", + "prev_state", + "auth_events", + "origin", + "origin_server_ts", + ] + + new_content = {} + + def add_fields(*fields): + for field in fields: + if field in event.content: + new_content[field] = event.content[field] + + if event_type == EventTypes.Member: + add_fields("membership") + elif event_type == EventTypes.Create: + add_fields("creator") + elif event_type == EventTypes.JoinRules: + add_fields("join_rule") + elif event_type == EventTypes.PowerLevels: + add_fields( + "users", + "users_default", + "events", + "events_default", + "events_default", + "state_default", + "ban", + "kick", + "redact", + ) + elif event_type == EventTypes.Aliases: + add_fields("aliases") + + allowed_fields = { + k: v + for k, v in event.get_dict().items() + if k in allowed_keys + } + + allowed_fields["content"] = new_content + + return type(event)(allowed_fields) -- cgit 1.4.1 From 6630e1b5795667fd947cc5b0d5d2b00da97325e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Dec 2014 16:20:48 +0000 Subject: Start making more things use EventContext rather than event.* --- synapse/api/auth.py | 33 +++++---- synapse/events/__init__.py | 35 ++++++++- synapse/events/utils.py | 16 ++++ synapse/handlers/_base.py | 164 ++++++++++++++++++----------------------- synapse/handlers/federation.py | 19 +++-- synapse/server.py | 2 +- synapse/state.py | 33 +++++++++ synapse/storage/__init__.py | 23 +++--- synapse/storage/_base.py | 8 +- synapse/storage/state.py | 13 +++- 10 files changed, 212 insertions(+), 134 deletions(-) (limited to 'synapse/events/utils.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 5261c3e3bf..3f2e58a5ef 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -351,27 +351,27 @@ class Auth(object): return self.store.is_server_admin(user) @defer.inlineCallbacks - def get_auth_events(self, event, current_state): - if event.type == RoomCreateEvent.TYPE: - event.auth_events = [] + def add_auth_events(self, builder, context): + if builder.type == RoomCreateEvent.TYPE: + builder.auth_events = [] return auth_events = [] key = (RoomPowerLevelsEvent.TYPE, "", ) - power_level_event = current_state.get(key) + power_level_event = context.current_state.get(key) if power_level_event: auth_events.append(power_level_event.event_id) key = (RoomJoinRulesEvent.TYPE, "", ) - join_rule_event = current_state.get(key) + join_rule_event = context.current_state.get(key) - key = (RoomMemberEvent.TYPE, event.user_id, ) - member_event = current_state.get(key) + key = (RoomMemberEvent.TYPE, builder.user_id, ) + member_event = context.current_state.get(key) key = (RoomCreateEvent.TYPE, "", ) - create_event = current_state.get(key) + create_event = context.current_state.get(key) if create_event: auth_events.append(create_event.event_id) @@ -381,8 +381,8 @@ class Auth(object): else: is_public = False - if event.type == RoomMemberEvent.TYPE: - e_type = event.content["membership"] + if builder.type == RoomMemberEvent.TYPE: + e_type = builder.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: if join_rule_event: auth_events.append(join_rule_event.event_id) @@ -393,11 +393,18 @@ class Auth(object): if member_event.content["membership"] == Membership.JOIN: auth_events.append(member_event.event_id) - auth_events = yield self.store.add_event_hashes( - auth_events + auth_ids = [(a.event_id, h) for a, h in auth_events] + auth_events_entries = yield self.store.add_event_hashes( + auth_ids ) - defer.returnValue(auth_events) + builder.auth_events = auth_events_entries + + context.auth_events = { + k: v + for k, v in context.current_state.items() + if v.event_id in auth_ids + } @log_function def _can_send_event(self, event, auth_events): diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 58edf2bc8f..e81b995d39 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -17,8 +17,8 @@ from frozendict import frozendict def _freeze(o): - if isinstance(o, dict): - return frozendict({k: _freeze(v) for k,v in o.items()}) + if isinstance(o, dict) or isinstance(o, frozendict): + return frozendict({k: _freeze(v) for k, v in o.items()}) if isinstance(o, basestring): return o @@ -31,6 +31,21 @@ def _freeze(o): return o +def _unfreeze(o): + if isinstance(o, frozendict) or isinstance(o, dict): + return dict({k: _unfreeze(v) for k, v in o.items()}) + + if isinstance(o, basestring): + return o + + try: + return [_unfreeze(i) for i in o] + except TypeError: + pass + + return o + + class _EventInternalMetadata(object): def __init__(self, internal_metadata_dict): self.__dict__ = internal_metadata_dict @@ -69,6 +84,7 @@ class EventBase(object): ) auth_events = _event_dict_property("auth_events") + depth = _event_dict_property("depth") content = _event_dict_property("content") event_id = _event_dict_property("event_id") hashes = _event_dict_property("hashes") @@ -81,6 +97,10 @@ class EventBase(object): type = _event_dict_property("type") user_id = _event_dict_property("sender") + @property + def membership(self): + return self.content["membership"] + def is_state(self): return hasattr(self, "state_key") @@ -134,3 +154,14 @@ class FrozenEvent(EventBase): e.internal_metadata = event.internal_metadata return e + + def get_dict(self): + # We need to unfreeze what we return + + d = _unfreeze(self._event_dict) + d.update({ + "signatures": self.signatures, + "unsigned": self.unsigned, + }) + + return d diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 412f690f08..1b05ee0a95 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api.constants import EventTypes +from . import EventBase def prune_event(event): @@ -80,3 +81,18 @@ def prune_event(event): allowed_fields["content"] = new_content return type(event)(allowed_fields) + + +def serialize_event(hs, e): + # FIXME(erikj): To handle the case of presence events and the like + if not isinstance(e, EventBase): + return e + + # Should this strip out None's? + d = {k: v for k, v in e.get_dict().items()} + if "age_ts" in d["unsigned"]: + now = int(hs.get_clock().time_msec()) + d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] + del d["unsigned"]["age_ts"] + + return d \ No newline at end of file diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4052d0e1e7..810ce138ff 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -62,6 +62,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _create_new_client_event(self, builder): + context = EventContext() + latest_ret = yield self.store.get_latest_events_in_room( builder.room_id, ) @@ -69,34 +71,26 @@ class BaseHandler(object): depth = max([d for _, _, d in latest_ret]) prev_events = [(e, h) for e, h, _ in latest_ret] - state_handler = self.state_handler - if builder.is_state(): - ret = yield state_handler.resolve_state_groups( - [e for e, _ in prev_events], - event_type=builder.event_type, - state_key=builder.state_key, - ) + builder.prev_events = prev_events + builder.depth = depth - group, curr_state, prev_state = ret + state_handler = self.state_handler + ret = yield state_handler.annotate_context_with_state( + builder, + context, + ) + group, prev_state = ret + if builder.is_state(): prev_state = yield self.store.add_event_hashes( prev_state ) builder.prev_state = prev_state - else: - group, curr_state, _ = yield state_handler.resolve_state_groups( - [e for e, _ in prev_events], - ) builder.internal_metadata.state_group = group - builder.prev_events = prev_events - builder.depth = depth - - auth_events = yield self.auth.get_auth_events(builder, curr_state) - - builder.update_event_key("auth_events", auth_events) + yield self.auth.add_auth_events(builder, context) add_hashes_and_signatures( builder, self.server_name, self.signing_key @@ -104,18 +98,6 @@ class BaseHandler(object): event = builder.build() - auth_ids = zip(*auth_events)[0] - curr_auth_events = { - k: v - for k, v in curr_state.items() - if v.event_id in auth_ids - } - - context = EventContext( - current_state=curr_state, - auth_events=curr_auth_events, - ) - defer.returnValue( (event, context,) ) @@ -128,7 +110,7 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.auth_events) - yield self.store.persist_event(event) + yield self.store.persist_event(event, context=context) destinations = set(extra_destinations) for k, s in context.current_state.items(): @@ -152,63 +134,63 @@ class BaseHandler(object): destinations=destinations, ) - @defer.inlineCallbacks - def _on_new_room_event(self, event, snapshot, extra_destinations=[], - extra_users=[], suppress_auth=False, - do_invite_host=None): - yield run_on_reactor() - - snapshot.fill_out_prev_events(event) - - yield self.state_handler.annotate_event_with_state(event) - - yield self.auth.add_auth_events(event) - - logger.debug("Signing event...") - - add_hashes_and_signatures( - event, self.server_name, self.signing_key - ) - - logger.debug("Signed event.") - - if not suppress_auth: - logger.debug("Authing...") - self.auth.check(event, auth_events=event.old_state_events) - logger.debug("Authed") - else: - logger.debug("Suppressed auth.") - - if do_invite_host: - federation_handler = self.hs.get_handlers().federation_handler - invite_event = yield federation_handler.send_invite( - do_invite_host, - event - ) - - # FIXME: We need to check if the remote changed anything else - event.signatures = invite_event.signatures - - yield self.store.persist_event(event) - - destinations = set(extra_destinations) - # Send a PDU to all hosts who have joined the room. - - for k, s in event.state_events.items(): - try: - if k[0] == RoomMemberEvent.TYPE: - if s.content["membership"] == Membership.JOIN: - destinations.add( - self.hs.parse_userid(s.state_key).domain - ) - except: - logger.warn( - "Failed to get destination from event %s", s.event_id - ) - - event.destinations = list(destinations) - - yield self.notifier.on_new_room_event(event, extra_users=extra_users) - - federation_handler = self.hs.get_handlers().federation_handler - yield federation_handler.handle_new_event(event, snapshot) + # @defer.inlineCallbacks + # def _on_new_room_event(self, event, snapshot, extra_destinations=[], + # extra_users=[], suppress_auth=False, + # do_invite_host=None): + # yield run_on_reactor() + # + # snapshot.fill_out_prev_events(event) + # + # yield self.state_handler.annotate_event_with_state(event) + # + # yield self.auth.add_auth_events(event) + # + # logger.debug("Signing event...") + # + # add_hashes_and_signatures( + # event, self.server_name, self.signing_key + # ) + # + # logger.debug("Signed event.") + # + # if not suppress_auth: + # logger.debug("Authing...") + # self.auth.check(event, auth_events=event.old_state_events) + # logger.debug("Authed") + # else: + # logger.debug("Suppressed auth.") + # + # if do_invite_host: + # federation_handler = self.hs.get_handlers().federation_handler + # invite_event = yield federation_handler.send_invite( + # do_invite_host, + # event + # ) + # + # # FIXME: We need to check if the remote changed anything else + # event.signatures = invite_event.signatures + # + # yield self.store.persist_event(event) + # + # destinations = set(extra_destinations) + # # Send a PDU to all hosts who have joined the room. + # + # for k, s in event.state_events.items(): + # try: + # if k[0] == RoomMemberEvent.TYPE: + # if s.content["membership"] == Membership.JOIN: + # destinations.add( + # self.hs.parse_userid(s.state_key).domain + # ) + # except: + # logger.warn( + # "Failed to get destination from event %s", s.event_id + # ) + # + # event.destinations = list(destinations) + # + # yield self.notifier.on_new_room_event(event, extra_users=extra_users) + # + # federation_handler = self.hs.get_handlers().federation_handler + # yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b4a28ea3cb..5264e3eafc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,7 +17,8 @@ from ._base import BaseHandler -from synapse.api.events.utils import prune_event +from synapse.events.snapshot import EventContext +from synapse.events.utils import prune_event from synapse.api.errors import ( AuthError, FederationError, SynapseError, StoreError, ) @@ -416,7 +417,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_make_join_request(self, context, user_id): + def on_make_join_request(self, room_id, user_id): """ We've received a /make_join/ request, so we create a partial join event for the room and return that. We don *not* persist or process it until the other server has signed it and sent it back. @@ -424,7 +425,7 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new({ "type": RoomMemberEvent.TYPE, "content": {"membership": Membership.JOIN}, - "room_id": context, + "room_id": room_id, "sender": user_id, "state_key": user_id, }) @@ -433,9 +434,7 @@ class FederationHandler(BaseHandler): builder=builder, ) - yield self.state_handler.annotate_event_with_state(event) - yield self.auth.add_auth_events(event) - self.auth.check(event, auth_events=event.old_state_events) + self.auth.check(event, auth_events=context.auth_events) pdu = event @@ -505,7 +504,9 @@ class FederationHandler(BaseHandler): """ event = pdu - event.outlier = True + context = EventContext() + + event.internal_metadata.outlier = True event.signatures.update( compute_event_signature( @@ -515,10 +516,11 @@ class FederationHandler(BaseHandler): ) ) - yield self.state_handler.annotate_event_with_state(event) + yield self.state_handler.annotate_context_with_state(event, context) yield self.store.persist_event( event, + context=context, backfilled=False, ) @@ -640,6 +642,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_event(self, event, state=None, backfilled=False, current_state=None, fetch_missing=True): + context = EventContext() is_new_state = yield self.state_handler.annotate_event_with_state( event, old_state=state diff --git a/synapse/server.py b/synapse/server.py index 8bc27bbc3c..0d0f3af3f4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -20,7 +20,7 @@ # Imports required for the default HomeServer() implementation from synapse.federation import initialize_http_replication -from synapse.api.events import serialize_event +from synapse.events.utils import serialize_event from synapse.api.events.factory import EventFactory from synapse.api.events.validator import EventValidator from synapse.notifier import Notifier diff --git a/synapse/state.py b/synapse/state.py index 8a556a27f6..cbb4243fad 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -135,6 +135,39 @@ class StateHandler(object): defer.returnValue(res[1].values()) + @defer.inlineCallbacks + def annotate_context_with_state(self, event, context): + if event.is_state(): + ret = yield self.resolve_state_groups( + [e for e, _ in event.prev_events], + event_type=event.event_type, + state_key=event.state_key, + ) + else: + ret = yield self.resolve_state_groups( + [e for e, _ in event.prev_events], + ) + + group, curr_state, prev_state = ret + + context.current_state = curr_state + + prev_state = yield self.store.add_event_hashes( + prev_state + ) + + if hasattr(event, "auth_events") and event.auth_events: + auth_ids = zip(*event.auth_events)[0] + context.auth_events = { + k: v + for k, v in context.current_state.items() + if v.event_id in auth_ids + } + + defer.returnValue( + (group, prev_state) + ) + @defer.inlineCallbacks @log_function def resolve_state_groups(self, event_ids, event_type=None, state_key=""): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 205d125642..f172c2690a 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -21,6 +21,7 @@ from synapse.api.events.room import ( ) from synapse.util.logutils import log_function +from synapse.util.frozenutils import FrozenEncoder from .directory import DirectoryStore from .feedback import FeedbackStore @@ -93,8 +94,8 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks @log_function - def persist_event(self, event, backfilled=False, is_new_state=True, - current_state=None): + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): stream_ordering = None if backfilled: if not self.min_token_deferred.called: @@ -107,6 +108,7 @@ class DataStore(RoomMemberStore, RoomStore, "persist_event", self._persist_event_txn, event=event, + context=context, backfilled=backfilled, stream_ordering=stream_ordering, is_new_state=is_new_state, @@ -138,8 +140,9 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event[0]) @log_function - def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, - is_new_state=True, current_state=None): + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): if event.type == RoomMemberEvent.TYPE: self._store_room_member_txn(txn, event) elif event.type == FeedbackEvent.TYPE: @@ -152,12 +155,12 @@ class DataStore(RoomMemberStore, RoomStore, self._store_redaction(txn, event) outlier = False - if hasattr(event, "outlier"): - outlier = event.outlier + if hasattr(event.internal_metadata, "outlier"): + outlier = event.internal_metadata.outlier event_dict = { k: v - for k, v in event.get_full_dict().items() + for k, v in event.get_dict().items() if k not in [ "redacted", "redacted_because", @@ -179,7 +182,7 @@ class DataStore(RoomMemberStore, RoomStore, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.content), + "content": json.dumps(event.content, cls=FrozenEncoder), "processed": True, "outlier": outlier, "depth": event.depth, @@ -190,7 +193,7 @@ class DataStore(RoomMemberStore, RoomStore, unrec = { k: v - for k, v in event.get_full_dict().items() + for k, v in event.get_dict().items() if k not in vals.keys() and k not in [ "redacted", "redacted_because", @@ -225,7 +228,7 @@ class DataStore(RoomMemberStore, RoomStore, room_id=event.room_id, ) - self._store_state_groups_txn(txn, event) + self._store_state_groups_txn(txn, event, context) if current_state: txn.execute( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index bb61c20150..c56c3a0b0f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,7 +15,8 @@ import logging from synapse.api.errors import StoreError -from synapse.api.events.utils import prune_event +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from syutil.base64util import encode_base64 @@ -497,10 +498,7 @@ class SQLBaseStore(object): d = json.loads(js) - ev = self.event_factory.create_event( - etype=d["type"], - **d - ) + ev = FrozenEvent(d) if hasattr(ev, "redacted") and ev.redacted: # Get the redaction event. diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e0f44b3e59..b8e721ad72 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -86,11 +86,16 @@ class StateStore(SQLBaseStore): self._store_state_groups_txn, event ) - def _store_state_groups_txn(self, txn, event): - if event.state_events is None: + def _store_state_groups_txn(self, txn, event, context): + if context.current_state_events is None: return - state_group = event.state_group + state_events = context.current_state_events + + if event.is_state(): + state_events[(event.type, event.state_key)] = event + + state_group = context.state_group if not state_group: state_group = self._simple_insert_txn( txn, @@ -102,7 +107,7 @@ class StateStore(SQLBaseStore): or_ignore=True, ) - for state in event.state_events.values(): + for state in context.state_events.values(): self._simple_insert_txn( txn, table="state_groups_state", -- cgit 1.4.1 From d044121168672c657e595525af9b588c8769e9bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Dec 2014 09:08:26 +0000 Subject: Various typos and bug fixes. --- synapse/api/auth.py | 16 ++++--- synapse/events/snapshot.py | 3 +- synapse/events/utils.py | 2 + synapse/federation/replication.py | 12 +++-- synapse/handlers/_base.py | 8 ++-- synapse/handlers/federation.py | 97 +++++++++++++++------------------------ synapse/state.py | 16 +++++-- synapse/storage/state.py | 6 +-- 8 files changed, 80 insertions(+), 80 deletions(-) (limited to 'synapse/events/utils.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3f2e58a5ef..821e3ba5e2 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -24,6 +24,7 @@ from synapse.api.events.room import ( RoomJoinRulesEvent, RoomCreateEvent, RoomAliasesEvent, ) from synapse.util.logutils import log_function +from synapse.util.async import run_on_reactor from syutil.base64util import encode_base64 import logging @@ -352,17 +353,19 @@ class Auth(object): @defer.inlineCallbacks def add_auth_events(self, builder, context): + yield run_on_reactor() + if builder.type == RoomCreateEvent.TYPE: builder.auth_events = [] return - auth_events = [] + auth_ids = [] key = (RoomPowerLevelsEvent.TYPE, "", ) power_level_event = context.current_state.get(key) if power_level_event: - auth_events.append(power_level_event.event_id) + auth_ids.append(power_level_event.event_id) key = (RoomJoinRulesEvent.TYPE, "", ) join_rule_event = context.current_state.get(key) @@ -373,7 +376,7 @@ class Auth(object): key = (RoomCreateEvent.TYPE, "", ) create_event = context.current_state.get(key) if create_event: - auth_events.append(create_event.event_id) + auth_ids.append(create_event.event_id) if join_rule_event: join_rule = join_rule_event.content.get("join_rule") @@ -385,15 +388,14 @@ class Auth(object): e_type = builder.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: if join_rule_event: - auth_events.append(join_rule_event.event_id) + auth_ids.append(join_rule_event.event_id) if member_event and not is_public: - auth_events.append(member_event.event_id) + auth_ids.append(member_event.event_id) elif member_event: if member_event.content["membership"] == Membership.JOIN: - auth_events.append(member_event.event_id) + auth_ids.append(member_event.event_id) - auth_ids = [(a.event_id, h) for a, h in auth_events] auth_events_entries = yield self.store.add_event_hashes( auth_ids ) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index ca15ec09ae..e0cbacc19c 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -58,6 +58,7 @@ class EventCache(object): class EventContext(object): - def __init__(self, current_state, auth_events): + def __init__(self, current_state=None, auth_events=None): self.current_state = current_state self.auth_events = auth_events + self.state_group = None diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 1b05ee0a95..485f075406 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -95,4 +95,6 @@ def serialize_event(hs, e): d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] del d["unsigned"]["age_ts"] + d["user_id"] = d.pop("sender", None) + return d \ No newline at end of file diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index b11df9e5c6..3af24ee46d 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -558,7 +558,13 @@ class ReplicationLayer(object): origin, pdu.event_id, do_auth=False ) - if existing and (not existing.outlier or pdu.outlier): + already_seen = ( + existing and ( + not existing.internal_metadata.outlier + or pdu.internal_metadata.outlier + ) + ) + if already_seen: logger.debug("Already seen pdu %s", pdu.event_id) defer.returnValue({}) return @@ -596,7 +602,7 @@ class ReplicationLayer(object): # ) # Get missing pdus if necessary. - if not pdu.outlier: + if not pdu.internal_metadata.outlier: # We only backfill backwards to the min depth. min_depth = yield self.handler.get_min_depth_for_context( pdu.room_id @@ -663,7 +669,7 @@ class ReplicationLayer(object): pdu_json ) - builder.internal_metadata = outlier + builder.internal_metadata.outlier = outlier return builder.build() diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 810ce138ff..0bff644192 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -62,6 +62,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _create_new_client_event(self, builder): + yield run_on_reactor() + context = EventContext() latest_ret = yield self.store.get_latest_events_in_room( @@ -79,7 +81,7 @@ class BaseHandler(object): builder, context, ) - group, prev_state = ret + prev_state = ret if builder.is_state(): prev_state = yield self.store.add_event_hashes( @@ -88,8 +90,6 @@ class BaseHandler(object): builder.prev_state = prev_state - builder.internal_metadata.state_group = group - yield self.auth.add_auth_events(builder, context) add_hashes_and_signatures( @@ -105,6 +105,8 @@ class BaseHandler(object): @defer.inlineCallbacks def handle_new_client_event(self, event, context, extra_destinations=[], extra_users=[], suppress_auth=False): + yield run_on_reactor() + # We now need to go and hit out to wherever we need to hit out to. if not suppress_auth: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5264e3eafc..38ee32d26e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -149,7 +149,7 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room and not event.outlier: + if not is_in_room and not event.internal_metadata.outlier: logger.debug("Got event for room we're not in.") replication_layer = self.replication_layer @@ -160,7 +160,7 @@ class FederationHandler(BaseHandler): ) for e in auth_chain: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e, fetch_missing=False) except: @@ -180,7 +180,7 @@ class FederationHandler(BaseHandler): if state: for e in state: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e) except: @@ -254,11 +254,18 @@ class FederationHandler(BaseHandler): event = pdu # FIXME (erikj): Not sure this actually works :/ - yield self.state_handler.annotate_event_with_state(event) + context = EventContext() + yield self.state_handler.annotate_context_with_state(event, context) - events.append(event) + events.append( + (event, context) + ) - yield self.store.persist_event(event, backfilled=True) + yield self.store.persist_event( + event, + context=context, + backfilled=True + ) defer.returnValue(events) @@ -326,7 +333,7 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) - event.outlier = False + event.internal_metadata.outlier = False self.room_queues[room_id] = [] @@ -369,7 +376,7 @@ class FederationHandler(BaseHandler): pass for e in auth_chain: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e, fetch_missing=False) except: @@ -380,7 +387,7 @@ class FederationHandler(BaseHandler): for e in state: # FIXME: Auth these. - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event( e, @@ -448,7 +455,7 @@ class FederationHandler(BaseHandler): """ event = pdu - event.outlier = False + event.internal_metadata.outlier = False yield self._handle_new_event(event) @@ -643,70 +650,42 @@ class FederationHandler(BaseHandler): def _handle_new_event(self, event, state=None, backfilled=False, current_state=None, fetch_missing=True): context = EventContext() - is_new_state = yield self.state_handler.annotate_event_with_state( + yield self.state_handler.annotate_context_with_state( event, old_state=state ) - if event.old_state_events: - known_ids = set( - [s.event_id for s in event.old_state_events.values()] - ) - for e_id, _ in event.auth_events: - if e_id not in known_ids: - e = yield self.store.get_event( - e_id, - allow_none=True, - ) - - if not e: - # TODO: Do some conflict res to make sure that we're - # not the ones who are wrong. - logger.info( - "Rejecting %s as %s not in %s", - event.event_id, e_id, known_ids, - ) - raise AuthError(403, "Auth events are stale") + is_new_state = not event.internal_metadata.outlier - auth_events = event.old_state_events - else: - # We need to get the auth events from somewhere. - - # TODO: Don't just hit the DBs? - - auth_events = {} - for e_id, _ in event.auth_events: + known_ids = set( + [s.event_id for s in context.auth_events.values()] + ) + for e_id, _ in event.auth_events: + if e_id not in known_ids: e = yield self.store.get_event( e_id, + context, allow_none=True, ) if not e: - e = yield self.replication_layer.get_pdu( - event.origin, e_id, outlier=True + # TODO: Do some conflict res to make sure that we're + # not the ones who are wrong. + logger.info( + "Rejecting %s as %s not in %s", + event.event_id, e_id, known_ids, ) + raise AuthError(403, "Auth events are stale") - if e and fetch_missing: - try: - yield self.on_receive_pdu(event.origin, e, False) - except: - logger.exception( - "Failed to parse auth event %s", - e_id, - ) + context.auth_events[(e.type, e.state_key)] = e - if not e: - logger.warn("Can't find auth event %s.", e_id) + if event.type == RoomMemberEvent.TYPE and not event.auth_events: + if len(event.prev_events) == 1: + c = yield self.store.get_event(event.prev_events[0][0]) + if c.type == RoomCreateEvent.TYPE: + context.auth_events[(c.type, c.state_key)] = c - auth_events[(e.type, e.state_key)] = e - - if event.type == RoomMemberEvent.TYPE and not event.auth_events: - if len(event.prev_events) == 1: - c = yield self.store.get_event(event.prev_events[0][0]) - if c.type == RoomCreateEvent.TYPE: - auth_events[(c.type, c.state_key)] = c - - self.auth.check(event, auth_events=auth_events) + self.auth.check(event, auth_events=context.auth_events) yield self.store.persist_event( event, diff --git a/synapse/state.py b/synapse/state.py index cbb4243fad..464cbae564 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -136,7 +136,16 @@ class StateHandler(object): defer.returnValue(res[1].values()) @defer.inlineCallbacks - def annotate_context_with_state(self, event, context): + def annotate_context_with_state(self, event, context, old_state=None): + yield run_on_reactor() + + if old_state: + context.current_state = { + (s.type, s.state_key): s for s in old_state + } + context.state_group = None + defer.returnValue([]) + if event.is_state(): ret = yield self.resolve_state_groups( [e for e, _ in event.prev_events], @@ -151,6 +160,7 @@ class StateHandler(object): group, curr_state, prev_state = ret context.current_state = curr_state + context.state_group = group prev_state = yield self.store.add_event_hashes( prev_state @@ -164,9 +174,7 @@ class StateHandler(object): if v.event_id in auth_ids } - defer.returnValue( - (group, prev_state) - ) + defer.returnValue(prev_state) @defer.inlineCallbacks @log_function diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b8e721ad72..afe3e5edea 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -87,10 +87,10 @@ class StateStore(SQLBaseStore): ) def _store_state_groups_txn(self, txn, event, context): - if context.current_state_events is None: + if context.current_state is None: return - state_events = context.current_state_events + state_events = context.current_state if event.is_state(): state_events[(event.type, event.state_key)] = event @@ -107,7 +107,7 @@ class StateStore(SQLBaseStore): or_ignore=True, ) - for state in context.state_events.values(): + for state in state_events.values(): self._simple_insert_txn( txn, table="state_groups_state", -- cgit 1.4.1 From 8c4845068298cf6e2b8a662ffa2340b655abf9b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Dec 2014 10:58:31 +0000 Subject: Add PEP8 newlines --- synapse/events/builder.py | 2 +- synapse/events/utils.py | 2 +- synapse/events/validator.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/events/utils.py') diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 0b8caf9318..127b8fa904 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -66,4 +66,4 @@ class EventBuilderFactory(object): age = key_values["unsigned"].pop("age", 0) key_values["unsigned"].setdefault("age_ts", time_now - age) - return EventBuilder(key_values=key_values,) \ No newline at end of file + return EventBuilder(key_values=key_values,) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 485f075406..f5e135e3d0 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -97,4 +97,4 @@ def serialize_event(hs, e): d["user_id"] = d.pop("sender", None) - return d \ No newline at end of file + return d diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 7dc9506ec4..f319072d37 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -55,4 +55,4 @@ class EventValidator(object): def validate_new(self, event): self.validate(event) - UserID.from_string(event.sender) \ No newline at end of file + UserID.from_string(event.sender) -- cgit 1.4.1 From 8cdebce470869613658543cb79ed5dd97a5f0548 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Dec 2014 13:25:19 +0000 Subject: Fix redactions. Fix 'age' key --- synapse/events/__init__.py | 1 + synapse/events/builder.py | 13 ++++++--- synapse/events/utils.py | 21 +++++++++++++++ synapse/state.py | 12 +++++++++ synapse/storage/_base.py | 60 ++++++++++++++++++++++++----------------- tests/storage/test_redaction.py | 6 ++--- 6 files changed, 83 insertions(+), 30 deletions(-) (limited to 'synapse/events/utils.py') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 7103b937af..98d7f0e324 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -97,6 +97,7 @@ class EventBase(object): origin_server_ts = _event_dict_property("origin_server_ts") prev_events = _event_dict_property("prev_events") prev_state = _event_dict_property("prev_state") + redacts = _event_dict_property("redacts") room_id = _event_dict_property("room_id") sender = _event_dict_property("sender") state_key = _event_dict_property("state_key") diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 642264e9f3..9579b1fe8b 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -19,11 +19,18 @@ from synapse.types import EventID from synapse.util.stringutils import random_string +import copy + class EventBuilder(EventBase): def __init__(self, key_values={}): + signatures = copy.deepcopy(key_values.pop("signatures", {})) + unsigned = copy.deepcopy(key_values.pop("unsigned", {})) + super(EventBuilder, self).__init__( key_values, + signatures=signatures, + unsigned=unsigned ) def update_event_key(self, key, value): @@ -61,9 +68,9 @@ class EventBuilderFactory(object): key_values.setdefault("origin", self.hostname) key_values.setdefault("origin_server_ts", time_now) - if "unsigned" in key_values: - age = key_values["unsigned"].pop("age", 0) - key_values["unsigned"].setdefault("age_ts", time_now - age) + key_values.setdefault("unsigned", {}) + age = key_values["unsigned"].pop("age", 0) + key_values["unsigned"].setdefault("age_ts", time_now - age) key_values["signatures"] = {} diff --git a/synapse/events/utils.py b/synapse/events/utils.py index f5e135e3d0..6d9c9352e2 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -80,6 +80,11 @@ def prune_event(event): allowed_fields["content"] = new_content + allowed_fields["unsigned"] = {} + + if "age_ts" in event.unsigned: + allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"] + return type(event)(allowed_fields) @@ -97,4 +102,20 @@ def serialize_event(hs, e): d["user_id"] = d.pop("sender", None) + if "redacted_because" in e.unsigned: + d["redacted_because"] = serialize_event( + hs, e.unsigned["redacted_because"] + ) + + del d["unsigned"]["redacted_because"] + + if "redacted_by" in e.unsigned: + d["redacted_by"] = e.unsigned["redacted_by"] + del d["unsigned"]["redacted_by"] + + del d["auth_events"] + del d["prev_events"] + del d["hashes"] + del d["signatures"] + return d diff --git a/synapse/state.py b/synapse/state.py index 7fdf596006..5bfa73fb46 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -155,6 +155,12 @@ class StateHandler(object): else: context.auth_events = {} + if event.is_state(): + key = (event.type, event.state_key) + if key in context.current_state: + replaces = context.current_state[key] + event.unsigned["replaces_state"] = replaces.event_id + defer.returnValue([]) if event.is_state(): @@ -177,6 +183,12 @@ class StateHandler(object): prev_state ) + if event.is_state(): + key = (event.type, event.state_key) + if key in context.current_state: + replaces = context.current_state[key] + event.unsigned["replaces_state"] = replaces.event_id + if hasattr(event, "auth_events") and event.auth_events: auth_ids = zip(*event.auth_events)[0] context.auth_events = { diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 12239fa074..ffc26d4a61 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -444,38 +444,50 @@ class SQLBaseStore(object): def _get_events_txn(self, txn, event_ids): events = [] for e_id in event_ids: - js = self._simple_select_one_onecol_txn( - txn, - table="event_json", - keyvalues={"event_id": e_id}, - retcol="json", - allow_none=True, - ) + ev = self._get_event_txn(txn, e_id) - if not js: - # FIXME (erikj): What should we actually do here? - continue + if ev: + events.append(ev) - d = json.loads(js) + return events - ev = FrozenEvent(d) + def _get_event_txn(self, txn, event_id, check_redacted=True): + sql = ( + "SELECT json, r.event_id FROM event_json as e " + "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "WHERE e.event_id = ? " + "LIMIT 1 " + ) - if hasattr(ev, "redacted") and ev.redacted: - # Get the redaction event. - select_event_sql = "SELECT * FROM events WHERE event_id = ?" - txn.execute(select_event_sql, (ev.redacted,)) + txn.execute(sql, (event_id,)) - del_evs = self._parse_events_txn( - txn, self.cursor_to_dict(txn) - ) + res = txn.fetchone() - if del_evs: - ev = prune_event(ev) - ev.redacted_because = del_evs[0] + if not res: + return None - events.append(ev) + js, redacted = res - return events + d = json.loads(js) + + ev = FrozenEvent(d) + + if check_redacted and redacted: + ev = prune_event(ev) + + ev.unsigned["redacted_by"] = redacted + # Get the redaction event. + + because = self._get_event_txn( + txn, + redacted, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + return ev def _parse_events(self, rows): return self.runInteraction( diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index e8671ae3a2..d81f7add1c 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -149,7 +149,7 @@ class RedactionTestCase(unittest.TestCase): event, ) - self.assertFalse(hasattr(event, "redacted_because")) + self.assertFalse("redacted_because" in event.unsigned) # Redact event reason = "Because I said so" @@ -179,7 +179,7 @@ class RedactionTestCase(unittest.TestCase): event, ) - self.assertTrue(hasattr(event, "redacted_because")) + self.assertTrue("redacted_because" in event.unsigned) self.assertObjectHasAttributes( { @@ -187,7 +187,7 @@ class RedactionTestCase(unittest.TestCase): "user_id": self.u_alice.to_string(), "content": {"reason": reason}, }, - event.redacted_because, + event.unsigned["redacted_because"], ) @defer.inlineCallbacks -- cgit 1.4.1 From 9191292b0f657f6210e88f16ffd9a182bfab8170 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Dec 2014 15:16:55 +0000 Subject: Fix prev_content --- synapse/events/utils.py | 8 ++++++++ synapse/storage/_base.py | 6 ++++++ tests/storage/test_stream.py | 5 ++--- 3 files changed, 16 insertions(+), 3 deletions(-) (limited to 'synapse/events/utils.py') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 6d9c9352e2..4ab770dd5f 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -113,6 +113,14 @@ def serialize_event(hs, e): d["redacted_by"] = e.unsigned["redacted_by"] del d["unsigned"]["redacted_by"] + if "replaces_state" in e.unsigned: + d["replaces_state"] = e.unsigned["replaces_state"] + del d["unsigned"]["replaces_state"] + + if "prev_content" in e.unsigned: + d["prev_content"] = e.unsigned["prev_content"] + del d["unsigned"]["prev_content"] + del d["auth_events"] del d["prev_events"] del d["hashes"] diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ffc26d4a61..e9cf73a8e2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -487,6 +487,12 @@ class SQLBaseStore(object): if because: ev.unsigned["redacted_because"] = because + if "replaces_state" in ev.unsigned: + ev.unsigned["prev_content"] = self._get_event_txn( + txn, + ev.unsigned["replaces_state"], + ).get_dict()["content"] + return ev def _parse_events(self, rows): diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index cba65bb9f0..4865a5c142 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -58,7 +58,7 @@ class StreamStoreTestCase(unittest.TestCase): self.depth = 1 @defer.inlineCallbacks - def inject_room_member(self, room, user, membership, replaces_state=None): + def inject_room_member(self, room, user, membership): self.depth += 1 builder = self.event_builder_factory.new({ @@ -215,7 +215,6 @@ class StreamStoreTestCase(unittest.TestCase): event2 = yield self.inject_room_member( self.room1, self.u_alice, Membership.JOIN, - replaces_state=event1.event_id, ) end = yield self.store.get_room_events_max_id() @@ -233,6 +232,6 @@ class StreamStoreTestCase(unittest.TestCase): event = results[0] self.assertTrue( - hasattr(event, "prev_content"), + "prev_content" in event.unsigned, msg="No prev_content key" ) -- cgit 1.4.1 From c8dd3314d673fce90a53520475cdb19d5358dd34 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Dec 2014 13:55:22 +0000 Subject: Fix bug where we ignored event_edge_hashes table --- scripts/check_event_hash.py | 3 +++ synapse/events/utils.py | 1 + synapse/storage/__init__.py | 1 - synapse/storage/_base.py | 1 - synapse/storage/event_federation.py | 11 ++++++----- 5 files changed, 10 insertions(+), 7 deletions(-) (limited to 'synapse/events/utils.py') diff --git a/scripts/check_event_hash.py b/scripts/check_event_hash.py index 7c32f8102a..679afbd268 100644 --- a/scripts/check_event_hash.py +++ b/scripts/check_event_hash.py @@ -18,6 +18,9 @@ class dictobj(dict): def get_full_dict(self): return dict(self) + def get_pdu_json(self): + return dict(self) + def main(): parser = argparse.ArgumentParser() diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 4ab770dd5f..94f3f15f52 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -42,6 +42,7 @@ def prune_event(event): "auth_events", "origin", "origin_server_ts", + "membership", ] new_content = {} diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 079ddac605..d0ea304b3d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -91,7 +91,6 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, hs): super(DataStore, self).__init__(hs) - self.event_factory = hs.get_event_factory() self.hs = hs self.min_token_deferred = self._get_min_token() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1967290ad2..31d5163c19 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -83,7 +83,6 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() - self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() @defer.inlineCallbacks diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 6c559f8f63..ced066f407 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -177,14 +177,15 @@ class EventFederationStore(SQLBaseStore): retcols=["prev_event_id", "is_state"], ) + hashes = self._get_prev_event_hashes_txn(txn, event_id) + results = [] for d in res: - hashes = self._get_event_reference_hashes_txn( - txn, - d["prev_event_id"] - ) + edge_hash = self._get_event_reference_hashes_txn(txn, d["prev_event_id"]) + edge_hash.update(hashes.get(d["prev_event_id"], {})) prev_hashes = { - k: encode_base64(v) for k, v in hashes.items() + k: encode_base64(v) + for k, v in edge_hash.items() if k == "sha256" } results.append((d["prev_event_id"], prev_hashes, d["is_state"])) -- cgit 1.4.1