summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py108
1 files changed, 84 insertions, 24 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8774b3b388..5d35ca90b9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from _base import SQLBaseStore, _RollbackButIsFineException
 
 from twisted.internet import defer, reactor
@@ -24,15 +23,23 @@ from synapse.util.logcontext import preserve_context_over_deferred
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
-from syutil.jsonutil import encode_json
+from canonicaljson import encode_canonical_json
 from contextlib import contextmanager
 
 import logging
+import math
 import ujson as json
 
 logger = logging.getLogger(__name__)
 
 
+def encode_json(json_object):
+    if USE_FROZEN_DICTS:
+        # ujson doesn't like frozen_dicts
+        return encode_canonical_json(json_object)
+    else:
+        return json.dumps(json_object, ensure_ascii=False)
+
 # These values are used in the `enqueus_event` and `_do_fetch` methods to
 # control how we batch/bulk fetch events from the database.
 # The values are plucked out of thing air to make initial sync run faster
@@ -253,8 +260,7 @@ class EventsStore(SQLBaseStore):
                 )
 
                 metadata_json = encode_json(
-                    event.internal_metadata.get_dict(),
-                    using_frozen_dicts=USE_FROZEN_DICTS
+                    event.internal_metadata.get_dict()
                 ).decode("UTF-8")
 
                 sql = (
@@ -301,8 +307,14 @@ class EventsStore(SQLBaseStore):
                 self._store_room_name_txn(txn, event)
             elif event.type == EventTypes.Topic:
                 self._store_room_topic_txn(txn, event)
+            elif event.type == EventTypes.Message:
+                self._store_room_message_txn(txn, event)
             elif event.type == EventTypes.Redaction:
                 self._store_redaction(txn, event)
+            elif event.type == EventTypes.RoomHistoryVisibility:
+                self._store_history_visibility_txn(txn, event)
+            elif event.type == EventTypes.GuestAccess:
+                self._store_guest_access_txn(txn, event)
 
         self._store_room_members_txn(
             txn,
@@ -331,12 +343,9 @@ class EventsStore(SQLBaseStore):
                     "event_id": event.event_id,
                     "room_id": event.room_id,
                     "internal_metadata": encode_json(
-                        event.internal_metadata.get_dict(),
-                        using_frozen_dicts=USE_FROZEN_DICTS
-                    ).decode("UTF-8"),
-                    "json": encode_json(
-                        event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS
+                        event.internal_metadata.get_dict()
                     ).decode("UTF-8"),
+                    "json": encode_json(event_dict(event)).decode("UTF-8"),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -355,9 +364,7 @@ class EventsStore(SQLBaseStore):
                     "type": event.type,
                     "processed": True,
                     "outlier": event.internal_metadata.is_outlier(),
-                    "content": encode_json(
-                        event.content, using_frozen_dicts=USE_FROZEN_DICTS
-                    ).decode("UTF-8"),
+                    "content": encode_json(event.content).decode("UTF-8"),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -824,7 +831,8 @@ class EventsStore(SQLBaseStore):
                 allow_none=True,
             )
             if prev:
-                ev.unsigned["prev_content"] = prev.get_dict()["content"]
+                ev.unsigned["prev_content"] = prev.content
+                ev.unsigned["prev_sender"] = prev.sender
 
         self._get_event_cache.prefill(
             (ev.event_id, check_redacted, get_prev_content), ev
@@ -881,7 +889,8 @@ class EventsStore(SQLBaseStore):
                 get_prev_content=False,
             )
             if prev:
-                ev.unsigned["prev_content"] = prev.get_dict()["content"]
+                ev.unsigned["prev_content"] = prev.content
+                ev.unsigned["prev_sender"] = prev.sender
 
         self._get_event_cache.prefill(
             (ev.event_id, check_redacted, get_prev_content), ev
@@ -889,18 +898,69 @@ class EventsStore(SQLBaseStore):
 
         return ev
 
-    def _parse_events(self, rows):
-        return self.runInteraction(
-            "_parse_events", self._parse_events_txn, rows
-        )
-
     def _parse_events_txn(self, txn, rows):
         event_ids = [r["event_id"] for r in rows]
 
         return self._get_events_txn(txn, event_ids)
 
-    def _has_been_redacted_txn(self, txn, event):
-        sql = "SELECT event_id FROM redactions WHERE redacts = ?"
-        txn.execute(sql, (event.event_id,))
-        result = txn.fetchone()
-        return result[0] if result else None
+    @defer.inlineCallbacks
+    def count_daily_messages(self):
+        """
+        Returns an estimate of the number of messages sent in the last day.
+
+        If it has been significantly less or more than one day since the last
+        call to this function, it will return None.
+        """
+        def _count_messages(txn):
+            now = self.hs.get_clock().time()
+
+            txn.execute(
+                "SELECT reported_stream_token, reported_time FROM stats_reporting"
+            )
+            last_reported = self.cursor_to_dict(txn)
+
+            txn.execute(
+                "SELECT stream_ordering"
+                " FROM events"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT 1"
+            )
+            now_reporting = self.cursor_to_dict(txn)
+            if not now_reporting:
+                return None
+            now_reporting = now_reporting[0]["stream_ordering"]
+
+            txn.execute("DELETE FROM stats_reporting")
+            txn.execute(
+                "INSERT INTO stats_reporting"
+                " (reported_stream_token, reported_time)"
+                " VALUES (?, ?)",
+                (now_reporting, now,)
+            )
+
+            if not last_reported:
+                return None
+
+            # Close enough to correct for our purposes.
+            yesterday = (now - 24 * 60 * 60)
+            if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+                return None
+
+            txn.execute(
+                "SELECT COUNT(*) as messages"
+                " FROM events NATURAL JOIN event_json"
+                " WHERE json like '%m.room.message%'"
+                " AND stream_ordering > ?"
+                " AND stream_ordering <= ?",
+                (
+                    last_reported[0]["reported_stream_token"],
+                    now_reporting,
+                )
+            )
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+            return rows[0]["messages"]
+
+        ret = yield self.runInteraction("count_messages", _count_messages)
+        defer.returnValue(ret)