Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r--  synapse/storage/__init__.py  211
1 file changed, 110 insertions(+), 101 deletions(-)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 42cd3c83ad..c432041b4e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -18,6 +18,8 @@ import calendar
 import logging
 import time
 
+from twisted.internet import defer
+
 from synapse.api.constants import PresenceState
 from synapse.storage.devices import DeviceStore
 from synapse.storage.user_erasure_store import UserErasureStore
@@ -61,48 +63,60 @@ from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerator
 logger = logging.getLogger(__name__)
 
 
-class DataStore(RoomMemberStore, RoomStore,
-                RegistrationStore, StreamStore, ProfileStore,
-                PresenceStore, TransactionStore,
-                DirectoryStore, KeyStore, StateStore, SignatureStore,
-                ApplicationServiceStore,
-                EventsStore,
-                EventFederationStore,
-                MediaRepositoryStore,
-                RejectionsStore,
-                FilteringStore,
-                PusherStore,
-                PushRuleStore,
-                ApplicationServiceTransactionStore,
-                ReceiptsStore,
-                EndToEndKeyStore,
-                EndToEndRoomKeyStore,
-                SearchStore,
-                TagsStore,
-                AccountDataStore,
-                EventPushActionsStore,
-                OpenIdStore,
-                ClientIpStore,
-                DeviceStore,
-                DeviceInboxStore,
-                UserDirectoryStore,
-                GroupServerStore,
-                UserErasureStore,
-                MonthlyActiveUsersStore,
-                ):
-
+class DataStore(
+    RoomMemberStore,
+    RoomStore,
+    RegistrationStore,
+    StreamStore,
+    ProfileStore,
+    PresenceStore,
+    TransactionStore,
+    DirectoryStore,
+    KeyStore,
+    StateStore,
+    SignatureStore,
+    ApplicationServiceStore,
+    EventsStore,
+    EventFederationStore,
+    MediaRepositoryStore,
+    RejectionsStore,
+    FilteringStore,
+    PusherStore,
+    PushRuleStore,
+    ApplicationServiceTransactionStore,
+    ReceiptsStore,
+    EndToEndKeyStore,
+    EndToEndRoomKeyStore,
+    SearchStore,
+    TagsStore,
+    AccountDataStore,
+    EventPushActionsStore,
+    OpenIdStore,
+    ClientIpStore,
+    DeviceStore,
+    DeviceInboxStore,
+    UserDirectoryStore,
+    GroupServerStore,
+    UserErasureStore,
+    MonthlyActiveUsersStore,
+):
     def __init__(self, db_conn, hs):
         self.hs = hs
         self._clock = hs.get_clock()
         self.database_engine = hs.database_engine
 
         self._stream_id_gen = StreamIdGenerator(
-            db_conn, "events", "stream_ordering",
-            extra_tables=[("local_invites", "stream_id")]
+            db_conn,
+            "events",
+            "stream_ordering",
+            extra_tables=[("local_invites", "stream_id")],
         )
         self._backfill_id_gen = StreamIdGenerator(
-            db_conn, "events", "stream_ordering", step=-1,
-            extra_tables=[("ex_outlier_stream", "event_stream_ordering")]
+            db_conn,
+            "events",
+            "stream_ordering",
+            step=-1,
+            extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
         )
         self._presence_id_gen = StreamIdGenerator(
             db_conn, "presence_stream", "stream_id"
@@ -114,7 +128,7 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "public_room_list_stream", "stream_id"
         )
         self._device_list_id_gen = StreamIdGenerator(
-            db_conn, "device_lists_stream", "stream_id",
+            db_conn, "device_lists_stream", "stream_id"
         )
 
         self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
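The generators above all follow one pattern: each is seeded from the current maximum id of its table, and extra_tables lets other tables that share the same ordering contribute to that maximum. A minimal sketch of the idea, using a toy class rather than the real StreamIdGenerator (which also supports step=-1 for the backfill stream, ignored here for brevity):

import sqlite3

# Toy illustration only: seed from the largest id across the main table and
# any extra_tables, then hand out increasing ids from there.
class ToyStreamIdGenerator(object):
    def __init__(self, db_conn, table, column, extra_tables=()):
        current = 0
        for tbl, col in [(table, column)] + list(extra_tables):
            row = db_conn.execute(
                "SELECT COALESCE(MAX(%s), 0) FROM %s" % (col, tbl)
            ).fetchone()
            current = max(current, row[0])
        self._current = current

    def get_current_token(self):
        return self._current

    def get_next(self):
        self._current += 1
        return self._current

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (stream_ordering INTEGER)")
conn.execute("CREATE TABLE local_invites (stream_id INTEGER)")
gen = ToyStreamIdGenerator(
    conn, "events", "stream_ordering",
    extra_tables=[("local_invites", "stream_id")],
)
print(gen.get_current_token())  # 0 for empty tables
print(gen.get_next())           # 1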
@@ -125,16 +139,15 @@ class DataStore(RoomMemberStore, RoomStore,
             self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
         )
         self._pushers_id_gen = StreamIdGenerator(
-            db_conn, "pushers", "id",
-            extra_tables=[("deleted_pushers", "stream_id")],
+            db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
         )
         self._group_updates_id_gen = StreamIdGenerator(
-            db_conn, "local_group_updates", "stream_id",
+            db_conn, "local_group_updates", "stream_id"
         )
 
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = StreamIdGenerator(
-                db_conn, "cache_invalidation_stream", "stream_id",
+                db_conn, "cache_invalidation_stream", "stream_id"
             )
         else:
             self._cache_id_gen = None
@@ -142,72 +155,82 @@ class DataStore(RoomMemberStore, RoomStore,
         self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self._get_cache_dict(
-            db_conn, "presence_stream",
+            db_conn,
+            "presence_stream",
             entity_column="user_id",
             stream_column="stream_id",
             max_value=self._presence_id_gen.get_current_token(),
         )
         self.presence_stream_cache = StreamChangeCache(
-            "PresenceStreamChangeCache", min_presence_val,
-            prefilled_cache=presence_cache_prefill
+            "PresenceStreamChangeCache",
+            min_presence_val,
+            prefilled_cache=presence_cache_prefill,
         )
 
         max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
         device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
-            db_conn, "device_inbox",
+            db_conn,
+            "device_inbox",
             entity_column="user_id",
             stream_column="stream_id",
             max_value=max_device_inbox_id,
             limit=1000,
         )
         self._device_inbox_stream_cache = StreamChangeCache(
-            "DeviceInboxStreamChangeCache", min_device_inbox_id,
+            "DeviceInboxStreamChangeCache",
+            min_device_inbox_id,
             prefilled_cache=device_inbox_prefill,
         )
         # The federation outbox and the local device inbox use the same
         # stream_id generator.
         device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
-            db_conn, "device_federation_outbox",
+            db_conn,
+            "device_federation_outbox",
             entity_column="destination",
             stream_column="stream_id",
             max_value=max_device_inbox_id,
             limit=1000,
         )
         self._device_federation_outbox_stream_cache = StreamChangeCache(
-            "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id,
+            "DeviceFederationOutboxStreamChangeCache",
+            min_device_outbox_id,
             prefilled_cache=device_outbox_prefill,
         )
 
         device_list_max = self._device_list_id_gen.get_current_token()
         self._device_list_stream_cache = StreamChangeCache(
-            "DeviceListStreamChangeCache", device_list_max,
+            "DeviceListStreamChangeCache", device_list_max
         )
         self._device_list_federation_stream_cache = StreamChangeCache(
-            "DeviceListFederationStreamChangeCache", device_list_max,
+            "DeviceListFederationStreamChangeCache", device_list_max
         )
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
-            db_conn, "current_state_delta_stream",
+            db_conn,
+            "current_state_delta_stream",
             entity_column="room_id",
             stream_column="stream_id",
             max_value=events_max,  # As we share the stream id with events token
             limit=1000,
         )
         self._curr_state_delta_stream_cache = StreamChangeCache(
-            "_curr_state_delta_stream_cache", min_curr_state_delta_id,
+            "_curr_state_delta_stream_cache",
+            min_curr_state_delta_id,
             prefilled_cache=curr_state_delta_prefill,
         )
 
         _group_updates_prefill, min_group_updates_id = self._get_cache_dict(
-            db_conn, "local_group_updates",
+            db_conn,
+            "local_group_updates",
             entity_column="user_id",
             stream_column="stream_id",
             max_value=self._group_updates_id_gen.get_current_token(),
             limit=1000,
         )
         self._group_updates_stream_cache = StreamChangeCache(
-            "_group_updates_stream_cache", min_group_updates_id,
+            "_group_updates_stream_cache",
+            min_group_updates_id,
             prefilled_cache=_group_updates_prefill,
         )
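Each cache in this block is built the same way: _get_cache_dict reads back recent (entity, stream_id) pairs up to the current token, and the result is passed to StreamChangeCache as prefilled_cache so the cache can answer "has this entity changed since position X?" immediately after startup. A simplified stand-in for that cache (not the synapse class) illustrates the idea:

# Toy stand-in: remembers the last stream position at which each entity
# changed, plus the earliest position it has knowledge of; anything older
# than that must be treated as "maybe changed".
class ToyStreamChangeCache(object):
    def __init__(self, name, earliest_known_pos, prefilled_cache=None):
        self.name = name
        self._earliest_known_pos = earliest_known_pos
        self._entity_to_pos = dict(prefilled_cache or {})

    def has_entity_changed(self, entity, stream_pos):
        if stream_pos < self._earliest_known_pos:
            return True  # cannot rule a change out below the known range
        return self._entity_to_pos.get(entity, 0) > stream_pos

cache = ToyStreamChangeCache(
    "PresenceStreamChangeCache",
    5,
    prefilled_cache={"@alice:example.com": 8},
)
print(cache.has_entity_changed("@alice:example.com", 6))  # True, changed at 8
print(cache.has_entity_changed("@bob:example.com", 6))    # False
print(cache.has_entity_changed("@bob:example.com", 2))    # True, below earliest known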
 
@@ -250,6 +273,7 @@ class DataStore(RoomMemberStore, RoomStore,
         """
         Counts the number of users who used this homeserver in the last 24 hours.
         """
+
         def _count_users(txn):
             yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
 
@@ -277,6 +301,7 @@ class DataStore(RoomMemberStore, RoomStore,
          Returns counts globally for a given user as well as broken
          down by platform
         """
+
         def _count_r30_users(txn):
             thirty_days_in_secs = 86400 * 30
             now = int(self._clock.time())
@@ -313,8 +338,7 @@ class DataStore(RoomMemberStore, RoomStore,
             """
 
             results = {}
-            txn.execute(sql, (thirty_days_ago_in_secs,
-                              thirty_days_ago_in_secs))
+            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
 
             for row in txn:
                 if row[0] == 'unknown':
@@ -341,8 +365,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 ) u
             """
 
-            txn.execute(sql, (thirty_days_ago_in_secs,
-                              thirty_days_ago_in_secs))
+            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
 
             count, = txn.fetchone()
             results['all'] = count
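Both reformatted execute calls bind the same cutoff value to two placeholders in the (elided) SQL, once per subquery. A self-contained illustration of that binding style, with a hypothetical table and query rather than the real R30 statement:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_ips (user_id TEXT, last_seen INTEGER)")
conn.execute("INSERT INTO user_ips VALUES ('@alice:example.com', 100)")

thirty_days_ago_in_secs = 50
# The same cutoff appears twice in the query, so it is passed twice.
sql = """
    SELECT COUNT(*) FROM user_ips
    WHERE last_seen > ? AND user_id IN (
        SELECT user_id FROM user_ips WHERE last_seen > ?
    )
"""
count, = conn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)).fetchone()
print(count)  # 1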
@@ -356,15 +379,14 @@ class DataStore(RoomMemberStore, RoomStore,
         Returns millisecond unixtime for start of UTC day.
         """
         now = time.gmtime()
-        today_start = calendar.timegm((
-            now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0,
-        ))
+        today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
         return today_start * 1000
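_get_start_of_day builds a time tuple for midnight of the current UTC day and converts it with calendar.timegm, which (unlike time.mktime) interprets the tuple as UTC; multiplying by 1000 gives milliseconds. A quick worked example with a fixed timestamp so the result is reproducible:

import calendar
import time

now = time.gmtime(1535500000)  # 2018-08-28 23:46:40 UTC
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
print(today_start)         # 1535414400 -> 2018-08-28 00:00:00 UTC
print(today_start * 1000)  # 1535414400000 milliseconds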
 
     def generate_user_daily_visits(self):
         """
         Generates daily visit data for use in cohort/retention analysis
         """
+
         def _generate_user_daily_visits(txn):
             logger.info("Calling _generate_user_daily_visits")
             today_start = self._get_start_of_day()
@@ -395,25 +417,29 @@ class DataStore(RoomMemberStore, RoomStore,
             # often to minimise this case.
             if today_start > self._last_user_visit_update:
                 yesterday_start = today_start - a_day_in_milliseconds
-                txn.execute(sql, (
-                    yesterday_start, yesterday_start,
-                    self._last_user_visit_update, today_start
-                ))
+                txn.execute(
+                    sql,
+                    (
+                        yesterday_start,
+                        yesterday_start,
+                        self._last_user_visit_update,
+                        today_start,
+                    ),
+                )
                 self._last_user_visit_update = today_start
 
-            txn.execute(sql, (
-                today_start, today_start,
-                self._last_user_visit_update,
-                now
-            ))
+            txn.execute(
+                sql, (today_start, today_start, self._last_user_visit_update, now)
+            )
             # Update _last_user_visit_update to now. The reason to do this
             # rather than just clamping to the beginning of the day is to limit
             # the size of the join - meaning that the query can be run more
             # frequently
             self._last_user_visit_update = now
 
-        return self.runInteraction("generate_user_daily_visits",
-                                   _generate_user_daily_visits)
+        return self.runInteraction(
+            "generate_user_daily_visits", _generate_user_daily_visits
+        )
 
     def get_users(self):
         """Function to reterive a list of users in users table.
@@ -425,15 +451,11 @@ class DataStore(RoomMemberStore, RoomStore,
         return self._simple_select_list(
             table="users",
             keyvalues={},
-            retcols=[
-                "name",
-                "password_hash",
-                "is_guest",
-                "admin"
-            ],
+            retcols=["name", "password_hash", "is_guest", "admin"],
             desc="get_users",
         )
 
+    @defer.inlineCallbacks
     def get_users_paginate(self, order, start, limit):
         """Function to reterive a paginated list of users from
         users list. This will return a json object, which contains
@@ -446,27 +468,19 @@ class DataStore(RoomMemberStore, RoomStore,
         Returns:
             defer.Deferred: resolves to json object {list[dict[str, Any]], count}
         """
-        is_guest = 0
-        i_start = (int)(start)
-        i_limit = (int)(limit)
-        return self.get_user_list_paginate(
+        users = yield self.runInteraction(
+            "get_users_paginate",
+            self._simple_select_list_paginate_txn,
             table="users",
-            keyvalues={
-                "is_guest": is_guest
-            },
-            pagevalues=[
-                order,
-                i_limit,
-                i_start
-            ],
-            retcols=[
-                "name",
-                "password_hash",
-                "is_guest",
-                "admin"
-            ],
-            desc="get_users_paginate",
+            keyvalues={"is_guest": False},
+            orderby=order,
+            start=start,
+            limit=limit,
+            retcols=["name", "password_hash", "is_guest", "admin"],
         )
+        count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
+        retval = {"users": users, "total": count}
+        defer.returnValue(retval)
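The rewritten get_users_paginate chains two runInteraction calls with @defer.inlineCallbacks, which is why twisted.internet.defer is now imported at the top of the module: the decorator lets the method yield each Deferred in turn and hand back its result via defer.returnValue. A minimal sketch of the same control flow, with defer.succeed standing in for the real database calls:

from twisted.internet import defer

@defer.inlineCallbacks
def get_users_paginate_sketch():
    # In the real method these would be runInteraction(...) calls.
    users = yield defer.succeed([{"name": "@alice:example.com", "is_guest": False}])
    count = yield defer.succeed(1)
    defer.returnValue({"users": users, "total": count})

def show(result):
    print(result)

d = get_users_paginate_sketch()
d.addCallback(show)  # {'users': [...], 'total': 1}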
 
     def search_users(self, term):
         """Function to search users list for one or more users with
@@ -482,12 +496,7 @@ class DataStore(RoomMemberStore, RoomStore,
             table="users",
             term=term,
             col="name",
-            retcols=[
-                "name",
-                "password_hash",
-                "is_guest",
-                "admin"
-            ],
+            retcols=["name", "password_hash", "is_guest", "admin"],
             desc="search_users",
         )