diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2b3f79577b..6d978ffcd5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json
from collections import deque, namedtuple
+import synapse
+import synapse.metrics
+
import logging
import math
@@ -35,6 +38,10 @@ import ujson as json
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
def encode_json(json_object):
if USE_FROZEN_DICTS:
# ujson doesn't like frozen_dicts
@@ -139,6 +146,9 @@ class _EventPeristenceQueue(object):
pass
+_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+
+
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
@@ -258,6 +268,7 @@ class EventsStore(SQLBaseStore):
events_and_contexts=chunk,
backfilled=backfilled,
)
+ persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks
@log_function
@@ -275,6 +286,7 @@ class EventsStore(SQLBaseStore):
current_state=current_state,
backfilled=backfilled,
)
+ persist_event_counter.inc()
except _RollbackButIsFineException:
pass
@@ -632,6 +644,8 @@ class EventsStore(SQLBaseStore):
],
)
+ self._add_to_cache(txn, events_and_contexts)
+
if backfilled:
# Backfilled events come before the current state so we don't need
# to update the current state table
@@ -673,6 +687,45 @@ class EventsStore(SQLBaseStore):
return
+ def _add_to_cache(self, txn, events_and_contexts):
+ to_prefill = []
+
+ rows = []
+ N = 200
+ for i in range(0, len(events_and_contexts), N):
+ ev_map = {
+ e[0].event_id: e[0]
+ for e in events_and_contexts[i:i + N]
+ }
+ if not ev_map:
+ break
+
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE e.event_id IN (%s)"
+ ) % (",".join(["?"] * len(ev_map)),)
+
+ txn.execute(sql, ev_map.keys())
+ rows = self.cursor_to_dict(txn)
+ for row in rows:
+ event = ev_map[row["event_id"]]
+ if not row["rejects"] and not row["redacts"]:
+ to_prefill.append(_EventCacheEntry(
+ event=event,
+ redacted_event=None,
+ ))
+
+ def prefill():
+ for cache_entry in to_prefill:
+ self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+ txn.call_after(prefill)
+
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
@@ -738,100 +791,65 @@ class EventsStore(SQLBaseStore):
event_id_list = event_ids
event_ids = set(event_ids)
- event_map = self._get_events_from_cache(
+ event_entry_map = self._get_events_from_cache(
event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
- missing_events_ids = [e for e in event_ids if e not in event_map]
+ missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
- event_map.update(missing_events)
+ event_entry_map.update(missing_events)
- defer.returnValue([
- event_map[e_id] for e_id in event_id_list
- if e_id in event_map and event_map[e_id]
- ])
-
- def _get_events_txn(self, txn, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not event_ids:
- return []
-
- event_map = self._get_events_from_cache(
- event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- missing_events_ids = [e for e in event_ids if e not in event_map]
+ events = []
+ for event_id in event_id_list:
+ entry = event_entry_map.get(event_id, None)
+ if not entry:
+ continue
- if not missing_events_ids:
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
+ if allow_rejected or not entry.event.rejected_reason:
+ if check_redacted and entry.redacted_event:
+ event = entry.redacted_event
+ else:
+ event = entry.event
- missing_events = self._fetch_events_txn(
- txn,
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ events.append(event)
- event_map.update(missing_events)
+ if get_prev_content:
+ if "replaces_state" in event.unsigned:
+ prev = yield self.get_event(
+ event.unsigned["replaces_state"],
+ get_prev_content=False,
+ allow_none=True,
+ )
+ if prev:
+ event.unsigned = dict(event.unsigned)
+ event.unsigned["prev_content"] = prev.content
+ event.unsigned["prev_sender"] = prev.sender
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
+ defer.returnValue(events)
def _invalidate_get_event_cache(self, event_id):
- for check_redacted in (False, True):
- for get_prev_content in (False, True):
- self._get_event_cache.invalidate(
- (event_id, check_redacted, get_prev_content)
- )
-
- def _get_event_txn(self, txn, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
-
- events = self._get_events_txn(
- txn, [event_id],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- return events[0] if events else None
+ self._get_event_cache.invalidate((event_id,))
- def _get_events_from_cache(self, events, check_redacted, get_prev_content,
- allow_rejected):
+ def _get_events_from_cache(self, events, allow_rejected):
event_map = {}
for event_id in events:
- try:
- ret = self._get_event_cache.get(
- (event_id, check_redacted, get_prev_content,)
- )
+ ret = self._get_event_cache.get((event_id,), None)
+ if not ret:
+ continue
- if allow_rejected or not ret.rejected_reason:
- event_map[event_id] = ret
- else:
- event_map[event_id] = None
- except KeyError:
- pass
+ if allow_rejected or not ret.event.rejected_reason:
+ event_map[event_id] = ret
+ else:
+ event_map[event_id] = None
return event_map
@@ -902,8 +920,7 @@ class EventsStore(SQLBaseStore):
reactor.callFromThread(fire, event_list)
@defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
+ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
@@ -941,8 +958,6 @@ class EventsStore(SQLBaseStore):
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
rejected_reason=row["rejects"],
)
for row in rows
@@ -951,7 +966,7 @@ class EventsStore(SQLBaseStore):
)
defer.returnValue({
- e.event_id: e
+ e.event.event_id: e
for e in res if e
})
@@ -981,37 +996,8 @@ class EventsStore(SQLBaseStore):
return rows
- def _fetch_events_txn(self, txn, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not events:
- return {}
-
- rows = self._fetch_event_rows(
- txn, events,
- )
-
- if not allow_rejected:
- rows[:] = [r for r in rows if not r["rejects"]]
-
- res = [
- self._get_event_from_row_txn(
- txn,
- row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- rejected_reason=row["rejects"],
- )
- for row in rows
- ]
-
- return {
- r.event_id: r
- for r in res
- }
-
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
rejected_reason=None):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -1021,26 +1007,27 @@ class EventsStore(SQLBaseStore):
table="rejections",
keyvalues={"event_id": rejected_reason},
retcol="reason",
- desc="_get_event_from_row",
+ desc="_get_event_from_row_rejected_reason",
)
- ev = FrozenEvent(
+ original_ev = FrozenEvent(
d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
- if check_redacted and redacted:
- ev = prune_event(ev)
+ redacted_event = None
+ if redacted:
+ redacted_event = prune_event(original_ev)
redaction_id = yield self._simple_select_one_onecol(
table="redactions",
- keyvalues={"redacts": ev.event_id},
+ keyvalues={"redacts": redacted_event.event_id},
retcol="event_id",
- desc="_get_event_from_row",
+ desc="_get_event_from_row_redactions",
)
- ev.unsigned["redacted_by"] = redaction_id
+ redacted_event.unsigned["redacted_by"] = redaction_id
# Get the redaction event.
because = yield self.get_event(
@@ -1052,86 +1039,16 @@ class EventsStore(SQLBaseStore):
if because:
# It's fine to do add the event directly, since get_pdu_json
# will serialise this field correctly
- ev.unsigned["redacted_because"] = because
-
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = yield self.get_event(
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- allow_none=True,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
-
- defer.returnValue(ev)
-
- def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
- rejected_reason=None):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = self._simple_select_one_onecol_txn(
- txn,
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- )
-
- ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
-
- if check_redacted and redacted:
- ev = prune_event(ev)
-
- redaction_id = self._simple_select_one_onecol_txn(
- txn,
- table="redactions",
- keyvalues={"redacts": ev.event_id},
- retcol="event_id",
- )
-
- ev.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
-
- because = self._get_event_txn(
- txn,
- redaction_id,
- check_redacted=False
- )
-
- if because:
- ev.unsigned["redacted_because"] = because
+ redacted_event.unsigned["redacted_because"] = because
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = self._get_event_txn(
- txn,
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
+ cache_entry = _EventCacheEntry(
+ event=original_ev,
+ redacted_event=redacted_event,
)
- return ev
-
- def _parse_events_txn(self, txn, rows):
- event_ids = [r["event_id"] for r in rows]
+ self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
- return self._get_events_txn(txn, event_ids)
+ defer.returnValue(cache_entry)
@defer.inlineCallbacks
def count_daily_messages(self):
|