diff options
Diffstat (limited to 'synapse/storage')
19 files changed, 365 insertions, 281 deletions
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index eac5a4e55b..f39f556c20 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -16,10 +16,12 @@ import itertools import logging -from typing import Any, Iterable, Optional, Tuple +from typing import Any, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes +from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( + EventsStream, EventsStreamCurrentStateRow, EventsStreamEventRow, ) @@ -44,13 +46,30 @@ class CacheInvalidationWorkerStore(SQLBaseStore): async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int - ): - """Fetches cache invalidation rows between the two given IDs written - by the given instance. Returns at most `limit` rows. + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for caches replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ if last_id == current_id: - return [] + return [], current_id, False def get_all_updated_caches_txn(txn): # We purposefully don't bound by the current token, as we want to @@ -64,17 +83,24 @@ class CacheInvalidationWorkerStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (last_id, instance_name, limit)) - return txn.fetchall() + updates = [(row[0], row[1:]) for row in txn] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited return await self.db.runInteraction( "get_all_updated_caches", get_all_updated_caches_txn ) def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "events": + if stream_name == EventsStream.NAME: for row in rows: self._process_event_stream_row(token, row) - elif stream_name == "backfill": + elif stream_name == BackfillStream.NAME: for row in rows: self._invalidate_caches_for_event( -token, @@ -86,7 +112,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): row.relates_to, backfilled=True, ) - elif stream_name == "caches": + elif stream_name == CachesStream.NAME: if self._cache_id_gen: self._cache_id_gen.advance(instance_name, token) diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index 9a1178fb39..d313b9705f 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import List, Tuple from canonicaljson import json @@ -207,31 +208,46 @@ class DeviceInboxWorkerStore(SQLBaseStore): "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn ) - def get_all_new_device_messages(self, last_pos, current_pos, limit): - """ + async def get_all_new_device_messages( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for to device replication stream. + Args: - last_pos(int): - current_pos(int): - limit(int): + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + Returns: - A deferred list of rows from the device inbox + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ - if last_pos == current_pos: - return defer.succeed([]) + + if last_id == current_id: + return [], current_id, False def get_all_new_device_messages_txn(txn): # We limit like this as we might have multiple rows per stream_id, and # we want to make sure we always get all entries for any stream_id # we return. - upper_pos = min(current_pos, last_pos + limit) + upper_pos = min(current_id, last_id + limit) sql = ( "SELECT max(stream_id), user_id" " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY user_id" ) - txn.execute(sql, (last_pos, upper_pos)) - rows = txn.fetchall() + txn.execute(sql, (last_id, upper_pos)) + updates = [(row[0], row[1:]) for row in txn] sql = ( "SELECT max(stream_id), destination" @@ -239,15 +255,21 @@ class DeviceInboxWorkerStore(SQLBaseStore): " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY destination" ) - txn.execute(sql, (last_pos, upper_pos)) - rows.extend(txn) + txn.execute(sql, (last_id, upper_pos)) + updates.extend((row[0], row[1:]) for row in txn) # Order by ascending stream ordering - rows.sort() + updates.sort() - return rows + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True - return self.db.runInteraction( + return updates, upto_token, limited + + return await self.db.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn ) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 0ff0542453..343cf9a2d5 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -582,32 +582,58 @@ class DeviceWorkerStore(SQLBaseStore): return set() async def get_all_device_list_changes_for_remotes( - self, from_key: int, to_key: int, limit: int, - ) -> List[Tuple[int, str]]: - """Return a list of `(stream_id, entity)` which is the combined list of - changes to devices and which destinations need to be poked. Entity is - either a user ID (starting with '@') or a remote destination. - """ + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for device lists replication stream. - # This query Does The Right Thing where it'll correctly apply the - # bounds to the inner queries. - sql = """ - SELECT stream_id, entity FROM ( - SELECT stream_id, user_id AS entity FROM device_lists_stream - UNION ALL - SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes - ) AS e - WHERE ? < stream_id AND stream_id <= ? - LIMIT ? + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ - return await self.db.execute( + if last_id == current_id: + return [], current_id, False + + def _get_all_device_list_changes_for_remotes(txn): + # This query Does The Right Thing where it'll correctly apply the + # bounds to the inner queries. + sql = """ + SELECT stream_id, entity FROM ( + SELECT stream_id, user_id AS entity FROM device_lists_stream + UNION ALL + SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes + ) AS e + WHERE ? < stream_id AND stream_id <= ? + LIMIT ? + """ + + txn.execute(sql, (last_id, current_id, limit)) + updates = [(row[0], row[1:]) for row in txn] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db.runInteraction( "get_all_device_list_changes_for_remotes", - None, - sql, - from_key, - to_key, - limit, + _get_all_device_list_changes_for_remotes, ) @cached(max_entries=10000) diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 1a0842d4b0..6c3cff82e1 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -14,7 +14,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List +from typing import Dict, List, Tuple from canonicaljson import encode_canonical_json, json @@ -479,34 +479,61 @@ class EndToEndKeyWorkerStore(SQLBaseStore): return result - def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit): - """Return a list of changes from the user signature stream to notify remotes. + async def get_all_user_signature_changes_for_remotes( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for groups replication stream. + Note that the user signature stream represents when a user signs their device with their user-signing key, which is not published to other users or servers, so no `destination` is needed in the returned list. However, this is needed to poke workers. Args: - from_key (int): the stream ID to start at (exclusive) - to_key (int): the stream ID to end at (inclusive) + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. Returns: - Deferred[list[(int,str)]] a list of `(stream_id, user_id)` - """ - sql = """ - SELECT stream_id, from_user_id AS user_id - FROM user_signature_stream - WHERE ? < stream_id AND stream_id <= ? - ORDER BY stream_id ASC - LIMIT ? + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ - return self.db.execute( + + if last_id == current_id: + return [], current_id, False + + def _get_all_user_signature_changes_for_remotes_txn(txn): + sql = """ + SELECT stream_id, from_user_id AS user_id + FROM user_signature_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) + + updates = [(row[0], (row[1:])) for row in txn] + + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db.runInteraction( "get_all_user_signature_changes_for_remotes", - None, - sql, - from_key, - to_key, - limit, + _get_all_user_signature_changes_for_remotes_txn, ) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index cfd24d2f06..230fb5cd7f 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -14,7 +14,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import itertools import logging from collections import OrderedDict, namedtuple @@ -28,12 +27,7 @@ from prometheus_client import Counter from twisted.internet import defer import synapse.metrics -from synapse.api.constants import ( - EventContentFields, - EventTypes, - Membership, - RelationTypes, -) +from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.room_versions import RoomVersions from synapse.crypto.event_signing import compute_event_reference_hash from synapse.events import EventBase # noqa: F401 @@ -48,8 +42,8 @@ from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.iterutils import batch_iter if TYPE_CHECKING: - from synapse.storage.data_stores.main import DataStore from synapse.server import HomeServer + from synapse.storage.data_stores.main import DataStore logger = logging.getLogger(__name__) @@ -820,7 +814,6 @@ class PersistEventsStore: "event_reference_hashes", "event_search", "event_to_state_groups", - "local_invites", "state_events", "rejections", "redactions", @@ -1197,65 +1190,27 @@ class PersistEventsStore: (event.state_key,), ) - # We update the local_invites table only if the event is "current", - # i.e., its something that has just happened. If the event is an - # outlier it is only current if its an "out of band membership", - # like a remote invite or a rejection of a remote invite. - is_new_state = not backfilled and ( - not event.internal_metadata.is_outlier() - or event.internal_metadata.is_out_of_band_membership() - ) - is_mine = self.is_mine_id(event.state_key) - if is_new_state and is_mine: - if event.membership == Membership.INVITE: - self.db.simple_insert_txn( - txn, - table="local_invites", - values={ - "event_id": event.event_id, - "invitee": event.state_key, - "inviter": event.sender, - "room_id": event.room_id, - "stream_id": event.internal_metadata.stream_ordering, - }, - ) - else: - sql = ( - "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - txn.execute( - sql, - ( - event.internal_metadata.stream_ordering, - event.event_id, - event.room_id, - event.state_key, - ), - ) - - # We also update the `local_current_membership` table with - # latest invite info. This will usually get updated by the - # `current_state_events` handling, unless its an outlier. - if event.internal_metadata.is_outlier(): - # This should only happen for out of band memberships, so - # we add a paranoia check. - assert event.internal_metadata.is_out_of_band_membership() - - self.db.simple_upsert_txn( - txn, - table="local_current_membership", - keyvalues={ - "room_id": event.room_id, - "user_id": event.state_key, - }, - values={ - "event_id": event.event_id, - "membership": event.membership, - }, - ) + # We update the local_current_membership table only if the event is + # "current", i.e., its something that has just happened. + # + # This will usually get updated by the `current_state_events` handling, + # unless its an outlier, and an outlier is only "current" if it's an "out of + # band membership", like a remote invite or a rejection of a remote invite. + if ( + self.is_mine_id(event.state_key) + and not backfilled + and event.internal_metadata.is_outlier() + and event.internal_metadata.is_out_of_band_membership() + ): + self.db.simple_upsert_txn( + txn, + table="local_current_membership", + keyvalues={"room_id": event.room_id, "user_id": event.state_key}, + values={ + "event_id": event.event_id, + "membership": event.membership, + }, + ) def _handle_event_relations(self, txn, event): """Handles inserting relation data during peristence of events @@ -1586,31 +1541,3 @@ class PersistEventsStore: if not ev.internal_metadata.is_outlier() ], ) - - async def locally_reject_invite(self, user_id: str, room_id: str) -> int: - """Mark the invite has having been rejected even though we failed to - create a leave event for it. - """ - - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, (stream_ordering, True, room_id, user_id)) - - # We also clear this entry from `local_current_membership`. - # Ideally we'd point to a leave event, but we don't have one, so - # nevermind. - self.db.simple_delete_txn( - txn, - table="local_current_membership", - keyvalues={"room_id": room_id, "user_id": user_id}, - ) - - with self._stream_id_gen.get_next() as stream_ordering: - await self.db.runInteraction("locally_reject_invite", f, stream_ordering) - - return stream_ordering diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index a48c7a96ca..01cad7d4fa 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -38,6 +38,8 @@ from synapse.events.utils import prune_event from synapse.logging.context import PreserveLoggingContext, current_context from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.replication.tcp.streams import BackfillStream +from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import Database from synapse.storage.util.id_generators import StreamIdGenerator @@ -80,10 +82,7 @@ class EventsWorkerStore(SQLBaseStore): # We are the process in charge of generating stream ids for events, # so instantiate ID generators based on the database self._stream_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - extra_tables=[("local_invites", "stream_id")], + db_conn, "events", "stream_ordering", ) self._backfill_id_gen = StreamIdGenerator( db_conn, @@ -113,9 +112,9 @@ class EventsWorkerStore(SQLBaseStore): self._event_fetch_ongoing = 0 def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "events": + if stream_name == EventsStream.NAME: self._stream_id_gen.advance(token) - elif stream_name == "backfill": + elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(-token) super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py index fb1361f1c1..4fb9f9850c 100644 --- a/synapse/storage/data_stores/main/group_server.py +++ b/synapse/storage/data_stores/main/group_server.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Tuple + from canonicaljson import json from twisted.internet import defer @@ -526,13 +528,35 @@ class GroupServerWorkerStore(SQLBaseStore): "get_groups_changes_for_user", _get_groups_changes_for_user_txn ) - def get_all_groups_changes(self, from_token, to_token, limit): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_any_entity_changed( - from_token - ) + async def get_all_groups_changes( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for groups replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + last_id = int(last_id) + has_changed = self._group_updates_stream_cache.has_any_entity_changed(last_id) + if not has_changed: - return defer.succeed([]) + return [], current_id, False def _get_all_groups_changes_txn(txn): sql = """ @@ -541,13 +565,21 @@ class GroupServerWorkerStore(SQLBaseStore): WHERE ? < stream_id AND stream_id <= ? LIMIT ? """ - txn.execute(sql, (from_token, to_token, limit)) - return [ - (stream_id, group_id, user_id, gtype, json.loads(content_json)) + txn.execute(sql, (last_id, current_id, limit)) + updates = [ + (stream_id, (group_id, user_id, gtype, json.loads(content_json))) for stream_id, group_id, user_id, gtype, content_json in txn ] - return self.db.runInteraction( + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn ) diff --git a/synapse/storage/data_stores/main/purge_events.py b/synapse/storage/data_stores/main/purge_events.py index a93e1ef198..6546569139 100644 --- a/synapse/storage/data_stores/main/purge_events.py +++ b/synapse/storage/data_stores/main/purge_events.py @@ -361,7 +361,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): "event_push_summary", "pusher_throttle", "group_summary_rooms", - "local_invites", "room_account_data", "room_tags", "local_current_membership", diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py index 547b9d69cb..5461016240 100644 --- a/synapse/storage/data_stores/main/pusher.py +++ b/synapse/storage/data_stores/main/pusher.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from typing import Iterable, Iterator +from typing import Iterable, Iterator, List, Tuple from canonicaljson import encode_canonical_json, json @@ -98,77 +98,69 @@ class PusherWorkerStore(SQLBaseStore): rows = yield self.db.runInteraction("get_all_pushers", get_pushers) return rows - def get_all_updated_pushers(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed(([], [])) - - def get_all_updated_pushers_txn(txn): - sql = ( - "SELECT id, user_name, access_token, profile_tag, kind," - " app_id, app_display_name, device_display_name, pushkey, ts," - " lang, data" - " FROM pushers" - " WHERE ? < id AND id <= ?" - " ORDER BY id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - updated = txn.fetchall() - - sql = ( - "SELECT stream_id, user_id, app_id, pushkey" - " FROM deleted_pushers" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - deleted = txn.fetchall() + async def get_all_updated_pushers_rows( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for pushers replication stream. - return updated, deleted + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. - return self.db.runInteraction( - "get_all_updated_pushers", get_all_updated_pushers_txn - ) + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. - def get_all_updated_pushers_rows(self, last_id, current_id, limit): - """Get all the pushers that have changed between the given tokens. + The token returned can be used in a subsequent call to this + function to get further updatees. - Returns: - Deferred(list(tuple)): each tuple consists of: - stream_id (str) - user_id (str) - app_id (str) - pushkey (str) - was_deleted (bool): whether the pusher was added/updated (False) - or deleted (True) + The updates are a list of 2-tuples of stream ID and the row data """ if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_updated_pushers_rows_txn(txn): - sql = ( - "SELECT id, user_name, app_id, pushkey" - " FROM pushers" - " WHERE ? < id AND id <= ?" - " ORDER BY id ASC LIMIT ?" - ) + sql = """ + SELECT id, user_name, app_id, pushkey + FROM pushers + WHERE ? < id AND id <= ? + ORDER BY id ASC LIMIT ? + """ txn.execute(sql, (last_id, current_id, limit)) - results = [list(row) + [False] for row in txn] - - sql = ( - "SELECT stream_id, user_id, app_id, pushkey" - " FROM deleted_pushers" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) + updates = [ + (stream_id, (user_name, app_id, pushkey, False)) + for stream_id, user_name, app_id, pushkey in txn + ] + + sql = """ + SELECT stream_id, user_id, app_id, pushkey + FROM deleted_pushers + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC LIMIT ? + """ txn.execute(sql, (last_id, current_id, limit)) + updates.extend( + (stream_id, (user_name, app_id, pushkey, True)) + for stream_id, user_name, app_id, pushkey in txn + ) + + updates.sort() # Sort so that they're ordered by stream id - results.extend(list(row) + [True] for row in txn) - results.sort() # Sort so that they're ordered by stream id + limited = False + upper_bound = current_id + if len(updates) >= limit: + limited = True + upper_bound = updates[-1][0] - return results + return updates, upper_bound, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn ) diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 13e366536a..c473cf158f 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -803,7 +803,32 @@ class RoomWorkerStore(SQLBaseStore): return total_media_quarantined - def get_all_new_public_rooms(self, prev_id, current_id, limit): + async def get_all_new_public_rooms( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for public rooms replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + if last_id == current_id: + return [], current_id, False + def get_all_new_public_rooms(txn): sql = """ SELECT stream_id, room_id, visibility, appservice_id, network_id @@ -813,13 +838,17 @@ class RoomWorkerStore(SQLBaseStore): LIMIT ? """ - txn.execute(sql, (prev_id, current_id, limit)) - return txn.fetchall() + txn.execute(sql, (last_id, current_id, limit)) + updates = [(row[0], row[1:]) for row in txn] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True - if prev_id == current_id: - return defer.succeed([]) + return updates, upto_token, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_new_public_rooms", get_all_new_public_rooms ) diff --git a/synapse/storage/data_stores/main/schema/delta/25/fts.py b/synapse/storage/data_stores/main/schema/delta/25/fts.py index 4b2ffd35fd..ee675e71ff 100644 --- a/synapse/storage/data_stores/main/schema/delta/25/fts.py +++ b/synapse/storage/data_stores/main/schema/delta/25/fts.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging -import simplejson - from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.prepare_database import get_statements @@ -66,7 +64,7 @@ def run_create(cur, database_engine, *args, **kwargs): "max_stream_id_exclusive": max_stream_id + 1, "rows_inserted": 0, } - progress_json = simplejson.dumps(progress) + progress_json = json.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/data_stores/main/schema/delta/27/ts.py b/synapse/storage/data_stores/main/schema/delta/27/ts.py index 414f9f5aa0..b7972cfa8e 100644 --- a/synapse/storage/data_stores/main/schema/delta/27/ts.py +++ b/synapse/storage/data_stores/main/schema/delta/27/ts.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging -import simplejson - from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -45,7 +43,7 @@ def run_create(cur, database_engine, *args, **kwargs): "max_stream_id_exclusive": max_stream_id + 1, "rows_inserted": 0, } - progress_json = simplejson.dumps(progress) + progress_json = json.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/data_stores/main/schema/delta/31/search_update.py b/synapse/storage/data_stores/main/schema/delta/31/search_update.py index 7d8ca5f93f..63b757ade6 100644 --- a/synapse/storage/data_stores/main/schema/delta/31/search_update.py +++ b/synapse/storage/data_stores/main/schema/delta/31/search_update.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging -import simplejson - from synapse.storage.engines import PostgresEngine from synapse.storage.prepare_database import get_statements @@ -50,7 +48,7 @@ def run_create(cur, database_engine, *args, **kwargs): "rows_inserted": 0, "have_added_indexes": False, } - progress_json = simplejson.dumps(progress) + progress_json = json.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/data_stores/main/schema/delta/33/event_fields.py b/synapse/storage/data_stores/main/schema/delta/33/event_fields.py index bff1256a7b..a3e81eeac7 100644 --- a/synapse/storage/data_stores/main/schema/delta/33/event_fields.py +++ b/synapse/storage/data_stores/main/schema/delta/33/event_fields.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging -import simplejson - from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -45,7 +43,7 @@ def run_create(cur, database_engine, *args, **kwargs): "max_stream_id_exclusive": max_stream_id + 1, "rows_inserted": 0, } - progress_json = simplejson.dumps(progress) + progress_json = json.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index f8c776be3f..290317fd94 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from typing import List, Tuple from canonicaljson import json @@ -53,18 +54,32 @@ class TagsWorkerStore(AccountDataWorkerStore): return deferred - @defer.inlineCallbacks - def get_all_updated_tags(self, last_id, current_id, limit): - """Get all the client tags that have changed on the server + async def get_all_updated_tags( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for tags replication stream. + Args: - last_id(int): The position to fetch from. - current_id(int): The position to fetch up to. + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + Returns: - A deferred list of tuples of stream_id int, user_id string, - room_id string, tag string and content string. + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ + if last_id == current_id: - return [] + return [], current_id, False def get_all_updated_tags_txn(txn): sql = ( @@ -76,7 +91,7 @@ class TagsWorkerStore(AccountDataWorkerStore): txn.execute(sql, (last_id, current_id, limit)) return txn.fetchall() - tag_ids = yield self.db.runInteraction( + tag_ids = await self.db.runInteraction( "get_all_updated_tags", get_all_updated_tags_txn ) @@ -89,21 +104,27 @@ class TagsWorkerStore(AccountDataWorkerStore): for tag, content in txn: tags.append(json.dumps(tag) + ":" + content) tag_json = "{" + ",".join(tags) + "}" - results.append((stream_id, user_id, room_id, tag_json)) + results.append((stream_id, (user_id, room_id, tag_json))) return results batch_size = 50 results = [] for i in range(0, len(tag_ids), batch_size): - tags = yield self.db.runInteraction( + tags = await self.db.runInteraction( "get_all_updated_tag_content", get_tag_content, tag_ids[i : i + batch_size], ) results.extend(tags) - return results + limited = False + upto_token = current_id + if len(results) >= limit: + upto_token = results[-1][0] + limited = True + + return results, upto_token, limited @defer.inlineCallbacks def get_updated_tags(self, user_id, stream_id): diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py index ec2f38c373..4c044b1a15 100644 --- a/synapse/storage/data_stores/main/ui_auth.py +++ b/synapse/storage/data_stores/main/ui_auth.py @@ -17,10 +17,10 @@ from typing import Any, Dict, Optional, Union import attr -import synapse.util.stringutils as stringutils from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore from synapse.types import JsonDict +from synapse.util import stringutils as stringutils @attr.s diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 6c7d08a6f2..a31588080d 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -92,7 +92,7 @@ class PostgresEngine(BaseDatabaseEngine): errors.append(" - 'COLLATE' is set to %r. Should be 'C'" % (collation,)) if ctype != "C": - errors.append(" - 'CTYPE' is set to %r. Should be 'C'" % (collation,)) + errors.append(" - 'CTYPE' is set to %r. Should be 'C'" % (ctype,)) if errors: raise IncorrectDatabaseSetup( diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index ec894a91cb..fa46041676 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -783,9 +783,3 @@ class EventsPersistenceStorage(object): for user_id in left_users: await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id) - - async def locally_reject_invite(self, user_id: str, room_id: str) -> int: - """Mark the invite has having been rejected even though we failed to - create a leave event for it. - """ - return await self.persist_events_store.locally_reject_invite(user_id, room_id) diff --git a/synapse/storage/types.py b/synapse/storage/types.py index daff81c5ee..2d2b560e74 100644 --- a/synapse/storage/types.py +++ b/synapse/storage/types.py @@ -12,12 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from typing import Any, Iterable, Iterator, List, Tuple from typing_extensions import Protocol - """ Some very basic protocol definitions for the DB-API2 classes specified in PEP-249 """ |