diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8774b3b388..5d35ca90b9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
from _base import SQLBaseStore, _RollbackButIsFineException
from twisted.internet import defer, reactor
@@ -24,15 +23,23 @@ from synapse.util.logcontext import preserve_context_over_deferred
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
-from syutil.jsonutil import encode_json
+from canonicaljson import encode_canonical_json
from contextlib import contextmanager
import logging
+import math
import ujson as json
logger = logging.getLogger(__name__)
+def encode_json(json_object):
+ if USE_FROZEN_DICTS:
+ # ujson doesn't like frozen_dicts
+ return encode_canonical_json(json_object)
+ else:
+ return json.dumps(json_object, ensure_ascii=False)
+
# These values are used in the `enqueus_event` and `_do_fetch` methods to
# control how we batch/bulk fetch events from the database.
# The values are plucked out of thing air to make initial sync run faster
@@ -253,8 +260,7 @@ class EventsStore(SQLBaseStore):
)
metadata_json = encode_json(
- event.internal_metadata.get_dict(),
- using_frozen_dicts=USE_FROZEN_DICTS
+ event.internal_metadata.get_dict()
).decode("UTF-8")
sql = (
@@ -301,8 +307,14 @@ class EventsStore(SQLBaseStore):
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Message:
+ self._store_room_message_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
+ elif event.type == EventTypes.RoomHistoryVisibility:
+ self._store_history_visibility_txn(txn, event)
+ elif event.type == EventTypes.GuestAccess:
+ self._store_guest_access_txn(txn, event)
self._store_room_members_txn(
txn,
@@ -331,12 +343,9 @@ class EventsStore(SQLBaseStore):
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": encode_json(
- event.internal_metadata.get_dict(),
- using_frozen_dicts=USE_FROZEN_DICTS
- ).decode("UTF-8"),
- "json": encode_json(
- event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS
+ event.internal_metadata.get_dict()
).decode("UTF-8"),
+ "json": encode_json(event_dict(event)).decode("UTF-8"),
}
for event, _ in events_and_contexts
],
@@ -355,9 +364,7 @@ class EventsStore(SQLBaseStore):
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(
- event.content, using_frozen_dicts=USE_FROZEN_DICTS
- ).decode("UTF-8"),
+ "content": encode_json(event.content).decode("UTF-8"),
}
for event, _ in events_and_contexts
],
@@ -824,7 +831,8 @@ class EventsStore(SQLBaseStore):
allow_none=True,
)
if prev:
- ev.unsigned["prev_content"] = prev.get_dict()["content"]
+ ev.unsigned["prev_content"] = prev.content
+ ev.unsigned["prev_sender"] = prev.sender
self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev
@@ -881,7 +889,8 @@ class EventsStore(SQLBaseStore):
get_prev_content=False,
)
if prev:
- ev.unsigned["prev_content"] = prev.get_dict()["content"]
+ ev.unsigned["prev_content"] = prev.content
+ ev.unsigned["prev_sender"] = prev.sender
self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev
@@ -889,18 +898,69 @@ class EventsStore(SQLBaseStore):
return ev
- def _parse_events(self, rows):
- return self.runInteraction(
- "_parse_events", self._parse_events_txn, rows
- )
-
def _parse_events_txn(self, txn, rows):
event_ids = [r["event_id"] for r in rows]
return self._get_events_txn(txn, event_ids)
- def _has_been_redacted_txn(self, txn, event):
- sql = "SELECT event_id FROM redactions WHERE redacts = ?"
- txn.execute(sql, (event.event_id,))
- result = txn.fetchone()
- return result[0] if result else None
+ @defer.inlineCallbacks
+ def count_daily_messages(self):
+ """
+ Returns an estimate of the number of messages sent in the last day.
+
+ If it has been significantly less or more than one day since the last
+ call to this function, it will return None.
+ """
+ def _count_messages(txn):
+ now = self.hs.get_clock().time()
+
+ txn.execute(
+ "SELECT reported_stream_token, reported_time FROM stats_reporting"
+ )
+ last_reported = self.cursor_to_dict(txn)
+
+ txn.execute(
+ "SELECT stream_ordering"
+ " FROM events"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT 1"
+ )
+ now_reporting = self.cursor_to_dict(txn)
+ if not now_reporting:
+ return None
+ now_reporting = now_reporting[0]["stream_ordering"]
+
+ txn.execute("DELETE FROM stats_reporting")
+ txn.execute(
+ "INSERT INTO stats_reporting"
+ " (reported_stream_token, reported_time)"
+ " VALUES (?, ?)",
+ (now_reporting, now,)
+ )
+
+ if not last_reported:
+ return None
+
+ # Close enough to correct for our purposes.
+ yesterday = (now - 24 * 60 * 60)
+ if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+ return None
+
+ txn.execute(
+ "SELECT COUNT(*) as messages"
+ " FROM events NATURAL JOIN event_json"
+ " WHERE json like '%m.room.message%'"
+ " AND stream_ordering > ?"
+ " AND stream_ordering <= ?",
+ (
+ last_reported[0]["reported_stream_token"],
+ now_reporting,
+ )
+ )
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ return None
+ return rows[0]["messages"]
+
+ ret = yield self.runInteraction("count_messages", _count_messages)
+ defer.returnValue(ret)
|