diff options
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r-- | synapse/storage/__init__.py | 195 |
1 files changed, 99 insertions, 96 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 42cd3c83ad..e9aa2fc9dd 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -61,48 +61,60 @@ from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerat logger = logging.getLogger(__name__) -class DataStore(RoomMemberStore, RoomStore, - RegistrationStore, StreamStore, ProfileStore, - PresenceStore, TransactionStore, - DirectoryStore, KeyStore, StateStore, SignatureStore, - ApplicationServiceStore, - EventsStore, - EventFederationStore, - MediaRepositoryStore, - RejectionsStore, - FilteringStore, - PusherStore, - PushRuleStore, - ApplicationServiceTransactionStore, - ReceiptsStore, - EndToEndKeyStore, - EndToEndRoomKeyStore, - SearchStore, - TagsStore, - AccountDataStore, - EventPushActionsStore, - OpenIdStore, - ClientIpStore, - DeviceStore, - DeviceInboxStore, - UserDirectoryStore, - GroupServerStore, - UserErasureStore, - MonthlyActiveUsersStore, - ): - +class DataStore( + RoomMemberStore, + RoomStore, + RegistrationStore, + StreamStore, + ProfileStore, + PresenceStore, + TransactionStore, + DirectoryStore, + KeyStore, + StateStore, + SignatureStore, + ApplicationServiceStore, + EventsStore, + EventFederationStore, + MediaRepositoryStore, + RejectionsStore, + FilteringStore, + PusherStore, + PushRuleStore, + ApplicationServiceTransactionStore, + ReceiptsStore, + EndToEndKeyStore, + EndToEndRoomKeyStore, + SearchStore, + TagsStore, + AccountDataStore, + EventPushActionsStore, + OpenIdStore, + ClientIpStore, + DeviceStore, + DeviceInboxStore, + UserDirectoryStore, + GroupServerStore, + UserErasureStore, + MonthlyActiveUsersStore, +): def __init__(self, db_conn, hs): self.hs = hs self._clock = hs.get_clock() self.database_engine = hs.database_engine self._stream_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering", - extra_tables=[("local_invites", "stream_id")] + db_conn, + "events", + "stream_ordering", + extra_tables=[("local_invites", "stream_id")], ) self._backfill_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering", step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")] + db_conn, + "events", + "stream_ordering", + step=-1, + extra_tables=[("ex_outlier_stream", "event_stream_ordering")], ) self._presence_id_gen = StreamIdGenerator( db_conn, "presence_stream", "stream_id" @@ -114,7 +126,7 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "public_room_list_stream", "stream_id" ) self._device_list_id_gen = StreamIdGenerator( - db_conn, "device_lists_stream", "stream_id", + db_conn, "device_lists_stream", "stream_id" ) self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") @@ -125,16 +137,15 @@ class DataStore(RoomMemberStore, RoomStore, self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" ) self._pushers_id_gen = StreamIdGenerator( - db_conn, "pushers", "id", - extra_tables=[("deleted_pushers", "stream_id")], + db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] ) self._group_updates_id_gen = StreamIdGenerator( - db_conn, "local_group_updates", "stream_id", + db_conn, "local_group_updates", "stream_id" ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( - db_conn, "cache_invalidation_stream", "stream_id", + db_conn, "cache_invalidation_stream", "stream_id" ) else: self._cache_id_gen = None @@ -142,72 +153,82 @@ class DataStore(RoomMemberStore, RoomStore, self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( - db_conn, "presence_stream", + db_conn, + "presence_stream", entity_column="user_id", stream_column="stream_id", max_value=self._presence_id_gen.get_current_token(), ) self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", min_presence_val, - prefilled_cache=presence_cache_prefill + "PresenceStreamChangeCache", + min_presence_val, + prefilled_cache=presence_cache_prefill, ) max_device_inbox_id = self._device_inbox_id_gen.get_current_token() device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( - db_conn, "device_inbox", + db_conn, + "device_inbox", entity_column="user_id", stream_column="stream_id", max_value=max_device_inbox_id, limit=1000, ) self._device_inbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", min_device_inbox_id, + "DeviceInboxStreamChangeCache", + min_device_inbox_id, prefilled_cache=device_inbox_prefill, ) # The federation outbox and the local device inbox uses the same # stream_id generator. device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( - db_conn, "device_federation_outbox", + db_conn, + "device_federation_outbox", entity_column="destination", stream_column="stream_id", max_value=max_device_inbox_id, limit=1000, ) self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id, + "DeviceFederationOutboxStreamChangeCache", + min_device_outbox_id, prefilled_cache=device_outbox_prefill, ) device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( - "DeviceListStreamChangeCache", device_list_max, + "DeviceListStreamChangeCache", device_list_max ) self._device_list_federation_stream_cache = StreamChangeCache( - "DeviceListFederationStreamChangeCache", device_list_max, + "DeviceListFederationStreamChangeCache", device_list_max ) events_max = self._stream_id_gen.get_current_token() curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( - db_conn, "current_state_delta_stream", + db_conn, + "current_state_delta_stream", entity_column="room_id", stream_column="stream_id", max_value=events_max, # As we share the stream id with events token limit=1000, ) self._curr_state_delta_stream_cache = StreamChangeCache( - "_curr_state_delta_stream_cache", min_curr_state_delta_id, + "_curr_state_delta_stream_cache", + min_curr_state_delta_id, prefilled_cache=curr_state_delta_prefill, ) _group_updates_prefill, min_group_updates_id = self._get_cache_dict( - db_conn, "local_group_updates", + db_conn, + "local_group_updates", entity_column="user_id", stream_column="stream_id", max_value=self._group_updates_id_gen.get_current_token(), limit=1000, ) self._group_updates_stream_cache = StreamChangeCache( - "_group_updates_stream_cache", min_group_updates_id, + "_group_updates_stream_cache", + min_group_updates_id, prefilled_cache=_group_updates_prefill, ) @@ -250,6 +271,7 @@ class DataStore(RoomMemberStore, RoomStore, """ Counts the number of users who used this homeserver in the last 24 hours. """ + def _count_users(txn): yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24) @@ -277,6 +299,7 @@ class DataStore(RoomMemberStore, RoomStore, Returns counts globaly for a given user as well as breaking by platform """ + def _count_r30_users(txn): thirty_days_in_secs = 86400 * 30 now = int(self._clock.time()) @@ -313,8 +336,7 @@ class DataStore(RoomMemberStore, RoomStore, """ results = {} - txn.execute(sql, (thirty_days_ago_in_secs, - thirty_days_ago_in_secs)) + txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) for row in txn: if row[0] == 'unknown': @@ -341,8 +363,7 @@ class DataStore(RoomMemberStore, RoomStore, ) u """ - txn.execute(sql, (thirty_days_ago_in_secs, - thirty_days_ago_in_secs)) + txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) count, = txn.fetchone() results['all'] = count @@ -356,15 +377,14 @@ class DataStore(RoomMemberStore, RoomStore, Returns millisecond unixtime for start of UTC day. """ now = time.gmtime() - today_start = calendar.timegm(( - now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0, - )) + today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0)) return today_start * 1000 def generate_user_daily_visits(self): """ Generates daily visit data for use in cohort/ retention analysis """ + def _generate_user_daily_visits(txn): logger.info("Calling _generate_user_daily_visits") today_start = self._get_start_of_day() @@ -395,25 +415,29 @@ class DataStore(RoomMemberStore, RoomStore, # often to minimise this case. if today_start > self._last_user_visit_update: yesterday_start = today_start - a_day_in_milliseconds - txn.execute(sql, ( - yesterday_start, yesterday_start, - self._last_user_visit_update, today_start - )) + txn.execute( + sql, + ( + yesterday_start, + yesterday_start, + self._last_user_visit_update, + today_start, + ), + ) self._last_user_visit_update = today_start - txn.execute(sql, ( - today_start, today_start, - self._last_user_visit_update, - now - )) + txn.execute( + sql, (today_start, today_start, self._last_user_visit_update, now) + ) # Update _last_user_visit_update to now. The reason to do this # rather just clamping to the beginning of the day is to limit # the size of the join - meaning that the query can be run more # frequently self._last_user_visit_update = now - return self.runInteraction("generate_user_daily_visits", - _generate_user_daily_visits) + return self.runInteraction( + "generate_user_daily_visits", _generate_user_daily_visits + ) def get_users(self): """Function to reterive a list of users in users table. @@ -425,12 +449,7 @@ class DataStore(RoomMemberStore, RoomStore, return self._simple_select_list( table="users", keyvalues={}, - retcols=[ - "name", - "password_hash", - "is_guest", - "admin" - ], + retcols=["name", "password_hash", "is_guest", "admin"], desc="get_users", ) @@ -451,20 +470,9 @@ class DataStore(RoomMemberStore, RoomStore, i_limit = (int)(limit) return self.get_user_list_paginate( table="users", - keyvalues={ - "is_guest": is_guest - }, - pagevalues=[ - order, - i_limit, - i_start - ], - retcols=[ - "name", - "password_hash", - "is_guest", - "admin" - ], + keyvalues={"is_guest": is_guest}, + pagevalues=[order, i_limit, i_start], + retcols=["name", "password_hash", "is_guest", "admin"], desc="get_users_paginate", ) @@ -482,12 +490,7 @@ class DataStore(RoomMemberStore, RoomStore, table="users", term=term, col="name", - retcols=[ - "name", - "password_hash", - "is_guest", - "admin" - ], + retcols=["name", "password_hash", "is_guest", "admin"], desc="search_users", ) |