diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/federation.py | 32 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 15 | ||||
-rw-r--r-- | synapse/storage/_base.py | 7 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 31 | ||||
-rw-r--r-- | synapse/storage/schema/event_signatures.sql | 6 | ||||
-rw-r--r-- | synapse/storage/schema/state.sql | 13 | ||||
-rw-r--r-- | synapse/storage/signatures.py | 24 | ||||
-rw-r--r-- | synapse/storage/state.py | 84 |
8 files changed, 152 insertions, 60 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 99655c8bb0..5e096f4652 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -40,6 +40,8 @@ class FederationHandler(BaseHandler): of the home server (including auth and state conflict resoultion) b) converting events that were produced by local clients that may need to be sent to remote home servers. + c) doing the necessary dances to invite remote users and join remote + rooms. """ def __init__(self, hs): @@ -102,6 +104,8 @@ class FederationHandler(BaseHandler): logger.debug("Got event: %s", event.event_id) + # If we are currently in the process of joining this room, then we + # queue up events for later processing. if event.room_id in self.room_queues: self.room_queues[event.room_id].append(pdu) return @@ -187,6 +191,8 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit): + """ Trigger a backfill request to `dest` for the given `room_id` + """ extremities = yield self.store.get_oldest_events_in_room(room_id) pdus = yield self.replication_layer.backfill( @@ -212,6 +218,10 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def send_invite(self, target_host, event): + """ Sends the invite to the remote server for signing. + + Invites must be signed by the invitee's server before distribution. + """ pdu = yield self.replication_layer.send_invite( destination=target_host, context=event.room_id, @@ -229,6 +239,17 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content, snapshot): + """ Attempts to join the `joinee` to the room `room_id` via the + server `target_host`. + + This first triggers a /make_join/ request that returns a partial + event that we can fill out and sign. This is then sent to the + remote server via /send_join/ which responds with the state at that + event and the auth_chains. + + We suspend processing of any received events from this room until we + have finished processing the join. + """ pdu = yield self.replication_layer.make_join( target_host, room_id, @@ -313,6 +334,10 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function def on_make_join_request(self, context, user_id): + """ We've received a /make_join/ request, so we create a partial + join event for the room and return that. We don *not* persist or + process it until the other server has signed it and sent it back. + """ event = self.event_factory.create_event( etype=RoomMemberEvent.TYPE, content={"membership": Membership.JOIN}, @@ -335,6 +360,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function def on_send_join_request(self, origin, pdu): + """ We have received a join event for a room. Fully process it and + respond with the current state and auth chains. + """ event = self.pdu_codec.event_from_pdu(pdu) event.outlier = False @@ -403,6 +431,10 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_invite_request(self, origin, pdu): + """ We've got an invite event. Process and persist it. Sign it. + + Respond with the now signed event. + """ event = self.pdu_codec.event_from_pdu(pdu) event.outlier = True diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 72290eb5a0..d8f351a675 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -279,13 +279,14 @@ class DataStore(RoomMemberStore, RoomStore, ) if hasattr(event, "signatures"): - signatures = event.signatures.get(event.origin, {}) - - for key_id, signature_base64 in signatures.items(): - signature_bytes = decode_base64(signature_base64) - self._store_event_origin_signature_txn( - txn, event.event_id, event.origin, key_id, signature_bytes, - ) + logger.debug("sigs: %s", event.signatures) + for name, sigs in event.signatures.items(): + for key_id, signature_base64 in sigs.items(): + signature_bytes = decode_base64(signature_base64) + self._store_event_signature_txn( + txn, event.event_id, name, key_id, + signature_bytes, + ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a1ee0318f6..670387b04a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -470,12 +470,15 @@ class SQLBaseStore(object): select_event_sql = "SELECT * FROM events WHERE event_id = ?" for i, ev in enumerate(events): - signatures = self._get_event_origin_signatures_txn( + signatures = self._get_event_signatures_txn( txn, ev.event_id, ) ev.signatures = { - k: encode_base64(v) for k, v in signatures.items() + n: { + k: encode_base64(v) for k, v in s.items() + } + for n, s in signatures.items() } prevs = self._get_prev_events_and_state(txn, ev.event_id) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a027db3868..6c559f8f63 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -23,6 +23,14 @@ logger = logging.getLogger(__name__) class EventFederationStore(SQLBaseStore): + """ Responsible for storing and serving up the various graphs associated + with an event. Including the main event graph and the auth chains for an + event. + + Also has methods for getting the front (latest) and back (oldest) edges + of the event graphs. These are used to generate the parents for new events + and backfilling from another server respectively. + """ def get_auth_chain(self, event_id): return self.runInteraction( @@ -205,6 +213,8 @@ class EventFederationStore(SQLBaseStore): return results def get_min_depth(self, room_id): + """ For hte given room, get the minimum depth we have seen for it. + """ return self.runInteraction( "get_min_depth", self._get_min_depth_interaction, @@ -240,6 +250,10 @@ class EventFederationStore(SQLBaseStore): def _handle_prev_events(self, txn, outlier, event_id, prev_events, room_id): + """ + For the given event, update the event edges table and forward and + backward extremities tables. + """ for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_insert_txn( @@ -267,8 +281,8 @@ class EventFederationStore(SQLBaseStore): } ) - # We only insert as a forward extremity the new pdu if there are - # no other pdus that reference it as a prev pdu + # We only insert as a forward extremity the new event if there are + # no other events that reference it as a prev event query = ( "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " "SELECT ?, ? WHERE NOT EXISTS (" @@ -284,7 +298,7 @@ class EventFederationStore(SQLBaseStore): txn.execute(query, (event_id, room_id, event_id)) - # Insert all the prev_pdus as a backwards thing, they'll get + # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert @@ -299,7 +313,7 @@ class EventFederationStore(SQLBaseStore): ) # Also delete from the backwards extremities table all ones that - # reference pdus that we have already seen + # reference events that we have already seen query = ( "DELETE FROM event_backward_extremities WHERE EXISTS (" "SELECT 1 FROM events " @@ -311,17 +325,14 @@ class EventFederationStore(SQLBaseStore): txn.execute(query) def get_backfill_events(self, room_id, event_list, limit): - """Get a list of Events for a given topic that occured before (and - including) the pdus in pdu_list. Return a list of max size `limit`. + """Get a list of Events for a given topic that occurred before (and + including) the events in event_list. Return a list of max size `limit` Args: txn room_id (str) event_list (list) limit (int) - - Return: - list: A list of PduTuples """ return self.runInteraction( "get_backfill_events", @@ -334,7 +345,6 @@ class EventFederationStore(SQLBaseStore): room_id, repr(event_list), limit ) - # We seed the pdu_results with the things from the pdu_list. event_results = event_list front = event_list @@ -373,5 +383,4 @@ class EventFederationStore(SQLBaseStore): front = new_front event_results += new_front - # We also want to update the `prev_pdus` attributes before returning. return self._get_events_txn(txn, event_results) diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql index 5491c7ecec..4efa8a3e63 100644 --- a/synapse/storage/schema/event_signatures.sql +++ b/synapse/storage/schema/event_signatures.sql @@ -37,15 +37,15 @@ CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes ( ); -CREATE TABLE IF NOT EXISTS event_origin_signatures ( +CREATE TABLE IF NOT EXISTS event_signatures ( event_id TEXT, - origin TEXT, + signature_name TEXT, key_id TEXT, signature BLOB, CONSTRAINT uniqueness UNIQUE (event_id, key_id) ); -CREATE INDEX IF NOT EXISTS event_origin_signatures_id ON event_origin_signatures ( +CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures ( event_id ); diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/state.sql index b44c56b519..44f7aafb27 100644 --- a/synapse/storage/schema/state.sql +++ b/synapse/storage/schema/state.sql @@ -30,4 +30,17 @@ CREATE TABLE IF NOT EXISTS state_groups_state( CREATE TABLE IF NOT EXISTS event_to_state_groups( event_id TEXT NOT NULL, state_group INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); + +CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state( + state_group +); +CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state( + room_id, type, state_key +); + +CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups( + event_id ); \ No newline at end of file diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 84a49088a2..d90e08fff1 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -103,24 +103,30 @@ class SignatureStore(SQLBaseStore): or_ignore=True, ) - - def _get_event_origin_signatures_txn(self, txn, event_id): + def _get_event_signatures_txn(self, txn, event_id): """Get all the signatures for a given PDU. Args: txn (cursor): event_id (str): Id for the Event. Returns: - A dict of key_id -> signature_bytes. + A dict of sig name -> dict(key_id -> signature_bytes) """ query = ( - "SELECT key_id, signature" - " FROM event_origin_signatures" + "SELECT signature_name, key_id, signature" + " FROM event_signatures" " WHERE event_id = ? " ) txn.execute(query, (event_id, )) - return dict(txn.fetchall()) + rows = txn.fetchall() + + res = {} + + for name, key, sig in rows: + res.setdefault(name, {})[key] = sig + + return res - def _store_event_origin_signature_txn(self, txn, event_id, origin, key_id, + def _store_event_signature_txn(self, txn, event_id, signature_name, key_id, signature_bytes): """Store a signature from the origin server for a PDU. Args: @@ -132,10 +138,10 @@ class SignatureStore(SQLBaseStore): """ self._simple_insert_txn( txn, - "event_origin_signatures", + "event_signatures", { "event_id": event_id, - "origin": origin, + "signature_name": signature_name, "key_id": key_id, "signature": buffer(signature_bytes), }, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2f3a70b4e5..55ea567793 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,43 +14,71 @@ # limitations under the License. from ._base import SQLBaseStore -from twisted.internet import defer class StateStore(SQLBaseStore): + """ Keeps track of the state at a given event. + + This is done by the concept of `state groups`. Every event is a assigned + a state group (identified by an arbitrary string), which references a + collection of state events. The current state of an event is then the + collection of state events referenced by the event's state group. + + Hence, every change in the current state causes a new state group to be + generated. However, if no change happens (e.g., if we get a message event + with only one parent it inherits the state group from its parent.) + + There are three tables: + * `state_groups`: Stores group name, first event with in the group and + room id. + * `event_to_state_groups`: Maps events to state groups. + * `state_groups_state`: Maps state group to state events. + """ - @defer.inlineCallbacks def get_state_groups(self, event_ids): - groups = set() - for event_id in event_ids: - group = yield self._simple_select_one_onecol( - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - retcol="state_group", - allow_none=True, - ) - if group: - groups.add(group) - - res = {} - for group in groups: - state_ids = yield self._simple_select_onecol( - table="state_groups_state", - keyvalues={"state_group": group}, - retcol="event_id", - ) - state = [] - for state_id in state_ids: - s = yield self.get_event( - state_id, + """ Get the state groups for the given list of event_ids + + The return value is a dict mapping group names to lists of events. + """ + + def f(txn): + groups = set() + for event_id in event_ids: + group = self._simple_select_one_onecol_txn( + txn, + table="event_to_state_groups", + keyvalues={"event_id": event_id}, + retcol="state_group", allow_none=True, ) - if s: - state.append(s) + if group: + groups.add(group) - res[group] = state + res = {} + for group in groups: + state_ids = self._simple_select_onecol_txn( + txn, + table="state_groups_state", + keyvalues={"state_group": group}, + retcol="event_id", + ) + state = [] + for state_id in state_ids: + s = self._get_events_txn( + txn, + [state_id], + ) + if s: + state.extend(s) + + res[group] = state - defer.returnValue(res) + return res + + return self.runInteraction( + "get_state_groups", + f, + ) def store_state_groups(self, event): return self.runInteraction( |