summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py165
1 files changed, 111 insertions, 54 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ba368a3eca..285c586cfe 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from _base import SQLBaseStore, _RollbackButIsFineException
+from ._base import SQLBaseStore, _RollbackButIsFineException
 
 from twisted.internet import defer, reactor
 
 from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
-from synapse.util.logcontext import preserve_context_over_deferred
+from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
@@ -66,19 +66,17 @@ class EventsStore(SQLBaseStore):
             return
 
         if backfilled:
-            if not self.min_token_deferred.called:
-                yield self.min_token_deferred
-            start = self.min_token - 1
-            self.min_token -= len(events_and_contexts) + 1
-            stream_orderings = range(start, self.min_token, -1)
+            start = self.min_stream_token - 1
+            self.min_stream_token -= len(events_and_contexts) + 1
+            stream_orderings = range(start, self.min_stream_token, -1)
 
             @contextmanager
             def stream_ordering_manager():
                 yield stream_orderings
             stream_ordering_manager = stream_ordering_manager()
         else:
-            stream_ordering_manager = yield self._stream_id_gen.get_next_mult(
-                self, len(events_and_contexts)
+            stream_ordering_manager = self._stream_id_gen.get_next_mult(
+                len(events_and_contexts)
             )
 
         with stream_ordering_manager as stream_orderings:
@@ -86,7 +84,7 @@ class EventsStore(SQLBaseStore):
                 event.internal_metadata.stream_ordering = stream
 
             chunks = [
-                events_and_contexts[x:x+100]
+                events_and_contexts[x:x + 100]
                 for x in xrange(0, len(events_and_contexts), 100)
             ]
 
@@ -107,13 +105,11 @@ class EventsStore(SQLBaseStore):
                       is_new_state=True, current_state=None):
         stream_ordering = None
         if backfilled:
-            if not self.min_token_deferred.called:
-                yield self.min_token_deferred
-            self.min_token -= 1
-            stream_ordering = self.min_token
+            self.min_stream_token -= 1
+            stream_ordering = self.min_stream_token
 
         if stream_ordering is None:
-            stream_ordering_manager = yield self._stream_id_gen.get_next(self)
+            stream_ordering_manager = self._stream_id_gen.get_next()
         else:
             @contextmanager
             def stream_ordering_manager():
@@ -135,7 +131,7 @@ class EventsStore(SQLBaseStore):
         except _RollbackButIsFineException:
             pass
 
-        max_persisted_id = yield self._stream_id_gen.get_max_token(self)
+        max_persisted_id = yield self._stream_id_gen.get_max_token()
         defer.returnValue((stream_ordering, max_persisted_id))
 
     @defer.inlineCallbacks
@@ -209,17 +205,29 @@ class EventsStore(SQLBaseStore):
     @log_function
     def _persist_events_txn(self, txn, events_and_contexts, backfilled,
                             is_new_state=True):
-
-        # Remove the any existing cache entries for the event_ids
-        for event, _ in events_and_contexts:
+        depth_updates = {}
+        for event, context in events_and_contexts:
+            # Remove the any existing cache entries for the event_ids
             txn.call_after(self._invalidate_get_event_cache, event.event_id)
+            if not backfilled:
+                txn.call_after(
+                    self._events_stream_cache.entity_has_changed,
+                    event.room_id, event.internal_metadata.stream_ordering,
+                )
 
-        depth_updates = {}
-        for event, _ in events_and_contexts:
-            if event.internal_metadata.is_outlier():
-                continue
-            depth_updates[event.room_id] = max(
-                event.depth, depth_updates.get(event.room_id, event.depth)
+            if not event.internal_metadata.is_outlier():
+                depth_updates[event.room_id] = max(
+                    event.depth, depth_updates.get(event.room_id, event.depth)
+                )
+
+            if context.push_actions:
+                self._set_push_actions_for_event_and_users_txn(
+                    txn, event, context.push_actions
+                )
+
+        if event.type == EventTypes.Redaction and event.redacts is not None:
+            self._remove_push_actions_for_event_id_txn(
+                txn, event.room_id, event.redacts
             )
 
         for room_id, depth in depth_updates.items():
@@ -518,6 +526,9 @@ class EventsStore(SQLBaseStore):
         if not event_ids:
             defer.returnValue([])
 
+        event_id_list = event_ids
+        event_ids = set(event_ids)
+
         event_map = self._get_events_from_cache(
             event_ids,
             check_redacted=check_redacted,
@@ -527,23 +538,18 @@ class EventsStore(SQLBaseStore):
 
         missing_events_ids = [e for e in event_ids if e not in event_map]
 
-        if not missing_events_ids:
-            defer.returnValue([
-                event_map[e_id] for e_id in event_ids
-                if e_id in event_map and event_map[e_id]
-            ])
-
-        missing_events = yield self._enqueue_events(
-            missing_events_ids,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
+        if missing_events_ids:
+            missing_events = yield self._enqueue_events(
+                missing_events_ids,
+                check_redacted=check_redacted,
+                get_prev_content=get_prev_content,
+                allow_rejected=allow_rejected,
+            )
 
-        event_map.update(missing_events)
+            event_map.update(missing_events)
 
         defer.returnValue([
-            event_map[e_id] for e_id in event_ids
+            event_map[e_id] for e_id in event_id_list
             if e_id in event_map and event_map[e_id]
         ])
 
@@ -662,14 +668,16 @@ class EventsStore(SQLBaseStore):
                     for ids, d in lst:
                         if not d.called:
                             try:
-                                d.callback([
-                                    res[i]
-                                    for i in ids
-                                    if i in res
-                                ])
+                                with PreserveLoggingContext():
+                                    d.callback([
+                                        res[i]
+                                        for i in ids
+                                        if i in res
+                                    ])
                             except:
                                 logger.exception("Failed to callback")
-                reactor.callFromThread(fire, event_list, row_dict)
+                with PreserveLoggingContext():
+                    reactor.callFromThread(fire, event_list, row_dict)
             except Exception as e:
                 logger.exception("do_fetch")
 
@@ -677,10 +685,12 @@ class EventsStore(SQLBaseStore):
                 def fire(evs):
                     for _, d in evs:
                         if not d.called:
-                            d.errback(e)
+                            with PreserveLoggingContext():
+                                d.errback(e)
 
                 if event_list:
-                    reactor.callFromThread(fire, event_list)
+                    with PreserveLoggingContext():
+                        reactor.callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True,
@@ -707,18 +717,20 @@ class EventsStore(SQLBaseStore):
                 should_start = False
 
         if should_start:
-            self.runWithConnection(
-                self._do_fetch
-            )
+            with PreserveLoggingContext():
+                self.runWithConnection(
+                    self._do_fetch
+                )
 
-        rows = yield preserve_context_over_deferred(events_d)
+        with PreserveLoggingContext():
+            rows = yield events_d
 
         if not allow_rejected:
             rows[:] = [r for r in rows if not r["rejects"]]
 
         res = yield defer.gatherResults(
             [
-                self._get_event_from_row(
+                preserve_fn(self._get_event_from_row)(
                     row["internal_metadata"], row["json"], row["redacts"],
                     check_redacted=check_redacted,
                     get_prev_content=get_prev_content,
@@ -738,7 +750,7 @@ class EventsStore(SQLBaseStore):
         rows = []
         N = 200
         for i in range(1 + len(events) / N):
-            evs = events[i*N:(i + 1)*N]
+            evs = events[i * N:(i + 1) * N]
             if not evs:
                 break
 
@@ -753,7 +765,7 @@ class EventsStore(SQLBaseStore):
                 " LEFT JOIN rejections as rej USING (event_id)"
                 " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                 " WHERE e.event_id IN (%s)"
-            ) % (",".join(["?"]*len(evs)),)
+            ) % (",".join(["?"] * len(evs)),)
 
             txn.execute(sql, evs)
             rows.extend(self.cursor_to_dict(txn))
@@ -1050,3 +1062,48 @@ class EventsStore(SQLBaseStore):
             yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
 
         defer.returnValue(result)
+
+    def get_current_backfill_token(self):
+        """The current minimum token that backfilled events have reached"""
+
+        # TODO: Fix race with the persit_event txn by using one of the
+        # stream id managers
+        return -self.min_stream_token
+
+    def get_all_new_events(self, last_backfill_id, last_forward_id,
+                           current_backfill_id, current_forward_id, limit):
+        """Get all the new events that have arrived at the server either as
+        new events or as backfilled events"""
+        def get_all_new_events_txn(txn):
+            sql = (
+                "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
+                " FROM events as e"
+                " JOIN event_json as ej"
+                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+                " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+                " LIMIT ?"
+            )
+            if last_forward_id != current_forward_id:
+                txn.execute(sql, (last_forward_id, current_forward_id, limit))
+                new_forward_events = txn.fetchall()
+            else:
+                new_forward_events = []
+
+            sql = (
+                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
+                " FROM events as e"
+                " JOIN event_json as ej"
+                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+                " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
+                " ORDER BY e.stream_ordering DESC"
+                " LIMIT ?"
+            )
+            if last_backfill_id != current_backfill_id:
+                txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
+                new_backfill_events = txn.fetchall()
+            else:
+                new_backfill_events = []
+
+            return (new_forward_events, new_backfill_events)
+        return self.runInteraction("get_all_new_events", get_all_new_events_txn)