From 1c445f88f64beabf0bd9bec3950a4a4c0d529e8a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 15 Oct 2014 17:09:04 +0100 Subject: persist hashes and origin signatures for PDUs --- synapse/federation/units.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/units.py b/synapse/federation/units.py index d97aeb698e..3518efb215 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -18,6 +18,7 @@ server protocol. """ from synapse.util.jsonobject import JsonEncodedObject +from syutil.base64util import encode_base64 import logging import json @@ -63,6 +64,8 @@ class Pdu(JsonEncodedObject): "depth", "content", "outlier", + "hashes", + "signatures", "is_state", # Below this are keys valid only for State Pdus. "state_key", "power_level", @@ -91,7 +94,7 @@ class Pdu(JsonEncodedObject): # just leaving it as a dict. (OR DO WE?!) def __init__(self, destinations=[], is_state=False, prev_pdus=[], - outlier=False, **kwargs): + outlier=False, hashes={}, signatures={}, **kwargs): if is_state: for required_key in ["state_key"]: if required_key not in kwargs: @@ -102,6 +105,8 @@ class Pdu(JsonEncodedObject): is_state=is_state, prev_pdus=prev_pdus, outlier=outlier, + hashes=hashes, + signatures=signatures, **kwargs ) @@ -126,6 +131,16 @@ class Pdu(JsonEncodedObject): if "unrecognized_keys" in d and d["unrecognized_keys"]: args.update(json.loads(d["unrecognized_keys"])) + hashes = { + alg: encode_base64(hsh) + for alg, hsh in pdu_tuple.hashes.items() + } + + signatures = { + kid: encode_base64(sig) + for kid, sig in pdu_tuple.signatures.items() + } + return Pdu( prev_pdus=pdu_tuple.prev_pdu_list, **args -- cgit 1.4.1 From 66104da10c4191aa1e048f2379190574755109e6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Oct 2014 00:09:48 +0100 Subject: Sign outgoing PDUs. --- synapse/crypto/event_signing.py | 4 ++-- synapse/federation/pdu_codec.py | 6 +++++- synapse/storage/__init__.py | 7 ++++--- synapse/storage/signatures.py | 6 +++--- tests/federation/test_pdu_codec.py | 13 ++++++++++--- tests/rest/test_events.py | 7 +++++-- tests/rest/test_profile.py | 8 ++++++-- tests/rest/test_rooms.py | 32 +++++++++++++++++++++++++------- tests/utils.py | 3 ++- 9 files changed, 62 insertions(+), 24 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 6557727e06..a115967c0a 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -15,6 +15,7 @@ # limitations under the License. +from synapse.federation.units import Pdu from synapse.api.events.utils import prune_pdu from syutil.jsonutil import encode_canonical_json from syutil.base64util import encode_base64, decode_base64 @@ -25,8 +26,7 @@ import hashlib def hash_event_pdu(pdu, hash_algortithm=hashlib.sha256): hashed = _compute_hash(pdu, hash_algortithm) - hashes[hashed.name] = encode_base64(hashed.digest()) - pdu.hashes = hashes + pdu.hashes[hashed.name] = encode_base64(hashed.digest()) return pdu diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index cef61108dd..bcac5f9ae8 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -14,6 +14,7 @@ # limitations under the License. from .units import Pdu +from synapse.crypto.event_signing import hash_event_pdu, sign_event_pdu import copy @@ -33,6 +34,7 @@ def encode_event_id(pdu_id, origin): class PduCodec(object): def __init__(self, hs): + self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.event_factory = hs.get_event_factory() self.clock = hs.get_clock() @@ -99,4 +101,6 @@ class PduCodec(object): if "ts" not in kwargs: kwargs["ts"] = int(self.clock.time_msec()) - return Pdu(**kwargs) + pdu = Pdu(**kwargs) + pdu = hash_event_pdu(pdu) + return sign_event_pdu(pdu, self.server_name, self.signing_key) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index bfeab7d1e8..b2a3f0b56c 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -42,6 +42,7 @@ from .transactions import TransactionStore from .keys import KeyStore from .signatures import SignatureStore +from syutil.base64util import decode_base64 import json import logging @@ -168,11 +169,11 @@ class DataStore(RoomMemberStore, RoomStore, txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes, ) - signatures = pdu.sigatures.get(pdu.orgin, {}) + signatures = pdu.signatures.get(pdu.origin, {}) - for key_id, signature_base64 in signatures: + for key_id, signature_base64 in signatures.items(): signature_bytes = decode_base64(signature_base64) - self.store_pdu_origin_signatures_txn( + self._store_pdu_origin_signature_txn( txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, ) diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index bb860f09f0..1f0a680500 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -47,7 +47,7 @@ class SignatureStore(SQLBaseStore): algorithm (str): Hashing algorithm. hash_bytes (bytes): Hash function output bytes. """ - self._simple_insert_txn(self, txn, "pdu_hashes", { + self._simple_insert_txn(txn, "pdu_hashes", { "pdu_id": pdu_id, "origin": origin, "algorithm": algorithm, @@ -66,7 +66,7 @@ class SignatureStore(SQLBaseStore): query = ( "SELECT key_id, signature" " FROM pdu_origin_signatures" - " WHERE WHERE pdu_id = ? and origin = ?" + " WHERE pdu_id = ? and origin = ?" ) txn.execute(query, (pdu_id, origin)) return dict(txn.fetchall()) @@ -81,7 +81,7 @@ class SignatureStore(SQLBaseStore): key_id (str): Id for the signing key. signature (bytes): The signature. """ - self._simple_insert_txn(self, txn, "pdu_origin_signatures", { + self._simple_insert_txn(txn, "pdu_origin_signatures", { "pdu_id": pdu_id, "origin": origin, "key_id": key_id, diff --git a/tests/federation/test_pdu_codec.py b/tests/federation/test_pdu_codec.py index 344e1baf60..80851a4258 100644 --- a/tests/federation/test_pdu_codec.py +++ b/tests/federation/test_pdu_codec.py @@ -23,14 +23,21 @@ from synapse.federation.units import Pdu from synapse.server import HomeServer -from mock import Mock +from mock import Mock, NonCallableMock + +from ..utils import MockKey class PduCodecTestCase(unittest.TestCase): def setUp(self): - self.hs = HomeServer("blargle.net") - self.event_factory = self.hs.get_event_factory() + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + self.hs = HomeServer( + "blargle.net", + config=self.mock_config, + ) + self.event_factory = self.hs.get_event_factory() self.codec = PduCodec(self.hs) def test_decode_event_id(self): diff --git a/tests/rest/test_events.py b/tests/rest/test_events.py index 79b371c04d..362c7bc01c 100644 --- a/tests/rest/test_events.py +++ b/tests/rest/test_events.py @@ -28,7 +28,7 @@ from synapse.server import HomeServer # python imports import json -from ..utils import MockHttpResource, MemoryDataStore +from ..utils import MockHttpResource, MemoryDataStore, MockKey from .utils import RestTestCase from mock import Mock, NonCallableMock @@ -122,6 +122,9 @@ class EventStreamPermissionsTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "test", db_pool=None, @@ -139,7 +142,7 @@ class EventStreamPermissionsTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) diff --git a/tests/rest/test_profile.py b/tests/rest/test_profile.py index b0f48e7fd8..3a0d1e700a 100644 --- a/tests/rest/test_profile.py +++ b/tests/rest/test_profile.py @@ -18,9 +18,9 @@ from tests import unittest from twisted.internet import defer -from mock import Mock +from mock import Mock, NonCallableMock -from ..utils import MockHttpResource +from ..utils import MockHttpResource, MockKey from synapse.api.errors import SynapseError, AuthError from synapse.server import HomeServer @@ -41,6 +41,9 @@ class ProfileTestCase(unittest.TestCase): "set_avatar_url", ]) + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer("test", db_pool=None, http_client=None, @@ -48,6 +51,7 @@ class ProfileTestCase(unittest.TestCase): federation=Mock(), replication_layer=Mock(), datastore=None, + config=self.mock_config, ) def _get_user_by_req(request=None): diff --git a/tests/rest/test_rooms.py b/tests/rest/test_rooms.py index 1ce9b8a83d..7170193051 100644 --- a/tests/rest/test_rooms.py +++ b/tests/rest/test_rooms.py @@ -27,7 +27,7 @@ from synapse.server import HomeServer import json import urllib -from ..utils import MockHttpResource, MemoryDataStore +from ..utils import MockHttpResource, MemoryDataStore, MockKey from .utils import RestTestCase from mock import Mock, NonCallableMock @@ -50,6 +50,9 @@ class RoomPermissionsTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -61,7 +64,7 @@ class RoomPermissionsTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -408,6 +411,9 @@ class RoomsMemberListTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -419,7 +425,7 @@ class RoomsMemberListTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -497,6 +503,9 @@ class RoomsCreateTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -508,7 +517,7 @@ class RoomsCreateTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -598,6 +607,9 @@ class RoomTopicTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -609,7 +621,7 @@ class RoomTopicTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -712,6 +724,9 @@ class RoomMemberStateTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -723,7 +738,7 @@ class RoomMemberStateTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -853,6 +868,9 @@ class RoomMessagesTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -864,7 +882,7 @@ class RoomMessagesTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) diff --git a/tests/utils.py b/tests/utils.py index 60fd6085ac..d8be73dba8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -118,13 +118,14 @@ class MockHttpResource(HttpServer): class MockKey(object): alg = "mock_alg" version = "mock_version" + signature = b"\x9a\x87$" @property def verify_key(self): return self def sign(self, message): - return b"\x9a\x87$" + return self def verify(self, message, sig): assert sig == b"\x9a\x87$" -- cgit 1.4.1 From 1116f5330ec80533954026f67018e0db190cbae0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Oct 2014 16:56:51 +0100 Subject: Start implementing the invite/join dance. Continue moving auth to use event.state_events --- synapse/api/auth.py | 16 +++----- synapse/federation/replication.py | 22 +++++++++-- synapse/federation/transport.py | 34 +++++++++++++++- synapse/handlers/federation.py | 83 +++++++++++++++++++++++++++++++++++---- 4 files changed, 133 insertions(+), 22 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d951cb265b..12ddef1b00 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -21,7 +21,7 @@ from synapse.api.constants import Membership, JoinRules from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, - RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, + RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, InviteJoinEvent, ) from synapse.util.logutils import log_function @@ -56,7 +56,8 @@ class Auth(object): defer.returnValue(allowed) return - self.check_event_sender_in_room(event) + if not event.type == InviteJoinEvent.TYPE: + self.check_event_sender_in_room(event) if is_state: # TODO (erikj): This really only should be called for *new* @@ -115,11 +116,6 @@ class Auth(object): def is_membership_change_allowed(self, event): target_user_id = event.state_key - # does this room even exist - room = yield self.store.get_room(event.room_id) - if not room: - raise AuthError(403, "Room does not exist") - # get info about the caller key = (RoomMemberEvent.TYPE, event.user_id, ) caller = event.old_state_events.get(key) @@ -170,7 +166,7 @@ class Auth(object): # joined: It's a NOOP if event.user_id != target_user_id: raise AuthError(403, "Cannot force another user to join.") - elif join_rule == JoinRules.PUBLIC or room.is_public: + elif join_rule == JoinRules.PUBLIC: pass elif join_rule == JoinRules.INVITE: if ( @@ -215,9 +211,9 @@ class Auth(object): power_level_event = event.old_state_events.get(key) level = None if power_level_event: - level = power_level_event.content[user_id] + level = power_level_event.content.get(user_id) if not level: - level = power_level_event.content["default"] + level = power_level_event.content.get("default", 0) return level diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 2346d55045..08c29dece5 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -393,9 +393,25 @@ class ReplicationLayer(object): response = yield self.query_handlers[query_type](args) defer.returnValue((200, response)) else: - defer.returnValue((404, "No handler for Query type '%s'" - % (query_type) - )) + defer.returnValue( + (404, "No handler for Query type '%s'" % (query_type, )) + ) + + def on_make_join_request(self, context, user_id): + return self.handler.on_make_join_request(context, user_id) + + @defer.inlineCallbacks + def on_send_join_request(self, origin, content): + pdu = Pdu(**content) + state = yield self.handler.on_send_join_request(origin, pdu) + defer.returnValue((200, self._transaction_from_pdus(state).get_dict())) + + def make_join(self, destination, context, user_id): + return self.transport_layer.make_join( + destination=destination, + context=context, + user_id=user_id, + ) @defer.inlineCallbacks @log_function diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 755eee8cf6..4f552272e6 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -197,6 +197,19 @@ class TransportLayer(object): defer.returnValue(response) + @defer.inlineCallbacks + @log_function + def make_join(self, destination, context, user_id, retry_on_dns_fail=True): + path = PREFIX + "/make_join/%s/%s" % (context, user_id,) + + response = yield self.client.get_json( + destination=destination, + path=path, + retry_on_dns_fail=retry_on_dns_fail, + ) + + defer.returnValue(response) + @defer.inlineCallbacks def _authenticate_request(self, request): json_request = { @@ -353,6 +366,12 @@ class TransportLayer(object): ) ) + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"), + self._on_make_join_request + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, origin, content, query, transaction_id): @@ -438,7 +457,20 @@ class TransportLayer(object): versions = [v.split(",", 1) for v in v_list] return self.request_handler.on_backfill_request( - context, versions, limit) + context, versions, limit + ) + + @log_function + def _on_make_join_request(self, origin, content, query, context, user_id): + return self.request_handler.on_make_join_request( + context, user_id, + ) + + @log_function + def _on_send_join_request(self, origin, content, query): + return self.request_handler.on_send_join_request( + origin, content, + ) class TransportReceivedHandler(object): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 44bf7def2e..a4f6c739c3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -89,7 +89,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_receive_pdu(self, pdu, backfilled): """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it throught the StateHandler. + do auth checks and put it through the StateHandler. """ event = self.pdu_codec.event_from_pdu(pdu) @@ -97,13 +97,17 @@ class FederationHandler(BaseHandler): yield self.state_handler.annotate_state_groups(event) - with (yield self.lock_manager.lock(pdu.context)): - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - else: - is_new_state = False + logger.debug("Event: %s", event) + + if not backfilled: + yield self.auth.check(event, None, raises=True) + + if event.is_state and not backfilled: + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + else: + is_new_state = False # TODO: Implement something in federation that allows us to # respond to PDU. @@ -267,6 +271,69 @@ class FederationHandler(BaseHandler): defer.returnValue(True) + @defer.inlineCallbacks + def on_make_join_request(self, context, user_id): + event = self.event_factory.create_event( + etype=RoomMemberEvent.TYPE, + content={"membership": Membership.JOIN}, + room_id=context, + user_id=user_id, + state_key=user_id, + ) + + snapshot = yield self.store.snapshot_room( + event.room_id, event.user_id, + ) + snapshot.fill_out_prev_events(event) + + pdu = self.pdu_codec.pdu_from_event(event) + + defer.returnValue(pdu) + + @defer.inlineCallbacks + def on_send_join_request(self, origin, pdu): + event = self.pdu_codec.event_from_pdu(pdu) + + yield self.state_handler.annotate_state_groups(event) + yield self.auth.check(event, None, raises=True) + + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + + # FIXME (erikj): All this is duplicated above :( + + yield self.store.persist_event( + event, + backfilled=False, + is_new_state=is_new_state + ) + + extra_users = [] + if event.type == RoomMemberEvent.TYPE: + target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) + extra_users.append(target_user) + + yield self.notifier.on_new_room_event( + event, extra_users=extra_users + ) + + if event.type == RoomMemberEvent.TYPE: + if event.membership == Membership.JOIN: + user = self.hs.parse_userid(event.state_key) + self.distributor.fire( + "user_joined_room", user=user, room_id=event.room_id + ) + + pdu.destinations = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + + yield self.replication_layer.send_pdu(pdu) + + defer.returnValue(event.state_events.values()) + @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) -- cgit 1.4.1 From bb04447c44036ebf3ae5dde7a4cc7a7909d50ef6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Oct 2014 23:25:12 +0100 Subject: Include hashes of previous pdus when referencing them --- synapse/api/events/__init__.py | 2 +- synapse/federation/pdu_codec.py | 13 ++++--------- synapse/federation/replication.py | 2 +- synapse/federation/units.py | 10 +++++++++- synapse/state.py | 4 ---- synapse/storage/__init__.py | 20 ++++++++++++++------ synapse/storage/pdu.py | 22 ++++++++++++++++------ synapse/storage/schema/signatures.sql | 16 ++++++++++++++++ synapse/storage/signatures.py | 31 +++++++++++++++++++++++++++++++ tests/federation/test_federation.py | 2 +- tests/federation/test_pdu_codec.py | 4 ++-- 11 files changed, 95 insertions(+), 31 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index f66fea2904..a5a55742e0 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -65,13 +65,13 @@ class SynapseEvent(JsonEncodedObject): internal_keys = [ "is_state", - "prev_events", "depth", "destinations", "origin", "outlier", "power_level", "redacted", + "prev_pdus", ] required_keys = [ diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index bcac5f9ae8..11fd7264b3 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -45,9 +45,7 @@ class PduCodec(object): kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin) kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type - kwargs["prev_events"] = [ - encode_event_id(p[0], p[1]) for p in pdu.prev_pdus - ] + kwargs["prev_pdus"] = pdu.prev_pdus if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): kwargs["prev_state"] = encode_event_id( @@ -78,11 +76,8 @@ class PduCodec(object): d["context"] = event.room_id d["pdu_type"] = event.type - if hasattr(event, "prev_events"): - d["prev_pdus"] = [ - decode_event_id(e, self.server_name) - for e in event.prev_events - ] + if hasattr(event, "prev_pdus"): + d["prev_pdus"] = event.prev_pdus if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( @@ -95,7 +90,7 @@ class PduCodec(object): kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type", "prev_events"] + if k not in ["event_id", "room_id", "type"] }) if "ts" not in kwargs: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 9363ac7300..788a49b8e8 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -443,7 +443,7 @@ class ReplicationLayer(object): min_depth = yield self.store.get_min_depth_for_context(pdu.context) if min_depth and pdu.depth > min_depth: - for pdu_id, origin in pdu.prev_pdus: + for pdu_id, origin, hashes in pdu.prev_pdus: exists = yield self._get_persisted_pdu(pdu_id, origin) if not exists: diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 3518efb215..6a43007837 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -141,8 +141,16 @@ class Pdu(JsonEncodedObject): for kid, sig in pdu_tuple.signatures.items() } + prev_pdus = [] + for prev_pdu in pdu_tuple.prev_pdu_list: + prev_hashes = pdu_tuple.edge_hashes.get(prev_pdu, {}) + prev_hashes = { + alg: encode_base64(hsh) for alg, hsh in prev_hashes.items() + } + prev_pdus.append((prev_pdu[0], prev_pdu[1], prev_hashes)) + return Pdu( - prev_pdus=pdu_tuple.prev_pdu_list, + prev_pdus=prev_pdus, **args ) else: diff --git a/synapse/state.py b/synapse/state.py index 9db84c9b5c..bc6b928ec7 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -72,10 +72,6 @@ class StateHandler(object): snapshot.fill_out_prev_events(event) - event.prev_events = [ - e for e in event.prev_events if e != event.event_id - ] - current_state = snapshot.prev_state_pdu if current_state: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b2a3f0b56c..af05b47932 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -177,6 +177,14 @@ class DataStore(RoomMemberStore, RoomStore, txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, ) + for prev_pdu_id, prev_origin, prev_hashes in pdu.prev_pdus: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_pdu_hash_txn( + txn, pdu.pdu_id, pdu.origin, prev_pdu_id, prev_origin, alg, + hash_bytes + ) + if pdu.is_state: self._persist_state_txn(txn, pdu.prev_pdus, cols) else: @@ -352,6 +360,7 @@ class DataStore(RoomMemberStore, RoomStore, prev_pdus = self._get_latest_pdus_in_context( txn, room_id ) + if state_type is not None and state_key is not None: prev_state_pdu = self._get_current_state_pdu( txn, room_id, state_type, state_key @@ -401,17 +410,16 @@ class Snapshot(object): self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): + if hasattr(event, "prev_pdus"): return - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in self.prev_pdus + event.prev_pdus = [ + (p_id, origin, hashes) + for p_id, origin, hashes, _ in self.prev_pdus ] - event.prev_events = [e for e in es if e != event.event_id] - if self.prev_pdus: - event.depth = max([int(v) for _, _, v in self.prev_pdus]) + 1 + event.depth = max([int(v) for _, _, _, v in self.prev_pdus]) + 1 else: event.depth = 0 diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 9d624429b7..a423b42dbd 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -20,10 +20,13 @@ from ._base import SQLBaseStore, Table, JoinHelper from synapse.federation.units import Pdu from synapse.util.logutils import log_function +from syutil.base64util import encode_base64 + from collections import namedtuple import logging + logger = logging.getLogger(__name__) @@ -64,6 +67,8 @@ class PduStore(SQLBaseStore): for r in PduEdgesTable.decode_results(txn.fetchall()) ] + edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) signatures = self._get_pdu_origin_signatures_txn( txn, pdu_id, origin @@ -86,7 +91,7 @@ class PduStore(SQLBaseStore): row = txn.fetchone() if row: results.append(PduTuple( - PduEntry(*row), edges, hashes, signatures + PduEntry(*row), edges, hashes, signatures, edge_hashes )) return results @@ -310,9 +315,14 @@ class PduStore(SQLBaseStore): (context, ) ) - results = txn.fetchall() + results = [] + for pdu_id, origin, depth in txn.fetchall(): + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + sha256_bytes = hashes["sha256"] + prev_hashes = {"sha256": encode_base64(sha256_bytes)} + results.append((pdu_id, origin, prev_hashes, depth)) - return [(row[0], row[1], row[2]) for row in results] + return results @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): @@ -431,7 +441,7 @@ class PduStore(SQLBaseStore): "DELETE FROM %s WHERE pdu_id = ? AND origin = ?" % PduForwardExtremitiesTable.table_name ) - txn.executemany(query, prev_pdus) + txn.executemany(query, list(p[:2] for p in prev_pdus)) # We only insert as a forward extremety the new pdu if there are no # other pdus that reference it as a prev pdu @@ -454,7 +464,7 @@ class PduStore(SQLBaseStore): # deleted in a second if they're incorrect anyway. txn.executemany( PduBackwardExtremitiesTable.insert_statement(), - [(i, o, context) for i, o in prev_pdus] + [(i, o, context) for i, o, _ in prev_pdus] ) # Also delete from the backwards extremities table all ones that @@ -915,7 +925,7 @@ This does not include a prev_pdus key. PduTuple = namedtuple( "PduTuple", - ("pdu_entry", "prev_pdu_list", "hashes", "signatures") + ("pdu_entry", "prev_pdu_list", "hashes", "signatures", "edge_hashes") ) """ This is a tuple of a `PduEntry` and a list of `PduIdTuple` that represent the `prev_pdus` key of a PDU. diff --git a/synapse/storage/schema/signatures.sql b/synapse/storage/schema/signatures.sql index 86ee0f2377..a72c4dc35f 100644 --- a/synapse/storage/schema/signatures.sql +++ b/synapse/storage/schema/signatures.sql @@ -34,3 +34,19 @@ CREATE TABLE IF NOT EXISTS pdu_origin_signatures ( CREATE INDEX IF NOT EXISTS pdu_origin_signatures_id ON pdu_origin_signatures ( pdu_id, origin ); + +CREATE TABLE IF NOT EXISTS pdu_edge_hashes( + pdu_id TEXT, + origin TEXT, + prev_pdu_id TEXT, + prev_origin TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE ( + pdu_id, origin, prev_pdu_id, prev_origin, algorithm + ) +); + +CREATE INDEX IF NOT EXISTS pdu_edge_hashes_id ON pdu_edge_hashes( + pdu_id, origin +); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 1f0a680500..1147102489 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -88,3 +88,34 @@ class SignatureStore(SQLBaseStore): "signature": buffer(signature_bytes), }) + def _get_prev_pdu_hashes_txn(self, txn, pdu_id, origin): + """Get all the hashes for previous PDUs of a PDU + Args: + txn (cursor): + pdu_id (str): Id of the PDU. + origin (str): Origin of the PDU. + Returns: + dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes. + """ + query = ( + "SELECT prev_pdu_id, prev_origin, algorithm, hash" + " FROM pdu_edge_hashes" + " WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + results = {} + for prev_pdu_id, prev_origin, algorithm, hash_bytes in txn.fetchall(): + hashes = results.setdefault((prev_pdu_id, prev_origin), {}) + hashes[algorithm] = hash_bytes + return results + + def _store_prev_pdu_hash_txn(self, txn, pdu_id, origin, prev_pdu_id, + prev_origin, algorithm, hash_bytes): + self._simple_insert_txn(txn, "pdu_edge_hashes", { + "pdu_id": pdu_id, + "origin": origin, + "prev_pdu_id": prev_pdu_id, + "prev_origin": prev_origin, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index 03b2167cf7..eed50e6335 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -41,7 +41,7 @@ def make_pdu(prev_pdus=[], **kwargs): } pdu_fields.update(kwargs) - return PduTuple(PduEntry(**pdu_fields), prev_pdus, {}, {}) + return PduTuple(PduEntry(**pdu_fields), prev_pdus, {}, {}, {}) class FederationTestCase(unittest.TestCase): diff --git a/tests/federation/test_pdu_codec.py b/tests/federation/test_pdu_codec.py index 80851a4258..0ad8cf6641 100644 --- a/tests/federation/test_pdu_codec.py +++ b/tests/federation/test_pdu_codec.py @@ -88,7 +88,7 @@ class PduCodecTestCase(unittest.TestCase): self.assertEquals(pdu.context, event.room_id) self.assertEquals(pdu.is_state, event.is_state) self.assertEquals(pdu.depth, event.depth) - self.assertEquals(["alice@bob.com"], event.prev_events) + self.assertEquals(pdu.prev_pdus, event.prev_pdus) self.assertEquals(pdu.content, event.content) def test_pdu_from_event(self): @@ -144,7 +144,7 @@ class PduCodecTestCase(unittest.TestCase): self.assertEquals(pdu.context, event.room_id) self.assertEquals(pdu.is_state, event.is_state) self.assertEquals(pdu.depth, event.depth) - self.assertEquals(["alice@bob.com"], event.prev_events) + self.assertEquals(pdu.prev_pdus, event.prev_pdus) self.assertEquals(pdu.content, event.content) self.assertEquals(pdu.state_key, event.state_key) -- cgit 1.4.1 From c8f996e29ffd7055bc6521ea610fc12ff50502e5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 11:40:35 +0100 Subject: Hash the same content covered by the signature when referencing previous PDUs rather than reusing the PDU content hashes --- synapse/crypto/event_signing.py | 19 +++++++++++---- synapse/federation/pdu_codec.py | 6 +++-- synapse/storage/__init__.py | 9 ++++++- synapse/storage/pdu.py | 4 ++-- synapse/storage/schema/signatures.sql | 18 ++++++++++++-- synapse/storage/signatures.py | 44 +++++++++++++++++++++++++++++++---- 6 files changed, 84 insertions(+), 16 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index a115967c0a..32d60bd30a 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -24,15 +24,15 @@ from syutil.crypto.jsonsign import sign_json, verify_signed_json import hashlib -def hash_event_pdu(pdu, hash_algortithm=hashlib.sha256): - hashed = _compute_hash(pdu, hash_algortithm) +def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): + hashed = _compute_content_hash(pdu, hash_algorithm) pdu.hashes[hashed.name] = encode_base64(hashed.digest()) return pdu -def check_event_pdu_hash(pdu, hash_algorithm=hashlib.sha256): +def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_hash(pdu, hash_algortithm) + computed_hash = _compute_content_hash(pdu, hash_algortithm) if computed_hash.name not in pdu.hashes: raise Exception("Algorithm %s not in hashes %s" % ( computed_hash.name, list(pdu.hashes) @@ -45,7 +45,7 @@ def check_event_pdu_hash(pdu, hash_algorithm=hashlib.sha256): return message_hash_bytes == computed_hash.digest() -def _compute_hash(pdu, hash_algorithm): +def _compute_content_hash(pdu, hash_algorithm): pdu_json = pdu.get_dict() pdu_json.pop("meta", None) pdu_json.pop("signatures", None) @@ -54,6 +54,15 @@ def _compute_hash(pdu, hash_algorithm): return hash_algorithm(pdu_json_bytes) +def compute_pdu_event_reference_hash(pdu, hash_algorithm=hashlib.sha256): + tmp_pdu = Pdu(**pdu.get_dict()) + tmp_pdu = prune_pdu(tmp_pdu) + pdu_json = tmp_pdu.get_dict() + pdu_json_bytes = encode_canonical_json(pdu_json) + hashed = hash_algorithm(pdu_json_bytes) + return (hashed.name, hashed.digest()) + + def sign_event_pdu(pdu, signature_name, signing_key): tmp_pdu = Pdu(**pdu.get_dict()) tmp_pdu = prune_pdu(tmp_pdu) diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 11fd7264b3..7e574f451d 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -14,7 +14,9 @@ # limitations under the License. from .units import Pdu -from synapse.crypto.event_signing import hash_event_pdu, sign_event_pdu +from synapse.crypto.event_signing import ( + add_event_pdu_content_hash, sign_event_pdu +) import copy @@ -97,5 +99,5 @@ class PduCodec(object): kwargs["ts"] = int(self.clock.time_msec()) pdu = Pdu(**kwargs) - pdu = hash_event_pdu(pdu) + pdu = add_event_pdu_content_hash(pdu) return sign_event_pdu(pdu, self.server_name, self.signing_key) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index af05b47932..1738260cc1 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,8 @@ from .signatures import SignatureStore from syutil.base64util import decode_base64 +from synapse.crypto.event_signing import compute_pdu_event_reference_hash + import json import logging import os @@ -165,7 +167,7 @@ class DataStore(RoomMemberStore, RoomStore, for hash_alg, hash_base64 in pdu.hashes.items(): hash_bytes = decode_base64(hash_base64) - self._store_pdu_hash_txn( + self._store_pdu_content_hash_txn( txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes, ) @@ -185,6 +187,11 @@ class DataStore(RoomMemberStore, RoomStore, hash_bytes ) + (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) + self._store_pdu_reference_hash_txn( + txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes + ) + if pdu.is_state: self._persist_state_txn(txn, pdu.prev_pdus, cols) else: diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index a423b42dbd..3a90c382f0 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -69,7 +69,7 @@ class PduStore(SQLBaseStore): edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) - hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_content_hashes_txn(txn, pdu_id, origin) signatures = self._get_pdu_origin_signatures_txn( txn, pdu_id, origin ) @@ -317,7 +317,7 @@ class PduStore(SQLBaseStore): results = [] for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_reference_hashes_txn(txn, pdu_id, origin) sha256_bytes = hashes["sha256"] prev_hashes = {"sha256": encode_base64(sha256_bytes)} results.append((pdu_id, origin, prev_hashes, depth)) diff --git a/synapse/storage/schema/signatures.sql b/synapse/storage/schema/signatures.sql index a72c4dc35f..1c45a51bec 100644 --- a/synapse/storage/schema/signatures.sql +++ b/synapse/storage/schema/signatures.sql @@ -13,7 +13,7 @@ * limitations under the License. */ -CREATE TABLE IF NOT EXISTS pdu_hashes ( +CREATE TABLE IF NOT EXISTS pdu_content_hashes ( pdu_id TEXT, origin TEXT, algorithm TEXT, @@ -21,7 +21,21 @@ CREATE TABLE IF NOT EXISTS pdu_hashes ( CONSTRAINT uniqueness UNIQUE (pdu_id, origin, algorithm) ); -CREATE INDEX IF NOT EXISTS pdu_hashes_id ON pdu_hashes (pdu_id, origin); +CREATE INDEX IF NOT EXISTS pdu_content_hashes_id ON pdu_content_hashes ( + pdu_id, origin +); + +CREATE TABLE IF NOT EXISTS pdu_reference_hashes ( + pdu_id TEXT, + origin TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE (pdu_id, origin, algorithm) +); + +CREATE INDEX IF NOT EXISTS pdu_reference_hashes_id ON pdu_reference_hashes ( + pdu_id, origin +); CREATE TABLE IF NOT EXISTS pdu_origin_signatures ( pdu_id TEXT, diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 1147102489..85eec7ffbe 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -21,7 +21,7 @@ from twisted.internet import defer class SignatureStore(SQLBaseStore): """Persistence for PDU signatures and hashes""" - def _get_pdu_hashes_txn(self, txn, pdu_id, origin): + def _get_pdu_content_hashes_txn(self, txn, pdu_id, origin): """Get all the hashes for a given PDU. Args: txn (cursor): @@ -32,13 +32,14 @@ class SignatureStore(SQLBaseStore): """ query = ( "SELECT algorithm, hash" - " FROM pdu_hashes" + " FROM pdu_content_hashes" " WHERE pdu_id = ? and origin = ?" ) txn.execute(query, (pdu_id, origin)) return dict(txn.fetchall()) - def _store_pdu_hash_txn(self, txn, pdu_id, origin, algorithm, hash_bytes): + def _store_pdu_content_hash_txn(self, txn, pdu_id, origin, algorithm, + hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -47,13 +48,48 @@ class SignatureStore(SQLBaseStore): algorithm (str): Hashing algorithm. hash_bytes (bytes): Hash function output bytes. """ - self._simple_insert_txn(txn, "pdu_hashes", { + self._simple_insert_txn(txn, "pdu_content_hashes", { "pdu_id": pdu_id, "origin": origin, "algorithm": algorithm, "hash": buffer(hash_bytes), }) + def _get_pdu_reference_hashes_txn(self, txn, pdu_id, origin): + """Get all the hashes for a given PDU. + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + Returns: + A dict of algorithm -> hash. + """ + query = ( + "SELECT algorithm, hash" + " FROM pdu_reference_hashes" + " WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + return dict(txn.fetchall()) + + def _store_pdu_reference_hash_txn(self, txn, pdu_id, origin, algorithm, + hash_bytes): + """Store a hash for a PDU + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + algorithm (str): Hashing algorithm. + hash_bytes (bytes): Hash function output bytes. + """ + self._simple_insert_txn(txn, "pdu_reference_hashes", { + "pdu_id": pdu_id, + "origin": origin, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) + + def _get_pdu_origin_signatures_txn(self, txn, pdu_id, origin): """Get all the signatures for a given PDU. Args: -- cgit 1.4.1 From f71627567b4aa58c5aba7e79c6d972b8ac26b449 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Oct 2014 15:04:17 +0100 Subject: Finish implementing the new join dance. --- synapse/api/auth.py | 9 ++ synapse/api/events/factory.py | 14 ++- synapse/federation/replication.py | 66 ++++++++++---- synapse/federation/transport.py | 68 ++++++++++++-- synapse/handlers/federation.py | 181 ++++++++++++++++++-------------------- synapse/state.py | 10 ++- 6 files changed, 222 insertions(+), 126 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 12ddef1b00..d1eca791ab 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -48,6 +48,15 @@ class Auth(object): """ try: if hasattr(event, "room_id"): + if not event.old_state_events: + # Oh, we don't know what the state of the room was, so we + # are trusting that this is allowed (at least for now) + defer.returnValue(True) + + if hasattr(event, "outlier") and event.outlier: + # TODO (erikj): Auth for outliers is done differently. + defer.returnValue(True) + is_state = hasattr(event, "state_key") if event.type == RoomMemberEvent.TYPE: diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 0d94850cec..c6d1151cac 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -51,12 +51,20 @@ class EventFactory(object): self.clock = hs.get_clock() self.hs = hs + self.event_id_count = 0 + + def create_event_id(self): + i = str(self.event_id_count) + self.event_id_count += 1 + + local_part = str(int(self.clock.time())) + i + random_string(5) + + return "%s@%s" % (local_part, self.hs.hostname) + def create_event(self, etype=None, **kwargs): kwargs["type"] = etype if "event_id" not in kwargs: - kwargs["event_id"] = "%s@%s" % ( - random_string(10), self.hs.hostname - ) + kwargs["event_id"] = self.create_event_id() if "ts" not in kwargs: kwargs["ts"] = int(self.clock.time_msec()) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 08c29dece5..d482193851 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -244,13 +244,14 @@ class ReplicationLayer(object): pdu = None if pdu_list: pdu = pdu_list[0] - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(destination, pdu) defer.returnValue(pdu) @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context): + def get_state_for_context(self, destination, context, pdu_id=None, + pdu_origin=None): """Requests all of the `current` state PDUs for a given context from a remote home server. @@ -263,13 +264,14 @@ class ReplicationLayer(object): """ transaction_data = yield self.transport_layer.get_context_state( - destination, context) + destination, context, pdu_id=pdu_id, pdu_origin=pdu_origin, + ) transaction = Transaction(**transaction_data) pdus = [Pdu(outlier=True, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(destination, pdu) defer.returnValue(pdus) @@ -315,7 +317,7 @@ class ReplicationLayer(object): dl = [] for pdu in pdu_list: - dl.append(self._handle_new_pdu(pdu)) + dl.append(self._handle_new_pdu(transaction.origin, pdu)) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -347,14 +349,19 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, context): - results = yield self.store.get_current_state_for_context( - context - ) + def on_context_state_request(self, context, pdu_id, pdu_origin): + if pdu_id and pdu_origin: + pdus = yield self.handler.get_state_for_pdu( + pdu_id, pdu_origin + ) + else: + results = yield self.store.get_current_state_for_context( + context + ) + pdus = [Pdu.from_pdu_tuple(p) for p in results] - logger.debug("Context returning %d results", len(results)) + logger.debug("Context returning %d results", len(pdus)) - pdus = [Pdu.from_pdu_tuple(p) for p in results] defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @defer.inlineCallbacks @@ -396,9 +403,10 @@ class ReplicationLayer(object): defer.returnValue( (404, "No handler for Query type '%s'" % (query_type, )) ) - + @defer.inlineCallbacks def on_make_join_request(self, context, user_id): - return self.handler.on_make_join_request(context, user_id) + pdu = yield self.handler.on_make_join_request(context, user_id) + defer.returnValue(pdu.get_dict()) @defer.inlineCallbacks def on_send_join_request(self, origin, content): @@ -406,13 +414,27 @@ class ReplicationLayer(object): state = yield self.handler.on_send_join_request(origin, pdu) defer.returnValue((200, self._transaction_from_pdus(state).get_dict())) + @defer.inlineCallbacks def make_join(self, destination, context, user_id): - return self.transport_layer.make_join( + pdu_dict = yield self.transport_layer.make_join( destination=destination, context=context, user_id=user_id, ) + logger.debug("Got response to make_join: %s", pdu_dict) + + defer.returnValue(Pdu(**pdu_dict)) + + def send_join(self, destination, pdu): + return self.transport_layer.send_join( + destination, + pdu.context, + pdu.pdu_id, + pdu.origin, + pdu.get_dict(), + ) + @defer.inlineCallbacks @log_function def _get_persisted_pdu(self, pdu_id, pdu_origin): @@ -443,7 +465,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, pdu, backfilled=False): + def _handle_new_pdu(self, origin, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) @@ -452,6 +474,8 @@ class ReplicationLayer(object): defer.returnValue({}) return + state = None + # Get missing pdus if necessary. is_new = yield self.pdu_actions.is_new(pdu) if is_new and not pdu.outlier: @@ -475,12 +499,22 @@ class ReplicationLayer(object): except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") + else: + # We need to get the state at this event, since we have reached + # a backward extremity edge. + state = yield self.get_state_for_context( + origin, pdu.context, pdu.pdu_id, pdu.origin, + ) # Persist the Pdu, but don't mark it as processed yet. yield self.store.persist_event(pdu=pdu) if not backfilled: - ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) + ret = yield self.handler.on_receive_pdu( + pdu, + backfilled=backfilled, + state=state, + ) else: ret = None diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 4f552272e6..a0d34fd24d 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -72,7 +72,8 @@ class TransportLayer(object): self.received_handler = None @log_function - def get_context_state(self, destination, context): + def get_context_state(self, destination, context, pdu_id=None, + pdu_origin=None): """ Requests all state for a given context (i.e. room) from the given server. @@ -89,7 +90,14 @@ class TransportLayer(object): subpath = "/state/%s/" % context - return self._do_request_for_transaction(destination, subpath) + args = {} + if pdu_id and pdu_origin: + args["pdu_id"] = pdu_id + args["pdu_origin"] = pdu_origin + + return self._do_request_for_transaction( + destination, subpath, args=args + ) @log_function def get_pdu(self, destination, pdu_origin, pdu_id): @@ -135,8 +143,10 @@ class TransportLayer(object): subpath = "/backfill/%s/" % context - args = {"v": ["%s,%s" % (i, o) for i, o in pdu_tuples]} - args["limit"] = limit + args = { + "v": ["%s,%s" % (i, o) for i, o in pdu_tuples], + "limit": limit, + } return self._do_request_for_transaction( dest, @@ -210,6 +220,23 @@ class TransportLayer(object): defer.returnValue(response) + @defer.inlineCallbacks + @log_function + def send_join(self, destination, context, pdu_id, origin, content): + path = PREFIX + "/send_join/%s/%s/%s" % ( + context, + origin, + pdu_id, + ) + + response = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + defer.returnValue(response) + @defer.inlineCallbacks def _authenticate_request(self, request): json_request = { @@ -330,7 +357,11 @@ class TransportLayer(object): re.compile("^" + PREFIX + "/state/([^/]*)/$"), self._with_authentication( lambda origin, content, query, context: - handler.on_context_state_request(context) + handler.on_context_state_request( + context, + query.get("pdu_id", [None])[0], + query.get("pdu_origin", [None])[0] + ) ) ) @@ -369,7 +400,23 @@ class TransportLayer(object): self.server.register_path( "GET", re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"), - self._on_make_join_request + self._with_authentication( + lambda origin, content, query, context, user_id: + self._on_make_join_request( + origin, content, query, context, user_id + ) + ) + ) + + self.server.register_path( + "PUT", + re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, pdu_origin, pdu_id: + self._on_send_join_request( + origin, content, query, + ) + ) ) @defer.inlineCallbacks @@ -460,18 +507,23 @@ class TransportLayer(object): context, versions, limit ) + @defer.inlineCallbacks @log_function def _on_make_join_request(self, origin, content, query, context, user_id): - return self.request_handler.on_make_join_request( + content = yield self.request_handler.on_make_join_request( context, user_id, ) + defer.returnValue((200, content)) + @defer.inlineCallbacks @log_function def _on_send_join_request(self, origin, content, query): - return self.request_handler.on_send_join_request( + content = yield self.request_handler.on_send_join_request( origin, content, ) + defer.returnValue((200, content)) + class TransportReceivedHandler(object): """ Callbacks used when we receive a transaction diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a4f6c739c3..0ae0541bd3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,7 +20,7 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function -from synapse.federation.pdu_codec import PduCodec +from synapse.federation.pdu_codec import PduCodec, encode_event_id from synapse.api.errors import SynapseError from twisted.internet import defer, reactor @@ -87,7 +87,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled): + def on_receive_pdu(self, pdu, backfilled, state=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -95,7 +95,10 @@ class FederationHandler(BaseHandler): logger.debug("Got event: %s", event.event_id) - yield self.state_handler.annotate_state_groups(event) + if state: + state = [self.pdu_codec.event_from_pdu(p) for p in state] + state = {(e.type, e.state_key): e for e in state} + yield self.state_handler.annotate_state_groups(event, state=state) logger.debug("Event: %s", event) @@ -108,83 +111,55 @@ class FederationHandler(BaseHandler): ) else: is_new_state = False + # TODO: Implement something in federation that allows us to # respond to PDU. - target_is_mine = False - if hasattr(event, "target_host"): - target_is_mine = event.target_host == self.hs.hostname - - if event.type == InviteJoinEvent.TYPE: - if not target_is_mine: - logger.debug("Ignoring invite/join event %s", event) - return - - # If we receive an invite/join event then we need to join the - # sender to the given room. - # TODO: We should probably auth this or some such - content = event.content - content.update({"membership": Membership.JOIN}) - new_event = self.event_factory.create_event( - etype=RoomMemberEvent.TYPE, - state_key=event.user_id, - room_id=event.room_id, - user_id=event.user_id, - membership=Membership.JOIN, - content=content + with (yield self.room_lock.lock(event.room_id)): + yield self.store.persist_event( + event, + backfilled, + is_new_state=is_new_state ) - yield self.hs.get_handlers().room_member_handler.change_membership( - new_event, - do_auth=False, - ) + room = yield self.store.get_room(event.room_id) - else: - with (yield self.room_lock.lock(event.room_id)): - yield self.store.persist_event( - event, - backfilled, - is_new_state=is_new_state + if not room: + # Huh, let's try and get the current state + try: + yield self.replication_layer.get_state_for_context( + event.origin, event.room_id, pdu.pdu_id, pdu.origin, ) - room = yield self.store.get_room(event.room_id) - - if not room: - # Huh, let's try and get the current state - try: - yield self.replication_layer.get_state_for_context( - event.origin, event.room_id - ) - - hosts = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - if self.hs.hostname in hosts: - try: - yield self.store.store_room( - room_id=event.room_id, - room_creator_user_id="", - is_public=False, - ) - except: - pass - except: - logger.exception( - "Failed to get current state for room %s", - event.room_id - ) - - if not backfilled: - extra_users = [] - if event.type == RoomMemberEvent.TYPE: - target_user_id = event.state_key - target_user = self.hs.parse_userid(target_user_id) - extra_users.append(target_user) - - yield self.notifier.on_new_room_event( - event, extra_users=extra_users + hosts = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + if self.hs.hostname in hosts: + try: + yield self.store.store_room( + room_id=event.room_id, + room_creator_user_id="", + is_public=False, + ) + except: + pass + except: + logger.exception( + "Failed to get current state for room %s", + event.room_id ) + if not backfilled: + extra_users = [] + if event.type == RoomMemberEvent.TYPE: + target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) + extra_users.append(target_user) + + yield self.notifier.on_new_room_event( + event, extra_users=extra_users + ) + if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: user = self.hs.parse_userid(event.state_key) @@ -214,40 +189,35 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content, snapshot): - hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: # We are already in the room. logger.debug("We're already in the room apparently") defer.returnValue(False) - # First get current state to see if we are already joined. - try: - yield self.replication_layer.get_state_for_context( - target_host, room_id - ) - - hosts = yield self.store.get_joined_hosts_for_room(room_id) - if self.hs.hostname in hosts: - # Oh, we were actually in the room already. - logger.debug("We're already in the room apparently") - defer.returnValue(False) - except Exception: - logger.exception("Failed to get current state") - - new_event = self.event_factory.create_event( - etype=InviteJoinEvent.TYPE, - target_host=target_host, - room_id=room_id, - user_id=joinee, - content=content + pdu = yield self.replication_layer.make_join( + target_host, + room_id, + joinee ) - new_event.destinations = [target_host] + logger.debug("Got response to make_join: %s", pdu) - snapshot.fill_out_prev_events(new_event) - yield self.state_handler.annotate_state_groups(new_event) - yield self.handle_new_event(new_event, snapshot) + event = self.pdu_codec.event_from_pdu(pdu) + + # We should assert some things. + assert(event.type == RoomMemberEvent.TYPE) + assert(event.user_id == joinee) + assert(event.state_key == joinee) + assert(event.room_id == room_id) + + event.event_id = self.event_factory.create_event_id() + event.content = content + + state = yield self.replication_layer.send_join( + target_host, + self.pdu_codec.pdu_from_event(event) + ) # TODO (erikj): Time out here. d = defer.Deferred() @@ -326,14 +296,31 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - pdu.destinations = yield self.store.get_joined_hosts_for_room( + new_pdu = self.pdu_codec.pdu_from_event(event); + new_pdu.destinations = yield self.store.get_joined_hosts_for_room( event.room_id ) - yield self.replication_layer.send_pdu(pdu) + yield self.replication_layer.send_pdu(new_pdu) defer.returnValue(event.state_events.values()) + @defer.inlineCallbacks + def get_state_for_pdu(self, pdu_id, pdu_origin): + state_groups = yield self.store.get_state_groups( + [encode_event_id(pdu_id, pdu_origin)] + ) + + if state_groups: + defer.returnValue( + [ + self.pdu_codec.pdu_from_event(s) + for s in state_groups[0].state + ] + ) + else: + defer.returnValue([]) + @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) diff --git a/synapse/state.py b/synapse/state.py index 9be6b716e2..8c4eeb8924 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -130,7 +130,13 @@ class StateHandler(object): defer.returnValue(is_new) @defer.inlineCallbacks - def annotate_state_groups(self, event): + def annotate_state_groups(self, event, state=None): + if state: + event.state_group = None + event.old_state_events = None + event.state_events = state + return + state_groups = yield self.store.get_state_groups( event.prev_events ) @@ -177,7 +183,7 @@ class StateHandler(object): new_powers_deferreds = [] for e in curr_events: new_powers_deferreds.append( - self.store.get_power_level(e.context, e.user_id) + self.store.get_power_level(e.room_id, e.user_id) ) new_powers = yield defer.gatherResults( -- cgit 1.4.1 From 4d1a7624f444deee4352645fbf73165e11f66dd0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 15:27:11 +0100 Subject: move 'age' into 'meta' subdict so that it is clearer that it is not part of the signed data --- synapse/federation/replication.py | 20 ++++++++++++++------ synapse/federation/units.py | 6 +++++- 2 files changed, 19 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 788a49b8e8..c4993aa5ee 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -295,6 +295,10 @@ class ReplicationLayer(object): transaction = Transaction(**transaction_data) for p in transaction.pdus: + if "meta" in p: + meta = p["meta"] + if "age" in meta: + p["age"] = meta["age"] if "age" in p: p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) del p["age"] @@ -414,14 +418,16 @@ class ReplicationLayer(object): transmission. """ pdus = [p.get_dict() for p in pdu_list] + time_now = self._clock.time_msec() for p in pdus: - if "age_ts" in pdus: - p["age"] = int(self.clock.time_msec()) - p["age_ts"] - + if "age_ts" in p: + age = time_now - p["age_ts"] + p.setdefault("meta", {})["age"] = int(age) + del p["age_ts"] return Transaction( origin=self.server_name, pdus=pdus, - ts=int(self._clock.time_msec()), + ts=int(time_now), destination=None, ) @@ -589,7 +595,7 @@ class _TransactionQueue(object): logger.debug("TX [%s] Persisting transaction...", destination) transaction = Transaction.create_new( - ts=self._clock.time_msec(), + ts=int(self._clock.time_msec()), transaction_id=str(self._next_txn_id), origin=self.server_name, destination=destination, @@ -614,7 +620,9 @@ class _TransactionQueue(object): if "pdus" in data: for p in data["pdus"]: if "age_ts" in p: - p["age"] = now - int(p["age_ts"]) + meta = p.setdefault("meta", {}) + meta["age"] = now - int(p["age_ts"]) + del p["age_ts"] return data code, response = yield self.transport_layer.send_transaction( diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 6a43007837..c4a10a4123 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -68,11 +68,11 @@ class Pdu(JsonEncodedObject): "signatures", "is_state", # Below this are keys valid only for State Pdus. "state_key", - "power_level", "prev_state_id", "prev_state_origin", "required_power_level", "user_id", + "meta" ] internal_keys = [ @@ -124,6 +124,10 @@ class Pdu(JsonEncodedObject): if pdu_tuple: d = copy.copy(pdu_tuple.pdu_entry._asdict()) + for k in d.keys(): + if d[k] is None: + del d[k] + d["content"] = json.loads(d["content_json"]) del d["content_json"] -- cgit 1.4.1 From c5cec1cc77029c21f0117c318c522ab320de3923 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 16:50:04 +0100 Subject: Rename 'meta' to 'unsigned' --- docs/server-server/signing.rst | 16 ++++++++-------- synapse/crypto/event_signing.py | 4 +++- synapse/federation/replication.py | 14 +++++++------- synapse/federation/units.py | 1 - 4 files changed, 18 insertions(+), 17 deletions(-) (limited to 'synapse/federation') diff --git a/docs/server-server/signing.rst b/docs/server-server/signing.rst index dae10f121b..60c701ca91 100644 --- a/docs/server-server/signing.rst +++ b/docs/server-server/signing.rst @@ -1,13 +1,13 @@ Signing JSON ============ -JSON is signed by encoding the JSON object without ``signatures`` or ``meta`` +JSON is signed by encoding the JSON object without ``signatures`` or ``unsigned`` keys using a canonical encoding. The JSON bytes are then signed using the signature algorithm and the signature encoded using base64 with the padding stripped. The resulting base64 signature is added to an object under the *signing key identifier* which is added to the ``signatures`` object under the name of the server signing it which is added back to the original JSON object -along with the ``meta`` object. +along with the ``unsigned`` object. The *signing key identifier* is the concatenation of the *signing algorithm* and a *key version*. The *signing algorithm* identifies the algorithm used to @@ -15,8 +15,8 @@ sign the JSON. The currently support value for *signing algorithm* is ``ed25519`` as implemented by NACL (http://nacl.cr.yp.to/). The *key version* is used to distinguish between different signing keys used by the same entity. -The ``meta`` object and the ``signatures`` object are not covered by the -signature. Therefore intermediate servers can add metadata such as time stamps +The ``unsigned`` object and the ``signatures`` object are not covered by the +signature. Therefore intermediate servers can add unsigneddata such as time stamps and additional signatures. @@ -27,7 +27,7 @@ and additional signatures. "signing_keys": { "ed25519:1": "XSl0kuyvrXNj6A+7/tkrB9sxSbRi08Of5uRhxOqZtEQ" }, - "meta": { + "unsigned": { "retrieved_ts_ms": 922834800000 }, "signatures": { @@ -41,7 +41,7 @@ and additional signatures. def sign_json(json_object, signing_key, signing_name): signatures = json_object.pop("signatures", {}) - meta = json_object.pop("meta", None) + unsigned = json_object.pop("unsigned", None) signed = signing_key.sign(encode_canonical_json(json_object)) signature_base64 = encode_base64(signed.signature) @@ -50,8 +50,8 @@ and additional signatures. signatures.setdefault(sigature_name, {})[key_id] = signature_base64 json_object["signatures"] = signatures - if meta is not None: - json_object["meta"] = meta + if unsigned is not None: + json_object["unsigned"] = unsigned return json_object diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 32d60bd30a..a236f7d708 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -47,7 +47,9 @@ def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): def _compute_content_hash(pdu, hash_algorithm): pdu_json = pdu.get_dict() - pdu_json.pop("meta", None) + #TODO: Make "age_ts" key internal + pdu_json.pop("age_ts") + pdu_json.pop("unsigned", None) pdu_json.pop("signatures", None) hashes = pdu_json.pop("hashes", {}) pdu_json_bytes = encode_canonical_json(pdu_json) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c4993aa5ee..f2a5d4d5e2 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -295,10 +295,10 @@ class ReplicationLayer(object): transaction = Transaction(**transaction_data) for p in transaction.pdus: - if "meta" in p: - meta = p["meta"] - if "age" in meta: - p["age"] = meta["age"] + if "unsigned" in p: + unsigned = p["unsigned"] + if "age" in unsigned: + p["age"] = unsigned["age"] if "age" in p: p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) del p["age"] @@ -422,7 +422,7 @@ class ReplicationLayer(object): for p in pdus: if "age_ts" in p: age = time_now - p["age_ts"] - p.setdefault("meta", {})["age"] = int(age) + p.setdefault("unsigned", {})["age"] = int(age) del p["age_ts"] return Transaction( origin=self.server_name, @@ -620,8 +620,8 @@ class _TransactionQueue(object): if "pdus" in data: for p in data["pdus"]: if "age_ts" in p: - meta = p.setdefault("meta", {}) - meta["age"] = now - int(p["age_ts"]) + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) del p["age_ts"] return data diff --git a/synapse/federation/units.py b/synapse/federation/units.py index c4a10a4123..c629e5793e 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -72,7 +72,6 @@ class Pdu(JsonEncodedObject): "prev_state_origin", "required_power_level", "user_id", - "meta" ] internal_keys = [ -- cgit 1.4.1 From 5ffe5ab43fa090111a0141b04ce6342172f60724 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Oct 2014 18:56:42 +0100 Subject: Use state groups to get current state. Make join dance actually work. --- synapse/api/auth.py | 5 +++ synapse/federation/replication.py | 17 +++++++- synapse/federation/transport.py | 57 +++++++++++++++++++++++--- synapse/handlers/federation.py | 74 +++++++++++++++++++++++---------- synapse/handlers/message.py | 6 +-- synapse/rest/base.py | 5 +++ synapse/rest/events.py | 34 ++++++++++------ synapse/state.py | 86 +++++++++++++++++++++++++++------------ synapse/storage/pdu.py | 6 +++ synapse/storage/state.py | 3 ++ 10 files changed, 226 insertions(+), 67 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d1eca791ab..50ce7eb4cd 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -22,6 +22,7 @@ from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, InviteJoinEvent, + RoomCreateEvent, ) from synapse.util.logutils import log_function @@ -59,6 +60,10 @@ class Auth(object): is_state = hasattr(event, "state_key") + if event.type == RoomCreateEvent.TYPE: + # FIXME + defer.returnValue(True) + if event.type == RoomMemberEvent.TYPE: yield self._can_replace_state(event) allowed = yield self.is_membership_change_allowed(event) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index d482193851..8c7d510ef6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -403,11 +403,18 @@ class ReplicationLayer(object): defer.returnValue( (404, "No handler for Query type '%s'" % (query_type, )) ) + @defer.inlineCallbacks def on_make_join_request(self, context, user_id): pdu = yield self.handler.on_make_join_request(context, user_id) defer.returnValue(pdu.get_dict()) + @defer.inlineCallbacks + def on_invite_request(self, origin, content): + pdu = Pdu(**content) + ret_pdu = yield self.handler.on_send_join_request(origin, pdu) + defer.returnValue((200, ret_pdu.get_dict())) + @defer.inlineCallbacks def on_send_join_request(self, origin, content): pdu = Pdu(**content) @@ -426,8 +433,9 @@ class ReplicationLayer(object): defer.returnValue(Pdu(**pdu_dict)) + @defer.inlineCallbacks def send_join(self, destination, pdu): - return self.transport_layer.send_join( + _, content = yield self.transport_layer.send_join( destination, pdu.context, pdu.pdu_id, @@ -435,6 +443,13 @@ class ReplicationLayer(object): pdu.get_dict(), ) + logger.debug("Got content: %s", content) + pdus = [Pdu(outlier=True, **p) for p in content.get("pdus", [])] + for pdu in pdus: + yield self._handle_new_pdu(destination, pdu) + + defer.returnValue(pdus) + @defer.inlineCallbacks @log_function def _get_persisted_pdu(self, pdu_id, pdu_origin): diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index a0d34fd24d..de64702e2f 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -229,13 +229,36 @@ class TransportLayer(object): pdu_id, ) - response = yield self.client.put_json( + code, content = yield self.client.put_json( destination=destination, path=path, data=content, ) - defer.returnValue(response) + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_join", code) + + defer.returnValue(json.loads(content)) + + @defer.inlineCallbacks + @log_function + def send_invite(self, destination, context, pdu_id, origin, content): + path = PREFIX + "/invite/%s/%s/%s" % ( + context, + origin, + pdu_id, + ) + + code, content = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) @defer.inlineCallbacks def _authenticate_request(self, request): @@ -297,9 +320,13 @@ class TransportLayer(object): @defer.inlineCallbacks def new_handler(request, *args, **kwargs): (origin, content) = yield self._authenticate_request(request) - response = yield handler( - origin, content, request.args, *args, **kwargs - ) + try: + response = yield handler( + origin, content, request.args, *args, **kwargs + ) + except: + logger.exception("Callback failed") + raise defer.returnValue(response) return new_handler @@ -419,6 +446,17 @@ class TransportLayer(object): ) ) + self.server.register_path( + "PUT", + re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, pdu_origin, pdu_id: + self._on_invite_request( + origin, content, query, + ) + ) + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, origin, content, query, transaction_id): @@ -524,6 +562,15 @@ class TransportLayer(object): defer.returnValue((200, content)) + @defer.inlineCallbacks + @log_function + def _on_invite_request(self, origin, content, query): + content = yield self.request_handler.on_invite_request( + origin, content, + ) + + defer.returnValue((200, content)) + class TransportReceivedHandler(object): """ Callbacks used when we receive a transaction diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0ae0541bd3..70790aaa72 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -62,6 +62,9 @@ class FederationHandler(BaseHandler): self.pdu_codec = PduCodec(hs) + # When joining a room we need to queue any events for that room up + self.room_queues = {} + @log_function @defer.inlineCallbacks def handle_new_event(self, event, snapshot): @@ -95,22 +98,25 @@ class FederationHandler(BaseHandler): logger.debug("Got event: %s", event.event_id) + if event.room_id in self.room_queues: + self.room_queues[event.room_id].append(pdu) + return + if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] state = {(e.type, e.state_key): e for e in state} - yield self.state_handler.annotate_state_groups(event, state=state) + + is_new_state = yield self.state_handler.annotate_state_groups( + event, + state=state + ) logger.debug("Event: %s", event) if not backfilled: yield self.auth.check(event, None, raises=True) - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - else: - is_new_state = False + is_new_state = is_new_state and not backfilled # TODO: Implement something in federation that allows us to # respond to PDU. @@ -211,6 +217,8 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) + self.room_queues[room_id] = [] + event.event_id = self.event_factory.create_event_id() event.content = content @@ -219,15 +227,14 @@ class FederationHandler(BaseHandler): self.pdu_codec.pdu_from_event(event) ) - # TODO (erikj): Time out here. - d = defer.Deferred() - self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d) - reactor.callLater(10, d.cancel) + state = [self.pdu_codec.event_from_pdu(p) for p in state] - try: - yield d - except defer.CancelledError: - raise SynapseError(500, "Unable to join remote room") + logger.debug("do_invite_join state: %s", state) + + is_new_state = yield self.state_handler.annotate_state_groups( + event, + state=state + ) try: yield self.store.store_room( @@ -239,6 +246,32 @@ class FederationHandler(BaseHandler): # FIXME pass + for e in state: + # FIXME: Auth these. + is_new_state = yield self.state_handler.annotate_state_groups( + e, + state=state + ) + + yield self.store.persist_event( + e, + backfilled=False, + is_new_state=False + ) + + yield self.store.persist_event( + event, + backfilled=False, + is_new_state=is_new_state + ) + + room_queue = self.room_queues[room_id] + del self.room_queues[room_id] + + for p in room_queue: + p.outlier = True + yield self.on_receive_pdu(p, backfilled=False) + defer.returnValue(True) @defer.inlineCallbacks @@ -264,13 +297,9 @@ class FederationHandler(BaseHandler): def on_send_join_request(self, origin, pdu): event = self.pdu_codec.event_from_pdu(pdu) - yield self.state_handler.annotate_state_groups(event) + is_new_state= yield self.state_handler.annotate_state_groups(event) yield self.auth.check(event, None, raises=True) - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - # FIXME (erikj): All this is duplicated above :( yield self.store.persist_event( @@ -303,7 +332,10 @@ class FederationHandler(BaseHandler): yield self.replication_layer.send_pdu(new_pdu) - defer.returnValue(event.state_events.values()) + defer.returnValue([ + self.pdu_codec.pdu_from_event(e) + for e in event.state_events.values() + ]) @defer.inlineCallbacks def get_state_for_pdu(self, pdu_id, pdu_origin): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c2cbce151..4aaf97a83e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -199,7 +199,7 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_current_state( + data = yield self.state_handler.get_current_state( room_id, event_type, state_key ) defer.returnValue(data) @@ -238,7 +238,7 @@ class MessageHandler(BaseHandler): yield self.auth.check_joined_room(room_id, user_id) # TODO: This is duplicating logic from snapshot_all_rooms - current_state = yield self.store.get_current_state(room_id) + current_state = yield self.state_handler.get_current_state(room_id) defer.returnValue([self.hs.serialize_event(c) for c in current_state]) @defer.inlineCallbacks @@ -315,7 +315,7 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), } - current_state = yield self.store.get_current_state( + current_state = yield self.state_handler.get_current_state( event.room_id ) d["state"] = [self.hs.serialize_event(c) for c in current_state] diff --git a/synapse/rest/base.py b/synapse/rest/base.py index 2e8e3fa7d4..dc784c1527 100644 --- a/synapse/rest/base.py +++ b/synapse/rest/base.py @@ -18,6 +18,11 @@ from synapse.api.urls import CLIENT_PREFIX from synapse.rest.transactions import HttpTransactionStore import re +import logging + + +logger = logging.getLogger(__name__) + def client_path_pattern(path_regex): """Creates a regex compiled client path with the correct client path diff --git a/synapse/rest/events.py b/synapse/rest/events.py index 097195d7cc..92ff5e5ca7 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -20,6 +20,12 @@ from synapse.api.errors import SynapseError from synapse.streams.config import PaginationConfig from synapse.rest.base import RestServlet, client_path_pattern +import logging + + +logger = logging.getLogger(__name__) + + class EventStreamRestServlet(RestServlet): PATTERN = client_path_pattern("/events$") @@ -29,18 +35,22 @@ class EventStreamRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request): auth_user = yield self.auth.get_user_by_req(request) - - handler = self.handlers.event_stream_handler - pagin_config = PaginationConfig.from_request(request) - timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS - if "timeout" in request.args: - try: - timeout = int(request.args["timeout"][0]) - except ValueError: - raise SynapseError(400, "timeout must be in milliseconds.") - - chunk = yield handler.get_stream(auth_user.to_string(), pagin_config, - timeout=timeout) + try: + handler = self.handlers.event_stream_handler + pagin_config = PaginationConfig.from_request(request) + timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS + if "timeout" in request.args: + try: + timeout = int(request.args["timeout"][0]) + except ValueError: + raise SynapseError(400, "timeout must be in milliseconds.") + + chunk = yield handler.get_stream( + auth_user.to_string(), pagin_config, timeout=timeout + ) + except: + logger.exception("Event stream failed") + raise defer.returnValue((200, chunk)) diff --git a/synapse/state.py b/synapse/state.py index 8c4eeb8924..24685c6fb4 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.federation.pdu_codec import encode_event_id, decode_event_id from synapse.util.logutils import log_function +from synapse.federation.pdu_codec import encode_event_id from collections import namedtuple @@ -130,54 +131,89 @@ class StateHandler(object): defer.returnValue(is_new) @defer.inlineCallbacks + @log_function def annotate_state_groups(self, event, state=None): if state: event.state_group = None event.old_state_events = None - event.state_events = state + event.state_events = {(s.type, s.state_key): s for s in state} + defer.returnValue(False) + return + + if hasattr(event, "outlier") and event.outlier: + event.state_group = None + event.old_state_events = None + event.state_events = None + defer.returnValue(False) return + new_state = yield self.resolve_state_groups(event.prev_events) + + event.old_state_events = new_state + + if hasattr(event, "state_key"): + new_state[(event.type, event.state_key)] = event + + event.state_group = None + event.state_events = new_state + + defer.returnValue(hasattr(event, "state_key")) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + # FIXME: HACK! + pdus = yield self.store.get_latest_pdus_in_context(room_id) + + event_ids = [encode_event_id(p.pdu_id, p.origin) for p in pdus] + + res = self.resolve_state_groups(event_ids) + + if event_type: + defer.returnValue(res.get((event_type, state_key))) + return + + defer.returnValue(res.values()) + + @defer.inlineCallbacks + @log_function + def resolve_state_groups(self, event_ids): state_groups = yield self.store.get_state_groups( - event.prev_events + event_ids ) state = {} - state_sets = {} for group in state_groups: for s in group.state: - state.setdefault((s.type, s.state_key), []).append(s) - - state_sets.setdefault( + state.setdefault( (s.type, s.state_key), - set() - ).add(s.event_id) + {} + )[s.event_id] = s unconflicted_state = { - k: state[k].pop() for k, v in state_sets.items() - if len(v) == 1 + k: v.values()[0] for k, v in state.items() + if len(v.values()) == 1 } conflicted_state = { - k: state[k] - for k, v in state_sets.items() - if len(v) > 1 + k: v.values() + for k, v in state.items() + if len(v.values()) > 1 } - new_state = {} - new_state.update(unconflicted_state) - for key, events in conflicted_state.items(): - new_state[key] = yield self.resolve(events) + try: + new_state = {} + new_state.update(unconflicted_state) + for key, events in conflicted_state.items(): + new_state[key] = yield self._resolve_state_events(events) + except: + logger.exception("Failed to resolve state") + raise - event.old_state_events = new_state - - if hasattr(event, "state_key"): - new_state[(event.type, event.state_key)] = event - - event.state_group = None - event.state_events = new_state + defer.returnValue(new_state) @defer.inlineCallbacks - def resolve(self, events): + @log_function + def _resolve_state_events(self, events): curr_events = events new_powers_deferreds = [] diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index d70467dcd6..b1cb0185a6 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -277,6 +277,12 @@ class PduStore(SQLBaseStore): (context, depth) ) + def get_latest_pdus_in_context(self, context): + return self.runInteraction( + self._get_latest_pdus_in_context, + context + ) + def _get_latest_pdus_in_context(self, txn, context): """Get's a list of the most current pdus for a given context. This is used when we are sending a Pdu and need to fill out the `prev_pdus` diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 9496c935a7..0aa979c9f0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -63,6 +63,9 @@ class StateStore(SQLBaseStore): ) def _store_state_groups_txn(self, txn, event): + if not event.state_events: + return + state_group = event.state_group if not state_group: state_group = self._simple_insert_txn( -- cgit 1.4.1 From 8afbece68319728e20c3b32c2f949fd1745d405e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 19:41:32 +0100 Subject: Remove signatures from pdu when computing hashes to use for prev pdus, make sure is_state is a boolean. --- synapse/crypto/event_signing.py | 6 +++++- synapse/federation/units.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index a236f7d708..d3b501c6e7 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -22,6 +22,9 @@ from syutil.base64util import encode_base64, decode_base64 from syutil.crypto.jsonsign import sign_json, verify_signed_json import hashlib +import logging + +logger = logging.getLogger(__name__) def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): @@ -48,7 +51,7 @@ def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): def _compute_content_hash(pdu, hash_algorithm): pdu_json = pdu.get_dict() #TODO: Make "age_ts" key internal - pdu_json.pop("age_ts") + pdu_json.pop("age_ts", None) pdu_json.pop("unsigned", None) pdu_json.pop("signatures", None) hashes = pdu_json.pop("hashes", {}) @@ -60,6 +63,7 @@ def compute_pdu_event_reference_hash(pdu, hash_algorithm=hashlib.sha256): tmp_pdu = Pdu(**pdu.get_dict()) tmp_pdu = prune_pdu(tmp_pdu) pdu_json = tmp_pdu.get_dict() + pdu_json.pop("signatures", None) pdu_json_bytes = encode_canonical_json(pdu_json) hashed = hash_algorithm(pdu_json_bytes) return (hashed.name, hashed.digest()) diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b81e162512..b779d259bd 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -101,7 +101,7 @@ class Pdu(JsonEncodedObject): super(Pdu, self).__init__( destinations=destinations, - is_state=is_state, + is_state=bool(is_state), prev_pdus=prev_pdus, outlier=outlier, hashes=hashes, -- cgit 1.4.1 From 5e2236f9ffe3a66bbe0ff37b1793e8fa59a1c475 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 27 Oct 2014 11:19:15 +0000 Subject: fix pyflakes warnings --- synapse/crypto/event_signing.py | 8 ++++---- synapse/federation/units.py | 2 ++ synapse/storage/signatures.py | 2 -- 3 files changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index d3b501c6e7..61edd2c6f9 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -35,12 +35,12 @@ def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_content_hash(pdu, hash_algortithm) + computed_hash = _compute_content_hash(pdu, hash_algorithm) if computed_hash.name not in pdu.hashes: raise Exception("Algorithm %s not in hashes %s" % ( computed_hash.name, list(pdu.hashes) )) - message_hash_base64 = hashes[computed_hash.name] + message_hash_base64 = pdu.hashes[computed_hash.name] try: message_hash_bytes = decode_base64(message_hash_base64) except: @@ -54,7 +54,7 @@ def _compute_content_hash(pdu, hash_algorithm): pdu_json.pop("age_ts", None) pdu_json.pop("unsigned", None) pdu_json.pop("signatures", None) - hashes = pdu_json.pop("hashes", {}) + pdu_json.pop("hashes", None) pdu_json_bytes = encode_canonical_json(pdu_json) return hash_algorithm(pdu_json_bytes) @@ -73,7 +73,7 @@ def sign_event_pdu(pdu, signature_name, signing_key): tmp_pdu = Pdu(**pdu.get_dict()) tmp_pdu = prune_pdu(tmp_pdu) pdu_json = tmp_pdu.get_dict() - pdu_jdon = sign_json(pdu_json, signature_name, signing_key) + pdu_json = sign_json(pdu_json, signature_name, signing_key) pdu.signatures = pdu_json["signatures"] return pdu diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b779d259bd..adc3385644 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -155,6 +155,8 @@ class Pdu(JsonEncodedObject): return Pdu( prev_pdus=prev_pdus, + hashes=hashes, + signatures=signatures, **args ) else: diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 85eec7ffbe..82be946d3f 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -15,8 +15,6 @@ from _base import SQLBaseStore -from twisted.internet import defer - class SignatureStore(SQLBaseStore): """Persistence for PDU signatures and hashes""" -- cgit 1.4.1 From 2d1dfb3b34583a4de7e1e53f685c2564a7fc731f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Oct 2014 16:42:35 +0000 Subject: Begin implementing all the PDU storage stuff in Events land --- synapse/api/events/__init__.py | 4 +- synapse/federation/pdu_codec.py | 11 ++- synapse/storage/__init__.py | 72 ++++++++++---- synapse/storage/_base.py | 53 +++++++---- synapse/storage/event_federation.py | 143 ++++++++++++++++++++++++++++ synapse/storage/schema/event_edges.sql | 51 ++++++++++ synapse/storage/schema/event_signatures.sql | 65 +++++++++++++ synapse/storage/schema/im.sql | 1 + synapse/storage/signatures.py | 127 ++++++++++++++++++++++++ 9 files changed, 485 insertions(+), 42 deletions(-) create mode 100644 synapse/storage/event_federation.py create mode 100644 synapse/storage/schema/event_edges.sql create mode 100644 synapse/storage/schema/event_signatures.sql (limited to 'synapse/federation') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index a5a55742e0..b855811b98 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -71,7 +71,9 @@ class SynapseEvent(JsonEncodedObject): "outlier", "power_level", "redacted", - "prev_pdus", + "prev_events", + "hashes", + "signatures", ] required_keys = [ diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 991aae2a56..2cd591410b 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -47,7 +47,10 @@ class PduCodec(object): kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin) kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type - kwargs["prev_pdus"] = pdu.prev_pdus + kwargs["prev_events"] = [ + encode_event_id(i, o) + for i, o in pdu.prev_pdus + ] if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): kwargs["prev_state"] = encode_event_id( @@ -78,8 +81,8 @@ class PduCodec(object): d["context"] = event.room_id d["pdu_type"] = event.type - if hasattr(event, "prev_pdus"): - d["prev_pdus"] = event.prev_pdus + if hasattr(event, "prev_events"): + d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events] if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( @@ -92,7 +95,7 @@ class PduCodec(object): kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type"] + if k not in ["event_id", "room_id", "type", "prev_events"] }) if "origin_server_ts" not in kwargs: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a50e19349a..678de0cf50 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -40,6 +40,7 @@ from .stream import StreamStore from .pdu import StatePduStore, PduStore, PdusTable from .transactions import TransactionStore from .keys import KeyStore +from .event_federation import EventFederationStore from .state import StateStore from .signatures import SignatureStore @@ -69,6 +70,7 @@ SCHEMAS = [ "redactions", "state", "signatures", + "event_edges", ] @@ -83,10 +85,12 @@ class _RollbackButIsFineException(Exception): """ pass + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, - DirectoryStore, KeyStore, StateStore, SignatureStore): + DirectoryStore, KeyStore, StateStore, SignatureStore, + EventFederationStore, ): def __init__(self, hs): super(DataStore, self).__init__(hs) @@ -230,6 +234,10 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomRedactionEvent.TYPE: self._store_redaction(txn, event) + outlier = False + if hasattr(event, "outlier"): + outlier = event.outlier + vals = { "topological_ordering": event.depth, "event_id": event.event_id, @@ -237,20 +245,20 @@ class DataStore(RoomMemberStore, RoomStore, "room_id": event.room_id, "content": json.dumps(event.content), "processed": True, + "outlier": outlier, + "depth": event.depth, } if stream_ordering is not None: vals["stream_ordering"] = stream_ordering - if hasattr(event, "outlier"): - vals["outlier"] = event.outlier - else: - vals["outlier"] = False - unrec = { k: v for k, v in event.get_full_dict().items() - if k not in vals.keys() and k not in ["redacted", "redacted_because"] + if k not in vals.keys() and k not in [ + "redacted", "redacted_because", "signatures", "hashes", + "prev_events", + ] } vals["unrecognized_keys"] = json.dumps(unrec) @@ -264,6 +272,14 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + self._store_state_groups_txn(txn, event) is_state = hasattr(event, "state_key") and event.state_key is not None @@ -291,6 +307,28 @@ class DataStore(RoomMemberStore, RoomStore, } ) + signatures = event.signatures.get(event.origin, {}) + + for key_id, signature_base64 in signatures.items(): + signature_bytes = decode_base64(signature_base64) + self._store_event_origin_signature_txn( + txn, event.event_id, key_id, signature_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) + self._store_pdu_reference_hash_txn( + txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes + ) + + self._update_min_depth_for_room_txn(txn, event.room_id, event.depth) + def _store_redaction(self, txn, event): txn.execute( "INSERT OR IGNORE INTO redactions " @@ -373,7 +411,7 @@ class DataStore(RoomMemberStore, RoomStore, """ def _snapshot(txn): membership_state = self._get_room_member(txn, user_id, room_id) - prev_pdus = self._get_latest_pdus_in_context( + prev_events = self._get_latest_events_in_room( txn, room_id ) @@ -388,7 +426,7 @@ class DataStore(RoomMemberStore, RoomStore, store=self, room_id=room_id, user_id=user_id, - prev_pdus=prev_pdus, + prev_events=prev_events, membership_state=membership_state, state_type=state_type, state_key=state_key, @@ -404,7 +442,7 @@ class Snapshot(object): store (DataStore): The datastore. room_id (RoomId): The room of the snapshot. user_id (UserId): The user this snapshot is for. - prev_pdus (list): The list of PDU ids this snapshot is after. + prev_events (list): The list of event ids this snapshot is after. membership_state (RoomMemberEvent): The current state of the user in the room. state_type (str, optional): State type captured by the snapshot @@ -413,29 +451,29 @@ class Snapshot(object): the previous value of the state type and key in the room. """ - def __init__(self, store, room_id, user_id, prev_pdus, + def __init__(self, store, room_id, user_id, prev_events, membership_state, state_type=None, state_key=None, prev_state_pdu=None): self.store = store self.room_id = room_id self.user_id = user_id - self.prev_pdus = prev_pdus + self.prev_events = prev_events self.membership_state = membership_state self.state_type = state_type self.state_key = state_key self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_pdus"): + if hasattr(event, "prev_events"): return - event.prev_pdus = [ + event.prev_events = [ (p_id, origin, hashes) - for p_id, origin, hashes, _ in self.prev_pdus + for p_id, origin, hashes, _ in self.prev_events ] - if self.prev_pdus: - event.depth = max([int(v) for _, _, _, v in self.prev_pdus]) + 1 + if self.prev_events: + event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1 else: event.depth = 0 diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1192216971..30732caa83 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -193,7 +193,6 @@ class SQLBaseStore(object): table, keyvalues, retcols=retcols, allow_none=allow_none ) - @defer.inlineCallbacks def _simple_select_one_onecol(self, table, keyvalues, retcol, allow_none=False): """Executes a SELECT query on the named table, which is expected to @@ -204,19 +203,41 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the row with retcol : string giving the name of the column to return """ - ret = yield self._simple_select_one( + return self.runInteraction( + "_simple_select_one_onecol_txn", + self._simple_select_one_onecol_txn, + table, keyvalues, retcol, allow_none=allow_none, + ) + + def _simple_select_one_onecol_txn(self, txn, table, keyvalues, retcol, + allow_none=False): + ret = self._simple_select_onecol_txn( + txn, table=table, keyvalues=keyvalues, - retcols=[retcol], - allow_none=allow_none + retcols=retcol, ) if ret: - defer.returnValue(ret[retcol]) + return ret[retcol] else: - defer.returnValue(None) + if allow_none: + return None + else: + raise StoreError(404, "No row found") + + def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): + sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % { + "retcol": retcol, + "table": table, + "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), + } + + txn.execute(sql, keyvalues.values()) + + return [r[0] for r in txn.fetchall()] + - @defer.inlineCallbacks def _simple_select_onecol(self, table, keyvalues, retcol): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -229,19 +250,11 @@ class SQLBaseStore(object): Returns: Deferred: Results in a list """ - sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % { - "retcol": retcol, - "table": table, - "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), - } - - def func(txn): - txn.execute(sql, keyvalues.values()) - return txn.fetchall() - - res = yield self.runInteraction("_simple_select_onecol", func) - - defer.returnValue([r[0] for r in res]) + return self.runInteraction( + "_simple_select_onecol", + self._simple_select_onecol_txn, + table, keyvalues, retcol + ) def _simple_select_list(self, table, keyvalues, retcols): """Executes a SELECT query on the named table, which may return zero or diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py new file mode 100644 index 0000000000..27ad9aea4d --- /dev/null +++ b/synapse/storage/event_federation.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import logging + + +logger = logging.getLogger(__name__) + + +class EventFederationStore(SQLBaseStore): + + def _get_latest_events_in_room(self, txn, room_id): + self._simple_select_onecol_txn( + txn, + table="event_forward_extremities", + keyvalues={ + "room_id": room_id, + }, + retcol="event_id", + ) + + results = [] + for pdu_id, origin, depth in txn.fetchall(): + hashes = self._get_pdu_reference_hashes_txn(txn, pdu_id, origin) + sha256_bytes = hashes["sha256"] + prev_hashes = {"sha256": encode_base64(sha256_bytes)} + results.append((pdu_id, origin, prev_hashes, depth)) + + def _get_min_depth_interaction(self, txn, room_id): + min_depth = self._simple_select_one_onecol_txn( + txn, + table="room_depth", + keyvalues={"room_id": room_id,}, + retcol="min_depth", + allow_none=True, + ) + + return int(min_depth) if min_depth is not None else None + + def _update_min_depth_for_room_txn(self, txn, room_id, depth): + min_depth = self._get_min_depth_interaction(txn, room_id) + + do_insert = depth < min_depth if min_depth else True + + if do_insert: + self._simple_insert_txn( + txn, + table="room_depth", + values={ + "room_id": room_id, + "min_depth": depth, + }, + or_replace=True, + ) + + def _handle_prev_events(self, txn, outlier, event_id, prev_events, + room_id): + for e_id in prev_events: + # TODO (erikj): This could be done as a bulk insert + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event_id, + "prev_event": e_id, + "room_id": room_id, + } + ) + + # Update the extremities table if this is not an outlier. + if not outlier: + for e_id in prev_events: + # TODO (erikj): This could be done as a bulk insert + self._simple_delete_txn( + txn, + table="event_forward_extremities", + keyvalues={ + "event_id": e_id, + "room_id": room_id, + } + ) + + + + # We only insert as a forward extremity the new pdu if there are no + # other pdus that reference it as a prev pdu + query = ( + "INSERT INTO %(table)s (event_id, room_id) " + "SELECT ?, ? WHERE NOT EXISTS (" + "SELECT 1 FROM %(event_edges)s WHERE " + "prev_event_id = ? " + ")" + ) % { + "table": "event_forward_extremities", + "event_edges": "event_edges", + } + + logger.debug("query: %s", query) + + txn.execute(query, (event_id, room_id, event_id)) + + # Insert all the prev_pdus as a backwards thing, they'll get + # deleted in a second if they're incorrect anyway. + for e_id in prev_events: + # TODO (erikj): This could be done as a bulk insert + self._simple_insert_txn( + txn, + table="event_backward_extremities", + values={ + "event_id": e_id, + "room_id": room_id, + } + ) + + # Also delete from the backwards extremities table all ones that + # reference pdus that we have already seen + query = ( + "DELETE FROM %(event_back)s as b WHERE EXISTS (" + "SELECT 1 FROM %(events)s AS events " + "WHERE " + "b.event_id = events.event_id " + "AND not events.outlier " + ")" + ) % { + "event_back": "event_backward_extremities", + "events": "events", + } + txn.execute(query) \ No newline at end of file diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql new file mode 100644 index 0000000000..6a28314ece --- /dev/null +++ b/synapse/storage/schema/event_edges.sql @@ -0,0 +1,51 @@ + +CREATE TABLE IF NOT EXISTS event_forward_extremities( + event_id TEXT, + room_id TEXT, + CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); +-- + +CREATE TABLE IF NOT EXISTS event_backward_extremities( + event_id TEXT, + room_id TEXT, + CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); +-- + +CREATE TABLE IF NOT EXISTS event_edges( + event_id TEXT, + prev_event_id TEXT, + room_id TEXT, + CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id) +); + +CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); +CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); +-- + + +CREATE TABLE IF NOT EXISTS room_depth( + room_id TEXT, + min_depth INTEGER, + CONSTRAINT uniqueness UNIQUE (room_id) +); + +CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); +-- + +create TABLE IF NOT EXISTS event_destinations( + event_id TEXT, + destination TEXT, + delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered + CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); +-- \ No newline at end of file diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql new file mode 100644 index 0000000000..5491c7ecec --- /dev/null +++ b/synapse/storage/schema/event_signatures.sql @@ -0,0 +1,65 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_content_hashes ( + event_id TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE (event_id, algorithm) +); + +CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes( + event_id +); + + +CREATE TABLE IF NOT EXISTS event_reference_hashes ( + event_id TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE (event_id, algorithm) +); + +CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes ( + event_id +); + + +CREATE TABLE IF NOT EXISTS event_origin_signatures ( + event_id TEXT, + origin TEXT, + key_id TEXT, + signature BLOB, + CONSTRAINT uniqueness UNIQUE (event_id, key_id) +); + +CREATE INDEX IF NOT EXISTS event_origin_signatures_id ON event_origin_signatures ( + event_id +); + + +CREATE TABLE IF NOT EXISTS event_edge_hashes( + event_id TEXT, + prev_event_id TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE ( + event_id, prev_event_id, algorithm + ) +); + +CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes( + event_id +); diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 3aa83f5c8c..8d6f655993 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS events( unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, + depth INTEGER DEFAULT 0 NOT NULL, CONSTRAINT ev_uniq UNIQUE (event_id) ); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 82be946d3f..b8f8fd44cb 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -153,3 +153,130 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }) + + ## Events ## + + def _get_event_content_hashes_txn(self, txn, event_id): + """Get all the hashes for a given Event. + Args: + txn (cursor): + event_id (str): Id for the Event. + Returns: + A dict of algorithm -> hash. + """ + query = ( + "SELECT algorithm, hash" + " FROM event_content_hashes" + " WHERE event_id = ?" + ) + txn.execute(query, (event_id, )) + return dict(txn.fetchall()) + + def _store_event_content_hash_txn(self, txn, event_id, algorithm, + hash_bytes): + """Store a hash for a Event + Args: + txn (cursor): + event_id (str): Id for the Event. + algorithm (str): Hashing algorithm. + hash_bytes (bytes): Hash function output bytes. + """ + self._simple_insert_txn(txn, "event_content_hashes", { + "event_id": event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) + + def _get_event_reference_hashes_txn(self, txn, event_id): + """Get all the hashes for a given PDU. + Args: + txn (cursor): + event_id (str): Id for the Event. + Returns: + A dict of algorithm -> hash. + """ + query = ( + "SELECT algorithm, hash" + " FROM event_reference_hashes" + " WHERE event_id = ?" + ) + txn.execute(query, (event_id, )) + return dict(txn.fetchall()) + + def _store_event_reference_hash_txn(self, txn, event_id, algorithm, + hash_bytes): + """Store a hash for a PDU + Args: + txn (cursor): + event_id (str): Id for the Event. + algorithm (str): Hashing algorithm. + hash_bytes (bytes): Hash function output bytes. + """ + self._simple_insert_txn(txn, "event_reference_hashes", { + "event_id": event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) + + + def _get_event_origin_signatures_txn(self, txn, event_id): + """Get all the signatures for a given PDU. + Args: + txn (cursor): + event_id (str): Id for the Event. + Returns: + A dict of key_id -> signature_bytes. + """ + query = ( + "SELECT key_id, signature" + " FROM event_origin_signatures" + " WHERE event_id = ? " + ) + txn.execute(query, (event_id, )) + return dict(txn.fetchall()) + + def _store_event_origin_signature_txn(self, txn, event_id, origin, key_id, + signature_bytes): + """Store a signature from the origin server for a PDU. + Args: + txn (cursor): + event_id (str): Id for the Event. + origin (str): origin of the Event. + key_id (str): Id for the signing key. + signature (bytes): The signature. + """ + self._simple_insert_txn(txn, "event_origin_signatures", { + "event_id": event_id, + "origin": origin, + "key_id": key_id, + "signature": buffer(signature_bytes), + }) + + def _get_prev_event_hashes_txn(self, txn, event_id): + """Get all the hashes for previous PDUs of a PDU + Args: + txn (cursor): + event_id (str): Id for the Event. + Returns: + dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes. + """ + query = ( + "SELECT prev_event_id, algorithm, hash" + " FROM event_edge_hashes" + " WHERE event_id = ?" + ) + txn.execute(query, (event_id, )) + results = {} + for prev_event_id, algorithm, hash_bytes in txn.fetchall(): + hashes = results.setdefault(prev_event_id, {}) + hashes[algorithm] = hash_bytes + return results + + def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, + algorithm, hash_bytes): + self._simple_insert_txn(txn, "event_edge_hashes", { + "event_id": event_id, + "prev_event_id": prev_event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) \ No newline at end of file -- cgit 1.4.1 From e7858b6d7ef37849a3d2d5004743cdd21ec330a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Oct 2014 16:59:24 +0000 Subject: Start filling out and using new events tables --- synapse/federation/pdu_codec.py | 12 +++-- synapse/handlers/_base.py | 4 ++ synapse/handlers/federation.py | 90 +++++++++++++++++++--------------- synapse/state.py | 11 +++-- synapse/storage/__init__.py | 45 ++++++++++------- synapse/storage/_base.py | 33 ++++++++++--- synapse/storage/event_federation.py | 49 ++++++++++++------ synapse/storage/schema/event_edges.sql | 8 ++- 8 files changed, 159 insertions(+), 93 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 2cd591410b..dccbccb85b 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -48,8 +48,8 @@ class PduCodec(object): kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type kwargs["prev_events"] = [ - encode_event_id(i, o) - for i, o in pdu.prev_pdus + (encode_event_id(i, o), s) + for i, o, s in pdu.prev_pdus ] if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): @@ -82,7 +82,13 @@ class PduCodec(object): d["pdu_type"] = event.type if hasattr(event, "prev_events"): - d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events] + def f(e, s): + i, o = decode_event_id(e, self.server_name) + return i, o, s + d["prev_pdus"] = [ + f(e, s) + for e, s in event.prev_events + ] if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index cd6c35f194..787a01efc5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -16,6 +16,8 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError +from synapse.util.async import run_on_reactor + class BaseHandler(object): def __init__(self, hs): @@ -45,6 +47,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], extra_users=[], suppress_auth=False): + yield run_on_reactor() + snapshot.fill_out_prev_events(event) yield self.state_handler.annotate_state_groups(event) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b575986fc3..5f86ed03fa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,6 +22,7 @@ from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec, encode_event_id from synapse.api.errors import SynapseError +from synapse.util.async import run_on_reactor from twisted.internet import defer, reactor @@ -81,6 +82,8 @@ class FederationHandler(BaseHandler): processing. """ + yield run_on_reactor() + pdu = self.pdu_codec.pdu_from_event(event) if not hasattr(pdu, "destinations") or not pdu.destinations: @@ -102,6 +105,8 @@ class FederationHandler(BaseHandler): self.room_queues[event.room_id].append(pdu) return + logger.debug("Processing event: %s", event.event_id) + if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] @@ -216,58 +221,65 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) - self.room_queues[room_id] = [] - - event.event_id = self.event_factory.create_event_id() - event.content = content + event.outlier = False - state = yield self.replication_layer.send_join( - target_host, - self.pdu_codec.pdu_from_event(event) - ) + self.room_queues[room_id] = [] - state = [self.pdu_codec.event_from_pdu(p) for p in state] + try: + event.event_id = self.event_factory.create_event_id() + event.content = content - logger.debug("do_invite_join state: %s", state) + state = yield self.replication_layer.send_join( + target_host, + self.pdu_codec.pdu_from_event(event) + ) - is_new_state = yield self.state_handler.annotate_state_groups( - event, - state=state - ) + state = [self.pdu_codec.event_from_pdu(p) for p in state] - try: - yield self.store.store_room( - room_id=room_id, - room_creator_user_id="", - is_public=False - ) - except: - # FIXME - pass + logger.debug("do_invite_join state: %s", state) - for e in state: - # FIXME: Auth these. is_new_state = yield self.state_handler.annotate_state_groups( - e, + event, + state=state ) + logger.debug("do_invite_join event: %s", event) + + try: + yield self.store.store_room( + room_id=room_id, + room_creator_user_id="", + is_public=False + ) + except: + # FIXME + pass + + for e in state: + # FIXME: Auth these. + e.outlier = True + + yield self.state_handler.annotate_state_groups( + e, + ) + + yield self.store.persist_event( + e, + backfilled=False, + is_new_state=False + ) + yield self.store.persist_event( - e, + event, backfilled=False, - is_new_state=False + is_new_state=is_new_state ) + finally: + room_queue = self.room_queues[room_id] + del self.room_queues[room_id] - yield self.store.persist_event( - event, - backfilled=False, - is_new_state=is_new_state - ) - - room_queue = self.room_queues[room_id] - del self.room_queues[room_id] - - for p in room_queue: - yield self.on_receive_pdu(p, backfilled=False) + for p in room_queue: + yield self.on_receive_pdu(p, backfilled=False) defer.returnValue(True) diff --git a/synapse/state.py b/synapse/state.py index cc6a7db96b..993c4f18d3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -143,7 +143,9 @@ class StateHandler(object): defer.returnValue(False) return - new_state = yield self.resolve_state_groups(event.prev_events) + new_state = yield self.resolve_state_groups( + [e for e, _ in event.prev_events] + ) event.old_state_events = copy.deepcopy(new_state) @@ -157,12 +159,11 @@ class StateHandler(object): @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): - # FIXME: HACK! - pdus = yield self.store.get_latest_pdus_in_context(room_id) + events = yield self.store.get_latest_events_in_room(room_id) event_ids = [ - encode_event_id(pdu_id, origin) - for pdu_id, origin, _ in pdus + e_id + for e_id, _ in events ] res = yield self.resolve_state_groups(event_ids) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f89e518690..d75c366834 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -71,6 +71,7 @@ SCHEMAS = [ "state", "signatures", "event_edges", + "event_signatures", ] @@ -134,7 +135,8 @@ class DataStore(RoomMemberStore, RoomStore, "type", "room_id", "content", - "unrecognized_keys" + "unrecognized_keys", + "depth", ], allow_none=allow_none, ) @@ -263,7 +265,12 @@ class DataStore(RoomMemberStore, RoomStore, vals["unrecognized_keys"] = json.dumps(unrec) try: - self._simple_insert_txn(txn, "events", vals) + self._simple_insert_txn( + txn, + "events", + vals, + or_replace=(not outlier), + ) except: logger.warn( "Failed to persist, probably duplicate: %s", @@ -307,13 +314,14 @@ class DataStore(RoomMemberStore, RoomStore, } ) - signatures = event.signatures.get(event.origin, {}) + if hasattr(event, "signatures"): + signatures = event.signatures.get(event.origin, {}) - for key_id, signature_base64 in signatures.items(): - signature_bytes = decode_base64(signature_base64) - self._store_event_origin_signature_txn( - txn, event.event_id, key_id, signature_bytes, - ) + for key_id, signature_base64 in signatures.items(): + signature_bytes = decode_base64(signature_base64) + self._store_event_origin_signature_txn( + txn, event.event_id, event.origin, key_id, signature_bytes, + ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): @@ -323,10 +331,10 @@ class DataStore(RoomMemberStore, RoomStore, ) # TODO - (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) + # (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) + # self._store_event_reference_hash_txn( + # txn, event.event_id, ref_alg, ref_hash_bytes + # ) self._update_min_depth_for_room_txn(txn, event.room_id, event.depth) @@ -412,9 +420,7 @@ class DataStore(RoomMemberStore, RoomStore, """ def _snapshot(txn): membership_state = self._get_room_member(txn, user_id, room_id) - prev_events = self._get_latest_events_in_room( - txn, room_id - ) + prev_events = self._get_latest_events_in_room(txn, room_id) if state_type is not None and state_key is not None: prev_state_pdu = self._get_current_state_pdu( @@ -469,12 +475,12 @@ class Snapshot(object): return event.prev_events = [ - (p_id, origin, hashes) - for p_id, origin, hashes, _ in self.prev_events + (event_id, hashes) + for event_id, hashes, _ in self.prev_events ] if self.prev_events: - event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1 + event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 else: event.depth = 0 @@ -533,9 +539,10 @@ def prepare_database(db_conn): db_conn.commit() else: - sql_script = "BEGIN TRANSACTION;" + sql_script = "BEGIN TRANSACTION;\n" for sql_loc in SCHEMAS: sql_script += read_schema(sql_loc) + sql_script += "\n" sql_script += "COMMIT TRANSACTION;" c.executescript(sql_script) db_conn.commit() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 30732caa83..464b12f032 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,10 +19,12 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function +from syutil.base64util import encode_base64 import collections import copy import json +import sys import time @@ -67,6 +69,9 @@ class LoggingTransaction(object): return self.txn.execute( sql, *args, **kwargs ) + except: + logger.exception("[SQL FAIL] {%s}", self.name) + raise finally: end = time.clock() * 1000 sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) @@ -85,14 +90,20 @@ class SQLBaseStore(object): """Wraps the .runInteraction() method on the underlying db_pool.""" def inner_func(txn, *args, **kwargs): start = time.clock() * 1000 - txn_id = str(SQLBaseStore._TXN_ID) - SQLBaseStore._TXN_ID += 1 + txn_id = SQLBaseStore._TXN_ID + + # We don't really need these to be unique, so lets stop it from + # growing really large. + self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) - name = "%s-%s" % (desc, txn_id, ) + name = "%s-%x" % (desc, txn_id, ) transaction_logger.debug("[TXN START] {%s}", name) try: return func(LoggingTransaction(txn, name), *args, **kwargs) + except: + logger.exception("[TXN FAIL] {%s}", name) + raise finally: end = time.clock() * 1000 transaction_logger.debug( @@ -189,7 +200,6 @@ class SQLBaseStore(object): statement returns no rows """ return self._simple_selectupdate_one( - "_simple_select_one", table, keyvalues, retcols=retcols, allow_none=allow_none ) @@ -215,11 +225,11 @@ class SQLBaseStore(object): txn, table=table, keyvalues=keyvalues, - retcols=retcol, + retcol=retcol, ) if ret: - return ret[retcol] + return ret[0] else: if allow_none: return None @@ -434,6 +444,17 @@ class SQLBaseStore(object): sql = "SELECT * FROM events WHERE event_id = ?" for ev in events: + signatures = self._get_event_origin_signatures_txn( + txn, ev.event_id, + ) + + ev.signatures = { + k: encode_base64(v) for k, v in signatures.items() + } + + prev_events = self._get_latest_events_in_room(txn, ev.room_id) + ev.prev_events = [(e_id, s,) for e_id, s, _ in prev_events] + if hasattr(ev, "prev_state"): # Load previous state_content. # TODO: Should we be pulling this out above? diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 7688fc550f..5f94c31818 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -24,6 +24,13 @@ logger = logging.getLogger(__name__) class EventFederationStore(SQLBaseStore): + def get_latest_events_in_room(self, room_id): + return self.runInteraction( + "get_latest_events_in_room", + self._get_latest_events_in_room, + room_id, + ) + def _get_latest_events_in_room(self, txn, room_id): self._simple_select_onecol_txn( txn, @@ -34,12 +41,25 @@ class EventFederationStore(SQLBaseStore): retcol="event_id", ) + sql = ( + "SELECT e.event_id, e.depth FROM events as e " + "INNER JOIN event_forward_extremities as f " + "ON e.event_id = f.event_id " + "WHERE f.room_id = ?" + ) + + txn.execute(sql, (room_id, )) + results = [] - for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_prev_event_hashes_txn(txn, pdu_id, origin) - sha256_bytes = hashes["sha256"] - prev_hashes = {"sha256": encode_base64(sha256_bytes)} - results.append((pdu_id, origin, prev_hashes, depth)) + for event_id, depth in txn.fetchall(): + hashes = self._get_prev_event_hashes_txn(txn, event_id) + prev_hashes = { + k: encode_base64(v) for k, v in hashes.items() + if k == "sha256" + } + results.append((event_id, prev_hashes, depth)) + + return results def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( @@ -70,21 +90,21 @@ class EventFederationStore(SQLBaseStore): def _handle_prev_events(self, txn, outlier, event_id, prev_events, room_id): - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_insert_txn( txn, table="event_edges", values={ "event_id": event_id, - "prev_event": e_id, + "prev_event_id": e_id, "room_id": room_id, } ) # Update the extremities table if this is not an outlier. if not outlier: - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_delete_txn( txn, @@ -116,7 +136,7 @@ class EventFederationStore(SQLBaseStore): # Insert all the prev_pdus as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_insert_txn( txn, @@ -130,14 +150,11 @@ class EventFederationStore(SQLBaseStore): # Also delete from the backwards extremities table all ones that # reference pdus that we have already seen query = ( - "DELETE FROM %(event_back)s as b WHERE EXISTS (" - "SELECT 1 FROM %(events)s AS events " + "DELETE FROM event_backward_extremities WHERE EXISTS (" + "SELECT 1 FROM events " "WHERE " - "b.event_id = events.event_id " + "event_backward_extremities.event_id = events.event_id " "AND not events.outlier " ")" - ) % { - "event_back": "event_backward_extremities", - "events": "events", - } + ) txn.execute(query) \ No newline at end of file diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql index 6a28314ece..e5f768c705 100644 --- a/synapse/storage/schema/event_edges.sql +++ b/synapse/storage/schema/event_edges.sql @@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); --- + CREATE TABLE IF NOT EXISTS event_backward_extremities( event_id TEXT, @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities( CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); --- + CREATE TABLE IF NOT EXISTS event_edges( event_id TEXT, @@ -28,7 +28,6 @@ CREATE TABLE IF NOT EXISTS event_edges( CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); --- CREATE TABLE IF NOT EXISTS room_depth( @@ -38,7 +37,7 @@ CREATE TABLE IF NOT EXISTS room_depth( ); CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); --- + create TABLE IF NOT EXISTS event_destinations( event_id TEXT, @@ -48,4 +47,3 @@ create TABLE IF NOT EXISTS event_destinations( ); CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); --- \ No newline at end of file -- cgit 1.4.1 From 12ce441e67a40bb73ac5aca0283c2fe4afac4021 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Oct 2014 17:00:11 +0000 Subject: Convert event ids to be of the form :example.com --- synapse/api/events/factory.py | 6 +++++- synapse/federation/pdu_codec.py | 35 ++++++++++++++++------------------- synapse/handlers/federation.py | 7 +++++-- synapse/server.py | 7 ++++++- synapse/state.py | 17 ++++++++++++----- synapse/types.py | 10 ++++++++++ 6 files changed, 54 insertions(+), 28 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 06f3bf232b..750096c618 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -21,6 +21,8 @@ from synapse.api.events.room import ( RoomRedactionEvent, ) +from synapse.types import EventID + from synapse.util.stringutils import random_string @@ -59,7 +61,9 @@ class EventFactory(object): local_part = str(int(self.clock.time())) + i + random_string(5) - return "%s@%s" % (local_part, self.hs.hostname) + e_id = EventID.create_local(local_part, self.hs) + + return e_id.to_string() def create_event(self, etype=None, **kwargs): kwargs["type"] = etype diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index dccbccb85b..6d31286290 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -17,22 +17,11 @@ from .units import Pdu from synapse.crypto.event_signing import ( add_event_pdu_content_hash, sign_event_pdu ) +from synapse.types import EventID import copy -def decode_event_id(event_id, server_name): - parts = event_id.split("@") - if len(parts) < 2: - return (event_id, server_name) - else: - return (parts[0], "".join(parts[1:])) - - -def encode_event_id(pdu_id, origin): - return "%s@%s" % (pdu_id, origin) - - class PduCodec(object): def __init__(self, hs): @@ -40,20 +29,28 @@ class PduCodec(object): self.server_name = hs.hostname self.event_factory = hs.get_event_factory() self.clock = hs.get_clock() + self.hs = hs + + def encode_event_id(self, local, domain): + return EventID.create(local, domain, self.hs).to_string() + + def decode_event_id(self, event_id): + e_id = self.hs.parse_eventid(event_id) + return e_id.localpart, e_id.domain def event_from_pdu(self, pdu): kwargs = {} - kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin) + kwargs["event_id"] = self.encode_event_id(pdu.pdu_id, pdu.origin) kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type kwargs["prev_events"] = [ - (encode_event_id(i, o), s) + (self.encode_event_id(i, o), s) for i, o, s in pdu.prev_pdus ] if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): - kwargs["prev_state"] = encode_event_id( + kwargs["prev_state"] = self.encode_event_id( pdu.prev_state_id, pdu.prev_state_origin ) @@ -75,15 +72,15 @@ class PduCodec(object): def pdu_from_event(self, event): d = event.get_full_dict() - d["pdu_id"], d["origin"] = decode_event_id( - event.event_id, self.server_name + d["pdu_id"], d["origin"] = self.decode_event_id( + event.event_id ) d["context"] = event.room_id d["pdu_type"] = event.type if hasattr(event, "prev_events"): def f(e, s): - i, o = decode_event_id(e, self.server_name) + i, o = self.decode_event_id(e) return i, o, s d["prev_pdus"] = [ f(e, s) @@ -92,7 +89,7 @@ class PduCodec(object): if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( - decode_event_id(event.prev_state, self.server_name) + self.decode_event_id(event.prev_state) ) if hasattr(event, "state_key"): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index da99a4b449..1daeee833b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,9 +20,10 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function -from synapse.federation.pdu_codec import PduCodec, encode_event_id +from synapse.federation.pdu_codec import PduCodec from synapse.api.errors import SynapseError from synapse.util.async import run_on_reactor +from synapse.types import EventID from twisted.internet import defer, reactor @@ -358,7 +359,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def get_state_for_pdu(self, pdu_id, pdu_origin): - event_id = encode_event_id(pdu_id, pdu_origin) + yield run_on_reactor() + + event_id = EventID.create(pdu_id, pdu_origin, self.hs).to_string() state_groups = yield self.store.get_state_groups( [event_id] diff --git a/synapse/server.py b/synapse/server.py index a4d2d4aba5..d770b20b19 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -28,7 +28,7 @@ from synapse.handlers import Handlers from synapse.rest import RestServletFactory from synapse.state import StateHandler from synapse.storage import DataStore -from synapse.types import UserID, RoomAlias, RoomID +from synapse.types import UserID, RoomAlias, RoomID, EventID from synapse.util import Clock from synapse.util.distributor import Distributor from synapse.util.lockutils import LockManager @@ -143,6 +143,11 @@ class BaseHomeServer(object): object.""" return RoomID.from_string(s, hs=self) + def parse_eventid(self, s): + """Parse the string given by 's' as a Event ID and return a EventID + object.""" + return EventID.from_string(s, hs=self) + def serialize_event(self, e): return serialize_event(self, e) diff --git a/synapse/state.py b/synapse/state.py index a59688e3b4..414701b272 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -16,8 +16,10 @@ from twisted.internet import defer -from synapse.federation.pdu_codec import encode_event_id, decode_event_id from synapse.util.logutils import log_function +from synapse.util.async import run_on_reactor + +from synapse.types import EventID from collections import namedtuple @@ -43,6 +45,7 @@ class StateHandler(object): self.store = hs.get_datastore() self._replication = hs.get_replication_layer() self.server_name = hs.hostname + self.hs = hs @defer.inlineCallbacks @log_function @@ -77,15 +80,17 @@ class StateHandler(object): current_state = snapshot.prev_state_pdu if current_state: - event.prev_state = encode_event_id( - current_state.pdu_id, current_state.origin - ) + event.prev_state = EventID.create( + current_state.pdu_id, current_state.origin, self.hs + ).to_string() # TODO check current_state to see if the min power level is less # than the power level of the user # power_level = self._get_power_level_for_event(event) - pdu_id, origin = decode_event_id(event.event_id, self.server_name) + e_id = self.hs.parse_eventid(event.event_id) + pdu_id = e_id.localpart + origin = e_id.domain yield self.store.update_current_state( pdu_id=pdu_id, @@ -129,6 +134,8 @@ class StateHandler(object): @defer.inlineCallbacks @log_function def annotate_state_groups(self, event, old_state=None): + yield run_on_reactor() + if old_state: event.state_group = None event.old_state_events = old_state diff --git a/synapse/types.py b/synapse/types.py index c51bc8e4f2..649ff2f7d7 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -78,6 +78,11 @@ class DomainSpecificString( """Create a structure on the local domain""" return cls(localpart=localpart, domain=hs.hostname, is_mine=True) + @classmethod + def create(cls, localpart, domain, hs): + is_mine = domain == hs.hostname + return cls(localpart=localpart, domain=domain, is_mine=is_mine) + class UserID(DomainSpecificString): """Structure representing a user ID.""" @@ -94,6 +99,11 @@ class RoomID(DomainSpecificString): SIGIL = "!" +class EventID(DomainSpecificString): + """Structure representing an event id. """ + SIGIL = "$" + + class StreamToken( namedtuple( "Token", -- cgit 1.4.1 From f2de2d644af80557baebf43f64f3968b8ab46d0b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 09:59:02 +0000 Subject: Move the impl of backfill to use events. --- synapse/federation/replication.py | 6 +-- synapse/handlers/federation.py | 27 +++++++++++- synapse/storage/event_federation.py | 86 ++++++++++++++++++++++++++++++++++++- 3 files changed, 114 insertions(+), 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 000a3081c2..1628a56294 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -181,7 +181,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def backfill(self, dest, context, limit): + def backfill(self, dest, context, limit, extremities): """Requests some more historic PDUs for the given context from the given destination server. @@ -189,12 +189,12 @@ class ReplicationLayer(object): dest (str): The remote home server to ask. context (str): The context to backfill. limit (int): The maximum number of PDUs to return. + extremities (list): List of PDU id and origins of the first pdus + we have seen from the context Returns: Deferred: Results in the received PDUs. """ - extremities = yield self.store.get_oldest_pdus_in_context(context) - logger.debug("backfill extrem=%s", extremities) # If there are no extremeties then we've (probably) reached the start. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1daeee833b..9f457ce292 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -181,7 +181,17 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit): - pdus = yield self.replication_layer.backfill(dest, room_id, limit) + extremities = yield self.store.get_oldest_events_in_room(room_id) + + pdus = yield self.replication_layer.backfill( + dest, + room_id, + limit, + extremities=[ + self.pdu_codec.decode_event_id(e) + for e in extremities + ] + ) events = [] @@ -390,6 +400,21 @@ class FederationHandler(BaseHandler): else: defer.returnValue([]) + @defer.inlineCallbacks + @log_function + def on_backfill_request(self, context, pdu_list, limit): + + events = yield self.store.get_backfill_events( + context, + [self.pdu_codec.encode_event_id(i, o) for i, o in pdu_list], + limit + ) + + defer.returnValue([ + self.pdu_codec.pdu_from_event(e) + for e in events + ]) + @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 88d09d9ba8..438b42c1da 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -24,6 +24,23 @@ logger = logging.getLogger(__name__) class EventFederationStore(SQLBaseStore): + def get_oldest_events_in_room(self, room_id): + return self.runInteraction( + "get_oldest_events_in_room", + self._get_oldest_events_in_room_txn, + room_id, + ) + + def _get_oldest_events_in_room_txn(self, txn, room_id): + return self._simple_select_onecol_txn( + txn, + table="event_backward_extremities", + keyvalues={ + "room_id": room_id, + }, + retcol="event_id", + ) + def get_latest_events_in_room(self, room_id): return self.runInteraction( "get_latest_events_in_room", @@ -159,4 +176,71 @@ class EventFederationStore(SQLBaseStore): "AND not events.outlier " ")" ) - txn.execute(query) \ No newline at end of file + txn.execute(query) + + + def get_backfill_events(self, room_id, event_list, limit): + """Get a list of Events for a given topic that occured before (and + including) the pdus in pdu_list. Return a list of max size `limit`. + + Args: + txn + room_id (str) + event_list (list) + limit (int) + + Return: + list: A list of PduTuples + """ + return self.runInteraction( + "get_backfill_events", + self._get_backfill_events, room_id, event_list, limit + ) + + def _get_backfill_events(self, txn, room_id, event_list, limit): + logger.debug( + "_get_backfill_events: %s, %s, %s", + room_id, repr(event_list), limit + ) + + # We seed the pdu_results with the things from the pdu_list. + event_results = event_list + + front = event_list + + query = ( + "SELECT prev_event_id FROM event_edges " + "WHERE room_id = ? AND event_id = ? " + "LIMIT ?" + ) + + # We iterate through all event_ids in `front` to select their previous + # events. These are dumped in `new_front`. + # We continue until we reach the limit *or* new_front is empty (i.e., + # we've run out of things to select + while front and len(event_results) < limit: + + new_front = [] + for event_id in front: + logger.debug( + "_backfill_interaction: id=%s", + event_id + ) + + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) + + for row in txn.fetchall(): + logger.debug( + "_backfill_interaction: got id=%s", + *row + ) + new_front.append(row) + + front = new_front + event_results += new_front + + # We also want to update the `prev_pdus` attributes before returning. + return self._get_pdu_tuples(txn, event_results) -- cgit 1.4.1 From 841df4da71b46dc1360f6a71e5ccd9e267211e38 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 09:59:59 +0000 Subject: Don't store any PDUs --- synapse/federation/replication.py | 47 ++++++++++++++++++++------------------- synapse/storage/transactions.py | 18 +++++++-------- 2 files changed, 33 insertions(+), 32 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 1628a56294..89dbf3e2e9 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -57,7 +57,7 @@ class ReplicationLayer(object): self.transport_layer.register_request_handler(self) self.store = hs.get_datastore() - self.pdu_actions = PduActions(self.store) + # self.pdu_actions = PduActions(self.store) self.transaction_actions = TransactionActions(self.store) self._transaction_queue = _TransactionQueue( @@ -278,16 +278,16 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function def on_context_pdus_request(self, context): - pdus = yield self.pdu_actions.get_all_pdus_from_context( - context + raise NotImplementedError( + "on_context_pdus_request is a security violation" ) - defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @defer.inlineCallbacks @log_function def on_backfill_request(self, context, versions, limit): - - pdus = yield self.pdu_actions.backfill(context, versions, limit) + pdus = yield self.handler.on_backfill_request( + context, versions, limit + ) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -383,20 +383,22 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function def on_pull_request(self, origin, versions): - transaction_id = max([int(v) for v in versions]) - - response = yield self.pdu_actions.after_transaction( - transaction_id, - origin, - self.server_name - ) - - if not response: - response = [] - - defer.returnValue( - (200, self._transaction_from_pdus(response).get_dict()) - ) + raise NotImplementedError("Pull transacions not implemented") + + # transaction_id = max([int(v) for v in versions]) + # + # response = yield self.pdu_actions.after_transaction( + # transaction_id, + # origin, + # self.server_name + # ) + # + # if not response: + # response = [] + # + # defer.returnValue( + # (200, self._transaction_from_pdus(response).get_dict()) + # ) @defer.inlineCallbacks def on_query_request(self, query_type, args): @@ -498,8 +500,7 @@ class ReplicationLayer(object): state = None # Get missing pdus if necessary. - is_new = yield self.pdu_actions.is_new(pdu) - if is_new and not pdu.outlier: + if not pdu.outlier: # We only backfill backwards to the min depth. min_depth = yield self.store.get_min_depth_for_context(pdu.context) @@ -539,7 +540,7 @@ class ReplicationLayer(object): else: ret = None - yield self.pdu_actions.mark_as_processed(pdu) + # yield self.pdu_actions.mark_as_processed(pdu) defer.returnValue(ret) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 908014d38b..6624348fd0 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -142,15 +142,15 @@ class TransactionStore(SQLBaseStore): # Update the tx id -> pdu id mapping - values = [ - (transaction_id, destination, pdu[0], pdu[1]) - for pdu in pdu_list - ] - - logger.debug("Inserting: %s", repr(values)) - - query = TransactionsToPduTable.insert_statement() - txn.executemany(query, values) + # values = [ + # (transaction_id, destination, pdu[0], pdu[1]) + # for pdu in pdu_list + # ] + # + # logger.debug("Inserting: %s", repr(values)) + # + # query = TransactionsToPduTable.insert_statement() + # txn.executemany(query, values) return prev_txns -- cgit 1.4.1 From 21fe249d62deafceca05cc114d5d6bec3e815b8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 10:47:34 +0000 Subject: Actually don't store any PDUs --- synapse/federation/replication.py | 27 +++++++++++++-------------- synapse/handlers/federation.py | 22 ++++++++++++++++++++++ synapse/storage/event_federation.py | 7 +++++++ 3 files changed, 42 insertions(+), 14 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 89dbf3e2e9..a0bd2e0572 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -106,7 +106,6 @@ class ReplicationLayer(object): self.query_handlers[query_type] = handler - @defer.inlineCallbacks @log_function def send_pdu(self, pdu): """Informs the replication layer about a new PDU generated within the @@ -135,7 +134,7 @@ class ReplicationLayer(object): logger.debug("[%s] Persisting PDU", pdu.pdu_id) # Save *before* trying to send - yield self.store.persist_event(pdu=pdu) + # yield self.store.persist_event(pdu=pdu) logger.debug("[%s] Persisted PDU", pdu.pdu_id) logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id) @@ -359,12 +358,13 @@ class ReplicationLayer(object): pdu_id, pdu_origin ) else: - results = yield self.store.get_current_state_for_context( - context - ) - pdus = [Pdu.from_pdu_tuple(p) for p in results] - - logger.debug("Context returning %d results", len(pdus)) + raise NotImplementedError("Specify an event") + # results = yield self.store.get_current_state_for_context( + # context + # ) + # pdus = [Pdu.from_pdu_tuple(p) for p in results] + # + # logger.debug("Context returning %d results", len(pdus)) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -456,7 +456,6 @@ class ReplicationLayer(object): defer.returnValue(pdus) - @defer.inlineCallbacks @log_function def _get_persisted_pdu(self, pdu_id, pdu_origin): """ Get a PDU from the database with given origin and id. @@ -464,9 +463,7 @@ class ReplicationLayer(object): Returns: Deferred: Results in a `Pdu`. """ - pdu_tuple = yield self.store.get_pdu(pdu_id, pdu_origin) - - defer.returnValue(Pdu.from_pdu_tuple(pdu_tuple)) + return self.handler.get_persisted_pdu(pdu_id, pdu_origin) def _transaction_from_pdus(self, pdu_list): """Returns a new Transaction containing the given PDUs suitable for @@ -502,7 +499,9 @@ class ReplicationLayer(object): # Get missing pdus if necessary. if not pdu.outlier: # We only backfill backwards to the min depth. - min_depth = yield self.store.get_min_depth_for_context(pdu.context) + min_depth = yield self.handler.get_min_depth_for_context( + pdu.context + ) if min_depth and pdu.depth > min_depth: for pdu_id, origin, hashes in pdu.prev_pdus: @@ -529,7 +528,7 @@ class ReplicationLayer(object): ) # Persist the Pdu, but don't mark it as processed yet. - yield self.store.persist_event(pdu=pdu) + # yield self.store.persist_event(pdu=pdu) if not backfilled: ret = yield self.handler.on_receive_pdu( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9f457ce292..18cb1d4e97 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -415,6 +415,28 @@ class FederationHandler(BaseHandler): for e in events ]) + @defer.inlineCallbacks + @log_function + def get_persisted_pdu(self, pdu_id, origin): + """ Get a PDU from the database with given origin and id. + + Returns: + Deferred: Results in a `Pdu`. + """ + event = yield self.store.get_event( + self.pdu_codec.encode_event_id(pdu_id, origin), + allow_none=True, + ) + + if event: + defer.returnValue(self.pdu_codec.pdu_from_event(event)) + else: + defer.returnValue(None) + + @log_function + def get_min_depth_for_context(self, context): + return self.store.get_min_depth(context) + @log_function def _on_user_joined(self, user, room_id): waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 438b42c1da..8357071db6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -78,6 +78,13 @@ class EventFederationStore(SQLBaseStore): return results + def get_min_depth(self, room_id): + return self.runInteraction( + "get_min_depth", + self._get_min_depth_interaction, + room_id, + ) + def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( txn, -- cgit 1.4.1 From bfa36a72b9a852130cc42fb9322f6596e89725a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 14:00:14 +0000 Subject: Remove PDU tables. --- synapse/federation/persistence.py | 70 --- synapse/federation/replication.py | 2 +- synapse/storage/__init__.py | 60 +-- synapse/storage/pdu.py | 949 -------------------------------------- synapse/storage/schema/pdu.sql | 106 ----- synapse/storage/transactions.py | 45 -- 6 files changed, 2 insertions(+), 1230 deletions(-) delete mode 100644 synapse/storage/pdu.py delete mode 100644 synapse/storage/schema/pdu.sql (limited to 'synapse/federation') diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 7043fcc504..a565375e68 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -32,76 +32,6 @@ import logging logger = logging.getLogger(__name__) -class PduActions(object): - """ Defines persistence actions that relate to handling PDUs. - """ - - def __init__(self, datastore): - self.store = datastore - - @log_function - def mark_as_processed(self, pdu): - """ Persist the fact that we have fully processed the given `Pdu` - - Returns: - Deferred - """ - return self.store.mark_pdu_as_processed(pdu.pdu_id, pdu.origin) - - @defer.inlineCallbacks - @log_function - def after_transaction(self, transaction_id, destination, origin): - """ Returns all `Pdu`s that we sent to the given remote home server - after a given transaction id. - - Returns: - Deferred: Results in a list of `Pdu`s - """ - results = yield self.store.get_pdus_after_transaction( - transaction_id, - destination - ) - - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @defer.inlineCallbacks - @log_function - def get_all_pdus_from_context(self, context): - results = yield self.store.get_all_pdus_from_context(context) - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @defer.inlineCallbacks - @log_function - def backfill(self, context, pdu_list, limit): - """ For a given list of PDU id and origins return the proceeding - `limit` `Pdu`s in the given `context`. - - Returns: - Deferred: Results in a list of `Pdu`s. - """ - results = yield self.store.get_backfill( - context, pdu_list, limit - ) - - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @log_function - def is_new(self, pdu): - """ When we receive a `Pdu` from a remote home server, we want to - figure out whether it is `new`, i.e. it is not some historic PDU that - we haven't seen simply because we haven't backfilled back that far. - - Returns: - Deferred: Results in a `bool` - """ - return self.store.is_pdu_new( - pdu_id=pdu.pdu_id, - origin=pdu.origin, - context=pdu.context, - depth=pdu.depth - ) - - class TransactionActions(object): """ Defines persistence actions that relate to handling Transactions. """ diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a0bd2e0572..159af4eed7 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -21,7 +21,7 @@ from twisted.internet import defer from .units import Transaction, Pdu, Edu -from .persistence import PduActions, TransactionActions +from .persistence import TransactionActions from synapse.util.logutils import log_function diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d75c366834..3faa571dd9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -37,7 +37,6 @@ from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore from .stream import StreamStore -from .pdu import StatePduStore, PduStore, PdusTable from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore @@ -60,7 +59,6 @@ logger = logging.getLogger(__name__) SCHEMAS = [ "transactions", - "pdu", "users", "profiles", "presence", @@ -89,7 +87,7 @@ class _RollbackButIsFineException(Exception): class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, - PresenceStore, PduStore, StatePduStore, TransactionStore, + PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, EventFederationStore, ): @@ -150,68 +148,12 @@ class DataStore(RoomMemberStore, RoomStore, def _persist_pdu_event_txn(self, txn, pdu=None, event=None, backfilled=False, stream_ordering=None, is_new_state=True): - if pdu is not None: - self._persist_event_pdu_txn(txn, pdu) if event is not None: return self._persist_event_txn( txn, event, backfilled, stream_ordering, is_new_state=is_new_state, ) - def _persist_event_pdu_txn(self, txn, pdu): - cols = dict(pdu.__dict__) - unrec_keys = dict(pdu.unrecognized_keys) - del cols["hashes"] - del cols["signatures"] - del cols["content"] - del cols["prev_pdus"] - cols["content_json"] = json.dumps(pdu.content) - - unrec_keys.update({ - k: v for k, v in cols.items() - if k not in PdusTable.fields - }) - - cols["unrecognized_keys"] = json.dumps(unrec_keys) - - cols["ts"] = cols.pop("origin_server_ts") - - logger.debug("Persisting: %s", repr(cols)) - - for hash_alg, hash_base64 in pdu.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_pdu_content_hash_txn( - txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes, - ) - - signatures = pdu.signatures.get(pdu.origin, {}) - - for key_id, signature_base64 in signatures.items(): - signature_bytes = decode_base64(signature_base64) - self._store_pdu_origin_signature_txn( - txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, - ) - - for prev_pdu_id, prev_origin, prev_hashes in pdu.prev_pdus: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_pdu_hash_txn( - txn, pdu.pdu_id, pdu.origin, prev_pdu_id, prev_origin, alg, - hash_bytes - ) - - (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) - self._store_pdu_reference_hash_txn( - txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes - ) - - if pdu.is_state: - self._persist_state_txn(txn, pdu.prev_pdus, cols) - else: - self._persist_pdu_txn(txn, pdu.prev_pdus, cols) - - self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth) - @log_function def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, is_new_state=True): diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py deleted file mode 100644 index 4a4341907b..0000000000 --- a/synapse/storage/pdu.py +++ /dev/null @@ -1,949 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import SQLBaseStore, Table, JoinHelper - -from synapse.federation.units import Pdu -from synapse.util.logutils import log_function - -from syutil.base64util import encode_base64 - -from collections import namedtuple - -import logging - - -logger = logging.getLogger(__name__) - - -class PduStore(SQLBaseStore): - """A collection of queries for handling PDUs. - """ - - def get_pdu(self, pdu_id, origin): - """Given a pdu_id and origin, get a PDU. - - Args: - txn - pdu_id (str) - origin (str) - - Returns: - PduTuple: If the pdu does not exist in the database, returns None - """ - - return self.runInteraction( - "get_pdu", self._get_pdu_tuple, pdu_id, origin - ) - - def _get_pdu_tuple(self, txn, pdu_id, origin): - res = self._get_pdu_tuples(txn, [(pdu_id, origin)]) - return res[0] if res else None - - def _get_pdu_tuples(self, txn, pdu_id_tuples): - results = [] - for pdu_id, origin in pdu_id_tuples: - txn.execute( - PduEdgesTable.select_statement("pdu_id = ? AND origin = ?"), - (pdu_id, origin) - ) - - edges = [ - (r.prev_pdu_id, r.prev_origin) - for r in PduEdgesTable.decode_results(txn.fetchall()) - ] - - edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) - - hashes = self._get_pdu_content_hashes_txn(txn, pdu_id, origin) - signatures = self._get_pdu_origin_signatures_txn( - txn, pdu_id, origin - ) - - query = ( - "SELECT %(fields)s FROM %(pdus)s as p " - "LEFT JOIN %(state)s as s " - "ON p.pdu_id = s.pdu_id AND p.origin = s.origin " - "WHERE p.pdu_id = ? AND p.origin = ? " - ) % { - "fields": _pdu_state_joiner.get_fields( - PdusTable="p", StatePdusTable="s"), - "pdus": PdusTable.table_name, - "state": StatePdusTable.table_name, - } - - txn.execute(query, (pdu_id, origin)) - - row = txn.fetchone() - if row: - results.append(PduTuple( - PduEntry(*row), edges, hashes, signatures, edge_hashes - )) - - return results - - def get_current_state_for_context(self, context): - """Get a list of PDUs that represent the current state for a given - context - - Args: - context (str) - - Returns: - list: A list of PduTuples - """ - - return self.runInteraction( - "get_current_state_for_context", - self._get_current_state_for_context, - context - ) - - def _get_current_state_for_context(self, txn, context): - query = ( - "SELECT pdu_id, origin FROM %s WHERE context = ?" - % CurrentStateTable.table_name - ) - - logger.debug("get_current_state %s, Args=%s", query, context) - txn.execute(query, (context,)) - - res = txn.fetchall() - - logger.debug("get_current_state %d results", len(res)) - - return self._get_pdu_tuples(txn, res) - - def _persist_pdu_txn(self, txn, prev_pdus, cols): - """Inserts a (non-state) PDU into the database. - - Args: - txn, - prev_pdus (list) - **cols: The columns to insert into the PdusTable. - """ - entry = PdusTable.EntryType( - **{k: cols.get(k, None) for k in PdusTable.fields} - ) - - txn.execute(PdusTable.insert_statement(), entry) - - self._handle_prev_pdus( - txn, entry.outlier, entry.pdu_id, entry.origin, - prev_pdus, entry.context - ) - - def mark_pdu_as_processed(self, pdu_id, pdu_origin): - """Mark a received PDU as processed. - - Args: - txn - pdu_id (str) - pdu_origin (str) - """ - - return self.runInteraction( - "mark_pdu_as_processed", - self._mark_as_processed, pdu_id, pdu_origin - ) - - def _mark_as_processed(self, txn, pdu_id, pdu_origin): - txn.execute("UPDATE %s SET have_processed = 1" % PdusTable.table_name) - - def get_all_pdus_from_context(self, context): - """Get a list of all PDUs for a given context.""" - return self.runInteraction( - "get_all_pdus_from_context", - self._get_all_pdus_from_context, context, - ) - - def _get_all_pdus_from_context(self, txn, context): - query = ( - "SELECT pdu_id, origin FROM %s " - "WHERE context = ?" - ) % PdusTable.table_name - - txn.execute(query, (context,)) - - return self._get_pdu_tuples(txn, txn.fetchall()) - - def get_backfill(self, context, pdu_list, limit): - """Get a list of Pdus for a given topic that occured before (and - including) the pdus in pdu_list. Return a list of max size `limit`. - - Args: - txn - context (str) - pdu_list (list) - limit (int) - - Return: - list: A list of PduTuples - """ - return self.runInteraction( - "get_backfill", - self._get_backfill, context, pdu_list, limit - ) - - def _get_backfill(self, txn, context, pdu_list, limit): - logger.debug( - "backfill: %s, %s, %s", - context, repr(pdu_list), limit - ) - - # We seed the pdu_results with the things from the pdu_list. - pdu_results = pdu_list - - front = pdu_list - - query = ( - "SELECT prev_pdu_id, prev_origin FROM %(edges_table)s " - "WHERE context = ? AND pdu_id = ? AND origin = ? " - "LIMIT ?" - ) % { - "edges_table": PduEdgesTable.table_name, - } - - # We iterate through all pdu_ids in `front` to select their previous - # pdus. These are dumped in `new_front`. We continue until we reach the - # limit *or* new_front is empty (i.e., we've run out of things to - # select - while front and len(pdu_results) < limit: - - new_front = [] - for pdu_id, origin in front: - logger.debug( - "_backfill_interaction: i=%s, o=%s", - pdu_id, origin - ) - - txn.execute( - query, - (context, pdu_id, origin, limit - len(pdu_results)) - ) - - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got i=%s, o=%s", - *row - ) - new_front.append(row) - - front = new_front - pdu_results += new_front - - # We also want to update the `prev_pdus` attributes before returning. - return self._get_pdu_tuples(txn, pdu_results) - - def get_min_depth_for_context(self, context): - """Get the current minimum depth for a context - - Args: - txn - context (str) - """ - return self.runInteraction( - "get_min_depth_for_context", - self._get_min_depth_for_context, context - ) - - def _get_min_depth_for_context(self, txn, context): - return self._get_min_depth_interaction(txn, context) - - def _get_min_depth_interaction(self, txn, context): - txn.execute( - "SELECT min_depth FROM %s WHERE context = ?" - % ContextDepthTable.table_name, - (context,) - ) - - row = txn.fetchone() - - return row[0] if row else None - - def _update_min_depth_for_context_txn(self, txn, context, depth): - """Update the minimum `depth` of the given context, which is the line - on which we stop backfilling backwards. - - Args: - context (str) - depth (int) - """ - min_depth = self._get_min_depth_interaction(txn, context) - - do_insert = depth < min_depth if min_depth else True - - if do_insert: - txn.execute( - "INSERT OR REPLACE INTO %s (context, min_depth) " - "VALUES (?,?)" % ContextDepthTable.table_name, - (context, depth) - ) - - def get_latest_pdus_in_context(self, context): - return self.runInteraction( - "get_latest_pdus_in_context", - self._get_latest_pdus_in_context, - context - ) - - def _get_latest_pdus_in_context(self, txn, context): - """Get's a list of the most current pdus for a given context. This is - used when we are sending a Pdu and need to fill out the `prev_pdus` - key - - Args: - txn - context - """ - query = ( - "SELECT p.pdu_id, p.origin, p.depth FROM %(pdus)s as p " - "INNER JOIN %(forward)s as f ON p.pdu_id = f.pdu_id " - "AND f.origin = p.origin " - "WHERE f.context = ?" - ) % { - "pdus": PdusTable.table_name, - "forward": PduForwardExtremitiesTable.table_name, - } - - logger.debug("get_prev query: %s", query) - - txn.execute( - query, - (context, ) - ) - - results = [] - for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_pdu_reference_hashes_txn(txn, pdu_id, origin) - sha256_bytes = hashes["sha256"] - prev_hashes = {"sha256": encode_base64(sha256_bytes)} - results.append((pdu_id, origin, prev_hashes, depth)) - - return results - - @defer.inlineCallbacks - def get_oldest_pdus_in_context(self, context): - """Get a list of Pdus that we haven't backfilled beyond yet (and havent - seen). This list is used when we want to backfill backwards and is the - list we send to the remote server. - - Args: - txn - context (str) - - Returns: - list: A list of PduIdTuple. - """ - results = yield self._execute( - None, - "SELECT pdu_id, origin FROM %(back)s WHERE context = ?" - % {"back": PduBackwardExtremitiesTable.table_name, }, - context - ) - - defer.returnValue([PduIdTuple(i, o) for i, o in results]) - - def is_pdu_new(self, pdu_id, origin, context, depth): - """For a given Pdu, try and figure out if it's 'new', i.e., if it's - not something we got randomly from the past, for example when we - request the current state of the room that will probably return a bunch - of pdus from before we joined. - - Args: - txn - pdu_id (str) - origin (str) - context (str) - depth (int) - - Returns: - bool - """ - - return self.runInteraction( - "is_pdu_new", - self._is_pdu_new, - pdu_id=pdu_id, - origin=origin, - context=context, - depth=depth - ) - - def _is_pdu_new(self, txn, pdu_id, origin, context, depth): - # If depth > min depth in back table, then we classify it as new. - # OR if there is nothing in the back table, then it kinda needs to - # be a new thing. - query = ( - "SELECT min(p.depth) FROM %(edges)s as e " - "INNER JOIN %(back)s as b " - "ON e.prev_pdu_id = b.pdu_id AND e.prev_origin = b.origin " - "INNER JOIN %(pdus)s as p " - "ON e.pdu_id = p.pdu_id AND p.origin = e.origin " - "WHERE p.context = ?" - ) % { - "pdus": PdusTable.table_name, - "edges": PduEdgesTable.table_name, - "back": PduBackwardExtremitiesTable.table_name, - } - - txn.execute(query, (context,)) - - min_depth, = txn.fetchone() - - if not min_depth or depth > int(min_depth): - logger.debug( - "is_new true: id=%s, o=%s, d=%s min_depth=%s", - pdu_id, origin, depth, min_depth - ) - return True - - # If this pdu is in the forwards table, then it also is a new one - query = ( - "SELECT * FROM %(forward)s WHERE pdu_id = ? AND origin = ?" - ) % { - "forward": PduForwardExtremitiesTable.table_name, - } - - txn.execute(query, (pdu_id, origin)) - - # Did we get anything? - if txn.fetchall(): - logger.debug( - "is_new true: id=%s, o=%s, d=%s was forward", - pdu_id, origin, depth - ) - return True - - logger.debug( - "is_new false: id=%s, o=%s, d=%s", - pdu_id, origin, depth - ) - - # FINE THEN. It's probably old. - return False - - @staticmethod - @log_function - def _handle_prev_pdus(txn, outlier, pdu_id, origin, prev_pdus, - context): - txn.executemany( - PduEdgesTable.insert_statement(), - [(pdu_id, origin, p[0], p[1], context) for p in prev_pdus] - ) - - # Update the extremities table if this is not an outlier. - if not outlier: - - # First, we delete the new one from the forwards extremities table. - query = ( - "DELETE FROM %s WHERE pdu_id = ? AND origin = ?" - % PduForwardExtremitiesTable.table_name - ) - txn.executemany(query, list(p[:2] for p in prev_pdus)) - - # We only insert as a forward extremety the new pdu if there are no - # other pdus that reference it as a prev pdu - query = ( - "INSERT INTO %(table)s (pdu_id, origin, context) " - "SELECT ?, ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(pdu_edges)s WHERE " - "prev_pdu_id = ? AND prev_origin = ?" - ")" - ) % { - "table": PduForwardExtremitiesTable.table_name, - "pdu_edges": PduEdgesTable.table_name - } - - logger.debug("query: %s", query) - - txn.execute(query, (pdu_id, origin, context, pdu_id, origin)) - - # Insert all the prev_pdus as a backwards thing, they'll get - # deleted in a second if they're incorrect anyway. - txn.executemany( - PduBackwardExtremitiesTable.insert_statement(), - [(i, o, context) for i, o, _ in prev_pdus] - ) - - # Also delete from the backwards extremities table all ones that - # reference pdus that we have already seen - query = ( - "DELETE FROM %(pdu_back)s WHERE EXISTS (" - "SELECT 1 FROM %(pdus)s AS pdus " - "WHERE " - "%(pdu_back)s.pdu_id = pdus.pdu_id " - "AND %(pdu_back)s.origin = pdus.origin " - "AND not pdus.outlier " - ")" - ) % { - "pdu_back": PduBackwardExtremitiesTable.table_name, - "pdus": PdusTable.table_name, - } - txn.execute(query) - - -class StatePduStore(SQLBaseStore): - """A collection of queries for handling state PDUs. - """ - - def _persist_state_txn(self, txn, prev_pdus, cols): - """Inserts a state PDU into the database - - Args: - txn, - prev_pdus (list) - **cols: The columns to insert into the PdusTable and StatePdusTable - """ - pdu_entry = PdusTable.EntryType( - **{k: cols.get(k, None) for k in PdusTable.fields} - ) - state_entry = StatePdusTable.EntryType( - **{k: cols.get(k, None) for k in StatePdusTable.fields} - ) - - logger.debug("Inserting pdu: %s", repr(pdu_entry)) - logger.debug("Inserting state: %s", repr(state_entry)) - - txn.execute(PdusTable.insert_statement(), pdu_entry) - txn.execute(StatePdusTable.insert_statement(), state_entry) - - self._handle_prev_pdus( - txn, - pdu_entry.outlier, pdu_entry.pdu_id, pdu_entry.origin, prev_pdus, - pdu_entry.context - ) - - def get_unresolved_state_tree(self, new_state_pdu): - return self.runInteraction( - "get_unresolved_state_tree", - self._get_unresolved_state_tree, new_state_pdu - ) - - @log_function - def _get_unresolved_state_tree(self, txn, new_pdu): - current = self._get_current_interaction( - txn, - new_pdu.context, new_pdu.pdu_type, new_pdu.state_key - ) - - ReturnType = namedtuple( - "StateReturnType", ["new_branch", "current_branch"] - ) - return_value = ReturnType([new_pdu], []) - - if not current: - logger.debug("get_unresolved_state_tree No current state.") - return (return_value, None) - - return_value.current_branch.append(current) - - enum_branches = self._enumerate_state_branches( - txn, new_pdu, current - ) - - missing_branch = None - for branch, prev_state, state in enum_branches: - if state: - return_value[branch].append(state) - else: - # We don't have prev_state :( - missing_branch = branch - break - - return (return_value, missing_branch) - - def update_current_state(self, pdu_id, origin, context, pdu_type, - state_key): - return self.runInteraction( - "update_current_state", - self._update_current_state, - pdu_id, origin, context, pdu_type, state_key - ) - - def _update_current_state(self, txn, pdu_id, origin, context, pdu_type, - state_key): - query = ( - "INSERT OR REPLACE INTO %(curr)s (%(fields)s) VALUES (%(qs)s)" - ) % { - "curr": CurrentStateTable.table_name, - "fields": CurrentStateTable.get_fields_string(), - "qs": ", ".join(["?"] * len(CurrentStateTable.fields)) - } - - query_args = CurrentStateTable.EntryType( - pdu_id=pdu_id, - origin=origin, - context=context, - pdu_type=pdu_type, - state_key=state_key - ) - - txn.execute(query, query_args) - - def get_current_state_pdu(self, context, pdu_type, state_key): - """For a given context, pdu_type, state_key 3-tuple, return what is - currently considered the current state. - - Args: - txn - context (str) - pdu_type (str) - state_key (str) - - Returns: - PduEntry - """ - - return self.runInteraction( - "get_current_state_pdu", - self._get_current_state_pdu, context, pdu_type, state_key - ) - - def _get_current_state_pdu(self, txn, context, pdu_type, state_key): - return self._get_current_interaction(txn, context, pdu_type, state_key) - - def _get_current_interaction(self, txn, context, pdu_type, state_key): - logger.debug( - "_get_current_interaction %s %s %s", - context, pdu_type, state_key - ) - - fields = _pdu_state_joiner.get_fields( - PdusTable="p", StatePdusTable="s") - - current_query = ( - "SELECT %(fields)s FROM %(state)s as s " - "INNER JOIN %(pdus)s as p " - "ON s.pdu_id = p.pdu_id AND s.origin = p.origin " - "INNER JOIN %(curr)s as c " - "ON s.pdu_id = c.pdu_id AND s.origin = c.origin " - "WHERE s.context = ? AND s.pdu_type = ? AND s.state_key = ? " - ) % { - "fields": fields, - "curr": CurrentStateTable.table_name, - "state": StatePdusTable.table_name, - "pdus": PdusTable.table_name, - } - - txn.execute( - current_query, - (context, pdu_type, state_key) - ) - - row = txn.fetchone() - - result = PduEntry(*row) if row else None - - if not result: - logger.debug("_get_current_interaction not found") - else: - logger.debug( - "_get_current_interaction found %s %s", - result.pdu_id, result.origin - ) - - return result - - def handle_new_state(self, new_pdu): - """Actually perform conflict resolution on the new_pdu on the - assumption we have all the pdus required to perform it. - - Args: - new_pdu - - Returns: - bool: True if the new_pdu clobbered the current state, False if not - """ - return self.runInteraction( - "handle_new_state", - self._handle_new_state, new_pdu - ) - - def _handle_new_state(self, txn, new_pdu): - logger.debug( - "handle_new_state %s %s", - new_pdu.pdu_id, new_pdu.origin - ) - - current = self._get_current_interaction( - txn, - new_pdu.context, new_pdu.pdu_type, new_pdu.state_key - ) - - is_current = False - - if (not current or not current.prev_state_id - or not current.prev_state_origin): - # Oh, we don't have any state for this yet. - is_current = True - elif (current.pdu_id == new_pdu.prev_state_id - and current.origin == new_pdu.prev_state_origin): - # Oh! A direct clobber. Just do it. - is_current = True - else: - ## - # Ok, now loop through until we get to a common ancestor. - max_new = int(new_pdu.power_level) - max_current = int(current.power_level) - - enum_branches = self._enumerate_state_branches( - txn, new_pdu, current - ) - for branch, prev_state, state in enum_branches: - if not state: - raise RuntimeError( - "Could not find state_pdu %s %s" % - ( - prev_state.prev_state_id, - prev_state.prev_state_origin - ) - ) - - if branch == 0: - max_new = max(int(state.depth), max_new) - else: - max_current = max(int(state.depth), max_current) - - is_current = max_new > max_current - - if is_current: - logger.debug("handle_new_state make current") - - # Right, this is a new thing, so woo, just insert it. - txn.execute( - "INSERT OR REPLACE INTO %(curr)s (%(fields)s) VALUES (%(qs)s)" - % { - "curr": CurrentStateTable.table_name, - "fields": CurrentStateTable.get_fields_string(), - "qs": ", ".join(["?"] * len(CurrentStateTable.fields)) - }, - CurrentStateTable.EntryType( - *(new_pdu.__dict__[k] for k in CurrentStateTable.fields) - ) - ) - else: - logger.debug("handle_new_state not current") - - logger.debug("handle_new_state done") - - return is_current - - @log_function - def _enumerate_state_branches(self, txn, pdu_a, pdu_b): - branch_a = pdu_a - branch_b = pdu_b - - while True: - if (branch_a.pdu_id == branch_b.pdu_id - and branch_a.origin == branch_b.origin): - # Woo! We found a common ancestor - logger.debug("_enumerate_state_branches Found common ancestor") - break - - do_branch_a = ( - hasattr(branch_a, "prev_state_id") and - branch_a.prev_state_id - ) - - do_branch_b = ( - hasattr(branch_b, "prev_state_id") and - branch_b.prev_state_id - ) - - logger.debug( - "do_branch_a=%s, do_branch_b=%s", - do_branch_a, do_branch_b - ) - - if do_branch_a and do_branch_b: - do_branch_a = int(branch_a.depth) > int(branch_b.depth) - - if do_branch_a: - pdu_tuple = PduIdTuple( - branch_a.prev_state_id, - branch_a.prev_state_origin - ) - - prev_branch = branch_a - - logger.debug("getting branch_a prev %s", pdu_tuple) - branch_a = self._get_pdu_tuple(txn, *pdu_tuple) - if branch_a: - branch_a = Pdu.from_pdu_tuple(branch_a) - - logger.debug("branch_a=%s", branch_a) - - yield (0, prev_branch, branch_a) - - if not branch_a: - break - elif do_branch_b: - pdu_tuple = PduIdTuple( - branch_b.prev_state_id, - branch_b.prev_state_origin - ) - - prev_branch = branch_b - - logger.debug("getting branch_b prev %s", pdu_tuple) - branch_b = self._get_pdu_tuple(txn, *pdu_tuple) - if branch_b: - branch_b = Pdu.from_pdu_tuple(branch_b) - - logger.debug("branch_b=%s", branch_b) - - yield (1, prev_branch, branch_b) - - if not branch_b: - break - else: - break - - -class PdusTable(Table): - table_name = "pdus" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "ts", - "depth", - "is_state", - "content_json", - "unrecognized_keys", - "outlier", - "have_processed", - ] - - EntryType = namedtuple("PdusEntry", fields) - - -class PduDestinationsTable(Table): - table_name = "pdu_destinations" - - fields = [ - "pdu_id", - "origin", - "destination", - "delivered_ts", - ] - - EntryType = namedtuple("PduDestinationsEntry", fields) - - -class PduEdgesTable(Table): - table_name = "pdu_edges" - - fields = [ - "pdu_id", - "origin", - "prev_pdu_id", - "prev_origin", - "context" - ] - - EntryType = namedtuple("PduEdgesEntry", fields) - - -class PduForwardExtremitiesTable(Table): - table_name = "pdu_forward_extremities" - - fields = [ - "pdu_id", - "origin", - "context", - ] - - EntryType = namedtuple("PduForwardExtremitiesEntry", fields) - - -class PduBackwardExtremitiesTable(Table): - table_name = "pdu_backward_extremities" - - fields = [ - "pdu_id", - "origin", - "context", - ] - - EntryType = namedtuple("PduBackwardExtremitiesEntry", fields) - - -class ContextDepthTable(Table): - table_name = "context_depth" - - fields = [ - "context", - "min_depth", - ] - - EntryType = namedtuple("ContextDepthEntry", fields) - - -class StatePdusTable(Table): - table_name = "state_pdus" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "state_key", - "power_level", - "prev_state_id", - "prev_state_origin", - ] - - EntryType = namedtuple("StatePdusEntry", fields) - - -class CurrentStateTable(Table): - table_name = "current_state" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "state_key", - ] - - EntryType = namedtuple("CurrentStateEntry", fields) - -_pdu_state_joiner = JoinHelper(PdusTable, StatePdusTable) - - -# TODO: These should probably be put somewhere more sensible -PduIdTuple = namedtuple("PduIdTuple", ("pdu_id", "origin")) - -PduEntry = _pdu_state_joiner.EntryType -""" We are always interested in the join of the PdusTable and StatePdusTable, -rather than just the PdusTable. - -This does not include a prev_pdus key. -""" - -PduTuple = namedtuple( - "PduTuple", - ("pdu_entry", "prev_pdu_list", "hashes", "signatures", "edge_hashes") -) -""" This is a tuple of a `PduEntry` and a list of `PduIdTuple` that represent -the `prev_pdus` key of a PDU. -""" diff --git a/synapse/storage/schema/pdu.sql b/synapse/storage/schema/pdu.sql deleted file mode 100644 index 16e111a56c..0000000000 --- a/synapse/storage/schema/pdu.sql +++ /dev/null @@ -1,106 +0,0 @@ -/* Copyright 2014 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ --- Stores pdus and their content -CREATE TABLE IF NOT EXISTS pdus( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - ts INTEGER, - depth INTEGER DEFAULT 0 NOT NULL, - is_state BOOL, - content_json TEXT, - unrecognized_keys TEXT, - outlier BOOL NOT NULL, - have_processed BOOL, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) -); - --- Stores what the current state pdu is for a given (context, pdu_type, key) tuple -CREATE TABLE IF NOT EXISTS state_pdus( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - state_key TEXT, - power_level TEXT, - prev_state_id TEXT, - prev_state_origin TEXT, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) - CONSTRAINT prev_pdu_id_origin UNIQUE (prev_state_id, prev_state_origin) -); - -CREATE TABLE IF NOT EXISTS current_state( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - state_key TEXT, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) - CONSTRAINT uniqueness UNIQUE (context, pdu_type, state_key) ON CONFLICT REPLACE -); - --- Stores where each pdu we want to send should be sent and the delivery status. -create TABLE IF NOT EXISTS pdu_destinations( - pdu_id TEXT, - origin TEXT, - destination TEXT, - delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, destination) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_forward_extremities( - pdu_id TEXT, - origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, context) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_backward_extremities( - pdu_id TEXT, - origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, context) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_edges( - pdu_id TEXT, - origin TEXT, - prev_pdu_id TEXT, - prev_origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, prev_pdu_id, prev_origin, context) -); - -CREATE TABLE IF NOT EXISTS context_depth( - context TEXT, - min_depth INTEGER, - CONSTRAINT uniqueness UNIQUE (context) -); - -CREATE INDEX IF NOT EXISTS context_depth_context ON context_depth(context); - - -CREATE INDEX IF NOT EXISTS pdu_id ON pdus(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS dests_id ON pdu_destinations (pdu_id, origin); --- CREATE INDEX IF NOT EXISTS dests ON pdu_destinations (destination); - -CREATE INDEX IF NOT EXISTS pdu_extrem_context ON pdu_forward_extremities(context); -CREATE INDEX IF NOT EXISTS pdu_extrem_id ON pdu_forward_extremities(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS pdu_edges_id ON pdu_edges(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS pdu_b_extrem_context ON pdu_backward_extremities(context); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6624348fd0..ea67900788 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -14,7 +14,6 @@ # limitations under the License. from ._base import SQLBaseStore, Table -from .pdu import PdusTable from collections import namedtuple @@ -207,50 +206,6 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) - def get_pdus_after_transaction(self, transaction_id, destination): - """For a given local transaction_id that we sent to a given destination - home server, return a list of PDUs that were sent to that destination - after it. - - Args: - txn - transaction_id (str) - destination (str) - - Returns - list: A list of PduTuple - """ - return self.runInteraction( - "get_pdus_after_transaction", - self._get_pdus_after_transaction, - transaction_id, destination - ) - - def _get_pdus_after_transaction(self, txn, transaction_id, destination): - - # Query that first get's all transaction_ids with an id greater than - # the one given from the `sent_transactions` table. Then JOIN on this - # from the `tx->pdu` table to get a list of (pdu_id, origin) that - # specify the pdus that were sent in those transactions. - query = ( - "SELECT pdu_id, pdu_origin FROM %(tx_pdu)s as tp " - "INNER JOIN %(sent_tx)s as st " - "ON tp.transaction_id = st.transaction_id " - "AND tp.destination = st.destination " - "WHERE st.id > (" - "SELECT id FROM %(sent_tx)s " - "WHERE transaction_id = ? AND destination = ?" - ) % { - "tx_pdu": TransactionsToPduTable.table_name, - "sent_tx": SentTransactions.table_name, - } - - txn.execute(query, (transaction_id, destination)) - - pdus = PdusTable.decode_results(txn.fetchall()) - - return self._get_pdu_tuples(txn, pdus) - class ReceivedTransactionsTable(Table): table_name = "received_transactions" -- cgit 1.4.1 From 2f39dc19a26cca25305d10654916d7413a56a23a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 14:27:14 +0000 Subject: Remove more references to dead PDU tables --- synapse/federation/pdu_codec.py | 4 +-- synapse/state.py | 23 ++++-------- synapse/storage/__init__.py | 9 ----- synapse/storage/schema/signatures.sql | 66 ----------------------------------- 4 files changed, 8 insertions(+), 94 deletions(-) delete mode 100644 synapse/storage/schema/signatures.sql (limited to 'synapse/federation') diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 6d31286290..d4c896e163 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -32,11 +32,11 @@ class PduCodec(object): self.hs = hs def encode_event_id(self, local, domain): - return EventID.create(local, domain, self.hs).to_string() + return local def decode_event_id(self, event_id): e_id = self.hs.parse_eventid(event_id) - return e_id.localpart, e_id.domain + return event_id, e_id.domain def event_from_pdu(self, pdu): kwargs = {} diff --git a/synapse/state.py b/synapse/state.py index f7249705ce..2548deed28 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -77,29 +77,18 @@ class StateHandler(object): snapshot.fill_out_prev_events(event) yield self.annotate_state_groups(event) - current_state = snapshot.prev_state_pdu + if event.old_state_events: + current_state = event.old_state_events.get( + (event.type, event.state_key) + ) - if current_state: - event.prev_state = EventID.create( - current_state.pdu_id, current_state.origin, self.hs - ).to_string() + if current_state: + event.prev_state = current_state.event_id # TODO check current_state to see if the min power level is less # than the power level of the user # power_level = self._get_power_level_for_event(event) - e_id = self.hs.parse_eventid(event.event_id) - pdu_id = e_id.localpart - origin = e_id.domain - - yield self.store.update_current_state( - pdu_id=pdu_id, - origin=origin, - context=key.context, - pdu_type=key.type, - state_key=key.state_key - ) - defer.returnValue(True) @defer.inlineCallbacks diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 3faa571dd9..c2560f6045 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -67,7 +67,6 @@ SCHEMAS = [ "keys", "redactions", "state", - "signatures", "event_edges", "event_signatures", ] @@ -364,13 +363,6 @@ class DataStore(RoomMemberStore, RoomStore, membership_state = self._get_room_member(txn, user_id, room_id) prev_events = self._get_latest_events_in_room(txn, room_id) - if state_type is not None and state_key is not None: - prev_state_pdu = self._get_current_state_pdu( - txn, room_id, state_type, state_key - ) - else: - prev_state_pdu = None - return Snapshot( store=self, room_id=room_id, @@ -379,7 +371,6 @@ class DataStore(RoomMemberStore, RoomStore, membership_state=membership_state, state_type=state_type, state_key=state_key, - prev_state_pdu=prev_state_pdu, ) return self.runInteraction("snapshot_room", _snapshot) diff --git a/synapse/storage/schema/signatures.sql b/synapse/storage/schema/signatures.sql deleted file mode 100644 index 1c45a51bec..0000000000 --- a/synapse/storage/schema/signatures.sql +++ /dev/null @@ -1,66 +0,0 @@ -/* Copyright 2014 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE TABLE IF NOT EXISTS pdu_content_hashes ( - pdu_id TEXT, - origin TEXT, - algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, algorithm) -); - -CREATE INDEX IF NOT EXISTS pdu_content_hashes_id ON pdu_content_hashes ( - pdu_id, origin -); - -CREATE TABLE IF NOT EXISTS pdu_reference_hashes ( - pdu_id TEXT, - origin TEXT, - algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, algorithm) -); - -CREATE INDEX IF NOT EXISTS pdu_reference_hashes_id ON pdu_reference_hashes ( - pdu_id, origin -); - -CREATE TABLE IF NOT EXISTS pdu_origin_signatures ( - pdu_id TEXT, - origin TEXT, - key_id TEXT, - signature BLOB, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, key_id) -); - -CREATE INDEX IF NOT EXISTS pdu_origin_signatures_id ON pdu_origin_signatures ( - pdu_id, origin -); - -CREATE TABLE IF NOT EXISTS pdu_edge_hashes( - pdu_id TEXT, - origin TEXT, - prev_pdu_id TEXT, - prev_origin TEXT, - algorithm TEXT, - hash BLOB, - CONSTRAINT uniqueness UNIQUE ( - pdu_id, origin, prev_pdu_id, prev_origin, algorithm - ) -); - -CREATE INDEX IF NOT EXISTS pdu_edge_hashes_id ON pdu_edge_hashes( - pdu_id, origin -); -- cgit 1.4.1 From d59aa6af25472a15e6258f2fd9cbd1ac4c05702d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Nov 2014 11:35:19 +0000 Subject: For now, don't store txn -> pdu mappings. --- synapse/federation/persistence.py | 1 - synapse/storage/transactions.py | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index a565375e68..b04fbb4177 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -88,7 +88,6 @@ class TransactionActions(object): transaction.transaction_id, transaction.destination, transaction.origin_server_ts, - [(p["pdu_id"], p["origin"]) for p in transaction.pdus] ) @log_function diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index ea67900788..00d0f48082 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -89,7 +89,7 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (code, response_json, transaction_id, origin)) def prep_send_transaction(self, transaction_id, destination, - origin_server_ts, pdu_list): + origin_server_ts): """Persists an outgoing transaction and calculates the values for the previous transaction id list. @@ -100,7 +100,6 @@ class TransactionStore(SQLBaseStore): transaction_id (str) destination (str) origin_server_ts (int) - pdu_list (list) Returns: list: A list of previous transaction ids. @@ -109,11 +108,11 @@ class TransactionStore(SQLBaseStore): return self.runInteraction( "prep_send_transaction", self._prep_send_transaction, - transaction_id, destination, origin_server_ts, pdu_list + transaction_id, destination, origin_server_ts ) def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts, pdu_list): + origin_server_ts): # First we find out what the prev_txs should be. # Since we know that we are only sending one transaction at a time, -- cgit 1.4.1 From ad6eacb3e9424902da9f83c8f106a4f0169c3108 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Nov 2014 13:06:58 +0000 Subject: Rename PDU fields to match that of events. --- synapse/api/events/utils.py | 2 +- synapse/federation/pdu_codec.py | 48 +--------- synapse/federation/replication.py | 72 ++++++--------- synapse/federation/transport.py | 184 +++++++------------------------------- synapse/federation/units.py | 78 +++------------- synapse/handlers/federation.py | 12 ++- 6 files changed, 80 insertions(+), 316 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/events/utils.py b/synapse/api/events/utils.py index 7fdf45a264..31601fd3a9 100644 --- a/synapse/api/events/utils.py +++ b/synapse/api/events/utils.py @@ -32,7 +32,7 @@ def prune_event(event): def prune_pdu(pdu): """Removes keys that contain unrestricted and non-essential data from a PDU """ - return _prune_event_or_pdu(pdu.pdu_type, pdu) + return _prune_event_or_pdu(pdu.type, pdu) def _prune_event_or_pdu(event_type, event): # Remove all extraneous fields. diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index d4c896e163..5ec97a698e 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -31,39 +31,16 @@ class PduCodec(object): self.clock = hs.get_clock() self.hs = hs - def encode_event_id(self, local, domain): - return local - - def decode_event_id(self, event_id): - e_id = self.hs.parse_eventid(event_id) - return event_id, e_id.domain - def event_from_pdu(self, pdu): kwargs = {} - kwargs["event_id"] = self.encode_event_id(pdu.pdu_id, pdu.origin) - kwargs["room_id"] = pdu.context - kwargs["etype"] = pdu.pdu_type - kwargs["prev_events"] = [ - (self.encode_event_id(i, o), s) - for i, o, s in pdu.prev_pdus - ] - - if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): - kwargs["prev_state"] = self.encode_event_id( - pdu.prev_state_id, pdu.prev_state_origin - ) + kwargs["etype"] = pdu.type kwargs.update({ k: v for k, v in pdu.get_full_dict().items() if k not in [ - "pdu_id", - "context", - "pdu_type", - "prev_pdus", - "prev_state_id", - "prev_state_origin", + "type", ] }) @@ -72,33 +49,12 @@ class PduCodec(object): def pdu_from_event(self, event): d = event.get_full_dict() - d["pdu_id"], d["origin"] = self.decode_event_id( - event.event_id - ) - d["context"] = event.room_id - d["pdu_type"] = event.type - - if hasattr(event, "prev_events"): - def f(e, s): - i, o = self.decode_event_id(e) - return i, o, s - d["prev_pdus"] = [ - f(e, s) - for e, s in event.prev_events - ] - - if hasattr(event, "prev_state"): - d["prev_state_id"], d["prev_state_origin"] = ( - self.decode_event_id(event.prev_state) - ) - if hasattr(event, "state_key"): d["is_state"] = True kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type", "prev_events"] }) if "origin_server_ts" not in kwargs: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 159af4eed7..838e660a46 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -111,14 +111,6 @@ class ReplicationLayer(object): """Informs the replication layer about a new PDU generated within the home server that should be transmitted to others. - This will fill out various attributes on the PDU object, e.g. the - `prev_pdus` key. - - *Note:* The home server should always call `send_pdu` even if it knows - that it does not need to be replicated to other home servers. This is - in case e.g. someone else joins via a remote home server and then - backfills. - TODO: Figure out when we should actually resolve the deferred. Args: @@ -131,18 +123,12 @@ class ReplicationLayer(object): order = self._order self._order += 1 - logger.debug("[%s] Persisting PDU", pdu.pdu_id) - - # Save *before* trying to send - # yield self.store.persist_event(pdu=pdu) - - logger.debug("[%s] Persisted PDU", pdu.pdu_id) - logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id) + logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) # TODO, add errback, etc. self._transaction_queue.enqueue_pdu(pdu, order) - logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.pdu_id) + logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.event_id) @log_function def send_edu(self, destination, edu_type, content): @@ -215,7 +201,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_pdu(self, destination, pdu_origin, pdu_id, outlier=False): + def get_pdu(self, destination, event_id, outlier=False): """Requests the PDU with given origin and ID from the remote home server. @@ -224,7 +210,7 @@ class ReplicationLayer(object): Args: destination (str): Which home server to query pdu_origin (str): The home server that originally sent the pdu. - pdu_id (str) + event_id (str) outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` @@ -233,8 +219,9 @@ class ReplicationLayer(object): Deferred: Results in the requested PDU. """ - transaction_data = yield self.transport_layer.get_pdu( - destination, pdu_origin, pdu_id) + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) transaction = Transaction(**transaction_data) @@ -249,8 +236,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context, pdu_id=None, - pdu_origin=None): + def get_state_for_context(self, destination, context, event_id=None): """Requests all of the `current` state PDUs for a given context from a remote home server. @@ -263,7 +249,9 @@ class ReplicationLayer(object): """ transaction_data = yield self.transport_layer.get_context_state( - destination, context, pdu_id=pdu_id, pdu_origin=pdu_origin, + destination, + context, + event_id=event_id, ) transaction = Transaction(**transaction_data) @@ -352,10 +340,10 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, context, pdu_id, pdu_origin): - if pdu_id and pdu_origin: + def on_context_state_request(self, context, event_id): + if event_id: pdus = yield self.handler.get_state_for_pdu( - pdu_id, pdu_origin + event_id ) else: raise NotImplementedError("Specify an event") @@ -370,8 +358,8 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_pdu_request(self, pdu_origin, pdu_id): - pdu = yield self._get_persisted_pdu(pdu_id, pdu_origin) + def on_pdu_request(self, event_id): + pdu = yield self._get_persisted_pdu(event_id) if pdu: defer.returnValue( @@ -443,9 +431,8 @@ class ReplicationLayer(object): def send_join(self, destination, pdu): _, content = yield self.transport_layer.send_join( destination, - pdu.context, - pdu.pdu_id, - pdu.origin, + pdu.room_id, + pdu.event_id, pdu.get_dict(), ) @@ -457,13 +444,13 @@ class ReplicationLayer(object): defer.returnValue(pdus) @log_function - def _get_persisted_pdu(self, pdu_id, pdu_origin): + def _get_persisted_pdu(self, event_id): """ Get a PDU from the database with given origin and id. Returns: Deferred: Results in a `Pdu`. """ - return self.handler.get_persisted_pdu(pdu_id, pdu_origin) + return self.handler.get_persisted_pdu(event_id) def _transaction_from_pdus(self, pdu_list): """Returns a new Transaction containing the given PDUs suitable for @@ -487,10 +474,10 @@ class ReplicationLayer(object): @log_function def _handle_new_pdu(self, origin, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers - existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) + existing = yield self._get_persisted_pdu(pdu.event_id) if existing and (not existing.outlier or pdu.outlier): - logger.debug("Already seen pdu %s %s", pdu.pdu_id, pdu.origin) + logger.debug("Already seen pdu %s", pdu.event_id) defer.returnValue({}) return @@ -500,23 +487,22 @@ class ReplicationLayer(object): if not pdu.outlier: # We only backfill backwards to the min depth. min_depth = yield self.handler.get_min_depth_for_context( - pdu.context + pdu.room_id ) if min_depth and pdu.depth > min_depth: - for pdu_id, origin, hashes in pdu.prev_pdus: - exists = yield self._get_persisted_pdu(pdu_id, origin) + for event_id, hashes in pdu.prev_events: + exists = yield self._get_persisted_pdu(event_id) if not exists: - logger.debug("Requesting pdu %s %s", pdu_id, origin) + logger.debug("Requesting pdu %s", event_id) try: yield self.get_pdu( pdu.origin, - pdu_id=pdu_id, - pdu_origin=origin + event_id=event_id, ) - logger.debug("Processed pdu %s %s", pdu_id, origin) + logger.debug("Processed pdu %s", event_id) except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") @@ -524,7 +510,7 @@ class ReplicationLayer(object): # We need to get the state at this event, since we have reached # a backward extremity edge. state = yield self.get_state_for_context( - origin, pdu.context, pdu.pdu_id, pdu.origin, + origin, pdu.room_id, pdu.event_id, ) # Persist the Pdu, but don't mark it as processed yet. diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 7f01b4faaf..04ad7e63ae 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -72,8 +72,7 @@ class TransportLayer(object): self.received_handler = None @log_function - def get_context_state(self, destination, context, pdu_id=None, - pdu_origin=None): + def get_context_state(self, destination, context, event_id=None): """ Requests all state for a given context (i.e. room) from the given server. @@ -91,60 +90,59 @@ class TransportLayer(object): subpath = "/state/%s/" % context args = {} - if pdu_id and pdu_origin: - args["pdu_id"] = pdu_id - args["pdu_origin"] = pdu_origin + if event_id: + args["event_id"] = event_id return self._do_request_for_transaction( destination, subpath, args=args ) @log_function - def get_pdu(self, destination, pdu_origin, pdu_id): + def get_event(self, destination, event_id): """ Requests the pdu with give id and origin from the given server. Args: destination (str): The host name of the remote home server we want to get the state from. - pdu_origin (str): The home server which created the PDU. - pdu_id (str): The id of the PDU being requested. + event_id (str): The id of the event being requested. Returns: Deferred: Results in a dict received from the remote homeserver. """ - logger.debug("get_pdu dest=%s, pdu_origin=%s, pdu_id=%s", - destination, pdu_origin, pdu_id) + logger.debug("get_pdu dest=%s, event_id=%s", + destination, event_id) - subpath = "/pdu/%s/%s/" % (pdu_origin, pdu_id) + subpath = "/event/%s/" % (event_id, ) return self._do_request_for_transaction(destination, subpath) @log_function - def backfill(self, dest, context, pdu_tuples, limit): + def backfill(self, dest, context, event_tuples, limit): """ Requests `limit` previous PDUs in a given context before list of PDUs. Args: dest (str) context (str) - pdu_tuples (list) + event_tuples (list) limt (int) Returns: Deferred: Results in a dict received from the remote homeserver. """ logger.debug( - "backfill dest=%s, context=%s, pdu_tuples=%s, limit=%s", - dest, context, repr(pdu_tuples), str(limit) + "backfill dest=%s, context=%s, event_tuples=%s, limit=%s", + dest, context, repr(event_tuples), str(limit) ) - if not pdu_tuples: + if not event_tuples: + # TODO: raise? return - subpath = "/backfill/%s/" % context + subpath = "/backfill/%s/" % (context,) args = { - "v": ["%s,%s" % (i, o) for i, o in pdu_tuples], + "v": event_tuples, "limit": limit, } @@ -222,11 +220,10 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function - def send_join(self, destination, context, pdu_id, origin, content): - path = PREFIX + "/send_join/%s/%s/%s" % ( + def send_join(self, destination, context, event_id, content): + path = PREFIX + "/send_join/%s/%s" % ( context, - origin, - pdu_id, + event_id, ) code, content = yield self.client.put_json( @@ -242,11 +239,10 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function - def send_invite(self, destination, context, pdu_id, origin, content): - path = PREFIX + "/invite/%s/%s/%s" % ( + def send_invite(self, destination, context, event_id, content): + path = PREFIX + "/invite/%s/%s" % ( context, - origin, - pdu_id, + event_id, ) code, content = yield self.client.put_json( @@ -376,10 +372,10 @@ class TransportLayer(object): # data_id pair. self.server.register_path( "GET", - re.compile("^" + PREFIX + "/pdu/([^/]*)/([^/]*)/$"), + re.compile("^" + PREFIX + "/event/([^/]*)/$"), self._with_authentication( - lambda origin, content, query, pdu_origin, pdu_id: - handler.on_pdu_request(pdu_origin, pdu_id) + lambda origin, content, query, event_id: + handler.on_pdu_request(event_id) ) ) @@ -391,8 +387,7 @@ class TransportLayer(object): lambda origin, content, query, context: handler.on_context_state_request( context, - query.get("pdu_id", [None])[0], - query.get("pdu_origin", [None])[0] + query.get("event_id", [None])[0], ) ) ) @@ -442,9 +437,9 @@ class TransportLayer(object): self.server.register_path( "PUT", - re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)/([^/]*)$"), + re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"), self._with_authentication( - lambda origin, content, query, context, pdu_origin, pdu_id: + lambda origin, content, query, context, event_id: self._on_send_join_request( origin, content, query, ) @@ -453,9 +448,9 @@ class TransportLayer(object): self.server.register_path( "PUT", - re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)/([^/]*)$"), + re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)$"), self._with_authentication( - lambda origin, content, query, context, pdu_origin, pdu_id: + lambda origin, content, query, context, event_id: self._on_invite_request( origin, content, query, ) @@ -548,7 +543,7 @@ class TransportLayer(object): limit = int(limits[-1]) - versions = [v.split(",", 1) for v in v_list] + versions = v_list return self.request_handler.on_backfill_request( context, versions, limit @@ -579,120 +574,3 @@ class TransportLayer(object): ) defer.returnValue((200, content)) - - -class TransportReceivedHandler(object): - """ Callbacks used when we receive a transaction - """ - def on_incoming_transaction(self, transaction): - """ Called on PUT /send/, or on response to a request - that we sent (e.g. a backfill request) - - Args: - transaction (synapse.transaction.Transaction): The transaction that - was sent to us. - - Returns: - twisted.internet.defer.Deferred: A deferred that gets fired when - the transaction has finished being processed. - - The result should be a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - -class TransportRequestHandler(object): - """ Handlers used when someone want's data from us - """ - def on_pull_request(self, versions): - """ Called on GET /pull/?v=... - - This is hit when a remote home server wants to get all data - after a given transaction. Mainly used when a home server comes back - online and wants to get everything it has missed. - - Args: - versions (list): A list of transaction_ids that should be used to - determine what PDUs the remote side have not yet seen. - - Returns: - Deferred: Resultsin a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_pdu_request(self, pdu_origin, pdu_id): - """ Called on GET /pdu/// - - Someone wants a particular PDU. This PDU may or may not have originated - from us. - - Args: - pdu_origin (str) - pdu_id (str) - - Returns: - Deferred: Resultsin a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_context_state_request(self, context): - """ Called on GET /state// - - Gets hit when someone wants all the *current* state for a given - contexts. - - Args: - context (str): The name of the context that we're interested in. - - Returns: - twisted.internet.defer.Deferred: A deferred that gets fired when - the transaction has finished being processed. - - The result should be a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_backfill_request(self, context, versions, limit): - """ Called on GET /backfill//?v=...&limit=... - - Gets hit when we want to backfill backwards on a given context from - the given point. - - Args: - context (str): The context to backfill - versions (list): A list of 2-tuples representing where to backfill - from, in the form `(pdu_id, origin)` - limit (int): How many pdus to return. - - Returns: - Deferred: Results in a tuple in the form of - `(response_code, respond_body)`, where `response_body` is a python - dict that will get serialized to JSON. - - On errors, the dict should have an `error` key with a brief message - of what went wrong. - """ - pass - - def on_query_request(self): - """ Called on a GET /query/ request. """ diff --git a/synapse/federation/units.py b/synapse/federation/units.py index adc3385644..c94dcf64cf 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -34,13 +34,13 @@ class Pdu(JsonEncodedObject): A Pdu can be classified as "state". For a given context, we can efficiently retrieve all state pdu's that haven't been clobbered. Clobbering is done - via a unique constraint on the tuple (context, pdu_type, state_key). A pdu + via a unique constraint on the tuple (context, type, state_key). A pdu is a state pdu if `is_state` is True. Example pdu:: { - "pdu_id": "78c", + "event_id": "$78c:example.com", "origin_server_ts": 1404835423000, "origin": "bar", "prev_ids": [ @@ -53,14 +53,14 @@ class Pdu(JsonEncodedObject): """ valid_keys = [ - "pdu_id", - "context", + "event_id", + "room_id", "origin", "origin_server_ts", - "pdu_type", + "type", "destinations", "transaction_id", - "prev_pdus", + "prev_events", "depth", "content", "outlier", @@ -68,8 +68,7 @@ class Pdu(JsonEncodedObject): "signatures", "is_state", # Below this are keys valid only for State Pdus. "state_key", - "prev_state_id", - "prev_state_origin", + "prev_state", "required_power_level", "user_id", ] @@ -81,18 +80,18 @@ class Pdu(JsonEncodedObject): ] required_keys = [ - "pdu_id", - "context", + "event_id", + "room_id", "origin", "origin_server_ts", - "pdu_type", + "type", "content", ] # TODO: We need to make this properly load content rather than # just leaving it as a dict. (OR DO WE?!) - def __init__(self, destinations=[], is_state=False, prev_pdus=[], + def __init__(self, destinations=[], is_state=False, prev_events=[], outlier=False, hashes={}, signatures={}, **kwargs): if is_state: for required_key in ["state_key"]: @@ -102,66 +101,13 @@ class Pdu(JsonEncodedObject): super(Pdu, self).__init__( destinations=destinations, is_state=bool(is_state), - prev_pdus=prev_pdus, + prev_events=prev_events, outlier=outlier, hashes=hashes, signatures=signatures, **kwargs ) - @classmethod - def from_pdu_tuple(cls, pdu_tuple): - """ Converts a PduTuple to a Pdu - - Args: - pdu_tuple (synapse.persistence.transactions.PduTuple): The tuple to - convert - - Returns: - Pdu - """ - if pdu_tuple: - d = copy.copy(pdu_tuple.pdu_entry._asdict()) - d["origin_server_ts"] = d.pop("ts") - - for k in d.keys(): - if d[k] is None: - del d[k] - - d["content"] = json.loads(d["content_json"]) - del d["content_json"] - - args = {f: d[f] for f in cls.valid_keys if f in d} - if "unrecognized_keys" in d and d["unrecognized_keys"]: - args.update(json.loads(d["unrecognized_keys"])) - - hashes = { - alg: encode_base64(hsh) - for alg, hsh in pdu_tuple.hashes.items() - } - - signatures = { - kid: encode_base64(sig) - for kid, sig in pdu_tuple.signatures.items() - } - - prev_pdus = [] - for prev_pdu in pdu_tuple.prev_pdu_list: - prev_hashes = pdu_tuple.edge_hashes.get(prev_pdu, {}) - prev_hashes = { - alg: encode_base64(hsh) for alg, hsh in prev_hashes.items() - } - prev_pdus.append((prev_pdu[0], prev_pdu[1], prev_hashes)) - - return Pdu( - prev_pdus=prev_pdus, - hashes=hashes, - signatures=signatures, - **args - ) - else: - return None - def __str__(self): return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 18cb1d4e97..bdd28f04bb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -139,7 +139,7 @@ class FederationHandler(BaseHandler): # Huh, let's try and get the current state try: yield self.replication_layer.get_state_for_context( - event.origin, event.room_id, pdu.pdu_id, pdu.origin, + event.origin, event.room_id, event.event_id, ) hosts = yield self.store.get_joined_hosts_for_room( @@ -368,11 +368,9 @@ class FederationHandler(BaseHandler): ]) @defer.inlineCallbacks - def get_state_for_pdu(self, pdu_id, pdu_origin): + def get_state_for_pdu(self, event_id): yield run_on_reactor() - event_id = EventID.create(pdu_id, pdu_origin, self.hs).to_string() - state_groups = yield self.store.get_state_groups( [event_id] ) @@ -406,7 +404,7 @@ class FederationHandler(BaseHandler): events = yield self.store.get_backfill_events( context, - [self.pdu_codec.encode_event_id(i, o) for i, o in pdu_list], + pdu_list, limit ) @@ -417,14 +415,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, pdu_id, origin): + def get_persisted_pdu(self, event_id): """ Get a PDU from the database with given origin and id. Returns: Deferred: Results in a `Pdu`. """ event = yield self.store.get_event( - self.pdu_codec.encode_event_id(pdu_id, origin), + event_id, allow_none=True, ) -- cgit 1.4.1 From 68698e0ac8c39083f6ab7d377a48b5bead3d3598 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Nov 2014 17:51:42 +0000 Subject: Fix bugs in generating event signatures and hashing --- scripts/check_event_hash.py | 12 +++-- scripts/check_signature.py | 1 - synapse/api/events/__init__.py | 1 + synapse/crypto/event_signing.py | 100 +++++++++++++++------------------------- synapse/federation/pdu_codec.py | 13 +----- synapse/federation/units.py | 11 +---- 6 files changed, 50 insertions(+), 88 deletions(-) (limited to 'synapse/federation') diff --git a/scripts/check_event_hash.py b/scripts/check_event_hash.py index 9fa4452ee6..7c32f8102a 100644 --- a/scripts/check_event_hash.py +++ b/scripts/check_event_hash.py @@ -6,6 +6,7 @@ import hashlib import sys import json + class dictobj(dict): def __init__(self, *args, **kargs): dict.__init__(self, *args, **kargs) @@ -14,9 +15,12 @@ class dictobj(dict): def get_dict(self): return dict(self) + def get_full_dict(self): + return dict(self) + def main(): - parser = parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser() parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin) args = parser.parse_args() @@ -29,14 +33,14 @@ def main(): } for alg_name in event_json.hashes: - if check_event_pdu_content_hash(event_json, algorithms[alg_name]): + if check_event_content_hash(event_json, algorithms[alg_name]): print "PASS content hash %s" % (alg_name,) else: print "FAIL content hash %s" % (alg_name,) for algorithm in algorithms.values(): - name, h_bytes = compute_pdu_event_reference_hash(event_json, algorithm) - print "Reference hash %s: %s" % (name, encode_base64(bytes)) + name, h_bytes = compute_event_reference_hash(event_json, algorithm) + print "Reference hash %s: %s" % (name, encode_base64(h_bytes)) if __name__=="__main__": main() diff --git a/scripts/check_signature.py b/scripts/check_signature.py index e7964e7e71..e146e18e24 100644 --- a/scripts/check_signature.py +++ b/scripts/check_signature.py @@ -1,5 +1,4 @@ -from synapse.crypto.event_signing import verify_signed_event_pdu from syutil.crypto.jsonsign import verify_signed_json from syutil.crypto.signing_key import ( decode_verify_key_bytes, write_signing_keys diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index b855811b98..168b812311 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -61,6 +61,7 @@ class SynapseEvent(JsonEncodedObject): "prev_content", "prev_state", "redacted_because", + "origin_server_ts", ] internal_keys = [ diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 0e8bc7eb6c..de5d2e7465 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -15,11 +15,11 @@ # limitations under the License. -from synapse.federation.units import Pdu -from synapse.api.events.utils import prune_pdu, prune_event +from synapse.api.events.utils import prune_event from syutil.jsonutil import encode_canonical_json from syutil.base64util import encode_base64, decode_base64 -from syutil.crypto.jsonsign import sign_json, verify_signed_json +from syutil.crypto.jsonsign import sign_json +from synapse.api.events.room import GenericEvent import copy import hashlib @@ -28,20 +28,14 @@ import logging logger = logging.getLogger(__name__) -def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): - hashed = _compute_content_hash(pdu, hash_algorithm) - pdu.hashes[hashed.name] = encode_base64(hashed.digest()) - return pdu - - -def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): +def check_event_content_hash(event, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_content_hash(pdu, hash_algorithm) - if computed_hash.name not in pdu.hashes: + computed_hash = _compute_content_hash(event, hash_algorithm) + if computed_hash.name not in event.hashes: raise Exception("Algorithm %s not in hashes %s" % ( - computed_hash.name, list(pdu.hashes) + computed_hash.name, list(event.hashes) )) - message_hash_base64 = pdu.hashes[computed_hash.name] + message_hash_base64 = event.hashes[computed_hash.name] try: message_hash_bytes = decode_base64(message_hash_base64) except: @@ -49,70 +43,52 @@ def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): return message_hash_bytes == computed_hash.digest() -def _compute_content_hash(pdu, hash_algorithm): - pdu_json = pdu.get_dict() - #TODO: Make "age_ts" key internal - pdu_json.pop("age_ts", None) - pdu_json.pop("unsigned", None) - pdu_json.pop("signatures", None) - pdu_json.pop("hashes", None) - pdu_json_bytes = encode_canonical_json(pdu_json) - return hash_algorithm(pdu_json_bytes) - - -def compute_pdu_event_reference_hash(pdu, hash_algorithm=hashlib.sha256): - tmp_pdu = Pdu(**pdu.get_dict()) - tmp_pdu = prune_pdu(tmp_pdu) - pdu_json = tmp_pdu.get_dict() - pdu_json.pop("signatures", None) - pdu_json_bytes = encode_canonical_json(pdu_json) - hashed = hash_algorithm(pdu_json_bytes) - return (hashed.name, hashed.digest()) +def _compute_content_hash(event, hash_algorithm): + event_json = event.get_full_dict() + #TODO: We need to sign the JSON that is going out via fedaration. + event_json.pop("age_ts", None) + event_json.pop("unsigned", None) + event_json.pop("signatures", None) + event_json.pop("hashes", None) + event_json_bytes = encode_canonical_json(event_json) + return hash_algorithm(event_json_bytes) def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): - tmp_event = copy.deepcopy(event) + # FIXME(erikj): GenericEvent! + tmp_event = GenericEvent(**event.get_full_dict()) tmp_event = prune_event(tmp_event) event_json = tmp_event.get_dict() event_json.pop("signatures", None) + event_json.pop("age_ts", None) + event_json.pop("unsigned", None) event_json_bytes = encode_canonical_json(event_json) hashed = hash_algorithm(event_json_bytes) return (hashed.name, hashed.digest()) -def sign_event_pdu(pdu, signature_name, signing_key): - tmp_pdu = Pdu(**pdu.get_dict()) - tmp_pdu = prune_pdu(tmp_pdu) - pdu_json = tmp_pdu.get_dict() - pdu_json = sign_json(pdu_json, signature_name, signing_key) - pdu.signatures = pdu_json["signatures"] - return pdu - - -def verify_signed_event_pdu(pdu, signature_name, verify_key): - tmp_pdu = Pdu(**pdu.get_dict()) - tmp_pdu = prune_pdu(tmp_pdu) - pdu_json = tmp_pdu.get_dict() - verify_signed_json(pdu_json, signature_name, verify_key) - - -def add_hashes_and_signatures(event, signature_name, signing_key, - hash_algorithm=hashlib.sha256): +def compute_event_signature(event, signature_name, signing_key): tmp_event = copy.deepcopy(event) tmp_event = prune_event(tmp_event) - redact_json = tmp_event.get_dict() + redact_json = tmp_event.get_full_dict() redact_json.pop("signatures", None) + redact_json.pop("age_ts", None) + redact_json.pop("unsigned", None) + logger.debug("Signing event: %s", redact_json) redact_json = sign_json(redact_json, signature_name, signing_key) - event.signatures = redact_json["signatures"] + return redact_json["signatures"] + + +def add_hashes_and_signatures(event, signature_name, signing_key, + hash_algorithm=hashlib.sha256): + hashed = _compute_content_hash(event, hash_algorithm=hash_algorithm) - event_json = event.get_full_dict() - #TODO: We need to sign the JSON that is going out via fedaration. - event_json.pop("age_ts", None) - event_json.pop("unsigned", None) - event_json.pop("signatures", None) - event_json.pop("hashes", None) - event_json_bytes = encode_canonical_json(event_json) - hashed = hash_algorithm(event_json_bytes) if not hasattr(event, "hashes"): event.hashes = {} event.hashes[hashed.name] = encode_base64(hashed.digest()) + + event.signatures = compute_event_signature( + event, + signature_name=signature_name, + signing_key=signing_key, + ) diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 5ec97a698e..52c84efb5b 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -14,10 +14,6 @@ # limitations under the License. from .units import Pdu -from synapse.crypto.event_signing import ( - add_event_pdu_content_hash, sign_event_pdu -) -from synapse.types import EventID import copy @@ -49,17 +45,10 @@ class PduCodec(object): def pdu_from_event(self, event): d = event.get_full_dict() - if hasattr(event, "state_key"): - d["is_state"] = True - kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() }) - if "origin_server_ts" not in kwargs: - kwargs["origin_server_ts"] = int(self.clock.time_msec()) - pdu = Pdu(**kwargs) - pdu = add_event_pdu_content_hash(pdu) - return sign_event_pdu(pdu, self.server_name, self.signing_key) + return pdu diff --git a/synapse/federation/units.py b/synapse/federation/units.py index c94dcf64cf..c2d8dca8f3 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -65,8 +65,7 @@ class Pdu(JsonEncodedObject): "content", "outlier", "hashes", - "signatures", - "is_state", # Below this are keys valid only for State Pdus. + "signatures", # Below this are keys valid only for State Pdus. "state_key", "prev_state", "required_power_level", @@ -91,16 +90,10 @@ class Pdu(JsonEncodedObject): # TODO: We need to make this properly load content rather than # just leaving it as a dict. (OR DO WE?!) - def __init__(self, destinations=[], is_state=False, prev_events=[], + def __init__(self, destinations=[], prev_events=[], outlier=False, hashes={}, signatures={}, **kwargs): - if is_state: - for required_key in ["state_key"]: - if required_key not in kwargs: - raise RuntimeError("Key %s is required" % required_key) - super(Pdu, self).__init__( destinations=destinations, - is_state=bool(is_state), prev_events=prev_events, outlier=outlier, hashes=hashes, -- cgit 1.4.1 From aa76bf39aba2eadf506f57952a1dffce629f2637 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 14:14:02 +0000 Subject: Remove unused imports --- synapse/federation/persistence.py | 2 -- synapse/federation/units.py | 3 --- synapse/handlers/federation.py | 6 ++---- synapse/state.py | 2 -- synapse/storage/_base.py | 2 -- 5 files changed, 2 insertions(+), 13 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index b04fbb4177..73dc844d59 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -21,8 +21,6 @@ These actions are mostly only used by the :py:mod:`.replication` module. from twisted.internet import defer -from .units import Pdu - from synapse.util.logutils import log_function import json diff --git a/synapse/federation/units.py b/synapse/federation/units.py index c2d8dca8f3..9b25556707 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -18,11 +18,8 @@ server protocol. """ from synapse.util.jsonobject import JsonEncodedObject -from syutil.base64util import encode_base64 import logging -import json -import copy logger = logging.getLogger(__name__) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bdd28f04bb..49bfff88a4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,15 +17,13 @@ from ._base import BaseHandler -from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent +from synapse.api.events.room import RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec -from synapse.api.errors import SynapseError from synapse.util.async import run_on_reactor -from synapse.types import EventID -from twisted.internet import defer, reactor +from twisted.internet import defer import logging diff --git a/synapse/state.py b/synapse/state.py index f4efc287c9..9771883bc3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -19,8 +19,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor -from synapse.types import EventID - from collections import namedtuple import copy diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 464b12f032..5e00c23fd1 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -14,8 +14,6 @@ # limitations under the License. import logging -from twisted.internet import defer - from synapse.api.errors import StoreError from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function -- cgit 1.4.1 From d7412c4df1cdfc1128b6d63a38c4ac1d6f08b76b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 14:16:19 +0000 Subject: Remove unused interface --- synapse/federation/replication.py | 8 -------- 1 file changed, 8 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 838e660a46..99dd390a64 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -533,14 +533,6 @@ class ReplicationLayer(object): return "" % self.server_name -class ReplicationHandler(object): - """This defines the methods that the :py:class:`.ReplicationLayer` will - use to communicate with the rest of the home server. - """ - def on_receive_pdu(self, pdu): - raise NotImplementedError("on_receive_pdu") - - class _TransactionQueue(object): """This class makes sure we only have one transaction in flight at a time for a given destination. -- cgit 1.4.1 From 440cbd5235e7e23dfe97d8e3d394cc0d35b35fd6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 14:17:55 +0000 Subject: Add support for sending failures --- synapse/federation/replication.py | 30 ++++++++++++++++++++++++++++-- synapse/federation/units.py | 1 + synapse/types.py | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 99dd390a64..680e7322a6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -143,6 +143,11 @@ class ReplicationLayer(object): self._transaction_queue.enqueue_edu(edu) return defer.succeed(None) + @log_function + def send_failure(self, failure, destination): + self._transaction_queue.enqueue_failure(failure, destination) + return defer.succeed(None) + @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail=True): @@ -558,6 +563,9 @@ class _TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = {} + # destination -> list of tuple(failure, deferred) + self.pending_failures_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) @@ -610,6 +618,18 @@ class _TransactionQueue(object): return deferred + @defer.inlineCallbacks + def enqueue_failure(self, failure, destination): + deferred = defer.Deferred() + + self.pending_failures_by_dest.setdefault( + destination, [] + ).append( + (failure, deferred) + ) + + yield deferred + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): @@ -619,8 +639,9 @@ class _TransactionQueue(object): # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest(destination, []) - if not pending_pdus and not pending_edus: + if not pending_pdus and not pending_edus and not pending_failures: return logger.debug("TX [%s] Attempting new transaction", destination) @@ -630,7 +651,11 @@ class _TransactionQueue(object): pdus = [x[0] for x in pending_pdus] edus = [x[0] for x in pending_edus] - deferreds = [x[1] for x in pending_pdus + pending_edus] + failures = [x[0].get_dict() for x in pending_failures] + deferreds = [ + x[1] + for x in pending_pdus + pending_edus + pending_failures + ] try: self.pending_transactions[destination] = 1 @@ -644,6 +669,7 @@ class _TransactionQueue(object): destination=destination, pdus=pdus, edus=edus, + pdu_failures=failures, ) self._next_txn_id += 1 diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 9b25556707..2070ffe1e2 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -157,6 +157,7 @@ class Transaction(JsonEncodedObject): "edus", "transaction_id", "destination", + "pdu_failures", ] internal_keys = [ diff --git a/synapse/types.py b/synapse/types.py index 649ff2f7d7..8fac20fd2e 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -128,3 +128,37 @@ class StreamToken( d = self._asdict() d[key] = new_value return StreamToken(**d) + + +class FederationError(RuntimeError): + """ This class is used to inform remote home servers about erroneous + PDUs they sent us. + + FATAL: The remote server could not interpret the source event. + (e.g., it was missing a required field) + ERROR: The remote server interpreted the event, but it failed some other + check (e.g. auth) + WARN: The remote server accepted the event, but believes some part of it + is wrong (e.g., it referred to an invalid event) + """ + + def __init__(self, level, code, reason, affected, source=None): + if level not in ["FATAL", "ERROR", "WARN"]: + raise ValueError("Level is not valid: %s" % (level,)) + self.level = level + self.code = code + self.reason = reason + self.affected = affected + self.source = source + + msg = "%s %s: %s" % (level, code, reason,) + super(FederationError, self).__init__(msg) + + def get_dict(self): + return { + "level": self.level, + "code": self.code, + "reason": self.reason, + "affected": self.affected, + "source": self.source if self.source else self.affected, + } -- cgit 1.4.1 From fc7b2b11a28b49a4d5b50fef66bae0faee47503f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 15:09:34 +0000 Subject: PEP8 --- synapse/federation/replication.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 680e7322a6..98997998ff 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -81,7 +81,7 @@ class ReplicationLayer(object): def register_edu_handler(self, edu_type, handler): if edu_type in self.edu_handlers: - raise KeyError("Already have an EDU handler for %s" % (edu_type)) + raise KeyError("Already have an EDU handler for %s" % (edu_type,)) self.edu_handlers[edu_type] = handler @@ -102,7 +102,9 @@ class ReplicationLayer(object): object to encode as JSON. """ if query_type in self.query_handlers: - raise KeyError("Already have a Query handler for %s" % (query_type)) + raise KeyError( + "Already have a Query handler for %s" % (query_type,) + ) self.query_handlers[query_type] = handler @@ -128,7 +130,10 @@ class ReplicationLayer(object): # TODO, add errback, etc. self._transaction_queue.enqueue_pdu(pdu, order) - logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.event_id) + logger.debug( + "[%s] transaction_layer.enqueue_pdu... done", + pdu.event_id + ) @log_function def send_edu(self, destination, edu_type, content): @@ -317,7 +322,11 @@ class ReplicationLayer(object): if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu(transaction.origin, edu.edu_type, edu.content) + self.received_edu( + transaction.origin, + edu.edu_type, + edu.content + ) results = yield defer.DeferredList(dl) -- cgit 1.4.1 From a5a4ef3fd7b632d838940fb3bd7b079a897fa8a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Nov 2014 15:16:43 +0000 Subject: Fix bug in replication --- synapse/federation/replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 98997998ff..92a9678e2c 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -648,7 +648,7 @@ class _TransactionQueue(object): # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_failures = self.pending_failures_by_dest(destination, []) + pending_failures = self.pending_failures_by_dest.pop(destination, []) if not pending_pdus and not pending_edus and not pending_failures: return -- cgit 1.4.1 From 3b4dec442da51c6c999dd946db6ea6ce5f07ff0c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 11:22:12 +0000 Subject: Return auth chain when handling send_join --- synapse/federation/replication.py | 20 +++++++++++++++----- synapse/handlers/federation.py | 15 ++++++++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 92a9678e2c..d1eddf249d 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -426,8 +426,12 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_send_join_request(self, origin, content): pdu = Pdu(**content) - state = yield self.handler.on_send_join_request(origin, pdu) - defer.returnValue((200, self._transaction_from_pdus(state).get_dict())) + res_pdus = yield self.handler.on_send_join_request(origin, pdu) + + defer.returnValue((200, { + "state": [p.get_dict() for p in res_pdus["state"]], + "auth_chain": [p.get_dict() for p in res_pdus["auth_chain"]], + })) @defer.inlineCallbacks def make_join(self, destination, context, user_id): @@ -451,11 +455,17 @@ class ReplicationLayer(object): ) logger.debug("Got content: %s", content) - pdus = [Pdu(outlier=True, **p) for p in content.get("pdus", [])] - for pdu in pdus: + state = [Pdu(outlier=True, **p) for p in content.get("state", [])] + for pdu in state: yield self._handle_new_pdu(destination, pdu) - defer.returnValue(pdus) + auth_chain = [ + Pdu(outlier=True, **p) for p in content.get("auth_chain", []) + ] + for pdu in auth_chain: + yield self._handle_new_pdu(destination, pdu) + + defer.returnValue(state) @log_function def _get_persisted_pdu(self, event_id): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 09593303a4..c193da12b7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -366,10 +366,19 @@ class FederationHandler(BaseHandler): yield self.replication_layer.send_pdu(new_pdu) - defer.returnValue([ + auth_chain = yield self.store.get_auth_chain(event.event_id) + pdu_auth_chain = [ self.pdu_codec.pdu_from_event(e) - for e in event.state_events.values() - ]) + for e in auth_chain + ] + + defer.returnValue({ + "state": [ + self.pdu_codec.pdu_from_event(e) + for e in event.state_events.values() + ], + "auth_chain": pdu_auth_chain, + }) @defer.inlineCallbacks def get_state_for_pdu(self, event_id): -- cgit 1.4.1 From 328dab246376814c141095d63e5cef3b4cb52068 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 11:40:38 +0000 Subject: Remove /context/ request --- synapse/federation/replication.py | 7 ------- synapse/federation/transport.py | 9 --------- 2 files changed, 16 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index d1eddf249d..37e7db0536 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -272,13 +272,6 @@ class ReplicationLayer(object): defer.returnValue(pdus) - @defer.inlineCallbacks - @log_function - def on_context_pdus_request(self, context): - raise NotImplementedError( - "on_context_pdus_request is a security violation" - ) - @defer.inlineCallbacks @log_function def on_backfill_request(self, context, versions, limit): diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 04ad7e63ae..b9f7d54c71 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -403,15 +403,6 @@ class TransportLayer(object): ) ) - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/context/([^/]*)/$"), - self._with_authentication( - lambda origin, content, query, context: - handler.on_context_pdus_request(context) - ) - ) - # This is when we receive a server-server Query self.server.register_path( "GET", -- cgit 1.4.1 From d2fb2b8095ec7f5d00b51418e84b05a1b23b79b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 13:41:00 +0000 Subject: Implement invite part of invite join dance --- synapse/federation/replication.py | 15 ++++++++++++++- synapse/handlers/_base.py | 13 ++++++++++++- synapse/handlers/federation.py | 37 +++++++++++++++++++++++++++++++++++++ synapse/handlers/room.py | 32 ++++++++++++-------------------- 4 files changed, 75 insertions(+), 22 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 37e7db0536..e358de942e 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -413,7 +413,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_invite_request(self, origin, content): pdu = Pdu(**content) - ret_pdu = yield self.handler.on_send_join_request(origin, pdu) + ret_pdu = yield self.handler.on_invite_request(origin, pdu) defer.returnValue((200, ret_pdu.get_dict())) @defer.inlineCallbacks @@ -460,6 +460,19 @@ class ReplicationLayer(object): defer.returnValue(state) + @defer.inlineCallbacks + def send_invite(self, destination, context, event_id, pdu): + code, pdu_dict = yield self.transport_layer.send_invite( + destination=destination, + context=context, + event_id=event_id, + content=pdu.get_dict(), + ) + + logger.debug("Got response to send_invite: %s", pdu_dict) + + defer.returnValue(Pdu(**pdu_dict)) + @log_function def _get_persisted_pdu(self, event_id): """ Get a PDU from the database with given origin and id. diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index f630280031..9dc2fc2e0f 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -56,7 +56,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], - extra_users=[], suppress_auth=False): + extra_users=[], suppress_auth=False, + do_invite_host=None): yield run_on_reactor() snapshot.fill_out_prev_events(event) @@ -80,6 +81,16 @@ class BaseHandler(object): else: logger.debug("Suppressed auth.") + if do_invite_host: + federation_handler = self.hs.get_handlers().federation_handler + invite_event = yield federation_handler.send_invite( + do_invite_host, + event + ) + + # FIXME: We need to check if the remote changed anything else + event.signatures = invite_event.signatures + yield self.store.persist_event(event) destinations = set(extra_destinations) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c193da12b7..e6afd95a58 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -23,6 +23,7 @@ from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec from synapse.util.async import run_on_reactor +from synapse.crypto.event_signing import compute_event_signature from twisted.internet import defer @@ -212,6 +213,17 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def send_invite(self, target_host, event): + pdu = yield self.replication_layer.send_invite( + destination=target_host, + context=event.room_id, + event_id=event.event_id, + pdu=self.pdu_codec.pdu_from_event(event) + ) + + defer.returnValue(self.pdu_codec.event_from_pdu(pdu)) + @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content, snapshot): @@ -380,6 +392,31 @@ class FederationHandler(BaseHandler): "auth_chain": pdu_auth_chain, }) + @defer.inlineCallbacks + def on_invite_request(self, origin, pdu): + event = self.pdu_codec.event_from_pdu(pdu) + + event.outlier = True + + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + + yield self.state_handler.annotate_state_groups(event) + + yield self.store.persist_event( + event, + backfilled=False, + ) + + yield self.notifier.on_new_room_event(event) + + defer.returnValue(self.pdu_codec.pdu_from_event(event)) + @defer.inlineCallbacks def get_state_for_pdu(self, event_id): yield run_on_reactor() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 42a6c9f9bf..3642fcfc6d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -361,13 +361,6 @@ class RoomMemberHandler(BaseHandler): if prev_state: event.content["prev"] = prev_state.membership -# if prev_state and prev_state.membership == event.membership: -# # treat this event as a NOOP. -# if do_auth: # This is mainly to fix a unit test. -# yield self.auth.check(event, raises=True) -# defer.returnValue({}) -# return - room_id = event.room_id # If we're trying to join a room then we have to do this differently @@ -521,25 +514,24 @@ class RoomMemberHandler(BaseHandler): defer.returnValue([r.room_id for r in rooms]) + @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, snapshot, do_auth): - destinations = [] - # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key target_user = self.hs.parse_userid(target_user_id) - if membership == Membership.INVITE: - host = target_user.domain - destinations.append(host) - - # Always include target domain - host = target_user.domain - destinations.append(host) - - return self._on_new_room_event( - event, snapshot, extra_destinations=destinations, - extra_users=[target_user], suppress_auth=(not do_auth), + if membership == Membership.INVITE and not target_user.is_mine: + do_invite_host = target_user.domain + else: + do_invite_host = None + + yield self._on_new_room_event( + event, + snapshot, + extra_users=[target_user], + suppress_auth=(not do_auth), + do_invite_host=do_invite_host, ) -- cgit 1.4.1 From 02c3b1c9e2afa27753e9ce898e5455b6489541b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Nov 2014 15:35:53 +0000 Subject: Add '/event_auth/' federation api --- synapse/federation/replication.py | 5 +++++ synapse/federation/transport.py | 26 ++++++++++++++++++++++++++ synapse/handlers/federation.py | 5 +++++ synapse/storage/event_federation.py | 26 +++++++++++++++++++------- 4 files changed, 55 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e358de942e..719bfcc42c 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -426,6 +426,11 @@ class ReplicationLayer(object): "auth_chain": [p.get_dict() for p in res_pdus["auth_chain"]], })) + @defer.inlineCallbacks + def on_event_auth(self, origin, context, event_id): + auth_pdus = yield self.handler.on_event_auth(event_id) + defer.returnValue((200, [a.get_dict() for a in auth_pdus])) + @defer.inlineCallbacks def make_join(self, destination, context, user_id): pdu_dict = yield self.transport_layer.make_join( diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index b9f7d54c71..babe8447eb 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -256,6 +256,21 @@ class TransportLayer(object): defer.returnValue(json.loads(content)) + @defer.inlineCallbacks + @log_function + def get_event_auth(self, destination, context, event_id): + path = PREFIX + "/event_auth/%s/%s" % ( + context, + event_id, + ) + + response = yield self.client.get_json( + destination=destination, + path=path, + ) + + defer.returnValue(response) + @defer.inlineCallbacks def _authenticate_request(self, request): json_request = { @@ -426,6 +441,17 @@ class TransportLayer(object): ) ) + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/event_auth/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + handler.on_event_auth( + origin, context, event_id, + ) + ) + ) + self.server.register_path( "PUT", re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"), diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e6afd95a58..ce65bbcd62 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -224,6 +224,11 @@ class FederationHandler(BaseHandler): defer.returnValue(self.pdu_codec.event_from_pdu(pdu)) + @defer.inlineCallbacks + def on_event_auth(self, event_id): + auth = yield self.store.get_auth_chain(event_id) + defer.returnValue([self.pdu_codec.pdu_from_event(e) for e in auth]) + @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content, snapshot): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index d66a49e9f2..06e32d592d 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -32,6 +32,24 @@ class EventFederationStore(SQLBaseStore): ) def _get_auth_chain_txn(self, txn, event_id): + results = self._get_auth_chain_ids_txn(txn, event_id) + + sql = "SELECT * FROM events WHERE event_id = ?" + rows = [] + for ev_id in results: + c = txn.execute(sql, (ev_id,)) + rows.extend(self.cursor_to_dict(c)) + + return self._parse_events_txn(txn, rows) + + def get_auth_chain_ids(self, event_id): + return self.runInteraction( + "get_auth_chain_ids", + self._get_auth_chain_ids_txn, + event_id + ) + + def _get_auth_chain_ids_txn(self, txn, event_id): results = set() base_sql = ( @@ -48,13 +66,7 @@ class EventFederationStore(SQLBaseStore): front = [r[0] for r in txn.fetchall()] results.update(front) - sql = "SELECT * FROM events WHERE event_id = ?" - rows = [] - for ev_id in results: - c = txn.execute(sql, (ev_id,)) - rows.extend(self.cursor_to_dict(c)) - - return self._parse_events_txn(txn, rows) + return list(results) def get_oldest_events_in_room(self, room_id): return self.runInteraction( -- cgit 1.4.1 From 1c06806f90a6368cdc3b9fa3b9053021b7c40e94 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 10:21:32 +0000 Subject: Finish redaction algorithm. --- synapse/api/events/__init__.py | 4 ++-- synapse/api/events/utils.py | 39 ++++++++++++++++++++++++++------------- synapse/crypto/event_signing.py | 7 ++----- synapse/federation/units.py | 6 ++---- synapse/storage/_base.py | 2 +- 5 files changed, 33 insertions(+), 25 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 8d65c29ac1..f1e53f23ab 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -86,8 +86,8 @@ class SynapseEvent(JsonEncodedObject): def __init__(self, raises=True, **kwargs): super(SynapseEvent, self).__init__(**kwargs) - if "content" in kwargs: - self.check_json(self.content, raises=raises) + # if "content" in kwargs: + # self.check_json(self.content, raises=raises) def get_content_template(self): """ Retrieve the JSON template for this event as a dict. diff --git a/synapse/api/events/utils.py b/synapse/api/events/utils.py index 5fc79105b5..802648f8f7 100644 --- a/synapse/api/events/utils.py +++ b/synapse/api/events/utils.py @@ -18,24 +18,31 @@ from .room import ( RoomAliasesEvent, RoomCreateEvent, ) + def prune_event(event): - """ Prunes the given event of all keys we don't know about or think could - potentially be dodgy. + """ Returns a pruned version of the given event, which removes all keys we + don't know about or think could potentially be dodgy. This is used when we "redact" an event. We want to remove all fields that the user has specified, but we do want to keep necessary information like type, state_key etc. """ - return _prune_event_or_pdu(event.type, event) - -def prune_pdu(pdu): - """Removes keys that contain unrestricted and non-essential data from a PDU - """ - return _prune_event_or_pdu(pdu.type, pdu) + event_type = event.type -def _prune_event_or_pdu(event_type, event): - # Remove all extraneous fields. - event.unrecognized_keys = {} + allowed_keys = [ + "event_id", + "user_id", + "room_id", + "hashes", + "signatures", + "content", + "type", + "state_key", + "depth", + "prev_events", + "prev_state", + "auth_events", + ] new_content = {} @@ -65,6 +72,12 @@ def _prune_event_or_pdu(event_type, event): elif event_type == RoomAliasesEvent.TYPE: add_fields("aliases") - event.content = new_content + allowed_fields = { + k: v + for k, v in event.get_full_dict().items() + if k in allowed_keys + } + + allowed_fields["content"] = new_content - return event + return type(event)(**allowed_fields) diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 7d800615fe..056e8f6ca4 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -55,9 +55,7 @@ def _compute_content_hash(event, hash_algorithm): def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): - # FIXME(erikj): GenericEvent! - tmp_event = GenericEvent(**event.get_full_dict()) - tmp_event = prune_event(tmp_event) + tmp_event = prune_event(event) event_json = tmp_event.get_dict() event_json.pop("signatures", None) event_json.pop("age_ts", None) @@ -68,8 +66,7 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256): def compute_event_signature(event, signature_name, signing_key): - tmp_event = copy.deepcopy(event) - tmp_event = prune_event(tmp_event) + tmp_event = prune_event(event) redact_json = tmp_event.get_full_dict() redact_json.pop("signatures", None) redact_json.pop("age_ts", None) diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 2070ffe1e2..d98014cac7 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -56,17 +56,15 @@ class Pdu(JsonEncodedObject): "origin_server_ts", "type", "destinations", - "transaction_id", "prev_events", "depth", "content", - "outlier", "hashes", + "user_id", + "auth_events", "signatures", # Below this are keys valid only for State Pdus. "state_key", "prev_state", - "required_power_level", - "user_id", ] internal_keys = [ diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9aa404695d..3ab81a78d5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -509,7 +509,7 @@ class SQLBaseStore(object): ) if del_evs: - prune_event(ev) + ev = prune_event(ev) ev.redacted_because = del_evs[0] return events -- cgit 1.4.1 From 6447db063a0d01135582bdfb3392b419f16a19e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 11:59:51 +0000 Subject: Fix backfill to work. Add auth to backfill request --- synapse/api/auth.py | 6 ++++++ synapse/federation/replication.py | 36 ++++++++++++++++++++++++++++-------- synapse/federation/transport.py | 6 +++--- synapse/handlers/federation.py | 10 +++++----- synapse/storage/_base.py | 12 ++++++++++++ synapse/storage/event_federation.py | 4 ++-- 6 files changed, 56 insertions(+), 18 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3e5d878eed..48f9d460a3 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -104,6 +104,12 @@ class Auth(object): pass defer.returnValue(None) + @defer.inlineCallbacks + def check_host_in_room(self, room_id, host): + joined_hosts = yield self.store.get_joined_hosts_for_room(room_id) + + defer.returnValue(host in joined_hosts) + def check_event_sender_in_room(self, event): key = (RoomMemberEvent.TYPE, event.user_id, ) member_event = event.state_events.get(key) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 719bfcc42c..7837f1c252 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -205,7 +205,7 @@ class ReplicationLayer(object): pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu, backfilled=True) + yield self._handle_new_pdu(dest, pdu, backfilled=True) defer.returnValue(pdus) @@ -274,9 +274,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_backfill_request(self, context, versions, limit): + def on_backfill_request(self, origin, context, versions, limit): pdus = yield self.handler.on_backfill_request( - context, versions, limit + origin, context, versions, limit ) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -408,13 +408,22 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_make_join_request(self, context, user_id): pdu = yield self.handler.on_make_join_request(context, user_id) - defer.returnValue(pdu.get_dict()) + defer.returnValue({ + "event": pdu.get_dict(), + }) @defer.inlineCallbacks def on_invite_request(self, origin, content): pdu = Pdu(**content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) - defer.returnValue((200, ret_pdu.get_dict())) + defer.returnValue( + ( + 200, + { + "event": ret_pdu.get_dict(), + } + ) + ) @defer.inlineCallbacks def on_send_join_request(self, origin, content): @@ -429,16 +438,25 @@ class ReplicationLayer(object): @defer.inlineCallbacks def on_event_auth(self, origin, context, event_id): auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue((200, [a.get_dict() for a in auth_pdus])) + defer.returnValue( + ( + 200, + { + "auth_chain": [a.get_dict() for a in auth_pdus], + } + ) + ) @defer.inlineCallbacks def make_join(self, destination, context, user_id): - pdu_dict = yield self.transport_layer.make_join( + ret = yield self.transport_layer.make_join( destination=destination, context=context, user_id=user_id, ) + pdu_dict = ret["event"] + logger.debug("Got response to make_join: %s", pdu_dict) defer.returnValue(Pdu(**pdu_dict)) @@ -467,13 +485,15 @@ class ReplicationLayer(object): @defer.inlineCallbacks def send_invite(self, destination, context, event_id, pdu): - code, pdu_dict = yield self.transport_layer.send_invite( + code, content = yield self.transport_layer.send_invite( destination=destination, context=context, event_id=event_id, content=pdu.get_dict(), ) + pdu_dict = content["event"] + logger.debug("Got response to send_invite: %s", pdu_dict) defer.returnValue(Pdu(**pdu_dict)) diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index babe8447eb..92a1f4ce17 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -413,7 +413,7 @@ class TransportLayer(object): self._with_authentication( lambda origin, content, query, context: self._on_backfill_request( - context, query["v"], query["limit"] + origin, context, query["v"], query["limit"] ) ) ) @@ -552,7 +552,7 @@ class TransportLayer(object): defer.returnValue(data) @log_function - def _on_backfill_request(self, context, v_list, limits): + def _on_backfill_request(self, origin, context, v_list, limits): if not limits: return defer.succeed( (400, {"error": "Did not include limit param"}) @@ -563,7 +563,7 @@ class TransportLayer(object): versions = v_list return self.request_handler.on_backfill_request( - context, versions, limit + origin, context, versions, limit ) @defer.inlineCallbacks diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9a59fe94d2..00d10609b8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -193,10 +193,7 @@ class FederationHandler(BaseHandler): dest, room_id, limit, - extremities=[ - self.pdu_codec.decode_event_id(e) - for e in extremities - ] + extremities=extremities, ) events = [] @@ -473,7 +470,10 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_backfill_request(self, context, pdu_list, limit): + def on_backfill_request(self, origin, context, pdu_list, limit): + in_room = yield self.auth.check_host_in_room(context, origin) + if not in_room: + raise AuthError(403, "Host not in room.") events = yield self.store.get_backfill_events( context, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 3ab81a78d5..a23f2b941b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -447,6 +447,18 @@ class SQLBaseStore(object): **d ) + def _get_events_txn(self, txn, event_ids): + # FIXME (erikj): This should be batched? + + sql = "SELECT * FROM events WHERE event_id = ?" + + event_rows = [] + for e_id in event_ids: + c = txn.execute(sql, (e_id,)) + event_rows.extend(self.cursor_to_dict(c)) + + return self._parse_events_txn(txn, event_rows) + def _parse_events(self, rows): return self.runInteraction( "_parse_events", self._parse_events_txn, rows diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 06e32d592d..a707030145 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -371,10 +371,10 @@ class EventFederationStore(SQLBaseStore): "_backfill_interaction: got id=%s", *row ) - new_front.append(row) + new_front.append(row[0]) front = new_front event_results += new_front # We also want to update the `prev_pdus` attributes before returning. - return self._get_pdu_tuples(txn, event_results) + return self._get_events_txn(txn, event_results) -- cgit 1.4.1 From 003668cfaadc2e96fab0127d7c563eb4b17e0619 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 13:37:24 +0000 Subject: Add auth to the various server-server APIs --- synapse/federation/replication.py | 14 ++++++++------ synapse/federation/transport.py | 3 ++- synapse/handlers/federation.py | 15 +++++++++++++-- 3 files changed, 23 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 7837f1c252..dd8124dbb9 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -347,10 +347,12 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, context, event_id): + def on_context_state_request(self, origin, context, event_id): if event_id: pdus = yield self.handler.get_state_for_pdu( - event_id + origin, + context, + event_id, ) else: raise NotImplementedError("Specify an event") @@ -365,8 +367,8 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_pdu_request(self, event_id): - pdu = yield self._get_persisted_pdu(event_id) + def on_pdu_request(self, origin, event_id): + pdu = yield self._get_persisted_pdu(origin, event_id) if pdu: defer.returnValue( @@ -499,13 +501,13 @@ class ReplicationLayer(object): defer.returnValue(Pdu(**pdu_dict)) @log_function - def _get_persisted_pdu(self, event_id): + def _get_persisted_pdu(self, origin, event_id): """ Get a PDU from the database with given origin and id. Returns: Deferred: Results in a `Pdu`. """ - return self.handler.get_persisted_pdu(event_id) + return self.handler.get_persisted_pdu(origin, event_id) def _transaction_from_pdus(self, pdu_list): """Returns a new Transaction containing the given PDUs suitable for diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 92a1f4ce17..d84a44c211 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -390,7 +390,7 @@ class TransportLayer(object): re.compile("^" + PREFIX + "/event/([^/]*)/$"), self._with_authentication( lambda origin, content, query, event_id: - handler.on_pdu_request(event_id) + handler.on_pdu_request(origin, event_id) ) ) @@ -401,6 +401,7 @@ class TransportLayer(object): self._with_authentication( lambda origin, content, query, context: handler.on_context_state_request( + origin, context, query.get("event_id", [None])[0], ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 00d10609b8..587fa308c8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -436,9 +436,13 @@ class FederationHandler(BaseHandler): defer.returnValue(self.pdu_codec.pdu_from_event(event)) @defer.inlineCallbacks - def get_state_for_pdu(self, event_id): + def get_state_for_pdu(self, origin, room_id, event_id): yield run_on_reactor() + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + state_groups = yield self.store.get_state_groups( [event_id] ) @@ -488,7 +492,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, event_id): + def get_persisted_pdu(self, origin, event_id): """ Get a PDU from the database with given origin and id. Returns: @@ -500,6 +504,13 @@ class FederationHandler(BaseHandler): ) if event: + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + defer.returnValue(self.pdu_codec.pdu_from_event(event)) else: defer.returnValue(None) -- cgit 1.4.1 From c46088405aa6baa41da0aef56073ae3b4fe9f850 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 13:39:33 +0000 Subject: Remove useless comments --- synapse/federation/replication.py | 23 ----------------------- 1 file changed, 23 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index dd8124dbb9..e798304353 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -356,12 +356,6 @@ class ReplicationLayer(object): ) else: raise NotImplementedError("Specify an event") - # results = yield self.store.get_current_state_for_context( - # context - # ) - # pdus = [Pdu.from_pdu_tuple(p) for p in results] - # - # logger.debug("Context returning %d results", len(pdus)) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -382,21 +376,6 @@ class ReplicationLayer(object): def on_pull_request(self, origin, versions): raise NotImplementedError("Pull transacions not implemented") - # transaction_id = max([int(v) for v in versions]) - # - # response = yield self.pdu_actions.after_transaction( - # transaction_id, - # origin, - # self.server_name - # ) - # - # if not response: - # response = [] - # - # defer.returnValue( - # (200, self._transaction_from_pdus(response).get_dict()) - # ) - @defer.inlineCallbacks def on_query_request(self, query_type, args): if query_type in self.query_handlers: @@ -570,8 +549,6 @@ class ReplicationLayer(object): origin, pdu.room_id, pdu.event_id, ) - # Persist the Pdu, but don't mark it as processed yet. - # yield self.store.persist_event(pdu=pdu) if not backfilled: ret = yield self.handler.on_receive_pdu( -- cgit 1.4.1 From 5d439b127ba34b951dfd09a7d3c684c2d50df702 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Nov 2014 13:46:44 +0000 Subject: PEP8 --- synapse/api/auth.py | 3 +-- synapse/api/events/room.py | 1 + synapse/federation/replication.py | 1 - synapse/federation/transport.py | 9 ++++++--- synapse/federation/units.py | 7 +++---- synapse/handlers/federation.py | 5 ++++- synapse/storage/__init__.py | 7 ++++--- synapse/storage/event_federation.py | 9 +++------ 8 files changed, 22 insertions(+), 20 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 48f9d460a3..a5c6964707 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -369,7 +369,6 @@ class Auth(object): ] event.auth_events = zip(auth_events, hashes) - @log_function def _can_send_event(self, event): key = (RoomPowerLevelsEvent.TYPE, "", ) @@ -452,7 +451,7 @@ class Auth(object): event.user_id, ) - # Check other levels: + # Check other levels: levels_to_check = [ ("users_default", []), ("events_default", []), diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index 25bc883706..8c4ac45d02 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -153,6 +153,7 @@ class RoomPowerLevelsEvent(SynapseStateEvent): def get_content_template(self): return {} + class RoomAliasesEvent(SynapseStateEvent): TYPE = "m.room.aliases" diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e798304353..bacba36755 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -549,7 +549,6 @@ class ReplicationLayer(object): origin, pdu.room_id, pdu.event_id, ) - if not backfilled: ret = yield self.handler.on_receive_pdu( pdu, diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index d84a44c211..95c40c6c1b 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -284,7 +284,7 @@ class TransportLayer(object): origin = None if request.method == "PUT": - #TODO: Handle other method types? other content types? + # TODO: Handle other method types? other content types? try: content_bytes = request.content.read() content = json.loads(content_bytes) @@ -296,11 +296,13 @@ class TransportLayer(object): try: params = auth.split(" ")[1].split(",") param_dict = dict(kv.split("=") for kv in params) + def strip_quotes(value): if value.startswith("\""): return value[1:-1] else: return value + origin = strip_quotes(param_dict["origin"]) key = strip_quotes(param_dict["key"]) sig = strip_quotes(param_dict["sig"]) @@ -321,7 +323,7 @@ class TransportLayer(object): if auth.startswith("X-Matrix"): (origin, key, sig) = parse_auth_header(auth) json_request["origin"] = origin - json_request["signatures"].setdefault(origin,{})[key] = sig + json_request["signatures"].setdefault(origin, {})[key] = sig if not json_request["signatures"]: raise SynapseError( @@ -515,7 +517,8 @@ class TransportLayer(object): return try: - code, response = yield self.received_handler.on_incoming_transaction( + handler = self.received_handler + code, response = yield handler.on_incoming_transaction( transaction_data ) except: diff --git a/synapse/federation/units.py b/synapse/federation/units.py index d98014cac7..f4e7b62bd9 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -192,7 +192,9 @@ class Transaction(JsonEncodedObject): transaction_id and origin_server_ts keys. """ if "origin_server_ts" not in kwargs: - raise KeyError("Require 'origin_server_ts' to construct a Transaction") + raise KeyError( + "Require 'origin_server_ts' to construct a Transaction" + ) if "transaction_id" not in kwargs: raise KeyError( "Require 'transaction_id' to construct a Transaction" @@ -204,6 +206,3 @@ class Transaction(JsonEncodedObject): kwargs["pdus"] = [p.get_dict() for p in pdus] return Transaction(**kwargs) - - - diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 587fa308c8..e909af6bd8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -521,6 +521,9 @@ class FederationHandler(BaseHandler): @log_function def _on_user_joined(self, user, room_id): - waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) + waiters = self.waiting_for_join_list.get( + (user.to_string(), room_id), + [] + ) while waiters: waiters.pop().callback(None) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7d810e6a62..4034437f6b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -494,11 +494,13 @@ def prepare_database(db_conn): user_version = row[0] if user_version > SCHEMA_VERSION: - raise ValueError("Cannot use this database as it is too " + + raise ValueError( + "Cannot use this database as it is too " + "new for the server to understand" ) elif user_version < SCHEMA_VERSION: - logging.info("Upgrading database from version %d", + logging.info( + "Upgrading database from version %d", user_version ) @@ -520,4 +522,3 @@ def prepare_database(db_conn): c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) c.close() - diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a707030145..a027db3868 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -215,7 +215,7 @@ class EventFederationStore(SQLBaseStore): min_depth = self._simple_select_one_onecol_txn( txn, table="room_depth", - keyvalues={"room_id": room_id,}, + keyvalues={"room_id": room_id}, retcol="min_depth", allow_none=True, ) @@ -267,10 +267,8 @@ class EventFederationStore(SQLBaseStore): } ) - - - # We only insert as a forward extremity the new pdu if there are no - # other pdus that reference it as a prev pdu + # We only insert as a forward extremity the new pdu if there are + # no other pdus that reference it as a prev pdu query = ( "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " "SELECT ?, ? WHERE NOT EXISTS (" @@ -312,7 +310,6 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occured before (and including) the pdus in pdu_list. Return a list of max size `limit`. -- cgit 1.4.1 From 092979b8cca4c602c54e1b39ee15c8ce030e949d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Nov 2014 14:19:13 +0000 Subject: Fix bugs which broke federation due to changes in function signatures. --- synapse/federation/replication.py | 4 ++-- synapse/state.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index bacba36755..5c625ddabf 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -510,7 +510,7 @@ class ReplicationLayer(object): @log_function def _handle_new_pdu(self, origin, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers - existing = yield self._get_persisted_pdu(pdu.event_id) + existing = yield self._get_persisted_pdu(origin, pdu.event_id) if existing and (not existing.outlier or pdu.outlier): logger.debug("Already seen pdu %s", pdu.event_id) @@ -528,7 +528,7 @@ class ReplicationLayer(object): if min_depth and pdu.depth > min_depth: for event_id, hashes in pdu.prev_events: - exists = yield self._get_persisted_pdu(event_id) + exists = yield self._get_persisted_pdu(origin, event_id) if not exists: logger.debug("Requesting pdu %s", event_id) diff --git a/synapse/state.py b/synapse/state.py index a58acb3c1e..11c54fd38c 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -105,7 +105,7 @@ class StateHandler(object): defer.returnValue(res[1].get((event_type, state_key))) return - defer.returnValue(res.values()) + defer.returnValue(res[1].values()) @defer.inlineCallbacks @log_function -- cgit 1.4.1