summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py28
-rw-r--r--synapse/storage/appservice.py2
-rw-r--r--synapse/storage/event_federation.py2
-rw-r--r--synapse/storage/events.py20
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/storage/schema/delta/50/make_event_content_nullable.py2
-rw-r--r--synapse/storage/signatures.py2
-rw-r--r--synapse/storage/stream.py2
8 files changed, 48 insertions, 12 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ba88a54979..134e4a80f1 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -66,6 +66,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 ApplicationServiceStore,
+                EventsStore,
                 EventFederationStore,
                 MediaRepositoryStore,
                 RejectionsStore,
@@ -73,7 +74,6 @@ class DataStore(RoomMemberStore, RoomStore,
                 PusherStore,
                 PushRuleStore,
                 ApplicationServiceTransactionStore,
-                EventsStore,
                 ReceiptsStore,
                 EndToEndKeyStore,
                 SearchStore,
@@ -94,6 +94,7 @@ class DataStore(RoomMemberStore, RoomStore,
         self._clock = hs.get_clock()
         self.database_engine = hs.database_engine
 
+        self.db_conn = db_conn
         self._stream_id_gen = StreamIdGenerator(
             db_conn, "events", "stream_ordering",
             extra_tables=[("local_invites", "stream_id")]
@@ -266,6 +267,31 @@ class DataStore(RoomMemberStore, RoomStore,
 
         return self.runInteraction("count_users", _count_users)
 
+    def count_monthly_users(self):
+        """Counts the number of users who used this homeserver in the last 30 days
+
+        This method should be refactored with count_daily_users - the only
+        reason not to is waiting on definition of mau
+
+        Returns:
+            Defered[int]
+        """
+        def _count_monthly_users(txn):
+            thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+            sql = """
+                SELECT COALESCE(count(*), 0) FROM (
+                    SELECT user_id FROM user_ips
+                    WHERE last_seen > ?
+                    GROUP BY user_id
+                ) u
+            """
+
+            txn.execute(sql, (thirty_days_ago,))
+            count, = txn.fetchone()
+            return count
+
+        return self.runInteraction("count_monthly_users", _count_monthly_users)
+
     def count_r30_users(self):
         """
         Counts the number of 30 day retained users, defined as:-
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 9f12b360bc..31248d5e06 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5d3ee90017..8bd35df119 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.util.caches.descriptors import cached
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ee9159c102..e8e5a0fe44 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,8 @@ from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
@@ -65,7 +67,13 @@ state_delta_reuse_delta_counter = Counter(
 
 
 def encode_json(json_object):
-    return frozendict_json_encoder.encode(json_object)
+    """
+    Encode a Python object as JSON and return it in a Unicode string.
+    """
+    out = frozendict_json_encoder.encode(json_object)
+    if isinstance(out, bytes):
+        out = out.decode('utf8')
+    return out
 
 
 class _EventPeristenceQueue(object):
@@ -193,7 +201,9 @@ def _retry_on_integrity_error(func):
     return f
 
 
-class EventsStore(EventsWorkerStore):
+# inherits from EventFederationStore so that we can call _update_backward_extremities
+# and _handle_mult_prev_events (though arguably those could both be moved in here)
+class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
@@ -1064,7 +1074,7 @@ class EventsStore(EventsWorkerStore):
 
                 metadata_json = encode_json(
                     event.internal_metadata.get_dict()
-                ).decode("UTF-8")
+                )
 
                 sql = (
                     "UPDATE event_json SET internal_metadata = ?"
@@ -1178,8 +1188,8 @@ class EventsStore(EventsWorkerStore):
                     "room_id": event.room_id,
                     "internal_metadata": encode_json(
                         event.internal_metadata.get_dict()
-                    ).decode("UTF-8"),
-                    "json": encode_json(event_dict(event)).decode("UTF-8"),
+                    ),
+                    "json": encode_json(event_dict(event)),
                 }
                 for event, _ in events_and_contexts
             ],
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 027bf8c85e..10dce21cea 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,7 +24,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async import Linearizer
 from synapse.util.caches import intern_string
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
index 7d27342e39..6dd467b6c5 100644
--- a/synapse/storage/schema/delta/50/make_event_content_nullable.py
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -88,5 +88,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
         "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
         (sql, ),
     )
-    cur.execute("PRAGMA schema_version=%i" % (oldver+1,))
+    cur.execute("PRAGMA schema_version=%i" % (oldver + 1,))
     cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 470212aa2a..5623391f6e 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -74,7 +74,7 @@ class SignatureWorkerStore(SQLBaseStore):
             txn (cursor):
             event_id (str): Id for the Event.
         Returns:
-            A dict of algorithm -> hash.
+            A dict[unicode, bytes] of algorithm -> hash.
         """
         query = (
             "SELECT algorithm, hash"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 25d0097b58..b9f2b74ac6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -43,7 +43,7 @@ from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.engines import PostgresEngine
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background