diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 53213cdccf..8f8fd82eb0 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,6 @@ from .register import RegistrationHandler
from .room import (
RoomCreationHandler, RoomContextHandler,
)
-from .room_member import RoomMemberHandler
from .message import MessageHandler
from .federation import FederationHandler
from .directory import DirectoryHandler
@@ -49,7 +48,6 @@ class Handlers(object):
self.registration_handler = RegistrationHandler(hs)
self.message_handler = MessageHandler(hs)
self.room_creation_handler = RoomCreationHandler(hs)
- self.room_member_handler = RoomMemberHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index faa5609c0c..e089e66fde 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -158,7 +158,7 @@ class BaseHandler(object):
# homeserver.
requester = synapse.types.create_requester(
target_user, is_guest=True)
- handler = self.hs.get_handlers().room_member_handler
+ handler = self.hs.get_room_member_handler()
yield handler.update_membership(
requester,
target_user,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8832ba58bc..520612683e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2153,7 +2153,7 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2197,7 +2197,7 @@ class FederationHandler(BaseHandler):
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9800e24453..c9c2879038 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -233,7 +233,7 @@ class ProfileHandler(BaseHandler):
)
for room_id in room_ids:
- handler = self.hs.get_handlers().room_member_handler
+ handler = self.hs.get_room_member_handler()
try:
# Assume the target_user isn't a guest,
# because we don't let guests set profile or avatar data.
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6ab020bf41..6c425828c1 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -165,7 +165,7 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
- room_member_handler = self.hs.get_handlers().room_member_handler
+ room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(
requester,
@@ -224,7 +224,7 @@ class RoomCreationHandler(BaseHandler):
id_server = invite_3pid["id_server"]
address = invite_3pid["address"]
medium = invite_3pid["medium"]
- yield self.hs.get_handlers().room_member_handler.do_3pid_invite(
+ yield self.hs.get_room_member_handler().do_3pid_invite(
room_id,
requester.user,
medium,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 37dc5e99ab..7ecdf87246 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -30,24 +30,32 @@ from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, RoomID
from synapse.util.async import Linearizer
from synapse.util.distributor import user_left_room, user_joined_room
-from ._base import BaseHandler
logger = logging.getLogger(__name__)
id_server_scheme = "https://"
-class RoomMemberHandler(BaseHandler):
+class RoomMemberHandler(object):
# TODO(paul): This handler currently contains a messy conflation of
# low-level API that works on UserID objects and so on, and REST-level
# API that takes ID strings and returns pagination chunks. These concerns
# ought to be separated out a lot better.
def __init__(self, hs):
- super(RoomMemberHandler, self).__init__(hs)
-
+ self.hs = hs
+ self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+ self.state_handler = hs.get_state_handler()
+ self.config = hs.config
+ self.simple_http_client = hs.get_simple_http_client()
+
+ self.federation_handler = hs.get_handlers().federation_handler
+ self.directory_handler = hs.get_handlers().directory_handler
+ self.registration_handler = hs.get_handlers().registration_handler
self.profile_handler = hs.get_profile_handler()
self.event_creation_hander = hs.get_event_creation_handler()
+ self.replication_layer = hs.get_replication_layer()
self.member_linearizer = Linearizer(name="member")
@@ -138,7 +146,7 @@ class RoomMemberHandler(BaseHandler):
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- yield self.hs.get_handlers().federation_handler.do_invite_join(
+ yield self.federation_handler.do_invite_join(
remote_room_hosts,
room_id,
user.to_string(),
@@ -204,8 +212,7 @@ class RoomMemberHandler(BaseHandler):
# if this is a join with a 3pid signature, we may need to turn a 3pid
# invite into a normal invite before we can handle the join.
if third_party_signed is not None:
- replication = self.hs.get_replication_layer()
- yield replication.exchange_third_party_invite(
+ yield self.replication_layer.exchange_third_party_invite(
third_party_signed["sender"],
target.to_string(),
room_id,
@@ -226,7 +233,7 @@ class RoomMemberHandler(BaseHandler):
requester.user,
)
if not is_requester_admin:
- if self.hs.config.block_non_admin_invites:
+ if self.config.block_non_admin_invites:
logger.info(
"Blocking invite: user is not admin and non-admin "
"invites disabled"
@@ -321,7 +328,7 @@ class RoomMemberHandler(BaseHandler):
else:
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
- fed_handler = self.hs.get_handlers().federation_handler
+ fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts,
@@ -477,7 +484,7 @@ class RoomMemberHandler(BaseHandler):
Raises:
SynapseError if room alias could not be found.
"""
- directory_handler = self.hs.get_handlers().directory_handler
+ directory_handler = self.directory_handler
mapping = yield directory_handler.get_association(room_alias)
if not mapping:
@@ -508,7 +515,7 @@ class RoomMemberHandler(BaseHandler):
requester,
txn_id
):
- if self.hs.config.block_non_admin_invites:
+ if self.config.block_non_admin_invites:
is_requester_admin = yield self.auth.is_server_admin(
requester.user,
)
@@ -555,7 +562,7 @@ class RoomMemberHandler(BaseHandler):
str: the matrix ID of the 3pid, or None if it is not recognized.
"""
try:
- data = yield self.hs.get_simple_http_client().get_json(
+ data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
{
"medium": medium,
@@ -578,7 +585,7 @@ class RoomMemberHandler(BaseHandler):
if server_hostname not in data["signatures"]:
raise AuthError(401, "No signature from server %s" % (server_hostname,))
for key_name, signature in data["signatures"][server_hostname].items():
- key_data = yield self.hs.get_simple_http_client().get_json(
+ key_data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/pubkey/%s" %
(id_server_scheme, server_hostname, key_name,),
)
@@ -603,7 +610,7 @@ class RoomMemberHandler(BaseHandler):
user,
txn_id
):
- room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+ room_state = yield self.state_handler.get_current_state(room_id)
inviter_display_name = ""
inviter_avatar_url = ""
@@ -727,15 +734,15 @@ class RoomMemberHandler(BaseHandler):
"sender_avatar_url": inviter_avatar_url,
}
- if self.hs.config.invite_3pid_guest:
- registration_handler = self.hs.get_handlers().registration_handler
+ if self.config.invite_3pid_guest:
+ registration_handler = self.registration_handler
guest_access_token = yield registration_handler.guest_access_token_for(
medium=medium,
address=address,
inviter_user_id=inviter_user_id,
)
- guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
+ guest_user_info = yield self.auth.get_user_by_access_token(
guest_access_token
)
@@ -744,7 +751,7 @@ class RoomMemberHandler(BaseHandler):
"guest_user_id": guest_user_info["user"].to_string(),
})
- data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+ data = yield self.simple_http_client.post_urlencoded_get_json(
is_url,
invite_config
)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index de0b26f437..b1f64ef0d8 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,15 +16,13 @@
import logging
from synapse.api.constants import EventTypes
-from synapse.storage import DataStore
-from synapse.storage.event_federation import EventFederationStore
+from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
-from synapse.storage.stream import StreamStore
-from synapse.storage.signatures import SignatureStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.stream import StreamWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -40,107 +38,33 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class.
-class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
- EventsWorkerStore, StateGroupWorkerStore,
+class SlavedEventStore(EventFederationWorkerStore,
+ RoomMemberWorkerStore,
+ EventPushActionsWorkerStore,
+ StreamWorkerStore,
+ EventsWorkerStore,
+ StateGroupWorkerStore,
+ SignatureWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs):
- super(SlavedEventStore, self).__init__(db_conn, hs)
self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering",
)
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)
- events_max = self._stream_id_gen.get_current_token()
- event_cache_prefill, min_event_val = self._get_cache_dict(
- db_conn, "events",
- entity_column="room_id",
- stream_column="stream_ordering",
- max_value=events_max,
- )
- self._events_stream_cache = StreamChangeCache(
- "EventsRoomStreamChangeCache", min_event_val,
- prefilled_cache=event_cache_prefill,
- )
- self._membership_stream_cache = StreamChangeCache(
- "MembershipStreamChangeCache", events_max,
- )
- self.stream_ordering_month_ago = 0
- self._stream_order_on_start = self.get_room_max_stream_ordering()
+ super(SlavedEventStore, self).__init__(db_conn, hs)
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
- get_latest_event_ids_in_room = EventFederationStore.__dict__[
- "get_latest_event_ids_in_room"
- ]
-
- get_recent_event_ids_for_room = (
- StreamStore.__dict__["get_recent_event_ids_for_room"]
- )
- has_room_changed_since = DataStore.has_room_changed_since.__func__
-
- get_membership_changes_for_user = (
- DataStore.get_membership_changes_for_user.__func__
- )
- get_room_events_max_id = DataStore.get_room_events_max_id.__func__
- get_room_events_stream_for_room = (
- DataStore.get_room_events_stream_for_room.__func__
- )
- get_events_around = DataStore.get_events_around.__func__
-
- get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
- get_room_events_stream_for_rooms = (
- DataStore.get_room_events_stream_for_rooms.__func__
- )
- get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
-
- _set_before_and_after = staticmethod(DataStore._set_before_and_after)
-
- _get_events_around_txn = DataStore._get_events_around_txn.__func__
-
- get_backfill_events = DataStore.get_backfill_events.__func__
- _get_backfill_events = DataStore._get_backfill_events.__func__
- get_missing_events = DataStore.get_missing_events.__func__
- _get_missing_events = DataStore._get_missing_events.__func__
-
- get_auth_chain = DataStore.get_auth_chain.__func__
- get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
- _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
-
- get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
-
- get_forward_extremeties_for_room = (
- DataStore.get_forward_extremeties_for_room.__func__
- )
- _get_forward_extremeties_for_room = (
- EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
- )
-
- get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
-
- get_federation_out_pos = DataStore.get_federation_out_pos.__func__
- update_federation_out_pos = DataStore.update_federation_out_pos.__func__
-
- get_latest_event_ids_and_hashes_in_room = (
- DataStore.get_latest_event_ids_and_hashes_in_room.__func__
- )
- _get_latest_event_ids_and_hashes_in_room = (
- DataStore._get_latest_event_ids_and_hashes_in_room.__func__
- )
- _get_event_reference_hashes_txn = (
- DataStore._get_event_reference_hashes_txn.__func__
- )
- add_event_hashes = (
- DataStore.add_event_hashes.__func__
- )
- get_event_reference_hashes = (
- SignatureStore.__dict__["get_event_reference_hashes"]
- )
- get_event_reference_hash = (
- SignatureStore.__dict__["get_event_reference_hash"]
- )
+
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index f510384033..5ae1670157 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -14,32 +14,19 @@
# limitations under the License.
from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.room import RoomStore
+from synapse.storage.room import RoomWorkerStore
from ._slaved_id_tracker import SlavedIdTracker
-class RoomStore(BaseSlavedStore):
+class RoomStore(RoomWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
super(RoomStore, self).__init__(db_conn, hs)
self._public_room_id_gen = SlavedIdTracker(
db_conn, "public_room_list_stream", "stream_id"
)
- get_public_room_ids = DataStore.get_public_room_ids.__func__
- get_current_public_room_stream_id = (
- DataStore.get_current_public_room_stream_id.__func__
- )
- get_public_room_ids_at_stream_id = (
- RoomStore.__dict__["get_public_room_ids_at_stream_id"]
- )
- get_public_room_ids_at_stream_id_txn = (
- DataStore.get_public_room_ids_at_stream_id_txn.__func__
- )
- get_published_at_stream_id_txn = (
- DataStore.get_published_at_stream_id_txn.__func__
- )
- get_public_room_changes = DataStore.get_public_room_changes.__func__
+ def get_current_public_room_stream_id(self):
+ return self._public_room_id_gen.get_current_token()
def stream_positions(self):
result = super(RoomStore, self).stream_positions()
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 6073cc6fa2..3917eee42d 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -180,6 +180,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
self.handlers = hs.get_handlers()
self.state = hs.get_state_handler()
self.event_creation_handler = hs.get_event_creation_handler()
+ self.room_member_handler = hs.get_room_member_handler()
@defer.inlineCallbacks
def on_POST(self, request, room_id):
@@ -238,7 +239,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
logger.info("Kicking %r from %r...", user_id, room_id)
target_requester = create_requester(user_id)
- yield self.handlers.room_member_handler.update_membership(
+ yield self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=room_id,
@@ -247,9 +248,9 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
ratelimit=False
)
- yield self.handlers.room_member_handler.forget(target_requester.user, room_id)
+ yield self.room_member_handler.forget(target_requester.user, room_id)
- yield self.handlers.room_member_handler.update_membership(
+ yield self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=new_room_id,
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 817fd47842..9d745174c7 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -84,6 +84,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
super(RoomStateEventRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler()
+ self.room_member_handler = hs.get_room_member_handler()
def register(self, http_server):
# /room/$roomid/state/$eventtype
@@ -156,7 +157,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
if event_type == EventTypes.Member:
membership = content.get("membership", None)
- event = yield self.handlers.room_member_handler.update_membership(
+ event = yield self.room_member_handler.update_membership(
requester,
target=UserID.from_string(state_key),
room_id=room_id,
@@ -229,7 +230,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
class JoinRoomAliasServlet(ClientV1RestServlet):
def __init__(self, hs):
super(JoinRoomAliasServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.room_member_handler = hs.get_room_member_handler()
def register(self, http_server):
# /join/$room_identifier[/$txn_id]
@@ -257,7 +258,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
except Exception:
remote_room_hosts = None
elif RoomAlias.is_valid(room_identifier):
- handler = self.handlers.room_member_handler
+ handler = self.room_member_handler
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = yield handler.lookup_room_alias(room_alias)
room_id = room_id.to_string()
@@ -266,7 +267,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
room_identifier,
))
- yield self.handlers.room_member_handler.update_membership(
+ yield self.room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
@@ -562,7 +563,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
class RoomForgetRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomForgetRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.room_member_handler = hs.get_room_member_handler()
def register(self, http_server):
PATTERNS = ("/rooms/(?P<room_id>[^/]*)/forget")
@@ -575,7 +576,7 @@ class RoomForgetRestServlet(ClientV1RestServlet):
allow_guest=False,
)
- yield self.handlers.room_member_handler.forget(
+ yield self.room_member_handler.forget(
user=requester.user,
room_id=room_id,
)
@@ -593,7 +594,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMembershipRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.room_member_handler = hs.get_room_member_handler()
def register(self, http_server):
# /rooms/$roomid/[invite|join|leave]
@@ -622,7 +623,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
content = {}
if membership_action == "invite" and self._has_3pid_invite_keys(content):
- yield self.handlers.room_member_handler.do_3pid_invite(
+ yield self.room_member_handler.do_3pid_invite(
room_id,
requester.user,
content["medium"],
@@ -644,7 +645,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
if 'reason' in content and membership_action in ['kick', 'ban']:
event_content = {'reason': content['reason']}
- yield self.handlers.room_member_handler.update_membership(
+ yield self.room_member_handler.update_membership(
requester=requester,
target=target,
room_id=room_id,
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index c6f4680a76..0ba62bddc1 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -183,7 +183,7 @@ class RegisterRestServlet(RestServlet):
self.auth_handler = hs.get_auth_handler()
self.registration_handler = hs.get_handlers().registration_handler
self.identity_handler = hs.get_handlers().identity_handler
- self.room_member_handler = hs.get_handlers().room_member_handler
+ self.room_member_handler = hs.get_room_member_handler()
self.device_handler = hs.get_device_handler()
self.macaroon_gen = hs.get_macaroon_generator()
diff --git a/synapse/server.py b/synapse/server.py
index fbd602d40e..5b6effbe31 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -45,6 +45,7 @@ from synapse.handlers.device import DeviceHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.room_list import RoomListHandler
+from synapse.handlers.room_member import RoomMemberHandler
from synapse.handlers.set_password import SetPasswordHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
@@ -145,6 +146,7 @@ class HomeServer(object):
'groups_attestation_signing',
'groups_attestation_renewer',
'spam_checker',
+ 'room_member_handler',
]
def __init__(self, hostname, **kwargs):
@@ -382,6 +384,9 @@ class HomeServer(object):
def build_spam_checker(self):
return SpamChecker(self)
+ def build_room_member_handler(self):
+ return RoomMemberHandler(self)
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0f136f8a06..de00cae447 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,7 +20,6 @@ from synapse.storage.devices import DeviceStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
-from ._base import LoggingTransaction
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
@@ -141,22 +140,6 @@ class DataStore(RoomMemberStore, RoomStore,
else:
self._cache_id_gen = None
- events_max = self._stream_id_gen.get_current_token()
- event_cache_prefill, min_event_val = self._get_cache_dict(
- db_conn, "events",
- entity_column="room_id",
- stream_column="stream_ordering",
- max_value=events_max,
- )
- self._events_stream_cache = StreamChangeCache(
- "EventsRoomStreamChangeCache", min_event_val,
- prefilled_cache=event_cache_prefill,
- )
-
- self._membership_stream_cache = StreamChangeCache(
- "MembershipStreamChangeCache", events_max,
- )
-
self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self._get_cache_dict(
@@ -204,6 +187,7 @@ class DataStore(RoomMemberStore, RoomStore,
"DeviceListFederationStreamChangeCache", device_list_max,
)
+ events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
db_conn, "current_state_delta_stream",
entity_column="room_id",
@@ -228,20 +212,6 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=_group_updates_prefill,
)
- cur = LoggingTransaction(
- db_conn.cursor(),
- name="_find_stream_orderings_for_times_txn",
- database_engine=self.database_engine,
- after_callbacks=[],
- final_callbacks=[],
- )
- self._find_stream_orderings_for_times_txn(cur)
- cur.close()
-
- self.find_stream_orderings_looping_call = self._clock.looping_call(
- self._find_stream_orderings_for_times, 10 * 60 * 1000
- )
-
self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 55a05c59d5..00ee82d300 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -15,7 +15,10 @@
from twisted.internet import defer
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
+
from synapse.api.errors import StoreError
from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
@@ -27,30 +30,8 @@ from Queue import PriorityQueue, Empty
logger = logging.getLogger(__name__)
-class EventFederationStore(SQLBaseStore):
- """ Responsible for storing and serving up the various graphs associated
- with an event. Including the main event graph and the auth chains for an
- event.
-
- Also has methods for getting the front (latest) and back (oldest) edges
- of the event graphs. These are used to generate the parents for new events
- and backfilling from another server respectively.
- """
-
- EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
-
- def __init__(self, db_conn, hs):
- super(EventFederationStore, self).__init__(db_conn, hs)
-
- self.register_background_update_handler(
- self.EVENT_AUTH_STATE_ONLY,
- self._background_delete_non_state_event_auth,
- )
-
- hs.get_clock().looping_call(
- self._delete_old_forward_extrem_cache, 60 * 60 * 1000
- )
-
+class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
+ SQLBaseStore):
def get_auth_chain(self, event_ids, include_given=False):
"""Get auth events for given event_ids. The events *must* be state events.
@@ -228,88 +209,6 @@ class EventFederationStore(SQLBaseStore):
return int(min_depth) if min_depth is not None else None
- def _update_min_depth_for_room_txn(self, txn, room_id, depth):
- min_depth = self._get_min_depth_interaction(txn, room_id)
-
- if min_depth and depth >= min_depth:
- return
-
- self._simple_upsert_txn(
- txn,
- table="room_depth",
- keyvalues={
- "room_id": room_id,
- },
- values={
- "min_depth": depth,
- },
- )
-
- def _handle_mult_prev_events(self, txn, events):
- """
- For the given event, update the event edges table and forward and
- backward extremities tables.
- """
- self._simple_insert_many_txn(
- txn,
- table="event_edges",
- values=[
- {
- "event_id": ev.event_id,
- "prev_event_id": e_id,
- "room_id": ev.room_id,
- "is_state": False,
- }
- for ev in events
- for e_id, _ in ev.prev_events
- ],
- )
-
- self._update_backward_extremeties(txn, events)
-
- def _update_backward_extremeties(self, txn, events):
- """Updates the event_backward_extremities tables based on the new/updated
- events being persisted.
-
- This is called for new events *and* for events that were outliers, but
- are now being persisted as non-outliers.
-
- Forward extremities are handled when we first start persisting the events.
- """
- events_by_room = {}
- for ev in events:
- events_by_room.setdefault(ev.room_id, []).append(ev)
-
- query = (
- "INSERT INTO event_backward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- " )"
- " AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
- " AND outlier = ?"
- " )"
- )
-
- txn.executemany(query, [
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
- for ev in events for e_id, _ in ev.prev_events
- if not ev.internal_metadata.is_outlier()
- ])
-
- query = (
- "DELETE FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- )
- txn.executemany(
- query,
- [
- (ev.event_id, ev.room_id) for ev in events
- if not ev.internal_metadata.is_outlier()
- ]
- )
-
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
"""For a given room_id and stream_ordering, return the forward
extremeties of the room at that point in "time".
@@ -371,28 +270,6 @@ class EventFederationStore(SQLBaseStore):
get_forward_extremeties_for_room_txn
)
- def _delete_old_forward_extrem_cache(self):
- def _delete_old_forward_extrem_cache_txn(txn):
- # Delete entries older than a month, while making sure we don't delete
- # the only entries for a room.
- sql = ("""
- DELETE FROM stream_ordering_to_exterm
- WHERE
- room_id IN (
- SELECT room_id
- FROM stream_ordering_to_exterm
- WHERE stream_ordering > ?
- ) AND stream_ordering < ?
- """)
- txn.execute(
- sql,
- (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
- )
- return self.runInteraction(
- "_delete_old_forward_extrem_cache",
- _delete_old_forward_extrem_cache_txn
- )
-
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`
@@ -522,6 +399,135 @@ class EventFederationStore(SQLBaseStore):
return event_results
+
+class EventFederationStore(EventFederationWorkerStore):
+ """ Responsible for storing and serving up the various graphs associated
+ with an event. Including the main event graph and the auth chains for an
+ event.
+
+ Also has methods for getting the front (latest) and back (oldest) edges
+ of the event graphs. These are used to generate the parents for new events
+ and backfilling from another server respectively.
+ """
+
+ EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
+
+ def __init__(self, db_conn, hs):
+ super(EventFederationStore, self).__init__(db_conn, hs)
+
+ self.register_background_update_handler(
+ self.EVENT_AUTH_STATE_ONLY,
+ self._background_delete_non_state_event_auth,
+ )
+
+ hs.get_clock().looping_call(
+ self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+ )
+
+ def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+ min_depth = self._get_min_depth_interaction(txn, room_id)
+
+ if min_depth and depth >= min_depth:
+ return
+
+ self._simple_upsert_txn(
+ txn,
+ table="room_depth",
+ keyvalues={
+ "room_id": room_id,
+ },
+ values={
+ "min_depth": depth,
+ },
+ )
+
+ def _handle_mult_prev_events(self, txn, events):
+ """
+ For the given event, update the event edges table and forward and
+ backward extremities tables.
+ """
+ self._simple_insert_many_txn(
+ txn,
+ table="event_edges",
+ values=[
+ {
+ "event_id": ev.event_id,
+ "prev_event_id": e_id,
+ "room_id": ev.room_id,
+ "is_state": False,
+ }
+ for ev in events
+ for e_id, _ in ev.prev_events
+ ],
+ )
+
+ self._update_backward_extremeties(txn, events)
+
+ def _update_backward_extremeties(self, txn, events):
+ """Updates the event_backward_extremities tables based on the new/updated
+ events being persisted.
+
+ This is called for new events *and* for events that were outliers, but
+ are now being persisted as non-outliers.
+
+ Forward extremities are handled when we first start persisting the events.
+ """
+ events_by_room = {}
+ for ev in events:
+ events_by_room.setdefault(ev.room_id, []).append(ev)
+
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
+ " )"
+ )
+
+ txn.executemany(query, [
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ for ev in events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ])
+
+ query = (
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ )
+ txn.executemany(
+ query,
+ [
+ (ev.event_id, ev.room_id) for ev in events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ )
+
+ def _delete_old_forward_extrem_cache(self):
+ def _delete_old_forward_extrem_cache_txn(txn):
+ # Delete entries older than a month, while making sure we don't delete
+ # the only entries for a room.
+ sql = ("""
+ DELETE FROM stream_ordering_to_exterm
+ WHERE
+ room_id IN (
+ SELECT room_id
+ FROM stream_ordering_to_exterm
+ WHERE stream_ordering > ?
+ ) AND stream_ordering < ?
+ """)
+ txn.execute(
+ sql,
+ (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
+ )
+ return self.runInteraction(
+ "_delete_old_forward_extrem_cache",
+ _delete_old_forward_extrem_cache_txn
+ )
+
def clean_room_for_join(self, room_id):
return self.runInteraction(
"clean_room_for_join",
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 6454045c2d..7164293568 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight):
class EventPushActionsWorkerStore(SQLBaseStore):
+ def __init__(self, db_conn, hs):
+ super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
+
+ # These get correctly set by _find_stream_orderings_for_times_txn
+ self.stream_ordering_month_ago = None
+ self.stream_ordering_day_ago = None
+
+ cur = LoggingTransaction(
+ db_conn.cursor(),
+ name="_find_stream_orderings_for_times_txn",
+ database_engine=self.database_engine,
+ after_callbacks=[],
+ final_callbacks=[],
+ )
+ self._find_stream_orderings_for_times_txn(cur)
+ cur.close()
+
+ self.find_stream_orderings_looping_call = self._clock.looping_call(
+ self._find_stream_orderings_for_times, 10 * 60 * 1000
+ )
+
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
@@ -443,6 +464,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
desc="remove_push_actions_from_staging",
)
+ @defer.inlineCallbacks
+ def _find_stream_orderings_for_times(self):
+ yield self.runInteraction(
+ "_find_stream_orderings_for_times",
+ self._find_stream_orderings_for_times_txn
+ )
+
+ def _find_stream_orderings_for_times_txn(self, txn):
+ logger.info("Searching for stream ordering 1 month ago")
+ self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 month ago: it's %d",
+ self.stream_ordering_month_ago
+ )
+ logger.info("Searching for stream ordering 1 day ago")
+ self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 day ago: it's %d",
+ self.stream_ordering_day_ago
+ )
+
+ def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+ """
+ Find the stream_ordering of the first event that was received after
+ a given timestamp. This is relatively slow as there is no index on
+ received_ts but we can then use this to delete push actions before
+ this.
+
+ received_ts must necessarily be in the same order as stream_ordering
+ and stream_ordering is indexed, so we manually binary search using
+ stream_ordering
+ """
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ max_stream_ordering = txn.fetchone()[0]
+
+ if max_stream_ordering is None:
+ return 0
+
+ range_start = 0
+ range_end = max_stream_ordering
+
+ sql = (
+ "SELECT received_ts FROM events"
+ " WHERE stream_ordering > ?"
+ " ORDER BY stream_ordering"
+ " LIMIT 1"
+ )
+
+ while range_end - range_start > 1:
+ middle = int((range_end + range_start) / 2)
+ txn.execute(sql, (middle,))
+ middle_ts = txn.fetchone()[0]
+ if ts > middle_ts:
+ range_start = middle
+ else:
+ range_end = middle
+
+ return range_end
+
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -651,69 +735,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
""", (room_id, user_id, stream_ordering))
@defer.inlineCallbacks
- def _find_stream_orderings_for_times(self):
- yield self.runInteraction(
- "_find_stream_orderings_for_times",
- self._find_stream_orderings_for_times_txn
- )
-
- def _find_stream_orderings_for_times_txn(self, txn):
- logger.info("Searching for stream ordering 1 month ago")
- self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
- txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
- )
- logger.info(
- "Found stream ordering 1 month ago: it's %d",
- self.stream_ordering_month_ago
- )
- logger.info("Searching for stream ordering 1 day ago")
- self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
- txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
- )
- logger.info(
- "Found stream ordering 1 day ago: it's %d",
- self.stream_ordering_day_ago
- )
-
- def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
- """
- Find the stream_ordering of the first event that was received after
- a given timestamp. This is relatively slow as there is no index on
- received_ts but we can then use this to delete push actions before
- this.
-
- received_ts must necessarily be in the same order as stream_ordering
- and stream_ordering is indexed, so we manually binary search using
- stream_ordering
- """
- txn.execute("SELECT MAX(stream_ordering) FROM events")
- max_stream_ordering = txn.fetchone()[0]
-
- if max_stream_ordering is None:
- return 0
-
- range_start = 0
- range_end = max_stream_ordering
-
- sql = (
- "SELECT received_ts FROM events"
- " WHERE stream_ordering > ?"
- " ORDER BY stream_ordering"
- " LIMIT 1"
- )
-
- while range_end - range_start > 1:
- middle = int((range_end + range_start) / 2)
- txn.execute(sql, (middle,))
- middle_ts = txn.fetchone()[0]
- if ts > middle_ts:
- range_start = middle
- else:
- range_end = middle
-
- return range_end
-
- @defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index fff6652e05..7f2c08d7a6 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -16,6 +16,7 @@
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.storage._base import SQLBaseStore
from synapse.storage.search import SearchStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -38,7 +39,126 @@ RatelimitOverride = collections.namedtuple(
)
-class RoomStore(SearchStore):
+class RoomWorkerStore(SQLBaseStore):
+ def get_public_room_ids(self):
+ return self._simple_select_onecol(
+ table="rooms",
+ keyvalues={
+ "is_public": True,
+ },
+ retcol="room_id",
+ desc="get_public_room_ids",
+ )
+
+ @cached(num_args=2, max_entries=100)
+ def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
+ """Get pulbic rooms for a particular list, or across all lists.
+
+ Args:
+ stream_id (int)
+ network_tuple (ThirdPartyInstanceID): The list to use (None, None)
+ means the main list, None means all lsits.
+ """
+ return self.runInteraction(
+ "get_public_room_ids_at_stream_id",
+ self.get_public_room_ids_at_stream_id_txn,
+ stream_id, network_tuple=network_tuple
+ )
+
+ def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
+ network_tuple):
+ return {
+ rm
+ for rm, vis in self.get_published_at_stream_id_txn(
+ txn, stream_id, network_tuple=network_tuple
+ ).items()
+ if vis
+ }
+
+ def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
+ if network_tuple:
+ # We want to get from a particular list. No aggregation required.
+
+ sql = ("""
+ SELECT room_id, visibility FROM public_room_list_stream
+ INNER JOIN (
+ SELECT room_id, max(stream_id) AS stream_id
+ FROM public_room_list_stream
+ WHERE stream_id <= ? %s
+ GROUP BY room_id
+ ) grouped USING (room_id, stream_id)
+ """)
+
+ if network_tuple.appservice_id is not None:
+ txn.execute(
+ sql % ("AND appservice_id = ? AND network_id = ?",),
+ (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
+ )
+ else:
+ txn.execute(
+ sql % ("AND appservice_id IS NULL",),
+ (stream_id,)
+ )
+ return dict(txn)
+ else:
+ # We want to get from all lists, so we need to aggregate the results
+
+ logger.info("Executing full list")
+
+ sql = ("""
+ SELECT room_id, visibility
+ FROM public_room_list_stream
+ INNER JOIN (
+ SELECT
+ room_id, max(stream_id) AS stream_id, appservice_id,
+ network_id
+ FROM public_room_list_stream
+ WHERE stream_id <= ?
+ GROUP BY room_id, appservice_id, network_id
+ ) grouped USING (room_id, stream_id)
+ """)
+
+ txn.execute(
+ sql,
+ (stream_id,)
+ )
+
+ results = {}
+ # A room is visible if its visible on any list.
+ for room_id, visibility in txn:
+ results[room_id] = bool(visibility) or results.get(room_id, False)
+
+ return results
+
+ def get_public_room_changes(self, prev_stream_id, new_stream_id,
+ network_tuple):
+ def get_public_room_changes_txn(txn):
+ then_rooms = self.get_public_room_ids_at_stream_id_txn(
+ txn, prev_stream_id, network_tuple
+ )
+
+ now_rooms_dict = self.get_published_at_stream_id_txn(
+ txn, new_stream_id, network_tuple
+ )
+
+ now_rooms_visible = set(
+ rm for rm, vis in now_rooms_dict.items() if vis
+ )
+ now_rooms_not_visible = set(
+ rm for rm, vis in now_rooms_dict.items() if not vis
+ )
+
+ newly_visible = now_rooms_visible - then_rooms
+ newly_unpublished = now_rooms_not_visible & then_rooms
+
+ return newly_visible, newly_unpublished
+
+ return self.runInteraction(
+ "get_public_room_changes", get_public_room_changes_txn
+ )
+
+
+class RoomStore(RoomWorkerStore, SearchStore):
@defer.inlineCallbacks
def store_room(self, room_id, room_creator_user_id, is_public):
@@ -225,16 +345,6 @@ class RoomStore(SearchStore):
)
self.hs.get_notifier().on_new_replication_data()
- def get_public_room_ids(self):
- return self._simple_select_onecol(
- table="rooms",
- keyvalues={
- "is_public": True,
- },
- retcol="room_id",
- desc="get_public_room_ids",
- )
-
def get_room_count(self):
"""Retrieve a list of all rooms
"""
@@ -326,113 +436,6 @@ class RoomStore(SearchStore):
def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()
- @cached(num_args=2, max_entries=100)
- def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
- """Get pulbic rooms for a particular list, or across all lists.
-
- Args:
- stream_id (int)
- network_tuple (ThirdPartyInstanceID): The list to use (None, None)
- means the main list, None means all lsits.
- """
- return self.runInteraction(
- "get_public_room_ids_at_stream_id",
- self.get_public_room_ids_at_stream_id_txn,
- stream_id, network_tuple=network_tuple
- )
-
- def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
- network_tuple):
- return {
- rm
- for rm, vis in self.get_published_at_stream_id_txn(
- txn, stream_id, network_tuple=network_tuple
- ).items()
- if vis
- }
-
- def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
- if network_tuple:
- # We want to get from a particular list. No aggregation required.
-
- sql = ("""
- SELECT room_id, visibility FROM public_room_list_stream
- INNER JOIN (
- SELECT room_id, max(stream_id) AS stream_id
- FROM public_room_list_stream
- WHERE stream_id <= ? %s
- GROUP BY room_id
- ) grouped USING (room_id, stream_id)
- """)
-
- if network_tuple.appservice_id is not None:
- txn.execute(
- sql % ("AND appservice_id = ? AND network_id = ?",),
- (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
- )
- else:
- txn.execute(
- sql % ("AND appservice_id IS NULL",),
- (stream_id,)
- )
- return dict(txn)
- else:
- # We want to get from all lists, so we need to aggregate the results
-
- logger.info("Executing full list")
-
- sql = ("""
- SELECT room_id, visibility
- FROM public_room_list_stream
- INNER JOIN (
- SELECT
- room_id, max(stream_id) AS stream_id, appservice_id,
- network_id
- FROM public_room_list_stream
- WHERE stream_id <= ?
- GROUP BY room_id, appservice_id, network_id
- ) grouped USING (room_id, stream_id)
- """)
-
- txn.execute(
- sql,
- (stream_id,)
- )
-
- results = {}
- # A room is visible if its visible on any list.
- for room_id, visibility in txn:
- results[room_id] = bool(visibility) or results.get(room_id, False)
-
- return results
-
- def get_public_room_changes(self, prev_stream_id, new_stream_id,
- network_tuple):
- def get_public_room_changes_txn(txn):
- then_rooms = self.get_public_room_ids_at_stream_id_txn(
- txn, prev_stream_id, network_tuple
- )
-
- now_rooms_dict = self.get_published_at_stream_id_txn(
- txn, new_stream_id, network_tuple
- )
-
- now_rooms_visible = set(
- rm for rm, vis in now_rooms_dict.items() if vis
- )
- now_rooms_not_visible = set(
- rm for rm, vis in now_rooms_dict.items() if not vis
- )
-
- newly_visible = now_rooms_visible - then_rooms
- newly_unpublished = now_rooms_not_visible & then_rooms
-
- return newly_visible, newly_unpublished
-
- return self.runInteraction(
- "get_public_room_changes", get_public_room_changes_txn
- )
-
def get_all_new_public_rooms(self, prev_id, current_id, limit):
def get_all_new_public_rooms(txn):
sql = ("""
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 67d5d9969a..9e6eaaa532 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -22,12 +22,12 @@ from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.util.caches.descriptors import cached, cachedList
-class SignatureStore(SQLBaseStore):
- """Persistence for event signatures and hashes"""
-
+class SignatureWorkerStore(SQLBaseStore):
@cached()
def get_event_reference_hash(self, event_id):
- return self._get_event_reference_hashes_txn(event_id)
+ # This is a dummy function to allow get_event_reference_hashes
+ # to use its cache
+ raise NotImplementedError()
@cachedList(cached_method_name="get_event_reference_hash",
list_name="event_ids", num_args=1)
@@ -74,6 +74,10 @@ class SignatureStore(SQLBaseStore):
txn.execute(query, (event_id, ))
return {k: v for k, v in txn}
+
+class SignatureStore(SignatureWorkerStore):
+ """Persistence for event signatures and hashes"""
+
def _store_event_reference_hashes_txn(self, txn, events):
"""Store a hash for a PDU
Args:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 52bdce5be2..a2527d2a36 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,13 +35,17 @@ what sort order was used:
from twisted.internet import defer
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+
from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+import abc
import logging
@@ -143,81 +147,41 @@ def filter_to_clause(event_filter):
return " AND ".join(clauses), args
-class StreamStore(SQLBaseStore):
- @defer.inlineCallbacks
- def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
- # NB this lives here instead of appservice.py so we can reuse the
- # 'private' StreamToken class in this file.
- if limit:
- limit = max(limit, MAX_STREAM_SIZE)
- else:
- limit = MAX_STREAM_SIZE
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
+ """This is an abstract base class where subclasses must implement
+ `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
+ which can be called in the initializer.
+ """
- # From and to keys should be integers from ordering.
- from_id = RoomStreamToken.parse_stream_token(from_key)
- to_id = RoomStreamToken.parse_stream_token(to_key)
+ __metaclass__ = abc.ABCMeta
- if from_key == to_key:
- defer.returnValue(([], to_key))
- return
+ def __init__(self, db_conn, hs):
+ super(StreamWorkerStore, self).__init__(db_conn, hs)
- # select all the events between from/to with a sensible limit
- sql = (
- "SELECT e.event_id, e.room_id, e.type, s.state_key, "
- "e.stream_ordering FROM events AS e "
- "LEFT JOIN state_events as s ON "
- "e.event_id = s.event_id "
- "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
- "ORDER BY stream_ordering ASC LIMIT %(limit)d "
- ) % {
- "limit": limit
- }
-
- def f(txn):
- # pull out all the events between the tokens
- txn.execute(sql, (from_id.stream, to_id.stream,))
- rows = self.cursor_to_dict(txn)
-
- # Logic:
- # - We want ALL events which match the AS room_id regex
- # - We want ALL events which match the rooms represented by the AS
- # room_alias regex
- # - We want ALL events for rooms that AS users have joined.
- # This is currently supported via get_app_service_rooms (which is
- # used for the Notifier listener rooms). We can't reasonably make a
- # SQL query for these room IDs, so we'll pull all the events between
- # from/to and filter in python.
- rooms_for_as = self._get_app_service_rooms_txn(txn, service)
- room_ids_for_as = [r.room_id for r in rooms_for_as]
-
- def app_service_interested(row):
- if row["room_id"] in room_ids_for_as:
- return True
-
- if row["type"] == EventTypes.Member:
- if service.is_interested_in_user(row.get("state_key")):
- return True
- return False
-
- return [r for r in rows if app_service_interested(r)]
-
- rows = yield self.runInteraction("get_appservice_room_stream", f)
-
- ret = yield self._get_events(
- [r["event_id"] for r in rows],
- get_prev_content=True
+ events_max = self.get_room_max_stream_ordering()
+ event_cache_prefill, min_event_val = self._get_cache_dict(
+ db_conn, "events",
+ entity_column="room_id",
+ stream_column="stream_ordering",
+ max_value=events_max,
+ )
+ self._events_stream_cache = StreamChangeCache(
+ "EventsRoomStreamChangeCache", min_event_val,
+ prefilled_cache=event_cache_prefill,
+ )
+ self._membership_stream_cache = StreamChangeCache(
+ "MembershipStreamChangeCache", events_max,
)
- self._set_before_and_after(ret, rows, topo_order=from_id is None)
+ self._stream_order_on_start = self.get_room_max_stream_ordering()
- if rows:
- key = "s%d" % max(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = to_key
+ @abc.abstractmethod
+ def get_room_max_stream_ordering(self):
+ raise NotImplementedError()
- defer.returnValue((ret, key))
+ @abc.abstractmethod
+ def get_room_min_stream_ordering(self):
+ raise NotImplementedError()
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@@ -381,88 +345,6 @@ class StreamStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
- def paginate_room_events(self, room_id, from_key, to_key=None,
- direction='b', limit=-1, event_filter=None):
- # Tokens really represent positions between elements, but we use
- # the convention of pointing to the event before the gap. Hence
- # we have a bit of asymmetry when it comes to equalities.
- args = [False, room_id]
- if direction == 'b':
- order = "DESC"
- bounds = upper_bound(
- RoomStreamToken.parse(from_key), self.database_engine
- )
- if to_key:
- bounds = "%s AND %s" % (bounds, lower_bound(
- RoomStreamToken.parse(to_key), self.database_engine
- ))
- else:
- order = "ASC"
- bounds = lower_bound(
- RoomStreamToken.parse(from_key), self.database_engine
- )
- if to_key:
- bounds = "%s AND %s" % (bounds, upper_bound(
- RoomStreamToken.parse(to_key), self.database_engine
- ))
-
- filter_clause, filter_args = filter_to_clause(event_filter)
-
- if filter_clause:
- bounds += " AND " + filter_clause
- args.extend(filter_args)
-
- if int(limit) > 0:
- args.append(int(limit))
- limit_str = " LIMIT ?"
- else:
- limit_str = ""
-
- sql = (
- "SELECT * FROM events"
- " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
- " ORDER BY topological_ordering %(order)s,"
- " stream_ordering %(order)s %(limit)s"
- ) % {
- "bounds": bounds,
- "order": order,
- "limit": limit_str
- }
-
- def f(txn):
- txn.execute(sql, args)
-
- rows = self.cursor_to_dict(txn)
-
- if rows:
- topo = rows[-1]["topological_ordering"]
- toke = rows[-1]["stream_ordering"]
- if direction == 'b':
- # Tokens are positions between events.
- # This token points *after* the last event in the chunk.
- # We need it to point to the event before it in the chunk
- # when we are going backwards so we subtract one from the
- # stream part.
- toke -= 1
- next_token = str(RoomStreamToken(topo, toke))
- else:
- # TODO (erikj): We should work out what to do here instead.
- next_token = to_key if to_key else from_key
-
- return rows, next_token,
-
- rows, token = yield self.runInteraction("paginate_room_events", f)
-
- events = yield self._get_events(
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
-
- self._set_before_and_after(events, rows)
-
- defer.returnValue((events, token))
-
- @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
@@ -542,7 +424,7 @@ class StreamStore(SQLBaseStore):
`room_id` causes it to return the current room specific topological
token.
"""
- token = yield self._stream_id_gen.get_current_token()
+ token = yield self.get_room_max_stream_ordering()
if room_id is None:
defer.returnValue("s%d" % (token,))
else:
@@ -552,12 +434,6 @@ class StreamStore(SQLBaseStore):
)
defer.returnValue("t%d-%d" % (topo, token))
- def get_room_max_stream_ordering(self):
- return self._stream_id_gen.get_current_token()
-
- def get_room_min_stream_ordering(self):
- return self._backfill_id_gen.get_current_token()
-
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
Args:
@@ -832,3 +708,168 @@ class StreamStore(SQLBaseStore):
def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
+
+
+class StreamStore(StreamWorkerStore):
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
+
+ @defer.inlineCallbacks
+ def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
+ # NB this lives here instead of appservice.py so we can reuse the
+ # 'private' StreamToken class in this file.
+ if limit:
+ limit = max(limit, MAX_STREAM_SIZE)
+ else:
+ limit = MAX_STREAM_SIZE
+
+ # From and to keys should be integers from ordering.
+ from_id = RoomStreamToken.parse_stream_token(from_key)
+ to_id = RoomStreamToken.parse_stream_token(to_key)
+
+ if from_key == to_key:
+ defer.returnValue(([], to_key))
+ return
+
+ # select all the events between from/to with a sensible limit
+ sql = (
+ "SELECT e.event_id, e.room_id, e.type, s.state_key, "
+ "e.stream_ordering FROM events AS e "
+ "LEFT JOIN state_events as s ON "
+ "e.event_id = s.event_id "
+ "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+ "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+ ) % {
+ "limit": limit
+ }
+
+ def f(txn):
+ # pull out all the events between the tokens
+ txn.execute(sql, (from_id.stream, to_id.stream,))
+ rows = self.cursor_to_dict(txn)
+
+ # Logic:
+ # - We want ALL events which match the AS room_id regex
+ # - We want ALL events which match the rooms represented by the AS
+ # room_alias regex
+ # - We want ALL events for rooms that AS users have joined.
+ # This is currently supported via get_app_service_rooms (which is
+ # used for the Notifier listener rooms). We can't reasonably make a
+ # SQL query for these room IDs, so we'll pull all the events between
+ # from/to and filter in python.
+ rooms_for_as = self._get_app_service_rooms_txn(txn, service)
+ room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+ def app_service_interested(row):
+ if row["room_id"] in room_ids_for_as:
+ return True
+
+ if row["type"] == EventTypes.Member:
+ if service.is_interested_in_user(row.get("state_key")):
+ return True
+ return False
+
+ return [r for r in rows if app_service_interested(r)]
+
+ rows = yield self.runInteraction("get_appservice_room_stream", f)
+
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(ret, rows, topo_order=from_id is None)
+
+ if rows:
+ key = "s%d" % max(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = to_key
+
+ defer.returnValue((ret, key))
+
+ @defer.inlineCallbacks
+ def paginate_room_events(self, room_id, from_key, to_key=None,
+ direction='b', limit=-1, event_filter=None):
+ # Tokens really represent positions between elements, but we use
+ # the convention of pointing to the event before the gap. Hence
+ # we have a bit of asymmetry when it comes to equalities.
+ args = [False, room_id]
+ if direction == 'b':
+ order = "DESC"
+ bounds = upper_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
+ if to_key:
+ bounds = "%s AND %s" % (bounds, lower_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
+ else:
+ order = "ASC"
+ bounds = lower_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
+ if to_key:
+ bounds = "%s AND %s" % (bounds, upper_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
+
+ filter_clause, filter_args = filter_to_clause(event_filter)
+
+ if filter_clause:
+ bounds += " AND " + filter_clause
+ args.extend(filter_args)
+
+ if int(limit) > 0:
+ args.append(int(limit))
+ limit_str = " LIMIT ?"
+ else:
+ limit_str = ""
+
+ sql = (
+ "SELECT * FROM events"
+ " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
+ " ORDER BY topological_ordering %(order)s,"
+ " stream_ordering %(order)s %(limit)s"
+ ) % {
+ "bounds": bounds,
+ "order": order,
+ "limit": limit_str
+ }
+
+ def f(txn):
+ txn.execute(sql, args)
+
+ rows = self.cursor_to_dict(txn)
+
+ if rows:
+ topo = rows[-1]["topological_ordering"]
+ toke = rows[-1]["stream_ordering"]
+ if direction == 'b':
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # when we are going backwards so we subtract one from the
+ # stream part.
+ toke -= 1
+ next_token = str(RoomStreamToken(topo, toke))
+ else:
+ # TODO (erikj): We should work out what to do here instead.
+ next_token = to_key if to_key else from_key
+
+ return rows, next_token,
+
+ rows, token = yield self.runInteraction("paginate_room_events", f)
+
+ events = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(events, rows)
+
+ defer.returnValue((events, token))
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 94fa7cac98..a8dea15c1b 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -299,10 +299,6 @@ def preserve_fn(f):
Useful for wrapping functions that return a deferred which you don't yield
on.
"""
- def reset_context(result):
- LoggingContext.set_current_context(LoggingContext.sentinel)
- return result
-
def g(*args, **kwargs):
current = LoggingContext.current_context()
res = f(*args, **kwargs)
@@ -323,12 +319,11 @@ def preserve_fn(f):
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
- res.addBoth(reset_context)
+ res.addBoth(_set_context_cb, LoggingContext.sentinel)
return res
return g
-@defer.inlineCallbacks
def make_deferred_yieldable(deferred):
"""Given a deferred, make it follow the Synapse logcontext rules:
@@ -342,9 +337,16 @@ def make_deferred_yieldable(deferred):
(This is more-or-less the opposite operation to preserve_fn.)
"""
- with PreserveLoggingContext():
- r = yield deferred
- defer.returnValue(r)
+ if isinstance(deferred, defer.Deferred) and not deferred.called:
+ prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
+ deferred.addBoth(_set_context_cb, prev_context)
+ return deferred
+
+
+def _set_context_cb(result, context):
+ """A callback function which just sets the logging context"""
+ LoggingContext.set_current_context(context)
+ return result
# modules to ignore in `logcontext_tracer`
diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py
index a269e6f56e..e46534cd35 100644
--- a/tests/rest/client/v1/test_typing.py
+++ b/tests/rest/client/v1/test_typing.py
@@ -95,7 +95,7 @@ class RoomTypingTestCase(RestTestCase):
else:
if remotedomains is not None:
remotedomains.add(member.domain)
- hs.get_handlers().room_member_handler.fetch_room_distributions_into = (
+ hs.get_room_member_handler().fetch_room_distributions_into = (
fetch_room_distributions_into
)
|