diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 793cef6c26..9caf1e80c1 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -16,12 +16,10 @@
import abc
import logging
import re
+import urllib
from inspect import signature
from typing import Dict, List, Tuple
-from six import raise_from
-from six.moves import urllib
-
from twisted.internet import defer
from synapse.api.errors import (
@@ -220,7 +218,7 @@ class ReplicationEndpoint(object):
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to talk to master"), e)
+ raise SynapseError(502, "Failed to talk to master") from e
return result
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index c04f622816..ea5937a20c 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -149,7 +149,7 @@ class RdataCommand(Command):
class PositionCommand(Command):
- """Sent by the server to tell the client the stream postition without
+ """Sent by the server to tell the client the stream position without
needing to send an RDATA.
Format::
@@ -188,7 +188,7 @@ class ErrorCommand(_SimpleCommand):
class PingCommand(_SimpleCommand):
- """Sent by either side as a keep alive. The data is arbitary (often timestamp)
+ """Sent by either side as a keep alive. The data is arbitrary (often timestamp)
"""
NAME = "PING"
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cbcf46f3ae..e6a2e2598b 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -112,8 +112,8 @@ class ReplicationCommandHandler:
"replication_position", clock=self._clock
)
- # Map of stream to batched updates. See RdataCommand for info on how
- # batching works.
+ # Map of stream name to batched updates. See RdataCommand for info on
+ # how batching works.
self._pending_batches = {} # type: Dict[str, List[Any]]
# The factory used to create connections.
@@ -123,7 +123,8 @@ class ReplicationCommandHandler:
# outgoing replication commands to.)
self._connections = [] # type: List[AbstractConnection]
- # For each connection, the incoming streams that are coming from that connection
+ # For each connection, the incoming stream names that are coming from
+ # that connection.
self._streams_by_connection = {} # type: Dict[AbstractConnection, Set[str]]
LaterGauge(
@@ -310,7 +311,28 @@ class ReplicationCommandHandler:
# Check if this is the last of a batch of updates
rows = self._pending_batches.pop(stream_name, [])
rows.append(row)
- await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
+
+ stream = self._streams.get(stream_name)
+ if not stream:
+ logger.error("Got RDATA for unknown stream: %s", stream_name)
+ return
+
+ # Find where we previously streamed up to.
+ current_token = stream.current_token(cmd.instance_name)
+
+ # Discard this data if this token is earlier than the current
+ # position. Note that streams can be reset (in which case you
+ # expect an earlier token), but that must be preceded by a
+ # POSITION command.
+ if cmd.token <= current_token:
+ logger.debug(
+ "Discarding RDATA from stream %s at position %s before previous position %s",
+ stream_name,
+ cmd.token,
+ current_token,
+ )
+ else:
+ await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 4acefc8a96..f196eff072 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -264,7 +264,7 @@ class BackfillStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_backfill_token),
- db_query_to_update_function(store.get_all_new_backfill_event_rows),
+ store.get_all_new_backfill_event_rows,
)
@@ -291,9 +291,7 @@ class PresenceStream(Stream):
if hs.config.worker_app is None:
# on the master, query the presence handler
presence_handler = hs.get_presence_handler()
- update_function = db_query_to_update_function(
- presence_handler.get_all_presence_updates
- )
+ update_function = presence_handler.get_all_presence_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
@@ -318,9 +316,7 @@ class TypingStream(Stream):
if hs.config.worker_app is None:
# on the master, query the typing handler
- update_function = db_query_to_update_function(
- typing_handler.get_all_typing_updates
- )
+ update_function = typing_handler.get_all_typing_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
@@ -352,7 +348,7 @@ class ReceiptsStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_receipt_stream_id),
- db_query_to_update_function(store.get_all_updated_receipts),
+ store.get_all_updated_receipts,
)
@@ -367,26 +363,17 @@ class PushRulesStream(Stream):
def __init__(self, hs):
self.store = hs.get_datastore()
+
super(PushRulesStream, self).__init__(
- hs.get_instance_name(), self._current_token, self._update_function
+ hs.get_instance_name(),
+ self._current_token,
+ self.store.get_all_push_rule_updates,
)
def _current_token(self, instance_name: str) -> int:
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token
- async def _update_function(
- self, instance_name: str, from_token: Token, to_token: Token, limit: int
- ):
- rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
-
- limited = False
- if len(rows) == limit:
- to_token = rows[-1][0]
- limited = True
-
- return [(row[0], (row[2],)) for row in rows], to_token, limited
-
class PushersStream(Stream):
"""A user has added/changed/removed a pusher
|