diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ea6879c619..2b7340c1d9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,7 +38,6 @@ from functools import wraps
import synapse.metrics
import logging
-import math
import ujson as json
# these are only included to make the type annotations work
@@ -399,6 +398,11 @@ class EventsStore(SQLBaseStore):
event_counter.inc(event.type, origin_type, origin_entity)
+ for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+ self.get_current_state_ids.prefill(
+ (room_id, ), new_state
+ )
+
@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
@@ -447,10 +451,10 @@ class EventsStore(SQLBaseStore):
Assumes that we are only persisting events for one room at a time.
Returns:
- 2-tuple (to_delete, to_insert) where both are state dicts, i.e.
- (type, state_key) -> event_id. `to_delete` are the entries to
+ 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
+ i.e. (type, state_key) -> event_id. `to_delete` are the entries to
first be deleted from current_state_events, `to_insert` are entries
- to insert.
+ to insert. `new_state` is the full set of state.
May return None if there are no changes to be applied.
"""
# Now we need to work out the different state sets for
@@ -557,7 +561,7 @@ class EventsStore(SQLBaseStore):
if ev_id in events_to_insert
}
- defer.returnValue((to_delete, to_insert))
+ defer.returnValue((to_delete, to_insert, current_state))
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
@@ -643,9 +647,10 @@ class EventsStore(SQLBaseStore):
list of the event ids which are the forward extremities.
"""
- self._update_current_state_txn(txn, current_state_for_room)
-
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
+
+ self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+
self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremeties,
@@ -708,9 +713,9 @@ class EventsStore(SQLBaseStore):
backfilled=backfilled,
)
- def _update_current_state_txn(self, txn, state_delta_by_room):
+ def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
- to_delete, to_insert = current_state_tuple
+ to_delete, to_insert, _ = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
@@ -730,6 +735,29 @@ class EventsStore(SQLBaseStore):
],
)
+ state_deltas = {key: None for key in to_delete}
+ state_deltas.update(to_insert)
+
+ self._simple_insert_many_txn(
+ txn,
+ table="current_state_delta_stream",
+ values=[
+ {
+ "stream_id": max_stream_order,
+ "room_id": room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": ev_id,
+ "prev_event_id": to_delete.get(key, None),
+ }
+ for key, ev_id in state_deltas.iteritems()
+ ]
+ )
+
+ self._curr_state_delta_stream_cache.entity_has_changed(
+ room_id, max_stream_order,
+ )
+
# Invalidate the various caches
# Figure out the changes of membership to invalidate the
@@ -738,11 +766,7 @@ class EventsStore(SQLBaseStore):
# and which we have added, then we invlidate the caches for all
# those users.
members_changed = set(
- state_key for ev_type, state_key in to_delete.iterkeys()
- if ev_type == EventTypes.Member
- )
- members_changed.update(
- state_key for ev_type, state_key in to_insert.iterkeys()
+ state_key for ev_type, state_key in state_deltas
if ev_type == EventTypes.Member
)
@@ -751,6 +775,11 @@ class EventsStore(SQLBaseStore):
txn, self.get_rooms_for_user, (member,)
)
+ for host in set(get_domain_from_id(u) for u in members_changed):
+ self._invalidate_cache_and_stream(
+ txn, self.is_host_joined, (room_id, host)
+ )
+
self._invalidate_cache_and_stream(
txn, self.get_users_in_room, (room_id,)
)
@@ -1115,6 +1144,7 @@ class EventsStore(SQLBaseStore):
}
for event, _ in events_and_contexts
for auth_id, _ in event.auth_events
+ if event.is_state()
],
)
@@ -1414,7 +1444,7 @@ class EventsStore(SQLBaseStore):
]
rows = self._new_transaction(
- conn, "do_fetch", [], None, self._fetch_event_rows, event_ids
+ conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
)
row_dict = {
@@ -1594,66 +1624,52 @@ class EventsStore(SQLBaseStore):
call to this function, it will return None.
"""
def _count_messages(txn):
- now = self.hs.get_clock().time()
-
- txn.execute(
- "SELECT reported_stream_token, reported_time FROM stats_reporting"
- )
- last_reported = self.cursor_to_dict(txn)
+ sql = """
+ SELECT COALESCE(COUNT(*), 0) FROM events
+ WHERE type = 'm.room.message'
+ AND stream_ordering > ?
+ """
+ txn.execute(sql, (self.stream_ordering_day_ago,))
+ count, = txn.fetchone()
+ return count
- txn.execute(
- "SELECT stream_ordering"
- " FROM events"
- " ORDER BY stream_ordering DESC"
- " LIMIT 1"
- )
- now_reporting = self.cursor_to_dict(txn)
- if not now_reporting:
- logger.info("Calculating daily messages skipped; no now_reporting")
- return None
- now_reporting = now_reporting[0]["stream_ordering"]
-
- txn.execute("DELETE FROM stats_reporting")
- txn.execute(
- "INSERT INTO stats_reporting"
- " (reported_stream_token, reported_time)"
- " VALUES (?, ?)",
- (now_reporting, now,)
- )
+ ret = yield self.runInteraction("count_messages", _count_messages)
+ defer.returnValue(ret)
- if not last_reported:
- logger.info("Calculating daily messages skipped; no last_reported")
- return None
-
- # Close enough to correct for our purposes.
- yesterday = (now - 24 * 60 * 60)
- since_yesterday_seconds = yesterday - last_reported[0]["reported_time"]
- any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60
- if any_since_yesterday:
- logger.info(
- "Calculating daily messages skipped; since_yesterday_seconds: %d" %
- (since_yesterday_seconds,)
- )
- return None
-
- txn.execute(
- "SELECT COUNT(*) as messages"
- " FROM events NATURAL JOIN event_json"
- " WHERE json like '%m.room.message%'"
- " AND stream_ordering > ?"
- " AND stream_ordering <= ?",
- (
- last_reported[0]["reported_stream_token"],
- now_reporting,
- )
- )
- rows = self.cursor_to_dict(txn)
- if not rows:
- logger.info("Calculating daily messages skipped; messages count missing")
- return None
- return rows[0]["messages"]
+ @defer.inlineCallbacks
+ def count_daily_sent_messages(self):
+ def _count_messages(txn):
+ # This is good enough as if you have silly characters in your own
+ # hostname then thats your own fault.
+ like_clause = "%:" + self.hs.hostname
+
+ sql = """
+ SELECT COALESCE(COUNT(*), 0) FROM events
+ WHERE type = 'm.room.message'
+ AND sender LIKE ?
+ AND stream_ordering > ?
+ """
+
+ txn.execute(sql, (like_clause, self.stream_ordering_day_ago,))
+ count, = txn.fetchone()
+ return count
+
+ ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
+ defer.returnValue(ret)
- ret = yield self.runInteraction("count_messages", _count_messages)
+ @defer.inlineCallbacks
+ def count_daily_active_rooms(self):
+ def _count(txn):
+ sql = """
+ SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events
+ WHERE type = 'm.room.message'
+ AND stream_ordering > ?
+ """
+ txn.execute(sql, (self.stream_ordering_day_ago,))
+ count, = txn.fetchone()
+ return count
+
+ ret = yield self.runInteraction("count_daily_active_rooms", _count)
defer.returnValue(ret)
@defer.inlineCallbacks
@@ -2253,6 +2269,24 @@ class EventsStore(SQLBaseStore):
defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"])))
+ def get_max_current_state_delta_stream_id(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
+ def get_all_updated_current_state_deltas_txn(txn):
+ sql = """
+ SELECT stream_id, room_id, type, state_key, event_id
+ FROM current_state_delta_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC LIMIT ?
+ """
+ txn.execute(sql, (from_token, to_token, limit))
+ return txn.fetchall()
+ return self.runInteraction(
+ "get_all_updated_current_state_deltas",
+ get_all_updated_current_state_deltas_txn,
+ )
+
AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events",
|