diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 42cd3c83ad..e9aa2fc9dd 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -61,48 +61,60 @@ from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerat
logger = logging.getLogger(__name__)
-class DataStore(RoomMemberStore, RoomStore,
- RegistrationStore, StreamStore, ProfileStore,
- PresenceStore, TransactionStore,
- DirectoryStore, KeyStore, StateStore, SignatureStore,
- ApplicationServiceStore,
- EventsStore,
- EventFederationStore,
- MediaRepositoryStore,
- RejectionsStore,
- FilteringStore,
- PusherStore,
- PushRuleStore,
- ApplicationServiceTransactionStore,
- ReceiptsStore,
- EndToEndKeyStore,
- EndToEndRoomKeyStore,
- SearchStore,
- TagsStore,
- AccountDataStore,
- EventPushActionsStore,
- OpenIdStore,
- ClientIpStore,
- DeviceStore,
- DeviceInboxStore,
- UserDirectoryStore,
- GroupServerStore,
- UserErasureStore,
- MonthlyActiveUsersStore,
- ):
-
+class DataStore(
+ RoomMemberStore,
+ RoomStore,
+ RegistrationStore,
+ StreamStore,
+ ProfileStore,
+ PresenceStore,
+ TransactionStore,
+ DirectoryStore,
+ KeyStore,
+ StateStore,
+ SignatureStore,
+ ApplicationServiceStore,
+ EventsStore,
+ EventFederationStore,
+ MediaRepositoryStore,
+ RejectionsStore,
+ FilteringStore,
+ PusherStore,
+ PushRuleStore,
+ ApplicationServiceTransactionStore,
+ ReceiptsStore,
+ EndToEndKeyStore,
+ EndToEndRoomKeyStore,
+ SearchStore,
+ TagsStore,
+ AccountDataStore,
+ EventPushActionsStore,
+ OpenIdStore,
+ ClientIpStore,
+ DeviceStore,
+ DeviceInboxStore,
+ UserDirectoryStore,
+ GroupServerStore,
+ UserErasureStore,
+ MonthlyActiveUsersStore,
+):
def __init__(self, db_conn, hs):
self.hs = hs
self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self._stream_id_gen = StreamIdGenerator(
- db_conn, "events", "stream_ordering",
- extra_tables=[("local_invites", "stream_id")]
+ db_conn,
+ "events",
+ "stream_ordering",
+ extra_tables=[("local_invites", "stream_id")],
)
self._backfill_id_gen = StreamIdGenerator(
- db_conn, "events", "stream_ordering", step=-1,
- extra_tables=[("ex_outlier_stream", "event_stream_ordering")]
+ db_conn,
+ "events",
+ "stream_ordering",
+ step=-1,
+ extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
)
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
@@ -114,7 +126,7 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "public_room_list_stream", "stream_id"
)
self._device_list_id_gen = StreamIdGenerator(
- db_conn, "device_lists_stream", "stream_id",
+ db_conn, "device_lists_stream", "stream_id"
)
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
@@ -125,16 +137,15 @@ class DataStore(RoomMemberStore, RoomStore,
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
)
self._pushers_id_gen = StreamIdGenerator(
- db_conn, "pushers", "id",
- extra_tables=[("deleted_pushers", "stream_id")],
+ db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
)
self._group_updates_id_gen = StreamIdGenerator(
- db_conn, "local_group_updates", "stream_id",
+ db_conn, "local_group_updates", "stream_id"
)
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
- db_conn, "cache_invalidation_stream", "stream_id",
+ db_conn, "cache_invalidation_stream", "stream_id"
)
else:
self._cache_id_gen = None
@@ -142,72 +153,82 @@ class DataStore(RoomMemberStore, RoomStore,
self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self._get_cache_dict(
- db_conn, "presence_stream",
+ db_conn,
+ "presence_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=self._presence_id_gen.get_current_token(),
)
self.presence_stream_cache = StreamChangeCache(
- "PresenceStreamChangeCache", min_presence_val,
- prefilled_cache=presence_cache_prefill
+ "PresenceStreamChangeCache",
+ min_presence_val,
+ prefilled_cache=presence_cache_prefill,
)
max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
- db_conn, "device_inbox",
+ db_conn,
+ "device_inbox",
entity_column="user_id",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_inbox_stream_cache = StreamChangeCache(
- "DeviceInboxStreamChangeCache", min_device_inbox_id,
+ "DeviceInboxStreamChangeCache",
+ min_device_inbox_id,
prefilled_cache=device_inbox_prefill,
)
# The federation outbox and the local device inbox uses the same
# stream_id generator.
device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
- db_conn, "device_federation_outbox",
+ db_conn,
+ "device_federation_outbox",
entity_column="destination",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
- "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id,
+ "DeviceFederationOutboxStreamChangeCache",
+ min_device_outbox_id,
prefilled_cache=device_outbox_prefill,
)
device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
- "DeviceListStreamChangeCache", device_list_max,
+ "DeviceListStreamChangeCache", device_list_max
)
self._device_list_federation_stream_cache = StreamChangeCache(
- "DeviceListFederationStreamChangeCache", device_list_max,
+ "DeviceListFederationStreamChangeCache", device_list_max
)
events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
- db_conn, "current_state_delta_stream",
+ db_conn,
+ "current_state_delta_stream",
entity_column="room_id",
stream_column="stream_id",
max_value=events_max, # As we share the stream id with events token
limit=1000,
)
self._curr_state_delta_stream_cache = StreamChangeCache(
- "_curr_state_delta_stream_cache", min_curr_state_delta_id,
+ "_curr_state_delta_stream_cache",
+ min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)
_group_updates_prefill, min_group_updates_id = self._get_cache_dict(
- db_conn, "local_group_updates",
+ db_conn,
+ "local_group_updates",
entity_column="user_id",
stream_column="stream_id",
max_value=self._group_updates_id_gen.get_current_token(),
limit=1000,
)
self._group_updates_stream_cache = StreamChangeCache(
- "_group_updates_stream_cache", min_group_updates_id,
+ "_group_updates_stream_cache",
+ min_group_updates_id,
prefilled_cache=_group_updates_prefill,
)
@@ -250,6 +271,7 @@ class DataStore(RoomMemberStore, RoomStore,
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
+
def _count_users(txn):
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
@@ -277,6 +299,7 @@ class DataStore(RoomMemberStore, RoomStore,
Returns counts globaly for a given user as well as breaking
by platform
"""
+
def _count_r30_users(txn):
thirty_days_in_secs = 86400 * 30
now = int(self._clock.time())
@@ -313,8 +336,7 @@ class DataStore(RoomMemberStore, RoomStore,
"""
results = {}
- txn.execute(sql, (thirty_days_ago_in_secs,
- thirty_days_ago_in_secs))
+ txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
for row in txn:
if row[0] == 'unknown':
@@ -341,8 +363,7 @@ class DataStore(RoomMemberStore, RoomStore,
) u
"""
- txn.execute(sql, (thirty_days_ago_in_secs,
- thirty_days_ago_in_secs))
+ txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
count, = txn.fetchone()
results['all'] = count
@@ -356,15 +377,14 @@ class DataStore(RoomMemberStore, RoomStore,
Returns millisecond unixtime for start of UTC day.
"""
now = time.gmtime()
- today_start = calendar.timegm((
- now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0,
- ))
+ today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
return today_start * 1000
def generate_user_daily_visits(self):
"""
Generates daily visit data for use in cohort/ retention analysis
"""
+
def _generate_user_daily_visits(txn):
logger.info("Calling _generate_user_daily_visits")
today_start = self._get_start_of_day()
@@ -395,25 +415,29 @@ class DataStore(RoomMemberStore, RoomStore,
# often to minimise this case.
if today_start > self._last_user_visit_update:
yesterday_start = today_start - a_day_in_milliseconds
- txn.execute(sql, (
- yesterday_start, yesterday_start,
- self._last_user_visit_update, today_start
- ))
+ txn.execute(
+ sql,
+ (
+ yesterday_start,
+ yesterday_start,
+ self._last_user_visit_update,
+ today_start,
+ ),
+ )
self._last_user_visit_update = today_start
- txn.execute(sql, (
- today_start, today_start,
- self._last_user_visit_update,
- now
- ))
+ txn.execute(
+ sql, (today_start, today_start, self._last_user_visit_update, now)
+ )
# Update _last_user_visit_update to now. The reason to do this
# rather just clamping to the beginning of the day is to limit
# the size of the join - meaning that the query can be run more
# frequently
self._last_user_visit_update = now
- return self.runInteraction("generate_user_daily_visits",
- _generate_user_daily_visits)
+ return self.runInteraction(
+ "generate_user_daily_visits", _generate_user_daily_visits
+ )
def get_users(self):
"""Function to reterive a list of users in users table.
@@ -425,12 +449,7 @@ class DataStore(RoomMemberStore, RoomStore,
return self._simple_select_list(
table="users",
keyvalues={},
- retcols=[
- "name",
- "password_hash",
- "is_guest",
- "admin"
- ],
+ retcols=["name", "password_hash", "is_guest", "admin"],
desc="get_users",
)
@@ -451,20 +470,9 @@ class DataStore(RoomMemberStore, RoomStore,
i_limit = (int)(limit)
return self.get_user_list_paginate(
table="users",
- keyvalues={
- "is_guest": is_guest
- },
- pagevalues=[
- order,
- i_limit,
- i_start
- ],
- retcols=[
- "name",
- "password_hash",
- "is_guest",
- "admin"
- ],
+ keyvalues={"is_guest": is_guest},
+ pagevalues=[order, i_limit, i_start],
+ retcols=["name", "password_hash", "is_guest", "admin"],
desc="get_users_paginate",
)
@@ -482,12 +490,7 @@ class DataStore(RoomMemberStore, RoomStore,
table="users",
term=term,
col="name",
- retcols=[
- "name",
- "password_hash",
- "is_guest",
- "admin"
- ],
+ retcols=["name", "password_hash", "is_guest", "admin"],
desc="search_users",
)
|