From 1da15f05f5c9c1e47c9fd1323caff869c2e55aa3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 13 Dec 2019 12:55:32 +0000 Subject: sanity-checking for events used in state res (#6531) When we perform state resolution, check that all of the events involved are in the right room. --- tests/state/test_v2.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'tests/state/test_v2.py') diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index 8d3845c870..0f341d3ac3 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -58,6 +58,7 @@ class FakeEvent(object): self.type = type self.state_key = state_key self.content = content + self.room_id = ROOM_ID def to_event(self, auth_events, prev_events): """Given the auth_events and prev_events, convert to a Frozen Event @@ -418,6 +419,7 @@ class StateTestCase(unittest.TestCase): state_before = dict(state_at_event[prev_events[0]]) else: state_d = resolve_events_with_store( + ROOM_ID, RoomVersions.V2.identifier, [state_at_event[n] for n in prev_events], event_map=event_map, @@ -565,6 +567,7 @@ class SimpleParamStateTestCase(unittest.TestCase): # Test that we correctly handle passing `None` as the event_map state_d = resolve_events_with_store( + ROOM_ID, RoomVersions.V2.identifier, [self.state_at_bob, self.state_at_charlie], event_map=None, -- cgit 1.5.1 From 799001f2c0b31d72b95a252a3808da25987e1ed3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 7 Feb 2020 15:30:04 +0000 Subject: Add a `make_event_from_dict` method (#6858) ... and use it in places where it's trivial to do so. This will make it easier to pass room versions into the FrozenEvent constructors. --- changelog.d/6858.misc | 1 + synapse/events/__init__.py | 16 ++++++++++++++-- synapse/events/builder.py | 10 +++------- synapse/federation/federation_base.py | 5 ++--- tests/api/test_filtering.py | 4 ++-- tests/crypto/test_event_signing.py | 6 +++--- tests/events/test_utils.py | 9 +++++---- tests/federation/test_federation_server.py | 4 ++-- tests/replication/slave/storage/test_events.py | 12 ++++++++---- tests/state/test_v2.py | 4 ++-- tests/test_event_auth.py | 10 +++++----- tests/test_federation.py | 6 +++--- tests/test_state.py | 4 ++-- 13 files changed, 52 insertions(+), 39 deletions(-) create mode 100644 changelog.d/6858.misc (limited to 'tests/state/test_v2.py') diff --git a/changelog.d/6858.misc b/changelog.d/6858.misc new file mode 100644 index 0000000000..08aa80bcd9 --- /dev/null +++ b/changelog.d/6858.misc @@ -0,0 +1 @@ +Refactoring work in preparation for changing the event redaction algorithm. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 89d41d82b6..a842661a90 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -16,12 +16,13 @@ import os from distutils.util import strtobool +from typing import Optional, Type import six from unpaddedbase64 import encode_base64 -from synapse.api.room_versions import EventFormatVersions +from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.types import JsonDict from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -407,7 +408,7 @@ class FrozenEventV3(FrozenEventV2): return self._event_id -def event_type_from_format_version(format_version): +def event_type_from_format_version(format_version: int) -> Type[EventBase]: """Returns the python type to use to construct an Event object for the given event format version. @@ -427,3 +428,14 @@ def event_type_from_format_version(format_version): return FrozenEventV3 else: raise Exception("No event format %r" % (format_version,)) + + +def make_event_from_dict( + event_dict: JsonDict, + room_version: RoomVersion = RoomVersions.V1, + internal_metadata_dict: JsonDict = {}, + rejected_reason: Optional[str] = None, +) -> EventBase: + """Construct an EventBase from the given event dict""" + event_type = event_type_from_format_version(room_version.event_format) + return event_type(event_dict, internal_metadata_dict, rejected_reason) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 8d63ad6dc3..a0c4a40c27 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -28,11 +28,7 @@ from synapse.api.room_versions import ( RoomVersion, ) from synapse.crypto.event_signing import add_hashes_and_signatures -from synapse.events import ( - EventBase, - _EventInternalMetadata, - event_type_from_format_version, -) +from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict from synapse.types import EventID, JsonDict from synapse.util import Clock from synapse.util.stringutils import random_string @@ -256,8 +252,8 @@ def create_local_event_from_event_dict( event_dict.setdefault("signatures", {}) add_hashes_and_signatures(room_version, event_dict, hostname, signing_key) - return event_type_from_format_version(format_version)( - event_dict, internal_metadata_dict=internal_metadata_dict + return make_event_from_dict( + event_dict, room_version, internal_metadata_dict=internal_metadata_dict ) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index ebe8b8e9fe..eea64c1c9f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -29,7 +29,7 @@ from synapse.api.room_versions import ( RoomVersion, ) from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import EventBase, event_type_from_format_version +from synapse.events import EventBase, make_event_from_dict from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( @@ -374,8 +374,7 @@ def event_from_pdu_json( elif depth > MAX_DEPTH: raise SynapseError(400, "Depth too large", Codes.BAD_JSON) - event = event_type_from_format_version(room_version.event_format)(pdu_json) - + event = make_event_from_dict(pdu_json, room_version) event.internal_metadata.outlier = outlier return event diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 63d8633582..4e67503cf0 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -25,7 +25,7 @@ from twisted.internet import defer from synapse.api.constants import EventContentFields from synapse.api.errors import SynapseError from synapse.api.filtering import Filter -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from tests import unittest from tests.utils import DeferredMockCallable, MockHttpResource, setup_test_homeserver @@ -38,7 +38,7 @@ def MockEvent(**kwargs): kwargs["event_id"] = "fake_event_id" if "type" not in kwargs: kwargs["type"] = "fake_type" - return FrozenEvent(kwargs) + return make_event_from_dict(kwargs) class FilteringTestCase(unittest.TestCase): diff --git a/tests/crypto/test_event_signing.py b/tests/crypto/test_event_signing.py index 6143a50ab2..62f639a18d 100644 --- a/tests/crypto/test_event_signing.py +++ b/tests/crypto/test_event_signing.py @@ -19,7 +19,7 @@ from unpaddedbase64 import decode_base64 from synapse.api.room_versions import RoomVersions from synapse.crypto.event_signing import add_hashes_and_signatures -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from tests import unittest @@ -54,7 +54,7 @@ class EventSigningTestCase(unittest.TestCase): RoomVersions.V1, event_dict, HOSTNAME, self.signing_key ) - event = FrozenEvent(event_dict) + event = make_event_from_dict(event_dict) self.assertTrue(hasattr(event, "hashes")) self.assertIn("sha256", event.hashes) @@ -88,7 +88,7 @@ class EventSigningTestCase(unittest.TestCase): RoomVersions.V1, event_dict, HOSTNAME, self.signing_key ) - event = FrozenEvent(event_dict) + event = make_event_from_dict(event_dict) self.assertTrue(hasattr(event, "hashes")) self.assertIn("sha256", event.hashes) diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py index 2b13980dfd..45d55b9e94 100644 --- a/tests/events/test_utils.py +++ b/tests/events/test_utils.py @@ -13,8 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.events.utils import ( copy_power_levels_contents, prune_event, @@ -30,7 +29,7 @@ def MockEvent(**kwargs): kwargs["event_id"] = "fake_event_id" if "type" not in kwargs: kwargs["type"] = "fake_type" - return FrozenEvent(kwargs) + return make_event_from_dict(kwargs) class PruneEventTestCase(unittest.TestCase): @@ -38,7 +37,9 @@ class PruneEventTestCase(unittest.TestCase): `matchdict` when it is redacted. """ def run_test(self, evdict, matchdict): - self.assertEquals(prune_event(FrozenEvent(evdict)).get_dict(), matchdict) + self.assertEquals( + prune_event(make_event_from_dict(evdict)).get_dict(), matchdict + ) def test_minimal(self): self.run_test( diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index 1ec8c40901..e7d8699040 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.federation.federation_server import server_matches_acl_event from synapse.rest import admin from synapse.rest.client.v1 import login, room @@ -105,7 +105,7 @@ class StateQueryTests(unittest.FederatingHomeserverTestCase): def _create_acl_event(content): - return FrozenEvent( + return make_event_from_dict( { "room_id": "!a:b", "event_id": "$a:b", diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index b1b037006d..d31210fbe4 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -15,7 +15,7 @@ import logging from canonicaljson import encode_canonical_json -from synapse.events import FrozenEvent, _EventInternalMetadata +from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict from synapse.events.snapshot import EventContext from synapse.handlers.room import RoomEventSource from synapse.replication.slave.storage.events import SlavedEventStore @@ -90,7 +90,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): msg_dict["content"] = {} msg_dict["unsigned"]["redacted_by"] = redaction.event_id msg_dict["unsigned"]["redacted_because"] = redaction - redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) + redacted = make_event_from_dict( + msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict() + ) self.check("get_event", [msg.event_id], redacted) def test_backfilled_redactions(self): @@ -110,7 +112,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): msg_dict["content"] = {} msg_dict["unsigned"]["redacted_by"] = redaction.event_id msg_dict["unsigned"]["redacted_because"] = redaction - redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) + redacted = make_event_from_dict( + msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict() + ) self.check("get_event", [msg.event_id], redacted) def test_invites(self): @@ -345,7 +349,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): if redacts is not None: event_dict["redacts"] = redacts - event = FrozenEvent(event_dict, internal_metadata_dict=internal) + event = make_event_from_dict(event_dict, internal_metadata_dict=internal) self.event_id += 1 diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index 0f341d3ac3..5bafad9f19 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -22,7 +22,7 @@ import attr from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.api.room_versions import RoomVersions from synapse.event_auth import auth_types_for_event -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.state.v2 import lexicographical_topological_sort, resolve_events_with_store from synapse.types import EventID @@ -89,7 +89,7 @@ class FakeEvent(object): if self.state_key is not None: event_dict["state_key"] = self.state_key - return FrozenEvent(event_dict) + return make_event_from_dict(event_dict) # All graphs start with this set of events diff --git a/tests/test_event_auth.py b/tests/test_event_auth.py index ca20b085a2..bfa5d6f510 100644 --- a/tests/test_event_auth.py +++ b/tests/test_event_auth.py @@ -18,7 +18,7 @@ import unittest from synapse import event_auth from synapse.api.errors import AuthError from synapse.api.room_versions import RoomVersions -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict class EventAuthTestCase(unittest.TestCase): @@ -94,7 +94,7 @@ TEST_ROOM_ID = "!test:room" def _create_event(user_id): - return FrozenEvent( + return make_event_from_dict( { "room_id": TEST_ROOM_ID, "event_id": _get_event_id(), @@ -106,7 +106,7 @@ def _create_event(user_id): def _join_event(user_id): - return FrozenEvent( + return make_event_from_dict( { "room_id": TEST_ROOM_ID, "event_id": _get_event_id(), @@ -119,7 +119,7 @@ def _join_event(user_id): def _power_levels_event(sender, content): - return FrozenEvent( + return make_event_from_dict( { "room_id": TEST_ROOM_ID, "event_id": _get_event_id(), @@ -132,7 +132,7 @@ def _power_levels_event(sender, content): def _random_state_event(sender): - return FrozenEvent( + return make_event_from_dict( { "room_id": TEST_ROOM_ID, "event_id": _get_event_id(), diff --git a/tests/test_federation.py b/tests/test_federation.py index 68684460c6..9b5cf562f3 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -2,7 +2,7 @@ from mock import Mock from twisted.internet.defer import ensureDeferred, maybeDeferred, succeed -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.logging.context import LoggingContext from synapse.types import Requester, UserID from synapse.util import Clock @@ -43,7 +43,7 @@ class MessageAcceptTests(unittest.TestCase): ) )[0] - join_event = FrozenEvent( + join_event = make_event_from_dict( { "room_id": self.room_id, "sender": "@baduser:test.serv", @@ -105,7 +105,7 @@ class MessageAcceptTests(unittest.TestCase): )[0] # Now lie about an event - lying_event = FrozenEvent( + lying_event = make_event_from_dict( { "room_id": self.room_id, "sender": "@baduser:test.serv", diff --git a/tests/test_state.py b/tests/test_state.py index 1e4449fa1c..d1578fe581 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -20,7 +20,7 @@ from twisted.internet import defer from synapse.api.auth import Auth from synapse.api.constants import EventTypes, Membership from synapse.api.room_versions import RoomVersions -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.events.snapshot import EventContext from synapse.state import StateHandler, StateResolutionHandler @@ -66,7 +66,7 @@ def create_event( d.update(kwargs) - event = FrozenEvent(d) + event = make_event_from_dict(d) return event -- cgit 1.5.1 From 2b37eabca1e9355e2e2ab8f65bbdda12431ecc28 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Feb 2020 15:04:47 +0000 Subject: Reduce auth chains fetched during v2 state res. (#6952) The state res v2 algorithm only cares about the difference between auth chains, so we can pass in the known common state to the `get_auth_chain` storage function so that it can ignore those events. --- changelog.d/6952.misc | 1 + synapse/state/__init__.py | 15 ++++++++---- synapse/state/v2.py | 2 +- .../storage/data_stores/main/event_federation.py | 28 ++++++++++++++++++---- tests/state/test_v2.py | 6 +++-- 5 files changed, 39 insertions(+), 13 deletions(-) create mode 100644 changelog.d/6952.misc (limited to 'tests/state/test_v2.py') diff --git a/changelog.d/6952.misc b/changelog.d/6952.misc new file mode 100644 index 0000000000..e26dc5cab8 --- /dev/null +++ b/changelog.d/6952.misc @@ -0,0 +1 @@ +Improve perf of v2 state res for large rooms. diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index fdd6bef6b4..df7a4f6a89 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -16,7 +16,7 @@ import logging from collections import namedtuple -from typing import Dict, Iterable, List, Optional +from typing import Dict, Iterable, List, Optional, Set from six import iteritems, itervalues @@ -662,7 +662,7 @@ class StateResolutionStore(object): allow_rejected=allow_rejected, ) - def get_auth_chain(self, event_ids): + def get_auth_chain(self, event_ids: List[str], ignore_events: Set[str]): """Gets the full auth chain for a set of events (including rejected events). @@ -674,11 +674,16 @@ class StateResolutionStore(object): presence of rejected events Args: - event_ids (list): The event IDs of the events to fetch the auth - chain for. Must be state events. + event_ids: The event IDs of the events to fetch the auth chain for. + Must be state events. + ignore_events: Set of events to exclude from the returned auth + chain. + Returns: Deferred[list[str]]: List of event IDs of the auth chain. """ - return self.store.get_auth_chain_ids(event_ids, include_given=True) + return self.store.get_auth_chain_ids( + event_ids, include_given=True, ignore_events=ignore_events, + ) diff --git a/synapse/state/v2.py b/synapse/state/v2.py index 531018c6a5..75fe58305a 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -248,7 +248,7 @@ def _get_auth_chain_difference(state_sets, event_map, state_res_store): and eid not in common ) - auth_chain = yield state_res_store.get_auth_chain(auth_ids) + auth_chain = yield state_res_store.get_auth_chain(auth_ids, common) auth_ids.update(auth_chain) auth_sets.append(auth_ids) diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 60c67457b4..e16da2577d 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -14,6 +14,7 @@ # limitations under the License. import itertools import logging +from typing import List, Optional, Set from six.moves import range from six.moves.queue import Empty, PriorityQueue @@ -46,21 +47,37 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_ids, include_given=include_given ).addCallback(self.get_events_as_list) - def get_auth_chain_ids(self, event_ids, include_given=False): + def get_auth_chain_ids( + self, + event_ids: List[str], + include_given: bool = False, + ignore_events: Optional[Set[str]] = None, + ): """Get auth events for given event_ids. The events *must* be state events. Args: - event_ids (list): state events - include_given (bool): include the given events in result + event_ids: state events + include_given: include the given events in result + ignore_events: Set of events to exclude from the returned auth + chain. This is useful if the caller will just discard the + given events anyway, and saves us from figuring out their auth + chains if not required. Returns: list of event_ids """ return self.db.runInteraction( - "get_auth_chain_ids", self._get_auth_chain_ids_txn, event_ids, include_given + "get_auth_chain_ids", + self._get_auth_chain_ids_txn, + event_ids, + include_given, + ignore_events, ) - def _get_auth_chain_ids_txn(self, txn, event_ids, include_given): + def _get_auth_chain_ids_txn(self, txn, event_ids, include_given, ignore_events): + if ignore_events is None: + ignore_events = set() + if include_given: results = set(event_ids) else: @@ -80,6 +97,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas txn.execute(base_sql + clause, list(args)) new_front.update([r[0] for r in txn]) + new_front -= ignore_events new_front -= results front = new_front diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index 5bafad9f19..5059ade850 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -603,7 +603,7 @@ class TestStateResolutionStore(object): return {eid: self.event_map[eid] for eid in event_ids if eid in self.event_map} - def get_auth_chain(self, event_ids): + def get_auth_chain(self, event_ids, ignore_events): """Gets the full auth chain for a set of events (including rejected events). @@ -617,6 +617,8 @@ class TestStateResolutionStore(object): Args: event_ids (list): The event IDs of the events to fetch the auth chain for. Must be state events. + ignore_events: Set of events to exclude from the returned auth + chain. Returns: Deferred[list[str]]: List of event IDs of the auth chain. @@ -627,7 +629,7 @@ class TestStateResolutionStore(object): stack = list(event_ids) while stack: event_id = stack.pop() - if event_id in result: + if event_id in result or event_id in ignore_events: continue result.add(event_id) -- cgit 1.5.1 From 4a17a647a9508b70de35130fd82e3e21474270a9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Mar 2020 16:46:41 +0000 Subject: Improve get auth chain difference algorithm. (#7095) It was originally implemented by pulling the full auth chain of all state sets out of the database and doing set comparison. However, that can take a lot work if the state and auth chains are large. Instead, lets try and fetch the auth chains at the same time and calculate the difference on the fly, allowing us to bail early if all the auth chains converge. Assuming that the auth chains do converge more often than not, this should improve performance. Hopefully. --- changelog.d/7095.misc | 1 + synapse/state/__init__.py | 28 ++-- synapse/state/v2.py | 32 +---- .../storage/data_stores/main/event_federation.py | 150 +++++++++++++++++++- tests/state/test_v2.py | 13 +- tests/storage/test_event_federation.py | 157 ++++++++++++++++++--- 6 files changed, 310 insertions(+), 71 deletions(-) create mode 100644 changelog.d/7095.misc (limited to 'tests/state/test_v2.py') diff --git a/changelog.d/7095.misc b/changelog.d/7095.misc new file mode 100644 index 0000000000..44fc9f616f --- /dev/null +++ b/changelog.d/7095.misc @@ -0,0 +1 @@ +Attempt to improve performance of state res v2 algorithm. diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index df7a4f6a89..4afefc6b1d 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -662,28 +662,16 @@ class StateResolutionStore(object): allow_rejected=allow_rejected, ) - def get_auth_chain(self, event_ids: List[str], ignore_events: Set[str]): - """Gets the full auth chain for a set of events (including rejected - events). - - Includes the given event IDs in the result. - - Note that: - 1. All events must be state events. - 2. For v1 rooms this may not have the full auth chain in the - presence of rejected events - - Args: - event_ids: The event IDs of the events to fetch the auth chain for. - Must be state events. - ignore_events: Set of events to exclude from the returned auth - chain. + def get_auth_chain_difference(self, state_sets: List[Set[str]]): + """Given sets of state events figure out the auth chain difference (as + per state res v2 algorithm). + This equivalent to fetching the full auth chain for each set of state + and returning the events that don't appear in each and every auth + chain. Returns: - Deferred[list[str]]: List of event IDs of the auth chain. + Deferred[Set[str]]: Set of event IDs. """ - return self.store.get_auth_chain_ids( - event_ids, include_given=True, ignore_events=ignore_events, - ) + return self.store.get_auth_chain_difference(state_sets) diff --git a/synapse/state/v2.py b/synapse/state/v2.py index 0ffe6d8c14..18484e2fa6 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -227,36 +227,12 @@ def _get_auth_chain_difference(state_sets, event_map, state_res_store): Returns: Deferred[set[str]]: Set of event IDs """ - common = set(itervalues(state_sets[0])).intersection( - *(itervalues(s) for s in state_sets[1:]) - ) - - auth_sets = [] - for state_set in state_sets: - auth_ids = { - eid - for key, eid in iteritems(state_set) - if ( - key[0] in (EventTypes.Member, EventTypes.ThirdPartyInvite) - or key - in ( - (EventTypes.PowerLevels, ""), - (EventTypes.Create, ""), - (EventTypes.JoinRules, ""), - ) - ) - and eid not in common - } - auth_chain = yield state_res_store.get_auth_chain(auth_ids, common) - auth_ids.update(auth_chain) - - auth_sets.append(auth_ids) - - intersection = set(auth_sets[0]).intersection(*auth_sets[1:]) - union = set().union(*auth_sets) + difference = yield state_res_store.get_auth_chain_difference( + [set(state_set.values()) for state_set in state_sets] + ) - return union - intersection + return difference def _seperate(state_sets): diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 49a7b8b433..62d4e9f599 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -14,7 +14,7 @@ # limitations under the License. import itertools import logging -from typing import List, Optional, Set +from typing import Dict, List, Optional, Set, Tuple from six.moves.queue import Empty, PriorityQueue @@ -103,6 +103,154 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return list(results) + def get_auth_chain_difference(self, state_sets: List[Set[str]]): + """Given sets of state events figure out the auth chain difference (as + per state res v2 algorithm). + + This equivalent to fetching the full auth chain for each set of state + and returning the events that don't appear in each and every auth + chain. + + Returns: + Deferred[Set[str]] + """ + + return self.db.runInteraction( + "get_auth_chain_difference", + self._get_auth_chain_difference_txn, + state_sets, + ) + + def _get_auth_chain_difference_txn( + self, txn, state_sets: List[Set[str]] + ) -> Set[str]: + + # Algorithm Description + # ~~~~~~~~~~~~~~~~~~~~~ + # + # The idea here is to basically walk the auth graph of each state set in + # tandem, keeping track of which auth events are reachable by each state + # set. If we reach an auth event we've already visited (via a different + # state set) then we mark that auth event and all ancestors as reachable + # by the state set. This requires that we keep track of the auth chains + # in memory. + # + # Doing it in a such a way means that we can stop early if all auth + # events we're currently walking are reachable by all state sets. + # + # *Note*: We can't stop walking an event's auth chain if it is reachable + # by all state sets. This is because other auth chains we're walking + # might be reachable only via the original auth chain. For example, + # given the following auth chain: + # + # A -> C -> D -> E + # / / + # B -´---------´ + # + # and state sets {A} and {B} then walking the auth chains of A and B + # would immediately show that C is reachable by both. However, if we + # stopped at C then we'd only reach E via the auth chain of B and so E + # would errornously get included in the returned difference. + # + # The other thing that we do is limit the number of auth chains we walk + # at once, due to practical limits (i.e. we can only query the database + # with a limited set of parameters). We pick the auth chains we walk + # each iteration based on their depth, in the hope that events with a + # lower depth are likely reachable by those with higher depths. + # + # We could use any ordering that we believe would give a rough + # topological ordering, e.g. origin server timestamp. If the ordering + # chosen is not topological then the algorithm still produces the right + # result, but perhaps a bit more inefficiently. This is why it is safe + # to use "depth" here. + + initial_events = set(state_sets[0]).union(*state_sets[1:]) + + # Dict from events in auth chains to which sets *cannot* reach them. + # I.e. if the set is empty then all sets can reach the event. + event_to_missing_sets = { + event_id: {i for i, a in enumerate(state_sets) if event_id not in a} + for event_id in initial_events + } + + # We need to get the depth of the initial events for sorting purposes. + sql = """ + SELECT depth, event_id FROM events + WHERE %s + ORDER BY depth ASC + """ + clause, args = make_in_list_sql_clause( + txn.database_engine, "event_id", initial_events + ) + txn.execute(sql % (clause,), args) + + # The sorted list of events whose auth chains we should walk. + search = txn.fetchall() # type: List[Tuple[int, str]] + + # Map from event to its auth events + event_to_auth_events = {} # type: Dict[str, Set[str]] + + base_sql = """ + SELECT a.event_id, auth_id, depth + FROM event_auth AS a + INNER JOIN events AS e ON (e.event_id = a.auth_id) + WHERE + """ + + while search: + # Check whether all our current walks are reachable by all state + # sets. If so we can bail. + if all(not event_to_missing_sets[eid] for _, eid in search): + break + + # Fetch the auth events and their depths of the N last events we're + # currently walking + search, chunk = search[:-100], search[-100:] + clause, args = make_in_list_sql_clause( + txn.database_engine, "a.event_id", [e_id for _, e_id in chunk] + ) + txn.execute(base_sql + clause, args) + + for event_id, auth_event_id, auth_event_depth in txn: + event_to_auth_events.setdefault(event_id, set()).add(auth_event_id) + + sets = event_to_missing_sets.get(auth_event_id) + if sets is None: + # First time we're seeing this event, so we add it to the + # queue of things to fetch. + search.append((auth_event_depth, auth_event_id)) + + # Assume that this event is unreachable from any of the + # state sets until proven otherwise + sets = event_to_missing_sets[auth_event_id] = set( + range(len(state_sets)) + ) + else: + # We've previously seen this event, so look up its auth + # events and recursively mark all ancestors as reachable + # by the current event's state set. + a_ids = event_to_auth_events.get(auth_event_id) + while a_ids: + new_aids = set() + for a_id in a_ids: + event_to_missing_sets[a_id].intersection_update( + event_to_missing_sets[event_id] + ) + + b = event_to_auth_events.get(a_id) + if b: + new_aids.update(b) + + a_ids = new_aids + + # Mark that the auth event is reachable by the approriate sets. + sets.intersection_update(event_to_missing_sets[event_id]) + + search.sort() + + # Return all events where not all sets can reach them. + return {eid for eid, n in event_to_missing_sets.items() if n} + def get_oldest_events_in_room(self, room_id): return self.db.runInteraction( "get_oldest_events_in_room", self._get_oldest_events_in_room_txn, room_id diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index 5059ade850..a44960203e 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -603,7 +603,7 @@ class TestStateResolutionStore(object): return {eid: self.event_map[eid] for eid in event_ids if eid in self.event_map} - def get_auth_chain(self, event_ids, ignore_events): + def _get_auth_chain(self, event_ids): """Gets the full auth chain for a set of events (including rejected events). @@ -617,9 +617,6 @@ class TestStateResolutionStore(object): Args: event_ids (list): The event IDs of the events to fetch the auth chain for. Must be state events. - ignore_events: Set of events to exclude from the returned auth - chain. - Returns: Deferred[list[str]]: List of event IDs of the auth chain. """ @@ -629,7 +626,7 @@ class TestStateResolutionStore(object): stack = list(event_ids) while stack: event_id = stack.pop() - if event_id in result or event_id in ignore_events: + if event_id in result: continue result.add(event_id) @@ -639,3 +636,9 @@ class TestStateResolutionStore(object): stack.append(aid) return list(result) + + def get_auth_chain_difference(self, auth_sets): + chains = [frozenset(self._get_auth_chain(a)) for a in auth_sets] + + common = set(chains[0]).intersection(*chains[1:]) + return set(chains[0]).union(*chains[1:]) - common diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index a331517f4d..3aeec0dc0f 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -13,19 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - import tests.unittest import tests.utils -class EventFederationWorkerStoreTestCase(tests.unittest.TestCase): - @defer.inlineCallbacks - def setUp(self): - hs = yield tests.utils.setup_test_homeserver(self.addCleanup) +class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): + def prepare(self, reactor, clock, hs): self.store = hs.get_datastore() - @defer.inlineCallbacks def test_get_prev_events_for_room(self): room_id = "@ROOM:local" @@ -61,15 +56,14 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase): ) for i in range(0, 20): - yield self.store.db.runInteraction("insert", insert_event, i) + self.get_success(self.store.db.runInteraction("insert", insert_event, i)) # this should get the last ten - r = yield self.store.get_prev_events_for_room(room_id) + r = self.get_success(self.store.get_prev_events_for_room(room_id)) self.assertEqual(10, len(r)) for i in range(0, 10): self.assertEqual("$event_%i:local" % (19 - i), r[i]) - @defer.inlineCallbacks def test_get_rooms_with_many_extremities(self): room1 = "#room1" room2 = "#room2" @@ -86,25 +80,154 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase): ) for i in range(0, 20): - yield self.store.db.runInteraction("insert", insert_event, i, room1) - yield self.store.db.runInteraction("insert", insert_event, i, room2) - yield self.store.db.runInteraction("insert", insert_event, i, room3) + self.get_success( + self.store.db.runInteraction("insert", insert_event, i, room1) + ) + self.get_success( + self.store.db.runInteraction("insert", insert_event, i, room2) + ) + self.get_success( + self.store.db.runInteraction("insert", insert_event, i, room3) + ) # Test simple case - r = yield self.store.get_rooms_with_many_extremities(5, 5, []) + r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, [])) self.assertEqual(len(r), 3) # Does filter work? - r = yield self.store.get_rooms_with_many_extremities(5, 5, [room1]) + r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, [room1])) self.assertTrue(room2 in r) self.assertTrue(room3 in r) self.assertEqual(len(r), 2) - r = yield self.store.get_rooms_with_many_extremities(5, 5, [room1, room2]) + r = self.get_success( + self.store.get_rooms_with_many_extremities(5, 5, [room1, room2]) + ) self.assertEqual(r, [room3]) # Does filter and limit work? - r = yield self.store.get_rooms_with_many_extremities(5, 1, [room1]) + r = self.get_success(self.store.get_rooms_with_many_extremities(5, 1, [room1])) self.assertTrue(r == [room2] or r == [room3]) + + def test_auth_difference(self): + room_id = "@ROOM:local" + + # The silly auth graph we use to test the auth difference algorithm, + # where the top are the most recent events. + # + # A B + # \ / + # D E + # \ | + # ` F C + # | /| + # G ´ | + # | \ | + # H I + # | | + # K J + + auth_graph = { + "a": ["e"], + "b": ["e"], + "c": ["g", "i"], + "d": ["f"], + "e": ["f"], + "f": ["g"], + "g": ["h", "i"], + "h": ["k"], + "i": ["j"], + "k": [], + "j": [], + } + + depth_map = { + "a": 7, + "b": 7, + "c": 4, + "d": 6, + "e": 6, + "f": 5, + "g": 3, + "h": 2, + "i": 2, + "k": 1, + "j": 1, + } + + # We rudely fiddle with the appropriate tables directly, as that's much + # easier than constructing events properly. + + def insert_event(txn, event_id, stream_ordering): + + depth = depth_map[event_id] + + self.store.db.simple_insert_txn( + txn, + table="events", + values={ + "event_id": event_id, + "room_id": room_id, + "depth": depth, + "topological_ordering": depth, + "type": "m.test", + "processed": True, + "outlier": False, + "stream_ordering": stream_ordering, + }, + ) + + self.store.db.simple_insert_many_txn( + txn, + table="event_auth", + values=[ + {"event_id": event_id, "room_id": room_id, "auth_id": a} + for a in auth_graph[event_id] + ], + ) + + next_stream_ordering = 0 + for event_id in auth_graph: + next_stream_ordering += 1 + self.get_success( + self.store.db.runInteraction( + "insert", insert_event, event_id, next_stream_ordering + ) + ) + + # Now actually test that various combinations give the right result: + + difference = self.get_success( + self.store.get_auth_chain_difference([{"a"}, {"b"}]) + ) + self.assertSetEqual(difference, {"a", "b"}) + + difference = self.get_success( + self.store.get_auth_chain_difference([{"a"}, {"b"}, {"c"}]) + ) + self.assertSetEqual(difference, {"a", "b", "c", "e", "f"}) + + difference = self.get_success( + self.store.get_auth_chain_difference([{"a", "c"}, {"b"}]) + ) + self.assertSetEqual(difference, {"a", "b", "c"}) + + difference = self.get_success( + self.store.get_auth_chain_difference([{"a"}, {"b"}, {"d"}]) + ) + self.assertSetEqual(difference, {"a", "b", "d", "e"}) + + difference = self.get_success( + self.store.get_auth_chain_difference([{"a"}, {"b"}, {"c"}, {"d"}]) + ) + self.assertSetEqual(difference, {"a", "b", "c", "d", "e", "f"}) + + difference = self.get_success( + self.store.get_auth_chain_difference([{"a"}, {"b"}, {"e"}]) + ) + self.assertSetEqual(difference, {"a", "b"}) + + difference = self.get_success(self.store.get_auth_chain_difference([{"a"}])) + self.assertSetEqual(difference, set()) -- cgit 1.5.1