diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 26 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 1 | ||||
-rw-r--r-- | synapse/storage/events.py | 14 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 20 | ||||
-rw-r--r-- | synapse/storage/schema/delta/50/make_event_content_nullable.py | 2 |
5 files changed, 54 insertions, 9 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 3bd63cd195..134e4a80f1 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -94,6 +94,7 @@ class DataStore(RoomMemberStore, RoomStore, self._clock = hs.get_clock() self.database_engine = hs.database_engine + self.db_conn = db_conn self._stream_id_gen = StreamIdGenerator( db_conn, "events", "stream_ordering", extra_tables=[("local_invites", "stream_id")] @@ -266,6 +267,31 @@ class DataStore(RoomMemberStore, RoomStore, return self.runInteraction("count_users", _count_users) + def count_monthly_users(self): + """Counts the number of users who used this homeserver in the last 30 days + + This method should be refactored with count_daily_users - the only + reason not to is waiting on definition of mau + + Returns: + Defered[int] + """ + def _count_monthly_users(txn): + thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) + sql = """ + SELECT COALESCE(count(*), 0) FROM ( + SELECT user_id FROM user_ips + WHERE last_seen > ? + GROUP BY user_id + ) u + """ + + txn.execute(sql, (thirty_days_ago,)) + count, = txn.fetchone() + return count + + return self.runInteraction("count_monthly_users", _count_monthly_users) + def count_r30_users(self): """ Counts the number of 30 day retained users, defined as:- diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8bd35df119..24345b20a6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -343,6 +343,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, table="events", keyvalues={ "event_id": event_id, + "room_id": room_id, }, retcol="depth", allow_none=True, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 61223da1a5..e8e5a0fe44 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -241,12 +241,18 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore self._state_resolution_handler = hs.get_state_resolution_handler() + @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) - backfilled: ? + backfilled (bool): Whether the results are retrieved from federation + via backfill or not. Used to determine if they're "new" events + which might update the current state etc. + + Returns: + Deferred[int]: the stream ordering of the latest persisted event """ partitioned = {} for event, ctx in events_and_contexts: @@ -263,10 +269,14 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore for room_id in partitioned: self._maybe_start_persisting(room_id) - return make_deferred_yieldable( + yield make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) + max_persisted_id = yield self._stream_id_gen.get_current_token() + + defer.returnValue(max_persisted_id) + @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index f28239a808..9b4cfeb899 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -19,7 +19,7 @@ from canonicaljson import json from twisted.internet import defer -from synapse.api.errors import SynapseError +from synapse.api.errors import NotFoundError # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 from synapse.events import FrozenEvent @@ -77,7 +77,7 @@ class EventsWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, - allow_none=False): + allow_none=False, check_room_id=None): """Get an event from the database by event_id. Args: @@ -88,7 +88,9 @@ class EventsWorkerStore(SQLBaseStore): include the previous states content in the unsigned field. allow_rejected (bool): If True return rejected events. allow_none (bool): If True, return None if no event found, if - False throw an exception. + False throw a NotFoundError + check_room_id (str|None): if not None, check the room of the found event. + If there is a mismatch, behave as per allow_none. Returns: Deferred : A FrozenEvent. @@ -100,10 +102,16 @@ class EventsWorkerStore(SQLBaseStore): allow_rejected=allow_rejected, ) - if not events and not allow_none: - raise SynapseError(404, "Could not find event %s" % (event_id,)) + event = events[0] if events else None - defer.returnValue(events[0] if events else None) + if event is not None and check_room_id is not None: + if event.room_id != check_room_id: + event = None + + if event is None and not allow_none: + raise NotFoundError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) @defer.inlineCallbacks def get_events(self, event_ids, check_redacted=True, diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py index 7d27342e39..6dd467b6c5 100644 --- a/synapse/storage/schema/delta/50/make_event_content_nullable.py +++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py @@ -88,5 +88,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs): "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'", (sql, ), ) - cur.execute("PRAGMA schema_version=%i" % (oldver+1,)) + cur.execute("PRAGMA schema_version=%i" % (oldver + 1,)) cur.execute("PRAGMA writable_schema=OFF") |