summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py8
-rw-r--r--synapse/replication/tcp/handler.py2
-rw-r--r--synapse/replication/tcp/resource.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py6
-rw-r--r--synapse/replication/tcp/streams/events.py4
5 files changed, 10 insertions, 12 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index d6ecf5b327..e82b9e386f 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -29,6 +29,7 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
     EventsStreamRow,
 )
+from synapse.types import UserID
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.metrics import Measure
 
@@ -98,7 +99,6 @@ class ReplicationDataHandler:
 
     def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
-        self.pusher_pool = hs.get_pusherpool()
         self.notifier = hs.get_notifier()
         self._reactor = hs.get_reactor()
         self._clock = hs.get_clock()
@@ -148,14 +148,12 @@ class ReplicationDataHandler:
                 if event.rejected_reason:
                     continue
 
-                extra_users = ()  # type: Tuple[str, ...]
+                extra_users = ()  # type: Tuple[UserID, ...]
                 if event.type == EventTypes.Member:
-                    extra_users = (event.state_key,)
+                    extra_users = (UserID.from_string(event.state_key),)
                 max_token = self.store.get_room_max_stream_ordering()
                 self.notifier.on_new_room_event(event, token, max_token, extra_users)
 
-            await self.pusher_pool.on_new_notifications(token, token)
-
         # Notify any waiting deferreds. The list is ordered by position so we
         # just iterate through the list until we reach a position that is
         # greater than the received row position.
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1c303f3a46..b323841f73 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ class ReplicationCommandHandler:
             if isinstance(stream, (EventsStream, BackfillStream)):
                 # Only add EventStream and BackfillStream as a source on the
                 # instance in charge of event persistence.
-                if hs.config.worker.writers.events == hs.get_instance_name():
+                if hs.get_instance_name() in hs.config.worker.writers.events:
                     self._streams_to_replicate.append(stream)
 
                 continue
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 04d894fb3d..687984e7a8 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -93,7 +93,7 @@ class ReplicationStreamer:
         """
         if not self.command_handler.connected():
             # Don't bother if nothing is listening. We still need to advance
-            # the stream tokens otherwise they'll fall beihind forever
+            # the stream tokens otherwise they'll fall behind forever
             for stream in self.streams:
                 stream.discard_updates_and_advance()
             return
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 682d47f402..54dccd15a6 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -345,7 +345,7 @@ class PushRulesStream(Stream):
     def __init__(self, hs):
         self.store = hs.get_datastore()
 
-        super(PushRulesStream, self).__init__(
+        super().__init__(
             hs.get_instance_name(),
             self._current_token,
             self.store.get_all_push_rule_updates,
@@ -383,7 +383,7 @@ class CachesStream(Stream):
     the cache on the workers
     """
 
-    @attr.s
+    @attr.s(slots=True)
     class CachesStreamRow:
         """Stream to inform workers they should invalidate their cache.
 
@@ -441,7 +441,7 @@ class DeviceListsStream(Stream):
     told about a device update.
     """
 
-    @attr.s
+    @attr.s(slots=True)
     class DeviceListsStreamRow:
         entity = attr.ib(type=str)
 
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f929fc3954..ccc7ca30d8 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import List, Tuple, Type
 
 import attr
 
-from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
+from ._base import Stream, StreamUpdateResult, Token
 
 """Handling of the 'events' replication stream
 
@@ -117,7 +117,7 @@ class EventsStream(Stream):
         self._store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
-            current_token_without_instance(self._store.get_current_events_token),
+            self._store._stream_id_gen.get_current_token_for_writer,
             self._update_function,
         )