diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 76820b924b..429ab6ddec 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -531,7 +531,6 @@ class FederationServer(FederationBase):
yield self.handler.on_receive_pdu(
origin,
pdu,
- backfilled=False,
state=state,
auth_chain=auth_chain,
)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c172877bde..267fedf114 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -102,7 +102,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, backfilled, state=None,
+ def on_receive_pdu(self, origin, pdu, state=None,
auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
@@ -185,7 +185,6 @@ class FederationHandler(BaseHandler):
origin,
event,
state=state,
- backfilled=backfilled,
)
except AuthError as e:
raise FederationError(
@@ -214,18 +213,17 @@ class FederationHandler(BaseHandler):
except StoreError:
logger.exception("Failed to store room.")
- if not backfilled:
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
+ with PreserveLoggingContext():
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -645,7 +643,7 @@ class FederationHandler(BaseHandler):
continue
try:
- self.on_receive_pdu(origin, p, backfilled=False)
+ self.on_receive_pdu(origin, p)
except:
logger.exception("Couldn't handle pdu")
@@ -777,7 +775,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- backfilled=False,
)
target_user = UserID.from_string(event.state_key)
@@ -817,7 +814,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- backfilled=False,
)
target_user = UserID.from_string(event.state_key)
@@ -1072,8 +1068,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def _handle_new_event(self, origin, event, state=None, backfilled=False,
- current_state=None, auth_events=None):
+ def _handle_new_event(self, origin, event, state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
@@ -1083,7 +1078,7 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- if not backfilled and not event.internal_metadata.is_outlier():
+ if not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, context, self
@@ -1092,9 +1087,7 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- backfilled=backfilled,
- is_new_state=(not outlier and not backfilled),
- current_state=current_state,
+ is_new_state=not outlier,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1192,7 +1185,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
- backfilled=False,
is_new_state=True,
current_state=state,
)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 285c586cfe..e444b64cee 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context, backfilled=False,
+ def persist_event(self, event, context,
is_new_state=True, current_state=None):
- stream_ordering = None
- if backfilled:
- self.min_stream_token -= 1
- stream_ordering = self.min_stream_token
-
- if stream_ordering is None:
- stream_ordering_manager = self._stream_id_gen.get_next()
- else:
- @contextmanager
- def stream_ordering_manager():
- yield stream_ordering
- stream_ordering_manager = stream_ordering_manager()
-
try:
- with stream_ordering_manager as stream_ordering:
+ with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
- backfilled=backfilled,
is_new_state=is_new_state,
current_state=current_state,
)
@@ -166,7 +152,7 @@ class EventsStore(SQLBaseStore):
defer.returnValue(events[0] if events else None)
@log_function
- def _persist_event_txn(self, txn, event, context, backfilled,
+ def _persist_event_txn(self, txn, event, context,
is_new_state=True, current_state=None):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
@@ -198,7 +184,7 @@ class EventsStore(SQLBaseStore):
return self._persist_events_txn(
txn,
[(event, context)],
- backfilled=backfilled,
+ backfilled=False,
is_new_state=is_new_state,
)
|