summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py28
-rw-r--r--synapse/storage/appservice.py2
-rw-r--r--synapse/storage/event_federation.py3
-rw-r--r--synapse/storage/events.py34
-rw-r--r--synapse/storage/events_worker.py20
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/storage/schema/delta/50/make_event_content_nullable.py2
-rw-r--r--synapse/storage/signatures.py2
-rw-r--r--synapse/storage/stream.py2
9 files changed, 75 insertions, 20 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ba88a54979..134e4a80f1 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -66,6 +66,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 ApplicationServiceStore,
+                EventsStore,
                 EventFederationStore,
                 MediaRepositoryStore,
                 RejectionsStore,
@@ -73,7 +74,6 @@ class DataStore(RoomMemberStore, RoomStore,
                 PusherStore,
                 PushRuleStore,
                 ApplicationServiceTransactionStore,
-                EventsStore,
                 ReceiptsStore,
                 EndToEndKeyStore,
                 SearchStore,
@@ -94,6 +94,7 @@ class DataStore(RoomMemberStore, RoomStore,
         self._clock = hs.get_clock()
         self.database_engine = hs.database_engine
 
+        self.db_conn = db_conn
         self._stream_id_gen = StreamIdGenerator(
             db_conn, "events", "stream_ordering",
             extra_tables=[("local_invites", "stream_id")]
@@ -266,6 +267,31 @@ class DataStore(RoomMemberStore, RoomStore,
 
         return self.runInteraction("count_users", _count_users)
 
+    def count_monthly_users(self):
+        """Counts the number of users who used this homeserver in the last 30 days
+
+        This method should be refactored with count_daily_users - the only
+        reason not to is waiting on definition of mau
+
+        Returns:
+            Defered[int]
+        """
+        def _count_monthly_users(txn):
+            thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+            sql = """
+                SELECT COALESCE(count(*), 0) FROM (
+                    SELECT user_id FROM user_ips
+                    WHERE last_seen > ?
+                    GROUP BY user_id
+                ) u
+            """
+
+            txn.execute(sql, (thirty_days_ago,))
+            count, = txn.fetchone()
+            return count
+
+        return self.runInteraction("count_monthly_users", _count_monthly_users)
+
     def count_r30_users(self):
         """
         Counts the number of 30 day retained users, defined as:-
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 9f12b360bc..31248d5e06 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5d3ee90017..24345b20a6 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.util.caches.descriptors import cached
 
@@ -343,6 +343,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
                 table="events",
                 keyvalues={
                     "event_id": event_id,
+                    "room_id": room_id,
                 },
                 retcol="depth",
                 allow_none=True,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2f482af3a1..e8e5a0fe44 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,8 @@ from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
@@ -65,7 +67,13 @@ state_delta_reuse_delta_counter = Counter(
 
 
 def encode_json(json_object):
-    return frozendict_json_encoder.encode(json_object)
+    """
+    Encode a Python object as JSON and return it in a Unicode string.
+    """
+    out = frozendict_json_encoder.encode(json_object)
+    if isinstance(out, bytes):
+        out = out.decode('utf8')
+    return out
 
 
 class _EventPeristenceQueue(object):
@@ -193,7 +201,9 @@ def _retry_on_integrity_error(func):
     return f
 
 
-class EventsStore(EventsWorkerStore):
+# inherits from EventFederationStore so that we can call _update_backward_extremities
+# and _handle_mult_prev_events (though arguably those could both be moved in here)
+class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
@@ -231,12 +241,18 @@ class EventsStore(EventsWorkerStore):
 
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+    @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database
         Args:
             events_and_contexts: list of tuples of (event, context)
-            backfilled: ?
+            backfilled (bool): Whether the results are retrieved from federation
+                via backfill or not. Used to determine if they're "new" events
+                which might update the current state etc.
+
+        Returns:
+            Deferred[int]: the stream ordering of the latest persisted event
         """
         partitioned = {}
         for event, ctx in events_and_contexts:
@@ -253,10 +269,14 @@ class EventsStore(EventsWorkerStore):
         for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
-        return make_deferred_yieldable(
+        yield make_deferred_yieldable(
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
+
+        defer.returnValue(max_persisted_id)
+
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False):
@@ -1054,7 +1074,7 @@ class EventsStore(EventsWorkerStore):
 
                 metadata_json = encode_json(
                     event.internal_metadata.get_dict()
-                ).decode("UTF-8")
+                )
 
                 sql = (
                     "UPDATE event_json SET internal_metadata = ?"
@@ -1168,8 +1188,8 @@ class EventsStore(EventsWorkerStore):
                     "room_id": event.room_id,
                     "internal_metadata": encode_json(
                         event.internal_metadata.get_dict()
-                    ).decode("UTF-8"),
-                    "json": encode_json(event_dict(event)).decode("UTF-8"),
+                    ),
+                    "json": encode_json(event_dict(event)),
                 }
                 for event, _ in events_and_contexts
             ],
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index f28239a808..9b4cfeb899 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -19,7 +19,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import NotFoundError
 # these are only included to make the type annotations work
 from synapse.events import EventBase  # noqa: F401
 from synapse.events import FrozenEvent
@@ -77,7 +77,7 @@ class EventsWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False,
-                  allow_none=False):
+                  allow_none=False, check_room_id=None):
         """Get an event from the database by event_id.
 
         Args:
@@ -88,7 +88,9 @@ class EventsWorkerStore(SQLBaseStore):
                 include the previous states content in the unsigned field.
             allow_rejected (bool): If True return rejected events.
             allow_none (bool): If True, return None if no event found, if
-                False throw an exception.
+                False throw a NotFoundError
+            check_room_id (str|None): if not None, check the room of the found event.
+                If there is a mismatch, behave as per allow_none.
 
         Returns:
             Deferred : A FrozenEvent.
@@ -100,10 +102,16 @@ class EventsWorkerStore(SQLBaseStore):
             allow_rejected=allow_rejected,
         )
 
-        if not events and not allow_none:
-            raise SynapseError(404, "Could not find event %s" % (event_id,))
+        event = events[0] if events else None
 
-        defer.returnValue(events[0] if events else None)
+        if event is not None and check_room_id is not None:
+            if event.room_id != check_room_id:
+                event = None
+
+        if event is None and not allow_none:
+            raise NotFoundError("Could not find event %s" % (event_id,))
+
+        defer.returnValue(event)
 
     @defer.inlineCallbacks
     def get_events(self, event_ids, check_redacted=True,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 027bf8c85e..10dce21cea 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,7 +24,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async import Linearizer
 from synapse.util.caches import intern_string
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
index 7d27342e39..6dd467b6c5 100644
--- a/synapse/storage/schema/delta/50/make_event_content_nullable.py
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -88,5 +88,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
         "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
         (sql, ),
     )
-    cur.execute("PRAGMA schema_version=%i" % (oldver+1,))
+    cur.execute("PRAGMA schema_version=%i" % (oldver + 1,))
     cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 470212aa2a..5623391f6e 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -74,7 +74,7 @@ class SignatureWorkerStore(SQLBaseStore):
             txn (cursor):
             event_id (str): Id for the Event.
         Returns:
-            A dict of algorithm -> hash.
+            A dict[unicode, bytes] of algorithm -> hash.
         """
         query = (
             "SELECT algorithm, hash"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 25d0097b58..b9f2b74ac6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -43,7 +43,7 @@ from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.engines import PostgresEngine
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background