summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/replication/slave/storage/events.py31
-rw-r--r--synapse/storage/_base.py5
-rw-r--r--synapse/storage/events.py24
3 files changed, 40 insertions, 20 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index c57385d92f..b457c5563f 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,7 +16,10 @@
 import logging
 
 from synapse.api.constants import EventTypes
-from synapse.replication.tcp.streams.events import EventsStreamEventRow
+from synapse.replication.tcp.streams.events import (
+    EventsStreamCurrentStateRow,
+    EventsStreamEventRow,
+)
 from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
@@ -80,14 +83,7 @@ class SlavedEventStore(EventFederationWorkerStore,
         if stream_name == "events":
             self._stream_id_gen.advance(token)
             for row in rows:
-                if row.type != EventsStreamEventRow.TypeId:
-                    continue
-                data = row.data
-                self.invalidate_caches_for_event(
-                    token, data.event_id, data.room_id, data.type, data.state_key,
-                    data.redacts,
-                    backfilled=False,
-                )
+                self._process_event_stream_row(token, row)
         elif stream_name == "backfill":
             self._backfill_id_gen.advance(-token)
             for row in rows:
@@ -100,6 +96,23 @@ class SlavedEventStore(EventFederationWorkerStore,
             stream_name, token, rows
         )
 
+    def _process_event_stream_row(self, token, row):
+        data = row.data
+
+        if row.type == EventsStreamEventRow.TypeId:
+            self.invalidate_caches_for_event(
+                token, data.event_id, data.room_id, data.type, data.state_key,
+                data.redacts,
+                backfilled=False,
+            )
+        elif row.type == EventsStreamCurrentStateRow.TypeId:
+            if data.type == EventTypes.Member:
+                self.get_rooms_for_user_with_stream_ordering.invalidate(
+                    (data.state_key, ),
+                )
+        else:
+            raise Exception("Unknown events stream row type %s" % (row.type, ))
+
     def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
                                     etype, state_key, redacts, backfilled):
         self._invalidate_get_event_cache(event_id)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7e3903859b..653b32fbf5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1355,11 +1355,6 @@ class SQLBaseStore(object):
             members_changed (iterable[str]): The user_ids of members that have
                 changed
         """
-        for member in members_changed:
-            self._attempt_to_invalidate_cache(
-                "get_rooms_for_user_with_stream_ordering", (member,),
-            )
-
         for host in set(get_domain_from_id(u) for u in members_changed):
             self._attempt_to_invalidate_cache(
                 "is_host_joined", (room_id, host,),
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d0668e39c4..dfda39bbe0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -79,7 +79,7 @@ def encode_json(json_object):
     """
     out = frozendict_json_encoder.encode(json_object)
     if isinstance(out, bytes):
-        out = out.decode('utf8')
+        out = out.decode("utf8")
     return out
 
 
@@ -813,9 +813,10 @@ class EventsStore(
         """
         all_events_and_contexts = events_and_contexts
 
+        min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
-        self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
+        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
 
         self._update_forward_extremities_txn(
             txn,
@@ -890,7 +891,7 @@ class EventsStore(
             backfilled=backfilled,
         )
 
-    def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
+    def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
         for room_id, current_state_tuple in iteritems(state_delta_by_room):
             to_delete, to_insert = current_state_tuple
 
@@ -899,6 +900,12 @@ class EventsStore(
             # that we can use it to calculate the `prev_event_id`. (This
             # allows us to not have to pull out the existing state
             # unnecessarily).
+            #
+            # The stream_id for the update is chosen to be the minimum of the stream_ids
+            # for the batch of the events that we are persisting; that means we do not
+            # end up in a situation where workers see events before the
+            # current_state_delta updates.
+            #
             sql = """
                 INSERT INTO current_state_delta_stream
                 (stream_id, room_id, type, state_key, event_id, prev_event_id)
@@ -911,7 +918,7 @@ class EventsStore(
                 sql,
                 (
                     (
-                        max_stream_order,
+                        stream_id,
                         room_id,
                         etype,
                         state_key,
@@ -929,7 +936,7 @@ class EventsStore(
                 sql,
                 (
                     (
-                        max_stream_order,
+                        stream_id,
                         room_id,
                         etype,
                         state_key,
@@ -970,7 +977,7 @@ class EventsStore(
             txn.call_after(
                 self._curr_state_delta_stream_cache.entity_has_changed,
                 room_id,
-                max_stream_order,
+                stream_id,
             )
 
             # Invalidate the various caches
@@ -986,6 +993,11 @@ class EventsStore(
                 if ev_type == EventTypes.Member
             )
 
+            for member in members_changed:
+                txn.call_after(
+                    self.get_rooms_for_user_with_stream_ordering.invalidate, (member,)
+                )
+
             self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
 
     def _update_forward_extremities_txn(