summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/event_federation.py23
-rw-r--r--synapse/storage/events.py168
-rw-r--r--synapse/storage/events_worker.py44
4 files changed, 148 insertions, 89 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py

index 24329879e5..42cd3c83ad 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py
@@ -317,7 +317,7 @@ class DataStore(RoomMemberStore, RoomStore, thirty_days_ago_in_secs)) for row in txn: - if row[0] is 'unknown': + if row[0] == 'unknown': pass results[row[0]] = row[1] diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index d3b9dea1d6..38809ed0fc 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py
@@ -125,6 +125,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, return dict(txn) + @defer.inlineCallbacks + def get_max_depth_of(self, event_ids): + """Returns the max depth of a set of event IDs + + Args: + event_ids (list[str]) + + Returns + Deferred[int] + """ + rows = yield self._simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=("depth",), + desc="get_max_depth_of", + ) + + if not rows: + defer.returnValue(0) + else: + defer.returnValue(max(row["depth"] for row in rows)) + def _get_oldest_events_in_room_txn(self, txn, room_id): return self._simple_select_onecol_txn( txn, diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3e1915fb87..81b250480d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py
@@ -904,106 +904,106 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in iteritems(state_delta_by_room): - to_delete, to_insert = current_state_tuple - - # First we add entries to the current_state_delta_stream. We - # do this before updating the current_state_events table so - # that we can use it to calculate the `prev_event_id`. (This - # allows us to not have to pull out the existing state - # unnecessarily). - sql = """ - INSERT INTO current_state_delta_stream - (stream_id, room_id, type, state_key, event_id, prev_event_id) - SELECT ?, ?, ?, ?, ?, ( - SELECT event_id FROM current_state_events - WHERE room_id = ? AND type = ? AND state_key = ? - ) - """ - txn.executemany(sql, ( - ( - max_stream_order, room_id, etype, state_key, None, - room_id, etype, state_key, - ) - for etype, state_key in to_delete - # We sanity check that we're deleting rather than updating - if (etype, state_key) not in to_insert - )) - txn.executemany(sql, ( - ( - max_stream_order, room_id, etype, state_key, ev_id, - room_id, etype, state_key, - ) - for (etype, state_key), ev_id in iteritems(to_insert) - )) + to_delete, to_insert = current_state_tuple - # Now we actually update the current_state_events table - - txn.executemany( - "DELETE FROM current_state_events" - " WHERE room_id = ? AND type = ? AND state_key = ?", - ( - (room_id, etype, state_key) - for etype, state_key in itertools.chain(to_delete, to_insert) - ), + # First we add entries to the current_state_delta_stream. We + # do this before updating the current_state_events table so + # that we can use it to calculate the `prev_event_id`. (This + # allows us to not have to pull out the existing state + # unnecessarily). + sql = """ + INSERT INTO current_state_delta_stream + (stream_id, room_id, type, state_key, event_id, prev_event_id) + SELECT ?, ?, ?, ?, ?, ( + SELECT event_id FROM current_state_events + WHERE room_id = ? AND type = ? AND state_key = ? ) - - self._simple_insert_many_txn( - txn, - table="current_state_events", - values=[ - { - "event_id": ev_id, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - } - for key, ev_id in iteritems(to_insert) - ], + """ + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, None, + room_id, etype, state_key, ) - - txn.call_after( - self._curr_state_delta_stream_cache.entity_has_changed, - room_id, max_stream_order, + for etype, state_key in to_delete + # We sanity check that we're deleting rather than updating + if (etype, state_key) not in to_insert + )) + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, ev_id, + room_id, etype, state_key, ) + for (etype, state_key), ev_id in iteritems(to_insert) + )) - # Invalidate the various caches - - # Figure out the changes of membership to invalidate the - # `get_rooms_for_user` cache. - # We find out which membership events we may have deleted - # and which we have added, then we invlidate the caches for all - # those users. - members_changed = set( - state_key - for ev_type, state_key in itertools.chain(to_delete, to_insert) - if ev_type == EventTypes.Member - ) + # Now we actually update the current_state_events table - for member in members_changed: - self._invalidate_cache_and_stream( - txn, self.get_rooms_for_user_with_stream_ordering, (member,) - ) + txn.executemany( + "DELETE FROM current_state_events" + " WHERE room_id = ? AND type = ? AND state_key = ?", + ( + (room_id, etype, state_key) + for etype, state_key in itertools.chain(to_delete, to_insert) + ), + ) - for host in set(get_domain_from_id(u) for u in members_changed): - self._invalidate_cache_and_stream( - txn, self.is_host_joined, (room_id, host) - ) - self._invalidate_cache_and_stream( - txn, self.was_host_joined, (room_id, host) - ) + self._simple_insert_many_txn( + txn, + table="current_state_events", + values=[ + { + "event_id": ev_id, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + } + for key, ev_id in iteritems(to_insert) + ], + ) + + txn.call_after( + self._curr_state_delta_stream_cache.entity_has_changed, + room_id, max_stream_order, + ) + + # Invalidate the various caches + + # Figure out the changes of membership to invalidate the + # `get_rooms_for_user` cache. + # We find out which membership events we may have deleted + # and which we have added, then we invlidate the caches for all + # those users. + members_changed = set( + state_key + for ev_type, state_key in itertools.chain(to_delete, to_insert) + if ev_type == EventTypes.Member + ) + for member in members_changed: self._invalidate_cache_and_stream( - txn, self.get_users_in_room, (room_id,) + txn, self.get_rooms_for_user_with_stream_ordering, (member,) ) + for host in set(get_domain_from_id(u) for u in members_changed): self._invalidate_cache_and_stream( - txn, self.get_room_summary, (room_id,) + txn, self.is_host_joined, (room_id, host) ) - self._invalidate_cache_and_stream( - txn, self.get_current_state_ids, (room_id,) + txn, self.was_host_joined, (room_id, host) ) + self._invalidate_cache_and_stream( + txn, self.get_users_in_room, (room_id,) + ) + + self._invalidate_cache_and_stream( + txn, self.get_room_summary, (room_id,) + ) + + self._invalidate_cache_and_stream( + txn, self.get_current_state_ids, (room_id,) + ) + def _update_forward_extremities_txn(self, txn, new_forward_extremities, max_stream_order): for room_id, new_extrem in iteritems(new_forward_extremities): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 0a0ca58fc4..57dae324c7 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py
@@ -21,13 +21,14 @@ from canonicaljson import json from twisted.internet import defer -from synapse.api.constants import EventFormatVersions +from synapse.api.constants import EventFormatVersions, EventTypes from synapse.api.errors import NotFoundError from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401 # these are only included to make the type annotations work from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import get_domain_from_id from synapse.util.logcontext import ( LoggingContext, PreserveLoggingContext, @@ -162,7 +163,6 @@ class EventsWorkerStore(SQLBaseStore): missing_events = yield self._enqueue_events( missing_events_ids, - check_redacted=check_redacted, allow_rejected=allow_rejected, ) @@ -174,6 +174,29 @@ class EventsWorkerStore(SQLBaseStore): if not entry: continue + # Starting in room version v3, some redactions need to be rechecked if we + # didn't have the redacted event at the time, so we recheck on read + # instead. + if not allow_rejected and entry.event.type == EventTypes.Redaction: + if entry.event.internal_metadata.need_to_check_redaction(): + orig = yield self.get_event( + entry.event.redacts, + allow_none=True, + allow_rejected=True, + get_prev_content=False, + ) + expected_domain = get_domain_from_id(entry.event.sender) + if orig and get_domain_from_id(orig.sender) == expected_domain: + # This redaction event is allowed. Mark as not needing a + # recheck. + entry.event.internal_metadata.recheck_redaction = False + else: + # We don't have the event that is being redacted, so we + # assume that the event isn't authorized for now. (If we + # later receive the event, then we will always redact + # it anyway, since we have this redaction) + continue + if allow_rejected or not entry.event.rejected_reason: if check_redacted and entry.redacted_event: event = entry.redacted_event @@ -197,7 +220,7 @@ class EventsWorkerStore(SQLBaseStore): defer.returnValue(events) def _invalidate_get_event_cache(self, event_id): - self._get_event_cache.invalidate((event_id,)) + self._get_event_cache.invalidate((event_id,)) def _get_events_from_cache(self, events, allow_rejected, update_metrics=True): """Fetch events from the caches @@ -310,7 +333,7 @@ class EventsWorkerStore(SQLBaseStore): self.hs.get_reactor().callFromThread(fire, event_list, e) @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): + def _enqueue_events(self, events, allow_rejected=False): """Fetches events from the database using the _event_fetch_list. This allows batch and bulk fetching of events - it allows us to fetch events without having to create a new transaction for each request for events. @@ -443,6 +466,19 @@ class EventsWorkerStore(SQLBaseStore): # will serialise this field correctly redacted_event.unsigned["redacted_because"] = because + # Starting in room version v3, some redactions need to be + # rechecked if we didn't have the redacted event at the + # time, so we recheck on read instead. + if because.internal_metadata.need_to_check_redaction(): + expected_domain = get_domain_from_id(original_ev.sender) + if get_domain_from_id(because.sender) == expected_domain: + # This redaction event is allowed. Mark as not needing a + # recheck. + because.internal_metadata.recheck_redaction = False + else: + # Senders don't match, so the event isn't actually redacted + redacted_event = None + cache_entry = _EventCacheEntry( event=original_ev, redacted_event=redacted_event,