summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/federation/federation_server.py1
-rw-r--r--synapse/handlers/federation.py38
-rw-r--r--synapse/storage/events.py22
3 files changed, 19 insertions, 42 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 76820b924b..429ab6ddec 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -531,7 +531,6 @@ class FederationServer(FederationBase):
         yield self.handler.on_receive_pdu(
             origin,
             pdu,
-            backfilled=False,
             state=state,
             auth_chain=auth_chain,
         )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c172877bde..267fedf114 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -102,7 +102,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, origin, pdu, backfilled, state=None,
+    def on_receive_pdu(self, origin, pdu, state=None,
                        auth_chain=None):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it through the StateHandler.
@@ -185,7 +185,6 @@ class FederationHandler(BaseHandler):
                     origin,
                     event,
                     state=state,
-                    backfilled=backfilled,
                 )
             except AuthError as e:
                 raise FederationError(
@@ -214,18 +213,17 @@ class FederationHandler(BaseHandler):
             except StoreError:
                 logger.exception("Failed to store room.")
 
-        if not backfilled:
-            extra_users = []
-            if event.type == EventTypes.Member:
-                target_user_id = event.state_key
-                target_user = UserID.from_string(target_user_id)
-                extra_users.append(target_user)
+        extra_users = []
+        if event.type == EventTypes.Member:
+            target_user_id = event.state_key
+            target_user = UserID.from_string(target_user_id)
+            extra_users.append(target_user)
 
-            with PreserveLoggingContext():
-                self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id,
-                    extra_users=extra_users
-                )
+        with PreserveLoggingContext():
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=extra_users
+            )
 
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
@@ -645,7 +643,7 @@ class FederationHandler(BaseHandler):
                     continue
 
                 try:
-                    self.on_receive_pdu(origin, p, backfilled=False)
+                    self.on_receive_pdu(origin, p)
                 except:
                     logger.exception("Couldn't handle pdu")
 
@@ -777,7 +775,6 @@ class FederationHandler(BaseHandler):
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
-            backfilled=False,
         )
 
         target_user = UserID.from_string(event.state_key)
@@ -817,7 +814,6 @@ class FederationHandler(BaseHandler):
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
-            backfilled=False,
         )
 
         target_user = UserID.from_string(event.state_key)
@@ -1072,8 +1068,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_event(self, origin, event, state=None, backfilled=False,
-                          current_state=None, auth_events=None):
+    def _handle_new_event(self, origin, event, state=None, auth_events=None):
 
         outlier = event.internal_metadata.is_outlier()
 
@@ -1083,7 +1078,7 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
-        if not backfilled and not event.internal_metadata.is_outlier():
+        if not event.internal_metadata.is_outlier():
             action_generator = ActionGenerator(self.hs)
             yield action_generator.handle_push_actions_for_event(
                 event, context, self
@@ -1092,9 +1087,7 @@ class FederationHandler(BaseHandler):
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
-            backfilled=backfilled,
-            is_new_state=(not outlier and not backfilled),
-            current_state=current_state,
+            is_new_state=not outlier,
         )
 
         defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1192,7 +1185,6 @@ class FederationHandler(BaseHandler):
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event, new_event_context,
-            backfilled=False,
             is_new_state=True,
             current_state=state,
         )
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 285c586cfe..e444b64cee 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context, backfilled=False,
+    def persist_event(self, event, context,
                       is_new_state=True, current_state=None):
-        stream_ordering = None
-        if backfilled:
-            self.min_stream_token -= 1
-            stream_ordering = self.min_stream_token
-
-        if stream_ordering is None:
-            stream_ordering_manager = self._stream_id_gen.get_next()
-        else:
-            @contextmanager
-            def stream_ordering_manager():
-                yield stream_ordering
-            stream_ordering_manager = stream_ordering_manager()
-
         try:
-            with stream_ordering_manager as stream_ordering:
+            with self._stream_id_gen.get_next() as stream_ordering:
                 event.internal_metadata.stream_ordering = stream_ordering
                 yield self.runInteraction(
                     "persist_event",
                     self._persist_event_txn,
                     event=event,
                     context=context,
-                    backfilled=backfilled,
                     is_new_state=is_new_state,
                     current_state=current_state,
                 )
@@ -166,7 +152,7 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(events[0] if events else None)
 
     @log_function
-    def _persist_event_txn(self, txn, event, context, backfilled,
+    def _persist_event_txn(self, txn, event, context,
                            is_new_state=True, current_state=None):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
@@ -198,7 +184,7 @@ class EventsStore(SQLBaseStore):
         return self._persist_events_txn(
             txn,
             [(event, context)],
-            backfilled=backfilled,
+            backfilled=False,
             is_new_state=is_new_state,
         )