From 1c445f88f64beabf0bd9bec3950a4a4c0d529e8a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 15 Oct 2014 17:09:04 +0100 Subject: persist hashes and origin signatures for PDUs --- synapse/federation/units.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/federation/units.py b/synapse/federation/units.py index d97aeb698e..3518efb215 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -18,6 +18,7 @@ server protocol. """ from synapse.util.jsonobject import JsonEncodedObject +from syutil.base64util import encode_base64 import logging import json @@ -63,6 +64,8 @@ class Pdu(JsonEncodedObject): "depth", "content", "outlier", + "hashes", + "signatures", "is_state", # Below this are keys valid only for State Pdus. "state_key", "power_level", @@ -91,7 +94,7 @@ class Pdu(JsonEncodedObject): # just leaving it as a dict. (OR DO WE?!) def __init__(self, destinations=[], is_state=False, prev_pdus=[], - outlier=False, **kwargs): + outlier=False, hashes={}, signatures={}, **kwargs): if is_state: for required_key in ["state_key"]: if required_key not in kwargs: @@ -102,6 +105,8 @@ class Pdu(JsonEncodedObject): is_state=is_state, prev_pdus=prev_pdus, outlier=outlier, + hashes=hashes, + signatures=signatures, **kwargs ) @@ -126,6 +131,16 @@ class Pdu(JsonEncodedObject): if "unrecognized_keys" in d and d["unrecognized_keys"]: args.update(json.loads(d["unrecognized_keys"])) + hashes = { + alg: encode_base64(hsh) + for alg, hsh in pdu_tuple.hashes.items() + } + + signatures = { + kid: encode_base64(sig) + for kid, sig in pdu_tuple.signatures.items() + } + return Pdu( prev_pdus=pdu_tuple.prev_pdu_list, **args -- cgit 1.4.1 From bb04447c44036ebf3ae5dde7a4cc7a7909d50ef6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Oct 2014 23:25:12 +0100 Subject: Include hashes of previous pdus when referencing them --- synapse/api/events/__init__.py | 2 +- synapse/federation/pdu_codec.py | 13 ++++--------- synapse/federation/replication.py | 2 +- synapse/federation/units.py | 10 +++++++++- synapse/state.py | 4 ---- synapse/storage/__init__.py | 20 ++++++++++++++------ synapse/storage/pdu.py | 22 ++++++++++++++++------ synapse/storage/schema/signatures.sql | 16 ++++++++++++++++ synapse/storage/signatures.py | 31 +++++++++++++++++++++++++++++++ tests/federation/test_federation.py | 2 +- tests/federation/test_pdu_codec.py | 4 ++-- 11 files changed, 95 insertions(+), 31 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index f66fea2904..a5a55742e0 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -65,13 +65,13 @@ class SynapseEvent(JsonEncodedObject): internal_keys = [ "is_state", - "prev_events", "depth", "destinations", "origin", "outlier", "power_level", "redacted", + "prev_pdus", ] required_keys = [ diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index bcac5f9ae8..11fd7264b3 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -45,9 +45,7 @@ class PduCodec(object): kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin) kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type - kwargs["prev_events"] = [ - encode_event_id(p[0], p[1]) for p in pdu.prev_pdus - ] + kwargs["prev_pdus"] = pdu.prev_pdus if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): kwargs["prev_state"] = encode_event_id( @@ -78,11 +76,8 @@ class PduCodec(object): d["context"] = event.room_id d["pdu_type"] = event.type - if hasattr(event, "prev_events"): - d["prev_pdus"] = [ - decode_event_id(e, self.server_name) - for e in event.prev_events - ] + if hasattr(event, "prev_pdus"): + d["prev_pdus"] = event.prev_pdus if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( @@ -95,7 +90,7 @@ class PduCodec(object): kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type", "prev_events"] + if k not in ["event_id", "room_id", "type"] }) if "ts" not in kwargs: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 9363ac7300..788a49b8e8 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -443,7 +443,7 @@ class ReplicationLayer(object): min_depth = yield self.store.get_min_depth_for_context(pdu.context) if min_depth and pdu.depth > min_depth: - for pdu_id, origin in pdu.prev_pdus: + for pdu_id, origin, hashes in pdu.prev_pdus: exists = yield self._get_persisted_pdu(pdu_id, origin) if not exists: diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 3518efb215..6a43007837 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -141,8 +141,16 @@ class Pdu(JsonEncodedObject): for kid, sig in pdu_tuple.signatures.items() } + prev_pdus = [] + for prev_pdu in pdu_tuple.prev_pdu_list: + prev_hashes = pdu_tuple.edge_hashes.get(prev_pdu, {}) + prev_hashes = { + alg: encode_base64(hsh) for alg, hsh in prev_hashes.items() + } + prev_pdus.append((prev_pdu[0], prev_pdu[1], prev_hashes)) + return Pdu( - prev_pdus=pdu_tuple.prev_pdu_list, + prev_pdus=prev_pdus, **args ) else: diff --git a/synapse/state.py b/synapse/state.py index 9db84c9b5c..bc6b928ec7 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -72,10 +72,6 @@ class StateHandler(object): snapshot.fill_out_prev_events(event) - event.prev_events = [ - e for e in event.prev_events if e != event.event_id - ] - current_state = snapshot.prev_state_pdu if current_state: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b2a3f0b56c..af05b47932 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -177,6 +177,14 @@ class DataStore(RoomMemberStore, RoomStore, txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, ) + for prev_pdu_id, prev_origin, prev_hashes in pdu.prev_pdus: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_pdu_hash_txn( + txn, pdu.pdu_id, pdu.origin, prev_pdu_id, prev_origin, alg, + hash_bytes + ) + if pdu.is_state: self._persist_state_txn(txn, pdu.prev_pdus, cols) else: @@ -352,6 +360,7 @@ class DataStore(RoomMemberStore, RoomStore, prev_pdus = self._get_latest_pdus_in_context( txn, room_id ) + if state_type is not None and state_key is not None: prev_state_pdu = self._get_current_state_pdu( txn, room_id, state_type, state_key @@ -401,17 +410,16 @@ class Snapshot(object): self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): + if hasattr(event, "prev_pdus"): return - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in self.prev_pdus + event.prev_pdus = [ + (p_id, origin, hashes) + for p_id, origin, hashes, _ in self.prev_pdus ] - event.prev_events = [e for e in es if e != event.event_id] - if self.prev_pdus: - event.depth = max([int(v) for _, _, v in self.prev_pdus]) + 1 + event.depth = max([int(v) for _, _, _, v in self.prev_pdus]) + 1 else: event.depth = 0 diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 9d624429b7..a423b42dbd 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -20,10 +20,13 @@ from ._base import SQLBaseStore, Table, JoinHelper from synapse.federation.units import Pdu from synapse.util.logutils import log_function +from syutil.base64util import encode_base64 + from collections import namedtuple import logging + logger = logging.getLogger(__name__) @@ -64,6 +67,8 @@ class PduStore(SQLBaseStore): for r in PduEdgesTable.decode_results(txn.fetchall()) ] + edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) signatures = self._get_pdu_origin_signatures_txn( txn, pdu_id, origin @@ -86,7 +91,7 @@ class PduStore(SQLBaseStore): row = txn.fetchone() if row: results.append(PduTuple( - PduEntry(*row), edges, hashes, signatures + PduEntry(*row), edges, hashes, signatures, edge_hashes )) return results @@ -310,9 +315,14 @@ class PduStore(SQLBaseStore): (context, ) ) - results = txn.fetchall() + results = [] + for pdu_id, origin, depth in txn.fetchall(): + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + sha256_bytes = hashes["sha256"] + prev_hashes = {"sha256": encode_base64(sha256_bytes)} + results.append((pdu_id, origin, prev_hashes, depth)) - return [(row[0], row[1], row[2]) for row in results] + return results @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): @@ -431,7 +441,7 @@ class PduStore(SQLBaseStore): "DELETE FROM %s WHERE pdu_id = ? AND origin = ?" % PduForwardExtremitiesTable.table_name ) - txn.executemany(query, prev_pdus) + txn.executemany(query, list(p[:2] for p in prev_pdus)) # We only insert as a forward extremety the new pdu if there are no # other pdus that reference it as a prev pdu @@ -454,7 +464,7 @@ class PduStore(SQLBaseStore): # deleted in a second if they're incorrect anyway. txn.executemany( PduBackwardExtremitiesTable.insert_statement(), - [(i, o, context) for i, o in prev_pdus] + [(i, o, context) for i, o, _ in prev_pdus] ) # Also delete from the backwards extremities table all ones that @@ -915,7 +925,7 @@ This does not include a prev_pdus key. PduTuple = namedtuple( "PduTuple", - ("pdu_entry", "prev_pdu_list", "hashes", "signatures") + ("pdu_entry", "prev_pdu_list", "hashes", "signatures", "edge_hashes") ) """ This is a tuple of a `PduEntry` and a list of `PduIdTuple` that represent the `prev_pdus` key of a PDU. diff --git a/synapse/storage/schema/signatures.sql b/synapse/storage/schema/signatures.sql index 86ee0f2377..a72c4dc35f 100644 --- a/synapse/storage/schema/signatures.sql +++ b/synapse/storage/schema/signatures.sql @@ -34,3 +34,19 @@ CREATE TABLE IF NOT EXISTS pdu_origin_signatures ( CREATE INDEX IF NOT EXISTS pdu_origin_signatures_id ON pdu_origin_signatures ( pdu_id, origin ); + +CREATE TABLE IF NOT EXISTS pdu_edge_hashes( + pdu_id TEXT, + origin TEXT, + prev_pdu_id TEXT, + prev_origin TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE ( + pdu_id, origin, prev_pdu_id, prev_origin, algorithm + ) +); + +CREATE INDEX IF NOT EXISTS pdu_edge_hashes_id ON pdu_edge_hashes( + pdu_id, origin +); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 1f0a680500..1147102489 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -88,3 +88,34 @@ class SignatureStore(SQLBaseStore): "signature": buffer(signature_bytes), }) + def _get_prev_pdu_hashes_txn(self, txn, pdu_id, origin): + """Get all the hashes for previous PDUs of a PDU + Args: + txn (cursor): + pdu_id (str): Id of the PDU. + origin (str): Origin of the PDU. + Returns: + dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes. + """ + query = ( + "SELECT prev_pdu_id, prev_origin, algorithm, hash" + " FROM pdu_edge_hashes" + " WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + results = {} + for prev_pdu_id, prev_origin, algorithm, hash_bytes in txn.fetchall(): + hashes = results.setdefault((prev_pdu_id, prev_origin), {}) + hashes[algorithm] = hash_bytes + return results + + def _store_prev_pdu_hash_txn(self, txn, pdu_id, origin, prev_pdu_id, + prev_origin, algorithm, hash_bytes): + self._simple_insert_txn(txn, "pdu_edge_hashes", { + "pdu_id": pdu_id, + "origin": origin, + "prev_pdu_id": prev_pdu_id, + "prev_origin": prev_origin, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index 03b2167cf7..eed50e6335 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -41,7 +41,7 @@ def make_pdu(prev_pdus=[], **kwargs): } pdu_fields.update(kwargs) - return PduTuple(PduEntry(**pdu_fields), prev_pdus, {}, {}) + return PduTuple(PduEntry(**pdu_fields), prev_pdus, {}, {}, {}) class FederationTestCase(unittest.TestCase): diff --git a/tests/federation/test_pdu_codec.py b/tests/federation/test_pdu_codec.py index 80851a4258..0ad8cf6641 100644 --- a/tests/federation/test_pdu_codec.py +++ b/tests/federation/test_pdu_codec.py @@ -88,7 +88,7 @@ class PduCodecTestCase(unittest.TestCase): self.assertEquals(pdu.context, event.room_id) self.assertEquals(pdu.is_state, event.is_state) self.assertEquals(pdu.depth, event.depth) - self.assertEquals(["alice@bob.com"], event.prev_events) + self.assertEquals(pdu.prev_pdus, event.prev_pdus) self.assertEquals(pdu.content, event.content) def test_pdu_from_event(self): @@ -144,7 +144,7 @@ class PduCodecTestCase(unittest.TestCase): self.assertEquals(pdu.context, event.room_id) self.assertEquals(pdu.is_state, event.is_state) self.assertEquals(pdu.depth, event.depth) - self.assertEquals(["alice@bob.com"], event.prev_events) + self.assertEquals(pdu.prev_pdus, event.prev_pdus) self.assertEquals(pdu.content, event.content) self.assertEquals(pdu.state_key, event.state_key) -- cgit 1.4.1 From 4d1a7624f444deee4352645fbf73165e11f66dd0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 15:27:11 +0100 Subject: move 'age' into 'meta' subdict so that it is clearer that it is not part of the signed data --- synapse/federation/replication.py | 20 ++++++++++++++------ synapse/federation/units.py | 6 +++++- 2 files changed, 19 insertions(+), 7 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 788a49b8e8..c4993aa5ee 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -295,6 +295,10 @@ class ReplicationLayer(object): transaction = Transaction(**transaction_data) for p in transaction.pdus: + if "meta" in p: + meta = p["meta"] + if "age" in meta: + p["age"] = meta["age"] if "age" in p: p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) del p["age"] @@ -414,14 +418,16 @@ class ReplicationLayer(object): transmission. """ pdus = [p.get_dict() for p in pdu_list] + time_now = self._clock.time_msec() for p in pdus: - if "age_ts" in pdus: - p["age"] = int(self.clock.time_msec()) - p["age_ts"] - + if "age_ts" in p: + age = time_now - p["age_ts"] + p.setdefault("meta", {})["age"] = int(age) + del p["age_ts"] return Transaction( origin=self.server_name, pdus=pdus, - ts=int(self._clock.time_msec()), + ts=int(time_now), destination=None, ) @@ -589,7 +595,7 @@ class _TransactionQueue(object): logger.debug("TX [%s] Persisting transaction...", destination) transaction = Transaction.create_new( - ts=self._clock.time_msec(), + ts=int(self._clock.time_msec()), transaction_id=str(self._next_txn_id), origin=self.server_name, destination=destination, @@ -614,7 +620,9 @@ class _TransactionQueue(object): if "pdus" in data: for p in data["pdus"]: if "age_ts" in p: - p["age"] = now - int(p["age_ts"]) + meta = p.setdefault("meta", {}) + meta["age"] = now - int(p["age_ts"]) + del p["age_ts"] return data code, response = yield self.transport_layer.send_transaction( diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 6a43007837..c4a10a4123 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -68,11 +68,11 @@ class Pdu(JsonEncodedObject): "signatures", "is_state", # Below this are keys valid only for State Pdus. "state_key", - "power_level", "prev_state_id", "prev_state_origin", "required_power_level", "user_id", + "meta" ] internal_keys = [ @@ -124,6 +124,10 @@ class Pdu(JsonEncodedObject): if pdu_tuple: d = copy.copy(pdu_tuple.pdu_entry._asdict()) + for k in d.keys(): + if d[k] is None: + del d[k] + d["content"] = json.loads(d["content_json"]) del d["content_json"] -- cgit 1.4.1 From c5cec1cc77029c21f0117c318c522ab320de3923 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 16:50:04 +0100 Subject: Rename 'meta' to 'unsigned' --- docs/server-server/signing.rst | 16 ++++++++-------- synapse/crypto/event_signing.py | 4 +++- synapse/federation/replication.py | 14 +++++++------- synapse/federation/units.py | 1 - 4 files changed, 18 insertions(+), 17 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/docs/server-server/signing.rst b/docs/server-server/signing.rst index dae10f121b..60c701ca91 100644 --- a/docs/server-server/signing.rst +++ b/docs/server-server/signing.rst @@ -1,13 +1,13 @@ Signing JSON ============ -JSON is signed by encoding the JSON object without ``signatures`` or ``meta`` +JSON is signed by encoding the JSON object without ``signatures`` or ``unsigned`` keys using a canonical encoding. The JSON bytes are then signed using the signature algorithm and the signature encoded using base64 with the padding stripped. The resulting base64 signature is added to an object under the *signing key identifier* which is added to the ``signatures`` object under the name of the server signing it which is added back to the original JSON object -along with the ``meta`` object. +along with the ``unsigned`` object. The *signing key identifier* is the concatenation of the *signing algorithm* and a *key version*. The *signing algorithm* identifies the algorithm used to @@ -15,8 +15,8 @@ sign the JSON. The currently support value for *signing algorithm* is ``ed25519`` as implemented by NACL (http://nacl.cr.yp.to/). The *key version* is used to distinguish between different signing keys used by the same entity. -The ``meta`` object and the ``signatures`` object are not covered by the -signature. Therefore intermediate servers can add metadata such as time stamps +The ``unsigned`` object and the ``signatures`` object are not covered by the +signature. Therefore intermediate servers can add unsigneddata such as time stamps and additional signatures. @@ -27,7 +27,7 @@ and additional signatures. "signing_keys": { "ed25519:1": "XSl0kuyvrXNj6A+7/tkrB9sxSbRi08Of5uRhxOqZtEQ" }, - "meta": { + "unsigned": { "retrieved_ts_ms": 922834800000 }, "signatures": { @@ -41,7 +41,7 @@ and additional signatures. def sign_json(json_object, signing_key, signing_name): signatures = json_object.pop("signatures", {}) - meta = json_object.pop("meta", None) + unsigned = json_object.pop("unsigned", None) signed = signing_key.sign(encode_canonical_json(json_object)) signature_base64 = encode_base64(signed.signature) @@ -50,8 +50,8 @@ and additional signatures. signatures.setdefault(sigature_name, {})[key_id] = signature_base64 json_object["signatures"] = signatures - if meta is not None: - json_object["meta"] = meta + if unsigned is not None: + json_object["unsigned"] = unsigned return json_object diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 32d60bd30a..a236f7d708 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -47,7 +47,9 @@ def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): def _compute_content_hash(pdu, hash_algorithm): pdu_json = pdu.get_dict() - pdu_json.pop("meta", None) + #TODO: Make "age_ts" key internal + pdu_json.pop("age_ts") + pdu_json.pop("unsigned", None) pdu_json.pop("signatures", None) hashes = pdu_json.pop("hashes", {}) pdu_json_bytes = encode_canonical_json(pdu_json) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c4993aa5ee..f2a5d4d5e2 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -295,10 +295,10 @@ class ReplicationLayer(object): transaction = Transaction(**transaction_data) for p in transaction.pdus: - if "meta" in p: - meta = p["meta"] - if "age" in meta: - p["age"] = meta["age"] + if "unsigned" in p: + unsigned = p["unsigned"] + if "age" in unsigned: + p["age"] = unsigned["age"] if "age" in p: p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) del p["age"] @@ -422,7 +422,7 @@ class ReplicationLayer(object): for p in pdus: if "age_ts" in p: age = time_now - p["age_ts"] - p.setdefault("meta", {})["age"] = int(age) + p.setdefault("unsigned", {})["age"] = int(age) del p["age_ts"] return Transaction( origin=self.server_name, @@ -620,8 +620,8 @@ class _TransactionQueue(object): if "pdus" in data: for p in data["pdus"]: if "age_ts" in p: - meta = p.setdefault("meta", {}) - meta["age"] = now - int(p["age_ts"]) + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) del p["age_ts"] return data diff --git a/synapse/federation/units.py b/synapse/federation/units.py index c4a10a4123..c629e5793e 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -72,7 +72,6 @@ class Pdu(JsonEncodedObject): "prev_state_origin", "required_power_level", "user_id", - "meta" ] internal_keys = [ -- cgit 1.4.1 From 8afbece68319728e20c3b32c2f949fd1745d405e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 19:41:32 +0100 Subject: Remove signatures from pdu when computing hashes to use for prev pdus, make sure is_state is a boolean. --- synapse/crypto/event_signing.py | 6 +++++- synapse/federation/units.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index a236f7d708..d3b501c6e7 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -22,6 +22,9 @@ from syutil.base64util import encode_base64, decode_base64 from syutil.crypto.jsonsign import sign_json, verify_signed_json import hashlib +import logging + +logger = logging.getLogger(__name__) def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): @@ -48,7 +51,7 @@ def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): def _compute_content_hash(pdu, hash_algorithm): pdu_json = pdu.get_dict() #TODO: Make "age_ts" key internal - pdu_json.pop("age_ts") + pdu_json.pop("age_ts", None) pdu_json.pop("unsigned", None) pdu_json.pop("signatures", None) hashes = pdu_json.pop("hashes", {}) @@ -60,6 +63,7 @@ def compute_pdu_event_reference_hash(pdu, hash_algorithm=hashlib.sha256): tmp_pdu = Pdu(**pdu.get_dict()) tmp_pdu = prune_pdu(tmp_pdu) pdu_json = tmp_pdu.get_dict() + pdu_json.pop("signatures", None) pdu_json_bytes = encode_canonical_json(pdu_json) hashed = hash_algorithm(pdu_json_bytes) return (hashed.name, hashed.digest()) diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b81e162512..b779d259bd 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -101,7 +101,7 @@ class Pdu(JsonEncodedObject): super(Pdu, self).__init__( destinations=destinations, - is_state=is_state, + is_state=bool(is_state), prev_pdus=prev_pdus, outlier=outlier, hashes=hashes, -- cgit 1.4.1 From 5e2236f9ffe3a66bbe0ff37b1793e8fa59a1c475 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 27 Oct 2014 11:19:15 +0000 Subject: fix pyflakes warnings --- synapse/crypto/event_signing.py | 8 ++++---- synapse/federation/units.py | 2 ++ synapse/storage/signatures.py | 2 -- 3 files changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index d3b501c6e7..61edd2c6f9 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -35,12 +35,12 @@ def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_content_hash(pdu, hash_algortithm) + computed_hash = _compute_content_hash(pdu, hash_algorithm) if computed_hash.name not in pdu.hashes: raise Exception("Algorithm %s not in hashes %s" % ( computed_hash.name, list(pdu.hashes) )) - message_hash_base64 = hashes[computed_hash.name] + message_hash_base64 = pdu.hashes[computed_hash.name] try: message_hash_bytes = decode_base64(message_hash_base64) except: @@ -54,7 +54,7 @@ def _compute_content_hash(pdu, hash_algorithm): pdu_json.pop("age_ts", None) pdu_json.pop("unsigned", None) pdu_json.pop("signatures", None) - hashes = pdu_json.pop("hashes", {}) + pdu_json.pop("hashes", None) pdu_json_bytes = encode_canonical_json(pdu_json) return hash_algorithm(pdu_json_bytes) @@ -73,7 +73,7 @@ def sign_event_pdu(pdu, signature_name, signing_key): tmp_pdu = Pdu(**pdu.get_dict()) tmp_pdu = prune_pdu(tmp_pdu) pdu_json = tmp_pdu.get_dict() - pdu_jdon = sign_json(pdu_json, signature_name, signing_key) + pdu_json = sign_json(pdu_json, signature_name, signing_key) pdu.signatures = pdu_json["signatures"] return pdu diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b779d259bd..adc3385644 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -155,6 +155,8 @@ class Pdu(JsonEncodedObject): return Pdu( prev_pdus=prev_pdus, + hashes=hashes, + signatures=signatures, **args ) else: diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 85eec7ffbe..82be946d3f 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -15,8 +15,6 @@ from _base import SQLBaseStore -from twisted.internet import defer - class SignatureStore(SQLBaseStore): """Persistence for PDU signatures and hashes""" -- cgit 1.4.1 From ad6eacb3e9424902da9f83c8f106a4f0169c3108 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Nov 2014 13:06:58 +0000 Subject: Rename PDU fields to match that of events. --- synapse/api/events/utils.py | 2 +- synapse/federation/pdu_codec.py | 48 +--------- synapse/federation/replication.py | 72 ++++++--------- synapse/federation/transport.py | 184 +++++++------------------------------- synapse/federation/units.py | 78 +++------------- synapse/handlers/federation.py | 12 ++- 6 files changed, 80 insertions(+), 316 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/api/events/utils.py b/synapse/api/events/utils.py index 7fdf45a264..31601fd3a9 100644 --- a/synapse/api/events/utils.py +++ b/synapse/api/events/utils.py @@ -32,7 +32,7 @@ def prune_event(event): def prune_pdu(pdu): """Removes keys that contain unrestricted and non-essential data from a PDU """ - return _prune_event_or_pdu(pdu.pdu_type, pdu) + return _prune_event_or_pdu(pdu.type, pdu) def _prune_event_or_pdu(event_type, event): # Remove all extraneous fields. diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index d4c896e163..5ec97a698e 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -31,39 +31,16 @@ class PduCodec(object): self.clock = hs.get_clock() self.hs = hs - def encode_event_id(self, local, domain): - return local - - def decode_event_id(self, event_id): - e_id = self.hs.parse_eventid(event_id) - return event_id, e_id.domain - def event_from_pdu(self, pdu): kwargs = {} - kwargs["event_id"] = self.encode_event_id(pdu.pdu_id, pdu.origin) - kwargs["room_id"] = pdu.context - kwargs["etype"] = pdu.pdu_type - kwargs["prev_events"] = [ - (self.encode_event_id(i, o), s) - for i, o, s in pdu.prev_pdus - ] - - if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): - kwargs["prev_state"] = self.encode_event_id( - pdu.prev_state_id, pdu.prev_state_origin - ) + kwargs["etype"] = pdu.type kwargs.update({ k: v for k, v in pdu.get_full_dict().items() if k not in [ - "pdu_id", - "context", - "pdu_type", - "prev_pdus", - "prev_state_id", - "prev_state_origin", + "type", ] }) @@ -72,33 +49,12 @@ class PduCodec(object): def pdu_from_event(self, event): d = event.get_full_dict() - d["pdu_id"], d["origin"] = self.decode_event_id( - event.event_id - ) - d["context"] = event.room_id - d["pdu_type"] = event.type - - if hasattr(event, "prev_events"): - def f(e, s): - i, o = self.decode_event_id(e) - return i, o, s - d["prev_pdus"] = [ - f(e, s) - for e, s in event.prev_events - ] - - if hasattr(event, "prev_state"): - d["prev_state_id"], d["prev_state_origin"] = ( - self.decode_event_id(event.prev_state) - ) - if hasattr(event, "state_key"): d["is_state"] = True kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type", "prev_events"] }) if "origin_server_ts" not in kwargs: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 159af4eed7..838e660a46 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -111,14 +111,6 @@ class ReplicationLayer(object): """Informs the replication layer about a new PDU generated within the home server that should be transmitted to others. - This will fill out various attributes on the PDU object, e.g. the - `prev_pdus` key. - - *Note:* The home server should always call `send_pdu` even if it knows - that it does not need to be replicated to other home servers. This is - in case e.g. someone else joins via a remote home server and then - backfills. - TODO: Figure out when we should actually resolve the deferred. Args: @@ -131,18 +123,12 @@ class ReplicationLayer(object): order = self._order self._order += 1 - logger.debug("[%s] Persisting PDU", pdu.pdu_id) - - # Save *before* trying to send - # yield self.store.persist_event(pdu=pdu) - - logger.debug("[%s] Persisted PDU", pdu.pdu_id) - logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id) + logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) # TODO, add errback, etc. self._transaction_queue.enqueue_pdu(pdu, order) - logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.pdu_id) + logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.event_id) @log_function def send_edu(self, destination, edu_type, content): @@ -215,7 +201,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_pdu(self, destination, pdu_origin, pdu_id, outlier=False): + def get_pdu(self, destination, event_id, outlier=False): """Requests the PDU with given origin and ID from the remote home server. @@ -224,7 +210,7 @@ class ReplicationLayer(object): Args: destination (str): Which home server to query pdu_origin (str): The home server that originally sent the pdu. - pdu_id (str) + event_id (str) outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` @@ -233,8 +219,9 @@ class ReplicationLayer(object): Deferred: Results in the requested PDU. """ - transaction_data = yield self.transport_layer.get_pdu( - destination, pdu_origin, pdu_id) + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) transaction = Transaction(**transaction_data) @@ -249,8 +236,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context, pdu_id=None, - pdu_origin=None): + def get_state_for_context(self, destination, context, event_id=None): """Requests all of the `current` state PDUs for a given context from a remote home server. @@ -263,7 +249,9 @@ class ReplicationLayer(object): """ transaction_data = yield self.transport_layer.get_context_state( - destination, context, pdu_id=pdu_id, pdu_origin=pdu_origin, + destination, + context, + event_id=event_id, ) transaction = Transaction(**transaction_data) @@ -352,10 +340,10 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, context, pdu_id, pdu_origin): - if pdu_id and pdu_origin: + def on_context_state_request(self, context, event_id): + if event_id: pdus = yield self.handler.get_state_for_pdu( - pdu_id, pdu_origin + event_id ) else: raise NotImplementedError("Specify an event") @@ -370,8 +358,8 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_pdu_request(self, pdu_origin, pdu_id): - pdu = yield self._get_persisted_pdu(pdu_id, pdu_origin) + def on_pdu_request(self, event_id): + pdu = yield self._get_persisted_pdu(event_id) if pdu: defer.returnValue( @@ -443,9 +431,8 @@ class ReplicationLayer(object): def send_join(self, destination, pdu): _, content = yield self.transport_layer.send_join( destination, - pdu.context, - pdu.pdu_id, - pdu.origin, + pdu.room_id, + pdu.event_id, pdu.get_dict(), ) @@ -457,13 +444,13 @@ class ReplicationLayer(object): defer.returnValue(pdus) @log_function - def _get_persisted_pdu(self, pdu_id, pdu_origin): + def _get_persisted_pdu(self, event_id): """ Get a PDU from the database with given origin and id. Returns: Deferred: Results in a `Pdu`. """ - return self.handler.get_persisted_pdu(pdu_id, pdu_origin) + return self.handler.get_persisted_pdu(event_id) def _transaction_from_pdus(self, pdu_list): """Returns a new Transaction containing the given PDUs suitable for @@ -487,10 +474,10 @@ class ReplicationLayer(object): @log_function def _handle_new_pdu(self, origin, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers - existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) + existing = yield self._get_persisted_pdu(pdu.event_id) if existing and (not existing.outlier or pdu.outlier): - logger.debug("Already seen pdu %s %s", pdu.pdu_id, pdu.origin) + logger.debug("Already seen pdu %s", pdu.event_id) defer.returnValue({}) return @@ -500,23 +487,22 @@ class ReplicationLayer(object): if not pdu.outlier: # We only backfill backwards to the min depth. min_depth = yield self.handler.get_min_depth_for_context( - pdu.context + pdu.room_id ) if min_depth and pdu.depth > min_depth: - for pdu_id, origin, hashes in pdu.prev_pdus: - exists = yield self._get_persisted_pdu(pdu_id, origin) + for event_id, hashes in pdu.prev_events: + exists = yield self._get_persisted_pdu(event_id) if not exists: - logger.debug("Requesting pdu %s %s", pdu_id, origin) + logger.debug("Requesting pdu %s", event_id) try: yield self.get_pdu( pdu.origin, - pdu_id=pdu_id, - pdu_origin=origin + event_id=event_id, ) - logger.debug("Processed pdu %s %s", pdu_id, origin) + logger.debug("Processed pdu %s", event_id) except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") @@ -524,7 +510,7 @@ class ReplicationLayer(object): # We need to get the state at this event, since we have reached # a backward extremity edge. state = yield self.get_state_for_context( - origin, pdu.context, pdu.pdu_id, pdu.origin, + origin, pdu.room_id, pdu.event_id, ) # Persist the Pdu, but don't mark it as processed yet. diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 7f01b4faaf..04ad7e63ae 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -72,8 +72,7 @@ class TransportLayer(object): self.received_handler = None @log_function - def get_context_state(self, destination, context, pdu_id=None, - pdu_origin=None): + def get_context_state(self, destination, context, event_id=None): """ Requests all state for a given context (i.e. room) from the given server. @@ -91,60 +90,59 @@ class TransportLayer(object): subpath = "/state/%s/" % context args = {} - if pdu_id and pdu_origin: - args["pdu_id"] = pdu_id - args["pdu_origin"] = pdu_origin + if event_id: + args["event_id"] = event_id return self._do_request_for_transaction( destination, subpath, args=args ) @log_function - def get_pdu(self, destination, pdu_origin, pdu_id): + def get_event(self, destination, event_id): """ Requests the pdu with give id and origin from the given server. Args: destination (str): The host name of the remote home server we want to get the state from. - pdu_origin (str): The home server which created the PDU. - pdu_id (str): The id of the PDU being requested. + event_id (str): The id of the event being requested. Returns: Deferred: Results in a dict received from the remote homeserver. """ - logger.debug("get_pdu dest=%s, pdu_origin=%s, pdu_id=%s", - destination, pdu_origin, pdu_id) + logger.debug("get_pdu dest=%s, event_id=%s", + destination, event_id) - subpath = "/pdu/%s/%s/" % (pdu_origin, pdu_id) + subpath = "/event/%s/" % (event_id, ) return self._do_request_for_transaction(destination, subpath) @log_function - def backfill(self, dest, context, pdu_tuples, limit): + def backfill(self, dest, context, event_tuples, limit): """ Requests `limit` previous PDUs in a given context before list of PDUs. Args: dest (str) context (str) - pdu_tuples (list) + event_tuples (list) limt (int) Returns: Deferred: Results in a dict received from the remote homeserver. """ logger.debug( - "backfill dest=%s, context=%s, pdu_tuples=%s, limit=%s", - dest, context, repr(pdu_tuples), str(limit) + "backfill dest=%s, context=%s, event_tuples=%s, limit=%s", + dest, context, repr(event_tuples), str(limit) ) - if not pdu_tuples: + if not event_tuples: + # TODO: raise? return - subpath = "/backfill/%s/" % context + subpath = "/backfill/%s/" % (context,) args = { - "v": ["%s,%s" % (i, o) for i, o in pdu_tuples], + "v": event_tuples, "limit": limit, } @@ -222,11 +220,10 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function - def send_join(self, destination, context, pdu_id, origin, content): - path = PREFIX + "/send_join/%s/%s/%s" % ( + def send_join(self, destination, context, event_id, content): + path = PREFIX + "/send_join/%s/%s" % ( context, - origin, - pdu_id, + event_id, ) code, content = yield self.client.put_json( @@ -242,11 +239,10 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function - def send_invite(self, destination, context, pdu_id, origin, content): - path = PREFIX + "/invite/%s/%s/%s" % ( + def send_invite(self, destination, context, event_id, content): + path = PREFIX + "/invite/%s/%s" % ( context, - origin, - pdu_id, + event_id, ) code, content = yield self.client.put_json( @@ -376,10 +372,10 @@ class TransportLayer(object): # data_id pair. self.server.register_path( "GET", - re.compile("^" + PREFIX + "/pdu/([^/]*)/([^/]*)/$"), + re.compile("^" + PREFIX + "/event/([^/]*)/$"), self._with_authentication( - lambda origin, content, query, pdu_origin, pdu_id: - handler.on_pdu_request(pdu_origin, pdu_id) + lambda origin, content, query, event_id: + handler.on_pdu_request(event_id) ) ) @@ -391,8 +387,7 @@ class TransportLayer(object): lambda origin, content, query, context: handler.on_context_state_request( context, - query.get("pdu_id", [None])[0], - query.get("pdu_origin", [None])[0] + query.get("event_id", [None])[0], ) ) ) @@ -442,9 +437,9 @@ class TransportLayer(object): self.server.register_path( "PUT", - re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)/([^/]*)$"), + re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"), self._with_authentication( - lambda origin, content, query, context, pdu_origin, pdu_id: + lambda origin, content, query, context, event_id: self._on_send_join_request( origin, content, query, ) @@ -453,9 +448,9 @@ class TransportLayer(object): self.server.register_path( "PUT", - re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)/([^/]*)$"), + re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)$"), self._with_authentication( - lambda origin, content, query, context, pdu_origin, pdu_id: + lambda origin, content, query, context, event_id: self._on_invite_request( origin, content, query, ) @@ -548,7 +543,7 @@ class TransportLayer(object): limit = int(limits[-1]) - versions = [v.split(",", 1) for v in v_list] + versions = v_list return self.request_handler.on_backfill_request( context, versions, limit @@ -579,120 +574,3 @@ class TransportLayer(object): ) defer.returnValue((200, content)) - - -class TransportReceivedHandler(object): - """ Callbacks used when we receive a transaction - """ - def on_incoming_transaction(self, transaction): - """ Called on PUT /send/, or on response to a request - that we sent (e.g. a backfill request) - - Args: - transaction (synapse.transaction.Transaction): The transaction that - was sent to us. - - Returns: - twisted.internet.defer.Deferred: A deferred that gets fired when - the transaction has finished being processed. - - The result should be a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - -class TransportRequestHandler(object): - """ Handlers used when someone want's data from us - """ - def on_pull_request(self, versions): - """ Called on GET /pull/?v=... - - This is hit when a remote home server wants to get all data - after a given transaction. Mainly used when a home server comes back - online and wants to get everything it has missed. - - Args: - versions (list): A list of transaction_ids that should be used to - determine what PDUs the remote side have not yet seen. - - Returns: - Deferred: Resultsin a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_pdu_request(self, pdu_origin, pdu_id): - """ Called on GET /pdu/// - - Someone wants a particular PDU. This PDU may or may not have originated - from us. - - Args: - pdu_origin (str) - pdu_id (str) - - Returns: - Deferred: Resultsin a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_context_state_request(self, context): - """ Called on GET /state// - - Gets hit when someone wants all the *current* state for a given - contexts. - - Args: - context (str): The name of the context that we're interested in. - - Returns: - twisted.internet.defer.Deferred: A deferred that gets fired when - the transaction has finished being processed. - - The result should be a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_backfill_request(self, context, versions, limit): - """ Called on GET /backfill//?v=...&limit=... - - Gets hit when we want to backfill backwards on a given context from - the given point. - - Args: - context (str): The context to backfill - versions (list): A list of 2-tuples representing where to backfill - from, in the form `(pdu_id, origin)` - limit (int): How many pdus to return. - - Returns: - Deferred: Results in a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_query_request(self): - """ Called on a GET /query/ request. """ diff --git a/synapse/federation/units.py b/synapse/federation/units.py index adc3385644..c94dcf64cf 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -34,13 +34,13 @@ class Pdu(JsonEncodedObject): A Pdu can be classified as "state". For a given context, we can efficiently retrieve all state pdu's that haven't been clobbered. Clobbering is done - via a unique constraint on the tuple (context, pdu_type, state_key). A pdu + via a unique constraint on the tuple (context, type, state_key). A pdu is a state pdu if `is_state` is True. Example pdu:: { - "pdu_id": "78c", + "event_id": "$78c:example.com", "origin_server_ts": 1404835423000, "origin": "bar", "prev_ids": [ @@ -53,14 +53,14 @@ class Pdu(JsonEncodedObject): """ valid_keys = [ - "pdu_id", - "context", + "event_id", + "room_id", "origin", "origin_server_ts", - "pdu_type", + "type", "destinations", "transaction_id", - "prev_pdus", + "prev_events", "depth", "content", "outlier", @@ -68,8 +68,7 @@ class Pdu(JsonEncodedObject): "signatures", "is_state", # Below this are keys valid only for State Pdus. "state_key", - "prev_state_id", - "prev_state_origin", + "prev_state", "required_power_level", "user_id", ] @@ -81,18 +80,18 @@ class Pdu(JsonEncodedObject): ] required_keys = [ - "pdu_id", - "context", + "event_id", + "room_id", "origin", "origin_server_ts", - "pdu_type", + "type", "content", ] # TODO: We need to make this properly load content rather than # just leaving it as a dict. (OR DO WE?!) - def __init__(self, destinations=[], is_state=False, prev_pdus=[], + def __init__(self, destinations=[], is_state=False, prev_events=[], outlier=False, hashes={}, signatures={}, **kwargs): if is_state: for required_key in ["state_key"]: @@ -102,66 +101,13 @@ class Pdu(JsonEncodedObject): super(Pdu, self).__init__( destinations=destinations, is_state=bool(is_state), - prev_pdus=prev_pdus, + prev_events=prev_events, outlier=outlier, hashes=hashes, signatures=signatures, **kwargs ) - @classmethod - def from_pdu_tuple(cls, pdu_tuple): - """ Converts a PduTuple to a Pdu - - Args: - pdu_tuple (synapse.persistence.transactions.PduTuple): The tuple to - convert - - Returns: - Pdu - """ - if pdu_tuple: - d = copy.copy(pdu_tuple.pdu_entry._asdict()) - d["origin_server_ts"] = d.pop("ts") - - for k in d.keys(): - if d[k] is None: - del d[k] - - d["content"] = json.loads(d["content_json"]) - del d["content_json"] - - args = {f: d[f] for f in cls.valid_keys if f in d} - if "unrecognized_keys" in d and d["unrecognized_keys"]: - args.update(json.loads(d["unrecognized_keys"])) - - hashes = { - alg: encode_base64(hsh) - for alg, hsh in pdu_tuple.hashes.items() - } - - signatures = { - kid: encode_base64(sig) - for kid, sig in pdu_tuple.signatures.items() - } - - prev_pdus = [] - for prev_pdu in pdu_tuple.prev_pdu_list: - prev_hashes = pdu_tuple.edge_hashes.get(prev_pdu, {}) - prev_hashes = { - alg: encode_base64(hsh) for alg, hsh in prev_hashes.items() - } - prev_pdus.append((prev_pdu[0], prev_pdu[1], prev_hashes)) - - return Pdu( - prev_pdus=prev_pdus, - hashes=hashes, - signatures=signatures, - **args - ) - else: - return None - def __str__(self): return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 18cb1d4e97..bdd28f04bb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -139,7 +139,7 @@ class FederationHandler(BaseHandler): # Huh, let's try and get the current state try: yield self.replication_layer.get_state_for_context( - event.origin, event.room_id, pdu.pdu_id, pdu.origin, + event.origin, event.room_id, event.event_id, ) hosts = yield self.store.get_joined_hosts_for_room( @@ -368,11 +368,9 @@ class FederationHandler(BaseHandler): ]) @defer.inlineCallbacks - def get_state_for_pdu(self, pdu_id, pdu_origin): + def get_state_for_pdu(self, event_id): yield run_on_reactor() - event_id = EventID.create(pdu_id, pdu_origin, self.hs).to_string() - state_groups = yield self.store.get_state_groups( [event_id] ) @@ -406,7 +404,7 @@ class FederationHandler(BaseHandler): events = yield self.store.get_backfill_events( context, - [self.pdu_codec.encode_event_id(i, o) for i, o in pdu_list], + pdu_list, limit ) @@ -417,14 +415,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, pdu_id, origin): + def get_persisted_pdu(self, event_id): """ Get a PDU from the database with given origin and id. Returns: Deferred: Results in a `Pdu`. """ event = yield self.store.get_event( - self.pdu_codec.encode_event_id(pdu_id, origin), + event_id, allow_none=True, ) -- cgit 1.4.1 From 68698e0ac8c39083f6ab7d377a48b5bead3d3598 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Nov 2014 17:51:42 +0000 Subject: Fix bugs in generating event signatures and hashing --- scripts/check_event_hash.py | 12 +++-- scripts/check_signature.py | 1 - synapse/api/events/__init__.py | 1 + synapse/crypto/event_signing.py | 100 +++++++++++++++------------------------- synapse/federation/pdu_codec.py | 13 +----- synapse/federation/units.py | 11 +---- 6 files changed, 50 insertions(+), 88 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/scripts/check_event_hash.py b/scripts/check_event_hash.py index 9fa4452ee6..7c32f8102a 100644 --- a/scripts/check_event_hash.py +++ b/scripts/check_event_hash.py @@ -6,6 +6,7 @@ import hashlib import sys import json + class dictobj(dict): def __init__(self, *args, **kargs): dict.__init__(self, *args, **kargs) @@ -14,9 +15,12 @@ class dictobj(dict): def get_dict(self): return dict(self) + def get_full_dict(self): + return dict(self) + def main(): - parser = parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser() parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin) args = parser.parse_args() @@ -29,14 +33,14 @@ def main(): } for alg_name in event_json.hashes: - if check_event_pdu_content_hash(event_json, algorithms[alg_name]): + if check_event_content_hash(event_json, algorithms[alg_name]): print "PASS content hash %s" % (alg_name,) else: print "FAIL content hash %s" % (alg_name,) for algorithm in algorithms.values(): - name, h_bytes = compute_pdu_event_reference_hash(event_json, algorithm) - print "Reference hash %s: %s" % (name, encode_base64(bytes)) + name, h_bytes = compute_event_reference_hash(event_json, algorithm) + print "Reference hash %s: %s" % (name, encode_base64(h_bytes)) if __name__=="__main__": main() diff --git a/scripts/check_signature.py b/scripts/check_signature.py index e7964e7e71..e146e18e24 100644 --- a/scripts/check_signature.py +++ b/scripts/check_signature.py @@ -1,5 +1,4 @@ -from synapse.crypto.event_signing import verify_signed_event_pdu from syutil.crypto.jsonsign import verify_signed_json from syutil.crypto.signing_key import ( decode_verify_key_bytes, write_signing_keys diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index b855811b98..168b812311 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -61,6 +61,7 @@ class SynapseEvent(JsonEncodedObject): "prev_content", "prev_state", "redacted_because", + "origin_server_ts", ] internal_keys = [ diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 0e8bc7eb6c..de5d2e7465 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -15,11 +15,11 @@ # limitations under the License. -from synapse.federation.units import Pdu -from synapse.api.events.utils import prune_pdu, prune_event +from synapse.api.events.utils import prune_event from syutil.jsonutil import encode_canonical_json from syutil.base64util import encode_base64, decode_base64 -from syutil.crypto.jsonsign import sign_json, verify_signed_json +from syutil.crypto.jsonsign import sign_json +from synapse.api.events.room import GenericEvent import copy import hashlib @@ -28,20 +28,14 @@ import logging logger = logging.getLogger(__name__) -def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): - hashed = _compute_content_hash(pdu, hash_algorithm) - pdu.hashes[hashed.name] = encode_base64(hashed.digest()) - return pdu - - -def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): +def check_event_content_hash(event, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_content_hash(pdu, hash_algorithm) - if computed_hash.name not in pdu.hashes: + computed_hash = _compute_content_hash(event, hash_algorithm) + if computed_hash.name not in event.hashes: raise Exception("Algorithm %s not in hashes %s" % ( - computed_hash.name, list(pdu.hashes) + computed_hash.name, list(event.hashes) )) - message_hash_base64 = pdu.hashes[computed_hash.name] + message_hash_base64 = event.hashes[computed_hash.name] try: message_hash_bytes = decode_base64(message_hash_base64) except: @@ -49,70 +43,52 @@ def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): return message_hash_bytes == computed_hash.digest() -def _compute_content_hash(pdu, hash_algorithm): - pdu_json = pdu.get_dict() - #TODO: Make "age_ts" key internal - pdu_json.pop("age_ts", None) - pdu_json.pop("unsigned", None) - pdu_json.pop("signatures", None) - pdu_json.pop("hashes", None) - pdu_json_bytes = encode_canonical_json(pdu_json) - return hash_algorithm(pdu_json_bytes) - - -def compute_pdu_event_reference_hash(pdu, hash_algorithm=hashlib.sha256): - tmp_pdu = Pdu(**pdu.get_dict()) - tmp_pdu = prune_pdu(tmp_pdu) - pdu_json = tmp_pdu.get_dict() - pdu_json.pop("signatures", None) - pdu_json_bytes = encode_canonical_json(pdu_json) - hashed = hash_algorithm(pdu_json_bytes) - return (hashed.name, hashed.digest()) +def _compute_content_hash(event, hash_algorithm): + event_json = event.get_full_dict() + #TODO: We need to sign the JSON that is going out via fedaration. + event_json.pop("age_ts", None) + event_json.pop("unsigned", None) + event_json.pop("signatures", None) + event_json.pop("hashes", None) + event_json_bytes = encode_canonical_json(event_json) + return hash_algorithm(event_json_bytes) def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): - tmp_event = copy.deepcopy(event) + # FIXME(erikj): GenericEvent! + tmp_event = GenericEvent(**event.get_full_dict()) tmp_event = prune_event(tmp_event) event_json = tmp_event.get_dict() event_json.pop("signatures", None) + event_json.pop("age_ts", None) + event_json.pop("unsigned", None) event_json_bytes = encode_canonical_json(event_json) hashed = hash_algorithm(event_json_bytes) return (hashed.name, hashed.digest()) -def sign_event_pdu(pdu, signature_name, signing_key): - tmp_pdu = Pdu(**pdu.get_dict()) - tmp_pdu = prune_pdu(tmp_pdu) - pdu_json = tmp_pdu.get_dict() - pdu_json = sign_json(pdu_json, signature_name, signing_key) - pdu.signatures = pdu_json["signatures"] - return pdu - - -def verify_signed_event_pdu(pdu, signature_name, verify_key): - tmp_pdu = Pdu(**pdu.get_dict()) - tmp_pdu = prune_pdu(tmp_pdu) - pdu_json = tmp_pdu.get_dict() - verify_signed_json(pdu_json, signature_name, verify_key) - - -def add_hashes_and_signatures(event, signature_name, signing_key, - hash_algorithm=hashlib.sha256): +def compute_event_signature(event, signature_name, signing_key): tmp_event = copy.deepcopy(event) tmp_event = prune_event(tmp_event) - redact_json = tmp_event.get_dict() + redact_json = tmp_event.get_full_dict() redact_json.pop("signatures", None) + redact_json.pop("age_ts", None) + redact_json.pop("unsigned", None) + logger.debug("Signing event: %s", redact_json) redact_json = sign_json(redact_json, signature_name, signing_key) - event.signatures = redact_json["signatures"] + return redact_json["signatures"] + + +def add_hashes_and_signatures(event, signature_name, signing_key, + hash_algorithm=hashlib.sha256): + hashed = _compute_content_hash(event, hash_algorithm=hash_algorithm) - event_json = event.get_full_dict() - #TODO: We need to sign the JSON that is going out via fedaration. - event_json.pop("age_ts", None) - event_json.pop("unsigned", None) - event_json.pop("signatures", None) - event_json.pop("hashes", None) - event_json_bytes = encode_canonical_json(event_json) - hashed = hash_algorithm(event_json_bytes) if not hasattr(event, "hashes"): event.hashes = {} event.hashes[hashed.name] = encode_base64(hashed.digest()) + + event.signatures = compute_event_signature( + event, + signature_name=signature_name, + signing_key=signing_key, + ) diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 5ec97a698e..52c84efb5b 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -14,10 +14,6 @@ # limitations under the License. from .units import Pdu -from synapse.crypto.event_signing import ( - add_event_pdu_content_hash, sign_event_pdu -) -from synapse.types import EventID import copy @@ -49,17 +45,10 @@ class PduCodec(object): def pdu_from_event(self, event): d = event.get_full_dict() - if hasattr(event, "state_key"): - d["is_state"] = True - kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() }) - if "origin_server_ts" not in kwargs: - kwargs["origin_server_ts"] = int(self.clock.time_msec()) - pdu = Pdu(**kwargs) - pdu = add_event_pdu_content_hash(pdu) - return sign_event_pdu(pdu, self.server_name, self.signing_key) + return pdu diff --git a/synapse/federation/units.py b/synapse/federation/units.py index c94dcf64cf..c2d8dca8f3 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -65,8 +65,7 @@ class Pdu(JsonEncodedObject): "content", "outlier", "hashes", - "signatures", - "is_state", # Below this are keys valid only for State Pdus. + "signatures", # Below this are keys valid only for State Pdus. "state_key", "prev_state", "required_power_level", @@ -91,16 +90,10 @@ class Pdu(JsonEncodedObject): # TODO: We need to make this properly load content rather than # just leaving it as a dict. (OR DO WE?!) - def __init__(self, destinations=[], is_state=False, prev_events=[], + def __init__(self, destinations=[], prev_events=[], outlier=False, hashes={}, signatures={}, **kwargs): - if is_state: - for required_key in ["state_key"]: - if required_key not in kwargs: - raise RuntimeError("Key %s is required" % required_key) - super(Pdu, self).__init__( destinations=destinations, - is_state=bool(is_state), prev_events=prev_events, outlier=outlier, hashes=hashes, -- cgit 1.4.1 From aa76bf39aba2eadf506f57952a1dffce629f2637 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 14:14:02 +0000 Subject: Remove unused imports --- synapse/federation/persistence.py | 2 -- synapse/federation/units.py | 3 --- synapse/handlers/federation.py | 6 ++---- synapse/state.py | 2 -- synapse/storage/_base.py | 2 -- 5 files changed, 2 insertions(+), 13 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index b04fbb4177..73dc844d59 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -21,8 +21,6 @@ These actions are mostly only used by the :py:mod:`.replication` module. from twisted.internet import defer -from .units import Pdu - from synapse.util.logutils import log_function import json diff --git a/synapse/federation/units.py b/synapse/federation/units.py index c2d8dca8f3..9b25556707 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -18,11 +18,8 @@ server protocol. """ from synapse.util.jsonobject import JsonEncodedObject -from syutil.base64util import encode_base64 import logging -import json -import copy logger = logging.getLogger(__name__) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bdd28f04bb..49bfff88a4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,15 +17,13 @@ from ._base import BaseHandler -from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent +from synapse.api.events.room import RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec -from synapse.api.errors import SynapseError from synapse.util.async import run_on_reactor -from synapse.types import EventID -from twisted.internet import defer, reactor +from twisted.internet import defer import logging diff --git a/synapse/state.py b/synapse/state.py index f4efc287c9..9771883bc3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -19,8 +19,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor -from synapse.types import EventID - from collections import namedtuple import copy diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 464b12f032..5e00c23fd1 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -14,8 +14,6 @@ # limitations under the License. import logging -from twisted.internet import defer - from synapse.api.errors import StoreError from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function -- cgit 1.4.1 From 440cbd5235e7e23dfe97d8e3d394cc0d35b35fd6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 14:17:55 +0000 Subject: Add support for sending failures --- synapse/federation/replication.py | 30 ++++++++++++++++++++++++++++-- synapse/federation/units.py | 1 + synapse/types.py | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 99dd390a64..680e7322a6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -143,6 +143,11 @@ class ReplicationLayer(object): self._transaction_queue.enqueue_edu(edu) return defer.succeed(None) + @log_function + def send_failure(self, failure, destination): + self._transaction_queue.enqueue_failure(failure, destination) + return defer.succeed(None) + @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail=True): @@ -558,6 +563,9 @@ class _TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = {} + # destination -> list of tuple(failure, deferred) + self.pending_failures_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) @@ -610,6 +618,18 @@ class _TransactionQueue(object): return deferred + @defer.inlineCallbacks + def enqueue_failure(self, failure, destination): + deferred = defer.Deferred() + + self.pending_failures_by_dest.setdefault( + destination, [] + ).append( + (failure, deferred) + ) + + yield deferred + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): @@ -619,8 +639,9 @@ class _TransactionQueue(object): # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest(destination, []) - if not pending_pdus and not pending_edus: + if not pending_pdus and not pending_edus and not pending_failures: return logger.debug("TX [%s] Attempting new transaction", destination) @@ -630,7 +651,11 @@ class _TransactionQueue(object): pdus = [x[0] for x in pending_pdus] edus = [x[0] for x in pending_edus] - deferreds = [x[1] for x in pending_pdus + pending_edus] + failures = [x[0].get_dict() for x in pending_failures] + deferreds = [ + x[1] + for x in pending_pdus + pending_edus + pending_failures + ] try: self.pending_transactions[destination] = 1 @@ -644,6 +669,7 @@ class _TransactionQueue(object): destination=destination, pdus=pdus, edus=edus, + pdu_failures=failures, ) self._next_txn_id += 1 diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 9b25556707..2070ffe1e2 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -157,6 +157,7 @@ class Transaction(JsonEncodedObject): "edus", "transaction_id", "destination", + "pdu_failures", ] internal_keys = [ diff --git a/synapse/types.py b/synapse/types.py index 649ff2f7d7..8fac20fd2e 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -128,3 +128,37 @@ class StreamToken( d = self._asdict() d[key] = new_value return StreamToken(**d) + + +class FederationError(RuntimeError): + """ This class is used to inform remote home servers about erroneous + PDUs they sent us. + + FATAL: The remote server could not interpret the source event. + (e.g., it was missing a required field) + ERROR: The remote server interpreted the event, but it failed some other + check (e.g. auth) + WARN: The remote server accepted the event, but believes some part of it + is wrong (e.g., it referred to an invalid event) + """ + + def __init__(self, level, code, reason, affected, source=None): + if level not in ["FATAL", "ERROR", "WARN"]: + raise ValueError("Level is not valid: %s" % (level,)) + self.level = level + self.code = code + self.reason = reason + self.affected = affected + self.source = source + + msg = "%s %s: %s" % (level, code, reason,) + super(FederationError, self).__init__(msg) + + def get_dict(self): + return { + "level": self.level, + "code": self.code, + "reason": self.reason, + "affected": self.affected, + "source": self.source if self.source else self.affected, + } -- cgit 1.4.1 From 1c06806f90a6368cdc3b9fa3b9053021b7c40e94 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 10:21:32 +0000 Subject: Finish redaction algorithm. --- synapse/api/events/__init__.py | 4 ++-- synapse/api/events/utils.py | 39 ++++++++++++++++++++++++++------------- synapse/crypto/event_signing.py | 7 ++----- synapse/federation/units.py | 6 ++---- synapse/storage/_base.py | 2 +- 5 files changed, 33 insertions(+), 25 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 8d65c29ac1..f1e53f23ab 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -86,8 +86,8 @@ class SynapseEvent(JsonEncodedObject): def __init__(self, raises=True, **kwargs): super(SynapseEvent, self).__init__(**kwargs) - if "content" in kwargs: - self.check_json(self.content, raises=raises) + # if "content" in kwargs: + # self.check_json(self.content, raises=raises) def get_content_template(self): """ Retrieve the JSON template for this event as a dict. diff --git a/synapse/api/events/utils.py b/synapse/api/events/utils.py index 5fc79105b5..802648f8f7 100644 --- a/synapse/api/events/utils.py +++ b/synapse/api/events/utils.py @@ -18,24 +18,31 @@ from .room import ( RoomAliasesEvent, RoomCreateEvent, ) + def prune_event(event): - """ Prunes the given event of all keys we don't know about or think could - potentially be dodgy. + """ Returns a pruned version of the given event, which removes all keys we + don't know about or think could potentially be dodgy. This is used when we "redact" an event. We want to remove all fields that the user has specified, but we do want to keep necessary information like type, state_key etc. """ - return _prune_event_or_pdu(event.type, event) - -def prune_pdu(pdu): - """Removes keys that contain unrestricted and non-essential data from a PDU - """ - return _prune_event_or_pdu(pdu.type, pdu) + event_type = event.type -def _prune_event_or_pdu(event_type, event): - # Remove all extraneous fields. - event.unrecognized_keys = {} + allowed_keys = [ + "event_id", + "user_id", + "room_id", + "hashes", + "signatures", + "content", + "type", + "state_key", + "depth", + "prev_events", + "prev_state", + "auth_events", + ] new_content = {} @@ -65,6 +72,12 @@ def _prune_event_or_pdu(event_type, event): elif event_type == RoomAliasesEvent.TYPE: add_fields("aliases") - event.content = new_content + allowed_fields = { + k: v + for k, v in event.get_full_dict().items() + if k in allowed_keys + } + + allowed_fields["content"] = new_content - return event + return type(event)(**allowed_fields) diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 7d800615fe..056e8f6ca4 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -55,9 +55,7 @@ def _compute_content_hash(event, hash_algorithm): def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): - # FIXME(erikj): GenericEvent! - tmp_event = GenericEvent(**event.get_full_dict()) - tmp_event = prune_event(tmp_event) + tmp_event = prune_event(event) event_json = tmp_event.get_dict() event_json.pop("signatures", None) event_json.pop("age_ts", None) @@ -68,8 +66,7 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): def compute_event_signature(event, signature_name, signing_key): - tmp_event = copy.deepcopy(event) - tmp_event = prune_event(tmp_event) + tmp_event = prune_event(event) redact_json = tmp_event.get_full_dict() redact_json.pop("signatures", None) redact_json.pop("age_ts", None) diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 2070ffe1e2..d98014cac7 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -56,17 +56,15 @@ class Pdu(JsonEncodedObject): "origin_server_ts", "type", "destinations", - "transaction_id", "prev_events", "depth", "content", - "outlier", "hashes", + "user_id", + "auth_events", "signatures", # Below this are keys valid only for State Pdus. "state_key", "prev_state", - "required_power_level", - "user_id", ] internal_keys = [ diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9aa404695d..3ab81a78d5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -509,7 +509,7 @@ class SQLBaseStore(object): ) if del_evs: - prune_event(ev) + ev = prune_event(ev) ev.redacted_because = del_evs[0] return events -- cgit 1.4.1 From 5d439b127ba34b951dfd09a7d3c684c2d50df702 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 13:46:44 +0000 Subject: PEP8 --- synapse/api/auth.py | 3 +-- synapse/api/events/room.py | 1 + synapse/federation/replication.py | 1 - synapse/federation/transport.py | 9 ++++++--- synapse/federation/units.py | 7 +++---- synapse/handlers/federation.py | 5 ++++- synapse/storage/__init__.py | 7 ++++--- synapse/storage/event_federation.py | 9 +++------ 8 files changed, 22 insertions(+), 20 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 48f9d460a3..a5c6964707 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -369,7 +369,6 @@ class Auth(object): ] event.auth_events = zip(auth_events, hashes) - @log_function def _can_send_event(self, event): key = (RoomPowerLevelsEvent.TYPE, "", ) @@ -452,7 +451,7 @@ class Auth(object): event.user_id, ) - # Check other levels: + # Check other levels: levels_to_check = [ ("users_default", []), ("events_default", []), diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index 25bc883706..8c4ac45d02 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -153,6 +153,7 @@ class RoomPowerLevelsEvent(SynapseStateEvent): def get_content_template(self): return {} + class RoomAliasesEvent(SynapseStateEvent): TYPE = "m.room.aliases" diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e798304353..bacba36755 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -549,7 +549,6 @@ class ReplicationLayer(object): origin, pdu.room_id, pdu.event_id, ) - if not backfilled: ret = yield self.handler.on_receive_pdu( pdu, diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index d84a44c211..95c40c6c1b 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -284,7 +284,7 @@ class TransportLayer(object): origin = None if request.method == "PUT": - #TODO: Handle other method types? other content types? + # TODO: Handle other method types? other content types? try: content_bytes = request.content.read() content = json.loads(content_bytes) @@ -296,11 +296,13 @@ class TransportLayer(object): try: params = auth.split(" ")[1].split(",") param_dict = dict(kv.split("=") for kv in params) + def strip_quotes(value): if value.startswith("\""): return value[1:-1] else: return value + origin = strip_quotes(param_dict["origin"]) key = strip_quotes(param_dict["key"]) sig = strip_quotes(param_dict["sig"]) @@ -321,7 +323,7 @@ class TransportLayer(object): if auth.startswith("X-Matrix"): (origin, key, sig) = parse_auth_header(auth) json_request["origin"] = origin - json_request["signatures"].setdefault(origin,{})[key] = sig + json_request["signatures"].setdefault(origin, {})[key] = sig if not json_request["signatures"]: raise SynapseError( @@ -515,7 +517,8 @@ class TransportLayer(object): return try: - code, response = yield self.received_handler.on_incoming_transaction( + handler = self.received_handler + code, response = yield handler.on_incoming_transaction( transaction_data ) except: diff --git a/synapse/federation/units.py b/synapse/federation/units.py index d98014cac7..f4e7b62bd9 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -192,7 +192,9 @@ class Transaction(JsonEncodedObject): transaction_id and origin_server_ts keys. """ if "origin_server_ts" not in kwargs: - raise KeyError("Require 'origin_server_ts' to construct a Transaction") + raise KeyError( + "Require 'origin_server_ts' to construct a Transaction" + ) if "transaction_id" not in kwargs: raise KeyError( "Require 'transaction_id' to construct a Transaction" @@ -204,6 +206,3 @@ class Transaction(JsonEncodedObject): kwargs["pdus"] = [p.get_dict() for p in pdus] return Transaction(**kwargs) - - - diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 587fa308c8..e909af6bd8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -521,6 +521,9 @@ class FederationHandler(BaseHandler): @log_function def _on_user_joined(self, user, room_id): - waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) + waiters = self.waiting_for_join_list.get( + (user.to_string(), room_id), + [] + ) while waiters: waiters.pop().callback(None) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7d810e6a62..4034437f6b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -494,11 +494,13 @@ def prepare_database(db_conn): user_version = row[0] if user_version > SCHEMA_VERSION: - raise ValueError("Cannot use this database as it is too " + + raise ValueError( + "Cannot use this database as it is too " + "new for the server to understand" ) elif user_version < SCHEMA_VERSION: - logging.info("Upgrading database from version %d", + logging.info( + "Upgrading database from version %d", user_version ) @@ -520,4 +522,3 @@ def prepare_database(db_conn): c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) c.close() - diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a707030145..a027db3868 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -215,7 +215,7 @@ class EventFederationStore(SQLBaseStore): min_depth = self._simple_select_one_onecol_txn( txn, table="room_depth", - keyvalues={"room_id": room_id,}, + keyvalues={"room_id": room_id}, retcol="min_depth", allow_none=True, ) @@ -267,10 +267,8 @@ class EventFederationStore(SQLBaseStore): } ) - - - # We only insert as a forward extremity the new pdu if there are no - # other pdus that reference it as a prev pdu + # We only insert as a forward extremity the new pdu if there are + # no other pdus that reference it as a prev pdu query = ( "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " "SELECT ?, ? WHERE NOT EXISTS (" @@ -312,7 +310,6 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occured before (and including) the pdus in pdu_list. Return a list of max size `limit`. -- cgit 1.4.1 From 8d8a133c89925086452bdec9739db589f0715363 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 13 Nov 2014 15:48:51 +0000 Subject: SYN-103: Remove "origin" and "destination" keys from edus --- synapse/federation/units.py | 9 ++++----- tests/federation/test_federation.py | 3 --- tests/handlers/test_presence.py | 3 --- tests/handlers/test_typing.py | 3 --- 4 files changed, 4 insertions(+), 14 deletions(-) (limited to 'synapse/federation/units.py') diff --git a/synapse/federation/units.py b/synapse/federation/units.py index f4e7b62bd9..70412439cd 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -122,11 +122,10 @@ class Edu(JsonEncodedObject): "edu_type", ] -# TODO: SYN-103: Remove "origin" and "destination" keys. -# internal_keys = [ -# "origin", -# "destination", -# ] + internal_keys = [ + "origin", + "destination", + ] class Transaction(JsonEncodedObject): diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index eb329eec50..ad09fab392 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -218,9 +218,6 @@ class FederationTestCase(unittest.TestCase): "pdus": [], "edus": [ { - # TODO: SYN-103: Remove "origin" and "destination" - "origin": "test", - "destination": "remote", "edu_type": "m.test", "content": {"testing": "content here"}, } diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index a6af648def..cdaf93429b 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -44,9 +44,6 @@ def _expect_edu(destination, edu_type, content, origin="test"): "pdus": [], "edus": [ { - # TODO: SYN-103: Remove "origin" and "destination" keys. - "origin": origin, - "destination": destination, "edu_type": edu_type, "content": content, } diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 07acda5eee..adb5148351 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -33,9 +33,6 @@ def _expect_edu(destination, edu_type, content, origin="test"): "pdus": [], "edus": [ { - # TODO: SYN-103: Remove "origin" and "destination" keys. - "origin": origin, - "destination": destination, "edu_type": edu_type, "content": content, } -- cgit 1.4.1 From cb4b6c844a0c9e2d4a96165958ff5680ed82e160 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 14 Nov 2014 21:25:02 +0000 Subject: Merge PDUs and Events into one object --- synapse/api/events/__init__.py | 6 +++ synapse/api/events/utils.py | 2 + synapse/crypto/event_signing.py | 15 ++----- synapse/federation/pdu_codec.py | 54 ------------------------- synapse/federation/replication.py | 61 +++++++++++++++++++--------- synapse/federation/units.py | 79 +------------------------------------ synapse/handlers/federation.py | 65 +++++++++++------------------- synapse/storage/_base.py | 8 ++++ tests/federation/test_federation.py | 8 ++-- tests/handlers/test_federation.py | 5 ++- 10 files changed, 91 insertions(+), 212 deletions(-) delete mode 100644 synapse/federation/pdu_codec.py (limited to 'synapse/federation/units.py') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 1d8bed2906..63c0bd7ae7 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -117,6 +117,12 @@ class SynapseEvent(JsonEncodedObject): """ raise NotImplementedError("get_content_template not implemented.") + def get_pdu_json(self): + pdu_json = self.get_full_dict() + pdu_json.pop("destination", None) + pdu_json.pop("outlier", None) + return pdu_json + class SynapseStateEvent(SynapseEvent): diff --git a/synapse/api/events/utils.py b/synapse/api/events/utils.py index 802648f8f7..d6019d56eb 100644 --- a/synapse/api/events/utils.py +++ b/synapse/api/events/utils.py @@ -42,6 +42,8 @@ def prune_event(event): "prev_events", "prev_state", "auth_events", + "origin", + "origin_server_ts", ] new_content = {} diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 79274fd552..4dff2c0ec2 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -16,7 +16,6 @@ from synapse.api.events.utils import prune_event -from synapse.federation.units import Pdu from syutil.jsonutil import encode_canonical_json from syutil.base64util import encode_base64, decode_base64 from syutil.crypto.jsonsign import sign_json @@ -53,8 +52,7 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256): def _compute_content_hash(event, hash_algorithm): - event_json = event.get_full_dict() - # TODO: We need to sign the JSON that is going out via fedaration. + event_json = event.get_pdu_json() event_json.pop("age_ts", None) event_json.pop("unsigned", None) event_json.pop("signatures", None) @@ -67,7 +65,7 @@ def _compute_content_hash(event, hash_algorithm): def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): tmp_event = prune_event(event) - event_json = tmp_event.get_dict() + event_json = tmp_event.get_pdu_json() event_json.pop("signatures", None) event_json.pop("age_ts", None) event_json.pop("unsigned", None) @@ -78,14 +76,7 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): def compute_event_signature(event, signature_name, signing_key): tmp_event = prune_event(event) - tmp_event.origin = event.origin - tmp_event.origin_server_ts = event.origin_server_ts - d = tmp_event.get_full_dict() - kwargs = dict(event.unrecognized_keys) - kwargs.update({k: v for k, v in d.items()}) - tmp_pdu = Pdu(**kwargs) - redact_json = tmp_pdu.get_dict() - redact_json.pop("signatures", None) + redact_json = tmp_event.get_pdu_json() redact_json.pop("age_ts", None) redact_json.pop("unsigned", None) logger.debug("Signing event: %s", redact_json) diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py deleted file mode 100644 index 52c84efb5b..0000000000 --- a/synapse/federation/pdu_codec.py +++ /dev/null @@ -1,54 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .units import Pdu - -import copy - - -class PduCodec(object): - - def __init__(self, hs): - self.signing_key = hs.config.signing_key[0] - self.server_name = hs.hostname - self.event_factory = hs.get_event_factory() - self.clock = hs.get_clock() - self.hs = hs - - def event_from_pdu(self, pdu): - kwargs = {} - - kwargs["etype"] = pdu.type - - kwargs.update({ - k: v - for k, v in pdu.get_full_dict().items() - if k not in [ - "type", - ] - }) - - return self.event_factory.create_event(**kwargs) - - def pdu_from_event(self, event): - d = event.get_full_dict() - - kwargs = copy.deepcopy(event.unrecognized_keys) - kwargs.update({ - k: v for k, v in d.items() - }) - - pdu = Pdu(**kwargs) - return pdu diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a07e307849..8ee74de005 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -19,7 +19,7 @@ a given transport. from twisted.internet import defer -from .units import Transaction, Pdu, Edu +from .units import Transaction, Edu from .persistence import TransactionActions @@ -72,6 +72,8 @@ class ReplicationLayer(object): self._clock = hs.get_clock() + self.event_factory = hs.get_event_factory() + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -203,7 +205,10 @@ class ReplicationLayer(object): transaction = Transaction(**transaction_data) - pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] + pdus = [ + self.event_from_pdu_json(p, outlier=False) + for p in transaction.pdus + ] for pdu in pdus: yield self._handle_new_pdu(dest, pdu, backfilled=True) @@ -235,7 +240,10 @@ class ReplicationLayer(object): transaction = Transaction(**transaction_data) - pdu_list = [Pdu(outlier=outlier, **p) for p in transaction.pdus] + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction.pdus + ] pdu = None if pdu_list: @@ -265,8 +273,10 @@ class ReplicationLayer(object): ) transaction = Transaction(**transaction_data) - - pdus = [Pdu(outlier=True, **p) for p in transaction.pdus] + pdus = [ + self.event_from_pdu_json(p, outlier=True) + for p in transaction.pdus + ] defer.returnValue(pdus) @@ -293,7 +303,9 @@ class ReplicationLayer(object): p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) del p["age"] - pdu_list = [Pdu(**p) for p in transaction.pdus] + pdu_list = [ + self.event_from_pdu_json(p) for p in transaction.pdus + ] logger.debug("[%s] Got transaction", transaction.transaction_id) @@ -388,30 +400,30 @@ class ReplicationLayer(object): def on_make_join_request(self, context, user_id): pdu = yield self.handler.on_make_join_request(context, user_id) defer.returnValue({ - "event": pdu.get_dict(), + "event": pdu.get_pdu_json(), }) @defer.inlineCallbacks def on_invite_request(self, origin, content): - pdu = Pdu(**content) + pdu = self.event_from_pdu_json(content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) defer.returnValue( ( 200, { - "event": ret_pdu.get_dict(), + "event": ret_pdu.get_pdu_json(), } ) ) @defer.inlineCallbacks def on_send_join_request(self, origin, content): - pdu = Pdu(**content) + pdu = self.event_from_pdu_json(content) res_pdus = yield self.handler.on_send_join_request(origin, pdu) defer.returnValue((200, { - "state": [p.get_dict() for p in res_pdus["state"]], - "auth_chain": [p.get_dict() for p in res_pdus["auth_chain"]], + "state": [p.get_pdu_json() for p in res_pdus["state"]], + "auth_chain": [p.get_pdu_json() for p in res_pdus["auth_chain"]], })) @defer.inlineCallbacks @@ -421,7 +433,7 @@ class ReplicationLayer(object): ( 200, { - "auth_chain": [a.get_dict() for a in auth_pdus], + "auth_chain": [a.get_pdu_json() for a in auth_pdus], } ) ) @@ -438,7 +450,7 @@ class ReplicationLayer(object): logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(Pdu(**pdu_dict)) + defer.returnValue(self.event_from_pdu_json(pdu_dict)) @defer.inlineCallbacks def send_join(self, destination, pdu): @@ -446,12 +458,15 @@ class ReplicationLayer(object): destination, pdu.room_id, pdu.event_id, - pdu.get_dict(), + pdu.get_pdu_json(), ) logger.debug("Got content: %s", content) - state = [Pdu(outlier=True, **p) for p in content.get("state", [])] + state = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] # FIXME: We probably want to do something with the auth_chain given # to us @@ -468,14 +483,14 @@ class ReplicationLayer(object): destination=destination, context=context, event_id=event_id, - content=pdu.get_dict(), + content=pdu.get_pdu_json(), ) pdu_dict = content["event"] logger.debug("Got response to send_invite: %s", pdu_dict) - defer.returnValue(Pdu(**pdu_dict)) + defer.returnValue(self.event_from_pdu_json(pdu_dict)) @log_function def _get_persisted_pdu(self, origin, event_id): @@ -490,7 +505,7 @@ class ReplicationLayer(object): """Returns a new Transaction containing the given PDUs suitable for transmission. """ - pdus = [p.get_dict() for p in pdu_list] + pdus = [p.get_pdu_json() for p in pdu_list] time_now = self._clock.time_msec() for p in pdus: if "age_ts" in p: @@ -563,6 +578,14 @@ class ReplicationLayer(object): def __str__(self): return "" % self.server_name + def event_from_pdu_json(self, pdu_json, outlier=False): + #TODO: Check we have all the PDU keys here + pdu_json.setdefault("hashes", {}) + pdu_json.setdefault("signatures", {}) + return self.event_factory.create_event( + pdu_json["type"], outlier=outlier, **pdu_json + ) + class _TransactionQueue(object): """This class makes sure we only have one transaction in flight at diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 70412439cd..6e708edb8c 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -25,83 +25,6 @@ import logging logger = logging.getLogger(__name__) -class Pdu(JsonEncodedObject): - """ A Pdu represents a piece of data sent from a server and is associated - with a context. - - A Pdu can be classified as "state". For a given context, we can efficiently - retrieve all state pdu's that haven't been clobbered. Clobbering is done - via a unique constraint on the tuple (context, type, state_key). A pdu - is a state pdu if `is_state` is True. - - Example pdu:: - - { - "event_id": "$78c:example.com", - "origin_server_ts": 1404835423000, - "origin": "bar", - "prev_ids": [ - ["23b", "foo"], - ["56a", "bar"], - ], - "content": { ... }, - } - - """ - - valid_keys = [ - "event_id", - "room_id", - "origin", - "origin_server_ts", - "type", - "destinations", - "prev_events", - "depth", - "content", - "hashes", - "user_id", - "auth_events", - "signatures", # Below this are keys valid only for State Pdus. - "state_key", - "prev_state", - ] - - internal_keys = [ - "destinations", - "transaction_id", - "outlier", - ] - - required_keys = [ - "event_id", - "room_id", - "origin", - "origin_server_ts", - "type", - "content", - ] - - # TODO: We need to make this properly load content rather than - # just leaving it as a dict. (OR DO WE?!) - - def __init__(self, destinations=[], prev_events=[], - outlier=False, hashes={}, signatures={}, **kwargs): - super(Pdu, self).__init__( - destinations=destinations, - prev_events=prev_events, - outlier=outlier, - hashes=hashes, - signatures=signatures, - **kwargs - ) - - def __str__(self): - return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__)) - - def __repr__(self): - return "<%s, %s>" % (self.__class__.__name__, repr(self.__dict__)) - class Edu(JsonEncodedObject): """ An Edu represents a piece of data sent from one homeserver to another. @@ -202,6 +125,6 @@ class Transaction(JsonEncodedObject): for p in pdus: p.transaction_id = kwargs["transaction_id"] - kwargs["pdus"] = [p.get_dict() for p in pdus] + kwargs["pdus"] = [p.get_pdu_json() for p in pdus] return Transaction(**kwargs) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fc00128c56..da38f34e6a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,7 +22,6 @@ from synapse.api.errors import AuthError, FederationError, SynapseError from synapse.api.events.room import RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function -from synapse.federation.pdu_codec import PduCodec from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import ( compute_event_signature, check_event_content_hash @@ -69,8 +68,6 @@ class FederationHandler(BaseHandler): self.replication_layer.set_handler(self) - self.pdu_codec = PduCodec(hs) - # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -92,7 +89,7 @@ class FederationHandler(BaseHandler): yield run_on_reactor() - pdu = self.pdu_codec.pdu_from_event(event) + pdu = event if not hasattr(pdu, "destinations") or not pdu.destinations: pdu.destinations = [] @@ -105,7 +102,7 @@ class FederationHandler(BaseHandler): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ - event = self.pdu_codec.event_from_pdu(pdu) + event = pdu logger.debug("Got event: %s", event.event_id) @@ -118,18 +115,15 @@ class FederationHandler(BaseHandler): logger.debug("Processing event: %s", event.event_id) redacted_event = prune_event(event) - redacted_event.origin = pdu.origin - redacted_event.origin_server_ts = pdu.origin_server_ts - redacted_pdu = self.pdu_codec.pdu_from_event(redacted_event) - redacted_pdu_json = redacted_pdu.get_dict() + redacted_pdu_json = redacted_event.get_pdu_json() try: yield self.keyring.verify_json_for_server( event.origin, redacted_pdu_json ) except SynapseError as e: logger.warn("Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_dict()), + encode_canonical_json(pdu.get_pdu_json()), encode_canonical_json(redacted_pdu_json), ) raise FederationError( @@ -147,7 +141,7 @@ class FederationHandler(BaseHandler): event = redacted_event if state: - state = [self.pdu_codec.event_from_pdu(p) for p in state] + state = [p for p in state] is_new_state = yield self.state_handler.annotate_event_with_state( event, @@ -239,7 +233,7 @@ class FederationHandler(BaseHandler): events = [] for pdu in pdus: - event = self.pdu_codec.event_from_pdu(pdu) + event = pdu # FIXME (erikj): Not sure this actually works :/ yield self.state_handler.annotate_event_with_state(event) @@ -260,15 +254,15 @@ class FederationHandler(BaseHandler): destination=target_host, context=event.room_id, event_id=event.event_id, - pdu=self.pdu_codec.pdu_from_event(event) + pdu=event ) - defer.returnValue(self.pdu_codec.event_from_pdu(pdu)) + defer.returnValue(pdu) @defer.inlineCallbacks def on_event_auth(self, event_id): auth = yield self.store.get_auth_chain(event_id) - defer.returnValue([self.pdu_codec.pdu_from_event(e) for e in auth]) + defer.returnValue([e for e in auth]) @log_function @defer.inlineCallbacks @@ -292,7 +286,7 @@ class FederationHandler(BaseHandler): logger.debug("Got response to make_join: %s", pdu) - event = self.pdu_codec.event_from_pdu(pdu) + event = pdu # We should assert some things. assert(event.type == RoomMemberEvent.TYPE) @@ -310,10 +304,10 @@ class FederationHandler(BaseHandler): state = yield self.replication_layer.send_join( target_host, - self.pdu_codec.pdu_from_event(event) + event ) - state = [self.pdu_codec.event_from_pdu(p) for p in state] + state = [p for p in state] logger.debug("do_invite_join state: %s", state) @@ -387,7 +381,7 @@ class FederationHandler(BaseHandler): yield self.auth.add_auth_events(event) self.auth.check(event, raises=True) - pdu = self.pdu_codec.pdu_from_event(event) + pdu = event defer.returnValue(pdu) @@ -397,7 +391,7 @@ class FederationHandler(BaseHandler): """ We have received a join event for a room. Fully process it and respond with the current state and auth chains. """ - event = self.pdu_codec.event_from_pdu(pdu) + event = pdu event.outlier = False @@ -429,7 +423,7 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - new_pdu = self.pdu_codec.pdu_from_event(event) + new_pdu = event destinations = set() @@ -450,17 +444,10 @@ class FederationHandler(BaseHandler): yield self.replication_layer.send_pdu(new_pdu) auth_chain = yield self.store.get_auth_chain(event.event_id) - pdu_auth_chain = [ - self.pdu_codec.pdu_from_event(e) - for e in auth_chain - ] defer.returnValue({ - "state": [ - self.pdu_codec.pdu_from_event(e) - for e in event.state_events.values() - ], - "auth_chain": pdu_auth_chain, + "state": event.state_events.values(), + "auth_chain": auth_chain, }) @defer.inlineCallbacks @@ -469,7 +456,7 @@ class FederationHandler(BaseHandler): Respond with the now signed event. """ - event = self.pdu_codec.event_from_pdu(pdu) + event = pdu event.outlier = True @@ -493,7 +480,7 @@ class FederationHandler(BaseHandler): event, extra_users=[target_user], ) - defer.returnValue(self.pdu_codec.pdu_from_event(event)) + defer.returnValue(event) @defer.inlineCallbacks def get_state_for_pdu(self, origin, room_id, event_id): @@ -524,12 +511,7 @@ class FederationHandler(BaseHandler): else: del results[(event.type, event.state_key)] - defer.returnValue( - [ - self.pdu_codec.pdu_from_event(s) - for s in results.values() - ] - ) + defer.returnValue(results.values()) else: defer.returnValue([]) @@ -546,10 +528,7 @@ class FederationHandler(BaseHandler): limit ) - defer.returnValue([ - self.pdu_codec.pdu_from_event(e) - for e in events - ]) + defer.returnValue(events) @defer.inlineCallbacks @log_function @@ -572,7 +551,7 @@ class FederationHandler(BaseHandler): if not in_room: raise AuthError(403, "Host not in room.") - defer.returnValue(self.pdu_codec.pdu_from_event(event)) + defer.returnValue(event) else: defer.returnValue(None) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 30e6eac8db..5d4be09a82 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -492,6 +492,14 @@ class SQLBaseStore(object): for n, s in signatures.items() } + hashes = self._get_event_content_hashes_txn( + txn, ev.event_id, + ) + + ev.hashes = { + k: encode_base64(v) for k, v in hashes.items() + } + prevs = self._get_prev_events_and_state(txn, ev.event_id) ev.prev_events = [ diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index ad09fab392..efac4075dc 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -23,7 +23,7 @@ from ..utils import MockHttpResource, MockClock, MockKey from synapse.server import HomeServer from synapse.federation import initialize_http_replication -from synapse.federation.units import Pdu +from synapse.api.events import SynapseEvent def make_pdu(prev_pdus=[], **kwargs): @@ -40,7 +40,7 @@ def make_pdu(prev_pdus=[], **kwargs): } pdu_fields.update(kwargs) - return Pdu(prev_pdus=prev_pdus, **pdu_fields) + return SynapseEvent(prev_pdus=prev_pdus, **pdu_fields) class FederationTestCase(unittest.TestCase): @@ -169,7 +169,7 @@ class FederationTestCase(unittest.TestCase): (200, "OK") ) - pdu = Pdu( + pdu = SynapseEvent( event_id="abc123def456", origin="red", room_id="my-context", @@ -189,7 +189,7 @@ class FederationTestCase(unittest.TestCase): "origin_server_ts": 1000000, "origin": "test", "pdus": [ - pdu.get_dict(), + pdu.get_pdu_json(), ], 'pdu_failures': [], }, diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 3f17ca8fb0..e19b073817 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -19,9 +19,10 @@ from tests import unittest from synapse.api.events.room import ( MessageEvent, ) + +from synapse.api.events import SynapseEvent from synapse.handlers.federation import FederationHandler from synapse.server import HomeServer -from synapse.federation.units import Pdu from mock import NonCallableMock, ANY, Mock @@ -74,7 +75,7 @@ class FederationTestCase(unittest.TestCase): @defer.inlineCallbacks def test_msg(self): - pdu = Pdu( + pdu = SynapseEvent( type=MessageEvent.TYPE, room_id="foo", content={"msgtype": u"fooo"}, -- cgit 1.4.1