diff --git a/changelog.d/4955.bugfix b/changelog.d/4955.bugfix
new file mode 100644
index 0000000000..e50e67383d
--- /dev/null
+++ b/changelog.d/4955.bugfix
@@ -0,0 +1 @@
+Fix sync bug which made accepting invites unreliable in worker-mode synapses.
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9163b56d86..5388def28a 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
- event = yield self.store.get_event(row.event_id)
+ if row.type != EventsStreamEventRow.TypeId:
+ continue
+ event = yield self.store.get_event(row.data.event_id)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
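
Because the events stream now multiplexes several row types, every consumer must check `row.type` before touching `row.data`, as the synchrotron hunk above does. A minimal standalone sketch of the pattern, using made-up namedtuple stand-ins rather than Synapse's real row classes::

    from collections import namedtuple

    # Illustrative stand-ins; Synapse's real rows are attrs classes.
    Row = namedtuple("Row", ("type", "data"))
    EventData = namedtuple(
        "EventData", ("event_id", "room_id", "type", "state_key", "redacts")
    )

    EV_TYPE_ID = "ev"  # mirrors EventsStreamEventRow.TypeId

    rows = [
        Row("state", ("!room:hs", "m.room.member", "@user:hs", "$e1:hs")),
        Row(EV_TYPE_ID, EventData("$e2:hs", "!room:hs", "m.room.member", "@user:hs", None)),
    ]

    for row in rows:
        if row.type != EV_TYPE_ID:
            continue  # "state" rows carry current-state deltas, not new events
        print("new event:", row.data.event_id)
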
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index d1ab9512cd..355f5aa71d 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+ EventsStream,
+ EventsStreamCurrentStateRow,
+)
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
prefilled_cache=curr_state_delta_prefill,
)
- self._current_state_delta_pos = events_max
-
def stream_positions(self):
result = super(UserDirectorySlaveStore, self).stream_positions()
- result["current_state_deltas"] = self._current_state_delta_pos
return result
def process_replication_rows(self, stream_name, token, rows):
- if stream_name == "current_state_deltas":
- self._current_state_delta_pos = token
+ if stream_name == EventsStream.NAME:
+ self._stream_id_gen.advance(token)
for row in rows:
+ if row.type != EventsStreamCurrentStateRow.TypeId:
+ continue
self._curr_state_delta_stream_cache.entity_has_changed(
- row.room_id, token
+ row.data.room_id, token
)
return super(UserDirectorySlaveStore, self).process_replication_rows(
stream_name, token, rows
@@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
- if stream_name == "current_state_deltas":
+ if stream_name == EventsStream.NAME:
run_in_background(self._notify_directory)
@defer.inlineCallbacks
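
With the bespoke `current_state_deltas` position gone, the worker simply advances the shared events-stream id generator on each batch. A rough sketch of why `advance` is safe under replayed or out-of-order tokens; `SimpleIdTracker` is a made-up stand-in for Synapse's slaved id tracker, not its actual implementation::

    class SimpleIdTracker(object):
        """Made-up stand-in for Synapse's slaved stream-id tracker."""

        def __init__(self, current):
            self._current = current

        def advance(self, token):
            # Positions only move forward, so replayed tokens are harmless.
            self._current = max(self._current, token)

        def get_current_token(self):
            return self._current

    tracker = SimpleIdTracker(100)
    for token in (105, 103, 110):  # tolerate out-of-order delivery
        tracker.advance(token)
    assert tracker.get_current_token() == 110
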
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 4830c68f35..c57385d92f 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,6 +16,7 @@
import logging
from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
@@ -79,9 +80,12 @@ class SlavedEventStore(EventFederationWorkerStore,
if stream_name == "events":
self._stream_id_gen.advance(token)
for row in rows:
+ if row.type != EventsStreamEventRow.TypeId:
+ continue
+ data = row.data
self.invalidate_caches_for_event(
- token, row.event_id, row.room_id, row.type, row.state_key,
- row.redacts,
+ token, data.event_id, data.room_id, data.type, data.state_key,
+ data.redacts,
backfilled=False,
)
elif stream_name == "backfill":
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 9daec2c995..b51590cf8f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire::
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
- > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
- "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
+ > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
+ "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
< PING 1490197675618
> ERROR server stopping
* connection closed by server *
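
The updated RDATA line nests the row payload inside a type-tagged JSON list. A hedged sketch of how a consumer could split such a line into its parts (`parse_rdata` and the sample line are illustrative, not part of Synapse's protocol code)::

    import json

    line = (
        'RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",'
        ' "!AFDCvgApUmpdfVjIXm:localhost:8823", "m.room.guest_access", "", null]]'
    )

    def parse_rdata(line):
        # Command, stream name and token are space-separated; the rest is JSON.
        # Note the token may also be the literal string "batch".
        cmd, stream_name, token, payload = line.split(" ", 3)
        typ, data = json.loads(payload)  # type identifier first, then the data
        return stream_name, token, typ, data

    print(parse_rdata(line))
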
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 5c715e3bfa..634f636dc9 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -44,7 +44,6 @@ STREAMS_MAP = {
federation.FederationStream,
_base.TagAccountDataStream,
_base.AccountDataStream,
- _base.CurrentStateDeltaStream,
_base.GroupServerStream,
)
}
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 13ab1bee05..8971a6a22e 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -91,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
"data_type", # str
"data", # dict
))
-CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
- "room_id", # str
- "type", # str
- "state_key", # str
- "event_id", # str, optional
-))
GroupsStreamRow = namedtuple("GroupsStreamRow", (
"group_id", # str
"user_id", # str
@@ -428,21 +422,6 @@ class AccountDataStream(Stream):
defer.returnValue(results)
-class CurrentStateDeltaStream(Stream):
- """Current state for a room was changed
- """
- NAME = "current_state_deltas"
- ROW_TYPE = CurrentStateDeltaStreamRow
-
- def __init__(self, hs):
- store = hs.get_datastore()
-
- self.current_token = store.get_max_current_state_delta_stream_id
- self.update_function = store.get_all_updated_current_state_deltas
-
- super(CurrentStateDeltaStream, self).__init__(hs)
-
-
class GroupServerStream(Stream):
NAME = "groups"
ROW_TYPE = GroupsStreamRow
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 511dd6bcc7..e0f6e29248 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -13,28 +13,134 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import namedtuple
+import heapq
+
+import attr
+
+from twisted.internet import defer
from ._base import Stream
-EventStreamRow = namedtuple("EventStreamRow", (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
-))
+
+"""Handling of the 'events' replication stream
+
+This stream contains rows of various types. Each row therefore contains a 'type'
+identifier before the real data. For example::
+
+ RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
+ RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]]
+
+An "ev" row is sent for each new event. The fields in the data part are:
+
+ * The new event id
+ * The room id for the event
+ * The type of the new event
+ * The state key of the event, for state events
+ * The event id of the event which this event redacts, if any.
+
+A "state" row is sent whenever the "current state" in a room changes. The fields in the
+data part are:
+
+ * The room id for the state change
+ * The event type of the state which has changed
+ * The state_key of the state which has changed
+ * The event id of the new state
+
+"""
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamRow(object):
+ """A parsed row from the events replication stream"""
+ type = attr.ib() # str: the TypeId of one of the *EventsStreamRows
+ data = attr.ib() # BaseEventsStreamRow
+
+
+class BaseEventsStreamRow(object):
+ """Base class for rows to be sent in the events stream.
+
+ Specifies how to identify, serialize and deserialize the different types.
+ """
+
+ TypeId = None # Unique string that identifies the type. Must be overridden in subclasses.
+
+ @classmethod
+ def from_data(cls, data):
+ """Parse the data from the replication stream into a row.
+
+ By default we just call the constructor with the data list as arguments.
+
+ Args:
+ data: The value of the data object from the replication stream
+ """
+ return cls(*data)
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamEventRow(BaseEventsStreamRow):
+ TypeId = "ev"
+
+ event_id = attr.ib() # str
+ room_id = attr.ib() # str
+ type = attr.ib() # str
+ state_key = attr.ib() # str, optional
+ redacts = attr.ib() # str, optional
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamCurrentStateRow(BaseEventsStreamRow):
+ TypeId = "state"
+
+ room_id = attr.ib() # str
+ type = attr.ib() # str
+ state_key = attr.ib() # str
+ event_id = attr.ib() # str, optional
+
+
+TypeToRow = {
+ Row.TypeId: Row
+ for Row in (
+ EventsStreamEventRow,
+ EventsStreamCurrentStateRow,
+ )
+}
class EventsStream(Stream):
"""We received a new event, or an event went from being an outlier to not
"""
NAME = "events"
- ROW_TYPE = EventStreamRow
def __init__(self, hs):
- store = hs.get_datastore()
- self.current_token = store.get_current_events_token
- self.update_function = store.get_all_new_forward_event_rows
+ self._store = hs.get_datastore()
+ self.current_token = self._store.get_current_events_token
super(EventsStream, self).__init__(hs)
+
+ @defer.inlineCallbacks
+ def update_function(self, from_token, current_token, limit=None):
+ event_rows = yield self._store.get_all_new_forward_event_rows(
+ from_token, current_token, limit,
+ )
+ event_updates = (
+ (row[0], EventsStreamEventRow.TypeId, row[1:])
+ for row in event_rows
+ )
+
+ state_rows = yield self._store.get_all_updated_current_state_deltas(
+ from_token, current_token, limit
+ )
+ state_updates = (
+ (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
+ for row in state_rows
+ )
+
+ all_updates = heapq.merge(event_updates, state_updates)
+
+ defer.returnValue(all_updates)
+
+ @classmethod
+ def parse_row(cls, row):
+ (typ, data) = row
+ data = TypeToRow[typ].from_data(data)
+ return EventsStreamRow(typ, data)
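
The rewritten `update_function` depends on both storage queries returning rows sorted by stream token: `heapq.merge` then interleaves them into one token-ordered sequence without re-sorting. A self-contained sketch with illustrative tuples::

    import heapq

    # Each update is (token, type_id, data); both inputs are already token-sorted.
    event_updates = [(10, "ev", ["$a:hs"]), (12, "ev", ["$b:hs"])]
    state_updates = [(11, "state", ["!r:hs"]), (12, "state", ["!r:hs"])]

    merged = list(heapq.merge(event_updates, state_updates))
    print([token for token, _, _ in merged])  # [10, 11, 12, 12]

One consequence worth noting: `limit` is applied to each of the two queries separately, so a single call to `update_function` can yield up to twice `limit` rows after the merge.
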
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b1d5f469c8..d0668e39c4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2235,9 +2235,6 @@ class EventsStore(
(int(res["topological_ordering"]), int(res["stream_ordering"]))
)
- def get_max_current_state_delta_stream_id(self):
- return self._stream_id_gen.get_current_token()
-
def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
def get_all_updated_current_state_deltas_txn(txn):
sql = """
|