From c1e66800a937351614b2d19cdebbb141f786b03a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Dec 2014 11:40:22 +0000 Subject: Begin fleshing out a new Event object --- synapse/events/__init__.py | 120 +++++++++++++++++++++++++++++++++++++++++++++ synapse/events/builder.py | 74 ++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100644 synapse/events/__init__.py create mode 100644 synapse/events/builder.py (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py new file mode 100644 index 0000000000..eefc9d3b30 --- /dev/null +++ b/synapse/events/__init__.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from frozendict import frozendict + + +class _EventInternalMetadata(object): + def __init__(self, internal_metadata_dict): + self.__dict__ = internal_metadata_dict + + def get_dict(self): + return dict(self.__dict__) + + +class Event(object): + def __init__(self, event_dict, internal_metadata_dict={}): + self._signatures = event_dict.get("signatures", {}) + self._unsigned = event_dict.get("unsigned", {}) + + self._original = { + k: v + for k, v in event_dict.items() + if k not in ["signatures", "unsigned"] + } + + self._event_dict = frozendict(self._original) + + self.internal_metadata = _EventInternalMetadata( + internal_metadata_dict + ) + + @property + def auth_events(self): + return self._event_dict["auth_events"] + + @property + def content(self): + return self._event_dict["content"] + + @property + def event_id(self): + return self._event_dict["event_id"] + + @property + def hashes(self): + return self._event_dict["hashes"] + + @property + def origin(self): + return self._event_dict["origin"] + + @property + def prev_events(self): + return self._event_dict["prev_events"] + + @property + def prev_state(self): + return self._event_dict["prev_state"] + + @property + def room_id(self): + return self._event_dict["room_id"] + + @property + def signatures(self): + return self._signatures + + @property + def state_key(self): + return self._event_dict["state_key"] + + @property + def type(self): + return self._event_dict["type"] + + @property + def unsigned(self): + return self._unsigned + + @property + def user_id(self): + return self._event_dict["sender"] + + @property + def sender(self): + return self._event_dict["sender"] + + def get_dict(self): + d = dict(self._original) + d.update({ + "signatures": self._signatures, + "unsigned": self._unsigned, + }) + + return d + + def get_internal_metadata_dict(self): + return self.internal_metadata.get_dict() + + def get_pdu_json(self, time_now=None): + pdu_json = self.get_dict() + + if time_now is not None and "age_ts" in pdu_json["unsigned"]: + age = time_now - pdu_json["unsigned"]["age_ts"] + pdu_json.setdefault("unsigned", {})["age"] = int(age) + del pdu_json["unsigned"]["age_ts"] + + return pdu_json \ No newline at end of file diff --git a/synapse/events/builder.py b/synapse/events/builder.py new file mode 100644 index 0000000000..d741795bc5 --- /dev/null +++ b/synapse/events/builder.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import Event + +from synapse.types import EventID + +from synapse.util.stringutils import random_string + + +class EventBuilder(object): + def __init__(self, key_values={}): + self._event_dict = dict(key_values) + self._metadata = {} + + def update_event_key(self, key, value): + self._event_dict[key] = value + + def update_event_keys(self, other_dict): + self._event_dict.update(other_dict) + + def update_internal_key(self, key, value): + self._metadata[key] = value + + def build(self): + return Event( + self._event_dict, + self._metadata, + ) + + +class EventBuilderFactory(object): + def __init__(self, clock, hostname): + self.clock = clock + self.hostname = hostname + + self.event_id_count = 0 + + def create_event_id(self): + i = str(self.event_id_count) + self.event_id_count += 1 + + local_part = str(int(self.clock.time())) + i + random_string(5) + + e_id = EventID.create(local_part, self.hostname) + + return e_id.to_string() + + def new(self, key_values={}): + if "event_id" not in key_values: + key_values["event_id"] = self.create_event_id() + + time_now = self.clock.time_msec() + + key_values.setdefault("origin", self.hostname) + key_values.setdefault("origin_server_ts", time_now) + + if "unsigned" in key_values: + age = key_values["unsigned"].pop("age", 0) + key_values["unsigned"].setdefault("age_ts", time_now - age) + + return EventBuilder(key_values=key_values,) \ No newline at end of file -- cgit 1.4.1 From 75b4329aaad400abcee4b23e9c88ff845e0d73c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Dec 2014 16:07:21 +0000 Subject: WIP for new way of managing events. --- synapse/api/auth.py | 13 ++-- synapse/api/constants.py | 9 +++ synapse/crypto/event_signing.py | 39 ++++++----- synapse/events/__init__.py | 141 ++++++++++++++++++++++------------------ synapse/events/builder.py | 20 ++---- synapse/events/snapshot.py | 63 ++++++++++++++++++ synapse/events/utils.py | 82 +++++++++++++++++++++++ synapse/events/validator.py | 58 +++++++++++++++++ synapse/handlers/_base.py | 51 +++++++++++++++ 9 files changed, 374 insertions(+), 102 deletions(-) create mode 100644 synapse/events/snapshot.py create mode 100644 synapse/events/utils.py create mode 100644 synapse/events/validator.py (limited to 'synapse/events') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 2b0475543d..50d4be113f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -351,7 +351,7 @@ class Auth(object): return self.store.is_server_admin(user) @defer.inlineCallbacks - def add_auth_events(self, event): + def get_auth_events(self, event, current_state): if event.type == RoomCreateEvent.TYPE: event.auth_events = [] return @@ -359,19 +359,19 @@ class Auth(object): auth_events = [] key = (RoomPowerLevelsEvent.TYPE, "", ) - power_level_event = event.old_state_events.get(key) + power_level_event = current_state.get(key) if power_level_event: auth_events.append(power_level_event.event_id) key = (RoomJoinRulesEvent.TYPE, "", ) - join_rule_event = event.old_state_events.get(key) + join_rule_event = current_state.get(key) key = (RoomMemberEvent.TYPE, event.user_id, ) - member_event = event.old_state_events.get(key) + member_event = current_state.get(key) key = (RoomCreateEvent.TYPE, "", ) - create_event = event.old_state_events.get(key) + create_event = current_state.get(key) if create_event: auth_events.append(create_event.event_id) @@ -403,7 +403,8 @@ class Auth(object): } for h in hashes ] - event.auth_events = zip(auth_events, hashes) + + defer.returnValue(zip(auth_events, hashes)) @log_function def _can_send_event(self, event, auth_events): diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 3cafff0e32..acf50e42ab 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -59,3 +59,12 @@ class LoginType(object): EMAIL_URL = u"m.login.email.url" EMAIL_IDENTITY = u"m.login.email.identity" RECAPTCHA = u"m.login.recaptcha" + + +class EventTypes(object): + Member = "m.room.member" + Create = "m.room.create" + JoinRules = "m.room.join_rules" + PowerLevels = "m.room.power_levels" + Aliases = "m.room.aliases" + Redaction = "m.room.redaction" \ No newline at end of file diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index a9d8953239..209f9d73fe 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -29,17 +29,17 @@ logger = logging.getLogger(__name__) def check_event_content_hash(event, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_content_hash(event, hash_algorithm) - logger.debug("Expecting hash: %s", encode_base64(computed_hash.digest())) - if computed_hash.name not in event.hashes: + name, expected_hash = compute_content_hash(event, hash_algorithm) + logger.debug("Expecting hash: %s", encode_base64(expected_hash)) + if name not in event.hashes: raise SynapseError( 400, "Algorithm %s not in hashes %s" % ( - computed_hash.name, list(event.hashes), + name, list(event.hashes), ), Codes.UNAUTHORIZED, ) - message_hash_base64 = event.hashes[computed_hash.name] + message_hash_base64 = event.hashes[name.name] try: message_hash_bytes = decode_base64(message_hash_base64) except: @@ -48,10 +48,10 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256): "Invalid base64: %s" % (message_hash_base64,), Codes.UNAUTHORIZED, ) - return message_hash_bytes == computed_hash.digest() + return message_hash_bytes == expected_hash -def _compute_content_hash(event, hash_algorithm): +def compute_content_hash(event, hash_algorithm): event_json = event.get_pdu_json() event_json.pop("age_ts", None) event_json.pop("unsigned", None) @@ -59,8 +59,11 @@ def _compute_content_hash(event, hash_algorithm): event_json.pop("hashes", None) event_json.pop("outlier", None) event_json.pop("destinations", None) + event_json_bytes = encode_canonical_json(event_json) - return hash_algorithm(event_json_bytes) + + hashed = hash_algorithm(event_json_bytes) + return (hashed.name, hashed.digest()) def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): @@ -86,20 +89,20 @@ def compute_event_signature(event, signature_name, signing_key): def add_hashes_and_signatures(event, signature_name, signing_key, hash_algorithm=hashlib.sha256): - if hasattr(event, "old_state_events"): - state_json_bytes = encode_canonical_json( - [e.event_id for e in event.old_state_events.values()] - ) - hashed = hash_algorithm(state_json_bytes) - event.state_hash = { - hashed.name: encode_base64(hashed.digest()) - } + # if hasattr(event, "old_state_events"): + # state_json_bytes = encode_canonical_json( + # [e.event_id for e in event.old_state_events.values()] + # ) + # hashed = hash_algorithm(state_json_bytes) + # event.state_hash = { + # hashed.name: encode_base64(hashed.digest()) + # } - hashed = _compute_content_hash(event, hash_algorithm=hash_algorithm) + name, digest = compute_content_hash(event, hash_algorithm=hash_algorithm) if not hasattr(event, "hashes"): event.hashes = {} - event.hashes[hashed.name] = encode_base64(hashed.digest()) + event.hashes[name] = encode_base64(digest) event.signatures = compute_event_signature( event, diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index eefc9d3b30..6748198917 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -16,6 +16,21 @@ from frozendict import frozendict +def _freeze(o): + if isinstance(o, dict): + return frozendict({k: _freeze(v) for k,v in o.items()}) + + if isinstance(o, basestring): + return o + + try: + return tuple([_freeze(i) for i in o]) + except TypeError: + pass + + return o + + class _EventInternalMetadata(object): def __init__(self, internal_metadata_dict): self.__dict__ = internal_metadata_dict @@ -24,78 +39,47 @@ class _EventInternalMetadata(object): return dict(self.__dict__) -class Event(object): - def __init__(self, event_dict, internal_metadata_dict={}): - self._signatures = event_dict.get("signatures", {}) - self._unsigned = event_dict.get("unsigned", {}) +def _event_dict_property(key): + def getter(self): + return self._event_dict[key] - self._original = { - k: v - for k, v in event_dict.items() - if k not in ["signatures", "unsigned"] - } + def setter(self, v): + self._event_dict[key] = v - self._event_dict = frozendict(self._original) + def delete(self): + del self._event_dict[key] - self.internal_metadata = _EventInternalMetadata( - internal_metadata_dict + return property( + getter, + setter, + delete, ) - @property - def auth_events(self): - return self._event_dict["auth_events"] - - @property - def content(self): - return self._event_dict["content"] - - @property - def event_id(self): - return self._event_dict["event_id"] - - @property - def hashes(self): - return self._event_dict["hashes"] - - @property - def origin(self): - return self._event_dict["origin"] - - @property - def prev_events(self): - return self._event_dict["prev_events"] - - @property - def prev_state(self): - return self._event_dict["prev_state"] - - @property - def room_id(self): - return self._event_dict["room_id"] - - @property - def signatures(self): - return self._signatures - @property - def state_key(self): - return self._event_dict["state_key"] +class EventBase(object): + def __init__(self, event_dict, signatures={}, unsigned={}, + internal_metadata_dict={}): + self.signatures = signatures + self.unsigned = unsigned - @property - def type(self): - return self._event_dict["type"] + self._event_dict = event_dict - @property - def unsigned(self): - return self._unsigned - - @property - def user_id(self): - return self._event_dict["sender"] + self.internal_metadata = _EventInternalMetadata( + internal_metadata_dict + ) - @property - def sender(self): - return self._event_dict["sender"] + auth_events = _event_dict_property("auth_events") + content = _event_dict_property("content") + event_id = _event_dict_property("event_id") + hashes = _event_dict_property("hashes") + origin = _event_dict_property("origin") + prev_events = _event_dict_property("prev_events") + prev_state = _event_dict_property("prev_state") + room_id = _event_dict_property("room_id") + sender = _event_dict_property("sender") + state_key = _event_dict_property("state_key") + type = _event_dict_property("type") + user_id = _event_dict_property("sender") def get_dict(self): d = dict(self._original) @@ -117,4 +101,33 @@ class Event(object): pdu_json.setdefault("unsigned", {})["age"] = int(age) del pdu_json["unsigned"]["age_ts"] - return pdu_json \ No newline at end of file + return pdu_json + + def __set__(self, instance, value): + raise AttributeError("Unrecognized attribute %s" % (instance,)) + + +class FrozenEvent(EventBase): + def __init__(self, event_dict, signatures={}, unsigned={}): + event_dict = dict(event_dict) + + signatures.update(event_dict.pop("signatures", {})) + unsigned.update(event_dict.pop("unsigned", {})) + + frozen_dict = _freeze(event_dict) + + super(FrozenEvent, self).__init__( + frozen_dict, + signatures=signatures, + unsigned=unsigned + ) + + @staticmethod + def from_event(event): + e = FrozenEvent( + event.event_dict() + ) + + e.internal_metadata = event.internal_metadata + + return e diff --git a/synapse/events/builder.py b/synapse/events/builder.py index d741795bc5..39b4d2a2ab 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -13,32 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -from . import Event +from . import EventBase, FrozenEvent from synapse.types import EventID from synapse.util.stringutils import random_string -class EventBuilder(object): +class EventBuilder(EventBase): def __init__(self, key_values={}): - self._event_dict = dict(key_values) - self._metadata = {} - - def update_event_key(self, key, value): - self._event_dict[key] = value + super(FrozenEvent, self).__init__( + key_values, + ) def update_event_keys(self, other_dict): self._event_dict.update(other_dict) - def update_internal_key(self, key, value): - self._metadata[key] = value - def build(self): - return Event( - self._event_dict, - self._metadata, - ) + return FrozenEvent.from_event(self) class EventBuilderFactory(object): diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py new file mode 100644 index 0000000000..ca15ec09ae --- /dev/null +++ b/synapse/events/snapshot.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + + +class EventSnapshot(object): + def __init__(self, prev_events, depth, current_state, + current_state_group): + self._prev_events = prev_events + self._depth = depth + self._current_state = current_state + self._current_state_group = current_state_group + + +class EventCache(object): + def __init__(self, store): + self._store = store + + self._cache = {} + + @defer.inlineCallbacks + def load_event(self, event_id): + event = self._cache.get(event_id, None) + + if not event: + event = yield self._store.get_event( + event_id, + allow_none=True + ) + + if event: + self._cache[event_id] = event + + defer.returnValue(event) + + def load_event_from_cache(self, event_id): + return self._cache.get(event_id, None) + + def add_to_cache(self, *events): + self._cache.update({ + event.event_id: event + for event in events + }) + + +class EventContext(object): + + def __init__(self, current_state, auth_events): + self.current_state = current_state + self.auth_events = auth_events diff --git a/synapse/events/utils.py b/synapse/events/utils.py new file mode 100644 index 0000000000..412f690f08 --- /dev/null +++ b/synapse/events/utils.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.api.constants import EventTypes + + +def prune_event(event): + """ Returns a pruned version of the given event, which removes all keys we + don't know about or think could potentially be dodgy. + + This is used when we "redact" an event. We want to remove all fields that + the user has specified, but we do want to keep necessary information like + type, state_key etc. + """ + event_type = event.type + + allowed_keys = [ + "event_id", + "sender", + "room_id", + "hashes", + "signatures", + "content", + "type", + "state_key", + "depth", + "prev_events", + "prev_state", + "auth_events", + "origin", + "origin_server_ts", + ] + + new_content = {} + + def add_fields(*fields): + for field in fields: + if field in event.content: + new_content[field] = event.content[field] + + if event_type == EventTypes.Member: + add_fields("membership") + elif event_type == EventTypes.Create: + add_fields("creator") + elif event_type == EventTypes.JoinRules: + add_fields("join_rule") + elif event_type == EventTypes.PowerLevels: + add_fields( + "users", + "users_default", + "events", + "events_default", + "events_default", + "state_default", + "ban", + "kick", + "redact", + ) + elif event_type == EventTypes.Aliases: + add_fields("aliases") + + allowed_fields = { + k: v + for k, v in event.get_dict().items() + if k in allowed_keys + } + + allowed_fields["content"] = new_content + + return type(event)(allowed_fields) diff --git a/synapse/events/validator.py b/synapse/events/validator.py new file mode 100644 index 0000000000..7dc9506ec4 --- /dev/null +++ b/synapse/events/validator.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.types import EventID, RoomID, UserID +from synapse.api.errors import SynapseError + + +class EventValidator(object): + + def validate(self, event): + EventID.from_string(event.event_id) + RoomID.from_string(event.room_id) + + hasattr(event, "auth_events") + hasattr(event, "content") + hasattr(event, "hashes") + hasattr(event, "origin") + hasattr(event, "prev_events") + hasattr(event, "prev_events") + hasattr(event, "sender") + hasattr(event, "type") + + # Check that the following keys have string values + strings = [ + "origin", + "sender", + "type", + ] + + if hasattr(event, "state_key"): + strings.append("state_key") + + for s in strings: + if not isinstance(getattr(event, s), basestring): + raise SynapseError(400, "Not '%s' a string type" % (s,)) + + # Check that the following keys have dictionary values + # TODO + + # Check that the following keys have the correct format for DAGs + # TODO + + def validate_new(self, event): + self.validate(event) + + UserID.from_string(event.sender) \ No newline at end of file diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 15adc9dc2c..a45715bf60 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -21,6 +21,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.events.room import RoomMemberEvent from synapse.api.constants import Membership +from synapse.events.snapshot import EventSnapshot, EventContext + import logging @@ -56,6 +58,55 @@ class BaseHandler(object): retry_after_ms=int(1000*(time_allowed - time_now)), ) + @defer.inlineCallbacks + def _handle_new_client_event(self, builder): + latest_ret = yield self.store.get_latest_events_in_room( + builder.room_id, + ) + + depth = max([d for _, _, d in latest_ret]) + prev_events = [(e, h) for e, h, _ in latest_ret] + + group, curr_state = yield self.state_handler.resolve_state_groups( + [e for e, _ in prev_events] + ) + + snapshot = EventSnapshot( + prev_events=prev_events, + depth=depth, + current_state=curr_state, + current_state_group=group, + ) + + builder.prev_events = prev_events + builder.depth = depth + + auth_events = yield self.auth.get_event_auth(builder, curr_state) + + builder.update_event_key("auth_events", auth_events) + + add_hashes_and_signatures( + builder, self.server_name, self.signing_key + ) + + event = builder.build() + + auth_ids = zip(*auth_events)[0] + curr_auth_events = { + k: v + for k, v in curr_state + if v.event_id in auth_ids + } + + context = EventContext( + current_state=curr_state, + auth_events=curr_auth_events, + ) + + self.auth.check(event, auth_events=context.auth_events) + + + @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], extra_users=[], suppress_auth=False, -- cgit 1.4.1 From 5d7c9ab7898f2721aa3f60ab76c53dc44322be77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Dec 2014 11:27:59 +0000 Subject: Begin converting things to use the new Event structure --- synapse/api/auth.py | 11 ++----- synapse/events/__init__.py | 3 ++ synapse/federation/replication.py | 12 +++----- synapse/handlers/_base.py | 65 +++++++++++++++++++++++++++++++-------- synapse/handlers/federation.py | 9 ++---- synapse/handlers/room.py | 1 + synapse/state.py | 18 ++++++++--- synapse/storage/signatures.py | 16 ++++++++++ 8 files changed, 96 insertions(+), 39 deletions(-) (limited to 'synapse/events') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 50d4be113f..5261c3e3bf 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -393,18 +393,11 @@ class Auth(object): if member_event.content["membership"] == Membership.JOIN: auth_events.append(member_event.event_id) - hashes = yield self.store.get_event_reference_hashes( + auth_events = yield self.store.add_event_hashes( auth_events ) - hashes = [ - { - k: encode_base64(v) for k, v in h.items() - if k == "sha256" - } - for h in hashes - ] - defer.returnValue(zip(auth_events, hashes)) + defer.returnValue(auth_events) @log_function def _can_send_event(self, event, auth_events): diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 6748198917..6a05ba2d16 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -81,6 +81,9 @@ class EventBase(object): type = _event_dict_property("type") user_id = _event_dict_property("sender") + def is_state(self): + return hasattr(self, "state_key") + def get_dict(self): d = dict(self._original) d.update({ diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 01f87fe423..bd56a4c108 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -112,7 +112,7 @@ class ReplicationLayer(object): self.query_handlers[query_type] = handler @log_function - def send_pdu(self, pdu): + def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the home server that should be transmitted to others. @@ -131,7 +131,7 @@ class ReplicationLayer(object): logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) # TODO, add errback, etc. - self._transaction_queue.enqueue_pdu(pdu, order) + self._transaction_queue.enqueue_pdu(pdu, destinations, order) logger.debug( "[%s] transaction_layer.enqueue_pdu... done", @@ -705,15 +705,13 @@ class _TransactionQueue(object): @defer.inlineCallbacks @log_function - def enqueue_pdu(self, pdu, order): + def enqueue_pdu(self, pdu, destinations, order): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. - destinations = set([ - d for d in pdu.destinations - if d != self.server_name - ]) + destinations = set(destinations) + destinations.remove(self.server_name) logger.debug("Sending to: %s", str(destinations)) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index a45715bf60..890b51be30 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -15,11 +15,11 @@ from twisted.internet import defer -from synapse.api.errors import LimitExceededError +from synapse.api.errors import LimitExceededError, SynapseError from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.events.room import RoomMemberEvent -from synapse.api.constants import Membership +from synapse.api.constants import Membership, EventTypes from synapse.events.snapshot import EventSnapshot, EventContext @@ -59,7 +59,7 @@ class BaseHandler(object): ) @defer.inlineCallbacks - def _handle_new_client_event(self, builder): + def _create_new_client_event(self, builder): latest_ret = yield self.store.get_latest_events_in_room( builder.room_id, ) @@ -67,16 +67,27 @@ class BaseHandler(object): depth = max([d for _, _, d in latest_ret]) prev_events = [(e, h) for e, h, _ in latest_ret] - group, curr_state = yield self.state_handler.resolve_state_groups( - [e for e, _ in prev_events] - ) + state_handler = self.state_handler + if builder.is_state(): + ret = yield state_handler.resolve_state_groups( + [e for e, _ in prev_events], + event_type=builder.event_type, + state_key=builder.state_key, + ) - snapshot = EventSnapshot( - prev_events=prev_events, - depth=depth, - current_state=curr_state, - current_state_group=group, - ) + group, curr_state, prev_state = ret + + prev_state = yield self.store.add_event_hashes( + prev_state + ) + + builder.prev_state = prev_state + else: + group, curr_state, _ = yield state_handler.resolve_state_groups( + [e for e, _ in prev_events], + ) + + builder.internal_metadata.state_group = group builder.prev_events = prev_events builder.depth = depth @@ -103,9 +114,39 @@ class BaseHandler(object): auth_events=curr_auth_events, ) + defer.returnValue( + (event, context,) + ) + + @defer.inlineCallbacks + def _handle_new_client_event(self, event, context): + # We now need to go and hit out to wherever we need to hit out to. + self.auth.check(event, auth_events=context.auth_events) + yield self.store.persist_event(event) + destinations = set() + for k, s in context.current_state.items(): + try: + if k[0] == EventTypes.Member: + if s.content["membership"] == Membership.JOIN: + destinations.add( + self.hs.parse_userid(s.state_key).domain + ) + except SynapseError: + logger.warn( + "Failed to get destination from event %s", s.event_id + ) + + yield self.notifier.on_new_room_event(event) + + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.handle_new_event( + event, + None, + destinations=destinations, + ) @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 925eb5376e..7bd36e415e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -76,7 +76,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def handle_new_event(self, event, snapshot): + def handle_new_event(self, event, snapshot, destinations): """ Takes in an event from the client to server side, that has already been authed and handled by the state module, and sends it to any remote home servers that may be interested. @@ -92,12 +92,7 @@ class FederationHandler(BaseHandler): yield run_on_reactor() - pdu = event - - if not hasattr(pdu, "destinations") or not pdu.destinations: - pdu.destinations = [] - - yield self.replication_layer.send_pdu(pdu) + yield self.replication_layer.send_pdu(event, destinations) @log_function @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6e1c37df03..52a9788823 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -378,6 +378,7 @@ class RoomMemberHandler(BaseHandler): else: # This is not a JOIN, so we can handle it normally. + # FIXME: This isn't idempotency. if prev_state and prev_state.membership == event.membership: # double same action, treat this event as a NOOP. defer.returnValue({}) diff --git a/synapse/state.py b/synapse/state.py index 430665f7ba..8a556a27f6 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -89,7 +89,7 @@ class StateHandler(object): ids = [e for e, _ in event.prev_events] ret = yield self.resolve_state_groups(ids) - state_group, new_state = ret + state_group, new_state, _ = ret event.old_state_events = copy.deepcopy(new_state) @@ -137,7 +137,7 @@ class StateHandler(object): @defer.inlineCallbacks @log_function - def resolve_state_groups(self, event_ids): + def resolve_state_groups(self, event_ids, event_type=None, state_key=""): """ Given a list of event_ids this method fetches the state at each event, resolves conflicts between them and returns them. @@ -156,7 +156,10 @@ class StateHandler(object): (e.type, e.state_key): e for e in state_list } - defer.returnValue((name, state)) + prev_state = state.get((event_type, state_key), None) + if prev_state: + prev_state = prev_state.event_id + defer.returnValue((name, state, [prev_state])) state = {} for group, g_state in state_groups.items(): @@ -177,6 +180,13 @@ class StateHandler(object): if len(v.values()) > 1 } + if event_type: + prev_states = conflicted_state.get( + (event_type, state_key), {} + ).keys() + else: + prev_states = [] + try: new_state = {} new_state.update(unconflicted_state) @@ -186,7 +196,7 @@ class StateHandler(object): logger.exception("Failed to resolve state") raise - defer.returnValue((None, new_state)) + defer.returnValue((None, new_state, prev_states)) def _get_power_level_from_event_state(self, event, user_id): if hasattr(event, "old_state_events") and event.old_state_events: diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index eea4f21065..e2f11c7ffc 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -15,6 +15,8 @@ from _base import SQLBaseStore +from syutil.base64util import encode_base64 + class SignatureStore(SQLBaseStore): """Persistence for event signatures and hashes""" @@ -67,6 +69,20 @@ class SignatureStore(SQLBaseStore): f ) + def add_event_hashes(self, event_ids): + hashes = yield self.store.get_event_reference_hashes( + event_ids + ) + hashes = [ + { + k: encode_base64(v) for k, v in h.items() + if k == "sha256" + } + for h in hashes + ] + + defer.returnValue(zip(event_ids, hashes)) + def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. Args: -- cgit 1.4.1 From c31dba86ec40853f27c70ae13409ca3332052cc1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Dec 2014 15:50:01 +0000 Subject: Convert rest and handlers to use new event structure --- synapse/crypto/event_signing.py | 2 +- synapse/events/__init__.py | 8 +-- synapse/events/builder.py | 5 +- synapse/federation/replication.py | 18 ++---- synapse/handlers/_base.py | 16 +++-- synapse/handlers/directory.py | 20 +++--- synapse/handlers/federation.py | 19 +++--- synapse/handlers/message.py | 23 ++++++- synapse/handlers/profile.py | 19 +++--- synapse/handlers/room.py | 130 ++++++++++++++++---------------------- synapse/rest/base.py | 2 +- synapse/rest/room.py | 112 ++++++++++++++------------------ synapse/server.py | 8 +++ synapse/storage/signatures.py | 5 +- 14 files changed, 188 insertions(+), 199 deletions(-) (limited to 'synapse/events') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 209f9d73fe..b189f0bb2b 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -15,7 +15,7 @@ # limitations under the License. -from synapse.api.events.utils import prune_event +from synapse.events.utils import prune_event from syutil.jsonutil import encode_canonical_json from syutil.base64util import encode_base64, decode_base64 from syutil.crypto.jsonsign import sign_json diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 6a05ba2d16..58edf2bc8f 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -85,10 +85,10 @@ class EventBase(object): return hasattr(self, "state_key") def get_dict(self): - d = dict(self._original) + d = dict(self._event_dict) d.update({ - "signatures": self._signatures, - "unsigned": self._unsigned, + "signatures": self.signatures, + "unsigned": self.unsigned, }) return d @@ -128,7 +128,7 @@ class FrozenEvent(EventBase): @staticmethod def from_event(event): e = FrozenEvent( - event.event_dict() + event.get_pdu_json() ) e.internal_metadata = event.internal_metadata diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 39b4d2a2ab..0b8caf9318 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -22,10 +22,13 @@ from synapse.util.stringutils import random_string class EventBuilder(EventBase): def __init__(self, key_values={}): - super(FrozenEvent, self).__init__( + super(EventBuilder, self).__init__( key_values, ) + def update_event_key(self, key, value): + self._event_dict[key] = value + def update_event_keys(self, other_dict): self._event_dict.update(other_dict) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index bd56a4c108..b11df9e5c6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -74,6 +74,7 @@ class ReplicationLayer(object): self._clock = hs.get_clock() self.event_factory = hs.get_event_factory() + self.event_builder_factory = hs.get_event_builder_factory() def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate @@ -658,19 +659,14 @@ class ReplicationLayer(object): return "" % self.server_name def event_from_pdu_json(self, pdu_json, outlier=False): - #TODO: Check we have all the PDU keys here - pdu_json.setdefault("hashes", {}) - pdu_json.setdefault("signatures", {}) - sender = pdu_json.pop("sender", None) - if sender is not None: - pdu_json["user_id"] = sender - state_hash = pdu_json.get("unsigned", {}).pop("state_hash", None) - if state_hash is not None: - pdu_json["state_hash"] = state_hash - return self.event_factory.create_event( - pdu_json["type"], outlier=outlier, **pdu_json + builder = self.event_builder_factory.new( + pdu_json ) + builder.internal_metadata = outlier + + return builder.build() + class _TransactionQueue(object): """This class makes sure we only have one transaction in flight at diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 890b51be30..4052d0e1e7 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -46,6 +46,8 @@ class BaseHandler(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname + self.event_builder_factory = hs.get_event_builder_factory() + def ratelimit(self, user_id): time_now = self.clock.time() allowed, time_allowed = self.ratelimiter.send_message( @@ -92,7 +94,7 @@ class BaseHandler(object): builder.prev_events = prev_events builder.depth = depth - auth_events = yield self.auth.get_event_auth(builder, curr_state) + auth_events = yield self.auth.get_auth_events(builder, curr_state) builder.update_event_key("auth_events", auth_events) @@ -105,7 +107,7 @@ class BaseHandler(object): auth_ids = zip(*auth_events)[0] curr_auth_events = { k: v - for k, v in curr_state + for k, v in curr_state.items() if v.event_id in auth_ids } @@ -119,14 +121,16 @@ class BaseHandler(object): ) @defer.inlineCallbacks - def _handle_new_client_event(self, event, context): + def handle_new_client_event(self, event, context, extra_destinations=[], + extra_users=[], suppress_auth=False): # We now need to go and hit out to wherever we need to hit out to. - self.auth.check(event, auth_events=context.auth_events) + if not suppress_auth: + self.auth.check(event, auth_events=context.auth_events) yield self.store.persist_event(event) - destinations = set() + destinations = set(extra_destinations) for k, s in context.current_state.items(): try: if k[0] == EventTypes.Member: @@ -139,7 +143,7 @@ class BaseHandler(object): "Failed to get destination from event %s", s.event_id ) - yield self.notifier.on_new_room_event(event) + yield self.notifier.on_new_room_event(event, extra_users=extra_users) federation_handler = self.hs.get_handlers().federation_handler yield federation_handler.handle_new_event( diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index b95c4b8bf7..76fb897f20 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -148,16 +148,12 @@ class DirectoryHandler(BaseHandler): def send_room_alias_update_event(self, user_id, room_id): aliases = yield self.store.get_aliases_for_room(room_id) - event = self.event_factory.create_event( - etype=RoomAliasesEvent.TYPE, - state_key=self.hs.hostname, - room_id=room_id, - user_id=user_id, - content={"aliases": aliases}, - ) - - snapshot = yield self.store.snapshot_room(event) + msg_handler = self.hs.get_handlers().message_handler + yield msg_handler.handle_event({ + "type": RoomAliasesEvent.TYPE, + "state_key": self.hs.hostname, + "room_id": room_id, + "sender": user_id, + "content": {"aliases": aliases}, + }) - yield self._on_new_room_event( - event, snapshot, extra_users=[user_id], suppress_auth=True - ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7bd36e415e..b4a28ea3cb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -421,16 +421,17 @@ class FederationHandler(BaseHandler): join event for the room and return that. We don *not* persist or process it until the other server has signed it and sent it back. """ - event = self.event_factory.create_event( - etype=RoomMemberEvent.TYPE, - content={"membership": Membership.JOIN}, - room_id=context, - user_id=user_id, - state_key=user_id, - ) + builder = self.event_builder_factory.new({ + "type": RoomMemberEvent.TYPE, + "content": {"membership": Membership.JOIN}, + "room_id": context, + "sender": user_id, + "state_key": user_id, + }) - snapshot = yield self.store.snapshot_room(event) - snapshot.fill_out_prev_events(event) + event, context = yield self._create_new_client_event( + builder=builder, + ) yield self.state_handler.annotate_event_with_state(event) yield self.auth.add_auth_events(event) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 269d6622e1..485d8e8179 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from synapse.api.constants import Membership +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import RoomError from synapse.streams.config import PaginationConfig from synapse.util.logcontext import PreserveLoggingContext @@ -133,6 +133,27 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) + @defer.inlineCallbacks + def handle_event(self, event_dict): + builder = self.event_builder_factory.new(event_dict) + + event, context = yield self._create_new_client_event( + builder=builder, + ) + + # TODO: self.validator.validate(event) + + if event.type == EventTypes.Member: + member_handler = self.hs.get_handlers().room_member_handler + yield member_handler.change_membership(event, context) + else: + yield self.handle_new_client_event( + event=event, + context=context, + ) + + defer.returnValue(event) + @defer.inlineCallbacks def store_room_data(self, event=None): """ Stores data for a room. diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 0116ba5358..f2abbc5df9 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -210,14 +210,11 @@ class ProfileHandler(BaseHandler): "collect_presencelike_data", user, content ) - new_event = self.event_factory.create_event( - etype=j.type, - room_id=j.room_id, - state_key=j.state_key, - content=content, - user_id=j.state_key, - ) - - yield self._on_new_room_event( - new_event, snapshot, suppress_auth=True - ) + msg_handler = self.hs.get_handlers().message_handler + yield msg_handler.handle_event({ + "type": j.type, + "room_id": j.room_id, + "state_key": j.state_key, + "content": content, + "sender": j.state_key, + }) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 52a9788823..f0ffd62b7f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -123,59 +123,37 @@ class RoomCreationHandler(BaseHandler): user, room_id, is_public=is_public ) - room_member_handler = self.hs.get_handlers().room_member_handler - - @defer.inlineCallbacks - def handle_event(event): - snapshot = yield self.store.snapshot_room(event) - - logger.debug("Event: %s", event) - - if event.type == RoomMemberEvent.TYPE: - yield room_member_handler.change_membership( - event, - do_auth=True - ) - else: - yield self._on_new_room_event( - event, snapshot, extra_users=[user], suppress_auth=True - ) + msg_handler = self.hs.get_handlers().message_handler for event in creation_events: - yield handle_event(event) + yield msg_handler.handle_event(event) if "name" in config: name = config["name"] - name_event = self.event_factory.create_event( - etype=RoomNameEvent.TYPE, - room_id=room_id, - user_id=user_id, - content={"name": name}, - ) - - yield handle_event(name_event) + yield msg_handler.handle_event({ + "type": RoomNameEvent.TYPE, + "room_id": room_id, + "sender": user_id, + "content": {"name": name}, + }) if "topic" in config: topic = config["topic"] - topic_event = self.event_factory.create_event( - etype=RoomTopicEvent.TYPE, - room_id=room_id, - user_id=user_id, - content={"topic": topic}, - ) + yield msg_handler.handle_event({ + "type": RoomTopicEvent.TYPE, + "room_id": room_id, + "sender": user_id, + "content": {"topic": topic}, + }) - yield handle_event(topic_event) - - content = {"membership": Membership.INVITE} for invitee in invite_list: - invite_event = self.event_factory.create_event( - etype=RoomMemberEvent.TYPE, - state_key=invitee, - room_id=room_id, - user_id=user_id, - content=content - ) - yield handle_event(invite_event) + yield msg_handler.handle_event({ + "type": RoomMemberEvent.TYPE, + "state_key": invitee, + "room_id": room_id, + "user_id": user_id, + "content": {"membership": Membership.INVITE}, + }) result = {"room_id": room_id} @@ -192,22 +170,25 @@ class RoomCreationHandler(BaseHandler): event_keys = { "room_id": room_id, - "user_id": creator_id, + "sender": creator_id, } - def create(etype, **content): - return self.event_factory.create_event( - etype=etype, - content=content, - **event_keys - ) + def create(etype, content): + e = { + "type": etype, + "content": content, + } + + e.update(event_keys) + + return e creation_event = create( etype=RoomCreateEvent.TYPE, - creator=creator.to_string(), + content={"creator": creator.to_string()}, ) - join_event = self.event_factory.create_event( + join_event = create( etype=RoomMemberEvent.TYPE, state_key=creator_id, content={ @@ -216,7 +197,7 @@ class RoomCreationHandler(BaseHandler): **event_keys ) - power_levels_event = self.event_factory.create_event( + power_levels_event = create( etype=RoomPowerLevelsEvent.TYPE, content={ "users": { @@ -233,13 +214,12 @@ class RoomCreationHandler(BaseHandler): "kick": 50, "redact": 50 }, - **event_keys ) join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE join_rules_event = create( etype=RoomJoinRulesEvent.TYPE, - join_rule=join_rule, + content={"join_rule": join_rule}, ) return [ @@ -351,7 +331,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue(member) @defer.inlineCallbacks - def change_membership(self, event=None, do_auth=True): + def change_membership(self, event, context, do_auth=True): """ Change the membership status of a user in a room. Args: @@ -361,8 +341,6 @@ class RoomMemberHandler(BaseHandler): """ target_user_id = event.state_key - snapshot = yield self.store.snapshot_room(event) - ## TODO(markjh): get prev state from snapshot. prev_state = yield self.store.get_room_member( target_user_id, event.room_id @@ -374,7 +352,7 @@ class RoomMemberHandler(BaseHandler): # if this HS is not currently in the room, i.e. we have to do the # invite/join dance. if event.membership == Membership.JOIN: - yield self._do_join(event, snapshot, do_auth=do_auth) + yield self._do_join(event, context, do_auth=do_auth) else: # This is not a JOIN, so we can handle it normally. @@ -387,7 +365,7 @@ class RoomMemberHandler(BaseHandler): yield self._do_local_membership_update( event, membership=event.content["membership"], - snapshot=snapshot, + context=context, do_auth=do_auth, ) @@ -409,23 +387,21 @@ class RoomMemberHandler(BaseHandler): host = hosts[0] content.update({"membership": Membership.JOIN}) - new_event = self.event_factory.create_event( - etype=RoomMemberEvent.TYPE, - state_key=joinee.to_string(), - room_id=room_id, - user_id=joinee.to_string(), - membership=Membership.JOIN, - content=content, - ) - - snapshot = yield self.store.snapshot_room(new_event) + event, context = yield self.create_new_client_event({ + "type": RoomMemberEvent.TYPE, + "state_key": joinee.to_string(), + "room_id": room_id, + "sender": joinee.to_string(), + "membership": Membership.JOIN, + "content": content, + }) - yield self._do_join(new_event, snapshot, room_host=host, do_auth=True) + yield self._do_join(event, context, room_host=host, do_auth=True) defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, snapshot, room_host=None, do_auth=True): + def _do_join(self, event, context, room_host=None, do_auth=True): joinee = self.hs.parse_userid(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -470,7 +446,7 @@ class RoomMemberHandler(BaseHandler): if should_do_dance: handler = self.hs.get_handlers().federation_handler have_joined = yield handler.do_invite_join( - room_host, room_id, event.user_id, event.content, snapshot + room_host, room_id, event.user_id, event.content, context ) # We want to do the _do_update inside the room lock. @@ -480,7 +456,7 @@ class RoomMemberHandler(BaseHandler): yield self._do_local_membership_update( event, membership=event.content["membership"], - snapshot=snapshot, + context=context, do_auth=do_auth, ) @@ -530,7 +506,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue(room_ids) @defer.inlineCallbacks - def _do_local_membership_update(self, event, membership, snapshot, + def _do_local_membership_update(self, event, membership, context, do_auth): yield run_on_reactor() @@ -543,9 +519,9 @@ class RoomMemberHandler(BaseHandler): else: do_invite_host = None - yield self._on_new_room_event( + yield self.handle_new_client_event( event, - snapshot, + context, extra_users=[target_user], suppress_auth=(not do_auth), do_invite_host=do_invite_host, diff --git a/synapse/rest/base.py b/synapse/rest/base.py index 79fc4dfb84..72bb66ddda 100644 --- a/synapse/rest/base.py +++ b/synapse/rest/base.py @@ -63,7 +63,7 @@ class RestServlet(object): self.hs = hs self.handlers = hs.get_handlers() - self.event_factory = hs.get_event_factory() + self.builder_factory = hs.get_event_builder_factory() self.auth = hs.get_auth() self.txns = HttpTransactionStore() diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 3147d7a60b..3d78b4ff5c 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -117,10 +117,10 @@ class RoomStateEventRestServlet(RestServlet): self.on_PUT_no_state_key) def on_GET_no_state_key(self, request, room_id, event_type): - return self.on_GET(request, room_id, event_type, "") + return self.on_GET(request, room_id, event_type, None) def on_PUT_no_state_key(self, request, room_id, event_type): - return self.on_PUT(request, room_id, event_type, "") + return self.on_PUT(request, room_id, event_type, None) @defer.inlineCallbacks def on_GET(self, request, room_id, event_type, state_key): @@ -147,28 +147,18 @@ class RoomStateEventRestServlet(RestServlet): content = _parse_json(request) - event = self.event_factory.create_event( - etype=event_type, # already urldecoded - content=content, - room_id=urllib.unquote(room_id), - user_id=user.to_string(), - state_key=urllib.unquote(state_key) - ) - - self.validator.validate(event) + msg_handler = self.handlers.message_handler + yield msg_handler.handle_event( + { + "type": event_type, + "content": content, + "room_id": room_id, + "sender": user.to_string(), + "state_key": urllib.unquote(state_key), + } + ) - if event_type == RoomMemberEvent.TYPE: - # membership events are special - handler = self.handlers.room_member_handler - yield handler.change_membership(event) - defer.returnValue((200, {})) - else: - # store random bits of state - msg_handler = self.handlers.message_handler - yield msg_handler.store_room_data( - event=event - ) - defer.returnValue((200, {})) + defer.returnValue((200, {})) # TODO: Needs unit testing for generic events + feedback @@ -184,17 +174,15 @@ class RoomSendEventRestServlet(RestServlet): user = yield self.auth.get_user_by_req(request) content = _parse_json(request) - event = self.event_factory.create_event( - etype=urllib.unquote(event_type), - room_id=urllib.unquote(room_id), - user_id=user.to_string(), - content=content - ) - - self.validator.validate(event) - msg_handler = self.handlers.message_handler - yield msg_handler.send_message(event) + event = yield msg_handler.handle_event( + { + "type": urllib.unquote(event_type), + "content": content, + "room_id": urllib.unquote(room_id), + "sender": user.to_string(), + } + ) defer.returnValue((200, {"event_id": event.event_id})) @@ -251,18 +239,17 @@ class JoinRoomAliasServlet(RestServlet): ret_dict = yield handler.join_room_alias(user, identifier) defer.returnValue((200, ret_dict)) else: # room id - event = self.event_factory.create_event( - etype=RoomMemberEvent.TYPE, - content={"membership": Membership.JOIN}, - room_id=urllib.unquote(identifier.to_string()), - user_id=user.to_string(), - state_key=user.to_string() + msg_handler = self.handlers.message_handler + yield msg_handler.handle_event( + { + "type": RoomMemberEvent.TYPE, + "content": {"membership": Membership.JOIN}, + "room_id": urllib.unquote(identifier.to_string()), + "sender": user.to_string(), + "state_key": user.to_string(), + } ) - self.validator.validate(event) - - handler = self.handlers.room_member_handler - yield handler.change_membership(event) defer.returnValue((200, {})) @defer.inlineCallbacks @@ -414,18 +401,17 @@ class RoomMembershipRestServlet(RestServlet): if membership_action == "kick": membership_action = "leave" - event = self.event_factory.create_event( - etype=RoomMemberEvent.TYPE, - content={"membership": unicode(membership_action)}, - room_id=urllib.unquote(room_id), - user_id=user.to_string(), - state_key=state_key + msg_handler = self.handlers.message_handler + yield msg_handler.handle_event( + { + "type": RoomMemberEvent.TYPE, + "content": {"membership": unicode(membership_action)}, + "room_id": urllib.unquote(room_id), + "sender": user.to_string(), + "state_key": state_key, + } ) - self.validator.validate(event) - - handler = self.handlers.room_member_handler - yield handler.change_membership(event) defer.returnValue((200, {})) @defer.inlineCallbacks @@ -453,18 +439,16 @@ class RoomRedactEventRestServlet(RestServlet): user = yield self.auth.get_user_by_req(request) content = _parse_json(request) - event = self.event_factory.create_event( - etype=RoomRedactionEvent.TYPE, - room_id=urllib.unquote(room_id), - user_id=user.to_string(), - content=content, - redacts=urllib.unquote(event_id), - ) - - self.validator.validate(event) - msg_handler = self.handlers.message_handler - yield msg_handler.send_message(event) + event = yield msg_handler.handle_event( + { + "type": RoomRedactionEvent.TYPE, + "content": content, + "room_id": urllib.unquote(room_id), + "sender": user.to_string(), + "redacts": urllib.unquote(event_id), + } + ) defer.returnValue((200, {"event_id": event.event_id})) diff --git a/synapse/server.py b/synapse/server.py index c3b54221d6..8bc27bbc3c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -36,6 +36,7 @@ from synapse.util.lockutils import LockManager from synapse.streams.events import EventSources from synapse.api.ratelimiting import Ratelimiter from synapse.crypto.keyring import Keyring +from synapse.events.builder import EventBuilderFactory class BaseHomeServer(object): @@ -82,6 +83,7 @@ class BaseHomeServer(object): 'ratelimiter', 'keyring', 'event_validator', + 'event_builder_factory', ] def __init__(self, hostname, **kwargs): @@ -231,6 +233,12 @@ class HomeServer(BaseHomeServer): def build_event_validator(self): return EventValidator(self) + def build_event_builder_factory(self): + return EventBuilderFactory( + clock=self.get_clock(), + hostname=self.hostname, + ) + def register_servlets(self): """ Register all servlets associated with this HomeServer. """ diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index e2f11c7ffc..3a705119fd 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from _base import SQLBaseStore from syutil.base64util import encode_base64 @@ -69,8 +71,9 @@ class SignatureStore(SQLBaseStore): f ) + @defer.inlineCallbacks def add_event_hashes(self, event_ids): - hashes = yield self.store.get_event_reference_hashes( + hashes = yield self.get_event_reference_hashes( event_ids ) hashes = [ -- cgit 1.4.1 From 6630e1b5795667fd947cc5b0d5d2b00da97325e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Dec 2014 16:20:48 +0000 Subject: Start making more things use EventContext rather than event.* --- synapse/api/auth.py | 33 +++++---- synapse/events/__init__.py | 35 ++++++++- synapse/events/utils.py | 16 ++++ synapse/handlers/_base.py | 164 ++++++++++++++++++----------------------- synapse/handlers/federation.py | 19 +++-- synapse/server.py | 2 +- synapse/state.py | 33 +++++++++ synapse/storage/__init__.py | 23 +++--- synapse/storage/_base.py | 8 +- synapse/storage/state.py | 13 +++- 10 files changed, 212 insertions(+), 134 deletions(-) (limited to 'synapse/events') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 5261c3e3bf..3f2e58a5ef 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -351,27 +351,27 @@ class Auth(object): return self.store.is_server_admin(user) @defer.inlineCallbacks - def get_auth_events(self, event, current_state): - if event.type == RoomCreateEvent.TYPE: - event.auth_events = [] + def add_auth_events(self, builder, context): + if builder.type == RoomCreateEvent.TYPE: + builder.auth_events = [] return auth_events = [] key = (RoomPowerLevelsEvent.TYPE, "", ) - power_level_event = current_state.get(key) + power_level_event = context.current_state.get(key) if power_level_event: auth_events.append(power_level_event.event_id) key = (RoomJoinRulesEvent.TYPE, "", ) - join_rule_event = current_state.get(key) + join_rule_event = context.current_state.get(key) - key = (RoomMemberEvent.TYPE, event.user_id, ) - member_event = current_state.get(key) + key = (RoomMemberEvent.TYPE, builder.user_id, ) + member_event = context.current_state.get(key) key = (RoomCreateEvent.TYPE, "", ) - create_event = current_state.get(key) + create_event = context.current_state.get(key) if create_event: auth_events.append(create_event.event_id) @@ -381,8 +381,8 @@ class Auth(object): else: is_public = False - if event.type == RoomMemberEvent.TYPE: - e_type = event.content["membership"] + if builder.type == RoomMemberEvent.TYPE: + e_type = builder.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: if join_rule_event: auth_events.append(join_rule_event.event_id) @@ -393,11 +393,18 @@ class Auth(object): if member_event.content["membership"] == Membership.JOIN: auth_events.append(member_event.event_id) - auth_events = yield self.store.add_event_hashes( - auth_events + auth_ids = [(a.event_id, h) for a, h in auth_events] + auth_events_entries = yield self.store.add_event_hashes( + auth_ids ) - defer.returnValue(auth_events) + builder.auth_events = auth_events_entries + + context.auth_events = { + k: v + for k, v in context.current_state.items() + if v.event_id in auth_ids + } @log_function def _can_send_event(self, event, auth_events): diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 58edf2bc8f..e81b995d39 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -17,8 +17,8 @@ from frozendict import frozendict def _freeze(o): - if isinstance(o, dict): - return frozendict({k: _freeze(v) for k,v in o.items()}) + if isinstance(o, dict) or isinstance(o, frozendict): + return frozendict({k: _freeze(v) for k, v in o.items()}) if isinstance(o, basestring): return o @@ -31,6 +31,21 @@ def _freeze(o): return o +def _unfreeze(o): + if isinstance(o, frozendict) or isinstance(o, dict): + return dict({k: _unfreeze(v) for k, v in o.items()}) + + if isinstance(o, basestring): + return o + + try: + return [_unfreeze(i) for i in o] + except TypeError: + pass + + return o + + class _EventInternalMetadata(object): def __init__(self, internal_metadata_dict): self.__dict__ = internal_metadata_dict @@ -69,6 +84,7 @@ class EventBase(object): ) auth_events = _event_dict_property("auth_events") + depth = _event_dict_property("depth") content = _event_dict_property("content") event_id = _event_dict_property("event_id") hashes = _event_dict_property("hashes") @@ -81,6 +97,10 @@ class EventBase(object): type = _event_dict_property("type") user_id = _event_dict_property("sender") + @property + def membership(self): + return self.content["membership"] + def is_state(self): return hasattr(self, "state_key") @@ -134,3 +154,14 @@ class FrozenEvent(EventBase): e.internal_metadata = event.internal_metadata return e + + def get_dict(self): + # We need to unfreeze what we return + + d = _unfreeze(self._event_dict) + d.update({ + "signatures": self.signatures, + "unsigned": self.unsigned, + }) + + return d diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 412f690f08..1b05ee0a95 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api.constants import EventTypes +from . import EventBase def prune_event(event): @@ -80,3 +81,18 @@ def prune_event(event): allowed_fields["content"] = new_content return type(event)(allowed_fields) + + +def serialize_event(hs, e): + # FIXME(erikj): To handle the case of presence events and the like + if not isinstance(e, EventBase): + return e + + # Should this strip out None's? + d = {k: v for k, v in e.get_dict().items()} + if "age_ts" in d["unsigned"]: + now = int(hs.get_clock().time_msec()) + d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] + del d["unsigned"]["age_ts"] + + return d \ No newline at end of file diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4052d0e1e7..810ce138ff 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -62,6 +62,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _create_new_client_event(self, builder): + context = EventContext() + latest_ret = yield self.store.get_latest_events_in_room( builder.room_id, ) @@ -69,34 +71,26 @@ class BaseHandler(object): depth = max([d for _, _, d in latest_ret]) prev_events = [(e, h) for e, h, _ in latest_ret] - state_handler = self.state_handler - if builder.is_state(): - ret = yield state_handler.resolve_state_groups( - [e for e, _ in prev_events], - event_type=builder.event_type, - state_key=builder.state_key, - ) + builder.prev_events = prev_events + builder.depth = depth - group, curr_state, prev_state = ret + state_handler = self.state_handler + ret = yield state_handler.annotate_context_with_state( + builder, + context, + ) + group, prev_state = ret + if builder.is_state(): prev_state = yield self.store.add_event_hashes( prev_state ) builder.prev_state = prev_state - else: - group, curr_state, _ = yield state_handler.resolve_state_groups( - [e for e, _ in prev_events], - ) builder.internal_metadata.state_group = group - builder.prev_events = prev_events - builder.depth = depth - - auth_events = yield self.auth.get_auth_events(builder, curr_state) - - builder.update_event_key("auth_events", auth_events) + yield self.auth.add_auth_events(builder, context) add_hashes_and_signatures( builder, self.server_name, self.signing_key @@ -104,18 +98,6 @@ class BaseHandler(object): event = builder.build() - auth_ids = zip(*auth_events)[0] - curr_auth_events = { - k: v - for k, v in curr_state.items() - if v.event_id in auth_ids - } - - context = EventContext( - current_state=curr_state, - auth_events=curr_auth_events, - ) - defer.returnValue( (event, context,) ) @@ -128,7 +110,7 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.auth_events) - yield self.store.persist_event(event) + yield self.store.persist_event(event, context=context) destinations = set(extra_destinations) for k, s in context.current_state.items(): @@ -152,63 +134,63 @@ class BaseHandler(object): destinations=destinations, ) - @defer.inlineCallbacks - def _on_new_room_event(self, event, snapshot, extra_destinations=[], - extra_users=[], suppress_auth=False, - do_invite_host=None): - yield run_on_reactor() - - snapshot.fill_out_prev_events(event) - - yield self.state_handler.annotate_event_with_state(event) - - yield self.auth.add_auth_events(event) - - logger.debug("Signing event...") - - add_hashes_and_signatures( - event, self.server_name, self.signing_key - ) - - logger.debug("Signed event.") - - if not suppress_auth: - logger.debug("Authing...") - self.auth.check(event, auth_events=event.old_state_events) - logger.debug("Authed") - else: - logger.debug("Suppressed auth.") - - if do_invite_host: - federation_handler = self.hs.get_handlers().federation_handler - invite_event = yield federation_handler.send_invite( - do_invite_host, - event - ) - - # FIXME: We need to check if the remote changed anything else - event.signatures = invite_event.signatures - - yield self.store.persist_event(event) - - destinations = set(extra_destinations) - # Send a PDU to all hosts who have joined the room. - - for k, s in event.state_events.items(): - try: - if k[0] == RoomMemberEvent.TYPE: - if s.content["membership"] == Membership.JOIN: - destinations.add( - self.hs.parse_userid(s.state_key).domain - ) - except: - logger.warn( - "Failed to get destination from event %s", s.event_id - ) - - event.destinations = list(destinations) - - yield self.notifier.on_new_room_event(event, extra_users=extra_users) - - federation_handler = self.hs.get_handlers().federation_handler - yield federation_handler.handle_new_event(event, snapshot) + # @defer.inlineCallbacks + # def _on_new_room_event(self, event, snapshot, extra_destinations=[], + # extra_users=[], suppress_auth=False, + # do_invite_host=None): + # yield run_on_reactor() + # + # snapshot.fill_out_prev_events(event) + # + # yield self.state_handler.annotate_event_with_state(event) + # + # yield self.auth.add_auth_events(event) + # + # logger.debug("Signing event...") + # + # add_hashes_and_signatures( + # event, self.server_name, self.signing_key + # ) + # + # logger.debug("Signed event.") + # + # if not suppress_auth: + # logger.debug("Authing...") + # self.auth.check(event, auth_events=event.old_state_events) + # logger.debug("Authed") + # else: + # logger.debug("Suppressed auth.") + # + # if do_invite_host: + # federation_handler = self.hs.get_handlers().federation_handler + # invite_event = yield federation_handler.send_invite( + # do_invite_host, + # event + # ) + # + # # FIXME: We need to check if the remote changed anything else + # event.signatures = invite_event.signatures + # + # yield self.store.persist_event(event) + # + # destinations = set(extra_destinations) + # # Send a PDU to all hosts who have joined the room. + # + # for k, s in event.state_events.items(): + # try: + # if k[0] == RoomMemberEvent.TYPE: + # if s.content["membership"] == Membership.JOIN: + # destinations.add( + # self.hs.parse_userid(s.state_key).domain + # ) + # except: + # logger.warn( + # "Failed to get destination from event %s", s.event_id + # ) + # + # event.destinations = list(destinations) + # + # yield self.notifier.on_new_room_event(event, extra_users=extra_users) + # + # federation_handler = self.hs.get_handlers().federation_handler + # yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b4a28ea3cb..5264e3eafc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,7 +17,8 @@ from ._base import BaseHandler -from synapse.api.events.utils import prune_event +from synapse.events.snapshot import EventContext +from synapse.events.utils import prune_event from synapse.api.errors import ( AuthError, FederationError, SynapseError, StoreError, ) @@ -416,7 +417,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_make_join_request(self, context, user_id): + def on_make_join_request(self, room_id, user_id): """ We've received a /make_join/ request, so we create a partial join event for the room and return that. We don *not* persist or process it until the other server has signed it and sent it back. @@ -424,7 +425,7 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new({ "type": RoomMemberEvent.TYPE, "content": {"membership": Membership.JOIN}, - "room_id": context, + "room_id": room_id, "sender": user_id, "state_key": user_id, }) @@ -433,9 +434,7 @@ class FederationHandler(BaseHandler): builder=builder, ) - yield self.state_handler.annotate_event_with_state(event) - yield self.auth.add_auth_events(event) - self.auth.check(event, auth_events=event.old_state_events) + self.auth.check(event, auth_events=context.auth_events) pdu = event @@ -505,7 +504,9 @@ class FederationHandler(BaseHandler): """ event = pdu - event.outlier = True + context = EventContext() + + event.internal_metadata.outlier = True event.signatures.update( compute_event_signature( @@ -515,10 +516,11 @@ class FederationHandler(BaseHandler): ) ) - yield self.state_handler.annotate_event_with_state(event) + yield self.state_handler.annotate_context_with_state(event, context) yield self.store.persist_event( event, + context=context, backfilled=False, ) @@ -640,6 +642,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_event(self, event, state=None, backfilled=False, current_state=None, fetch_missing=True): + context = EventContext() is_new_state = yield self.state_handler.annotate_event_with_state( event, old_state=state diff --git a/synapse/server.py b/synapse/server.py index 8bc27bbc3c..0d0f3af3f4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -20,7 +20,7 @@ # Imports required for the default HomeServer() implementation from synapse.federation import initialize_http_replication -from synapse.api.events import serialize_event +from synapse.events.utils import serialize_event from synapse.api.events.factory import EventFactory from synapse.api.events.validator import EventValidator from synapse.notifier import Notifier diff --git a/synapse/state.py b/synapse/state.py index 8a556a27f6..cbb4243fad 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -135,6 +135,39 @@ class StateHandler(object): defer.returnValue(res[1].values()) + @defer.inlineCallbacks + def annotate_context_with_state(self, event, context): + if event.is_state(): + ret = yield self.resolve_state_groups( + [e for e, _ in event.prev_events], + event_type=event.event_type, + state_key=event.state_key, + ) + else: + ret = yield self.resolve_state_groups( + [e for e, _ in event.prev_events], + ) + + group, curr_state, prev_state = ret + + context.current_state = curr_state + + prev_state = yield self.store.add_event_hashes( + prev_state + ) + + if hasattr(event, "auth_events") and event.auth_events: + auth_ids = zip(*event.auth_events)[0] + context.auth_events = { + k: v + for k, v in context.current_state.items() + if v.event_id in auth_ids + } + + defer.returnValue( + (group, prev_state) + ) + @defer.inlineCallbacks @log_function def resolve_state_groups(self, event_ids, event_type=None, state_key=""): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 205d125642..f172c2690a 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -21,6 +21,7 @@ from synapse.api.events.room import ( ) from synapse.util.logutils import log_function +from synapse.util.frozenutils import FrozenEncoder from .directory import DirectoryStore from .feedback import FeedbackStore @@ -93,8 +94,8 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks @log_function - def persist_event(self, event, backfilled=False, is_new_state=True, - current_state=None): + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): stream_ordering = None if backfilled: if not self.min_token_deferred.called: @@ -107,6 +108,7 @@ class DataStore(RoomMemberStore, RoomStore, "persist_event", self._persist_event_txn, event=event, + context=context, backfilled=backfilled, stream_ordering=stream_ordering, is_new_state=is_new_state, @@ -138,8 +140,9 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event[0]) @log_function - def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, - is_new_state=True, current_state=None): + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): if event.type == RoomMemberEvent.TYPE: self._store_room_member_txn(txn, event) elif event.type == FeedbackEvent.TYPE: @@ -152,12 +155,12 @@ class DataStore(RoomMemberStore, RoomStore, self._store_redaction(txn, event) outlier = False - if hasattr(event, "outlier"): - outlier = event.outlier + if hasattr(event.internal_metadata, "outlier"): + outlier = event.internal_metadata.outlier event_dict = { k: v - for k, v in event.get_full_dict().items() + for k, v in event.get_dict().items() if k not in [ "redacted", "redacted_because", @@ -179,7 +182,7 @@ class DataStore(RoomMemberStore, RoomStore, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.content), + "content": json.dumps(event.content, cls=FrozenEncoder), "processed": True, "outlier": outlier, "depth": event.depth, @@ -190,7 +193,7 @@ class DataStore(RoomMemberStore, RoomStore, unrec = { k: v - for k, v in event.get_full_dict().items() + for k, v in event.get_dict().items() if k not in vals.keys() and k not in [ "redacted", "redacted_because", @@ -225,7 +228,7 @@ class DataStore(RoomMemberStore, RoomStore, room_id=event.room_id, ) - self._store_state_groups_txn(txn, event) + self._store_state_groups_txn(txn, event, context) if current_state: txn.execute( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index bb61c20150..c56c3a0b0f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,7 +15,8 @@ import logging from synapse.api.errors import StoreError -from synapse.api.events.utils import prune_event +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from syutil.base64util import encode_base64 @@ -497,10 +498,7 @@ class SQLBaseStore(object): d = json.loads(js) - ev = self.event_factory.create_event( - etype=d["type"], - **d - ) + ev = FrozenEvent(d) if hasattr(ev, "redacted") and ev.redacted: # Get the redaction event. diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e0f44b3e59..b8e721ad72 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -86,11 +86,16 @@ class StateStore(SQLBaseStore): self._store_state_groups_txn, event ) - def _store_state_groups_txn(self, txn, event): - if event.state_events is None: + def _store_state_groups_txn(self, txn, event, context): + if context.current_state_events is None: return - state_group = event.state_group + state_events = context.current_state_events + + if event.is_state(): + state_events[(event.type, event.state_key)] = event + + state_group = context.state_group if not state_group: state_group = self._simple_insert_txn( txn, @@ -102,7 +107,7 @@ class StateStore(SQLBaseStore): or_ignore=True, ) - for state in event.state_events.values(): + for state in context.state_events.values(): self._simple_insert_txn( txn, table="state_groups_state", -- cgit 1.4.1 From d044121168672c657e595525af9b588c8769e9bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Dec 2014 09:08:26 +0000 Subject: Various typos and bug fixes. --- synapse/api/auth.py | 16 ++++--- synapse/events/snapshot.py | 3 +- synapse/events/utils.py | 2 + synapse/federation/replication.py | 12 +++-- synapse/handlers/_base.py | 8 ++-- synapse/handlers/federation.py | 97 +++++++++++++++------------------------ synapse/state.py | 16 +++++-- synapse/storage/state.py | 6 +-- 8 files changed, 80 insertions(+), 80 deletions(-) (limited to 'synapse/events') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3f2e58a5ef..821e3ba5e2 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -24,6 +24,7 @@ from synapse.api.events.room import ( RoomJoinRulesEvent, RoomCreateEvent, RoomAliasesEvent, ) from synapse.util.logutils import log_function +from synapse.util.async import run_on_reactor from syutil.base64util import encode_base64 import logging @@ -352,17 +353,19 @@ class Auth(object): @defer.inlineCallbacks def add_auth_events(self, builder, context): + yield run_on_reactor() + if builder.type == RoomCreateEvent.TYPE: builder.auth_events = [] return - auth_events = [] + auth_ids = [] key = (RoomPowerLevelsEvent.TYPE, "", ) power_level_event = context.current_state.get(key) if power_level_event: - auth_events.append(power_level_event.event_id) + auth_ids.append(power_level_event.event_id) key = (RoomJoinRulesEvent.TYPE, "", ) join_rule_event = context.current_state.get(key) @@ -373,7 +376,7 @@ class Auth(object): key = (RoomCreateEvent.TYPE, "", ) create_event = context.current_state.get(key) if create_event: - auth_events.append(create_event.event_id) + auth_ids.append(create_event.event_id) if join_rule_event: join_rule = join_rule_event.content.get("join_rule") @@ -385,15 +388,14 @@ class Auth(object): e_type = builder.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: if join_rule_event: - auth_events.append(join_rule_event.event_id) + auth_ids.append(join_rule_event.event_id) if member_event and not is_public: - auth_events.append(member_event.event_id) + auth_ids.append(member_event.event_id) elif member_event: if member_event.content["membership"] == Membership.JOIN: - auth_events.append(member_event.event_id) + auth_ids.append(member_event.event_id) - auth_ids = [(a.event_id, h) for a, h in auth_events] auth_events_entries = yield self.store.add_event_hashes( auth_ids ) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index ca15ec09ae..e0cbacc19c 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -58,6 +58,7 @@ class EventCache(object): class EventContext(object): - def __init__(self, current_state, auth_events): + def __init__(self, current_state=None, auth_events=None): self.current_state = current_state self.auth_events = auth_events + self.state_group = None diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 1b05ee0a95..485f075406 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -95,4 +95,6 @@ def serialize_event(hs, e): d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] del d["unsigned"]["age_ts"] + d["user_id"] = d.pop("sender", None) + return d \ No newline at end of file diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index b11df9e5c6..3af24ee46d 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -558,7 +558,13 @@ class ReplicationLayer(object): origin, pdu.event_id, do_auth=False ) - if existing and (not existing.outlier or pdu.outlier): + already_seen = ( + existing and ( + not existing.internal_metadata.outlier + or pdu.internal_metadata.outlier + ) + ) + if already_seen: logger.debug("Already seen pdu %s", pdu.event_id) defer.returnValue({}) return @@ -596,7 +602,7 @@ class ReplicationLayer(object): # ) # Get missing pdus if necessary. - if not pdu.outlier: + if not pdu.internal_metadata.outlier: # We only backfill backwards to the min depth. min_depth = yield self.handler.get_min_depth_for_context( pdu.room_id @@ -663,7 +669,7 @@ class ReplicationLayer(object): pdu_json ) - builder.internal_metadata = outlier + builder.internal_metadata.outlier = outlier return builder.build() diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 810ce138ff..0bff644192 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -62,6 +62,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _create_new_client_event(self, builder): + yield run_on_reactor() + context = EventContext() latest_ret = yield self.store.get_latest_events_in_room( @@ -79,7 +81,7 @@ class BaseHandler(object): builder, context, ) - group, prev_state = ret + prev_state = ret if builder.is_state(): prev_state = yield self.store.add_event_hashes( @@ -88,8 +90,6 @@ class BaseHandler(object): builder.prev_state = prev_state - builder.internal_metadata.state_group = group - yield self.auth.add_auth_events(builder, context) add_hashes_and_signatures( @@ -105,6 +105,8 @@ class BaseHandler(object): @defer.inlineCallbacks def handle_new_client_event(self, event, context, extra_destinations=[], extra_users=[], suppress_auth=False): + yield run_on_reactor() + # We now need to go and hit out to wherever we need to hit out to. if not suppress_auth: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5264e3eafc..38ee32d26e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -149,7 +149,7 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room and not event.outlier: + if not is_in_room and not event.internal_metadata.outlier: logger.debug("Got event for room we're not in.") replication_layer = self.replication_layer @@ -160,7 +160,7 @@ class FederationHandler(BaseHandler): ) for e in auth_chain: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e, fetch_missing=False) except: @@ -180,7 +180,7 @@ class FederationHandler(BaseHandler): if state: for e in state: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e) except: @@ -254,11 +254,18 @@ class FederationHandler(BaseHandler): event = pdu # FIXME (erikj): Not sure this actually works :/ - yield self.state_handler.annotate_event_with_state(event) + context = EventContext() + yield self.state_handler.annotate_context_with_state(event, context) - events.append(event) + events.append( + (event, context) + ) - yield self.store.persist_event(event, backfilled=True) + yield self.store.persist_event( + event, + context=context, + backfilled=True + ) defer.returnValue(events) @@ -326,7 +333,7 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) - event.outlier = False + event.internal_metadata.outlier = False self.room_queues[room_id] = [] @@ -369,7 +376,7 @@ class FederationHandler(BaseHandler): pass for e in auth_chain: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e, fetch_missing=False) except: @@ -380,7 +387,7 @@ class FederationHandler(BaseHandler): for e in state: # FIXME: Auth these. - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event( e, @@ -448,7 +455,7 @@ class FederationHandler(BaseHandler): """ event = pdu - event.outlier = False + event.internal_metadata.outlier = False yield self._handle_new_event(event) @@ -643,70 +650,42 @@ class FederationHandler(BaseHandler): def _handle_new_event(self, event, state=None, backfilled=False, current_state=None, fetch_missing=True): context = EventContext() - is_new_state = yield self.state_handler.annotate_event_with_state( + yield self.state_handler.annotate_context_with_state( event, old_state=state ) - if event.old_state_events: - known_ids = set( - [s.event_id for s in event.old_state_events.values()] - ) - for e_id, _ in event.auth_events: - if e_id not in known_ids: - e = yield self.store.get_event( - e_id, - allow_none=True, - ) - - if not e: - # TODO: Do some conflict res to make sure that we're - # not the ones who are wrong. - logger.info( - "Rejecting %s as %s not in %s", - event.event_id, e_id, known_ids, - ) - raise AuthError(403, "Auth events are stale") + is_new_state = not event.internal_metadata.outlier - auth_events = event.old_state_events - else: - # We need to get the auth events from somewhere. - - # TODO: Don't just hit the DBs? - - auth_events = {} - for e_id, _ in event.auth_events: + known_ids = set( + [s.event_id for s in context.auth_events.values()] + ) + for e_id, _ in event.auth_events: + if e_id not in known_ids: e = yield self.store.get_event( e_id, + context, allow_none=True, ) if not e: - e = yield self.replication_layer.get_pdu( - event.origin, e_id, outlier=True + # TODO: Do some conflict res to make sure that we're + # not the ones who are wrong. + logger.info( + "Rejecting %s as %s not in %s", + event.event_id, e_id, known_ids, ) + raise AuthError(403, "Auth events are stale") - if e and fetch_missing: - try: - yield self.on_receive_pdu(event.origin, e, False) - except: - logger.exception( - "Failed to parse auth event %s", - e_id, - ) + context.auth_events[(e.type, e.state_key)] = e - if not e: - logger.warn("Can't find auth event %s.", e_id) + if event.type == RoomMemberEvent.TYPE and not event.auth_events: + if len(event.prev_events) == 1: + c = yield self.store.get_event(event.prev_events[0][0]) + if c.type == RoomCreateEvent.TYPE: + context.auth_events[(c.type, c.state_key)] = c - auth_events[(e.type, e.state_key)] = e - - if event.type == RoomMemberEvent.TYPE and not event.auth_events: - if len(event.prev_events) == 1: - c = yield self.store.get_event(event.prev_events[0][0]) - if c.type == RoomCreateEvent.TYPE: - auth_events[(c.type, c.state_key)] = c - - self.auth.check(event, auth_events=auth_events) + self.auth.check(event, auth_events=context.auth_events) yield self.store.persist_event( event, diff --git a/synapse/state.py b/synapse/state.py index cbb4243fad..464cbae564 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -136,7 +136,16 @@ class StateHandler(object): defer.returnValue(res[1].values()) @defer.inlineCallbacks - def annotate_context_with_state(self, event, context): + def annotate_context_with_state(self, event, context, old_state=None): + yield run_on_reactor() + + if old_state: + context.current_state = { + (s.type, s.state_key): s for s in old_state + } + context.state_group = None + defer.returnValue([]) + if event.is_state(): ret = yield self.resolve_state_groups( [e for e, _ in event.prev_events], @@ -151,6 +160,7 @@ class StateHandler(object): group, curr_state, prev_state = ret context.current_state = curr_state + context.state_group = group prev_state = yield self.store.add_event_hashes( prev_state @@ -164,9 +174,7 @@ class StateHandler(object): if v.event_id in auth_ids } - defer.returnValue( - (group, prev_state) - ) + defer.returnValue(prev_state) @defer.inlineCallbacks @log_function diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b8e721ad72..afe3e5edea 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -87,10 +87,10 @@ class StateStore(SQLBaseStore): ) def _store_state_groups_txn(self, txn, event, context): - if context.current_state_events is None: + if context.current_state is None: return - state_events = context.current_state_events + state_events = context.current_state if event.is_state(): state_events[(event.type, event.state_key)] = event @@ -107,7 +107,7 @@ class StateStore(SQLBaseStore): or_ignore=True, ) - for state in context.state_events.values(): + for state in state_events.values(): self._simple_insert_txn( txn, table="state_groups_state", -- cgit 1.4.1 From 609c31e8dfa23bce3b34500f28df4f8eaf740a91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Dec 2014 17:50:56 +0000 Subject: More bug fixes --- synapse/crypto/event_signing.py | 2 +- synapse/events/__init__.py | 9 +-------- synapse/handlers/federation.py | 39 +++++++++++++++++++++++---------------- synapse/handlers/room.py | 11 ++++++++--- synapse/state.py | 4 ++-- 5 files changed, 35 insertions(+), 30 deletions(-) (limited to 'synapse/events') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index b189f0bb2b..15de0f5ae3 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -39,7 +39,7 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256): ), Codes.UNAUTHORIZED, ) - message_hash_base64 = event.hashes[name.name] + message_hash_base64 = event.hashes[name] try: message_hash_bytes = decode_base64(message_hash_base64) except: diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index e81b995d39..230daf30d6 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -157,11 +157,4 @@ class FrozenEvent(EventBase): def get_dict(self): # We need to unfreeze what we return - - d = _unfreeze(self._event_dict) - d.update({ - "signatures": self.signatures, - "unsigned": self.unsigned, - }) - - return d + return _unfreeze(super(FrozenEvent, self).get_dict()) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 38ee32d26e..2d015ccce6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -337,23 +337,29 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] + builder = self.event_builder_factory.new( + event.get_pdu_json() + ) + try: - event.event_id = self.event_factory.create_event_id() - event.origin = self.hs.hostname - event.content = content + builder.event_id = self.event_factory.create_event_id() + builder.origin = self.hs.hostname + builder.content = content if not hasattr(event, "signatures"): - event.signatures = {} + builder.signatures = {} add_hashes_and_signatures( - event, + builder, self.hs.hostname, self.hs.config.signing_key[0], ) + new_event = builder.build() + ret = yield self.replication_layer.send_join( target_host, - event + new_event ) state = ret["state"] @@ -363,7 +369,7 @@ class FederationHandler(BaseHandler): logger.debug("do_invite_join auth_chain: %s", auth_chain) logger.debug("do_invite_join state: %s", state) - logger.debug("do_invite_join event: %s", event) + logger.debug("do_invite_join event: %s", new_event) try: yield self.store.store_room( @@ -400,13 +406,13 @@ class FederationHandler(BaseHandler): ) yield self._handle_new_event( - event, + new_event, state=state, current_state=state, ) yield self.notifier.on_new_room_event( - event, extra_users=[joinee] + new_event, extra_users=[joinee] ) logger.debug("Finished joining %s to %s", joinee, room_id) @@ -457,7 +463,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - yield self._handle_new_event(event) + context = yield self._handle_new_event(event) extra_users = [] if event.type == RoomMemberEvent.TYPE: @@ -480,7 +486,7 @@ class FederationHandler(BaseHandler): destinations = set() - for k, s in event.state_events.items(): + for k, s in context.current_state.items(): try: if k[0] == RoomMemberEvent.TYPE: if s.content["membership"] == Membership.JOIN: @@ -492,14 +498,12 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) - new_pdu.destinations = list(destinations) - - yield self.replication_layer.send_pdu(new_pdu) + yield self.replication_layer.send_pdu(new_pdu, destinations) auth_chain = yield self.store.get_auth_chain(event.event_id) defer.returnValue({ - "state": event.state_events.values(), + "state": context.current_state.values(), "auth_chain": auth_chain, }) @@ -652,6 +656,7 @@ class FederationHandler(BaseHandler): context = EventContext() yield self.state_handler.annotate_context_with_state( event, + context, old_state=state ) @@ -664,7 +669,6 @@ class FederationHandler(BaseHandler): if e_id not in known_ids: e = yield self.store.get_event( e_id, - context, allow_none=True, ) @@ -689,7 +693,10 @@ class FederationHandler(BaseHandler): yield self.store.persist_event( event, + context=context, backfilled=backfilled, is_new_state=(is_new_state and not backfilled), current_state=current_state, ) + + defer.returnValue(context) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4f4b275290..6da084b3ac 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -171,6 +171,7 @@ class RoomCreationHandler(BaseHandler): event_keys = { "room_id": room_id, "sender": creator_id, + "state_key": "", } def create(etype, content, **kwargs): @@ -187,7 +188,6 @@ class RoomCreationHandler(BaseHandler): creation_event = create( etype=RoomCreateEvent.TYPE, content={"creator": creator.to_string()}, - state_key="", ) join_event = create( @@ -388,7 +388,7 @@ class RoomMemberHandler(BaseHandler): host = hosts[0] content.update({"membership": Membership.JOIN}) - event, context = yield self.create_new_client_event({ + builder = self.event_builder_factory.new({ "type": RoomMemberEvent.TYPE, "state_key": joinee.to_string(), "room_id": room_id, @@ -396,6 +396,7 @@ class RoomMemberHandler(BaseHandler): "membership": Membership.JOIN, "content": content, }) + event, context = yield self._create_new_client_event(builder) yield self._do_join(event, context, room_host=host, do_auth=True) @@ -442,7 +443,11 @@ class RoomMemberHandler(BaseHandler): if should_do_dance: handler = self.hs.get_handlers().federation_handler have_joined = yield handler.do_invite_join( - room_host, room_id, event.user_id, event.content, context + room_host, + room_id, + event.user_id, + event.get_dict()["content"], # FIXME To get a non-frozen dict + context ) # We want to do the _do_update inside the room lock. diff --git a/synapse/state.py b/synapse/state.py index 19b408db45..d1d6f95246 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -160,13 +160,13 @@ class StateHandler(object): group, curr_state, prev_state = ret context.current_state = curr_state - context.state_group = group + context.state_group = group if not event.is_state() else None prev_state = yield self.store.add_event_hashes( prev_state ) - if hasattr(event, "auth_events") and event.auth_events: + if hasattr(event, "auth_events"): auth_ids = zip(*event.auth_events)[0] context.auth_events = { k: v -- cgit 1.4.1 From 8c4845068298cf6e2b8a662ffa2340b655abf9b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Dec 2014 10:58:31 +0000 Subject: Add PEP8 newlines --- synapse/events/builder.py | 2 +- synapse/events/utils.py | 2 +- synapse/events/validator.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 0b8caf9318..127b8fa904 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -66,4 +66,4 @@ class EventBuilderFactory(object): age = key_values["unsigned"].pop("age", 0) key_values["unsigned"].setdefault("age_ts", time_now - age) - return EventBuilder(key_values=key_values,) \ No newline at end of file + return EventBuilder(key_values=key_values,) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 485f075406..f5e135e3d0 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -97,4 +97,4 @@ def serialize_event(hs, e): d["user_id"] = d.pop("sender", None) - return d \ No newline at end of file + return d diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 7dc9506ec4..f319072d37 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -55,4 +55,4 @@ class EventValidator(object): def validate_new(self, event): self.validate(event) - UserID.from_string(event.sender) \ No newline at end of file + UserID.from_string(event.sender) -- cgit 1.4.1 From 95aa903ffa77effcbca2a510744c3c3fa9b46ed3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 10:06:12 +0000 Subject: Try and figure out how and why signatures are being changed. --- synapse/crypto/event_signing.py | 3 ++- synapse/events/__init__.py | 18 +++++++------ synapse/events/builder.py | 7 ++--- synapse/federation/replication.py | 9 ++++--- synapse/handlers/federation.py | 54 ++++++++++++++++++++++++++++++++++++--- synapse/handlers/message.py | 1 - synapse/handlers/room.py | 7 ++--- synapse/state.py | 11 ++++++++ synapse/storage/__init__.py | 10 -------- 9 files changed, 86 insertions(+), 34 deletions(-) (limited to 'synapse/events') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 15de0f5ae3..21c19c971d 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -82,8 +82,9 @@ def compute_event_signature(event, signature_name, signing_key): redact_json = tmp_event.get_pdu_json() redact_json.pop("age_ts", None) redact_json.pop("unsigned", None) - logger.debug("Signing event: %s", redact_json) + logger.debug("Signing event: %s", encode_canonical_json(redact_json)) redact_json = sign_json(redact_json, signature_name, signing_key) + logger.debug("Signed event: %s", encode_canonical_json(redact_json)) return redact_json["signatures"] diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 230daf30d6..ed02138706 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -15,6 +15,8 @@ from frozendict import frozendict +import copy + def _freeze(o): if isinstance(o, dict) or isinstance(o, frozendict): @@ -48,7 +50,7 @@ def _unfreeze(o): class _EventInternalMetadata(object): def __init__(self, internal_metadata_dict): - self.__dict__ = internal_metadata_dict + self.__dict__ = copy.deepcopy(internal_metadata_dict) def get_dict(self): return dict(self.__dict__) @@ -74,10 +76,10 @@ def _event_dict_property(key): class EventBase(object): def __init__(self, event_dict, signatures={}, unsigned={}, internal_metadata_dict={}): - self.signatures = signatures - self.unsigned = unsigned + self.signatures = copy.deepcopy(signatures) + self.unsigned = copy.deepcopy(unsigned) - self._event_dict = event_dict + self._event_dict = copy.deepcopy(event_dict) self.internal_metadata = _EventInternalMetadata( internal_metadata_dict @@ -131,11 +133,11 @@ class EventBase(object): class FrozenEvent(EventBase): - def __init__(self, event_dict, signatures={}, unsigned={}): - event_dict = dict(event_dict) + def __init__(self, event_dict): + event_dict = copy.deepcopy(event_dict) - signatures.update(event_dict.pop("signatures", {})) - unsigned.update(event_dict.pop("unsigned", {})) + signatures = copy.deepcopy(event_dict.pop("signatures", {})) + unsigned = copy.deepcopy(event_dict.pop("unsigned", {})) frozen_dict = _freeze(event_dict) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 127b8fa904..642264e9f3 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -54,10 +54,9 @@ class EventBuilderFactory(object): return e_id.to_string() def new(self, key_values={}): - if "event_id" not in key_values: - key_values["event_id"] = self.create_event_id() + key_values["event_id"] = self.create_event_id() - time_now = self.clock.time_msec() + time_now = int(self.clock.time_msec()) key_values.setdefault("origin", self.hostname) key_values.setdefault("origin_server_ts", time_now) @@ -66,4 +65,6 @@ class EventBuilderFactory(object): age = key_values["unsigned"].pop("age", 0) key_values["unsigned"].setdefault("age_ts", time_now - age) + key_values["signatures"] = {} + return EventBuilder(key_values=key_values,) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index d4cd79b7ac..a4600b0b40 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -25,6 +25,7 @@ from .persistence import TransactionActions from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.events import FrozenEvent import logging @@ -439,7 +440,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_send_join_request(self, origin, content): + logger.debug("on_send_join_request: content: %s", content) pdu = self.event_from_pdu_json(content) + logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) res_pdus = yield self.handler.on_send_join_request(origin, pdu) time_now = self._clock.time_msec() defer.returnValue((200, { @@ -665,13 +668,13 @@ class ReplicationLayer(object): return "" % self.server_name def event_from_pdu_json(self, pdu_json, outlier=False): - builder = self.event_builder_factory.new( + event = FrozenEvent( pdu_json ) - builder.internal_metadata.outlier = outlier + event.internal_metadata.outlier = outlier - return builder.build() + return event class _TransactionQueue(object): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 827c86c9da..9ae3e5eca4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -459,10 +459,22 @@ class FederationHandler(BaseHandler): """ event = pdu + logger.debug( + "on_send_join_request: Got event: %s, signatures: %s", + event.event_id, + event.signatures, + ) + event.internal_metadata.outlier = False context = yield self._handle_new_event(event) + logger.debug( + "on_send_join_request: After _handle_new_event: %s, sigs: %s", + event.event_id, + event.signatures, + ) + extra_users = [] if event.type == RoomMemberEvent.TYPE: target_user_id = event.state_key @@ -496,6 +508,12 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) + logger.debug( + "on_send_join_request: Sending event: %s, signatures: %s", + event.event_id, + event.signatures, + ) + yield self.replication_layer.send_pdu(new_pdu, destinations) auth_chain = yield self.store.get_auth_chain(event.event_id) @@ -652,12 +670,23 @@ class FederationHandler(BaseHandler): def _handle_new_event(self, event, state=None, backfilled=False, current_state=None, fetch_missing=True): context = EventContext() + + logger.debug( + "_handle_new_event: Before annotate: %s, sigs: %s", + event.event_id, event.signatures, + ) + yield self.state_handler.annotate_context_with_state( event, context, old_state=state ) + logger.debug( + "_handle_new_event: Before auth fetch: %s, sigs: %s", + event.event_id, event.signatures, + ) + is_new_state = not event.internal_metadata.outlier known_ids = set( @@ -666,29 +695,43 @@ class FederationHandler(BaseHandler): for e_id, _ in event.auth_events: if e_id not in known_ids: e = yield self.store.get_event( - e_id, - allow_none=True, + e_id, allow_none=True, ) if not e: # TODO: Do some conflict res to make sure that we're # not the ones who are wrong. logger.info( - "Rejecting %s as %s not in %s", + "Rejecting %s as %s not in db or %s", event.event_id, e_id, known_ids, ) raise AuthError(403, "Auth events are stale") context.auth_events[(e.type, e.state_key)] = e + logger.debug( + "_handle_new_event: Before hack: %s, sigs: %s", + event.event_id, event.signatures, + ) + if event.type == RoomMemberEvent.TYPE and not event.auth_events: if len(event.prev_events) == 1: c = yield self.store.get_event(event.prev_events[0][0]) if c.type == RoomCreateEvent.TYPE: context.auth_events[(c.type, c.state_key)] = c + logger.debug( + "_handle_new_event: Before auth check: %s, sigs: %s", + event.event_id, event.signatures, + ) + self.auth.check(event, auth_events=context.auth_events) + logger.debug( + "_handle_new_event: Before persist_event: %s, sigs: %s", + event.event_id, event.signatures, + ) + yield self.store.persist_event( event, context=context, @@ -697,4 +740,9 @@ class FederationHandler(BaseHandler): current_state=current_state, ) + logger.debug( + "_handle_new_event: After persist_event: %s, sigs: %s", + event.event_id, event.signatures, + ) + defer.returnValue(context) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8ee560d79a..13fa0be7b4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -137,7 +137,6 @@ class MessageHandler(BaseHandler): def handle_event(self, event_dict): builder = self.event_builder_factory.new(event_dict) - if builder.type == EventTypes.Member: membership = builder.content.get("membership", None) if membership == Membership.JOIN: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 509763ebc7..93732a9c87 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -436,19 +436,16 @@ class RoomMemberHandler(BaseHandler): else: should_do_dance = False - have_joined = False if should_do_dance: handler = self.hs.get_handlers().federation_handler - have_joined = yield handler.do_invite_join( + yield handler.do_invite_join( room_host, room_id, event.user_id, event.get_dict()["content"], # FIXME To get a non-frozen dict context ) - - # We want to do the _do_update inside the room lock. - if not have_joined: + else: logger.debug("Doing normal join") yield self._do_local_membership_update( diff --git a/synapse/state.py b/synapse/state.py index ebec0ad9dc..7fdf596006 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -144,6 +144,17 @@ class StateHandler(object): (s.type, s.state_key): s for s in old_state } context.state_group = None + + if hasattr(event, "auth_events") and event.auth_events: + auth_ids = zip(*event.auth_events)[0] + context.auth_events = { + k: v + for k, v in context.current_state.items() + if v.event_id in auth_ids + } + else: + context.auth_events = {} + defer.returnValue([]) if event.is_state(): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7a09c33613..07b1665bf6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -312,16 +312,6 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, hash_alg, hash_bytes, ) - if hasattr(event, "signatures"): - logger.debug("sigs: %s", event.signatures) - for name, sigs in event.signatures.items(): - for key_id, signature_base64 in sigs.items(): - signature_bytes = decode_base64(signature_base64) - self._store_event_signature_txn( - txn, event.event_id, name, key_id, - signature_bytes, - ) - for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) -- cgit 1.4.1 From 102d2373b4c5e46252ca9e2845a2415c5d00d193 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 11:38:08 +0000 Subject: Add __str__ to FrozenEvent --- synapse/events/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index ed02138706..c43367fa20 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -160,3 +160,8 @@ class FrozenEvent(EventBase): def get_dict(self): # We need to unfreeze what we return return _unfreeze(super(FrozenEvent, self).get_dict()) + + def __str__(self): + return "" % ( + self.event_id, self.type, self.state_key, + ) -- cgit 1.4.1 From 02db1fd2e7cfbd9741684a0a875789cb5d599f54 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 12:00:05 +0000 Subject: Fix AttributeError --- synapse/events/__init__.py | 3 +++ synapse/handlers/federation.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index c43367fa20..5f41933174 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -55,6 +55,9 @@ class _EventInternalMetadata(object): def get_dict(self): return dict(self.__dict__) + def is_outlier(self): + return hasattr(self, "outlier") and self.outlier + def _event_dict_property(key): def getter(self): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9ae3e5eca4..f38d9a48c8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -687,7 +687,7 @@ class FederationHandler(BaseHandler): event.event_id, event.signatures, ) - is_new_state = not event.internal_metadata.outlier + is_new_state = not event.internal_metadata.is_outlier() known_ids = set( [s.event_id for s in context.auth_events.values()] -- cgit 1.4.1 From 02db7eb209af879a0168a371b4cb1c2ad0fcab49 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 14:02:48 +0000 Subject: Fix bug when uploading state with empty state_key --- synapse/events/__init__.py | 10 +++++-- synapse/handlers/_base.py | 66 ++++--------------------------------------- synapse/handlers/message.py | 15 ---------- synapse/rest/room.py | 20 +++++++------ synapse/storage/__init__.py | 4 ++- synapse/storage/schema/im.sql | 2 ++ 6 files changed, 29 insertions(+), 88 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 5f41933174..bd3ffb59cc 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -118,6 +118,9 @@ class EventBase(object): return d + def get(self, key, default): + return self._event_dict.get(key, default) + def get_internal_metadata_dict(self): return self.internal_metadata.get_dict() @@ -165,6 +168,9 @@ class FrozenEvent(EventBase): return _unfreeze(super(FrozenEvent, self).get_dict()) def __str__(self): + return self.__repr__() + + def __repr__(self): return "" % ( - self.event_id, self.type, self.state_key, - ) + self.event_id, self.type, self.get("state_key", None), + ) \ No newline at end of file diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6b5f6e7cd8..fdd3151877 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -97,6 +97,11 @@ class BaseHandler(object): event = builder.build() + logger.debug( + "Created event %s with auth_events: %s, current state: %s", + event.event_id, context.auth_events, context.current_state, + ) + defer.returnValue( (event, context,) ) @@ -147,64 +152,3 @@ class BaseHandler(object): None, destinations=destinations, ) - - # @defer.inlineCallbacks - # def _on_new_room_event(self, event, snapshot, extra_destinations=[], - # extra_users=[], suppress_auth=False, - # do_invite_host=None): - # yield run_on_reactor() - # - # snapshot.fill_out_prev_events(event) - # - # yield self.state_handler.annotate_event_with_state(event) - # - # yield self.auth.add_auth_events(event) - # - # logger.debug("Signing event...") - # - # add_hashes_and_signatures( - # event, self.server_name, self.signing_key - # ) - # - # logger.debug("Signed event.") - # - # if not suppress_auth: - # logger.debug("Authing...") - # self.auth.check(event, auth_events=event.old_state_events) - # logger.debug("Authed") - # else: - # logger.debug("Suppressed auth.") - # - # if do_invite_host: - # federation_handler = self.hs.get_handlers().federation_handler - # invite_event = yield federation_handler.send_invite( - # do_invite_host, - # event - # ) - # - # # FIXME: We need to check if the remote changed anything else - # event.signatures = invite_event.signatures - # - # yield self.store.persist_event(event) - # - # destinations = set(extra_destinations) - # # Send a PDU to all hosts who have joined the room. - # - # for k, s in event.state_events.items(): - # try: - # if k[0] == RoomMemberEvent.TYPE: - # if s.content["membership"] == Membership.JOIN: - # destinations.add( - # self.hs.parse_userid(s.state_key).domain - # ) - # except: - # logger.warn( - # "Failed to get destination from event %s", s.event_id - # ) - # - # event.destinations = list(destinations) - # - # yield self.notifier.on_new_room_event(event, extra_users=extra_users) - # - # federation_handler = self.hs.get_handlers().federation_handler - # yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 13fa0be7b4..9043b945e4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -165,21 +165,6 @@ class MessageHandler(BaseHandler): defer.returnValue(event) - @defer.inlineCallbacks - def store_room_data(self, event=None): - """ Stores data for a room. - - Args: - event : The room path event - stamp_event (bool) : True to stamp event content with server keys. - Raises: - SynapseError if something went wrong. - """ - - snapshot = yield self.store.snapshot_room(event) - - yield self._on_new_room_event(event, snapshot) - @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, event_type=None, state_key=""): diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 3d78b4ff5c..22e1244e57 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -147,16 +147,18 @@ class RoomStateEventRestServlet(RestServlet): content = _parse_json(request) + event_dict = { + "type": event_type, + "content": content, + "room_id": urllib.unquote(room_id), + "sender": user.to_string(), + } + + if state_key is not None: + event_dict["state_key"] = urllib.unquote(state_key) + msg_handler = self.handlers.message_handler - yield msg_handler.handle_event( - { - "type": event_type, - "content": content, - "room_id": room_id, - "sender": user.to_string(), - "state_key": urllib.unquote(state_key), - } - ) + yield msg_handler.handle_event(event_dict) defer.returnValue((200, {})) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7068424441..f8d895082d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -39,6 +39,7 @@ from .state import StateStore from .signatures import SignatureStore from syutil.base64util import decode_base64 +from syutil.jsonutil import encode_canonical_json from synapse.crypto.event_signing import compute_event_reference_hash @@ -162,7 +163,8 @@ class DataStore(RoomMemberStore, RoomStore, table="event_json", values={ "event_id": event.event_id, - "json": json.dumps(event_dict, separators=(',', ':')), + "room_id": event.room_id, + "json": encode_canonical_json(event_dict), }, or_replace=True, ) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index cb0c494ddf..0300bb29e1 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -35,11 +35,13 @@ CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( event_id TEXT NOT NULL, + room_id TEXT NOT NULL, json BLOB NOT NULL, CONSTRAINT ev_j_uniq UNIQUE (event_id) ); CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id); +CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); CREATE TABLE IF NOT EXISTS state_events( -- cgit 1.4.1 From cabead6194c2946c2d05d6662c7ffe237d60291d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 14:49:52 +0000 Subject: Actually fix bug when uploading state with empty state_key --- synapse/events/__init__.py | 1 + synapse/rest/room.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bd3ffb59cc..7103b937af 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -94,6 +94,7 @@ class EventBase(object): event_id = _event_dict_property("event_id") hashes = _event_dict_property("hashes") origin = _event_dict_property("origin") + origin_server_ts = _event_dict_property("origin_server_ts") prev_events = _event_dict_property("prev_events") prev_state = _event_dict_property("prev_state") room_id = _event_dict_property("room_id") diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 22e1244e57..c526e9bc72 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -117,10 +117,10 @@ class RoomStateEventRestServlet(RestServlet): self.on_PUT_no_state_key) def on_GET_no_state_key(self, request, room_id, event_type): - return self.on_GET(request, room_id, event_type, None) + return self.on_GET(request, room_id, event_type, "") def on_PUT_no_state_key(self, request, room_id, event_type): - return self.on_PUT(request, room_id, event_type, None) + return self.on_PUT(request, room_id, event_type, "") @defer.inlineCallbacks def on_GET(self, request, room_id, event_type, state_key): -- cgit 1.4.1 From b245ee34ed70854d0802921feb13822cd07996fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 17:59:47 +0000 Subject: Add some basic event validation --- synapse/events/validator.py | 29 +++++++++++++++++++++-------- synapse/handlers/message.py | 8 ++++++-- 2 files changed, 27 insertions(+), 10 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/validator.py b/synapse/events/validator.py index f319072d37..47830aa985 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -15,6 +15,7 @@ from synapse.types import EventID, RoomID, UserID from synapse.api.errors import SynapseError +from synapse.api.constants import EventTypes, Membership class EventValidator(object): @@ -23,14 +24,19 @@ class EventValidator(object): EventID.from_string(event.event_id) RoomID.from_string(event.room_id) - hasattr(event, "auth_events") - hasattr(event, "content") - hasattr(event, "hashes") - hasattr(event, "origin") - hasattr(event, "prev_events") - hasattr(event, "prev_events") - hasattr(event, "sender") - hasattr(event, "type") + required = [ + # "auth_events", + "content", + # "hashes", + "origin", + # "prev_events", + "sender", + "type", + ] + + for k in required: + if not hasattr(event, k): + raise SynapseError(400, "Event does not have key %s" % (k,)) # Check that the following keys have string values strings = [ @@ -46,6 +52,13 @@ class EventValidator(object): if not isinstance(getattr(event, s), basestring): raise SynapseError(400, "Not '%s' a string type" % (s,)) + if event.type == EventTypes.Member: + if "membership" not in event.content: + raise SynapseError(400, "Content has not membership key") + + if event.content["membership"] not in Membership.LIST: + raise SynapseError(400, "Invalid membership key") + # Check that the following keys have dictionary values # TODO diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9043b945e4..f92b01a50f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -19,6 +19,9 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import RoomError from synapse.streams.config import PaginationConfig from synapse.util.logcontext import PreserveLoggingContext + +from synapse.events.validator import EventValidator + from ._base import BaseHandler import logging @@ -33,6 +36,7 @@ class MessageHandler(BaseHandler): self.hs = hs self.clock = hs.get_clock() self.event_factory = hs.get_event_factory() + self.validator = EventValidator() @defer.inlineCallbacks def get_message(self, msg_id=None, room_id=None, sender_id=None, @@ -137,6 +141,8 @@ class MessageHandler(BaseHandler): def handle_event(self, event_dict): builder = self.event_builder_factory.new(event_dict) + self.validator.validate(builder) + if builder.type == EventTypes.Member: membership = builder.content.get("membership", None) if membership == Membership.JOIN: @@ -152,8 +158,6 @@ class MessageHandler(BaseHandler): builder=builder, ) - # TODO: self.validator.validate(event) - if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler yield member_handler.change_membership(event, context) -- cgit 1.4.1 From 8cdebce470869613658543cb79ed5dd97a5f0548 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Dec 2014 13:25:19 +0000 Subject: Fix redactions. Fix 'age' key --- synapse/events/__init__.py | 1 + synapse/events/builder.py | 13 ++++++--- synapse/events/utils.py | 21 +++++++++++++++ synapse/state.py | 12 +++++++++ synapse/storage/_base.py | 60 ++++++++++++++++++++++++----------------- tests/storage/test_redaction.py | 6 ++--- 6 files changed, 83 insertions(+), 30 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 7103b937af..98d7f0e324 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -97,6 +97,7 @@ class EventBase(object): origin_server_ts = _event_dict_property("origin_server_ts") prev_events = _event_dict_property("prev_events") prev_state = _event_dict_property("prev_state") + redacts = _event_dict_property("redacts") room_id = _event_dict_property("room_id") sender = _event_dict_property("sender") state_key = _event_dict_property("state_key") diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 642264e9f3..9579b1fe8b 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -19,11 +19,18 @@ from synapse.types import EventID from synapse.util.stringutils import random_string +import copy + class EventBuilder(EventBase): def __init__(self, key_values={}): + signatures = copy.deepcopy(key_values.pop("signatures", {})) + unsigned = copy.deepcopy(key_values.pop("unsigned", {})) + super(EventBuilder, self).__init__( key_values, + signatures=signatures, + unsigned=unsigned ) def update_event_key(self, key, value): @@ -61,9 +68,9 @@ class EventBuilderFactory(object): key_values.setdefault("origin", self.hostname) key_values.setdefault("origin_server_ts", time_now) - if "unsigned" in key_values: - age = key_values["unsigned"].pop("age", 0) - key_values["unsigned"].setdefault("age_ts", time_now - age) + key_values.setdefault("unsigned", {}) + age = key_values["unsigned"].pop("age", 0) + key_values["unsigned"].setdefault("age_ts", time_now - age) key_values["signatures"] = {} diff --git a/synapse/events/utils.py b/synapse/events/utils.py index f5e135e3d0..6d9c9352e2 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -80,6 +80,11 @@ def prune_event(event): allowed_fields["content"] = new_content + allowed_fields["unsigned"] = {} + + if "age_ts" in event.unsigned: + allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"] + return type(event)(allowed_fields) @@ -97,4 +102,20 @@ def serialize_event(hs, e): d["user_id"] = d.pop("sender", None) + if "redacted_because" in e.unsigned: + d["redacted_because"] = serialize_event( + hs, e.unsigned["redacted_because"] + ) + + del d["unsigned"]["redacted_because"] + + if "redacted_by" in e.unsigned: + d["redacted_by"] = e.unsigned["redacted_by"] + del d["unsigned"]["redacted_by"] + + del d["auth_events"] + del d["prev_events"] + del d["hashes"] + del d["signatures"] + return d diff --git a/synapse/state.py b/synapse/state.py index 7fdf596006..5bfa73fb46 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -155,6 +155,12 @@ class StateHandler(object): else: context.auth_events = {} + if event.is_state(): + key = (event.type, event.state_key) + if key in context.current_state: + replaces = context.current_state[key] + event.unsigned["replaces_state"] = replaces.event_id + defer.returnValue([]) if event.is_state(): @@ -177,6 +183,12 @@ class StateHandler(object): prev_state ) + if event.is_state(): + key = (event.type, event.state_key) + if key in context.current_state: + replaces = context.current_state[key] + event.unsigned["replaces_state"] = replaces.event_id + if hasattr(event, "auth_events") and event.auth_events: auth_ids = zip(*event.auth_events)[0] context.auth_events = { diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 12239fa074..ffc26d4a61 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -444,38 +444,50 @@ class SQLBaseStore(object): def _get_events_txn(self, txn, event_ids): events = [] for e_id in event_ids: - js = self._simple_select_one_onecol_txn( - txn, - table="event_json", - keyvalues={"event_id": e_id}, - retcol="json", - allow_none=True, - ) + ev = self._get_event_txn(txn, e_id) - if not js: - # FIXME (erikj): What should we actually do here? - continue + if ev: + events.append(ev) - d = json.loads(js) + return events - ev = FrozenEvent(d) + def _get_event_txn(self, txn, event_id, check_redacted=True): + sql = ( + "SELECT json, r.event_id FROM event_json as e " + "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "WHERE e.event_id = ? " + "LIMIT 1 " + ) - if hasattr(ev, "redacted") and ev.redacted: - # Get the redaction event. - select_event_sql = "SELECT * FROM events WHERE event_id = ?" - txn.execute(select_event_sql, (ev.redacted,)) + txn.execute(sql, (event_id,)) - del_evs = self._parse_events_txn( - txn, self.cursor_to_dict(txn) - ) + res = txn.fetchone() - if del_evs: - ev = prune_event(ev) - ev.redacted_because = del_evs[0] + if not res: + return None - events.append(ev) + js, redacted = res - return events + d = json.loads(js) + + ev = FrozenEvent(d) + + if check_redacted and redacted: + ev = prune_event(ev) + + ev.unsigned["redacted_by"] = redacted + # Get the redaction event. + + because = self._get_event_txn( + txn, + redacted, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + return ev def _parse_events(self, rows): return self.runInteraction( diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index e8671ae3a2..d81f7add1c 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -149,7 +149,7 @@ class RedactionTestCase(unittest.TestCase): event, ) - self.assertFalse(hasattr(event, "redacted_because")) + self.assertFalse("redacted_because" in event.unsigned) # Redact event reason = "Because I said so" @@ -179,7 +179,7 @@ class RedactionTestCase(unittest.TestCase): event, ) - self.assertTrue(hasattr(event, "redacted_because")) + self.assertTrue("redacted_because" in event.unsigned) self.assertObjectHasAttributes( { @@ -187,7 +187,7 @@ class RedactionTestCase(unittest.TestCase): "user_id": self.u_alice.to_string(), "content": {"reason": reason}, }, - event.redacted_because, + event.unsigned["redacted_because"], ) @defer.inlineCallbacks -- cgit 1.4.1 From 9191292b0f657f6210e88f16ffd9a182bfab8170 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Dec 2014 15:16:55 +0000 Subject: Fix prev_content --- synapse/events/utils.py | 8 ++++++++ synapse/storage/_base.py | 6 ++++++ tests/storage/test_stream.py | 5 ++--- 3 files changed, 16 insertions(+), 3 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 6d9c9352e2..4ab770dd5f 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -113,6 +113,14 @@ def serialize_event(hs, e): d["redacted_by"] = e.unsigned["redacted_by"] del d["unsigned"]["redacted_by"] + if "replaces_state" in e.unsigned: + d["replaces_state"] = e.unsigned["replaces_state"] + del d["unsigned"]["replaces_state"] + + if "prev_content" in e.unsigned: + d["prev_content"] = e.unsigned["prev_content"] + del d["unsigned"]["prev_content"] + del d["auth_events"] del d["prev_events"] del d["hashes"] diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ffc26d4a61..e9cf73a8e2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -487,6 +487,12 @@ class SQLBaseStore(object): if because: ev.unsigned["redacted_because"] = because + if "replaces_state" in ev.unsigned: + ev.unsigned["prev_content"] = self._get_event_txn( + txn, + ev.unsigned["replaces_state"], + ).get_dict()["content"] + return ev def _parse_events(self, rows): diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index cba65bb9f0..4865a5c142 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -58,7 +58,7 @@ class StreamStoreTestCase(unittest.TestCase): self.depth = 1 @defer.inlineCallbacks - def inject_room_member(self, room, user, membership, replaces_state=None): + def inject_room_member(self, room, user, membership): self.depth += 1 builder = self.event_builder_factory.new({ @@ -215,7 +215,6 @@ class StreamStoreTestCase(unittest.TestCase): event2 = yield self.inject_room_member( self.room1, self.u_alice, Membership.JOIN, - replaces_state=event1.event_id, ) end = yield self.store.get_room_events_max_id() @@ -233,6 +232,6 @@ class StreamStoreTestCase(unittest.TestCase): event = results[0] self.assertTrue( - hasattr(event, "prev_content"), + "prev_content" in event.unsigned, msg="No prev_content key" ) -- cgit 1.4.1 From 63810c777d8c02bc6fefa2a4bcfacb7f4df21ba2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Dec 2014 10:56:14 +0000 Subject: Validate message, topic and name event contents --- synapse/api/constants.py | 5 +++++ synapse/events/validator.py | 21 +++++++++++++++++++++ synapse/handlers/message.py | 2 +- 3 files changed, 27 insertions(+), 1 deletion(-) (limited to 'synapse/events') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 7e8c892b64..b668da4a26 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -68,3 +68,8 @@ class EventTypes(object): PowerLevels = "m.room.power_levels" Aliases = "m.room.aliases" Redaction = "m.room.redaction" + + # These are used for validation + Message = "m.room.message" + Topic = "m.room.topic" + Name = "m.room.name" diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 47830aa985..ebc6c30e62 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -69,3 +69,24 @@ class EventValidator(object): self.validate(event) UserID.from_string(event.sender) + + if event.type == EventTypes.Message: + strings = [ + "body", + "msgtype", + ] + + self._ensure_strings(event.content, strings) + + elif event.type == EventTypes.Topic: + self._ensure_strings(event.content, ["topic"]) + + elif event.type == EventTypes.Name: + self._ensure_strings(event.content, ["name"]) + + def _ensure_strings(self, d, keys): + for s in keys: + if s not in d: + raise SynapseError(400, "'%s' not in content" % (s,)) + if not isinstance(d[s], basestring): + raise SynapseError(400, "Not '%s' a string type" % (s,)) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f92b01a50f..4fa4ffea25 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -141,7 +141,7 @@ class MessageHandler(BaseHandler): def handle_event(self, event_dict): builder = self.event_builder_factory.new(event_dict) - self.validator.validate(builder) + self.validator.validate_new(builder) if builder.type == EventTypes.Member: membership = builder.content.get("membership", None) -- cgit 1.4.1 From c8dd3314d673fce90a53520475cdb19d5358dd34 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Dec 2014 13:55:22 +0000 Subject: Fix bug where we ignored event_edge_hashes table --- scripts/check_event_hash.py | 3 +++ synapse/events/utils.py | 1 + synapse/storage/__init__.py | 1 - synapse/storage/_base.py | 1 - synapse/storage/event_federation.py | 11 ++++++----- 5 files changed, 10 insertions(+), 7 deletions(-) (limited to 'synapse/events') diff --git a/scripts/check_event_hash.py b/scripts/check_event_hash.py index 7c32f8102a..679afbd268 100644 --- a/scripts/check_event_hash.py +++ b/scripts/check_event_hash.py @@ -18,6 +18,9 @@ class dictobj(dict): def get_full_dict(self): return dict(self) + def get_pdu_json(self): + return dict(self) + def main(): parser = argparse.ArgumentParser() diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 4ab770dd5f..94f3f15f52 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -42,6 +42,7 @@ def prune_event(event): "auth_events", "origin", "origin_server_ts", + "membership", ] new_content = {} diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 079ddac605..d0ea304b3d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -91,7 +91,6 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, hs): super(DataStore, self).__init__(hs) - self.event_factory = hs.get_event_factory() self.hs = hs self.min_token_deferred = self._get_min_token() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1967290ad2..31d5163c19 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -83,7 +83,6 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() - self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() @defer.inlineCallbacks diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 6c559f8f63..ced066f407 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -177,14 +177,15 @@ class EventFederationStore(SQLBaseStore): retcols=["prev_event_id", "is_state"], ) + hashes = self._get_prev_event_hashes_txn(txn, event_id) + results = [] for d in res: - hashes = self._get_event_reference_hashes_txn( - txn, - d["prev_event_id"] - ) + edge_hash = self._get_event_reference_hashes_txn(txn, d["prev_event_id"]) + edge_hash.update(hashes.get(d["prev_event_id"], {})) prev_hashes = { - k: encode_base64(v) for k, v in hashes.items() + k: encode_base64(v) + for k, v in edge_hash.items() if k == "sha256" } results.append((d["prev_event_id"], prev_hashes, d["is_state"])) -- cgit 1.4.1 From b75adaedcaa0f153557557217844cd06f92635ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Dec 2014 16:14:34 +0000 Subject: Finish up upgrade script --- scripts/upgrade_db_to_v0.5.5.py | 124 ++++++++++++++++++++++++++++++---------- synapse/events/__init__.py | 2 +- 2 files changed, 95 insertions(+), 31 deletions(-) (limited to 'synapse/events') diff --git a/scripts/upgrade_db_to_v0.5.5.py b/scripts/upgrade_db_to_v0.5.5.py index 5898341d6e..be9d07b2df 100644 --- a/scripts/upgrade_db_to_v0.5.5.py +++ b/scripts/upgrade_db_to_v0.5.5.py @@ -4,25 +4,42 @@ from synapse.storage.event_federation import EventFederationStore from syutil.base64util import encode_base64, decode_base64 -from synapse.events import FrozenEvent +from synapse.crypto.event_signing import compute_event_signature + from synapse.events.builder import EventBuilder from synapse.events.utils import prune_event from synapse.crypto.event_signing import check_event_content_hash -from syutil.crypto.jsonsign import verify_signed_json, SignatureVerifyException -from syutil.crypto.signing_key import ( - decode_verify_key_bytes, write_signing_keys +from syutil.crypto.jsonsign import ( + verify_signed_json, SignatureVerifyException, ) +from syutil.crypto.signing_key import decode_verify_key_bytes + +from syutil.jsonutil import encode_canonical_json +import argparse import dns.resolver import hashlib import json import sqlite3 -import sys +import syutil import urllib2 +delta_sql = """ +CREATE TABLE IF NOT EXISTS event_json( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + json BLOB NOT NULL, + CONSTRAINT ev_j_uniq UNIQUE (event_id) +); + +CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id); +CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); +""" + + class Store(object): _get_event_signatures_txn = SignatureStore.__dict__["_get_event_signatures_txn"] _get_event_content_hashes_txn = SignatureStore.__dict__["_get_event_content_hashes_txn"] @@ -33,10 +50,9 @@ class Store(object): cursor_to_dict = SQLBaseStore.__dict__["cursor_to_dict"] _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"] _simple_select_list_txn = SQLBaseStore.__dict__["_simple_select_list_txn"] + _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"] def _generate_event_json(self, txn, rows): - sql = "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc" - events = [] for row in rows: d = dict(row) @@ -145,7 +161,9 @@ def get_key(server_name): keys = json.load(urllib2.urlopen(url, timeout=2)) verify_keys = {} for key_id, key_base64 in keys["verify_keys"].items(): - verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64)) + verify_key = decode_verify_key_bytes( + key_id, decode_base64(key_base64) + ) verify_signed_json(keys, server_name, verify_key) verify_keys[key_id] = verify_key print "Got keys for: %s" % (server_name,) @@ -157,18 +175,11 @@ def get_key(server_name): return {} -def get_events(cursor): - # cursor.execute( - # "SELECT * FROM events WHERE event_id = ? ORDER BY rowid DESC", - # ("$14182049031533SMfTT:matrix.org",) - # ) - - # cursor.execute( - # "SELECT * FROM events ORDER BY rowid DESC LIMIT 10000" - # ) +def reinsert_events(cursor, server_name, signing_key): + cursor.executescript(delta_sql) cursor.execute( - "SELECT * FROM events ORDER BY rowid DESC" + "SELECT * FROM events ORDER BY rowid ASC" ) rows = store.cursor_to_dict(cursor) @@ -181,19 +192,26 @@ def get_events(cursor): "sha256": hashlib.sha256, } - server_keys = {} + key_id = "%s:%s" % (signing_key.alg, signing_key.version) + verify_key = signing_key.verify_key + verify_key.alg = signing_key.alg + verify_key.version = signing_key.version + + server_keys = { + server_name: { + key_id: verify_key + } + } for event in events: for alg_name in event.hashes: if check_event_content_hash(event, algorithms[alg_name]): - # print "PASS content hash %s" % (alg_name,) pass else: pass print "FAIL content hash %s %s" % (alg_name, event.event_id, ) - # print "%s %d" % (event.event_id, event.origin_server_ts) - # print json.dumps(event.get_pdu_json(), indent=4, sort_keys=True) + have_own_correctly_signed = False for host, sigs in event.signatures.items(): pruned = prune_event(event) @@ -207,17 +225,63 @@ def get_events(cursor): host, server_keys[host][key_id] ) - except SignatureVerifyException as e: - # print e - print "FAIL signature check %s %s" % (key_id, event.event_id) - # print json.dumps(pruned.get_pdu_json(), indent=4, sort_keys=True) -def main(): - conn = sqlite3.connect(sys.argv[1]) + if host == server_name: + have_own_correctly_signed = True + except SignatureVerifyException: + print "FAIL signature check %s %s" % ( + key_id, event.event_id + ) + + # TODO: Re sign with our own server key + if not have_own_correctly_signed: + sigs = compute_event_signature(event, server_name, signing_key) + event.signatures.update(sigs) + + pruned = prune_event(event) + + for key_id in event.signatures[server_name]: + verify_signed_json( + pruned.get_pdu_json(), + server_name, + server_keys[server_name][key_id] + ) + + event_json = encode_canonical_json( + event.get_dict() + ).decode("UTF-8") + + store._simple_insert_txn( + cursor, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "json": event_json, + }, + or_replace=True, + ) + + +def main(database, server_name, signing_key): + conn = sqlite3.connect(database) cursor = conn.cursor() - get_events(cursor) + reinsert_events(cursor, server_name, signing_key) conn.commit() if __name__ == "__main__": - main() \ No newline at end of file + parser = argparse.ArgumentParser() + + parser.add_argument("database") + parser.add_argument("server_name") + parser.add_argument( + "signing_key", type=argparse.FileType('r'), + ) + args = parser.parse_args() + + signing_key = syutil.crypto.signing_key.read_signing_keys( + args.signing_key + ) + + main(args.database, args.server_name, signing_key[0]) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 98d7f0e324..d9dfe5e3f3 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -175,4 +175,4 @@ class FrozenEvent(EventBase): def __repr__(self): return "" % ( self.event_id, self.type, self.get("state_key", None), - ) \ No newline at end of file + ) -- cgit 1.4.1 From 670dcdfc14d855c45b076f852673906dc450f515 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Dec 2014 16:16:58 +0000 Subject: Remove unused functions --- synapse/events/snapshot.py | 42 ------------------------------------------ 1 file changed, 42 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e0cbacc19c..b9fb29be01 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,48 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - - -class EventSnapshot(object): - def __init__(self, prev_events, depth, current_state, - current_state_group): - self._prev_events = prev_events - self._depth = depth - self._current_state = current_state - self._current_state_group = current_state_group - - -class EventCache(object): - def __init__(self, store): - self._store = store - - self._cache = {} - - @defer.inlineCallbacks - def load_event(self, event_id): - event = self._cache.get(event_id, None) - - if not event: - event = yield self._store.get_event( - event_id, - allow_none=True - ) - - if event: - self._cache[event_id] = event - - defer.returnValue(event) - - def load_event_from_cache(self, event_id): - return self._cache.get(event_id, None) - - def add_to_cache(self, *events): - self._cache.update({ - event.event_id: event - for event in events - }) - class EventContext(object): -- cgit 1.4.1 From f280929a12314cfdb680881e2458bb047b7346ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Dec 2014 17:31:36 +0000 Subject: Use frozenutils --- synapse/events/__init__.py | 36 ++++-------------------------------- synapse/storage/__init__.py | 3 +-- 2 files changed, 5 insertions(+), 34 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index d9dfe5e3f3..f8fbb18e3c 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -15,37 +15,9 @@ from frozendict import frozendict -import copy - - -def _freeze(o): - if isinstance(o, dict) or isinstance(o, frozendict): - return frozendict({k: _freeze(v) for k, v in o.items()}) - - if isinstance(o, basestring): - return o - - try: - return tuple([_freeze(i) for i in o]) - except TypeError: - pass - - return o +from synapse.util.frozenutils import freeze, unfreeze - -def _unfreeze(o): - if isinstance(o, frozendict) or isinstance(o, dict): - return dict({k: _unfreeze(v) for k, v in o.items()}) - - if isinstance(o, basestring): - return o - - try: - return [_unfreeze(i) for i in o] - except TypeError: - pass - - return o +import copy class _EventInternalMetadata(object): @@ -147,7 +119,7 @@ class FrozenEvent(EventBase): signatures = copy.deepcopy(event_dict.pop("signatures", {})) unsigned = copy.deepcopy(event_dict.pop("unsigned", {})) - frozen_dict = _freeze(event_dict) + frozen_dict = freeze(event_dict) super(FrozenEvent, self).__init__( frozen_dict, @@ -167,7 +139,7 @@ class FrozenEvent(EventBase): def get_dict(self): # We need to unfreeze what we return - return _unfreeze(super(FrozenEvent, self).get_dict()) + return unfreeze(super(FrozenEvent, self).get_dict()) def __str__(self): return self.__repr__() diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d0ea304b3d..e75eaa92d5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -21,7 +21,6 @@ from synapse.api.events.room import ( ) from synapse.util.logutils import log_function -from synapse.util.frozenutils import FrozenEncoder from .directory import DirectoryStore from .feedback import FeedbackStore @@ -177,7 +176,7 @@ class DataStore(RoomMemberStore, RoomStore, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.content, cls=FrozenEncoder), + "content": json.dumps(event.get_dict()["content"]), "processed": True, "outlier": outlier, "depth": event.depth, -- cgit 1.4.1 From 65b2e494294269db6026a8095ee364fbe1377620 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Dec 2014 17:35:37 +0000 Subject: Fix pyflakes --- synapse/events/__init__.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index f8fbb18e3c..1ec79a6788 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from frozendict import frozendict - from synapse.util.frozenutils import freeze, unfreeze import copy -- cgit 1.4.1 From 882dc8dcab434d24b0e902be3b4545ceabe9bdbe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Dec 2014 13:17:09 +0000 Subject: Persist internal_metadata --- synapse/events/__init__.py | 5 +++-- synapse/storage/__init__.py | 5 +++++ synapse/storage/_base.py | 7 ++++--- synapse/storage/schema/im.sql | 1 + 4 files changed, 13 insertions(+), 5 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 1ec79a6788..984f14fce4 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -111,7 +111,7 @@ class EventBase(object): class FrozenEvent(EventBase): - def __init__(self, event_dict): + def __init__(self, event_dict, internal_metadata_dict={}): event_dict = copy.deepcopy(event_dict) signatures = copy.deepcopy(event_dict.pop("signatures", {})) @@ -122,7 +122,8 @@ class FrozenEvent(EventBase): super(FrozenEvent, self).__init__( frozen_dict, signatures=signatures, - unsigned=unsigned + unsigned=unsigned, + internal_metadata_dict=internal_metadata_dict, ) @staticmethod diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 5c079da5ba..26f205ae8f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -156,12 +156,17 @@ class DataStore(RoomMemberStore, RoomStore, ] } + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + self._simple_insert_txn( txn, table="event_json", values={ "event_id": event.event_id, "room_id": event.room_id, + "internal_metadata": metadata_json.decode("UTF-8"), "json": encode_canonical_json(event_dict).decode("UTF-8"), }, or_replace=True, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 31d5163c19..6dc857c4aa 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -452,7 +452,7 @@ class SQLBaseStore(object): def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=True): sql = ( - "SELECT json, r.event_id FROM event_json as e " + "SELECT internal_metadata, json, r.event_id FROM event_json as e " "LEFT JOIN redactions as r ON e.event_id = r.redacts " "WHERE e.event_id = ? " "LIMIT 1 " @@ -465,11 +465,12 @@ class SQLBaseStore(object): if not res: return None - js, redacted = res + internal_metadata, js, redacted = res d = json.loads(js) + internal_metadata = json.loads(internal_metadata) - ev = FrozenEvent(d) + ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) if check_redacted and redacted: ev = prune_event(ev) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 0300bb29e1..253f9f779b 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -36,6 +36,7 @@ CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( event_id TEXT NOT NULL, room_id TEXT NOT NULL, + internal_metadata NOT NULL, json BLOB NOT NULL, CONSTRAINT ev_j_uniq UNIQUE (event_id) ); -- cgit 1.4.1 From 3c7857e49b8dcad723d52174aba77c47453c0298 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 16 Dec 2014 15:24:03 +0000 Subject: clean up coding style a bit --- synapse/events/__init__.py | 22 +++++++++++----------- synapse/handlers/_base.py | 4 ++-- synapse/handlers/directory.py | 1 - synapse/handlers/federation.py | 11 ++++------- synapse/handlers/typing.py | 4 ++-- synapse/media/v1/base_resource.py | 1 + synapse/rest/room.py | 8 +++++--- synapse/rest/transactions.py | 1 + 8 files changed, 26 insertions(+), 26 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 984f14fce4..34b1b944ab 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -30,20 +30,20 @@ class _EventInternalMetadata(object): def _event_dict_property(key): - def getter(self): - return self._event_dict[key] + def getter(self): + return self._event_dict[key] - def setter(self, v): - self._event_dict[key] = v + def setter(self, v): + self._event_dict[key] = v - def delete(self): - del self._event_dict[key] + def delete(self): + del self._event_dict[key] - return property( - getter, - setter, - delete, - ) + return property( + getter, + setter, + delete, + ) class EventBase(object): diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index af8eb5f0f5..567769253e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -61,8 +61,6 @@ class BaseHandler(object): def _create_new_client_event(self, builder): yield run_on_reactor() - context = EventContext() - latest_ret = yield self.store.get_latest_events_in_room( builder.room_id, ) @@ -78,6 +76,8 @@ class BaseHandler(object): builder.depth = depth state_handler = self.state_handler + + context = EventContext() ret = yield state_handler.annotate_context_with_state( builder, context, diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 404baea796..66d3b533d9 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -156,4 +156,3 @@ class DirectoryHandler(BaseHandler): "sender": user_id, "content": {"aliases": aliases}, }) - diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 16a104c0e2..d80a54bdea 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -202,7 +202,7 @@ class FederationHandler(BaseHandler): e.msg, affected=event.event_id, ) - + # if we're receiving valid events from an origin, # it's probably a good idea to mark it as not in retry-state # for sending (although this is a bit of a leap) @@ -263,9 +263,7 @@ class FederationHandler(BaseHandler): context = EventContext() yield self.state_handler.annotate_context_with_state(event, context) - events.append( - (event, context) - ) + events.append((event, context)) yield self.store.persist_event( event, @@ -547,8 +545,6 @@ class FederationHandler(BaseHandler): """ event = pdu - context = EventContext() - event.internal_metadata.outlier = True event.signatures.update( @@ -559,6 +555,7 @@ class FederationHandler(BaseHandler): ) ) + context = EventContext() yield self.state_handler.annotate_context_with_state(event, context) yield self.store.persist_event( @@ -685,13 +682,13 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_event(self, event, state=None, backfilled=False, current_state=None, fetch_missing=True): - context = EventContext() logger.debug( "_handle_new_event: Before annotate: %s, sigs: %s", event.event_id, event.signatures, ) + context = EventContext() yield self.state_handler.annotate_context_with_state( event, context, diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77d66f66ff..7626b07280 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -45,8 +45,8 @@ class TypingNotificationHandler(BaseHandler): hs.get_distributor().observe("user_left_room", self.user_left_room) - self._member_typing_until = {} # clock time we expect to stop - self._member_typing_timer = {} # deferreds to manage theabove + self._member_typing_until = {} # clock time we expect to stop + self._member_typing_timer = {} # deferreds to manage theabove # map room IDs to serial numbers self._room_serials = {} diff --git a/synapse/media/v1/base_resource.py b/synapse/media/v1/base_resource.py index 14735ff375..2f5440ab64 100644 --- a/synapse/media/v1/base_resource.py +++ b/synapse/media/v1/base_resource.py @@ -135,6 +135,7 @@ class BaseMediaResource(Resource): if download is None: download = self._get_remote_media_impl(server_name, media_id) self.downloads[key] = download + @download.addBoth def callback(media_info): del self.downloads[key] diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 0e2d5fbaae..005a9f6f44 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -310,8 +310,8 @@ class RoomMessageListRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id): user = yield self.auth.get_user_by_req(request) - pagination_config = PaginationConfig.from_request(request, - default_limit=10, + pagination_config = PaginationConfig.from_request( + request, default_limit=10, ) with_feedback = "feedback" in request.args handler = self.handlers.message_handler @@ -466,7 +466,9 @@ class RoomRedactEventRestServlet(RestServlet): class RoomTypingRestServlet(RestServlet): - PATTERN = client_path_pattern("/rooms/(?P[^/]*)/typing/(?P[^/]*)$") + PATTERN = client_path_pattern( + "/rooms/(?P[^/]*)/typing/(?P[^/]*)$" + ) @defer.inlineCallbacks def on_PUT(self, request, room_id, user_id): diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py index 8c41ab4edb..31377bd41d 100644 --- a/synapse/rest/transactions.py +++ b/synapse/rest/transactions.py @@ -19,6 +19,7 @@ import logging logger = logging.getLogger(__name__) + # FIXME: elsewhere we use FooStore to indicate something in the storage layer... class HttpTransactionStore(object): -- cgit 1.4.1