diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7a7f841c6c..f631fb1733 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,7 +17,7 @@
import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, text_type
@@ -32,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -219,42 +221,38 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
- EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
- EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
-
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
- self.register_background_update_handler(
- self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
- )
- self.register_background_update_handler(
- self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
- self._background_reindex_fields_sender,
- )
- self.register_background_index_update(
- "event_contains_url_index",
- index_name="event_contains_url_index",
- table="events",
- columns=["room_id", "topological_ordering", "stream_ordering"],
- where_clause="contains_url = true AND outlier = false",
- )
+ self._event_persist_queue = _EventPeristenceQueue()
+ self._state_resolution_handler = hs.get_state_resolution_handler()
- # an event_id index on event_search is useful for the purge_history
- # api. Plus it means we get to enforce some integrity with a UNIQUE
- # clause
- self.register_background_index_update(
- "event_search_event_id_idx",
- index_name="event_search_event_id_idx",
- table="event_search",
- columns=["event_id"],
- unique=True,
- psql_only=True,
+ # Collect metrics on the number of forward extremities that exist.
+ # A Counter mapping "number of forward extremities in a room" to the
+ # number of rooms with that many extremities.
+ self._current_forward_extremities_amount = c_counter()
+
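+ # A BucketCollector reads the Counter on each Prometheus scrape and
+ # exports it as a histogram with the given buckets.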
+ BucketCollector(
+ "synapse_forward_extremities",
+ lambda: self._current_forward_extremities_amount,
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
)
- self._event_persist_queue = _EventPeristenceQueue()
+ # Re-read the forward extremity counts every 60 minutes
+ hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
- self._state_resolution_handler = hs.get_state_resolution_handler()
+ @defer.inlineCallbacks
+ def _read_forward_extremities(self):
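+ """Periodically read the number of forward extremities per room from
+ the database, for export by the BucketCollector above.
+ """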
+ def fetch(txn):
+ txn.execute(
+ """
+ SELECT COUNT(*) c FROM event_forward_extremities
+ GROUP BY room_id
+ """
+ )
+ return txn.fetchall()
+
+ res = yield self.runInteraction("read_forward_extremities", fetch)
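+ # e.g. Counter({1: 230, 2: 30, 7: 1}) would mean 230 rooms with a
+ # single forward extremity, 30 rooms with two, and one room with
+ # seven (illustrative numbers).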
+ self._current_forward_extremities_amount = c_counter(x[0] for x in res)
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
@@ -554,10 +552,18 @@ class EventsStore(
e_id for event in new_events for e_id in event.prev_event_ids()
)
- # Finally, remove any events which are prev_events of any existing events.
+ # Remove any events which are prev_events of any existing events.
existing_prevs = yield self._get_events_which_are_prevs(result)
result.difference_update(existing_prevs)
+ # Finally, handle the case where the new events have soft-failed prev
+ # events. If they do, we also need to remove the ancestors of those
+ # soft-failed events from the extremities, otherwise we end up with
+ # dangling extremities.
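+ #
+ # (e.g. if A is a current extremity, B is a soft-failed event with
+ # prev event A, and we are now persisting C with prev event B, then
+ # B never became an extremity, so A must be removed here or it would
+ # be left dangling alongside C.)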
+ existing_prevs = yield self._get_prevs_before_rejected(
+ e_id for event in new_events for e_id in event.prev_event_ids()
+ )
+ result.difference_update(existing_prevs)
+
defer.returnValue(result)
@defer.inlineCallbacks
@@ -573,12 +579,13 @@ class EventsStore(
"""
results = []
- def _get_events(txn, batch):
+ def _get_events_which_are_prevs_txn(txn, batch):
sql = """
- SELECT prev_event_id
+ SELECT prev_event_id, internal_metadata
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
+ LEFT JOIN event_json USING (event_id)
WHERE
prev_event_id IN (%s)
AND NOT events.outlier
@@ -588,14 +595,78 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(r[0] for r in txn)
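+ # Don't count a prev as an existing prev if the only events that
+ # reference it were soft-failed: soft-failed events never became
+ # extremities themselves, so their prev events should stay in the
+ # forward extremities.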
+ results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
- yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
+ yield self.runInteraction(
+ "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
+ )
defer.returnValue(results)
@defer.inlineCallbacks
+ def _get_prevs_before_rejected(self, event_ids):
+ """Get soft-failed ancestors to remove from the extremities.
+
+ Given a set of events, find all those that have been soft-failed or
+ rejected. Returns the prev events of each soft-failed/rejected event
+ found (whether those prev events are themselves soft-failed/rejected
+ or not), recursing up the prev-event graph until it finds no more
+ soft-failed/rejected events.
+
+ This is used to find extremities that are ancestors of the new
+ events but are separated from them by soft-failed events.
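+
+ For example: if A is a current extremity, B is a soft-failed event
+ with prev event A, and one of the new events has prev event B, then
+ calling this with the new event's prev events returns {A}, so that A
+ can be removed from the forward extremities.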
+
+ Args:
+ event_ids (Iterable[str]): Events to find prev events for. Note
+ that these must have already been persisted.
+
+ Returns:
+ Deferred[set[str]]
+ """
+
+ # The set of event_ids to return. This includes all soft-failed events
+ # and their prev events.
+ existing_prevs = set()
+
+ def _get_prevs_before_rejected_txn(txn, batch):
+ to_recursively_check = batch
+
+ while to_recursively_check:
+ sql = """
+ SELECT
+ event_id, prev_event_id, internal_metadata,
+ rejections.event_id IS NOT NULL
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ LEFT JOIN event_json USING (event_id)
+ WHERE
+ event_id IN (%s)
+ AND NOT events.outlier
+ """ % (
+ ",".join("?" for _ in to_recursively_check),
+ )
+
+ txn.execute(sql, to_recursively_check)
+ to_recursively_check = []
+
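+ # Note that metadata and rejected describe event_id (the
+ # referencing event), not prev_event_id.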
+ for event_id, prev_event_id, metadata, rejected in txn:
+ if prev_event_id in existing_prevs:
+ continue
+
+ soft_failed = json.loads(metadata).get("soft_failed")
+ if soft_failed or rejected:
+ to_recursively_check.append(prev_event_id)
+ existing_prevs.add(prev_event_id)
+
+ for chunk in batch_iter(event_ids, 100):
+ yield self.runInteraction(
+ "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
+ )
+
+ defer.returnValue(existing_prevs)
+
+ @defer.inlineCallbacks
def _get_new_state_after_events(
self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
):
@@ -1325,6 +1396,9 @@ class EventsStore(
txn, event.room_id, event.redacts
)
+ # Remove any relation created by the redacted event from the
+ # event_relations table.
+ self._handle_redaction(txn, event.redacts)
+
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
@@ -1351,6 +1425,8 @@ class EventsStore(
# Insert into the event_search table.
self._store_guest_access_txn(txn, event)
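+ # Insert into the event_relations table, if this event relates
+ # to another event.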
+ self._handle_event_relations(txn, event)
+
# Insert into the room_memberships table.
self._store_room_members_txn(
txn,
@@ -1493,153 +1569,6 @@ class EventsStore(
ret = yield self.runInteraction("count_daily_active_rooms", _count)
defer.returnValue(ret)
- @defer.inlineCallbacks
- def _background_reindex_fields_sender(self, progress, batch_size):
- target_min_stream_id = progress["target_min_stream_id_inclusive"]
- max_stream_id = progress["max_stream_id_exclusive"]
- rows_inserted = progress.get("rows_inserted", 0)
-
- INSERT_CLUMP_SIZE = 1000
-
- def reindex_txn(txn):
- sql = (
- "SELECT stream_ordering, event_id, json FROM events"
- " INNER JOIN event_json USING (event_id)"
- " WHERE ? <= stream_ordering AND stream_ordering < ?"
- " ORDER BY stream_ordering DESC"
- " LIMIT ?"
- )
-
- txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
- rows = txn.fetchall()
- if not rows:
- return 0
-
- min_stream_id = rows[-1][0]
-
- update_rows = []
- for row in rows:
- try:
- event_id = row[1]
- event_json = json.loads(row[2])
- sender = event_json["sender"]
- content = event_json["content"]
-
- contains_url = "url" in content
- if contains_url:
- contains_url &= isinstance(content["url"], text_type)
- except (KeyError, AttributeError):
- # If the event is missing a necessary field then
- # skip over it.
- continue
-
- update_rows.append((sender, contains_url, event_id))
-
- sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
-
- for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
- clump = update_rows[index : index + INSERT_CLUMP_SIZE]
- txn.executemany(sql, clump)
-
- progress = {
- "target_min_stream_id_inclusive": target_min_stream_id,
- "max_stream_id_exclusive": min_stream_id,
- "rows_inserted": rows_inserted + len(rows),
- }
-
- self._background_update_progress_txn(
- txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
- )
-
- return len(rows)
-
- result = yield self.runInteraction(
- self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
- )
-
- if not result:
- yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
-
- defer.returnValue(result)
-
- @defer.inlineCallbacks
- def _background_reindex_origin_server_ts(self, progress, batch_size):
- target_min_stream_id = progress["target_min_stream_id_inclusive"]
- max_stream_id = progress["max_stream_id_exclusive"]
- rows_inserted = progress.get("rows_inserted", 0)
-
- INSERT_CLUMP_SIZE = 1000
-
- def reindex_search_txn(txn):
- sql = (
- "SELECT stream_ordering, event_id FROM events"
- " WHERE ? <= stream_ordering AND stream_ordering < ?"
- " ORDER BY stream_ordering DESC"
- " LIMIT ?"
- )
-
- txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
- rows = txn.fetchall()
- if not rows:
- return 0
-
- min_stream_id = rows[-1][0]
- event_ids = [row[1] for row in rows]
-
- rows_to_update = []
-
- chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
- for chunk in chunks:
- ev_rows = self._simple_select_many_txn(
- txn,
- table="event_json",
- column="event_id",
- iterable=chunk,
- retcols=["event_id", "json"],
- keyvalues={},
- )
-
- for row in ev_rows:
- event_id = row["event_id"]
- event_json = json.loads(row["json"])
- try:
- origin_server_ts = event_json["origin_server_ts"]
- except (KeyError, AttributeError):
- # If the event is missing a necessary field then
- # skip over it.
- continue
-
- rows_to_update.append((origin_server_ts, event_id))
-
- sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
-
- for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
- clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
- txn.executemany(sql, clump)
-
- progress = {
- "target_min_stream_id_inclusive": target_min_stream_id,
- "max_stream_id_exclusive": min_stream_id,
- "rows_inserted": rows_inserted + len(rows_to_update),
- }
-
- self._background_update_progress_txn(
- txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
- )
-
- return len(rows_to_update)
-
- result = yield self.runInteraction(
- self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
- )
-
- if not result:
- yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
-
- defer.returnValue(result)
-
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token()
@@ -1655,10 +1584,11 @@ class EventsStore(
def get_all_new_forward_event_rows(txn):
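+ # relates_to_id is also fetched (here and in the queries below)
+ # so that changes to event relations can be propagated over
+ # replication, e.g. for cache invalidation on workers.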
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts"
+ " state_key, redacts, relates_to_id"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
@@ -1673,11 +1603,12 @@ class EventsStore(
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts"
+ " state_key, redacts, relates_to_id"
" FROM events AS e"
" INNER JOIN ex_outlier_stream USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" ORDER BY event_stream_ordering DESC"
@@ -1698,10 +1629,11 @@ class EventsStore(
def get_all_new_backfill_event_rows(txn):
sql = (
"SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts"
+ " state_key, redacts, relates_to_id"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
" WHERE ? > stream_ordering AND stream_ordering >= ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
@@ -1716,11 +1648,12 @@ class EventsStore(
sql = (
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts"
+ " state_key, redacts, relates_to_id"
" FROM events AS e"
" INNER JOIN ex_outlier_stream USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
" ORDER BY event_stream_ordering DESC"