diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 10 | ||||
-rw-r--r-- | synapse/storage/background_updates.py | 2 | ||||
-rw-r--r-- | synapse/storage/engines/sqlite.py | 3 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 23 | ||||
-rw-r--r-- | synapse/storage/events.py | 1 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 57 | ||||
-rw-r--r-- | synapse/storage/monthly_active_users.py | 12 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 8 | ||||
-rw-r--r-- | synapse/storage/schema/delta/53/event_format_version.sql | 16 | ||||
-rw-r--r-- | synapse/storage/state.py | 24 |
10 files changed, 137 insertions, 19 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f62f70b9f1..4872ff55b6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -27,7 +27,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.util.caches.descriptors import Cache from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.stringutils import exception_to_unicode @@ -196,6 +196,12 @@ class SQLBaseStore(object): # A set of tables that are not safe to use native upserts in. self._unsafe_to_upsert_tables = {"user_ips"} + # We add the user_directory_search table to the blacklist on SQLite + # because the existing search table does not have an index, making it + # unsafe to use native upserts. + if isinstance(self.database_engine, Sqlite3Engine): + self._unsafe_to_upsert_tables.add("user_directory_search") + if self.database_engine.can_native_upsert: # Check ASAP (and then later, every 1s) to see if we have finished # background updates of tables that aren't safe to update. @@ -230,7 +236,7 @@ class SQLBaseStore(object): self._unsafe_to_upsert_tables.discard("user_ips") # If there's any tables left to check, reschedule to run. - if self._unsafe_to_upsert_tables: + if updates: self._clock.call_later( 15.0, run_as_background_process, diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 5fe1ca2de7..60cdc884e6 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -240,7 +240,7 @@ class BackgroundUpdateStore(SQLBaseStore): * An integer count of the number of items to update in this batch. The handler should return a deferred integer count of items updated. - The hander is responsible for updating the progress of the update. + The handler is responsible for updating the progress of the update. Args: update_name(str): The name of the update that this code handles. diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index c64d73ff21..059ab81055 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -15,7 +15,6 @@ import struct import threading -from sqlite3 import sqlite_version_info from synapse.storage.prepare_database import prepare_database @@ -37,7 +36,7 @@ class Sqlite3Engine(object): Do we support native UPSERTs? This requires SQLite3 3.24+, plus some more work we haven't done yet to tell what was inserted vs updated. """ - return sqlite_version_info >= (3, 24, 0) + return self.module.sqlite_version_info >= (3, 24, 0) def check_database(self, txn): pass diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index d3b9dea1d6..38809ed0fc 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -125,6 +125,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, return dict(txn) + @defer.inlineCallbacks + def get_max_depth_of(self, event_ids): + """Returns the max depth of a set of event IDs + + Args: + event_ids (list[str]) + + Returns + Deferred[int] + """ + rows = yield self._simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=("depth",), + desc="get_max_depth_of", + ) + + if not rows: + defer.returnValue(0) + else: + defer.returnValue(max(row["depth"] for row in rows)) + def _get_oldest_events_in_room_txn(self, txn, room_id): return self._simple_select_onecol_txn( txn, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 79e0276de6..3e1915fb87 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1268,6 +1268,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore event.internal_metadata.get_dict() ), "json": encode_json(event_dict(event)), + "format_version": event.format_version, } for event, _ in events_and_contexts ], diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a8326f5296..ebe1429acb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -21,13 +21,14 @@ from canonicaljson import json from twisted.internet import defer +from synapse.api.constants import EventFormatVersions, EventTypes from synapse.api.errors import NotFoundError +from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401 # these are only included to make the type annotations work -from synapse.events import EventBase # noqa: F401 -from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import get_domain_from_id from synapse.util.logcontext import ( LoggingContext, PreserveLoggingContext, @@ -162,7 +163,6 @@ class EventsWorkerStore(SQLBaseStore): missing_events = yield self._enqueue_events( missing_events_ids, - check_redacted=check_redacted, allow_rejected=allow_rejected, ) @@ -174,6 +174,29 @@ class EventsWorkerStore(SQLBaseStore): if not entry: continue + # Starting in room version v3, some redactions need to be rechecked if we + # didn't have the redacted event at the time, so we recheck on read + # instead. + if not allow_rejected and entry.event.type == EventTypes.Redaction: + if entry.event.internal_metadata.need_to_check_redaction(): + orig = yield self.get_event( + entry.event.redacts, + allow_none=True, + allow_rejected=True, + get_prev_content=False, + ) + expected_domain = get_domain_from_id(entry.event.sender) + if orig and get_domain_from_id(orig.sender) == expected_domain: + # This redaction event is allowed. Mark as not needing a + # recheck. + entry.event.internal_metadata.recheck_redaction = False + else: + # We don't have the event that is being redacted, so we + # assume that the event isn't authorized for now. (If we + # later receive the event, then we will always redact + # it anyway, since we have this redaction) + continue + if allow_rejected or not entry.event.rejected_reason: if check_redacted and entry.redacted_event: event = entry.redacted_event @@ -310,7 +333,7 @@ class EventsWorkerStore(SQLBaseStore): self.hs.get_reactor().callFromThread(fire, event_list, e) @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): + def _enqueue_events(self, events, allow_rejected=False): """Fetches events from the database using the _event_fetch_list. This allows batch and bulk fetching of events - it allows us to fetch events without having to create a new transaction for each request for events. @@ -353,6 +376,7 @@ class EventsWorkerStore(SQLBaseStore): self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], + format_version=row["format_version"], ) for row in rows ], @@ -377,6 +401,7 @@ class EventsWorkerStore(SQLBaseStore): " e.event_id as event_id, " " e.internal_metadata," " e.json," + " e.format_version, " " r.redacts as redacts," " rej.event_id as rejects " " FROM event_json as e" @@ -392,7 +417,7 @@ class EventsWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, - rejected_reason=None): + format_version, rejected_reason=None): with Measure(self._clock, "_get_event_from_row"): d = json.loads(js) internal_metadata = json.loads(internal_metadata) @@ -405,8 +430,13 @@ class EventsWorkerStore(SQLBaseStore): desc="_get_event_from_row_rejected_reason", ) - original_ev = FrozenEvent( - d, + if format_version is None: + # This means that we stored the event before we had the concept + # of a event format version, so it must be a V1 event. + format_version = EventFormatVersions.V1 + + original_ev = event_type_from_format_version(format_version)( + event_dict=d, internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) @@ -436,6 +466,19 @@ class EventsWorkerStore(SQLBaseStore): # will serialise this field correctly redacted_event.unsigned["redacted_because"] = because + # Starting in room version v3, some redactions need to be + # rechecked if we didn't have the redacted event at the + # time, so we recheck on read instead. + if because.internal_metadata.need_to_check_redaction(): + expected_domain = get_domain_from_id(original_ev.sender) + if get_domain_from_id(because.sender) == expected_domain: + # This redaction event is allowed. Mark as not needing a + # recheck. + because.internal_metadata.recheck_redaction = False + else: + # Senders don't match, so the event isn't actually redacted + redacted_event = None + cache_entry = _EventCacheEntry( event=original_ev, redacted_event=redacted_event, diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index d6fc8edd4c..9e7e09b8c1 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -197,15 +197,21 @@ class MonthlyActiveUsersStore(SQLBaseStore): if is_support: return - is_insert = yield self.runInteraction( + yield self.runInteraction( "upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id ) - if is_insert: - self.user_last_seen_monthly_active.invalidate((user_id,)) + user_in_mau = self.user_last_seen_monthly_active.cache.get( + (user_id,), + None, + update_metrics=False + ) + if user_in_mau is None: self.get_monthly_active_count.invalidate(()) + self.user_last_seen_monthly_active.invalidate((user_id,)) + def upsert_monthly_active_user_txn(self, txn, user_id): """Updates or inserts monthly active user member diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0707f9a86a..592c1bcd33 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -588,12 +588,12 @@ class RoomMemberStore(RoomMemberWorkerStore): ) # We update the local_invites table only if the event is "current", - # i.e., its something that has just happened. - # The only current event that can also be an outlier is if its an - # invite that has come in across federation. + # i.e., its something that has just happened. If the event is an + # outlier it is only current if its an "out of band membership", + # like a remote invite or a rejection of a remote invite. is_new_state = not backfilled and ( not event.internal_metadata.is_outlier() - or event.internal_metadata.is_invite_from_remote() + or event.internal_metadata.is_out_of_band_membership() ) is_mine = self.hs.is_mine_id(event.state_key) if is_new_state and is_mine: diff --git a/synapse/storage/schema/delta/53/event_format_version.sql b/synapse/storage/schema/delta/53/event_format_version.sql new file mode 100644 index 0000000000..1d977c2834 --- /dev/null +++ b/synapse/storage/schema/delta/53/event_format_version.sql @@ -0,0 +1,16 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE event_json ADD COLUMN format_version INTEGER; diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a134e9b3e8..c3ab7db7ae 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -437,6 +437,30 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): create_event = yield self.get_event(create_id) defer.returnValue(create_event.content.get("room_version", "1")) + @defer.inlineCallbacks + def get_room_predecessor(self, room_id): + """Get the predecessor room of an upgraded room if one exists. + Otherwise return None. + + Args: + room_id (str) + + Returns: + Deferred[unicode|None]: predecessor room id + """ + state_ids = yield self.get_current_state_ids(room_id) + create_id = state_ids.get((EventTypes.Create, "")) + + # If we can't find the create event, assume we've hit a dead end + if not create_id: + defer.returnValue(None) + + # Retrieve the room's create event + create_event = yield self.get_event(create_id) + + # Return predecessor if present + defer.returnValue(create_event.content.get("predecessor", None)) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): """Get the current state event ids for a room based on the |