diff --git a/changelog.d/8388.misc b/changelog.d/8388.misc
new file mode 100644
index 0000000000..aaaef88b66
--- /dev/null
+++ b/changelog.d/8388.misc
@@ -0,0 +1 @@
+Add `EventStreamPosition` type.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ea9264e751..9f773aefa7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -74,6 +74,8 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
JsonDict,
MutableStateMap,
+ PersistedEventPosition,
+ RoomStreamToken,
StateMap,
UserID,
get_domain_from_id,
@@ -2956,7 +2958,7 @@ class FederationHandler(BaseHandler):
)
return result["max_stream_id"]
else:
- max_stream_id = await self.storage.persistence.persist_events(
+ max_stream_token = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)
@@ -2967,12 +2969,12 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- await self._notify_persisted_event(event, max_stream_id)
+ await self._notify_persisted_event(event, max_stream_token)
- return max_stream_id
+ return max_stream_token.stream
async def _notify_persisted_event(
- self, event: EventBase, max_stream_id: int
+ self, event: EventBase, max_stream_token: RoomStreamToken
) -> None:
"""Checks to see if notifier/pushers should be notified about the
event or not.
@@ -2998,9 +3000,11 @@ class FederationHandler(BaseHandler):
elif event.internal_metadata.is_outlier():
return
- event_stream_id = event.internal_metadata.stream_ordering
+ event_pos = PersistedEventPosition(
+ self._instance_name, event.internal_metadata.stream_ordering
+ )
self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
+ event, event_pos, max_stream_token, extra_users=extra_users
)
async def _clean_room_for_join(self, room_id: str) -> None:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6ee559fd1d..ee271e85e5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1138,7 +1138,7 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
- event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
+ event_pos, max_stream_token = await self.storage.persistence.persist_event(
event, context=context
)
@@ -1149,7 +1149,7 @@ class EventCreationHandler:
def _notify():
try:
self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
+ event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
@@ -1161,7 +1161,7 @@ class EventCreationHandler:
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
- return event_stream_id
+ return event_pos.stream
async def _bump_active_time(self, user: UserID) -> None:
try:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9b3a4f638b..e948efef2e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -967,7 +967,7 @@ class SyncHandler:
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
- user_id, now_token.room_stream_id
+ user_id, now_token.room_key
)
sync_result_builder = SyncResultBuilder(
sync_config,
@@ -1916,7 +1916,7 @@ class SyncHandler:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
async def get_rooms_for_user_at(
- self, user_id: str, stream_ordering: int
+ self, user_id: str, room_key: RoomStreamToken
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.
@@ -1942,15 +1942,15 @@ class SyncHandler:
# If the membership's stream ordering is after the given stream
# ordering, we need to go and work out if the user was in the room
# before.
- for room_id, membership_stream_ordering in joined_rooms:
- if membership_stream_ordering <= stream_ordering:
+ for room_id, event_pos in joined_rooms:
+ if not event_pos.persisted_after(room_key):
joined_room_ids.add(room_id)
continue
logger.info("User joined room after current token: %s", room_id)
extrems = await self.store.get_forward_extremeties_for_room(
- room_id, stream_ordering
+ room_id, event_pos.stream
)
users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
if user_id in users_in_room:
diff --git a/synapse/notifier.py b/synapse/notifier.py
index a8fd3ef886..441b3d15e2 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -42,7 +42,13 @@ from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
-from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+ Collection,
+ PersistedEventPosition,
+ RoomStreamToken,
+ StreamToken,
+ UserID,
+)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -187,7 +193,7 @@ class Notifier:
self.store = hs.get_datastore()
self.pending_new_room_events = (
[]
- ) # type: List[Tuple[int, EventBase, Collection[UserID]]]
+ ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -246,8 +252,8 @@ class Notifier:
def on_new_room_event(
self,
event: EventBase,
- room_stream_id: int,
- max_room_stream_id: int,
+ event_pos: PersistedEventPosition,
+ max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
""" Used by handlers to inform the notifier something has happened
@@ -261,16 +267,16 @@ class Notifier:
until all previous events have been persisted before notifying
the client streams.
"""
- self.pending_new_room_events.append((room_stream_id, event, extra_users))
- self._notify_pending_new_room_events(max_room_stream_id)
+ self.pending_new_room_events.append((event_pos, event, extra_users))
+ self._notify_pending_new_room_events(max_room_stream_token)
self.notify_replication()
- def _notify_pending_new_room_events(self, max_room_stream_id: int):
+ def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
"""Notify for the room events that were queued waiting for a previous
event to be persisted.
Args:
- max_room_stream_id: The highest stream_id below which all
+ max_room_stream_token: The highest stream_id below which all
events have been persisted.
"""
pending = self.pending_new_room_events
@@ -279,11 +285,9 @@ class Notifier:
users = set() # type: Set[UserID]
rooms = set() # type: Set[str]
- for room_stream_id, event, extra_users in pending:
- if room_stream_id > max_room_stream_id:
- self.pending_new_room_events.append(
- (room_stream_id, event, extra_users)
- )
+ for event_pos, event, extra_users in pending:
+ if event_pos.persisted_after(max_room_stream_token):
+ self.pending_new_room_events.append((event_pos, event, extra_users))
else:
if (
event.type == EventTypes.Member
@@ -296,39 +300,38 @@ class Notifier:
if users or rooms:
self.on_new_event(
- "room_key",
- RoomStreamToken(None, max_room_stream_id),
- users=users,
- rooms=rooms,
+ "room_key", max_room_stream_token, users=users, rooms=rooms,
)
- self._on_updated_room_token(max_room_stream_id)
+ self._on_updated_room_token(max_room_stream_token)
- def _on_updated_room_token(self, max_room_stream_id: int):
+ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
"""Poke services that might care that the room position has been
updated.
"""
# poke any interested application service.
run_as_background_process(
- "_notify_app_services", self._notify_app_services, max_room_stream_id
+ "_notify_app_services", self._notify_app_services, max_room_stream_token
)
run_as_background_process(
- "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
+ "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
)
if self.federation_sender:
- self.federation_sender.notify_new_events(max_room_stream_id)
+ self.federation_sender.notify_new_events(max_room_stream_token.stream)
- async def _notify_app_services(self, max_room_stream_id: int):
+ async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
- await self.appservice_handler.notify_interested_services(max_room_stream_id)
+ await self.appservice_handler.notify_interested_services(
+ max_room_stream_token.stream
+ )
except Exception:
logger.exception("Error notifying application services of event")
- async def _notify_pusher_pool(self, max_room_stream_id: int):
+ async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
- await self._pusher_pool.on_new_notifications(max_room_stream_id)
+ await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
except Exception:
logger.exception("Error pusher pool of event")
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e82b9e386f..55af3d41ea 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -29,7 +29,7 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow,
EventsStreamRow,
)
-from synapse.types import UserID
+from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure
@@ -151,8 +151,14 @@ class ReplicationDataHandler:
extra_users = () # type: Tuple[UserID, ...]
if event.type == EventTypes.Member:
extra_users = (UserID.from_string(event.state_key),)
- max_token = self.store.get_room_max_stream_ordering()
- self.notifier.on_new_room_event(event, token, max_token, extra_users)
+
+ max_token = RoomStreamToken(
+ None, self.store.get_room_max_stream_ordering()
+ )
+ event_pos = PersistedEventPosition(instance_name, token)
+ self.notifier.on_new_room_event(
+ event, event_pos, max_token, extra_users
+ )
# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 4fa8767b01..86ffe2479e 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set
@@ -37,7 +36,7 @@ from synapse.storage.roommember import (
ProfileInfo,
RoomsForUser,
)
-from synapse.types import Collection, get_domain_from_id
+from synapse.types import Collection, PersistedEventPosition, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -387,7 +386,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# for rooms the server is participating in.
if self._current_state_events_membership_up_to_date:
sql = """
- SELECT room_id, e.stream_ordering
+ SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
@@ -397,7 +396,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
"""
else:
sql = """
- SELECT room_id, e.stream_ordering
+ SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (room_id, event_id)
INNER JOIN events AS e USING (room_id, event_id)
@@ -408,7 +407,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
"""
txn.execute(sql, (user_id, Membership.JOIN))
- return frozenset(GetRoomsForUserWithStreamOrdering(*row) for row in txn)
+ return frozenset(
+ GetRoomsForUserWithStreamOrdering(
+ room_id, PersistedEventPosition(instance, stream_id)
+ )
+ for room_id, instance, stream_id in txn
+ )
async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index d89f6ed128..603cd7d825 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -31,7 +31,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
-from synapse.types import Collection, StateMap
+from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.metrics import Measure
@@ -190,6 +190,7 @@ class EventsPersistenceStorage:
self.persist_events_store = stores.persist_events
self._clock = hs.get_clock()
+ self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -198,7 +199,7 @@ class EventsPersistenceStorage:
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
- ) -> int:
+ ) -> RoomStreamToken:
"""
Write events to the database
Args:
@@ -228,11 +229,11 @@ class EventsPersistenceStorage:
defer.gatherResults(deferreds, consumeErrors=True)
)
- return self.main_store.get_current_events_token()
+ return RoomStreamToken(None, self.main_store.get_current_events_token())
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
- ) -> Tuple[int, int]:
+ ) -> Tuple[PersistedEventPosition, RoomStreamToken]:
"""
Returns:
The stream ordering of `event`, and the stream ordering of the
@@ -247,7 +248,10 @@ class EventsPersistenceStorage:
await make_deferred_yieldable(deferred)
max_persisted_id = self.main_store.get_current_events_token()
- return (event.internal_metadata.stream_ordering, max_persisted_id)
+ event_stream_id = event.internal_metadata.stream_ordering
+
+ pos = PersistedEventPosition(self._instance_name, event_stream_id)
+ return pos, RoomStreamToken(None, max_persisted_id)
def _maybe_start_persisting(self, room_id: str):
async def persisting_queue(item):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8c4a83a840..f152f63321 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -25,7 +25,7 @@ RoomsForUser = namedtuple(
)
GetRoomsForUserWithStreamOrdering = namedtuple(
- "_GetRoomsForUserWithStreamOrdering", ("room_id", "stream_ordering")
+ "_GetRoomsForUserWithStreamOrdering", ("room_id", "event_pos")
)
diff --git a/synapse/types.py b/synapse/types.py
index a6fc7df22c..ec39f9e1e8 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -495,6 +495,21 @@ class StreamToken:
StreamToken.START = StreamToken.from_string("s0_0")
+@attr.s(slots=True, frozen=True)
+class PersistedEventPosition:
+ """Position of a newly persisted event with instance that persisted it.
+
+ This can be used to test whether the event is persisted before or after a
+ RoomStreamToken.
+ """
+
+ instance_name = attr.ib(type=str)
+ stream = attr.ib(type=int)
+
+ def persisted_after(self, token: RoomStreamToken) -> bool:
+ return token.stream < self.stream
+
+
class ThirdPartyInstanceID(
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
):
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index bc578411d6..c0ee1cfbd6 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -20,6 +20,7 @@ from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_
from synapse.handlers.room import RoomEventSource
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.roommember import RoomsForUser
+from synapse.types import PersistedEventPosition
from tests.server import FakeTransport
@@ -204,10 +205,14 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
self.replicate()
+
+ expected_pos = PersistedEventPosition(
+ "master", j2.internal_metadata.stream_ordering
+ )
self.check(
"get_rooms_for_user_with_stream_ordering",
(USER_ID_2,),
- {(ROOM_ID, j2.internal_metadata.stream_ordering)},
+ {(ROOM_ID, expected_pos)},
)
def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self):
@@ -293,9 +298,10 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
# the membership change is only any use to us if the room is in the
# joined_rooms list.
if membership_changes:
- self.assertEqual(
- joined_rooms, {(ROOM_ID, j2.internal_metadata.stream_ordering)}
+ expected_pos = PersistedEventPosition(
+ "master", j2.internal_metadata.stream_ordering
)
+ self.assertEqual(joined_rooms, {(ROOM_ID, expected_pos)})
event_id = 0
|