Diffstat (limited to 'synapse/storage/events.py')
 synapse/storage/events.py | 178 ++++++++++++++++++++++++++++------------
 1 file changed, 106 insertions(+), 72 deletions(-)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ea6879c619..2b7340c1d9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,7 +38,6 @@ from functools import wraps
 import synapse.metrics
 
 import logging
-import math
 import ujson as json
 
 # these are only included to make the type annotations work
@@ -399,6 +398,11 @@ class EventsStore(SQLBaseStore):
 
                 event_counter.inc(event.type, origin_type, origin_entity)
 
+        for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+            self.get_current_state_ids.prefill(
+                (room_id, ), new_state
+            )
+
     @defer.inlineCallbacks
     def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
         """Calculates the new forward extremeties for a room given events to
@@ -447,10 +451,10 @@ class EventsStore(SQLBaseStore):
         Assumes that we are only persisting events for one room at a time.
 
         Returns:
-            2-tuple (to_delete, to_insert) where both are state dicts, i.e.
-            (type, state_key) -> event_id. `to_delete` are the entries to
+            3-tuple (to_delete, to_insert, new_state) where both are state dicts,
+            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
             first be deleted from current_state_events, `to_insert` are entries
-            to insert.
+            to insert. `new_state` is the full set of state.
             May return None if there are no changes to be applied.
         """
         # Now we need to work out the different state sets for
@@ -557,7 +561,7 @@ class EventsStore(SQLBaseStore):
             if ev_id in events_to_insert
         }
 
-        defer.returnValue((to_delete, to_insert))
+        defer.returnValue((to_delete, to_insert, current_state))
 
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
@@ -643,9 +647,10 @@ class EventsStore(SQLBaseStore):
             list of the event ids which are the forward extremities.
         """
 
-        self._update_current_state_txn(txn, current_state_for_room)
-
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
+
+        self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+
         self._update_forward_extremities_txn(
             txn,
             new_forward_extremities=new_forward_extremeties,
@@ -708,9 +713,9 @@ class EventsStore(SQLBaseStore):
             backfilled=backfilled,
         )
 
-    def _update_current_state_txn(self, txn, state_delta_by_room):
+    def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in state_delta_by_room.iteritems():
-            to_delete, to_insert = current_state_tuple
+            to_delete, to_insert, _ = current_state_tuple
             txn.executemany(
                 "DELETE FROM current_state_events WHERE event_id = ?",
                 [(ev_id,) for ev_id in to_delete.itervalues()],
@@ -730,6 +735,29 @@ class EventsStore(SQLBaseStore):
                 ],
             )
 
+            state_deltas = {key: None for key in to_delete}
+            state_deltas.update(to_insert)
+
+            self._simple_insert_many_txn(
+                txn,
+                table="current_state_delta_stream",
+                values=[
+                    {
+                        "stream_id": max_stream_order,
+                        "room_id": room_id,
+                        "type": key[0],
+                        "state_key": key[1],
+                        "event_id": ev_id,
+                        "prev_event_id": to_delete.get(key, None),
+                    }
+                    for key, ev_id in state_deltas.iteritems()
+                ]
+            )
+
+            self._curr_state_delta_stream_cache.entity_has_changed(
+                room_id, max_stream_order,
+            )
+
             # Invalidate the various caches
 
             # Figure out the changes of membership to invalidate the
@@ -739,11 +767,7 @@ class EventsStore(SQLBaseStore):
             # and which we have added, then we invlidate the caches for all
             # those users.
             members_changed = set(
-                state_key for ev_type, state_key in to_delete.iterkeys()
-                if ev_type == EventTypes.Member
-            )
-            members_changed.update(
-                state_key for ev_type, state_key in to_insert.iterkeys()
+                state_key for ev_type, state_key in state_deltas
                 if ev_type == EventTypes.Member
             )
 
@@ -751,6 +775,11 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_rooms_for_user, (member,)
                 )
 
+            for host in set(get_domain_from_id(u) for u in members_changed):
+                self._invalidate_cache_and_stream(
+                    txn, self.is_host_joined, (room_id, host)
+                )
+
             self._invalidate_cache_and_stream(
                 txn, self.get_users_in_room, (room_id,)
             )
@@ -1115,6 +1144,7 @@ class EventsStore(SQLBaseStore):
                 }
                 for event, _ in events_and_contexts
                 for auth_id, _ in event.auth_events
+                if event.is_state()
             ],
         )
 
@@ -1414,7 +1444,7 @@ class EventsStore(SQLBaseStore):
                 ]
 
                 rows = self._new_transaction(
-                    conn, "do_fetch", [], None, self._fetch_event_rows, event_ids
+                    conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
                 )
 
                 row_dict = {
@@ -1594,66 +1624,52 @@ class EventsStore(SQLBaseStore):
         call to this function, it will return None.
         """
         def _count_messages(txn):
-            now = self.hs.get_clock().time()
-
-            txn.execute(
-                "SELECT reported_stream_token, reported_time FROM stats_reporting"
-            )
-            last_reported = self.cursor_to_dict(txn)
+            sql = """
+                SELECT COALESCE(COUNT(*), 0) FROM events
+                WHERE type = 'm.room.message'
+                AND stream_ordering > ?
+            """
+            txn.execute(sql, (self.stream_ordering_day_ago,))
+            count, = txn.fetchone()
+            return count
 
-            txn.execute(
-                "SELECT stream_ordering"
-                " FROM events"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT 1"
-            )
-            now_reporting = self.cursor_to_dict(txn)
-            if not now_reporting:
-                logger.info("Calculating daily messages skipped; no now_reporting")
-                return None
-            now_reporting = now_reporting[0]["stream_ordering"]
-
-            txn.execute("DELETE FROM stats_reporting")
-            txn.execute(
-                "INSERT INTO stats_reporting"
-                " (reported_stream_token, reported_time)"
-                " VALUES (?, ?)",
-                (now_reporting, now,)
-            )
+        ret = yield self.runInteraction("count_messages", _count_messages)
+        defer.returnValue(ret)
 
-            if not last_reported:
-                logger.info("Calculating daily messages skipped; no last_reported")
-                return None
-
-            # Close enough to correct for our purposes.
-            yesterday = (now - 24 * 60 * 60)
-            since_yesterday_seconds = yesterday - last_reported[0]["reported_time"]
-            any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60
-            if any_since_yesterday:
-                logger.info(
-                    "Calculating daily messages skipped; since_yesterday_seconds: %d" %
-                    (since_yesterday_seconds,)
-                )
-                return None
-
-            txn.execute(
-                "SELECT COUNT(*) as messages"
-                " FROM events NATURAL JOIN event_json"
-                " WHERE json like '%m.room.message%'"
-                " AND stream_ordering > ?"
-                " AND stream_ordering <= ?",
-                (
-                    last_reported[0]["reported_stream_token"],
-                    now_reporting,
-                )
-            )
-            rows = self.cursor_to_dict(txn)
-            if not rows:
-                logger.info("Calculating daily messages skipped; messages count missing")
-                return None
-            return rows[0]["messages"]
+    @defer.inlineCallbacks
+    def count_daily_sent_messages(self):
+        def _count_messages(txn):
+            # This is good enough as if you have silly characters in your own
+            # hostname then thats your own fault.
+            like_clause = "%:" + self.hs.hostname
+
+            sql = """
+                SELECT COALESCE(COUNT(*), 0) FROM events
+                WHERE type = 'm.room.message'
+                AND sender LIKE ?
+                AND stream_ordering > ?
+ """ + + txn.execute(sql, (like_clause, self.stream_ordering_day_ago,)) + count, = txn.fetchone() + return count + + ret = yield self.runInteraction("count_daily_sent_messages", _count_messages) + defer.returnValue(ret) - ret = yield self.runInteraction("count_messages", _count_messages) + @defer.inlineCallbacks + def count_daily_active_rooms(self): + def _count(txn): + sql = """ + SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events + WHERE type = 'm.room.message' + AND stream_ordering > ? + """ + txn.execute(sql, (self.stream_ordering_day_ago,)) + count, = txn.fetchone() + return count + + ret = yield self.runInteraction("count_daily_active_rooms", _count) defer.returnValue(ret) @defer.inlineCallbacks @@ -2253,6 +2269,24 @@ class EventsStore(SQLBaseStore): defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) + def get_max_current_state_delta_stream_id(self): + return self._stream_id_gen.get_current_token() + + def get_all_updated_current_state_deltas(self, from_token, to_token, limit): + def get_all_updated_current_state_deltas_txn(txn): + sql = """ + SELECT stream_id, room_id, type, state_key, event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit)) + return txn.fetchall() + return self.runInteraction( + "get_all_updated_current_state_deltas", + get_all_updated_current_state_deltas_txn, + ) + AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", |