diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4655669ba0..d2feee8dbb 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,9 +23,14 @@ from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
+from functools import wraps
+
+import synapse
+import synapse.metrics
import logging
@@ -35,6 +40,10 @@ import ujson as json
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
def encode_json(json_object):
if USE_FROZEN_DICTS:
# ujson doesn't like frozen_dicts
@@ -139,8 +148,32 @@ class _EventPeristenceQueue(object):
pass
+_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+
+
+def _retry_on_integrity_error(func):
+ """Wraps a database function so that it gets retried on IntegrityError,
+ with `delete_existing=True` passed in.
+
+ Args:
+ func: function that returns a Deferred and accepts a `delete_existing` arg
+ """
+ @wraps(func)
+ @defer.inlineCallbacks
+ def f(self, *args, **kwargs):
+ try:
+ res = yield func(self, *args, **kwargs)
+ except self.database_engine.module.IntegrityError:
+ logger.exception("IntegrityError, retrying.")
+ res = yield func(self, *args, delete_existing=True, **kwargs)
+ defer.returnValue(res)
+
+ return f
+
+
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+ EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
def __init__(self, hs):
super(EventsStore, self).__init__(hs)
@@ -148,6 +181,10 @@ class EventsStore(SQLBaseStore):
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
+ self.register_background_update_handler(
+ self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+ self._background_reindex_fields_sender,
+ )
self._event_persist_queue = _EventPeristenceQueue()
@@ -213,8 +250,10 @@ class EventsStore(SQLBaseStore):
self._event_persist_queue.handle_queue(room_id, persisting_queue)
+ @_retry_on_integrity_error
@defer.inlineCallbacks
- def _persist_events(self, events_and_contexts, backfilled=False):
+ def _persist_events(self, events_and_contexts, backfilled=False,
+ delete_existing=False):
if not events_and_contexts:
return
@@ -257,11 +296,15 @@ class EventsStore(SQLBaseStore):
self._persist_events_txn,
events_and_contexts=chunk,
backfilled=backfilled,
+ delete_existing=delete_existing,
)
+ persist_event_counter.inc_by(len(chunk))
+ @_retry_on_integrity_error
@defer.inlineCallbacks
@log_function
- def _persist_event(self, event, context, current_state=None, backfilled=False):
+ def _persist_event(self, event, context, current_state=None, backfilled=False,
+ delete_existing=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
with self._state_groups_id_gen.get_next() as state_group_id:
@@ -274,7 +317,9 @@ class EventsStore(SQLBaseStore):
context=context,
current_state=current_state,
backfilled=backfilled,
+ delete_existing=delete_existing,
)
+ persist_event_counter.inc()
except _RollbackButIsFineException:
pass
@@ -305,7 +350,7 @@ class EventsStore(SQLBaseStore):
)
if not events and not allow_none:
- raise RuntimeError("Could not find event %s" % (event_id,))
+ raise SynapseError(404, "Could not find event %s" % (event_id,))
defer.returnValue(events[0] if events else None)
@@ -335,18 +380,15 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events})
@log_function
- def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
+ def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
+ delete_existing=False):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
- txn.call_after(
- self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
- )
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
- txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
# Add an entry to the current_state_resets table to record the point
# where we clobbered the current state
@@ -379,10 +421,38 @@ class EventsStore(SQLBaseStore):
txn,
[(event, context)],
backfilled=backfilled,
+ delete_existing=delete_existing,
)
@log_function
- def _persist_events_txn(self, txn, events_and_contexts, backfilled):
+ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
+ delete_existing=False):
+ """Insert some number of room events into the necessary database tables.
+
+ Rejected events are only inserted into the events table, the events_json table,
+ and the rejections table. Things reading from those table will need to check
+ whether the event was rejected.
+
+ If delete_existing is True then existing events will be purged from the
+ database before insertion. This is useful when retrying due to IntegrityError.
+ """
+ # Ensure that we don't have the same event twice.
+ # Pick the earliest non-outlier if there is one, else the earliest one.
+ new_events_and_contexts = OrderedDict()
+ for event, context in events_and_contexts:
+ prev_event_context = new_events_and_contexts.get(event.event_id)
+ if prev_event_context:
+ if not event.internal_metadata.is_outlier():
+ if prev_event_context[0].internal_metadata.is_outlier():
+ # To ensure correct ordering we pop, as OrderedDict is
+ # ordered by first insertion.
+ new_events_and_contexts.pop(event.event_id, None)
+ new_events_and_contexts[event.event_id] = (event, context)
+ else:
+ new_events_and_contexts[event.event_id] = (event, context)
+
+ events_and_contexts = new_events_and_contexts.values()
+
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
@@ -393,21 +463,11 @@ class EventsStore(SQLBaseStore):
event.room_id, event.internal_metadata.stream_ordering,
)
- if not event.internal_metadata.is_outlier():
+ if not event.internal_metadata.is_outlier() and not context.rejected:
depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth)
)
- if context.push_actions:
- self._set_push_actions_for_event_and_users_txn(
- txn, event, context.push_actions
- )
-
- if event.type == EventTypes.Redaction and event.redacts is not None:
- self._remove_push_actions_for_event_id_txn(
- txn, event.room_id, event.redacts
- )
-
for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)
@@ -417,30 +477,21 @@ class EventsStore(SQLBaseStore):
),
[event.event_id for event, _ in events_and_contexts]
)
+
have_persisted = {
event_id: outlier
for event_id, outlier in txn.fetchall()
}
- event_map = {}
to_remove = set()
for event, context in events_and_contexts:
- # Handle the case of the list including the same event multiple
- # times. The tricky thing here is when they differ by whether
- # they are an outlier.
- if event.event_id in event_map:
- other = event_map[event.event_id]
-
- if not other.internal_metadata.is_outlier():
- to_remove.add(event)
- continue
- elif not event.internal_metadata.is_outlier():
+ if context.rejected:
+ # If the event is rejected then we don't care if the event
+ # was an outlier or not.
+ if event.event_id in have_persisted:
+ # If we have already seen the event then ignore it.
to_remove.add(event)
- continue
- else:
- to_remove.add(other)
-
- event_map[event.event_id] = event
+ continue
if event.event_id not in have_persisted:
continue
@@ -449,6 +500,12 @@ class EventsStore(SQLBaseStore):
outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
+ # We received a copy of an event that we had already stored as
+ # an outlier in the database. We now have some state at that
+ # so we need to update the state_groups table with that state.
+
+ # insert into the state_group, state_groups_state and
+ # event_to_state_groups tables.
self._store_mult_state_groups_txn(txn, ((event, context),))
metadata_json = encode_json(
@@ -464,6 +521,8 @@ class EventsStore(SQLBaseStore):
(metadata_json, event.event_id,)
)
+ # Add an entry to the ex_outlier_stream table to replicate the
+ # change in outlier status to our workers.
stream_order = event.internal_metadata.stream_ordering
state_group_id = context.state_group or context.new_state_group_id
self._simple_insert_txn(
@@ -485,6 +544,8 @@ class EventsStore(SQLBaseStore):
(False, event.event_id,)
)
+ # Update the event_backward_extremities table now that this
+ # event isn't an outlier any more.
self._update_extremeties(txn, [event])
events_and_contexts = [
@@ -492,38 +553,12 @@ class EventsStore(SQLBaseStore):
]
if not events_and_contexts:
+ # Make sure we don't pass an empty list to functions that expect to
+ # be storing at least one element.
return
- self._store_mult_state_groups_txn(txn, events_and_contexts)
-
- self._handle_mult_prev_events(
- txn,
- events=[event for event, _ in events_and_contexts],
- )
-
- for event, _ in events_and_contexts:
- if event.type == EventTypes.Name:
- self._store_room_name_txn(txn, event)
- elif event.type == EventTypes.Topic:
- self._store_room_topic_txn(txn, event)
- elif event.type == EventTypes.Message:
- self._store_room_message_txn(txn, event)
- elif event.type == EventTypes.Redaction:
- self._store_redaction(txn, event)
- elif event.type == EventTypes.RoomHistoryVisibility:
- self._store_history_visibility_txn(txn, event)
- elif event.type == EventTypes.GuestAccess:
- self._store_guest_access_txn(txn, event)
-
- self._store_room_members_txn(
- txn,
- [
- event
- for event, _ in events_and_contexts
- if event.type == EventTypes.Member
- ],
- backfilled=backfilled,
- )
+ # From this point onwards the events are only events that we haven't
+ # seen before.
def event_dict(event):
return {
@@ -535,6 +570,43 @@ class EventsStore(SQLBaseStore):
]
}
+ if delete_existing:
+ # For paranoia reasons, we go and delete all the existing entries
+ # for these events so we can reinsert them.
+ # This gets around any problems with some tables already having
+ # entries.
+
+ logger.info("Deleting existing")
+
+ for table in (
+ "events",
+ "event_auth",
+ "event_json",
+ "event_content_hashes",
+ "event_destinations",
+ "event_edge_hashes",
+ "event_edges",
+ "event_forward_extremities",
+ "event_push_actions",
+ "event_reference_hashes",
+ "event_search",
+ "event_signatures",
+ "event_to_state_groups",
+ "guest_access",
+ "history_visibility",
+ "local_invites",
+ "room_names",
+ "state_events",
+ "rejections",
+ "redactions",
+ "room_memberships",
+ "state_events"
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE event_id = ?" % (table,),
+ [(ev.event_id,) for ev, _ in events_and_contexts]
+ )
+
self._simple_insert_many_txn(
txn,
table="event_json",
@@ -567,15 +639,51 @@ class EventsStore(SQLBaseStore):
"content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
+ "sender": event.sender,
+ "contains_url": (
+ "url" in event.content
+ and isinstance(event.content["url"], basestring)
+ ),
}
for event, _ in events_and_contexts
],
)
- if context.rejected:
- self._store_rejections_txn(
- txn, event.event_id, context.rejected
- )
+ # Remove the rejected events from the list now that we've added them
+ # to the events table and the events_json table.
+ to_remove = set()
+ for event, context in events_and_contexts:
+ if context.rejected:
+ # Insert the event_id into the rejections table
+ self._store_rejections_txn(
+ txn, event.event_id, context.rejected
+ )
+ to_remove.add(event)
+
+ events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0] not in to_remove
+ ]
+
+ if not events_and_contexts:
+ # Make sure we don't pass an empty list to functions that expect to
+ # be storing at least one element.
+ return
+
+ # From this point onwards the events are only ones that weren't rejected.
+
+ for event, context in events_and_contexts:
+ # Insert all the push actions into the event_push_actions table.
+ if context.push_actions:
+ self._set_push_actions_for_event_and_users_txn(
+ txn, event, context.push_actions
+ )
+
+ if event.type == EventTypes.Redaction and event.redacts is not None:
+ # Remove the entries in the event_push_actions table for the
+ # redacted event.
+ self._remove_push_actions_for_event_id_txn(
+ txn, event.room_id, event.redacts
+ )
self._simple_insert_many_txn(
txn,
@@ -591,6 +699,49 @@ class EventsStore(SQLBaseStore):
],
)
+ # Insert into the state_groups, state_groups_state, and
+ # event_to_state_groups tables.
+ self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+ # Update the event_forward_extremities, event_backward_extremities and
+ # event_edges tables.
+ self._handle_mult_prev_events(
+ txn,
+ events=[event for event, _ in events_and_contexts],
+ )
+
+ for event, _ in events_and_contexts:
+ if event.type == EventTypes.Name:
+ # Insert into the room_names and event_search tables.
+ self._store_room_name_txn(txn, event)
+ elif event.type == EventTypes.Topic:
+ # Insert into the topics table and event_search table.
+ self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Message:
+ # Insert into the event_search table.
+ self._store_room_message_txn(txn, event)
+ elif event.type == EventTypes.Redaction:
+ # Insert into the redactions table.
+ self._store_redaction(txn, event)
+ elif event.type == EventTypes.RoomHistoryVisibility:
+ # Insert into the event_search table.
+ self._store_history_visibility_txn(txn, event)
+ elif event.type == EventTypes.GuestAccess:
+ # Insert into the event_search table.
+ self._store_guest_access_txn(txn, event)
+
+ # Insert into the room_memberships table.
+ self._store_room_members_txn(
+ txn,
+ [
+ event
+ for event, _ in events_and_contexts
+ if event.type == EventTypes.Member
+ ],
+ backfilled=backfilled,
+ )
+
+ # Insert event_reference_hashes table.
self._store_event_reference_hashes_txn(
txn, [event for event, _ in events_and_contexts]
)
@@ -635,6 +786,9 @@ class EventsStore(SQLBaseStore):
],
)
+ # Prefill the event cache
+ self._add_to_cache(txn, events_and_contexts)
+
if backfilled:
# Backfilled events come before the current state so we don't need
# to update the current state table
@@ -645,22 +799,11 @@ class EventsStore(SQLBaseStore):
# Outlier events shouldn't clobber the current state.
continue
- if context.rejected:
- # If the event failed it's auth checks then it shouldn't
- # clobbler the current state.
- continue
-
txn.call_after(
self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,)
)
- if event.type in [EventTypes.Name, EventTypes.Aliases]:
- txn.call_after(
- self.get_room_name_and_aliases.invalidate,
- (event.room_id,)
- )
-
self._simple_upsert_txn(
txn,
"current_state_events",
@@ -676,6 +819,45 @@ class EventsStore(SQLBaseStore):
return
+ def _add_to_cache(self, txn, events_and_contexts):
+ to_prefill = []
+
+ rows = []
+ N = 200
+ for i in range(0, len(events_and_contexts), N):
+ ev_map = {
+ e[0].event_id: e[0]
+ for e in events_and_contexts[i:i + N]
+ }
+ if not ev_map:
+ break
+
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE e.event_id IN (%s)"
+ ) % (",".join(["?"] * len(ev_map)),)
+
+ txn.execute(sql, ev_map.keys())
+ rows = self.cursor_to_dict(txn)
+ for row in rows:
+ event = ev_map[row["event_id"]]
+ if not row["rejects"] and not row["redacts"]:
+ to_prefill.append(_EventCacheEntry(
+ event=event,
+ redacted_event=None,
+ ))
+
+ def prefill():
+ for cache_entry in to_prefill:
+ self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+ txn.call_after(prefill)
+
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
@@ -741,100 +923,65 @@ class EventsStore(SQLBaseStore):
event_id_list = event_ids
event_ids = set(event_ids)
- event_map = self._get_events_from_cache(
+ event_entry_map = self._get_events_from_cache(
event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
- missing_events_ids = [e for e in event_ids if e not in event_map]
+ missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
- event_map.update(missing_events)
-
- defer.returnValue([
- event_map[e_id] for e_id in event_id_list
- if e_id in event_map and event_map[e_id]
- ])
+ event_entry_map.update(missing_events)
- def _get_events_txn(self, txn, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not event_ids:
- return []
-
- event_map = self._get_events_from_cache(
- event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- missing_events_ids = [e for e in event_ids if e not in event_map]
+ events = []
+ for event_id in event_id_list:
+ entry = event_entry_map.get(event_id, None)
+ if not entry:
+ continue
- if not missing_events_ids:
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
+ if allow_rejected or not entry.event.rejected_reason:
+ if check_redacted and entry.redacted_event:
+ event = entry.redacted_event
+ else:
+ event = entry.event
- missing_events = self._fetch_events_txn(
- txn,
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ events.append(event)
- event_map.update(missing_events)
+ if get_prev_content:
+ if "replaces_state" in event.unsigned:
+ prev = yield self.get_event(
+ event.unsigned["replaces_state"],
+ get_prev_content=False,
+ allow_none=True,
+ )
+ if prev:
+ event.unsigned = dict(event.unsigned)
+ event.unsigned["prev_content"] = prev.content
+ event.unsigned["prev_sender"] = prev.sender
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
+ defer.returnValue(events)
def _invalidate_get_event_cache(self, event_id):
- for check_redacted in (False, True):
- for get_prev_content in (False, True):
- self._get_event_cache.invalidate(
- (event_id, check_redacted, get_prev_content)
- )
-
- def _get_event_txn(self, txn, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
+ self._get_event_cache.invalidate((event_id,))
- events = self._get_events_txn(
- txn, [event_id],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- return events[0] if events else None
-
- def _get_events_from_cache(self, events, check_redacted, get_prev_content,
- allow_rejected):
+ def _get_events_from_cache(self, events, allow_rejected):
event_map = {}
for event_id in events:
- try:
- ret = self._get_event_cache.get(
- (event_id, check_redacted, get_prev_content,)
- )
+ ret = self._get_event_cache.get((event_id,), None)
+ if not ret:
+ continue
- if allow_rejected or not ret.rejected_reason:
- event_map[event_id] = ret
- else:
- event_map[event_id] = None
- except KeyError:
- pass
+ if allow_rejected or not ret.event.rejected_reason:
+ event_map[event_id] = ret
+ else:
+ event_map[event_id] = None
return event_map
@@ -905,8 +1052,7 @@ class EventsStore(SQLBaseStore):
reactor.callFromThread(fire, event_list)
@defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
+ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
@@ -944,8 +1090,6 @@ class EventsStore(SQLBaseStore):
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
rejected_reason=row["rejects"],
)
for row in rows
@@ -954,7 +1098,7 @@ class EventsStore(SQLBaseStore):
)
defer.returnValue({
- e.event_id: e
+ e.event.event_id: e
for e in res if e
})
@@ -984,37 +1128,8 @@ class EventsStore(SQLBaseStore):
return rows
- def _fetch_events_txn(self, txn, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not events:
- return {}
-
- rows = self._fetch_event_rows(
- txn, events,
- )
-
- if not allow_rejected:
- rows[:] = [r for r in rows if not r["rejects"]]
-
- res = [
- self._get_event_from_row_txn(
- txn,
- row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- rejected_reason=row["rejects"],
- )
- for row in rows
- ]
-
- return {
- r.event_id: r
- for r in res
- }
-
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
rejected_reason=None):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -1024,26 +1139,27 @@ class EventsStore(SQLBaseStore):
table="rejections",
keyvalues={"event_id": rejected_reason},
retcol="reason",
- desc="_get_event_from_row",
+ desc="_get_event_from_row_rejected_reason",
)
- ev = FrozenEvent(
+ original_ev = FrozenEvent(
d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
- if check_redacted and redacted:
- ev = prune_event(ev)
+ redacted_event = None
+ if redacted:
+ redacted_event = prune_event(original_ev)
redaction_id = yield self._simple_select_one_onecol(
table="redactions",
- keyvalues={"redacts": ev.event_id},
+ keyvalues={"redacts": redacted_event.event_id},
retcol="event_id",
- desc="_get_event_from_row",
+ desc="_get_event_from_row_redactions",
)
- ev.unsigned["redacted_by"] = redaction_id
+ redacted_event.unsigned["redacted_by"] = redaction_id
# Get the redaction event.
because = yield self.get_event(
@@ -1055,86 +1171,16 @@ class EventsStore(SQLBaseStore):
if because:
# It's fine to do add the event directly, since get_pdu_json
# will serialise this field correctly
- ev.unsigned["redacted_because"] = because
-
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = yield self.get_event(
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- allow_none=True,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
-
- defer.returnValue(ev)
+ redacted_event.unsigned["redacted_because"] = because
- def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
- rejected_reason=None):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = self._simple_select_one_onecol_txn(
- txn,
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- )
-
- ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
+ cache_entry = _EventCacheEntry(
+ event=original_ev,
+ redacted_event=redacted_event,
)
- if check_redacted and redacted:
- ev = prune_event(ev)
-
- redaction_id = self._simple_select_one_onecol_txn(
- txn,
- table="redactions",
- keyvalues={"redacts": ev.event_id},
- retcol="event_id",
- )
+ self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
- ev.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
-
- because = self._get_event_txn(
- txn,
- redaction_id,
- check_redacted=False
- )
-
- if because:
- ev.unsigned["redacted_because"] = because
-
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = self._get_event_txn(
- txn,
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
-
- return ev
-
- def _parse_events_txn(self, txn, rows):
- event_ids = [r["event_id"] for r in rows]
-
- return self._get_events_txn(txn, event_ids)
+ defer.returnValue(cache_entry)
@defer.inlineCallbacks
def count_daily_messages(self):
@@ -1208,6 +1254,78 @@ class EventsStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
+ def _background_reindex_fields_sender(self, progress, batch_size):
+ target_min_stream_id = progress["target_min_stream_id_inclusive"]
+ max_stream_id = progress["max_stream_id_exclusive"]
+ rows_inserted = progress.get("rows_inserted", 0)
+
+ INSERT_CLUMP_SIZE = 1000
+
+ def reindex_txn(txn):
+ sql = (
+ "SELECT stream_ordering, event_id, json FROM events"
+ " INNER JOIN event_json USING (event_id)"
+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1][0]
+
+ update_rows = []
+ for row in rows:
+ try:
+ event_id = row[1]
+ event_json = json.loads(row[2])
+ sender = event_json["sender"]
+ content = event_json["content"]
+
+ contains_url = "url" in content
+ if contains_url:
+ contains_url &= isinstance(content["url"], basestring)
+ except (KeyError, AttributeError):
+ # If the event is missing a necessary field then
+ # skip over it.
+ continue
+
+ update_rows.append((sender, contains_url, event_id))
+
+ sql = (
+ "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
+ )
+
+ for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
+ clump = update_rows[index:index + INSERT_CLUMP_SIZE]
+ txn.executemany(sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ "rows_inserted": rows_inserted + len(rows)
+ }
+
+ self._background_update_progress_txn(
+ txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
+ )
+
+ return len(rows)
+
+ result = yield self.runInteraction(
+ self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+ )
+
+ if not result:
+ yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
def _background_reindex_origin_server_ts(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
@@ -1374,6 +1492,162 @@ class EventsStore(SQLBaseStore):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
+ def delete_old_state(self, room_id, topological_ordering):
+ return self.runInteraction(
+ "delete_old_state",
+ self._delete_old_state_txn, room_id, topological_ordering
+ )
+
+ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
+ """Deletes old room state
+ """
+
+ # Tables that should be pruned:
+ # event_auth
+ # event_backward_extremities
+ # event_content_hashes
+ # event_destinations
+ # event_edge_hashes
+ # event_edges
+ # event_forward_extremities
+ # event_json
+ # event_push_actions
+ # event_reference_hashes
+ # event_search
+ # event_signatures
+ # event_to_state_groups
+ # events
+ # rejections
+ # room_depth
+ # state_groups
+ # state_groups_state
+
+ # First ensure that we're not about to delete all the forward extremeties
+ txn.execute(
+ "SELECT e.event_id, e.depth FROM events as e "
+ "INNER JOIN event_forward_extremities as f "
+ "ON e.event_id = f.event_id "
+ "AND e.room_id = f.room_id "
+ "WHERE f.room_id = ?",
+ (room_id,)
+ )
+ rows = txn.fetchall()
+ max_depth = max(row[0] for row in rows)
+
+ if max_depth <= topological_ordering:
+ # We need to ensure we don't delete all the events from the datanase
+ # otherwise we wouldn't be able to send any events (due to not
+ # having any backwards extremeties)
+ raise SynapseError(
+ 400, "topological_ordering is greater than forward extremeties"
+ )
+
+ txn.execute(
+ "SELECT event_id, state_key FROM events"
+ " LEFT JOIN state_events USING (room_id, event_id)"
+ " WHERE room_id = ? AND topological_ordering < ?",
+ (room_id, topological_ordering,)
+ )
+ event_rows = txn.fetchall()
+
+ # We calculate the new entries for the backward extremeties by finding
+ # all events that point to events that are to be purged
+ txn.execute(
+ "SELECT DISTINCT e.event_id FROM events as e"
+ " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
+ " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
+ " WHERE e.room_id = ? AND e.topological_ordering < ?"
+ " AND e2.topological_ordering >= ?",
+ (room_id, topological_ordering, topological_ordering)
+ )
+ new_backwards_extrems = txn.fetchall()
+
+ txn.execute(
+ "DELETE FROM event_backward_extremities WHERE room_id = ?",
+ (room_id,)
+ )
+
+ # Update backward extremeties
+ txn.executemany(
+ "INSERT INTO event_backward_extremities (room_id, event_id)"
+ " VALUES (?, ?)",
+ [
+ (room_id, event_id) for event_id, in new_backwards_extrems
+ ]
+ )
+
+ # Get all state groups that are only referenced by events that are
+ # to be deleted.
+ txn.execute(
+ "SELECT state_group FROM event_to_state_groups"
+ " INNER JOIN events USING (event_id)"
+ " WHERE state_group IN ("
+ " SELECT DISTINCT state_group FROM events"
+ " INNER JOIN event_to_state_groups USING (event_id)"
+ " WHERE room_id = ? AND topological_ordering < ?"
+ " )"
+ " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
+ (room_id, topological_ordering, topological_ordering)
+ )
+ state_rows = txn.fetchall()
+ txn.executemany(
+ "DELETE FROM state_groups_state WHERE state_group = ?",
+ state_rows
+ )
+ txn.executemany(
+ "DELETE FROM state_groups WHERE id = ?",
+ state_rows
+ )
+ # Delete all non-state
+ txn.executemany(
+ "DELETE FROM event_to_state_groups WHERE event_id = ?",
+ [(event_id,) for event_id, _ in event_rows]
+ )
+
+ txn.execute(
+ "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+ (topological_ordering, room_id,)
+ )
+
+ # Delete all remote non-state events
+ to_delete = [
+ (event_id,) for event_id, state_key in event_rows
+ if state_key is None and not self.hs.is_mine_id(event_id)
+ ]
+ for table in (
+ "events",
+ "event_json",
+ "event_auth",
+ "event_content_hashes",
+ "event_destinations",
+ "event_edge_hashes",
+ "event_edges",
+ "event_forward_extremities",
+ "event_push_actions",
+ "event_reference_hashes",
+ "event_search",
+ "event_signatures",
+ "rejections",
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE event_id = ?" % (table,),
+ to_delete
+ )
+
+ txn.executemany(
+ "DELETE FROM events WHERE event_id = ?",
+ to_delete
+ )
+ # Mark all state and own events as outliers
+ txn.executemany(
+ "UPDATE events SET outlier = ?"
+ " WHERE event_id = ?",
+ [
+ (True, event_id,) for event_id, state_key in event_rows
+ if state_key is not None or self.hs.is_mine_id(event_id)
+ ]
+ )
+
AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events",
|