diff --git a/synapse/__init__.py b/synapse/__init__.py
index 25c10244d3..6bb5a8b24d 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
except ImportError:
pass
-__version__ = "0.99.2"
+__version__ = "0.99.3"
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index f47c33a074..dc913feeee 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -102,46 +102,6 @@ class ThirdPartyEntityKind(object):
LOCATION = "location"
-class RoomVersions(object):
- V1 = "1"
- V2 = "2"
- V3 = "3"
- STATE_V2_TEST = "state-v2-test"
-
-
-class RoomDisposition(object):
- STABLE = "stable"
- UNSTABLE = "unstable"
-
-
-# the version we will give rooms which are created on this server
-DEFAULT_ROOM_VERSION = RoomVersions.V1
-
-# vdh-test-version is a placeholder to get room versioning support working and tested
-# until we have a working v2.
-KNOWN_ROOM_VERSIONS = {
- RoomVersions.V1,
- RoomVersions.V2,
- RoomVersions.V3,
- RoomVersions.STATE_V2_TEST,
- RoomVersions.V3,
-}
-
-
-class EventFormatVersions(object):
- """This is an internal enum for tracking the version of the event format,
- independently from the room version.
- """
- V1 = 1
- V2 = 2
-
-
-KNOWN_EVENT_FORMAT_VERSIONS = {
- EventFormatVersions.V1,
- EventFormatVersions.V2,
-}
-
-
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
new file mode 100644
index 0000000000..e77abe1040
--- /dev/null
+++ b/synapse/api/room_versions.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import attr
+
+
+class EventFormatVersions(object):
+ """This is an internal enum for tracking the version of the event format,
+ independently from the room version.
+ """
+ V1 = 1 # $id:server format
+ V2 = 2 # MSC1659-style $hash format: introduced for room v3
+
+
+KNOWN_EVENT_FORMAT_VERSIONS = {
+ EventFormatVersions.V1,
+ EventFormatVersions.V2,
+}
+
+
+class StateResolutionVersions(object):
+ """Enum to identify the state resolution algorithms"""
+ V1 = 1 # room v1 state res
+ V2 = 2 # MSC1442 state res: room v2 and later
+
+
+class RoomDisposition(object):
+ STABLE = "stable"
+ UNSTABLE = "unstable"
+
+
+@attr.s(slots=True, frozen=True)
+class RoomVersion(object):
+ """An object which describes the unique attributes of a room version."""
+
+ identifier = attr.ib() # str; the identifier for this version
+ disposition = attr.ib() # str; one of the RoomDispositions
+ event_format = attr.ib() # int; one of the EventFormatVersions
+ state_res = attr.ib() # int; one of the StateResolutionVersions
+
+
+class RoomVersions(object):
+ V1 = RoomVersion(
+ "1",
+ RoomDisposition.STABLE,
+ EventFormatVersions.V1,
+ StateResolutionVersions.V1,
+ )
+ STATE_V2_TEST = RoomVersion(
+ "state-v2-test",
+ RoomDisposition.UNSTABLE,
+ EventFormatVersions.V1,
+ StateResolutionVersions.V2,
+ )
+ V2 = RoomVersion(
+ "2",
+ RoomDisposition.STABLE,
+ EventFormatVersions.V1,
+ StateResolutionVersions.V2,
+ )
+ V3 = RoomVersion(
+ "3",
+ RoomDisposition.STABLE,
+ EventFormatVersions.V2,
+ StateResolutionVersions.V2,
+ )
+
+
+# the version we will give rooms which are created on this server
+DEFAULT_ROOM_VERSION = RoomVersions.V1
+
+
+KNOWN_ROOM_VERSIONS = {
+ v.identifier: v for v in (
+ RoomVersions.V1,
+ RoomVersions.V2,
+ RoomVersions.V3,
+ RoomVersions.STATE_V2_TEST,
+ )
+} # type: dict[str, RoomVersion]
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 9711a7147c..1d43f2b075 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams import ReceiptsStream
+from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9163b56d86..5388def28a 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
- event = yield self.store.get_event(row.event_id)
+ if row.type != EventsStreamEventRow.TypeId:
+ continue
+ event = yield self.store.get_event(row.data.event_id)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index d1ab9512cd..355f5aa71d 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+ EventsStream,
+ EventsStreamCurrentStateRow,
+)
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
prefilled_cache=curr_state_delta_prefill,
)
- self._current_state_delta_pos = events_max
-
def stream_positions(self):
result = super(UserDirectorySlaveStore, self).stream_positions()
- result["current_state_deltas"] = self._current_state_delta_pos
return result
def process_replication_rows(self, stream_name, token, rows):
- if stream_name == "current_state_deltas":
- self._current_state_delta_pos = token
+ if stream_name == EventsStream.NAME:
+ self._stream_id_gen.advance(token)
for row in rows:
+ if row.type != EventsStreamCurrentStateRow.TypeId:
+ continue
self._curr_state_delta_stream_cache.entity_has_changed(
- row.room_id, token
+ row.data.room_id, token
)
return super(UserDirectorySlaveStore, self).process_replication_rows(
stream_name, token, rows
@@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
- if stream_name == "current_state_deltas":
+ if stream_name == EventsStream.NAME:
run_in_background(self._notify_directory)
@defer.inlineCallbacks
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 8f9e330da5..203490fc36 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -20,15 +20,9 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
-from synapse.api.constants import (
- KNOWN_ROOM_VERSIONS,
- EventFormatVersions,
- EventTypes,
- JoinRules,
- Membership,
- RoomVersions,
-)
+from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, EventSizeError, SynapseError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.types import UserID, get_domain_from_id
logger = logging.getLogger(__name__)
@@ -452,16 +446,18 @@ def check_redaction(room_version, event, auth_events):
if user_level >= redact_level:
return False
- if room_version in (RoomVersions.V1, RoomVersions.V2,):
+ v = KNOWN_ROOM_VERSIONS.get(room_version)
+ if not v:
+ raise RuntimeError("Unrecognized room version %r" % (room_version,))
+
+ if v.event_format == EventFormatVersions.V1:
redacter_domain = get_domain_from_id(event.event_id)
redactee_domain = get_domain_from_id(event.redacts)
if redacter_domain == redactee_domain:
return True
- elif room_version == RoomVersions.V3:
+ else:
event.internal_metadata.recheck_redaction = True
return True
- else:
- raise RuntimeError("Unrecognized room version %r" % (room_version,))
raise AuthError(
403,
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index fafa135182..12056d5be2 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -21,7 +21,7 @@ import six
from unpaddedbase64 import encode_base64
-from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions, RoomVersions
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@@ -351,18 +351,13 @@ def room_version_to_event_format(room_version):
Returns:
int
"""
- if room_version not in KNOWN_ROOM_VERSIONS:
+ v = KNOWN_ROOM_VERSIONS.get(room_version)
+
+ if not v:
# We should have already checked version, so this should not happen
raise RuntimeError("Unrecognized room version %s" % (room_version,))
- if room_version in (
- RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
- ):
- return EventFormatVersions.V1
- elif room_version in (RoomVersions.V3,):
- return EventFormatVersions.V2
- else:
- raise RuntimeError("Unrecognized room version %s" % (room_version,))
+ return v.event_format
def event_type_from_format_version(format_version):
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 06e01be918..fba27177c7 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -17,21 +17,17 @@ import attr
from twisted.internet import defer
-from synapse.api.constants import (
+from synapse.api.constants import MAX_DEPTH
+from synapse.api.room_versions import (
KNOWN_EVENT_FORMAT_VERSIONS,
KNOWN_ROOM_VERSIONS,
- MAX_DEPTH,
EventFormatVersions,
)
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.types import EventID
from synapse.util.stringutils import random_string
-from . import (
- _EventInternalMetadata,
- event_type_from_format_version,
- room_version_to_event_format,
-)
+from . import _EventInternalMetadata, event_type_from_format_version
@attr.s(slots=True, cmp=False, frozen=True)
@@ -170,21 +166,34 @@ class EventBuilderFactory(object):
def new(self, room_version, key_values):
"""Generate an event builder appropriate for the given room version
+ Deprecated: use for_room_version with a RoomVersion object instead
+
Args:
- room_version (str): Version of the room that we're creating an
- event builder for
+ room_version (str): Version of the room that we're creating an event builder
+ for
key_values (dict): Fields used as the basis of the new event
Returns:
EventBuilder
"""
-
- # There's currently only the one event version defined
- if room_version not in KNOWN_ROOM_VERSIONS:
+ v = KNOWN_ROOM_VERSIONS.get(room_version)
+ if not v:
raise Exception(
"No event format defined for version %r" % (room_version,)
)
+ return self.for_room_version(v, key_values)
+ def for_room_version(self, room_version, key_values):
+ """Generate an event builder appropriate for the given room version
+
+ Args:
+ room_version (synapse.api.room_versions.RoomVersion):
+ Version of the room that we're creating an event builder for
+ key_values (dict): Fields used as the basis of the new event
+
+ Returns:
+ EventBuilder
+ """
return EventBuilder(
store=self.store,
state=self.state,
@@ -192,7 +201,7 @@ class EventBuilderFactory(object):
clock=self.clock,
hostname=self.hostname,
signing_key=self.signing_key,
- format_version=room_version_to_event_format(room_version),
+ format_version=room_version.event_format,
type=key_values["type"],
state_key=key_values.get("state_key"),
room_id=key_values["room_id"],
@@ -222,7 +231,6 @@ def create_local_event_from_event_dict(clock, hostname, signing_key,
FrozenEvent
"""
- # There's currently only the one event version defined
if format_version not in KNOWN_EVENT_FORMAT_VERSIONS:
raise Exception(
"No event format defined for version %r" % (format_version,)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index a072674b02..514273c792 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -15,8 +15,9 @@
from six import string_types
-from synapse.api.constants import EventFormatVersions, EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
+from synapse.api.room_versions import EventFormatVersions
from synapse.types import EventID, RoomID, UserID
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a7a2ec4523..dfe6b4aa5c 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -20,8 +20,9 @@ import six
from twisted.internet import defer
from twisted.internet.defer import DeferredList
-from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import event_type_from_format_version
from synapse.events.utils import prune_event
@@ -274,9 +275,12 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
# now let's look for events where the sender's domain is different to the
# event id's domain (normally only the case for joins/leaves), and add additional
# checks. Only do this if the room version has a concept of event ID domain
- if room_version in (
- RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
- ):
+ # (ie, the room version uses old-style non-hash event IDs).
+ v = KNOWN_ROOM_VERSIONS.get(room_version)
+ if not v:
+ raise RuntimeError("Unrecognized room version %s" % (room_version,))
+
+ if v.event_format == EventFormatVersions.V1:
pdus_to_check_event_id = [
p for p in pdus_to_check
if p.sender_domain != get_domain_from_id(p.pdu.event_id)
@@ -289,10 +293,6 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
for p, d in zip(pdus_to_check_event_id, more_deferreds):
p.deferreds.append(d)
- elif room_version in (RoomVersions.V3,):
- pass # No further checks needed, as event IDs are hashes here
- else:
- raise RuntimeError("Unrecognized room version %s" % (room_version,))
# replace lists of deferreds with single Deferreds
return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check]
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 58e04d81ab..f3fc897a0a 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,12 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import (
- KNOWN_ROOM_VERSIONS,
- EventTypes,
- Membership,
- RoomVersions,
-)
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
CodeMessageException,
Codes,
@@ -38,6 +33,11 @@ from synapse.api.errors import (
HttpResponseException,
SynapseError,
)
+from synapse.api.room_versions import (
+ KNOWN_ROOM_VERSIONS,
+ EventFormatVersions,
+ RoomVersions,
+)
from synapse.events import builder, room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.util import logcontext, unwrapFirstError
@@ -570,7 +570,7 @@ class FederationClient(FederationBase):
Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
`(origin, event, event_format)` where origin is the remote
homeserver which generated the event, and event_format is one of
- `synapse.api.constants.EventFormatVersions`.
+ `synapse.api.room_versions.EventFormatVersions`.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
@@ -592,7 +592,7 @@ class FederationClient(FederationBase):
# Note: If not supplied, the room version may be either v1 or v2,
# however either way the event format version will be v1.
- room_version = ret.get("room_version", RoomVersions.V1)
+ room_version = ret.get("room_version", RoomVersions.V1.identifier)
event_format = room_version_to_event_format(room_version)
pdu_dict = ret.get("event", None)
@@ -695,7 +695,9 @@ class FederationClient(FederationBase):
room_version = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
- room_version = e.content.get("room_version", RoomVersions.V1)
+ room_version = e.content.get(
+ "room_version", RoomVersions.V1.identifier
+ )
break
if room_version is None:
@@ -802,11 +804,10 @@ class FederationClient(FederationBase):
raise err
# Otherwise, we assume that the remote server doesn't understand
- # the v2 invite API.
-
- if room_version in (RoomVersions.V1, RoomVersions.V2):
- pass # We'll fall through
- else:
+ # the v2 invite API. That's ok provided the room uses old-style event
+ # IDs.
+ v = KNOWN_ROOM_VERSIONS.get(room_version)
+ if v.event_format != EventFormatVersions.V1:
raise SynapseError(
400,
"User's homeserver does not support this room version",
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 81f3b4b1ff..df60828dba 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
-from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -34,6 +34,7 @@ from synapse.api.errors import (
NotFoundError,
SynapseError,
)
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import compute_event_signature
from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 04d04a4457..0240b339b0 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -55,7 +55,12 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
- self.presence_changed = SortedDict() # Stream position -> user_id
+ self.presence_changed = SortedDict() # Stream position -> list[user_id]
+
+        # Stores the destinations to which we must explicitly send presence
+        # updates about a given user.
+        # Stream position -> (user_id, destinations)
+ self.presence_destinations = SortedDict()
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
@@ -77,7 +82,7 @@ class FederationRemoteSendQueue(object):
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
- "edus", "device_messages", "pos_time",
+ "edus", "device_messages", "pos_time", "presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
@@ -121,6 +126,15 @@ class FederationRemoteSendQueue(object):
for user_id in uids
)
+ keys = self.presence_destinations.keys()
+ i = self.presence_destinations.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.presence_destinations[key]
+
+ user_ids.update(
+ user_id for user_id, _ in self.presence_destinations.values()
+ )
+
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
@@ -209,6 +223,20 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
+ def send_presence_to_destinations(self, states, destinations):
+ """As per FederationSender
+
+ Args:
+ states (list[UserPresenceState])
+ destinations (list[str])
+ """
+ for state in states:
+ pos = self._next_pos()
+ self.presence_map.update({state.user_id: state for state in states})
+ self.presence_destinations[pos] = (state.user_id, destinations)
+
+ self.notifier.on_new_replication_data()
+
def send_device_messages(self, destination):
"""As per FederationSender"""
pos = self._next_pos()
@@ -261,6 +289,16 @@ class FederationRemoteSendQueue(object):
state=self.presence_map[user_id],
)))
+ # Fetch presence to send to destinations
+ i = self.presence_destinations.bisect_right(from_token)
+ j = self.presence_destinations.bisect_right(to_token) + 1
+
+ for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
+ rows.append((pos, PresenceDestinationsRow(
+ state=self.presence_map[user_id],
+ destinations=list(dests),
+ )))
+
# Fetch changes keyed edus
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
@@ -357,6 +395,29 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
buff.presence.append(self.state)
+class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
+ "state", # UserPresenceState
+ "destinations", # list[str]
+))):
+ TypeId = "pd"
+
+ @staticmethod
+ def from_data(data):
+ return PresenceDestinationsRow(
+ state=UserPresenceState.from_dict(data["state"]),
+ destinations=data["dests"],
+ )
+
+ def to_data(self):
+ return {
+ "state": self.state.as_dict(),
+ "dests": self.destinations,
+ }
+
+ def add_to_buffer(self, buff):
+ buff.presence_destinations.append((self.state, self.destinations))
+
+
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu
@@ -428,6 +489,7 @@ TypeToRow = {
Row.TypeId: Row
for Row in (
PresenceRow,
+ PresenceDestinationsRow,
KeyedEduRow,
EduRow,
DeviceRow,
@@ -437,6 +499,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # list(UserPresenceState)
+    "presence_destinations",  # list of (UserPresenceState, destinations) pairs
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"device_destinations", # set of destinations
@@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows):
buff = ParsedFederationStreamData(
presence=[],
+ presence_destinations=[],
keyed_edus={},
edus={},
device_destinations=set(),
@@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
+ for state, destinations in buff.presence_destinations:
+ transaction_queue.send_presence_to_destinations(
+ states=[state], destinations=destinations,
+ )
+
for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 1dc041752b..4f0f939102 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -371,7 +371,7 @@ class FederationSender(object):
return
# First we queue up the new presence by user ID, so multiple presence
- # updates in quick successtion are correctly handled
+ # updates in quick succession are correctly handled.
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update({
@@ -402,6 +402,23 @@ class FederationSender(object):
finally:
self._processing_pending_presence = False
+ def send_presence_to_destinations(self, states, destinations):
+ """Send the given presence states to the given destinations.
+
+ Args:
+ states (list[UserPresenceState])
+ destinations (list[str])
+ """
+
+ if not states or not self.hs.config.use_presence:
+ # No-op if presence is disabled.
+ return
+
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+ self._get_per_destination_queue(destination).send_presence(states)
+
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index efb6bdca48..452599e1a1 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -21,8 +21,8 @@ import re
from twisted.internet import defer
import synapse
-from synapse.api.constants import RoomVersions
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
+from synapse.api.room_versions import RoomVersions
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
from synapse.http.endpoint import parse_and_validate_server_name
from synapse.http.server import JsonResource
@@ -513,7 +513,7 @@ class FederationV1InviteServlet(BaseFederationServlet):
# state resolution algorithm, and we don't use that for processing
# invites
content = yield self.handler.on_invite_request(
- origin, content, room_version=RoomVersions.V1,
+ origin, content, room_version=RoomVersions.V1.identifier,
)
# V1 federation API is defined to return a content of `[200, {...}]`
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9eaf2d3e18..0684778882 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -29,13 +29,7 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
-from synapse.api.constants import (
- KNOWN_ROOM_VERSIONS,
- EventTypes,
- Membership,
- RejectedReason,
- RoomVersions,
-)
+from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -44,6 +38,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.crypto.event_signing import compute_event_signature
from synapse.event_auth import auth_types_for_event
from synapse.events.validator import EventValidator
@@ -1733,7 +1728,9 @@ class FederationHandler(BaseHandler):
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
- room_version = create_event.content.get("room_version", RoomVersions.V1)
+ room_version = create_event.content.get(
+ "room_version", RoomVersions.V1.identifier,
+ )
missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2d9e4c651b..054398dbcb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -30,6 +30,7 @@ from synapse.api.errors import (
NotFoundError,
SynapseError,
)
+from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@@ -603,7 +604,9 @@ class EventCreationHandler(object):
"""
if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
- room_version = event.content.get("room_version", RoomVersions.V1)
+ room_version = event.content.get(
+ "room_version", RoomVersions.V1.identifier
+ )
else:
room_version = yield self.store.get_room_version(event.room_id)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..e85c49742d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
+ self.server_name = hs.hostname
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
@@ -132,9 +135,6 @@ class PresenceHandler(object):
)
)
- distributor = hs.get_distributor()
- distributor.observe("user_joined_room", self.user_joined_room)
-
active_presence = self.store.take_presence_startup_info()
# A dictionary of the current state of users. This is prefilled with
@@ -220,6 +220,15 @@ class PresenceHandler(object):
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))
+ # Used to handle sending of presence to newly joined users/servers
+ if hs.config.use_presence:
+ self.notifier.add_replication_callback(self.notify_new_event)
+
+ # Presence is best effort and quickly heals itself, so lets just always
+ # stream from the current state when we restart.
+ self._event_pos = self.store.get_current_events_token()
+ self._event_processing = False
+
@defer.inlineCallbacks
def _on_shutdown(self):
"""Gets called when shutting down. This lets us persist any updates that
@@ -751,31 +760,6 @@ class PresenceHandler(object):
yield self._update_states([prev_state.copy_and_replace(**new_fields)])
@defer.inlineCallbacks
- def user_joined_room(self, user, room_id):
- """Called (via the distributor) when a user joins a room. This funciton
- sends presence updates to servers, either:
- 1. the joining user is a local user and we send their presence to
- all servers in the room.
- 2. the joining user is a remote user and so we send presence for all
- local users in the room.
- """
- # We only need to send presence to servers that don't have it yet. We
- # don't need to send to local clients here, as that is done as part
- # of the event stream/sync.
- # TODO: Only send to servers not already in the room.
- if self.is_mine(user):
- state = yield self.current_state_for_user(user.to_string())
-
- self._push_to_remotes([state])
- else:
- user_ids = yield self.store.get_users_in_room(room_id)
- user_ids = list(filter(self.is_mine_id, user_ids))
-
- states = yield self.current_state_for_users(user_ids)
-
- self._push_to_remotes(list(states.values()))
-
- @defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
"""Returns the presence for all users in their presence list.
"""
@@ -945,6 +929,140 @@ class PresenceHandler(object):
rows = yield self.store.get_all_presence_updates(last_id, current_id)
defer.returnValue(rows)
+ def notify_new_event(self):
+ """Called when new events have happened. Handles users and servers
+ joining rooms that need to be sent presence.
+ """
+
+ if self._event_processing:
+ return
+
+ @defer.inlineCallbacks
+ def _process_presence():
+ assert not self._event_processing
+
+ self._event_processing = True
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._event_processing = False
+
+ run_as_background_process("presence.notify_new_event", _process_presence)
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ # Loop round handling deltas until we're up to date
+ while True:
+ with Measure(self.clock, "presence_delta"):
+ deltas = yield self.store.get_current_state_deltas(self._event_pos)
+ if not deltas:
+ return
+
+ yield self._handle_state_delta(deltas)
+
+ self._event_pos = deltas[-1]["stream_id"]
+
+ # Expose current event processing position to prometheus
+ synapse.metrics.event_processing_positions.labels("presence").set(
+ self._event_pos
+ )
+
+ @defer.inlineCallbacks
+ def _handle_state_delta(self, deltas):
+ """Process current state deltas to find new joins that need to be
+ handled.
+ """
+ for delta in deltas:
+ typ = delta["type"]
+ state_key = delta["state_key"]
+ room_id = delta["room_id"]
+ event_id = delta["event_id"]
+ prev_event_id = delta["prev_event_id"]
+
+ logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+ if typ != EventTypes.Member:
+ continue
+
+ event = yield self.store.get_event(event_id)
+ if event.content.get("membership") != Membership.JOIN:
+ # We only care about joins
+ continue
+
+ if prev_event_id:
+ prev_event = yield self.store.get_event(prev_event_id)
+ if prev_event.content.get("membership") == Membership.JOIN:
+ # Ignore changes to join events.
+ continue
+
+ yield self._on_user_joined_room(room_id, state_key)
+
+ @defer.inlineCallbacks
+ def _on_user_joined_room(self, room_id, user_id):
+ """Called when we detect a user joining the room via the current state
+ delta stream.
+
+ Args:
+ room_id (str)
+ user_id (str)
+
+ Returns:
+ Deferred
+ """
+
+ if self.is_mine_id(user_id):
+ # If this is a local user then we need to send their presence
+ # out to hosts in the room (who don't already have it)
+
+ # TODO: We should be able to filter the hosts down to those that
+ # haven't previously seen the user
+
+ state = yield self.current_state_for_user(user_id)
+ hosts = yield self.state.get_current_hosts_in_room(room_id)
+
+ # Filter out ourselves.
+ hosts = set(host for host in hosts if host != self.server_name)
+
+ self.federation.send_presence_to_destinations(
+ states=[state],
+ destinations=hosts,
+ )
+ else:
+ # A remote user has joined the room, so we need to:
+ # 1. Check if this is a new server in the room
+ # 2. If so send any presence they don't already have for
+ # local users in the room.
+
+ # TODO: We should be able to filter the users down to those that
+ # the server hasn't previously seen
+
+ # TODO: Check that this is actually a new server joining the
+ # room.
+
+ user_ids = yield self.state.get_current_user_in_room(room_id)
+ user_ids = list(filter(self.is_mine_id, user_ids))
+
+ states = yield self.current_state_for_users(user_ids)
+
+ # Filter out old presence, i.e. offline presence states where
+ # the user hasn't been active for a week. We can change this
+ # depending on what we want the UX to be, but at the least we
+ # should filter out offline presence where the state is just the
+ # default state.
+ now = self.clock.time_msec()
+ states = [
+ state for state in states.values()
+ if state.state != PresenceState.OFFLINE
+ or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+ or state.status_msg is not None
+ ]
+
+ if states:
+ self.federation.send_presence_to_destinations(
+ states=states,
+ destinations=[get_domain_from_id(user_id)],
+ )
+
def should_notify(old_state, new_state):
"""Decides if a presence state change should be sent to interested parties.
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 58940e0320..a51d11a257 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -153,6 +153,7 @@ class RegistrationHandler(BaseHandler):
user_type=None,
default_display_name=None,
address=None,
+ bind_emails=[],
):
"""Registers a new client on the server.
@@ -172,6 +173,7 @@ class RegistrationHandler(BaseHandler):
default_display_name (unicode|None): if set, the new user's displayname
will be set to this. Defaults to 'localpart'.
address (str|None): the IP address used to perform the registration.
+ bind_emails (List[str]): list of emails to bind to this account.
Returns:
A tuple of (user_id, access_token).
Raises:
@@ -261,6 +263,21 @@ class RegistrationHandler(BaseHandler):
if not self.hs.config.user_consent_at_registration:
yield self._auto_join_rooms(user_id)
+ # Bind any specified emails to this account
+ current_time = self.hs.get_clock().time_msec()
+ for email in bind_emails:
+ # generate threepid dict
+ threepid_dict = {
+ "medium": "email",
+ "address": email,
+ "validated_at": current_time,
+ }
+
+ # Bind email to new account
+ yield self._register_email_threepid(
+ user_id, threepid_dict, None, False,
+ )
+
defer.returnValue((user_id, token))
@defer.inlineCallbacks
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 67b15697fd..c3dcfec247 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,14 +25,9 @@ from six import iteritems, string_types
from twisted.internet import defer
-from synapse.api.constants import (
- DEFAULT_ROOM_VERSION,
- KNOWN_ROOM_VERSIONS,
- EventTypes,
- JoinRules,
- RoomCreationPreset,
-)
+from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
+from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -479,7 +474,7 @@ class RoomCreationHandler(BaseHandler):
if ratelimit:
yield self.ratelimit(requester)
- room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+ room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
if not isinstance(room_version, string_types):
raise SynapseError(
400,
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 235ce8334e..b3abd1b3c6 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -74,14 +74,14 @@ class ModuleApi(object):
return self._auth_handler.check_user_exists(user_id)
@defer.inlineCallbacks
- def register(self, localpart, displayname=None):
+ def register(self, localpart, displayname=None, emails=[]):
"""Registers a new user with given localpart and optional
- displayname.
+ displayname, emails.
Args:
localpart (str): The localpart of the new user.
- displayname (str|None): The displayname of the new user. If None,
- the user's displayname will default to `localpart`.
+ displayname (str|None): The displayname of the new user.
+ emails (List[str]): Emails to bind to the new user.
Returns:
Deferred: a 2-tuple of (user_id, access_token)
@@ -90,6 +90,7 @@ class ModuleApi(object):
reg = self.hs.get_registration_handler()
user_id, access_token = yield reg.register(
localpart=localpart, default_display_name=displayname,
+ bind_emails=emails,
)
defer.returnValue((user_id, access_token))
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 4830c68f35..b457c5563f 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,6 +16,10 @@
import logging
from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams.events import (
+ EventsStreamCurrentStateRow,
+ EventsStreamEventRow,
+)
from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
@@ -79,11 +83,7 @@ class SlavedEventStore(EventFederationWorkerStore,
if stream_name == "events":
self._stream_id_gen.advance(token)
for row in rows:
- self.invalidate_caches_for_event(
- token, row.event_id, row.room_id, row.type, row.state_key,
- row.redacts,
- backfilled=False,
- )
+ self._process_event_stream_row(token, row)
elif stream_name == "backfill":
self._backfill_id_gen.advance(-token)
for row in rows:
@@ -96,6 +96,23 @@ class SlavedEventStore(EventFederationWorkerStore,
stream_name, token, rows
)
+ def _process_event_stream_row(self, token, row):
+ data = row.data
+
+ if row.type == EventsStreamEventRow.TypeId:
+ self.invalidate_caches_for_event(
+ token, data.event_id, data.room_id, data.type, data.state_key,
+ data.redacts,
+ backfilled=False,
+ )
+ elif row.type == EventsStreamCurrentStateRow.TypeId:
+ if data.type == EventTypes.Member:
+ self.get_rooms_for_user_with_stream_ordering.invalidate(
+ (data.state_key, ),
+ )
+ else:
+ raise Exception("Unknown events stream row type %s" % (row.type, ))
+
def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
etype, state_key, redacts, backfilled):
self._invalidate_get_event_cache(event_id)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e558f90e1a..206dc3b397 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -103,10 +103,19 @@ class ReplicationClientHandler(object):
hs.get_reactor().connectTCP(host, port, self.factory)
def on_rdata(self, stream_name, token, rows):
- """Called when we get new replication data. By default this just pokes
- the slave store.
+ """Called to handle a batch of replication data with a given stream token.
- Can be overriden in subclasses to handle more.
+ By default this just pokes the slave store. Can be overridden in subclasses to
+ handle more.
+
+ Args:
+ stream_name (str): name of the replication stream for this batch of rows
+ token (int): stream token for this batch of rows
+ rows (list): a list of Stream.ROW_TYPE objects as returned by
+ Stream.parse_row.
+
+ Returns:
+ Deferred|None
"""
logger.debug("Received rdata %s -> %s", stream_name, token)
return self.store.process_replication_rows(stream_name, token, rows)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 02e5bf6cc8..b51590cf8f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire::
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
- > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
- "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
+ > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
+ "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
< PING 1490197675618
> ERROR server stopping
* connection closed by server *
@@ -605,7 +605,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
inbound_rdata_count.labels(stream_name).inc()
try:
- row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
+ row = STREAMS_MAP[stream_name].parse_row(cmd.row)
except Exception:
logger.exception(
"[%s] Failed to parse RDATA: %r %r",
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 7fc346c7b6..f6a38f5140 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol
-from .streams import STREAMS_MAP, FederationStream
+from .streams import STREAMS_MAP
+from .streams.federation import FederationStream
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
new file mode 100644
index 0000000000..634f636dc9
--- /dev/null
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Defines all the valid streams that clients can subscribe to, and the format
+of the rows returned by each stream.
+
+Each stream is defined by the following information:
+
+ stream name: The name of the stream
+ row type: The type that is used to serialise/deserialise the row
+ current_token: The function that returns the current token for the stream
+ update_function: The function that returns a list of updates between two tokens
+"""
+
+from . import _base, events, federation
+
+STREAMS_MAP = {
+ stream.NAME: stream
+ for stream in (
+ events.EventsStream,
+ _base.BackfillStream,
+ _base.PresenceStream,
+ _base.TypingStream,
+ _base.ReceiptsStream,
+ _base.PushRulesStream,
+ _base.PushersStream,
+ _base.CachesStream,
+ _base.PublicRoomsStream,
+ _base.DeviceListsStream,
+ _base.ToDeviceStream,
+ federation.FederationStream,
+ _base.TagAccountDataStream,
+ _base.AccountDataStream,
+ _base.GroupServerStream,
+ )
+}
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams/_base.py
index a6e9d6709e..52564136fc 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,16 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Defines all the valid streams that clients can subscribe to, and the format
-of the rows returned by each stream.
-Each stream is defined by the following information:
-
- stream name: The name of the stream
- row type: The type that is used to serialise/deserialse the row
- current_token: The function that returns the current token for the stream
- update_function: The function that returns a list of updates between two tokens
-"""
import itertools
import logging
from collections import namedtuple
@@ -34,14 +26,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 500000
-
-EventStreamRow = namedtuple("EventStreamRow", (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
-))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
"event_id", # str
"room_id", # str
@@ -96,10 +80,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str
))
-FederationStreamRow = namedtuple("FederationStreamRow", (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
-))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str
"room_id", # str
@@ -111,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
"data_type", # str
"data", # dict
))
-CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
- "room_id", # str
- "type", # str
- "state_key", # str
- "event_id", # str, optional
-))
GroupsStreamRow = namedtuple("GroupsStreamRow", (
"group_id", # str
"user_id", # str
@@ -132,9 +106,24 @@ class Stream(object):
time it was called up until the point `advance_current_token` was called.
"""
NAME = None # The name of the stream
- ROW_TYPE = None # The type of the row
+ ROW_TYPE = None # The type of the row. Used by the default impl of parse_row.
_LIMITED = True # Whether the update function takes a limit
+ @classmethod
+ def parse_row(cls, row):
+ """Parse a row received over replication
+
+ By default, assumes that the row data is an array object and passes its contents
+ to the constructor of the ROW_TYPE for this stream.
+
+ Args:
+ row: row data from the incoming RDATA command, after json decoding
+
+ Returns:
+ ROW_TYPE object for this stream
+ """
+ return cls.ROW_TYPE(*row)
+
def __init__(self, hs):
# The token from which we last asked for updates
self.last_token = self.current_token()
@@ -162,8 +151,10 @@ class Stream(object):
until the `upto_token`
Returns:
- (list(ROW_TYPE), int): list of updates plus the token used as an
- upper bound of the updates (i.e. the "current token")
+ Deferred[Tuple[List[Tuple[int, Any]], int]]:
+ Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
+ list of ``(token, row)`` entries. ``row`` will be json-serialised and
+ sent over the replication stream.
"""
updates, current_token = yield self.get_updates_since(self.last_token)
self.last_token = current_token
@@ -176,8 +167,10 @@ class Stream(object):
stream updates
Returns:
- (list(ROW_TYPE), int): list of updates plus the token used as an
- upper bound of the updates (i.e. the "current token")
+ Deferred[Tuple[List[Tuple[int, Any]], int]]:
+ Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
+ list of ``(token, row)`` entries. ``row`` will be json-serialised and
+ sent over the replication stream.
"""
if from_token in ("NOW", "now"):
defer.returnValue(([], self.upto_token))
@@ -202,7 +195,7 @@ class Stream(object):
from_token, current_token,
)
- updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
+ updates = [(row[0], row[1:]) for row in rows]
# check we didn't get more rows than the limit.
# doing it like this allows the update_function to be a generator.
@@ -232,20 +225,6 @@ class Stream(object):
raise NotImplementedError()
-class EventsStream(Stream):
- """We received a new event, or an event went from being an outlier to not
- """
- NAME = "events"
- ROW_TYPE = EventStreamRow
-
- def __init__(self, hs):
- store = hs.get_datastore()
- self.current_token = store.get_current_events_token
- self.update_function = store.get_all_new_forward_event_rows
-
- super(EventsStream, self).__init__(hs)
-
-
class BackfillStream(Stream):
"""We fetched some old events and either we had never seen that event before
or it went from being an outlier to not.
@@ -400,22 +379,6 @@ class ToDeviceStream(Stream):
super(ToDeviceStream, self).__init__(hs)
-class FederationStream(Stream):
- """Data to be sent over federation. Only available when master has federation
- sending disabled.
- """
- NAME = "federation"
- ROW_TYPE = FederationStreamRow
-
- def __init__(self, hs):
- federation_sender = hs.get_federation_sender()
-
- self.current_token = federation_sender.get_current_token
- self.update_function = federation_sender.get_replication_rows
-
- super(FederationStream, self).__init__(hs)
-
-
class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
@@ -459,21 +422,6 @@ class AccountDataStream(Stream):
defer.returnValue(results)
-class CurrentStateDeltaStream(Stream):
- """Current state for a room was changed
- """
- NAME = "current_state_deltas"
- ROW_TYPE = CurrentStateDeltaStreamRow
-
- def __init__(self, hs):
- store = hs.get_datastore()
-
- self.current_token = store.get_max_current_state_delta_stream_id
- self.update_function = store.get_all_updated_current_state_deltas
-
- super(CurrentStateDeltaStream, self).__init__(hs)
-
-
class GroupServerStream(Stream):
NAME = "groups"
ROW_TYPE = GroupsStreamRow
@@ -485,26 +433,3 @@ class GroupServerStream(Stream):
self.update_function = store.get_all_groups_changes
super(GroupServerStream, self).__init__(hs)
-
-
-STREAMS_MAP = {
- stream.NAME: stream
- for stream in (
- EventsStream,
- BackfillStream,
- PresenceStream,
- TypingStream,
- ReceiptsStream,
- PushRulesStream,
- PushersStream,
- CachesStream,
- PublicRoomsStream,
- DeviceListsStream,
- ToDeviceStream,
- FederationStream,
- TagAccountDataStream,
- AccountDataStream,
- CurrentStateDeltaStream,
- GroupServerStream,
- )
-}
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
new file mode 100644
index 0000000000..e0f6e29248
--- /dev/null
+++ b/synapse/replication/tcp/streams/events.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import heapq
+
+import attr
+
+from twisted.internet import defer
+
+from ._base import Stream
+
+
+"""Handling of the 'events' replication stream
+
+This stream contains rows of various types. Each row therefore contains a 'type'
+identifier before the real data. For example::
+
+ RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
+ RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]]
+
+An "ev" row is sent for each new event. The fields in the data part are:
+
+ * The new event id
+ * The room id for the event
+ * The type of the new event
+ * The state key of the event, for state events
+ * The event id of an event which is redacted by this event.
+
+A "state" row is sent whenever the "current state" in a room changes. The fields in the
+data part are:
+
+ * The room id for the state change
+ * The event type of the state which has changed
+ * The state_key of the state which has changed
+ * The event id of the new state
+
+"""
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamRow(object):
+ """A parsed row from the events replication stream"""
+ type = attr.ib() # str: the TypeId of one of the *EventsStreamRows
+ data = attr.ib() # BaseEventsStreamRow
+
+
+class BaseEventsStreamRow(object):
+ """Base class for rows to be sent in the events stream.
+
+ Specifies how to identify, serialize and deserialize the different types.
+ """
+
+ TypeId = None # Unique string that ids the type. Must be overridden in subclasses.
+
+ @classmethod
+ def from_data(cls, data):
+ """Parse the data from the replication stream into a row.
+
+ By default we just call the constructor with the data list as arguments
+
+ Args:
+ data: The value of the data object from the replication stream
+ """
+ return cls(*data)
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamEventRow(BaseEventsStreamRow):
+ TypeId = "ev"
+
+ event_id = attr.ib() # str
+ room_id = attr.ib() # str
+ type = attr.ib() # str
+ state_key = attr.ib() # str, optional
+ redacts = attr.ib() # str, optional
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamCurrentStateRow(BaseEventsStreamRow):
+ TypeId = "state"
+
+ room_id = attr.ib() # str
+ type = attr.ib() # str
+ state_key = attr.ib() # str
+ event_id = attr.ib() # str, optional
+
+
+TypeToRow = {
+ Row.TypeId: Row
+ for Row in (
+ EventsStreamEventRow,
+ EventsStreamCurrentStateRow,
+ )
+}
+
+
+class EventsStream(Stream):
+ """We received a new event, or an event went from being an outlier to not
+ """
+ NAME = "events"
+
+ def __init__(self, hs):
+ self._store = hs.get_datastore()
+ self.current_token = self._store.get_current_events_token
+
+ super(EventsStream, self).__init__(hs)
+
+ @defer.inlineCallbacks
+ def update_function(self, from_token, current_token, limit=None):
+ event_rows = yield self._store.get_all_new_forward_event_rows(
+ from_token, current_token, limit,
+ )
+ event_updates = (
+ (row[0], EventsStreamEventRow.TypeId, row[1:])
+ for row in event_rows
+ )
+
+ state_rows = yield self._store.get_all_updated_current_state_deltas(
+ from_token, current_token, limit
+ )
+ state_updates = (
+ (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
+ for row in state_rows
+ )
+
+ all_updates = heapq.merge(event_updates, state_updates)
+
+ defer.returnValue(all_updates)
+
+ @classmethod
+ def parse_row(cls, row):
+ (typ, data) = row
+ data = TypeToRow[typ].from_data(data)
+ return EventsStreamRow(typ, data)
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
new file mode 100644
index 0000000000..9aa43aa8d2
--- /dev/null
+++ b/synapse/replication/tcp/streams/federation.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+FederationStreamRow = namedtuple("FederationStreamRow", (
+ "type", # str, the type of data as defined in the BaseFederationRows
+ "data", # dict, serialization of a federation.send_queue.BaseFederationRow
+))
+
+
+class FederationStream(Stream):
+ """Data to be sent over federation. Only available when master has federation
+ sending disabled.
+ """
+ NAME = "federation"
+ ROW_TYPE = FederationStreamRow
+
+ def __init__(self, hs):
+ federation_sender = hs.get_federation_sender()
+
+ self.current_token = federation_sender.get_current_token
+ self.update_function = federation_sender.get_replication_rows
+
+ super(FederationStream, self).__init__(hs)
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index e788769639..1a26f5a1a6 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -647,8 +647,6 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
assert_params_in_dict(params, ["new_password"])
new_password = params['new_password']
- logger.info("new_password: %r", new_password)
-
yield self._set_password_handler.set_password(
target_user_id, new_password, requester
)
diff --git a/synapse/rest/client/v2_alpha/capabilities.py b/synapse/rest/client/v2_alpha/capabilities.py
index 373f95126e..a868d06098 100644
--- a/synapse/rest/client/v2_alpha/capabilities.py
+++ b/synapse/rest/client/v2_alpha/capabilities.py
@@ -16,7 +16,7 @@ import logging
from twisted.internet import defer
-from synapse.api.constants import DEFAULT_ROOM_VERSION, RoomDisposition, RoomVersions
+from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
from synapse.http.servlet import RestServlet
from ._base import client_v2_patterns
@@ -48,12 +48,10 @@ class CapabilitiesRestServlet(RestServlet):
response = {
"capabilities": {
"m.room_versions": {
- "default": DEFAULT_ROOM_VERSION,
+ "default": DEFAULT_ROOM_VERSION.identifier,
"available": {
- RoomVersions.V1: RoomDisposition.STABLE,
- RoomVersions.V2: RoomDisposition.STABLE,
- RoomVersions.STATE_V2_TEST: RoomDisposition.UNSTABLE,
- RoomVersions.V3: RoomDisposition.STABLE,
+ v.identifier: v.disposition
+ for v in KNOWN_ROOM_VERSIONS.values()
},
},
"m.change_password": {"enabled": change_password},
diff --git a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
index e6356101fd..3db7ff8d1b 100644
--- a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
+++ b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
@@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
-from synapse.api.constants import KNOWN_ROOM_VERSIONS
from synapse.api.errors import Codes, SynapseError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 68058f613c..52347fee34 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -24,7 +24,8 @@ from frozendict import frozendict
from twisted.internet import defer
-from synapse.api.constants import EventTypes, RoomVersions
+from synapse.api.constants import EventTypes
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events.snapshot import EventContext
from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer
@@ -603,22 +604,15 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
- if room_version == RoomVersions.V1:
+ v = KNOWN_ROOM_VERSIONS[room_version]
+ if v.state_res == StateResolutionVersions.V1:
return v1.resolve_events_with_store(
state_sets, event_map, state_res_store.get_events,
)
- elif room_version in (
- RoomVersions.STATE_V2_TEST, RoomVersions.V2, RoomVersions.V3,
- ):
+ else:
return v2.resolve_events_with_store(
room_version, state_sets, event_map, state_res_store,
)
- else:
- # This should only happen if we added a version but forgot to add it to
- # the list above.
- raise Exception(
- "No state resolution algorithm defined for version %r" % (room_version,)
- )
@attr.s
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index 6d3afcae7c..29b4e86cfd 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -21,8 +21,9 @@ from six import iteritems, iterkeys, itervalues
from twisted.internet import defer
from synapse import event_auth
-from synapse.api.constants import EventTypes, RoomVersions
+from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
+from synapse.api.room_versions import RoomVersions
logger = logging.getLogger(__name__)
@@ -275,7 +276,9 @@ def _resolve_auth_events(events, auth_events):
try:
# The signatures have already been checked at this point
event_auth.check(
- RoomVersions.V1, event, auth_events,
+ RoomVersions.V1.identifier,
+ event,
+ auth_events,
do_sig_check=False,
do_size_check=False,
)
@@ -291,7 +294,9 @@ def _resolve_normal_events(events, auth_events):
try:
# The signatures have already been checked at this point
event_auth.check(
- RoomVersions.V1, event, auth_events,
+ RoomVersions.V1.identifier,
+ event,
+ auth_events,
do_sig_check=False,
do_size_check=False,
)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7e3903859b..653b32fbf5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1355,11 +1355,6 @@ class SQLBaseStore(object):
members_changed (iterable[str]): The user_ids of members that have
changed
"""
- for member in members_changed:
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (member,),
- )
-
for host in set(get_domain_from_id(u) for u in members_changed):
self._attempt_to_invalidate_cache(
"is_host_joined", (room_id, host,),
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 428300ea0a..dfda39bbe0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -30,7 +30,6 @@ from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
-# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -51,8 +50,11 @@ from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
-event_counter = Counter("synapse_storage_events_persisted_events_sep", "",
- ["type", "origin_type", "origin_entity"])
+event_counter = Counter(
+ "synapse_storage_events_persisted_events_sep",
+ "",
+ ["type", "origin_type", "origin_entity"],
+)
# The number of times we are recalculating the current state
state_delta_counter = Counter("synapse_storage_events_state_delta", "")
@@ -60,13 +62,15 @@ state_delta_counter = Counter("synapse_storage_events_state_delta", "")
# The number of times we are recalculating state when there is only a
# single forward extremity
state_delta_single_event_counter = Counter(
- "synapse_storage_events_state_delta_single_event", "")
+ "synapse_storage_events_state_delta_single_event", ""
+)
# The number of times we are reculating state when we could have resonably
# calculated the delta when we calculated the state for an event we were
# persisting.
state_delta_reuse_delta_counter = Counter(
- "synapse_storage_events_state_delta_reuse_delta", "")
+ "synapse_storage_events_state_delta_reuse_delta", ""
+)
def encode_json(json_object):
@@ -75,7 +79,7 @@ def encode_json(json_object):
"""
out = frozendict_json_encoder.encode(json_object)
if isinstance(out, bytes):
- out = out.decode('utf8')
+ out = out.decode("utf8")
return out
@@ -84,9 +88,9 @@ class _EventPeristenceQueue(object):
concurrent transaction per room.
"""
- _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
- "events_and_contexts", "backfilled", "deferred",
- ))
+ _EventPersistQueueItem = namedtuple(
+ "_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred")
+ )
def __init__(self):
self._event_persist_queues = {}
@@ -119,11 +123,13 @@ class _EventPeristenceQueue(object):
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
- queue.append(self._EventPersistQueueItem(
- events_and_contexts=events_and_contexts,
- backfilled=backfilled,
- deferred=deferred,
- ))
+ queue.append(
+ self._EventPersistQueueItem(
+ events_and_contexts=events_and_contexts,
+ backfilled=backfilled,
+ deferred=deferred,
+ )
+ )
return deferred.observe()
@@ -191,6 +197,7 @@ def _retry_on_integrity_error(func):
Args:
func: function that returns a Deferred and accepts a `delete_existing` arg
"""
+
@wraps(func)
@defer.inlineCallbacks
def f(self, *args, **kwargs):
@@ -206,8 +213,12 @@ def _retry_on_integrity_error(func):
# inherits from EventFederationStore so that we can call _update_backward_extremities
# and _handle_mult_prev_events (though arguably those could both be moved in here)
-class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
- BackgroundUpdateStore):
+class EventsStore(
+ StateGroupWorkerStore,
+ EventFederationStore,
+ EventsWorkerStore,
+ BackgroundUpdateStore,
+):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -265,8 +276,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
deferreds = []
for room_id, evs_ctxs in iteritems(partitioned):
d = self._event_persist_queue.add_to_queue(
- room_id, evs_ctxs,
- backfilled=backfilled,
+ room_id, evs_ctxs, backfilled=backfilled
)
deferreds.append(d)
@@ -296,8 +306,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
and the stream ordering of the latest persisted event
"""
deferred = self._event_persist_queue.add_to_queue(
- event.room_id, [(event, context)],
- backfilled=backfilled,
+ event.room_id, [(event, context)], backfilled=backfilled
)
self._maybe_start_persisting(event.room_id)
@@ -312,16 +321,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
def persisting_queue(item):
with Measure(self._clock, "persist_events"):
yield self._persist_events(
- item.events_and_contexts,
- backfilled=item.backfilled,
+ item.events_and_contexts, backfilled=item.backfilled
)
self._event_persist_queue.handle_queue(room_id, persisting_queue)
@_retry_on_integrity_error
@defer.inlineCallbacks
- def _persist_events(self, events_and_contexts, backfilled=False,
- delete_existing=False):
+ def _persist_events(
+ self, events_and_contexts, backfilled=False, delete_existing=False
+ ):
"""Persist events to db
Args:
@@ -345,13 +354,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
)
with stream_ordering_manager as stream_orderings:
- for (event, context), stream, in zip(
- events_and_contexts, stream_orderings
- ):
+ for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
chunks = [
- events_and_contexts[x:x + 100]
+ events_and_contexts[x : x + 100]
for x in range(0, len(events_and_contexts), 100)
]
@@ -445,12 +452,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
state_delta_reuse_delta_counter.inc()
break
- logger.info(
- "Calculating state delta for room %s", room_id,
- )
+ logger.info("Calculating state delta for room %s", room_id)
with Measure(
- self._clock,
- "persist_events.get_new_state_after_events",
+ self._clock, "persist_events.get_new_state_after_events"
):
res = yield self._get_new_state_after_events(
room_id,
@@ -470,11 +474,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
state_delta_for_room[room_id] = ([], delta_ids)
elif current_state is not None:
with Measure(
- self._clock,
- "persist_events.calculate_state_delta",
+ self._clock, "persist_events.calculate_state_delta"
):
delta = yield self._calculate_state_delta(
- room_id, current_state,
+ room_id, current_state
)
state_delta_for_room[room_id] = delta
@@ -498,7 +501,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that.
synapse.metrics.event_persisted_position.set(
- chunk[-1][0].internal_metadata.stream_ordering,
+ chunk[-1][0].internal_metadata.stream_ordering
)
for event, context in chunk:
@@ -515,9 +518,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
event_counter.labels(event.type, origin_type, origin_entity).inc()
for room_id, new_state in iteritems(current_state_for_room):
- self.get_current_state_ids.prefill(
- (room_id, ), new_state
- )
+ self.get_current_state_ids.prefill((room_id,), new_state)
for room_id, latest_event_ids in iteritems(new_forward_extremeties):
self.get_latest_event_ids_in_room.prefill(
@@ -535,8 +536,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# we're only interested in new events which aren't outliers and which aren't
# being rejected.
new_events = [
- event for event, ctx in event_contexts
- if not event.internal_metadata.is_outlier() and not ctx.rejected
+ event
+ for event, ctx in event_contexts
+ if not event.internal_metadata.is_outlier()
+ and not ctx.rejected
and not event.internal_metadata.is_soft_failed()
]
@@ -544,15 +547,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
result = set(latest_event_ids)
# add all the new events to the list
- result.update(
- event.event_id for event in new_events
- )
+ result.update(event.event_id for event in new_events)
# Now remove all events which are prev_events of any of the new events
result.difference_update(
- e_id
- for event in new_events
- for e_id in event.prev_event_ids()
+ e_id for event in new_events for e_id in event.prev_event_ids()
)
# Finally, remove any events which are prev_events of any existing events.
@@ -592,17 +591,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
results.extend(r[0] for r in txn)
for chunk in batch_iter(event_ids, 100):
- yield self.runInteraction(
- "_get_events_which_are_prevs",
- _get_events,
- chunk,
- )
+ yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
defer.returnValue(results)
@defer.inlineCallbacks
- def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
- new_latest_event_ids):
+ def _get_new_state_after_events(
+ self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
+ ):
"""Calculate the current state dict after adding some new events to
a room
@@ -642,7 +638,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
if not ev.internal_metadata.is_outlier():
raise Exception(
"Context for new event %s has no state "
- "group" % (ev.event_id, ),
+ "group" % (ev.event_id,)
)
continue
@@ -682,9 +678,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
if missing_event_ids:
# Now pull out the state groups for any missing events from DB
- event_to_groups = yield self._get_state_group_for_events(
- missing_event_ids,
- )
+ event_to_groups = yield self._get_state_group_for_events(missing_event_ids)
event_id_to_state_group.update(event_to_groups)
# State groups of old_latest_event_ids
@@ -710,9 +704,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
new_state_group = next(iter(new_state_groups))
old_state_group = next(iter(old_state_groups))
- delta_ids = state_group_deltas.get(
- (old_state_group, new_state_group,), None
- )
+ delta_ids = state_group_deltas.get((old_state_group, new_state_group), None)
if delta_ids is not None:
# We have a delta from the existing to new current state,
# so lets just return that. If we happen to already have
@@ -735,9 +727,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# Ok, we need to defer to the state handler to resolve our state sets.
- state_groups = {
- sg: state_groups_map[sg] for sg in new_state_groups
- }
+ state_groups = {sg: state_groups_map[sg] for sg in new_state_groups}
events_map = {ev.event_id: ev for ev, _ in events_context}
@@ -755,8 +745,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
- room_id, room_version, state_groups, events_map,
- state_res_store=StateResolutionStore(self)
+ room_id,
+ room_version,
+ state_groups,
+ events_map,
+ state_res_store=StateResolutionStore(self),
)
defer.returnValue((res.state, None))
@@ -774,22 +767,26 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"""
existing_state = yield self.get_current_state_ids(room_id)
- to_delete = [
- key for key in existing_state
- if key not in current_state
- ]
+ to_delete = [key for key in existing_state if key not in current_state]
to_insert = {
- key: ev_id for key, ev_id in iteritems(current_state)
+ key: ev_id
+ for key, ev_id in iteritems(current_state)
if ev_id != existing_state.get(key)
}
defer.returnValue((to_delete, to_insert))
@log_function
- def _persist_events_txn(self, txn, events_and_contexts, backfilled,
- delete_existing=False, state_delta_for_room={},
- new_forward_extremeties={}):
+ def _persist_events_txn(
+ self,
+ txn,
+ events_and_contexts,
+ backfilled,
+ delete_existing=False,
+ state_delta_for_room={},
+ new_forward_extremeties={},
+ ):
"""Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table,
@@ -816,9 +813,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"""
all_events_and_contexts = events_and_contexts
+ min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
- self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
+ self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
self._update_forward_extremities_txn(
txn,
@@ -828,20 +826,17 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# Ensure that we don't have the same event twice.
events_and_contexts = self._filter_events_and_contexts_for_duplicates(
- events_and_contexts,
+ events_and_contexts
)
self._update_room_depths_txn(
- txn,
- events_and_contexts=events_and_contexts,
- backfilled=backfilled,
+ txn, events_and_contexts=events_and_contexts, backfilled=backfilled
)
# _update_outliers_txn filters out any events which have already been
# persisted, and returns the filtered list.
events_and_contexts = self._update_outliers_txn(
- txn,
- events_and_contexts=events_and_contexts,
+ txn, events_and_contexts=events_and_contexts
)
# From this point onwards the events are only events that we haven't
@@ -852,15 +847,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# for these events so we can reinsert them.
# This gets around any problems with some tables already having
# entries.
- self._delete_existing_rows_txn(
- txn,
- events_and_contexts=events_and_contexts,
- )
+ self._delete_existing_rows_txn(txn, events_and_contexts=events_and_contexts)
- self._store_event_txn(
- txn,
- events_and_contexts=events_and_contexts,
- )
+ self._store_event_txn(txn, events_and_contexts=events_and_contexts)
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)
@@ -889,8 +878,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
events_and_contexts = self._store_rejected_events_txn(
- txn,
- events_and_contexts=events_and_contexts,
+ txn, events_and_contexts=events_and_contexts
)
# From this point onwards the events are only ones that weren't
@@ -903,7 +891,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
backfilled=backfilled,
)
- def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
+ def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
@@ -912,6 +900,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# that we can use it to calculate the `prev_event_id`. (This
# allows us to not have to pull out the existing state
# unnecessarily).
+ #
+ # The stream_id for the update is chosen to be the minimum of the stream_ids
+ # for the batch of the events that we are persisting; that means we do not
+ # end up in a situation where workers see events before the
+ # current_state_delta updates.
+ #
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
@@ -920,22 +914,40 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
WHERE room_id = ? AND type = ? AND state_key = ?
)
"""
- txn.executemany(sql, (
+ txn.executemany(
+ sql,
(
- max_stream_order, room_id, etype, state_key, None,
- room_id, etype, state_key,
- )
- for etype, state_key in to_delete
- # We sanity check that we're deleting rather than updating
- if (etype, state_key) not in to_insert
- ))
- txn.executemany(sql, (
+ (
+ stream_id,
+ room_id,
+ etype,
+ state_key,
+ None,
+ room_id,
+ etype,
+ state_key,
+ )
+ for etype, state_key in to_delete
+ # We sanity check that we're deleting rather than updating
+ if (etype, state_key) not in to_insert
+ ),
+ )
+ txn.executemany(
+ sql,
(
- max_stream_order, room_id, etype, state_key, ev_id,
- room_id, etype, state_key,
- )
- for (etype, state_key), ev_id in iteritems(to_insert)
- ))
+ (
+ stream_id,
+ room_id,
+ etype,
+ state_key,
+ ev_id,
+ room_id,
+ etype,
+ state_key,
+ )
+ for (etype, state_key), ev_id in iteritems(to_insert)
+ ),
+ )
# Now we actually update the current_state_events table
@@ -964,7 +976,8 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
- room_id, max_stream_order,
+ room_id,
+ stream_id,
)
# Invalidate the various caches
@@ -980,28 +993,27 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
if ev_type == EventTypes.Member
)
+ for member in members_changed:
+ txn.call_after(
+ self.get_rooms_for_user_with_stream_ordering.invalidate, (member,)
+ )
+
self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
- def _update_forward_extremities_txn(self, txn, new_forward_extremities,
- max_stream_order):
+ def _update_forward_extremities_txn(
+ self, txn, new_forward_extremities, max_stream_order
+ ):
for room_id, new_extrem in iteritems(new_forward_extremities):
self._simple_delete_txn(
- txn,
- table="event_forward_extremities",
- keyvalues={"room_id": room_id},
- )
- txn.call_after(
- self.get_latest_event_ids_in_room.invalidate, (room_id,)
+ txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
+ txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
self._simple_insert_many_txn(
txn,
table="event_forward_extremities",
values=[
- {
- "event_id": ev_id,
- "room_id": room_id,
- }
+ {"event_id": ev_id, "room_id": room_id}
for room_id, new_extrem in iteritems(new_forward_extremities)
for ev_id in new_extrem
],
@@ -1021,7 +1033,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
}
for room_id, new_extrem in iteritems(new_forward_extremities)
for event_id in new_extrem
- ]
+ ],
)
@classmethod
@@ -1065,7 +1077,8 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
if not backfilled:
txn.call_after(
self._events_stream_cache.entity_has_changed,
- event.room_id, event.internal_metadata.stream_ordering,
+ event.room_id,
+ event.internal_metadata.stream_ordering,
)
if not event.internal_metadata.is_outlier() and not context.rejected:
@@ -1092,16 +1105,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
are already in the events table.
"""
txn.execute(
- "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
- ",".join(["?"] * len(events_and_contexts)),
- ),
- [event.event_id for event, _ in events_and_contexts]
+ "SELECT event_id, outlier FROM events WHERE event_id in (%s)"
+ % (",".join(["?"] * len(events_and_contexts)),),
+ [event.event_id for event, _ in events_and_contexts],
)
- have_persisted = {
- event_id: outlier
- for event_id, outlier in txn
- }
+ have_persisted = {event_id: outlier for event_id, outlier in txn}
to_remove = set()
for event, context in events_and_contexts:
@@ -1128,18 +1137,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
logger.exception("")
raise
- metadata_json = encode_json(
- event.internal_metadata.get_dict()
- )
+ metadata_json = encode_json(event.internal_metadata.get_dict())
sql = (
- "UPDATE event_json SET internal_metadata = ?"
- " WHERE event_id = ?"
- )
- txn.execute(
- sql,
- (metadata_json, event.event_id,)
+ "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
)
+ txn.execute(sql, (metadata_json, event.event_id))
# Add an entry to the ex_outlier_stream table to replicate the
# change in outlier status to our workers.
@@ -1152,25 +1155,17 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"event_stream_ordering": stream_order,
"event_id": event.event_id,
"state_group": state_group_id,
- }
+ },
)
- sql = (
- "UPDATE events SET outlier = ?"
- " WHERE event_id = ?"
- )
- txn.execute(
- sql,
- (False, event.event_id,)
- )
+ sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
+ txn.execute(sql, (False, event.event_id))
# Update the event_backward_extremities table now that this
# event isn't an outlier any more.
self._update_backward_extremeties(txn, [event])
- return [
- ec for ec in events_and_contexts if ec[0] not in to_remove
- ]
+ return [ec for ec in events_and_contexts if ec[0] not in to_remove]
@classmethod
def _delete_existing_rows_txn(cls, txn, events_and_contexts):
@@ -1181,39 +1176,37 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
logger.info("Deleting existing")
for table in (
- "events",
- "event_auth",
- "event_json",
- "event_content_hashes",
- "event_destinations",
- "event_edge_hashes",
- "event_edges",
- "event_forward_extremities",
- "event_reference_hashes",
- "event_search",
- "event_signatures",
- "event_to_state_groups",
- "guest_access",
- "history_visibility",
- "local_invites",
- "room_names",
- "state_events",
- "rejections",
- "redactions",
- "room_memberships",
- "topics"
+ "events",
+ "event_auth",
+ "event_json",
+ "event_content_hashes",
+ "event_destinations",
+ "event_edge_hashes",
+ "event_edges",
+ "event_forward_extremities",
+ "event_reference_hashes",
+ "event_search",
+ "event_signatures",
+ "event_to_state_groups",
+ "guest_access",
+ "history_visibility",
+ "local_invites",
+ "room_names",
+ "state_events",
+ "rejections",
+ "redactions",
+ "room_memberships",
+ "topics",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
- [(ev.event_id,) for ev, _ in events_and_contexts]
+ [(ev.event_id,) for ev, _ in events_and_contexts],
)
- for table in (
- "event_push_actions",
- ):
+ for table in ("event_push_actions",):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
- [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
+ [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts],
)
def _store_event_txn(self, txn, events_and_contexts):
@@ -1296,17 +1289,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
for event, context in events_and_contexts:
if context.rejected:
# Insert the event_id into the rejections table
- self._store_rejections_txn(
- txn, event.event_id, context.rejected
- )
+ self._store_rejections_txn(txn, event.event_id, context.rejected)
to_remove.add(event)
- return [
- ec for ec in events_and_contexts if ec[0] not in to_remove
- ]
+ return [ec for ec in events_and_contexts if ec[0] not in to_remove]
- def _update_metadata_tables_txn(self, txn, events_and_contexts,
- all_events_and_contexts, backfilled):
+ def _update_metadata_tables_txn(
+ self, txn, events_and_contexts, all_events_and_contexts, backfilled
+ ):
"""Update all the miscellaneous tables for new events
Args:
@@ -1342,8 +1332,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
- txn,
- events=[event for event, _ in events_and_contexts],
+ txn, events=[event for event, _ in events_and_contexts]
)
for event, _ in events_and_contexts:
@@ -1401,11 +1390,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
state_values.append(vals)
- self._simple_insert_many_txn(
- txn,
- table="state_events",
- values=state_values,
- )
+ self._simple_insert_many_txn(txn, table="state_events", values=state_values)
# Prefill the event cache
self._add_to_cache(txn, events_and_contexts)
@@ -1416,10 +1401,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
rows = []
N = 200
for i in range(0, len(events_and_contexts), N):
- ev_map = {
- e[0].event_id: e[0]
- for e in events_and_contexts[i:i + N]
- }
+ ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]}
if not ev_map:
break
@@ -1439,14 +1421,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
for row in rows:
event = ev_map[row["event_id"]]
if not row["rejects"] and not row["redacts"]:
- to_prefill.append(_EventCacheEntry(
- event=event,
- redacted_event=None,
- ))
+ to_prefill.append(
+ _EventCacheEntry(event=event, redacted_event=None)
+ )
def prefill():
for cache_entry in to_prefill:
self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+
txn.call_after(prefill)
def _store_redaction(self, txn, event):
@@ -1454,7 +1436,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
txn.call_after(self._invalidate_get_event_cache, event.redacts)
txn.execute(
"INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
- (event.event_id, event.redacts)
+ (event.event_id, event.redacts),
)
@defer.inlineCallbacks
@@ -1465,6 +1447,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
If it has been significantly less or more than one day since the last
call to this function, it will return None.
"""
+
def _count_messages(txn):
sql = """
SELECT COALESCE(COUNT(*), 0) FROM events
@@ -1492,7 +1475,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
AND stream_ordering > ?
"""
- txn.execute(sql, (like_clause, self.stream_ordering_day_ago,))
+ txn.execute(sql, (like_clause, self.stream_ordering_day_ago))
count, = txn.fetchone()
return count
@@ -1557,18 +1540,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
update_rows.append((sender, contains_url, event_id))
- sql = (
- "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
- )
+ sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
- clump = update_rows[index:index + INSERT_CLUMP_SIZE]
+ clump = update_rows[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
- "rows_inserted": rows_inserted + len(rows)
+ "rows_inserted": rows_inserted + len(rows),
}
self._background_update_progress_txn(
@@ -1613,10 +1594,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
rows_to_update = []
- chunks = [
- event_ids[i:i + 100]
- for i in range(0, len(event_ids), 100)
- ]
+ chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
txn,
@@ -1639,18 +1617,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
rows_to_update.append((origin_server_ts, event_id))
- sql = (
- "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
- )
+ sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
- clump = rows_to_update[index:index + INSERT_CLUMP_SIZE]
+ clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
- "rows_inserted": rows_inserted + len(rows_to_update)
+ "rows_inserted": rows_inserted + len(rows_to_update),
}
self._background_update_progress_txn(
@@ -1714,6 +1690,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
new_event_updates.extend(txn)
return new_event_updates
+
return self.runInteraction(
"get_all_new_forward_event_rows", get_all_new_forward_event_rows
)
@@ -1756,13 +1733,20 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
new_event_updates.extend(txn.fetchall())
return new_event_updates
+
return self.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)
@cached(num_args=5, max_entries=10)
- def get_all_new_events(self, last_backfill_id, last_forward_id,
- current_backfill_id, current_forward_id, limit):
+ def get_all_new_events(
+ self,
+ last_backfill_id,
+ last_forward_id,
+ current_backfill_id,
+ current_forward_id,
+ limit,
+ ):
"""Get all the new events that have arrived at the server either as
new events or as backfilled events"""
have_backfill_events = last_backfill_id != current_backfill_id
@@ -1837,14 +1821,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
backward_ex_outliers = []
return AllNewEventsResult(
- new_forward_events, new_backfill_events,
- forward_ex_outliers, backward_ex_outliers,
+ new_forward_events,
+ new_backfill_events,
+ forward_ex_outliers,
+ backward_ex_outliers,
)
+
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
- def purge_history(
- self, room_id, token, delete_local_events,
- ):
+ def purge_history(self, room_id, token, delete_local_events):
"""Deletes room history before a certain point
Args:
@@ -1860,13 +1845,13 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
return self.runInteraction(
"purge_history",
- self._purge_history_txn, room_id, token,
+ self._purge_history_txn,
+ room_id,
+ token,
delete_local_events,
)
- def _purge_history_txn(
- self, txn, room_id, token_str, delete_local_events,
- ):
+ def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
token = RoomStreamToken.parse(token_str)
# Tables that should be pruned:
@@ -1913,7 +1898,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"ON e.event_id = f.event_id "
"AND e.room_id = f.room_id "
"WHERE f.room_id = ?",
- (room_id,)
+ (room_id,),
)
rows = txn.fetchall()
max_depth = max(row[1] for row in rows)
@@ -1934,10 +1919,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
should_delete_expr += " AND event_id NOT LIKE ?"
# We include the parameter twice since we use the expression twice
- should_delete_params += (
- "%:" + self.hs.hostname,
- "%:" + self.hs.hostname,
- )
+ should_delete_params += ("%:" + self.hs.hostname, "%:" + self.hs.hostname)
should_delete_params += (room_id, token.topological)
@@ -1948,10 +1930,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
" SELECT event_id, %s"
" FROM events AS e LEFT JOIN state_events USING (event_id)"
" WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
- % (
- should_delete_expr,
- should_delete_expr,
- ),
+ % (should_delete_expr, should_delete_expr),
should_delete_params,
)
@@ -1961,23 +1940,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# the should_delete / shouldn't_delete subsets
txn.execute(
"CREATE INDEX events_to_purge_should_delete"
- " ON events_to_purge(should_delete)",
+ " ON events_to_purge(should_delete)"
)
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
- txn.execute(
- "CREATE INDEX events_to_purge_id"
- " ON events_to_purge(event_id)",
- )
+ txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
- txn.execute(
- "SELECT event_id, should_delete FROM events_to_purge"
- )
+ txn.execute("SELECT event_id, should_delete FROM events_to_purge")
event_rows = txn.fetchall()
logger.info(
"[purge] found %i events before cutoff, of which %i can be deleted",
- len(event_rows), sum(1 for e in event_rows if e[1]),
+ len(event_rows),
+ sum(1 for e in event_rows if e[1]),
)
logger.info("[purge] Finding new backward extremities")
@@ -1989,24 +1964,21 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"SELECT DISTINCT e.event_id FROM events_to_purge AS e"
" INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
" LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
- " WHERE ep2.event_id IS NULL",
+ " WHERE ep2.event_id IS NULL"
)
new_backwards_extrems = txn.fetchall()
logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
txn.execute(
- "DELETE FROM event_backward_extremities WHERE room_id = ?",
- (room_id,)
+ "DELETE FROM event_backward_extremities WHERE room_id = ?", (room_id,)
)
# Update backward extremeties
txn.executemany(
"INSERT INTO event_backward_extremities (room_id, event_id)"
" VALUES (?, ?)",
- [
- (room_id, event_id) for event_id, in new_backwards_extrems
- ]
+ [(room_id, event_id) for event_id, in new_backwards_extrems],
)
logger.info("[purge] finding redundant state groups")
@@ -2014,28 +1986,25 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# Get all state groups that are referenced by events that are to be
# deleted. We then go and check if they are referenced by other events
# or state groups, and if not we delete them.
- txn.execute("""
+ txn.execute(
+ """
SELECT DISTINCT state_group FROM events_to_purge
INNER JOIN event_to_state_groups USING (event_id)
- """)
+ """
+ )
referenced_state_groups = set(sg for sg, in txn)
logger.info(
- "[purge] found %i referenced state groups",
- len(referenced_state_groups),
+ "[purge] found %i referenced state groups", len(referenced_state_groups)
)
logger.info("[purge] finding state groups that can be deleted")
- state_groups_to_delete, remaining_state_groups = (
- self._find_unreferenced_groups_during_purge(
- txn, referenced_state_groups,
- )
- )
+ _ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
+ state_groups_to_delete, remaining_state_groups = _
logger.info(
- "[purge] found %i state groups to delete",
- len(state_groups_to_delete),
+ "[purge] found %i state groups to delete", len(state_groups_to_delete)
)
logger.info(
@@ -2047,25 +2016,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
- curr_state = self._get_state_groups_from_groups_txn(
- txn, [sg],
- )
+ curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
- txn,
- table="state_groups_state",
- keyvalues={
- "state_group": sg,
- }
+ txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
- txn,
- table="state_group_edges",
- keyvalues={
- "state_group": sg,
- }
+ txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
@@ -2099,9 +2058,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"WHERE event_id IN (SELECT event_id from events_to_purge)"
)
for event_id, _ in event_rows:
- txn.call_after(self._get_state_group_for_event.invalidate, (
- event_id,
- ))
+ txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
# Delete all remote non-state events
for table in (
@@ -2123,21 +2080,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
txn.execute(
"DELETE FROM %s WHERE event_id IN ("
" SELECT event_id FROM events_to_purge WHERE should_delete"
- ")" % (table,),
+ ")" % (table,)
)
# event_push_actions lacks an index on event_id, and has one on
# (room_id, event_id) instead.
- for table in (
- "event_push_actions",
- ):
+ for table in ("event_push_actions",):
logger.info("[purge] removing events from %s", table)
txn.execute(
"DELETE FROM %s WHERE room_id = ? AND event_id IN ("
" SELECT event_id FROM events_to_purge WHERE should_delete"
")" % (table,),
- (room_id, )
+ (room_id,),
)
# Mark all state and own events as outliers
@@ -2162,27 +2117,28 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# extremities. However, the events in event_backward_extremities
# are ones we don't have yet so we need to look at the events that
# point to it via event_edges table.
- txn.execute("""
+ txn.execute(
+ """
SELECT COALESCE(MIN(depth), 0)
FROM event_backward_extremities AS eb
INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
INNER JOIN events AS e ON e.event_id = eg.event_id
WHERE eb.room_id = ?
- """, (room_id,))
+ """,
+ (room_id,),
+ )
min_depth, = txn.fetchone()
logger.info("[purge] updating room_depth to %d", min_depth)
txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
- (min_depth, room_id,)
+ (min_depth, room_id),
)
# finally, drop the temp table. this will commit the txn in sqlite,
# so make sure to keep this actually last.
- txn.execute(
- "DROP TABLE events_to_purge"
- )
+ txn.execute("DROP TABLE events_to_purge")
logger.info("[purge] done")
@@ -2226,7 +2182,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE state_group IN (%s) AND ep.event_id IS NULL
- """ % (",".join("?" for _ in current_search),)
+ """ % (
+ ",".join("?" for _ in current_search),
+ )
txn.execute(sql, list(current_search))
referenced = set(sg for sg, in txn)
@@ -2242,7 +2200,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
column="prev_state_group",
iterable=current_search,
keyvalues={},
- retcols=("prev_state_group", "state_group",),
+ retcols=("prev_state_group", "state_group"),
)
prevs = set(row["state_group"] for row in rows)
@@ -2279,16 +2237,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
- allow_none=True
+ allow_none=True,
)
if not res:
raise SynapseError(404, "Could not find event %s" % (event_id,))
- defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"])))
-
- def get_max_current_state_delta_stream_id(self):
- return self._stream_id_gen.get_current_token()
+ defer.returnValue(
+ (int(res["topological_ordering"]), int(res["stream_ordering"]))
+ )
def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
def get_all_updated_current_state_deltas_txn(txn):
@@ -2300,13 +2257,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"""
txn.execute(sql, (from_token, to_token, limit))
return txn.fetchall()
+
return self.runInteraction(
"get_all_updated_current_state_deltas",
get_all_updated_current_state_deltas_txn,
)
-AllNewEventsResult = namedtuple("AllNewEventsResult", [
- "new_forward_events", "new_backfill_events",
- "forward_ex_outliers", "backward_ex_outliers",
-])
+AllNewEventsResult = namedtuple(
+ "AllNewEventsResult",
+ [
+ "new_forward_events",
+ "new_backfill_events",
+ "forward_ex_outliers",
+ "backward_ex_outliers",
+ ],
+)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 1716be529a..53c8dc3903 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -21,8 +21,9 @@ from canonicaljson import json
from twisted.internet import defer
-from synapse.api.constants import EventFormatVersions, EventTypes
+from synapse.api.constants import EventTypes
from synapse.api.errors import NotFoundError
+from synapse.api.room_versions import EventFormatVersions
from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
# these are only included to make the type annotations work
from synapse.events.snapshot import EventContext # noqa: F401
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 2874dabbd1..57df17bcc2 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -35,28 +35,22 @@ logger = logging.getLogger(__name__)
RoomsForUser = namedtuple(
- "RoomsForUser",
- ("room_id", "sender", "membership", "event_id", "stream_ordering")
+ "RoomsForUser", ("room_id", "sender", "membership", "event_id", "stream_ordering")
)
GetRoomsForUserWithStreamOrdering = namedtuple(
- "_GetRoomsForUserWithStreamOrdering",
- ("room_id", "stream_ordering",)
+ "_GetRoomsForUserWithStreamOrdering", ("room_id", "stream_ordering")
)
# We store this using a namedtuple so that we save about 3x space over using a
# dict.
-ProfileInfo = namedtuple(
- "ProfileInfo", ("avatar_url", "display_name")
-)
+ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
# "members" points to a truncated list of (user_id, event_id) tuples for users of
# a given membership type, suitable for use in calculating heroes for a room.
# "count" points to the total numberr of users of a given membership type.
-MemberSummary = namedtuple(
- "MemberSummary", ("members", "count")
-)
+MemberSummary = namedtuple("MemberSummary", ("members", "count"))
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
@@ -67,12 +61,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
"""Returns the set of all hosts currently in the room
"""
user_ids = yield self.get_users_in_room(
- room_id, on_invalidate=cache_context.invalidate,
+ room_id, on_invalidate=cache_context.invalidate
)
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
defer.returnValue(hosts)
- @cachedInlineCallbacks(max_entries=100000, iterable=True)
+ @cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
@@ -84,16 +78,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
)
- txn.execute(sql, (room_id, Membership.JOIN,))
+ txn.execute(sql, (room_id, Membership.JOIN))
return [to_ascii(r[0]) for r in txn]
- start_time = self._clock.time_msec()
- result = yield self.runInteraction("get_users_in_room", f)
- end_time = self._clock.time_msec()
- logger.info(
- "Fetched room membership for %s (%i users) in %i ms",
- room_id, len(result), end_time - start_time,
- )
- defer.returnValue(result)
+
+ return self.runInteraction("get_users_in_room", f)
@cached(max_entries=100000)
def get_room_summary(self, room_id):
@@ -163,9 +151,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
A deferred list of RoomsForUser.
"""
- return self.get_rooms_for_user_where_membership_is(
- user_id, [Membership.INVITE]
- )
+ return self.get_rooms_for_user_where_membership_is(user_id, [Membership.INVITE])
@defer.inlineCallbacks
def get_invite_for_user_in_room(self, user_id, room_id):
@@ -203,11 +189,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return self.runInteraction(
"get_rooms_for_user_where_membership_is",
self._get_rooms_for_user_where_membership_is_txn,
- user_id, membership_list
+ user_id,
+ membership_list,
)
- def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
- membership_list):
+ def _get_rooms_for_user_where_membership_is_txn(
+ self, txn, user_id, membership_list
+ ):
do_invite = Membership.INVITE in membership_list
membership_list = [m for m in membership_list if m != Membership.INVITE]
@@ -234,9 +222,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
) % (where_clause,)
txn.execute(sql, args)
- results = [
- RoomsForUser(**r) for r in self.cursor_to_dict(txn)
- ]
+ results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)]
if do_invite:
sql = (
@@ -248,13 +234,16 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
txn.execute(sql, (user_id,))
- results.extend(RoomsForUser(
- room_id=r["room_id"],
- sender=r["inviter"],
- event_id=r["event_id"],
- stream_ordering=r["stream_ordering"],
- membership=Membership.INVITE,
- ) for r in self.cursor_to_dict(txn))
+ results.extend(
+ RoomsForUser(
+ room_id=r["room_id"],
+ sender=r["inviter"],
+ event_id=r["event_id"],
+ stream_ordering=r["stream_ordering"],
+ membership=Membership.INVITE,
+ )
+ for r in self.cursor_to_dict(txn)
+ )
return results
@@ -271,19 +260,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
of the most recent join for that user and room.
"""
rooms = yield self.get_rooms_for_user_where_membership_is(
- user_id, membership_list=[Membership.JOIN],
+ user_id, membership_list=[Membership.JOIN]
+ )
+ defer.returnValue(
+ frozenset(
+ GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
+ for r in rooms
+ )
)
- defer.returnValue(frozenset(
- GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
- for r in rooms
- ))
@defer.inlineCallbacks
def get_rooms_for_user(self, user_id, on_invalidate=None):
"""Returns a set of room_ids the user is currently joined to
"""
rooms = yield self.get_rooms_for_user_with_stream_ordering(
- user_id, on_invalidate=on_invalidate,
+ user_id, on_invalidate=on_invalidate
)
defer.returnValue(frozenset(r.room_id for r in rooms))
@@ -292,13 +283,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
"""Returns the set of users who share a room with `user_id`
"""
room_ids = yield self.get_rooms_for_user(
- user_id, on_invalidate=cache_context.invalidate,
+ user_id, on_invalidate=cache_context.invalidate
)
user_who_share_room = set()
for room_id in room_ids:
user_ids = yield self.get_users_in_room(
- room_id, on_invalidate=cache_context.invalidate,
+ room_id, on_invalidate=cache_context.invalidate
)
user_who_share_room.update(user_ids)
@@ -316,9 +307,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
current_state_ids = yield context.get_current_state_ids(self)
result = yield self._get_joined_users_from_context(
- event.room_id, state_group, current_state_ids,
- event=event,
- context=context,
+ event.room_id, state_group, current_state_ids, event=event, context=context
)
defer.returnValue(result)
@@ -332,13 +321,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
state_group = object()
return self._get_joined_users_from_context(
- room_id, state_group, state_entry.state, context=state_entry,
+ room_id, state_group, state_entry.state, context=state_entry
)
- @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True,
- max_entries=100000)
- def _get_joined_users_from_context(self, room_id, state_group, current_state_ids,
- cache_context, event=None, context=None):
+ @cachedInlineCallbacks(
+ num_args=2, cache_context=True, iterable=True, max_entries=100000
+ )
+ def _get_joined_users_from_context(
+ self,
+ room_id,
+ state_group,
+ current_state_ids,
+ cache_context,
+ event=None,
+ context=None,
+ ):
# We don't use `state_group`, it's there so that we can cache based
# on it. However, it's important that it's never None, since two current_states
# with a state_group of None are likely to be different.
@@ -378,9 +375,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_cache(
- member_event_ids,
- allow_rejected=False,
- update_metrics=False,
+ member_event_ids, allow_rejected=False, update_metrics=False
)
missing_member_event_ids = []
@@ -404,21 +399,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
table="room_memberships",
column="event_id",
iterable=missing_member_event_ids,
- retcols=('user_id', 'display_name', 'avatar_url',),
- keyvalues={
- "membership": Membership.JOIN,
- },
+ retcols=('user_id', 'display_name', 'avatar_url'),
+ keyvalues={"membership": Membership.JOIN},
batch_size=500,
desc="_get_joined_users_from_context",
)
- users_in_room.update({
- to_ascii(row["user_id"]): ProfileInfo(
- avatar_url=to_ascii(row["avatar_url"]),
- display_name=to_ascii(row["display_name"]),
- )
- for row in rows
- })
+ users_in_room.update(
+ {
+ to_ascii(row["user_id"]): ProfileInfo(
+ avatar_url=to_ascii(row["avatar_url"]),
+ display_name=to_ascii(row["display_name"]),
+ )
+ for row in rows
+ }
+ )
if event is not None and event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -512,7 +507,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
state_group = object()
return self._get_joined_hosts(
- room_id, state_group, state_entry.state, state_entry=state_entry,
+ room_id, state_group, state_entry.state, state_entry=state_entry
)
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
@@ -538,6 +533,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
"""Returns whether user_id has elected to discard history for room_id.
Returns False if they have since re-joined."""
+
def f(txn):
sql = (
"SELECT"
@@ -554,6 +550,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (user_id, room_id))
rows = txn.fetchall()
return rows[0][0]
+
count = yield self.runInteraction("did_forget_membership", f)
defer.returnValue(count == 0)
@@ -582,13 +579,14 @@ class RoomMemberStore(RoomMemberWorkerStore):
"avatar_url": event.content.get("avatar_url", None),
}
for event in events
- ]
+ ],
)
for event in events:
txn.call_after(
self._membership_stream_cache.entity_has_changed,
- event.state_key, event.internal_metadata.stream_ordering
+ event.state_key,
+ event.internal_metadata.stream_ordering,
)
txn.call_after(
self.get_invited_rooms_for_user.invalidate, (event.state_key,)
@@ -614,7 +612,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
"inviter": event.sender,
"room_id": event.room_id,
"stream_id": event.internal_metadata.stream_ordering,
- }
+ },
)
else:
sql = (
@@ -623,12 +621,15 @@ class RoomMemberStore(RoomMemberWorkerStore):
" AND replaced_by is NULL"
)
- txn.execute(sql, (
- event.internal_metadata.stream_ordering,
- event.event_id,
- event.room_id,
- event.state_key,
- ))
+ txn.execute(
+ sql,
+ (
+ event.internal_metadata.stream_ordering,
+ event.event_id,
+ event.room_id,
+ event.state_key,
+ ),
+ )
@defer.inlineCallbacks
def locally_reject_invite(self, user_id, room_id):
@@ -639,18 +640,14 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
def f(txn, stream_ordering):
- txn.execute(sql, (
- stream_ordering,
- True,
- room_id,
- user_id,
- ))
+ txn.execute(sql, (stream_ordering, True, room_id, user_id))
with self._stream_id_gen.get_next() as stream_ordering:
yield self.runInteraction("locally_reject_invite", f, stream_ordering)
def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""
+
def f(txn):
sql = (
"UPDATE"
@@ -664,9 +661,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
txn.execute(sql, (user_id, room_id))
- self._invalidate_cache_and_stream(
- txn, self.did_forget, (user_id, room_id,),
- )
+ self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+
return self.runInteraction("forget_membership", f)
@defer.inlineCallbacks
@@ -681,7 +677,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
INSERT_CLUMP_SIZE = 1000
def add_membership_profile_txn(txn):
- sql = ("""
+ sql = """
SELECT stream_ordering, event_id, events.room_id, event_json.json
FROM events
INNER JOIN event_json USING (event_id)
@@ -690,7 +686,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
AND type = 'm.room.member'
ORDER BY stream_ordering DESC
LIMIT ?
- """)
+ """
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
@@ -714,16 +710,14 @@ class RoomMemberStore(RoomMemberWorkerStore):
avatar_url = content.get("avatar_url", None)
if display_name or avatar_url:
- to_update.append((
- display_name, avatar_url, event_id, room_id
- ))
+ to_update.append((display_name, avatar_url, event_id, room_id))
- to_update_sql = ("""
+ to_update_sql = """
UPDATE room_memberships SET display_name = ?, avatar_url = ?
WHERE event_id = ? AND room_id = ?
- """)
+ """
for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
- clump = to_update[index:index + INSERT_CLUMP_SIZE]
+ clump = to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(to_update_sql, clump)
progress = {
@@ -796,7 +790,7 @@ class _JoinedHostsCache(object):
self.hosts_to_joined_users.pop(host, None)
else:
joined_users = yield self.store.get_joined_users_from_state(
- self.room_id, state_entry,
+ self.room_id, state_entry
)
self.hosts_to_joined_users = {}
|