diff options
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r-- | synapse/storage/__init__.py | 211 |
1 files changed, 110 insertions, 101 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 42cd3c83ad..c432041b4e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -18,6 +18,8 @@ import calendar import logging import time +from twisted.internet import defer + from synapse.api.constants import PresenceState from synapse.storage.devices import DeviceStore from synapse.storage.user_erasure_store import UserErasureStore @@ -61,48 +63,60 @@ from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerat logger = logging.getLogger(__name__) -class DataStore(RoomMemberStore, RoomStore, - RegistrationStore, StreamStore, ProfileStore, - PresenceStore, TransactionStore, - DirectoryStore, KeyStore, StateStore, SignatureStore, - ApplicationServiceStore, - EventsStore, - EventFederationStore, - MediaRepositoryStore, - RejectionsStore, - FilteringStore, - PusherStore, - PushRuleStore, - ApplicationServiceTransactionStore, - ReceiptsStore, - EndToEndKeyStore, - EndToEndRoomKeyStore, - SearchStore, - TagsStore, - AccountDataStore, - EventPushActionsStore, - OpenIdStore, - ClientIpStore, - DeviceStore, - DeviceInboxStore, - UserDirectoryStore, - GroupServerStore, - UserErasureStore, - MonthlyActiveUsersStore, - ): - +class DataStore( + RoomMemberStore, + RoomStore, + RegistrationStore, + StreamStore, + ProfileStore, + PresenceStore, + TransactionStore, + DirectoryStore, + KeyStore, + StateStore, + SignatureStore, + ApplicationServiceStore, + EventsStore, + EventFederationStore, + MediaRepositoryStore, + RejectionsStore, + FilteringStore, + PusherStore, + PushRuleStore, + ApplicationServiceTransactionStore, + ReceiptsStore, + EndToEndKeyStore, + EndToEndRoomKeyStore, + SearchStore, + TagsStore, + AccountDataStore, + EventPushActionsStore, + OpenIdStore, + ClientIpStore, + DeviceStore, + DeviceInboxStore, + UserDirectoryStore, + GroupServerStore, + UserErasureStore, + MonthlyActiveUsersStore, +): def __init__(self, db_conn, hs): self.hs = hs self._clock = hs.get_clock() self.database_engine = hs.database_engine self._stream_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering", - extra_tables=[("local_invites", "stream_id")] + db_conn, + "events", + "stream_ordering", + extra_tables=[("local_invites", "stream_id")], ) self._backfill_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering", step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")] + db_conn, + "events", + "stream_ordering", + step=-1, + extra_tables=[("ex_outlier_stream", "event_stream_ordering")], ) self._presence_id_gen = StreamIdGenerator( db_conn, "presence_stream", "stream_id" @@ -114,7 +128,7 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "public_room_list_stream", "stream_id" ) self._device_list_id_gen = StreamIdGenerator( - db_conn, "device_lists_stream", "stream_id", + db_conn, "device_lists_stream", "stream_id" ) self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") @@ -125,16 +139,15 @@ class DataStore(RoomMemberStore, RoomStore, self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" ) self._pushers_id_gen = StreamIdGenerator( - db_conn, "pushers", "id", - extra_tables=[("deleted_pushers", "stream_id")], + db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] ) self._group_updates_id_gen = StreamIdGenerator( - db_conn, "local_group_updates", "stream_id", + db_conn, "local_group_updates", "stream_id" ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( - db_conn, "cache_invalidation_stream", "stream_id", + db_conn, "cache_invalidation_stream", "stream_id" ) else: self._cache_id_gen = None @@ -142,72 +155,82 @@ class DataStore(RoomMemberStore, RoomStore, self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( - db_conn, "presence_stream", + db_conn, + "presence_stream", entity_column="user_id", stream_column="stream_id", max_value=self._presence_id_gen.get_current_token(), ) self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", min_presence_val, - prefilled_cache=presence_cache_prefill + "PresenceStreamChangeCache", + min_presence_val, + prefilled_cache=presence_cache_prefill, ) max_device_inbox_id = self._device_inbox_id_gen.get_current_token() device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( - db_conn, "device_inbox", + db_conn, + "device_inbox", entity_column="user_id", stream_column="stream_id", max_value=max_device_inbox_id, limit=1000, ) self._device_inbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", min_device_inbox_id, + "DeviceInboxStreamChangeCache", + min_device_inbox_id, prefilled_cache=device_inbox_prefill, ) # The federation outbox and the local device inbox uses the same # stream_id generator. device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( - db_conn, "device_federation_outbox", + db_conn, + "device_federation_outbox", entity_column="destination", stream_column="stream_id", max_value=max_device_inbox_id, limit=1000, ) self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id, + "DeviceFederationOutboxStreamChangeCache", + min_device_outbox_id, prefilled_cache=device_outbox_prefill, ) device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( - "DeviceListStreamChangeCache", device_list_max, + "DeviceListStreamChangeCache", device_list_max ) self._device_list_federation_stream_cache = StreamChangeCache( - "DeviceListFederationStreamChangeCache", device_list_max, + "DeviceListFederationStreamChangeCache", device_list_max ) events_max = self._stream_id_gen.get_current_token() curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( - db_conn, "current_state_delta_stream", + db_conn, + "current_state_delta_stream", entity_column="room_id", stream_column="stream_id", max_value=events_max, # As we share the stream id with events token limit=1000, ) self._curr_state_delta_stream_cache = StreamChangeCache( - "_curr_state_delta_stream_cache", min_curr_state_delta_id, + "_curr_state_delta_stream_cache", + min_curr_state_delta_id, prefilled_cache=curr_state_delta_prefill, ) _group_updates_prefill, min_group_updates_id = self._get_cache_dict( - db_conn, "local_group_updates", + db_conn, + "local_group_updates", entity_column="user_id", stream_column="stream_id", max_value=self._group_updates_id_gen.get_current_token(), limit=1000, ) self._group_updates_stream_cache = StreamChangeCache( - "_group_updates_stream_cache", min_group_updates_id, + "_group_updates_stream_cache", + min_group_updates_id, prefilled_cache=_group_updates_prefill, ) @@ -250,6 +273,7 @@ class DataStore(RoomMemberStore, RoomStore, """ Counts the number of users who used this homeserver in the last 24 hours. """ + def _count_users(txn): yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24) @@ -277,6 +301,7 @@ class DataStore(RoomMemberStore, RoomStore, Returns counts globaly for a given user as well as breaking by platform """ + def _count_r30_users(txn): thirty_days_in_secs = 86400 * 30 now = int(self._clock.time()) @@ -313,8 +338,7 @@ class DataStore(RoomMemberStore, RoomStore, """ results = {} - txn.execute(sql, (thirty_days_ago_in_secs, - thirty_days_ago_in_secs)) + txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) for row in txn: if row[0] == 'unknown': @@ -341,8 +365,7 @@ class DataStore(RoomMemberStore, RoomStore, ) u """ - txn.execute(sql, (thirty_days_ago_in_secs, - thirty_days_ago_in_secs)) + txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) count, = txn.fetchone() results['all'] = count @@ -356,15 +379,14 @@ class DataStore(RoomMemberStore, RoomStore, Returns millisecond unixtime for start of UTC day. """ now = time.gmtime() - today_start = calendar.timegm(( - now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0, - )) + today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0)) return today_start * 1000 def generate_user_daily_visits(self): """ Generates daily visit data for use in cohort/ retention analysis """ + def _generate_user_daily_visits(txn): logger.info("Calling _generate_user_daily_visits") today_start = self._get_start_of_day() @@ -395,25 +417,29 @@ class DataStore(RoomMemberStore, RoomStore, # often to minimise this case. if today_start > self._last_user_visit_update: yesterday_start = today_start - a_day_in_milliseconds - txn.execute(sql, ( - yesterday_start, yesterday_start, - self._last_user_visit_update, today_start - )) + txn.execute( + sql, + ( + yesterday_start, + yesterday_start, + self._last_user_visit_update, + today_start, + ), + ) self._last_user_visit_update = today_start - txn.execute(sql, ( - today_start, today_start, - self._last_user_visit_update, - now - )) + txn.execute( + sql, (today_start, today_start, self._last_user_visit_update, now) + ) # Update _last_user_visit_update to now. The reason to do this # rather just clamping to the beginning of the day is to limit # the size of the join - meaning that the query can be run more # frequently self._last_user_visit_update = now - return self.runInteraction("generate_user_daily_visits", - _generate_user_daily_visits) + return self.runInteraction( + "generate_user_daily_visits", _generate_user_daily_visits + ) def get_users(self): """Function to reterive a list of users in users table. @@ -425,15 +451,11 @@ class DataStore(RoomMemberStore, RoomStore, return self._simple_select_list( table="users", keyvalues={}, - retcols=[ - "name", - "password_hash", - "is_guest", - "admin" - ], + retcols=["name", "password_hash", "is_guest", "admin"], desc="get_users", ) + @defer.inlineCallbacks def get_users_paginate(self, order, start, limit): """Function to reterive a paginated list of users from users list. This will return a json object, which contains @@ -446,27 +468,19 @@ class DataStore(RoomMemberStore, RoomStore, Returns: defer.Deferred: resolves to json object {list[dict[str, Any]], count} """ - is_guest = 0 - i_start = (int)(start) - i_limit = (int)(limit) - return self.get_user_list_paginate( + users = yield self.runInteraction( + "get_users_paginate", + self._simple_select_list_paginate_txn, table="users", - keyvalues={ - "is_guest": is_guest - }, - pagevalues=[ - order, - i_limit, - i_start - ], - retcols=[ - "name", - "password_hash", - "is_guest", - "admin" - ], - desc="get_users_paginate", + keyvalues={"is_guest": False}, + orderby=order, + start=start, + limit=limit, + retcols=["name", "password_hash", "is_guest", "admin"], ) + count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn) + retval = {"users": users, "total": count} + defer.returnValue(retval) def search_users(self, term): """Function to search users list for one or more users with @@ -482,12 +496,7 @@ class DataStore(RoomMemberStore, RoomStore, table="users", term=term, col="name", - retcols=[ - "name", - "password_hash", - "is_guest", - "admin" - ], + retcols=["name", "password_hash", "is_guest", "admin"], desc="search_users", ) |