diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 9988a6d3fc..8eed590929 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -608,6 +608,23 @@ class EventPushActionsWorkerStore(SQLBaseStore):
return range_end
+ @defer.inlineCallbacks
+ def get_time_of_last_push_action_before(self, stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT e.received_ts"
+ " FROM event_push_actions AS ep"
+ " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+ " WHERE ep.stream_ordering > ?"
+ " ORDER BY ep.stream_ordering ASC"
+ " LIMIT 1"
+ )
+ txn.execute(sql, (stream_ordering,))
+ return txn.fetchone()
+
+ result = yield self.db.runInteraction("get_time_of_last_push_action_before", f)
+ return result[0] if result else None
+
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -736,23 +753,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
return push_actions
@defer.inlineCallbacks
- def get_time_of_last_push_action_before(self, stream_ordering):
- def f(txn):
- sql = (
- "SELECT e.received_ts"
- " FROM event_push_actions AS ep"
- " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
- " WHERE ep.stream_ordering > ?"
- " ORDER BY ep.stream_ordering ASC"
- " LIMIT 1"
- )
- txn.execute(sql, (stream_ordering,))
- return txn.fetchone()
-
- result = yield self.db.runInteraction("get_time_of_last_push_action_before", f)
- return result[0] if result else None
-
- @defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
def f(txn):
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 8ae23df00a..d593ef47b8 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -1168,7 +1168,11 @@ class EventsStore(
and original_event.internal_metadata.is_redacted()
):
# Redaction was allowed
- pruned_json = encode_json(prune_event_dict(original_event.get_dict()))
+ pruned_json = encode_json(
+ prune_event_dict(
+ original_event.room_version, original_event.get_dict()
+ )
+ )
else:
# Redaction wasn't allowed
pruned_json = None
@@ -1929,7 +1933,9 @@ class EventsStore(
return
# Prune the event's dict then convert it to JSON.
- pruned_json = encode_json(prune_event_dict(event.get_dict()))
+ pruned_json = encode_json(
+ prune_event_dict(event.room_version, event.get_dict())
+ )
# Update the event_json table to replace the event's JSON with the pruned
# JSON.
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 47a3a26072..ca237c6f12 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -28,9 +28,12 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import NotFoundError
-from synapse.api.room_versions import EventFormatVersions
-from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.api.room_versions import (
+ KNOWN_ROOM_VERSIONS,
+ EventFormatVersions,
+ RoomVersions,
+)
+from synapse.events import make_event_from_dict
from synapse.events.utils import prune_event
from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -580,8 +583,49 @@ class EventsWorkerStore(SQLBaseStore):
# of a event format version, so it must be a V1 event.
format_version = EventFormatVersions.V1
- original_ev = event_type_from_format_version(format_version)(
+ room_version_id = row["room_version_id"]
+
+ if not room_version_id:
+ # this should only happen for out-of-band membership events
+ if not internal_metadata.get("out_of_band_membership"):
+ logger.warning(
+ "Room %s for event %s is unknown", d["room_id"], event_id
+ )
+ continue
+
+ # take a wild stab at the room version based on the event format
+ if format_version == EventFormatVersions.V1:
+ room_version = RoomVersions.V1
+ elif format_version == EventFormatVersions.V2:
+ room_version = RoomVersions.V3
+ else:
+ room_version = RoomVersions.V5
+ else:
+ room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+ if not room_version:
+ logger.error(
+ "Event %s in room %s has unknown room version %s",
+ event_id,
+ d["room_id"],
+ room_version_id,
+ )
+ continue
+
+ if room_version.event_format != format_version:
+ logger.error(
+ "Event %s in room %s with version %s has wrong format: "
+ "expected %s, was %s",
+ event_id,
+ d["room_id"],
+ room_version_id,
+ room_version.event_format,
+ format_version,
+ )
+ continue
+
+ original_ev = make_event_from_dict(
event_dict=d,
+ room_version=room_version,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
@@ -661,6 +705,12 @@ class EventsWorkerStore(SQLBaseStore):
of EventFormatVersions. 'None' means the event predates
EventFormatVersions (so the event is format V1).
+ * room_version_id (str|None): The version of the room which contains the event.
+ Hopefully one of RoomVersions.
+
+ Due to historical reasons, there may be a few events in the database which
+ do not have an associated room; in this case None will be returned here.
+
* rejected_reason (str|None): if the event was rejected, the reason
why.
@@ -676,17 +726,18 @@ class EventsWorkerStore(SQLBaseStore):
"""
event_dict = {}
for evs in batch_iter(event_ids, 200):
- sql = (
- "SELECT "
- " e.event_id, "
- " e.internal_metadata,"
- " e.json,"
- " e.format_version, "
- " rej.reason "
- " FROM event_json as e"
- " LEFT JOIN rejections as rej USING (event_id)"
- " WHERE "
- )
+ sql = """\
+ SELECT
+ e.event_id,
+ e.internal_metadata,
+ e.json,
+ e.format_version,
+ r.room_version,
+ rej.reason
+ FROM event_json as e
+ LEFT JOIN rooms r USING (room_id)
+ LEFT JOIN rejections as rej USING (event_id)
+ WHERE """
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", evs
@@ -701,7 +752,8 @@ class EventsWorkerStore(SQLBaseStore):
"internal_metadata": row[1],
"json": row[2],
"format_version": row[3],
- "rejected_reason": row[4],
+ "room_version_id": row[4],
+ "rejected_reason": row[5],
"redactions": [],
}
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index 1507a14e09..925bc5691b 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -43,13 +43,40 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
-
txn.execute(sql)
(count,) = txn.fetchone()
return count
return self.db.runInteraction("count_users", _count_users)
+ @cached(num_args=0)
+ def get_monthly_active_count_by_service(self):
+ """Generates current count of monthly active users broken down by service.
+ A service is typically an appservice but also includes native matrix users.
+ Since the `monthly_active_users` table is populated from the `user_ips` table
+ `config.track_appservice_user_ips` must be set to `true` for this
+ method to return anything other than native matrix users.
+
+ Returns:
+ Deferred[dict]: dict that includes a mapping between app_service_id
+ and the number of occurrences.
+
+ """
+
+ def _count_users_by_service(txn):
+ sql = """
+ SELECT COALESCE(appservice_id, 'native'), COALESCE(count(*), 0)
+ FROM monthly_active_users
+ LEFT JOIN users ON monthly_active_users.user_id=users.name
+ GROUP BY appservice_id;
+ """
+
+ txn.execute(sql)
+ result = txn.fetchall()
+ return dict(result)
+
+ return self.db.runInteraction("count_users_by_service", _count_users_by_service)
+
@defer.inlineCallbacks
def get_registered_reserved_users(self):
"""Of the reserved threepids defined in config, which are associated
@@ -292,6 +319,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
self._invalidate_cache_and_stream(
+ txn, self.get_monthly_active_count_by_service, ()
+ )
+ self._invalidate_cache_and_stream(
txn, self.user_last_seen_monthly_active, (user_id,)
)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 609db40616..e61595336c 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -29,7 +29,11 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.config.database import DatabaseConnectionConfig
-from synapse.logging.context import LoggingContext, make_deferred_yieldable
+from synapse.logging.context import (
+ LoggingContext,
+ LoggingContextOrSentinel,
+ make_deferred_yieldable,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -543,7 +547,9 @@ class Database(object):
Returns:
Deferred: The result of func
"""
- parent_context = LoggingContext.current_context()
+ parent_context = (
+ LoggingContext.current_context()
+ ) # type: Optional[LoggingContextOrSentinel]
if parent_context == LoggingContext.sentinel:
logger.warning(
"Starting db connection from sentinel context: metrics will be lost"
|