diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ba368a3eca..285c586cfe 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,14 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from _base import SQLBaseStore, _RollbackButIsFineException
+from ._base import SQLBaseStore, _RollbackButIsFineException
from twisted.internet import defer, reactor
from synapse.events import FrozenEvent, USE_FROZEN_DICTS
from synapse.events.utils import prune_event
-from synapse.util.logcontext import preserve_context_over_deferred
+from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
@@ -66,19 +66,17 @@ class EventsStore(SQLBaseStore):
return
if backfilled:
- if not self.min_token_deferred.called:
- yield self.min_token_deferred
- start = self.min_token - 1
- self.min_token -= len(events_and_contexts) + 1
- stream_orderings = range(start, self.min_token, -1)
+ start = self.min_stream_token - 1
+ self.min_stream_token -= len(events_and_contexts) + 1
+ stream_orderings = range(start, self.min_stream_token, -1)
@contextmanager
def stream_ordering_manager():
yield stream_orderings
stream_ordering_manager = stream_ordering_manager()
else:
- stream_ordering_manager = yield self._stream_id_gen.get_next_mult(
- self, len(events_and_contexts)
+ stream_ordering_manager = self._stream_id_gen.get_next_mult(
+ len(events_and_contexts)
)
with stream_ordering_manager as stream_orderings:
@@ -86,7 +84,7 @@ class EventsStore(SQLBaseStore):
event.internal_metadata.stream_ordering = stream
chunks = [
- events_and_contexts[x:x+100]
+ events_and_contexts[x:x + 100]
for x in xrange(0, len(events_and_contexts), 100)
]
@@ -107,13 +105,11 @@ class EventsStore(SQLBaseStore):
is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
- if not self.min_token_deferred.called:
- yield self.min_token_deferred
- self.min_token -= 1
- stream_ordering = self.min_token
+ self.min_stream_token -= 1
+ stream_ordering = self.min_stream_token
if stream_ordering is None:
- stream_ordering_manager = yield self._stream_id_gen.get_next(self)
+ stream_ordering_manager = self._stream_id_gen.get_next()
else:
@contextmanager
def stream_ordering_manager():
@@ -135,7 +131,7 @@ class EventsStore(SQLBaseStore):
except _RollbackButIsFineException:
pass
- max_persisted_id = yield self._stream_id_gen.get_max_token(self)
+ max_persisted_id = yield self._stream_id_gen.get_max_token()
defer.returnValue((stream_ordering, max_persisted_id))
@defer.inlineCallbacks
@@ -209,17 +205,29 @@ class EventsStore(SQLBaseStore):
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
is_new_state=True):
-
- # Remove the any existing cache entries for the event_ids
- for event, _ in events_and_contexts:
+ depth_updates = {}
+ for event, context in events_and_contexts:
+ # Remove the any existing cache entries for the event_ids
txn.call_after(self._invalidate_get_event_cache, event.event_id)
+ if not backfilled:
+ txn.call_after(
+ self._events_stream_cache.entity_has_changed,
+ event.room_id, event.internal_metadata.stream_ordering,
+ )
- depth_updates = {}
- for event, _ in events_and_contexts:
- if event.internal_metadata.is_outlier():
- continue
- depth_updates[event.room_id] = max(
- event.depth, depth_updates.get(event.room_id, event.depth)
+ if not event.internal_metadata.is_outlier():
+ depth_updates[event.room_id] = max(
+ event.depth, depth_updates.get(event.room_id, event.depth)
+ )
+
+ if context.push_actions:
+ self._set_push_actions_for_event_and_users_txn(
+ txn, event, context.push_actions
+ )
+
+ if event.type == EventTypes.Redaction and event.redacts is not None:
+ self._remove_push_actions_for_event_id_txn(
+ txn, event.room_id, event.redacts
)
for room_id, depth in depth_updates.items():
@@ -518,6 +526,9 @@ class EventsStore(SQLBaseStore):
if not event_ids:
defer.returnValue([])
+ event_id_list = event_ids
+ event_ids = set(event_ids)
+
event_map = self._get_events_from_cache(
event_ids,
check_redacted=check_redacted,
@@ -527,23 +538,18 @@ class EventsStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_map]
- if not missing_events_ids:
- defer.returnValue([
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ])
-
- missing_events = yield self._enqueue_events(
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ if missing_events_ids:
+ missing_events = yield self._enqueue_events(
+ missing_events_ids,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
- event_map.update(missing_events)
+ event_map.update(missing_events)
defer.returnValue([
- event_map[e_id] for e_id in event_ids
+ event_map[e_id] for e_id in event_id_list
if e_id in event_map and event_map[e_id]
])
@@ -662,14 +668,16 @@ class EventsStore(SQLBaseStore):
for ids, d in lst:
if not d.called:
try:
- d.callback([
- res[i]
- for i in ids
- if i in res
- ])
+ with PreserveLoggingContext():
+ d.callback([
+ res[i]
+ for i in ids
+ if i in res
+ ])
except:
logger.exception("Failed to callback")
- reactor.callFromThread(fire, event_list, row_dict)
+ with PreserveLoggingContext():
+ reactor.callFromThread(fire, event_list, row_dict)
except Exception as e:
logger.exception("do_fetch")
@@ -677,10 +685,12 @@ class EventsStore(SQLBaseStore):
def fire(evs):
for _, d in evs:
if not d.called:
- d.errback(e)
+ with PreserveLoggingContext():
+ d.errback(e)
if event_list:
- reactor.callFromThread(fire, event_list)
+ with PreserveLoggingContext():
+ reactor.callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True,
@@ -707,18 +717,20 @@ class EventsStore(SQLBaseStore):
should_start = False
if should_start:
- self.runWithConnection(
- self._do_fetch
- )
+ with PreserveLoggingContext():
+ self.runWithConnection(
+ self._do_fetch
+ )
- rows = yield preserve_context_over_deferred(events_d)
+ with PreserveLoggingContext():
+ rows = yield events_d
if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]]
res = yield defer.gatherResults(
[
- self._get_event_from_row(
+ preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
check_redacted=check_redacted,
get_prev_content=get_prev_content,
@@ -738,7 +750,7 @@ class EventsStore(SQLBaseStore):
rows = []
N = 200
for i in range(1 + len(events) / N):
- evs = events[i*N:(i + 1)*N]
+ evs = events[i * N:(i + 1) * N]
if not evs:
break
@@ -753,7 +765,7 @@ class EventsStore(SQLBaseStore):
" LEFT JOIN rejections as rej USING (event_id)"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE e.event_id IN (%s)"
- ) % (",".join(["?"]*len(evs)),)
+ ) % (",".join(["?"] * len(evs)),)
txn.execute(sql, evs)
rows.extend(self.cursor_to_dict(txn))
@@ -1050,3 +1062,48 @@ class EventsStore(SQLBaseStore):
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
defer.returnValue(result)
+
+ def get_current_backfill_token(self):
+ """The current minimum token that backfilled events have reached"""
+
+ # TODO: Fix race with the persit_event txn by using one of the
+ # stream id managers
+ return -self.min_stream_token
+
+ def get_all_new_events(self, last_backfill_id, last_forward_id,
+ current_backfill_id, current_forward_id, limit):
+ """Get all the new events that have arrived at the server either as
+ new events or as backfilled events"""
+ def get_all_new_events_txn(txn):
+ sql = (
+ "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
+ " FROM events as e"
+ " JOIN event_json as ej"
+ " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+ " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ " LIMIT ?"
+ )
+ if last_forward_id != current_forward_id:
+ txn.execute(sql, (last_forward_id, current_forward_id, limit))
+ new_forward_events = txn.fetchall()
+ else:
+ new_forward_events = []
+
+ sql = (
+ "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
+ " FROM events as e"
+ " JOIN event_json as ej"
+ " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+ " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
+ " ORDER BY e.stream_ordering DESC"
+ " LIMIT ?"
+ )
+ if last_backfill_id != current_backfill_id:
+ txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
+ new_backfill_events = txn.fetchall()
+ else:
+ new_backfill_events = []
+
+ return (new_forward_events, new_backfill_events)
+ return self.runInteraction("get_all_new_events", get_all_new_events_txn)
|