diff --git a/changelog.d/3350.misc b/changelog.d/3350.misc
new file mode 100644
index 0000000000..3713cd6d63
--- /dev/null
+++ b/changelog.d/3350.misc
@@ -0,0 +1 @@
+Remove redundant checks on who_forgot_in_room.
\ No newline at end of file
diff --git a/changelog.d/3555.feature b/changelog.d/3555.feature
new file mode 100644
index 0000000000..ea4a85e0ae
--- /dev/null
+++ b/changelog.d/3555.feature
@@ -0,0 +1 @@
+Add support for the client_reader worker to handle more client APIs.
diff --git a/changelog.d/3586.misc b/changelog.d/3586.misc
new file mode 100644
index 0000000000..e853e2481b
--- /dev/null
+++ b/changelog.d/3586.misc
@@ -0,0 +1 @@
+Fixes and optimisations for resolve_state_groups.
diff --git a/changelog.d/3590.misc b/changelog.d/3590.misc
new file mode 100644
index 0000000000..0f1688fd0f
--- /dev/null
+++ b/changelog.d/3590.misc
@@ -0,0 +1 @@
+Add some Measure blocks to persist_events.
diff --git a/changelog.d/3591.misc b/changelog.d/3591.misc
new file mode 100644
index 0000000000..f0137766a0
--- /dev/null
+++ b/changelog.d/3591.misc
@@ -0,0 +1 @@
+Fix some random logcontext leaks.
\ No newline at end of file
diff --git a/changelog.d/3592.misc b/changelog.d/3592.misc
new file mode 100644
index 0000000000..60129569c2
--- /dev/null
+++ b/changelog.d/3592.misc
@@ -0,0 +1 @@
+Speed up calculating state deltas in the persist_events loop.
diff --git a/docs/workers.rst b/docs/workers.rst
index 1d521b9ec5..c5b37c3ded 100644
--- a/docs/workers.rst
+++ b/docs/workers.rst
@@ -206,6 +206,10 @@ Handles client API endpoints. It can handle REST endpoints matching the
following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
+ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
+ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
+ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
+ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
``synapse.app.user_dir``
~~~~~~~~~~~~~~~~~~~~~~~~
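A quick, self-contained sanity check (illustrative only, not part of the patch) that the new patterns route the intended endpoints to the client_reader worker:

```python
import re

# The four patterns added above, verbatim.
PATTERNS = [
    "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$",
    "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$",
    "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$",
    "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$",
]

def handled_by_client_reader(path):
    return any(re.match(p, path) for p in PATTERNS)

assert handled_by_client_reader("/_matrix/client/r0/rooms/!room:example.com/members")
assert handled_by_client_reader("/_matrix/client/unstable/rooms/!room:example.com/state")
# event-sending stays on the main process
assert not handled_by_client_reader("/_matrix/client/r0/rooms/!room:example.com/send/m.text/1")
```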
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 535bdb449d..073229b4c4 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -739,3 +739,37 @@ class Auth(object):
)
return query_params[0]
+
+ @defer.inlineCallbacks
+ def check_in_room_or_world_readable(self, room_id, user_id):
+ """Checks that the user is or was in the room or the room is world
+ readable. If it isn't then an exception is raised.
+
+ Returns:
+ Deferred[tuple[str, str|None]]: Resolves to the current membership of
+ the user in the room and the membership event ID of the user. If
+ the user is not in the room and never has been, then
+ `(Membership.JOIN, None)` is returned.
+ """
+
+ try:
+ # check_user_was_in_room will return the most recent membership
+ # event for the user if:
+ # * The user is a non-guest user, and was ever in the room
+ # * The user is a guest user, and has joined the room
+ # else it will throw.
+ member_event = yield self.check_user_was_in_room(room_id, user_id)
+ defer.returnValue((member_event.membership, member_event.event_id))
+ except AuthError:
+ visibility = yield self.state.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility, ""
+ )
+ if (
+ visibility and
+ visibility.content["history_visibility"] == "world_readable"
+ ):
+ defer.returnValue((Membership.JOIN, None))
+ return
+ raise AuthError(
+ 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+ )
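This lifts `_check_in_room_or_world_readable` out of `MessageHandler` (see its removal in synapse/handlers/message.py below) so that multiple handlers, including ones on workers, can share it via `Auth`. A minimal caller sketch, assuming the `auth`/`state`/`store` attributes that `MessageHandler` gains below:

```python
from twisted.internet import defer

from synapse.api.constants import Membership

@defer.inlineCallbacks
def get_visible_state(self, room_id, user_id):
    # raises AuthError unless the user is/was in the room or it is
    # world readable
    membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
        room_id, user_id,
    )
    if membership == Membership.JOIN:
        # current state for joined (or world-readable) rooms
        state = yield self.state.get_current_state(room_id)
    else:
        # the user has left (or been banned): show the state as of their
        # membership event
        state_by_event = yield self.store.get_state_for_events(
            [member_event_id], None,
        )
        state = state_by_event[member_event_id]
    defer.returnValue(state)
```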
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index b0ea26dcb4..398bb36602 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -40,7 +40,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+ JoinedRoomMemberListRestServlet,
+ PublicRoomListRestServlet,
+ RoomEventContextServlet,
+ RoomMemberListRestServlet,
+ RoomStateRestServlet,
+)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@@ -82,7 +88,13 @@ class ClientReaderServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
+
PublicRoomListRestServlet(self).register(resource)
+ RoomMemberListRestServlet(self).register(resource)
+ JoinedRoomMemberListRestServlet(self).register(resource)
+ RoomStateRestServlet(self).register(resource)
+ RoomEventContextServlet(self).register(resource)
+
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 189212b0fa..368b5f6ae4 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -249,7 +249,7 @@ class EventContext(object):
@defer.inlineCallbacks
def update_state(self, state_group, prev_state_ids, current_state_ids,
- delta_ids):
+ prev_group, delta_ids):
"""Replace the state in the context
"""
@@ -260,6 +260,7 @@ class EventContext(object):
self.state_group = state_group
self._prev_state_ids = prev_state_ids
+ self.prev_group = prev_group
self._current_state_ids = current_state_ids
self.delta_ids = delta_ids
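`update_state` now tracks `prev_group` as well: conceptually, a state group's full state is its `prev_group`'s state with `delta_ids` applied on top. A toy illustration of that invariant (not Synapse code):

```python
def state_for_group(groups, group_id):
    """groups: dict of group_id -> (prev_group, delta_ids, full_state or None)."""
    prev_group, delta_ids, full_state = groups[group_id]
    if full_state is not None:
        return full_state
    # walk back to the ancestor's state and apply our delta on top
    state = dict(state_for_group(groups, prev_group))
    state.update(delta_ids)
    return state

groups = {
    1: (None, None, {("m.room.name", ""): "$name_v1"}),
    2: (1, {("m.room.name", ""): "$name_v2"}, None),
}
assert state_for_group(groups, 2) == {("m.room.name", ""): "$name_v2"}
```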
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 4b9923d8c0..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,9 +17,7 @@ from .admin import AdminHandler
from .directory import DirectoryHandler
from .federation import FederationHandler
from .identity import IdentityHandler
-from .message import MessageHandler
from .register import RegistrationHandler
-from .room import RoomContextHandler
from .search import SearchHandler
@@ -44,10 +42,8 @@ class Handlers(object):
def __init__(self, hs):
self.registration_handler = RegistrationHandler(hs)
- self.message_handler = MessageHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
- self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ec9fe01a5a..ee41aed69e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
yield self._check_user_exists(event.state_key)
if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
+ def start_scheduler():
+ return self.scheduler.start().addErrback(log_failure)
+ run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
# Fork off pushes to these services
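`run_as_background_process` gives the scheduler start-up its own logcontext and an entry in the background-process metrics, rather than letting the Deferred escape the current request's context. The same pattern in isolation (the `store.cleanup_old_entries` method is hypothetical):

```python
from synapse.metrics.background_process_metrics import run_as_background_process

def start_background_cleanup(store):
    def cleanup():
        # must return a Deferred; nothing waits on it, so errors should be
        # handled with an errback, as the scheduler code above does
        return store.cleanup_old_entries()  # hypothetical method
    run_as_background_process("cleanup", cleanup)
```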
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 14654d59f1..145c1a21d4 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1980,10 +1980,6 @@ class FederationHandler(BaseHandler):
current_state_ids.update(state_updates)
- if context.delta_ids is not None:
- delta_ids = dict(context.delta_ids)
- delta_ids.update(state_updates)
-
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = dict(prev_state_ids)
@@ -1991,11 +1987,13 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
})
+ # create a new state group as a delta from the existing one.
+ prev_group = context.state_group
state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
- prev_group=context.prev_group,
- delta_ids=delta_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
current_state_ids=current_state_ids,
)
@@ -2003,7 +2001,8 @@ class FederationHandler(BaseHandler):
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
- delta_ids=delta_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
)
@defer.inlineCallbacks
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fb11716eb8..40e7580a61 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
try:
if event.membership == Membership.JOIN:
room_end_token = now_token.room_key
- deferred_room_state = self.state_handler.get_current_state(
- event.room_id
+ deferred_room_state = run_in_background(
+ self.state_handler.get_current_state,
+ event.room_id,
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
- deferred_room_state = self.store.get_state_for_events(
- [event.event_id], None
+ deferred_room_state = run_in_background(
+ self.store.get_state_for_events,
+ [event.event_id], None,
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
receipts = []
defer.returnValue(receipts)
- presence, receipts, (messages, token) = yield defer.gatherResults(
- [
- run_in_background(get_presence),
- run_in_background(get_receipts),
- run_in_background(
- self.store.get_recent_events_for_room,
- room_id,
- limit=limit,
- end_token=now_token.room_key,
- )
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ presence, receipts, (messages, token) = yield make_deferred_yieldable(
+ defer.gatherResults(
+ [
+ run_in_background(get_presence),
+ run_in_background(get_receipts),
+ run_in_background(
+ self.store.get_recent_events_for_room,
+ room_id,
+ limit=limit,
+ end_token=now_token.room_key,
+ )
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError),
+ )
messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking,
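Both hunks apply the standard logcontext recipe: kick off concurrent work with `run_in_background`, then wait on the gathered Deferred via `make_deferred_yieldable` so the request's logcontext is restored at the yield. Distilled into a stand-alone helper (illustrative):

```python
from twisted.internet import defer

from synapse.util.logcontext import make_deferred_yieldable, run_in_background

@defer.inlineCallbacks
def run_concurrently(funcs):
    # each callable runs in its own logcontext; the yield hands control back
    # with the caller's logcontext intact
    results = yield make_deferred_yieldable(defer.gatherResults(
        [run_in_background(f) for f in funcs],
        consumeErrors=True,
    ))
    defer.returnValue(results)
```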
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7571975c22..39d7724778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
@@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
-from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Linearizer, ReadWriteLock
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
-from synapse.util.stringutils import random_string
-from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
logger = logging.getLogger(__name__)
-class PurgeStatus(object):
- """Object tracking the status of a purge request
-
- This class contains information on the progress of a purge request, for
- return by get_purge_status.
-
- Attributes:
- status (int): Tracks whether this request has completed. One of
- STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+ """Contains some read only APIs to get state about a room
"""
- STATUS_ACTIVE = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
-
- STATUS_TEXT = {
- STATUS_ACTIVE: "active",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- }
-
- def __init__(self):
- self.status = PurgeStatus.STATUS_ACTIVE
-
- def asdict(self):
- return {
- "status": PurgeStatus.STATUS_TEXT[self.status]
- }
-
-
-class MessageHandler(BaseHandler):
-
def __init__(self, hs):
- super(MessageHandler, self).__init__(hs)
- self.hs = hs
- self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
self.clock = hs.get_clock()
-
- self.pagination_lock = ReadWriteLock()
- self._purges_in_progress_by_room = set()
- # map from purge id to PurgeStatus
- self._purges_by_id = {}
-
- def start_purge_history(self, room_id, token,
- delete_local_events=False):
- """Start off a history purge on a room.
-
- Args:
- room_id (str): The room to purge from
-
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- str: unique ID for this purge transaction.
- """
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400,
- "History purge already in progress for %s" % (room_id, ),
- )
-
- purge_id = random_string(16)
-
- # we log the purge_id here so that it can be tied back to the
- # request id in the log lines.
- logger.info("[purge] starting purge_id %s", purge_id)
-
- self._purges_by_id[purge_id] = PurgeStatus()
- run_in_background(
- self._purge_history,
- purge_id, room_id, token, delete_local_events,
- )
- return purge_id
-
- @defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token,
- delete_local_events):
- """Carry out a history purge on a room.
-
- Args:
- purge_id (str): The id for this purge
- room_id (str): The room to purge from
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- Deferred
- """
- self._purges_in_progress_by_room.add(room_id)
- try:
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(
- room_id, token, delete_local_events,
- )
- logger.info("[purge] complete")
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
- except Exception:
- logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the purge from the list 24 hours after it completes
- def clear_purge():
- del self._purges_by_id[purge_id]
- self.hs.get_reactor().callLater(24 * 3600, clear_purge)
-
- def get_purge_status(self, purge_id):
- """Get the current status of an active purge
-
- Args:
- purge_id (str): purge_id returned by start_purge_history
-
- Returns:
- PurgeStatus|None
- """
- return self._purges_by_id.get(purge_id)
-
- @defer.inlineCallbacks
- def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True, event_filter=None):
- """Get messages in a room.
-
- Args:
- requester (Requester): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- as_client_event (bool): True to get events in client-server format.
- event_filter (Filter): Filter to apply to results or None
- Returns:
- dict: Pagination API results
- """
- user_id = requester.user.to_string()
-
- if pagin_config.from_token:
- room_token = pagin_config.from_token.room_key
- else:
- pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token_for_room(
- room_id=room_id
- )
- )
- room_token = pagin_config.from_token.room_key
-
- room_token = RoomStreamToken.parse(room_token)
-
- pagin_config.from_token = pagin_config.from_token.copy_and_replace(
- "room_key", str(room_token)
- )
-
- source_config = pagin_config.get_source_config("room")
-
- with (yield self.pagination_lock.read(room_id)):
- membership, member_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
-
- if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token(
- room_id, room_token.stream
- )
-
- if membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
- )
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
-
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
- )
-
- events, next_key = yield self.store.paginate_room_events(
- room_id=room_id,
- from_key=source_config.from_key,
- to_key=source_config.to_key,
- direction=source_config.direction,
- limit=source_config.limit,
- event_filter=event_filter,
- )
-
- next_token = pagin_config.from_token.copy_and_replace(
- "room_key", next_key
- )
-
- if not events:
- defer.returnValue({
- "chunk": [],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- })
-
- if event_filter:
- events = event_filter.filter(events)
-
- events = yield filter_events_for_client(
- self.store,
- user_id,
- events,
- is_peeking=(member_event_id is None),
- )
-
- time_now = self.clock.time_msec()
-
- chunk = {
- "chunk": [
- serialize_event(e, time_now, as_client_event)
- for e in events
- ],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- }
-
- defer.returnValue(chunk)
+ self.state = hs.get_state_handler()
+ self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
- data = yield self.state_handler.get_current_state(
+ data = yield self.state.get_current_state(
room_id, event_type, state_key
)
elif membership == Membership.LEAVE:
@@ -304,31 +82,6 @@ class MessageHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def _check_in_room_or_world_readable(self, room_id, user_id):
- try:
- # check_user_was_in_room will return the most recent membership
- # event for the user if:
- # * The user is a non-guest user, and was ever in the room
- # * The user is a guest user, and has joined the room
- # else it will throw.
- member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- defer.returnValue((member_event.membership, member_event.event_id))
- return
- except AuthError:
- visibility = yield self.state_handler.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility, ""
- )
- if (
- visibility and
- visibility.content["history_visibility"] == "world_readable"
- ):
- defer.returnValue((Membership.JOIN, None))
- return
- raise AuthError(
- 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
- )
-
- @defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
@@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
- room_state = yield self.state_handler.get_current_state(room_id)
+ room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
@@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
- membership, _ = yield self._check_in_room_or_world_readable(
+ membership, _ = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership != Membership.JOIN:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..b2849783ed
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,265 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+ """Object tracking the status of a purge request
+
+ This class contains information on the progress of a purge request, for
+ return by get_purge_status.
+
+ Attributes:
+ status (int): Tracks whether this request has completed. One of
+ STATUS_{ACTIVE,COMPLETE,FAILED}
+ """
+
+ STATUS_ACTIVE = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+
+ STATUS_TEXT = {
+ STATUS_ACTIVE: "active",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ }
+
+ def __init__(self):
+ self.status = PurgeStatus.STATUS_ACTIVE
+
+ def asdict(self):
+ return {
+ "status": PurgeStatus.STATUS_TEXT[self.status]
+ }
+
+
+class PaginationHandler(object):
+ """Handles pagination and purge history requests.
+
+ These live in the same handler because we need to block clients from
+ paginating while a purge is in progress.
+ """
+
+ def __init__(self, hs):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ self.pagination_lock = ReadWriteLock()
+ self._purges_in_progress_by_room = set()
+ # map from purge id to PurgeStatus
+ self._purges_by_id = {}
+
+ def start_purge_history(self, room_id, token,
+ delete_local_events=False):
+ """Start off a history purge on a room.
+
+ Args:
+ room_id (str): The room to purge from
+
+ token (str): topological token to delete events before
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ str: unique ID for this purge transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400,
+ "History purge already in progress for %s" % (room_id, ),
+ )
+
+ purge_id = random_string(16)
+
+ # we log the purge_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info("[purge] starting purge_id %s", purge_id)
+
+ self._purges_by_id[purge_id] = PurgeStatus()
+ run_in_background(
+ self._purge_history,
+ purge_id, room_id, token, delete_local_events,
+ )
+ return purge_id
+
+ @defer.inlineCallbacks
+ def _purge_history(self, purge_id, room_id, token,
+ delete_local_events):
+ """Carry out a history purge on a room.
+
+ Args:
+ purge_id (str): The id for this purge
+ room_id (str): The room to purge from
+ token (str): topological token to delete events before
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ Deferred
+ """
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with (yield self.pagination_lock.write(room_id)):
+ yield self.store.purge_history(
+ room_id, token, delete_local_events,
+ )
+ logger.info("[purge] complete")
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ except Exception:
+ logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
+
+ # remove the purge from the list 24 hours after it completes
+ def clear_purge():
+ del self._purges_by_id[purge_id]
+ self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+ def get_purge_status(self, purge_id):
+ """Get the current status of an active purge
+
+ Args:
+ purge_id (str): purge_id returned by start_purge_history
+
+ Returns:
+ PurgeStatus|None
+ """
+ return self._purges_by_id.get(purge_id)
+
+ @defer.inlineCallbacks
+ def get_messages(self, requester, room_id=None, pagin_config=None,
+ as_client_event=True, event_filter=None):
+ """Get messages in a room.
+
+ Args:
+ requester (Requester): The user requesting messages.
+ room_id (str): The room they want messages from.
+ pagin_config (synapse.api.streams.PaginationConfig): The pagination
+ config rules to apply, if any.
+ as_client_event (bool): True to get events in client-server format.
+ event_filter (Filter): Filter to apply to results or None
+ Returns:
+ dict: Pagination API results
+ """
+ user_id = requester.user.to_string()
+
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
+ pagin_config.from_token = (
+ yield self.hs.get_event_sources().get_current_token_for_room(
+ room_id=room_id
+ )
+ )
+ room_token = pagin_config.from_token.room_key
+
+ room_token = RoomStreamToken.parse(room_token)
+
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ with (yield self.pagination_lock.read(room_id)):
+ membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id
+ )
+
+ if source_config.direction == 'b':
+ # if we're going backwards, we might need to backfill. This
+ # requires that we have a topo token.
+ if room_token.topological:
+ max_topo = room_token.topological
+ else:
+ max_topo = yield self.store.get_max_topological_token(
+ room_id, room_token.stream
+ )
+
+ if membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < max_topo:
+ source_config.from_key = str(leave_token)
+
+ yield self.hs.get_handlers().federation_handler.maybe_backfill(
+ room_id, max_topo
+ )
+
+ events, next_key = yield self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=source_config.from_key,
+ to_key=source_config.to_key,
+ direction=source_config.direction,
+ limit=source_config.limit,
+ event_filter=event_filter,
+ )
+
+ next_token = pagin_config.from_token.copy_and_replace(
+ "room_key", next_key
+ )
+
+ if not events:
+ defer.returnValue({
+ "chunk": [],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ })
+
+ if event_filter:
+ events = event_filter.filter(events)
+
+ events = yield filter_events_for_client(
+ self.store,
+ user_id,
+ events,
+ is_peeking=(member_event_id is None),
+ )
+
+ time_now = self.clock.time_msec()
+
+ chunk = {
+ "chunk": [
+ serialize_event(e, time_now, as_client_event)
+ for e in events
+ ],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ }
+
+ defer.returnValue(chunk)
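The `pagination_lock` is the reason purging and pagination share a handler: paginators take the read side per room, while a purge takes the write side and excludes them. The pattern in miniature (the `do_*` helpers are hypothetical):

```python
from twisted.internet import defer

from synapse.util.async import ReadWriteLock

lock = ReadWriteLock()

@defer.inlineCallbacks
def paginate(room_id):
    # any number of readers may hold the lock for a given key at once
    with (yield lock.read(room_id)):
        yield do_paginate(room_id)  # hypothetical

@defer.inlineCallbacks
def purge(room_id):
    # the writer waits for in-flight readers and blocks new ones
    with (yield lock.write(room_id)):
        yield do_purge(room_id)  # hypothetical
```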
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f67512078b..6150b7e226 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
)
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = hs.get_datastore()
+
@defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit):
"""Retrieves events, pagination tokens and state around a given event
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 9e9c175970..99f6c6e3c3 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -244,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
self.store = hs.get_datastore()
@defer.inlineCallbacks
@@ -319,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.BAD_JSON,
)
- purge_id = yield self.handlers.message_handler.start_purge_history(
+ purge_id = yield self.pagination_handler.start_purge_history(
room_id, token,
delete_local_events=delete_local_events,
)
@@ -341,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryStatusRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks
def on_GET(self, request, purge_id):
@@ -351,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
- purge_status = self.handlers.message_handler.get_purge_status(purge_id)
+ purge_status = self.pagination_handler.get_purge_status(purge_id)
if purge_status is None:
raise NotFoundError("purge id '%s' not found" % purge_id)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index b9512a2b61..b7bd878c90 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
+ self.message_handler = hs.get_message_handler()
def register(self, http_server):
# /room/$roomid/state/$eventtype
@@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
format = parse_string(request, "format", default="content",
allowed_values=["content", "event"])
- msg_handler = self.handlers.message_handler
+ msg_handler = self.message_handler
data = yield msg_handler.get_room_data(
user_id=requester.user.to_string(),
room_id=room_id,
@@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMemberListRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
- handler = self.handlers.message_handler
- events = yield handler.get_state_events(
+ events = yield self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
)
@@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(JoinedRoomMemberListRestServlet, self).__init__(hs)
- self.message_handler = hs.get_handlers().message_handler
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMessageListRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
- handler = self.handlers.message_handler
- msgs = yield handler.get_messages(
+ msgs = yield self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
@@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomStateRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
- handler = self.handlers.message_handler
# Get all the current state for this room
- events = yield handler.get_state_events(
+ events = yield self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
is_guest=requester.is_guest,
@@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomEventContextServlet, self).__init__(hs)
self.clock = hs.get_clock()
- self.handlers = hs.get_handlers()
+ self.room_context_handler = hs.get_room_context_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
@@ -533,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
limit = parse_integer(request, "limit", default=10)
- results = yield self.handlers.room_context_handler.get_event_context(
+ results = yield self.room_context_handler.get_event_context(
requester.user,
room_id,
event_id,
diff --git a/synapse/server.py b/synapse/server.py
index fd4f992258..140be9ebe8 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.initial_sync import InitialSyncHandler
-from synapse.handlers.message import EventCreationHandler
+from synapse.handlers.message import EventCreationHandler, MessageHandler
+from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler
-from synapse.handlers.room import RoomCreationHandler
+from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberMasterHandler
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
@@ -165,6 +166,9 @@ class HomeServer(object):
'federation_registry',
'server_notices_manager',
'server_notices_sender',
+ 'message_handler',
+ 'pagination_handler',
+ 'room_context_handler',
]
def __init__(self, hostname, reactor=None, **kwargs):
@@ -431,6 +435,15 @@ class HomeServer(object):
return WorkerServerNoticesSender(self)
return ServerNoticesSender(self)
+ def build_message_handler(self):
+ return MessageHandler(self)
+
+ def build_pagination_handler(self):
+ return PaginationHandler(self)
+
+ def build_room_context_handler(self):
+ return RoomContextHandler(self)
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
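For context: listing a name in the dependency list above makes `HomeServer` synthesize a memoizing `get_<name>()` that invokes the corresponding `build_<name>()` at most once. A simplified sketch of that pattern (not the real implementation in synapse/server.py):

```python
from synapse.handlers.message import MessageHandler

class MiniHomeServer(object):
    def __init__(self):
        self._instances = {}

    def _get(self, name):
        # build each dependency lazily, and only once
        if name not in self._instances:
            self._instances[name] = getattr(self, "build_%s" % (name,))()
        return self._instances[name]

    def get_message_handler(self):
        return self._get("message_handler")

    def build_message_handler(self):
        # mirrors the build method added in the diff above
        return MessageHandler(self)
```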
diff --git a/synapse/state.py b/synapse/state.py
index 32125c95df..033f55d967 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -471,69 +471,39 @@ class StateResolutionHandler(object):
"Resolving state for %s with %d groups", room_id, len(state_groups_ids)
)
- # build a map from state key to the event_ids which set that state.
- # dict[(str, str), set[str])
- state = {}
+ # start by assuming we won't have any conflicted state, and build up the new
+ # state map by iterating through the state groups. If we discover a conflict,
+ # we give up and instead use `resolve_events_with_factory`.
+ #
+ # XXX: is this actually worthwhile, or should we just let
+ # resolve_events_with_factory do it?
+ new_state = {}
+ conflicted_state = False
for st in itervalues(state_groups_ids):
for key, e_id in iteritems(st):
- state.setdefault(key, set()).add(e_id)
-
- # build a map from state key to the event_ids which set that state,
- # including only those where there are state keys in conflict.
- conflicted_state = {
- k: list(v)
- for k, v in iteritems(state)
- if len(v) > 1
- }
+ if key in new_state:
+ conflicted_state = True
+ break
+ new_state[key] = e_id
+ if conflicted_state:
+ break
if conflicted_state:
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_factory(
- list(state_groups_ids.values()),
+ list(itervalues(state_groups_ids)),
event_map=event_map,
state_map_factory=state_map_factory,
)
- else:
- new_state = {
- key: e_ids.pop() for key, e_ids in iteritems(state)
- }
- with Measure(self.clock, "state.create_group_ids"):
- # if the new state matches any of the input state groups, we can
- # use that state group again. Otherwise we will generate a state_id
- # which will be used as a cache key for future resolutions, but
- # not get persisted.
- state_group = None
- new_state_event_ids = frozenset(itervalues(new_state))
- for sg, events in iteritems(state_groups_ids):
- if new_state_event_ids == frozenset(e_id for e_id in events):
- state_group = sg
- break
+ # if the new state matches any of the input state groups, we can
+ # use that state group again. Otherwise we will generate a state_id
+ # which will be used as a cache key for future resolutions, but
+ # not get persisted.
- # TODO: We want to create a state group for this set of events, to
- # increase cache hits, but we need to make sure that it doesn't
- # end up as a prev_group without being added to the database
-
- prev_group = None
- delta_ids = None
- for old_group, old_ids in iteritems(state_groups_ids):
- if not set(new_state) - set(old_ids):
- n_delta_ids = {
- k: v
- for k, v in iteritems(new_state)
- if old_ids.get(k) != v
- }
- if not delta_ids or len(n_delta_ids) < len(delta_ids):
- prev_group = old_group
- delta_ids = n_delta_ids
-
- cache = _StateCacheEntry(
- state=new_state,
- state_group=state_group,
- prev_group=prev_group,
- delta_ids=delta_ids,
- )
+ with Measure(self.clock, "state.create_group_ids"):
+ cache = _make_state_cache_entry(new_state, state_groups_ids)
if self._state_cache is not None:
self._state_cache[group_names] = cache
@@ -541,6 +511,70 @@ class StateResolutionHandler(object):
defer.returnValue(cache)
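Note that the rewritten fast path treats any repeated key as a conflict, even when the groups agree on its value; that is what the XXX comment is flagging. The win is that the common single-group case no longer builds per-key sets at all. The logic, isolated as toy code:

```python
def quick_merge(state_groups):
    """Return the merged state map, or None if full resolution is needed."""
    new_state = {}
    for st in state_groups:
        for key, e_id in st.items():
            if key in new_state:
                return None  # possibly conflicted: fall back to resolution
            new_state[key] = e_id
    return new_state

assert quick_merge([{("m.room.name", ""): "$a"}]) == {("m.room.name", ""): "$a"}
# agreeing duplicates still bail out to full resolution
assert quick_merge([{("m.room.name", ""): "$a"}, {("m.room.name", ""): "$a"}]) is None
```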
+def _make_state_cache_entry(
+ new_state,
+ state_groups_ids,
+):
+ """Given a resolved state, and a set of input state groups, pick one to base
+ a new state group on (if any), and return an appropriately-constructed
+ _StateCacheEntry.
+
+ Args:
+ new_state (dict[(str, str), str]): resolved state map (mapping from
+ (type, state_key) to event_id)
+
+ state_groups_ids (dict[int, dict[(str, str), str]]):
+ map from state group id to the state in that state group
+ (where 'state' is a map from state key to event id)
+
+ Returns:
+ _StateCacheEntry
+ """
+ # if the new state matches any of the input state groups, we can
+ # use that state group again. Otherwise we will generate a state_id
+ # which will be used as a cache key for future resolutions, but
+ # not get persisted.
+
+ # first look for exact matches
+ new_state_event_ids = set(itervalues(new_state))
+ for sg, state in iteritems(state_groups_ids):
+ if len(new_state_event_ids) != len(state):
+ continue
+
+ old_state_event_ids = set(itervalues(state))
+ if new_state_event_ids == old_state_event_ids:
+ # got an exact match.
+ return _StateCacheEntry(
+ state=new_state,
+ state_group=sg,
+ )
+
+ # TODO: We want to create a state group for this set of events, to
+ # increase cache hits, but we need to make sure that it doesn't
+ # end up as a prev_group without being added to the database
+
+ # failing that, look for the closest match.
+ prev_group = None
+ delta_ids = None
+
+ for old_group, old_state in iteritems(state_groups_ids):
+ n_delta_ids = {
+ k: v
+ for k, v in iteritems(new_state)
+ if old_state.get(k) != v
+ }
+ if not delta_ids or len(n_delta_ids) < len(delta_ids):
+ prev_group = old_group
+ delta_ids = n_delta_ids
+
+ return _StateCacheEntry(
+ state=new_state,
+ state_group=None,
+ prev_group=prev_group,
+ delta_ids=delta_ids,
+ )
+
+
def _ordered_events(events):
def key_func(e):
return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest()
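A worked toy example of `_make_state_cache_entry`'s two phases (event IDs invented): an exact match reuses the old group outright; failing that, the input group needing the smallest delta becomes `prev_group`:

```python
def delta(old_state, new_state):
    # the entries of new_state that differ from old_state
    return {k: v for k, v in new_state.items() if old_state.get(k) != v}

group_1 = {("m.room.name", ""): "$n1", ("m.room.topic", ""): "$t1"}
group_2 = {("m.room.name", ""): "$n1", ("m.room.topic", ""): "$t2"}

# exact match: resolution reproduced group_1, so its state group is reused
assert delta(group_1, dict(group_1)) == {}

# closest match: this resolved state matches neither group, but group_2 is
# only one entry away versus two for group_1, so prev_group would be group_2
new_state = {("m.room.name", ""): "$n2", ("m.room.topic", ""): "$t2"}
assert len(delta(group_1, new_state)) == 2
assert len(delta(group_2, new_state)) == 1
```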
@@ -582,7 +616,7 @@ def _seperate(state_sets):
with them in different state sets.
Args:
- state_sets(list[dict[(str, str), str]]):
+ state_sets(iterable[dict[(str, str), str]]):
List of dicts of (type, state_key) -> event_id, which are the
different state groups to resolve.
@@ -596,10 +630,11 @@ def _seperate(state_sets):
conflicted_state is a dict mapping (type, state_key) to a set of
event ids for conflicted state keys.
"""
- unconflicted_state = dict(state_sets[0])
+ state_set_iterator = iter(state_sets)
+ unconflicted_state = dict(next(state_set_iterator))
conflicted_state = {}
- for state_set in state_sets[1:]:
+ for state_set in state_set_iterator:
for key, value in iteritems(state_set):
# Check if there is an unconflicted entry for the state key.
unconflicted_value = unconflicted_state.get(key)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c2910094d0..19c05dc8d6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -147,7 +147,8 @@ class _EventPeristenceQueue(object):
# callbacks on the deferred.
try:
ret = yield per_item_callback(item)
- item.deferred.callback(ret)
+ with PreserveLoggingContext():
+ item.deferred.callback(ret)
except Exception:
item.deferred.errback()
finally:
@@ -417,18 +418,27 @@ class EventsStore(EventsWorkerStore):
logger.info(
"Calculating state delta for room %s", room_id,
)
- current_state = yield self._get_new_state_after_events(
- room_id,
- ev_ctx_rm,
- latest_event_ids,
- new_latest_event_ids,
- )
+
+ with Measure(
+ self._clock,
+ "persist_events.get_new_state_after_events",
+ ):
+ current_state = yield self._get_new_state_after_events(
+ room_id,
+ ev_ctx_rm,
+ latest_event_ids,
+ new_latest_event_ids,
+ )
+
if current_state is not None:
current_state_for_room[room_id] = current_state
- delta = yield self._calculate_state_delta(
- room_id, current_state,
- )
- if delta is not None:
+ with Measure(
+ self._clock,
+ "persist_events.calculate_state_delta",
+ ):
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
state_delta_for_room[room_id] = delta
yield self.runInteraction(
@@ -644,21 +654,14 @@ class EventsStore(EventsWorkerStore):
"""
existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(itervalues(existing_state))
- new_events = set(ev_id for ev_id in itervalues(current_state))
- changed_events = existing_events ^ new_events
-
- if not changed_events:
- return
-
to_delete = {
key: ev_id for key, ev_id in iteritems(existing_state)
- if ev_id in changed_events
+ if ev_id != current_state.get(key)
}
- events_to_insert = (new_events - existing_events)
+
to_insert = {
key: ev_id for key, ev_id in iteritems(current_state)
- if ev_id in events_to_insert
+ if ev_id != existing_state.get(key)
}
defer.returnValue((to_delete, to_insert))
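The new `_calculate_state_delta` does a single key-wise comparison of the existing and new state maps, instead of first materialising both sets of event IDs and XOR-ing them. In toy form:

```python
existing_state = {("m.room.member", "@a:x"): "$old", ("m.room.topic", ""): "$t"}
current_state = {("m.room.member", "@a:x"): "$new", ("m.room.topic", ""): "$t"}

# an entry is deleted/inserted only where the event ID for that key changed
to_delete = {
    key: ev_id for key, ev_id in existing_state.items()
    if ev_id != current_state.get(key)
}
to_insert = {
    key: ev_id for key, ev_id in current_state.items()
    if ev_id != existing_state.get(key)
}

assert to_delete == {("m.room.member", "@a:x"): "$old"}
assert to_insert == {("m.room.member", "@a:x"): "$new"}
```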
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index af564b1b4e..6a5028961d 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -21,7 +21,6 @@ from canonicaljson import json
from twisted.internet import defer
-from synapse.api.constants import EventTypes
from synapse.push.baserules import list_with_base_rules
from synapse.storage.appservice import ApplicationServiceWorkerStore
from synapse.storage.pusher import PusherWorkerStore
@@ -250,18 +249,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
if uid in local_users_in_room:
user_ids.add(uid)
- forgotten = yield self.who_forgot_in_room(
- event.room_id, on_invalidate=cache_context.invalidate,
- )
-
- for row in forgotten:
- user_id = row["user_id"]
- event_id = row["event_id"]
-
- mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
- if event_id == mem_id:
- user_ids.discard(user_id)
-
rules_by_user = yield self.bulk_get_push_rules(
user_ids, on_invalidate=cache_context.invalidate,
)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cc273a57b2..8443bd4c1b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
)
if newly_inserted:
- self.runInteraction(
+ yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
self.get_if_user_has_pusher, (user_id,)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index a27702a7a0..01697ab2c9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -461,18 +461,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)
- @cached()
- def who_forgot_in_room(self, room_id):
- return self._simple_select_list(
- table="room_memberships",
- retcols=("user_id", "event_id"),
- keyvalues={
- "room_id": room_id,
- "forgotten": 1,
- },
- desc="who_forgot"
- )
-
class RoomMemberStore(RoomMemberWorkerStore):
def __init__(self, db_conn, hs):
@@ -581,9 +569,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
txn.execute(sql, (user_id, room_id))
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
- self._invalidate_cache_and_stream(
- txn, self.who_forgot_in_room, (room_id,)
- )
return self.runInteraction("forget_membership", f)
@cachedInlineCallbacks(num_args=2)
diff --git a/synapse/visibility.py b/synapse/visibility.py
index ba0499a022..d4680863d3 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -24,7 +24,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events.utils import prune_event
from synapse.types import get_domain_from_id
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
logger = logging.getLogger(__name__)
@@ -76,19 +75,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
types=types,
)
- forgotten = yield make_deferred_yieldable(defer.gatherResults([
- defer.maybeDeferred(
- preserve_fn(store.who_forgot_in_room),
- room_id,
- )
- for room_id in frozenset(e.room_id for e in events)
- ], consumeErrors=True))
-
- # Set of membership event_ids that have been forgotten
- event_id_forgotten = frozenset(
- row["event_id"] for rows in forgotten for row in rows
- )
-
ignore_dict_content = yield store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id,
)
@@ -177,10 +163,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if membership is None:
membership_event = state.get((EventTypes.Member, user_id), None)
if membership_event:
- # XXX why do we do this?
- # https://github.com/matrix-org/synapse/issues/3350
- if membership_event.event_id not in event_id_forgotten:
- membership = membership_event.membership
+ membership = membership_event.membership
# if the user was a member of the room at the time of the event,
# they can see it.
|