diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index fcf8ebf1e7..e165429cad 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -14,7 +14,6 @@
# limitations under the License.
"""A replication client for use by synapse workers.
"""
-import heapq
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple
@@ -30,6 +29,7 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow,
EventsStreamRow,
)
+from synapse.types import PersistedEventPosition, UserID
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure
@@ -99,7 +99,6 @@ class ReplicationDataHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
- self.pusher_pool = hs.get_pusherpool()
self.notifier = hs.get_notifier()
self._reactor = hs.get_reactor()
self._clock = hs.get_clock()
@@ -149,13 +148,15 @@ class ReplicationDataHandler:
if event.rejected_reason:
continue
- extra_users = () # type: Tuple[str, ...]
+ extra_users = () # type: Tuple[UserID, ...]
if event.type == EventTypes.Member:
- extra_users = (event.state_key,)
- max_token = self.store.get_room_max_stream_ordering()
- self.notifier.on_new_room_event(event, token, max_token, extra_users)
+ extra_users = (UserID.from_string(event.state_key),)
- await self.pusher_pool.on_new_notifications(token, token)
+ max_token = self.store.get_room_max_token()
+ event_pos = PersistedEventPosition(instance_name, token)
+ self.notifier.on_new_room_event(
+ event, event_pos, max_token, extra_users
+ )
# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
@@ -219,9 +220,8 @@ class ReplicationDataHandler:
waiting_list = self._streams_to_waiters.setdefault(stream_name, [])
- # We insert into the list using heapq as it is more efficient than
- # pushing then resorting each time.
- heapq.heappush(waiting_list, (position, deferred))
+ waiting_list.append((position, deferred))
+ waiting_list.sort(key=lambda t: t[0])
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index f33801f883..8cd47770c1 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -18,11 +18,10 @@ The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
import abc
-import json
import logging
from typing import Tuple, Type
-_json_encoder = json.JSONEncoder()
+from synapse.util import json_decoder, json_encoder
logger = logging.getLogger(__name__)
@@ -124,7 +123,7 @@ class RdataCommand(Command):
stream_name,
instance_name,
None if token == "batch" else int(token),
- json.loads(row_json),
+ json_decoder.decode(row_json),
)
def to_line(self):
@@ -133,7 +132,7 @@ class RdataCommand(Command):
self.stream_name,
self.instance_name,
str(self.token) if self.token is not None else "batch",
- _json_encoder.encode(self.row),
+ json_encoder.encode(self.row),
)
)
@@ -358,7 +357,7 @@ class UserIpCommand(Command):
def from_line(cls, line):
user_id, jsn = line.split(" ", 1)
- access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+ access_token, ip, user_agent, device_id, last_seen = json_decoder.decode(jsn)
return cls(user_id, access_token, ip, user_agent, device_id, last_seen)
@@ -366,7 +365,7 @@ class UserIpCommand(Command):
return (
self.user_id
+ " "
- + _json_encoder.encode(
+ + json_encoder.encode(
(
self.access_token,
self.ip,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1c303f3a46..b323841f73 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ class ReplicationCommandHandler:
if isinstance(stream, (EventsStream, BackfillStream)):
# Only add EventStream and BackfillStream as a source on the
# instance in charge of event persistence.
- if hs.config.worker.writers.events == hs.get_instance_name():
+ if hs.get_instance_name() in hs.config.worker.writers.events:
self._streams_to_replicate.append(stream)
continue
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 0350923898..0b0d204e64 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -113,7 +113,7 @@ PING_TIMEOUT_MULTIPLIER = 5
PING_TIMEOUT_MS = PING_TIME * PING_TIMEOUT_MULTIPLIER
-class ConnectionStates(object):
+class ConnectionStates:
CONNECTING = "connecting"
ESTABLISHED = "established"
PAUSED = "paused"
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 41569305df..687984e7a8 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -58,7 +58,7 @@ class ReplicationStreamProtocolFactory(Factory):
)
-class ReplicationStreamer(object):
+class ReplicationStreamer:
"""Handles replication connections.
This needs to be poked when new replication data may be available. When new
@@ -93,7 +93,7 @@ class ReplicationStreamer(object):
"""
if not self.command_handler.connected():
# Don't bother if nothing is listening. We still need to advance
- # the stream tokens otherwise they'll fall beihind forever
+ # the stream tokens otherwise they'll fall behind forever
for stream in self.streams:
stream.discard_updates_and_advance()
return
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 7a42de3f7d..54dccd15a6 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -79,7 +79,7 @@ StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool]
UpdateFunction = Callable[[str, Token, Token, int], Awaitable[StreamUpdateResult]]
-class Stream(object):
+class Stream:
"""Base class for the streams.
Provides a `get_updates()` function that returns new updates since the last
@@ -345,14 +345,14 @@ class PushRulesStream(Stream):
def __init__(self, hs):
self.store = hs.get_datastore()
- super(PushRulesStream, self).__init__(
+ super().__init__(
hs.get_instance_name(),
self._current_token,
self.store.get_all_push_rule_updates,
)
def _current_token(self, instance_name: str) -> int:
- push_rules_token, _ = self.store.get_push_rules_stream_token()
+ push_rules_token = self.store.get_max_push_rules_stream_id()
return push_rules_token
@@ -383,7 +383,7 @@ class CachesStream(Stream):
the cache on the workers
"""
- @attr.s
+ @attr.s(slots=True)
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
@@ -405,7 +405,7 @@ class CachesStream(Stream):
store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
- store.get_cache_stream_token,
+ store.get_cache_stream_token_for_writer,
store.get_all_updated_caches,
)
@@ -441,7 +441,7 @@ class DeviceListsStream(Stream):
told about a device update.
"""
- @attr.s
+ @attr.s(slots=True)
class DeviceListsStreamRow:
entity = attr.ib(type=str)
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 16c63ff4ec..ccc7ca30d8 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import List, Tuple, Type
import attr
-from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
+from ._base import Stream, StreamUpdateResult, Token
"""Handling of the 'events' replication stream
@@ -49,14 +49,14 @@ data part are:
@attr.s(slots=True, frozen=True)
-class EventsStreamRow(object):
+class EventsStreamRow:
"""A parsed row from the events replication stream"""
type = attr.ib() # str: the TypeId of one of the *EventsStreamRows
data = attr.ib() # BaseEventsStreamRow
-class BaseEventsStreamRow(object):
+class BaseEventsStreamRow:
"""Base class for rows to be sent in the events stream.
Specifies how to identify, serialize and deserialize the different types.
@@ -117,7 +117,7 @@ class EventsStream(Stream):
self._store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
- current_token_without_instance(self._store.get_current_events_token),
+ self._store._stream_id_gen.get_current_token_for_writer,
self._update_function,
)
|