summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/user_dir.py17
-rw-r--r--synapse/replication/tcp/streams/__init__.py1
-rw-r--r--synapse/replication/tcp/streams/_base.py21
-rw-r--r--synapse/replication/tcp/streams/events.py34
-rw-r--r--synapse/storage/events.py3
5 files changed, 43 insertions, 33 deletions
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index d1ab9512cd..355f5aa71d 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+    EventsStream,
+    EventsStreamCurrentStateRow,
+)
 from synapse.rest.client.v2_alpha import user_directory
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
             prefilled_cache=curr_state_delta_prefill,
         )
 
-        self._current_state_delta_pos = events_max
-
     def stream_positions(self):
         result = super(UserDirectorySlaveStore, self).stream_positions()
-        result["current_state_deltas"] = self._current_state_delta_pos
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
-        if stream_name == "current_state_deltas":
-            self._current_state_delta_pos = token
+        if stream_name == EventsStream.NAME:
+            self._stream_id_gen.advance(token)
             for row in rows:
+                if row.type != EventsStreamCurrentStateRow.TypeId:
+                    continue
                 self._curr_state_delta_stream_cache.entity_has_changed(
-                    row.room_id, token
+                    row.data.room_id, token
                 )
         return super(UserDirectorySlaveStore, self).process_replication_rows(
             stream_name, token, rows
@@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
-        if stream_name == "current_state_deltas":
+        if stream_name == EventsStream.NAME:
             run_in_background(self._notify_directory)
 
     @defer.inlineCallbacks
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 5c715e3bfa..634f636dc9 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -44,7 +44,6 @@ STREAMS_MAP = {
         federation.FederationStream,
         _base.TagAccountDataStream,
         _base.AccountDataStream,
-        _base.CurrentStateDeltaStream,
         _base.GroupServerStream,
     )
 }
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 13ab1bee05..8971a6a22e 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -91,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
     "data_type",  # str
     "data",  # dict
 ))
-CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
-    "room_id",  # str
-    "type",  # str
-    "state_key",  # str
-    "event_id",  # str, optional
-))
 GroupsStreamRow = namedtuple("GroupsStreamRow", (
     "group_id",  # str
     "user_id",  # str
@@ -428,21 +422,6 @@ class AccountDataStream(Stream):
         defer.returnValue(results)
 
 
-class CurrentStateDeltaStream(Stream):
-    """Current state for a room was changed
-    """
-    NAME = "current_state_deltas"
-    ROW_TYPE = CurrentStateDeltaStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-
-        self.current_token = store.get_max_current_state_delta_stream_id
-        self.update_function = store.get_all_updated_current_state_deltas
-
-        super(CurrentStateDeltaStream, self).__init__(hs)
-
-
 class GroupServerStream(Stream):
     NAME = "groups"
     ROW_TYPE = GroupsStreamRow
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 928028e893..e0f6e29248 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -13,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import heapq
 
 import attr
 
@@ -26,6 +27,7 @@ from ._base import Stream
 This stream contains rows of various types. Each row therefore contains a 'type'
 identifier before the real data. For example::
 
+    RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
     RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]]
 
 An "ev" row is sent for each new event. The fields in the data part are:
@@ -36,6 +38,14 @@ An "ev" row is sent for each new event. The fields in the data part are:
  * The state key of the event, for state events
  * The event id of an event which is redacted by this event.
 
+A "state" row is sent whenever the "current state" in a room changes. The fields in the
+data part are:
+
+ * The room id for the state change
+ * The event type of the state which has changed
+ * The state_key of the state which has changed
+ * The event id of the new state
+
 """
 
 
@@ -77,10 +87,21 @@ class EventsStreamEventRow(BaseEventsStreamRow):
     redacts = attr.ib()    # str, optional
 
 
+@attr.s(slots=True, frozen=True)
+class EventsStreamCurrentStateRow(BaseEventsStreamRow):
+    TypeId = "state"
+
+    room_id = attr.ib()    # str
+    type = attr.ib()       # str
+    state_key = attr.ib()  # str
+    event_id = attr.ib()   # str, optional
+
+
 TypeToRow = {
     Row.TypeId: Row
     for Row in (
         EventsStreamEventRow,
+        EventsStreamCurrentStateRow,
     )
 }
 
@@ -105,7 +126,18 @@ class EventsStream(Stream):
             (row[0], EventsStreamEventRow.TypeId, row[1:])
             for row in event_rows
         )
-        defer.returnValue(event_updates)
+
+        state_rows = yield self._store.get_all_updated_current_state_deltas(
+            from_token, current_token, limit
+        )
+        state_updates = (
+            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
+            for row in state_rows
+        )
+
+        all_updates = heapq.merge(event_updates, state_updates)
+
+        defer.returnValue(all_updates)
 
     @classmethod
     def parse_row(cls, row):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 428300ea0a..0b0a4dcdd3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2287,9 +2287,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
 
         defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"])))
 
-    def get_max_current_state_delta_stream_id(self):
-        return self._stream_id_gen.get_current_token()
-
     def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
         def get_all_updated_current_state_deltas_txn(txn):
             sql = """