diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index ff1f39e86c..f34434bd67 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -29,7 +29,6 @@ FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
STATIC_PREFIX = "/_matrix/static"
WEB_CLIENT_PREFIX = "/_matrix/client"
-CONTENT_REPO_PREFIX = "/_matrix/content"
SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
MEDIA_PREFIX = "/_matrix/media/r0"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 0e9bf7f53a..e5b44a5eed 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -39,7 +39,6 @@ import synapse
import synapse.config.logger
from synapse import events
from synapse.api.urls import (
- CONTENT_REPO_PREFIX,
FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX,
MEDIA_PREFIX,
@@ -65,7 +64,6 @@ from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
from synapse.storage import DataStore
@@ -223,13 +221,7 @@ class SynapseHomeServer(HomeServer):
if self.get_config().enable_media_repo:
media_repo = self.get_media_repository_resource()
resources.update(
- {
- MEDIA_PREFIX: media_repo,
- LEGACY_MEDIA_PREFIX: media_repo,
- CONTENT_REPO_PREFIX: ContentRepoResource(
- self, self.config.uploads_path
- ),
- }
+ {MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo}
)
elif name == "media":
raise ConfigError(
@@ -318,7 +310,7 @@ def setup(config_options):
"Synapse Homeserver", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + str(e) + "\n")
+ sys.stderr.write("\nERROR: %s\n" % (e,))
sys.exit(1)
if not config:
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 4c80f257e2..a63c53dc44 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
-from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+from synapse.api.urls import LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@@ -37,7 +37,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo
-from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
from synapse.util.httpresourcetree import create_resource_tree
@@ -82,9 +81,6 @@ class MediaRepositoryServer(HomeServer):
{
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
- CONTENT_REPO_PREFIX: ContentRepoResource(
- self, self.config.uploads_path
- ),
"/_synapse/admin": admin_resource,
}
)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index dd2132e608..03031ee34d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,7 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams.events import EventsStreamEventRow
+from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def get_currently_syncing_users(self):
return self.presence_handler.get_currently_syncing_users()
- @defer.inlineCallbacks
- def process_and_notify(self, stream_name, token, rows):
+ async def process_and_notify(self, stream_name, token, rows):
try:
if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so
@@ -380,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler):
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
continue
- event = yield self.store.get_event(row.data.event_id)
+ assert isinstance(row, EventsStreamRow)
+
+ event = await self.store.get_event(
+ row.data.event_id, allow_rejected=True
+ )
+ if event.rejected_reason:
+ continue
+
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
@@ -412,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
- room_ids = yield self.store.get_rooms_for_user(row.user_id)
+ room_ids = await self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence":
- yield self.presence_handler.process_replication_rows(token, rows)
+ await self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 134824789c..219b32f670 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -15,7 +15,6 @@
import logging
import os
from textwrap import indent
-from typing import List
import yaml
@@ -30,16 +29,13 @@ class DatabaseConnectionConfig:
Args:
name: A label for the database, used for logging.
db_config: The config for a particular database, as per `database`
- section of main config. Has two fields: `name` for database
- module name, and `args` for the args to give to the database
- connector.
- data_stores: The list of data stores that should be provisioned on the
- database. Defaults to all data stores.
+ section of main config. Has three fields: `name` for database
+ module name, `args` for the args to give to the database
+ connector, and optional `data_stores` that is a list of stores to
+ provision on this database (defaulting to all).
"""
- def __init__(
- self, name: str, db_config: dict, data_stores: List[str] = ["main", "state"]
- ):
+ def __init__(self, name: str, db_config: dict):
if db_config["name"] not in ("sqlite3", "psycopg2"):
raise ConfigError("Unsupported database type %r" % (db_config["name"],))
@@ -48,6 +44,10 @@ class DatabaseConnectionConfig:
{"cp_min": 1, "cp_max": 1, "check_same_thread": False}
)
+ data_stores = db_config.get("data_stores")
+ if data_stores is None:
+ data_stores = ["main", "state"]
+
self.name = name
self.config = db_config
self.data_stores = data_stores
@@ -59,14 +59,43 @@ class DatabaseConfig(Config):
def read_config(self, config, **kwargs):
self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K"))
+ # We *experimentally* support specifying multiple databases via the
+ # `databases` key. This is a map from a label to database config in the
+ # same format as the `database` config option, plus an extra
+ # `data_stores` key to specify which data store goes where. For example:
+ #
+ # databases:
+ # master:
+ # name: psycopg2
+ # data_stores: ["main"]
+ # args: {}
+ # state:
+ # name: psycopg2
+ # data_stores: ["state"]
+ # args: {}
+
+ multi_database_config = config.get("databases")
database_config = config.get("database")
- if database_config is None:
- database_config = {"name": "sqlite3", "args": {}}
+ if multi_database_config and database_config:
+ raise ConfigError("Can't specify both 'database' and 'datbases' in config")
+
+ if multi_database_config:
+ if config.get("database_path"):
+ raise ConfigError("Can't specify 'database_path' with 'databases'")
+
+ self.databases = [
+ DatabaseConnectionConfig(name, db_conf)
+ for name, db_conf in multi_database_config.items()
+ ]
+
+ else:
+ if database_config is None:
+ database_config = {"name": "sqlite3", "args": {}}
- self.databases = [DatabaseConnectionConfig("master", database_config)]
+ self.databases = [DatabaseConnectionConfig("master", database_config)]
- self.set_databasepath(config.get("database_path"))
+ self.set_databasepath(config.get("database_path"))
def generate_config_section(self, data_dir_path, database_conf, **kwargs):
if not database_conf:
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 75bb904718..a25c70e928 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import argparse
import logging
import logging.config
import os
@@ -37,10 +37,17 @@ from synapse.logging._structured import (
from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string
-from ._base import Config
+from ._base import Config, ConfigError
DEFAULT_LOG_CONFIG = Template(
- """
+ """\
+# Log configuration for Synapse.
+#
+# This is a YAML file containing a standard Python logging configuration
+# dictionary. See [1] for details on the valid settings.
+#
+# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
+
version: 1
formatters:
@@ -81,11 +88,18 @@ disable_existing_loggers: false
"""
)
+LOG_FILE_ERROR = """\
+Support for the log_file configuration option and --log-file command-line option was
+removed in Synapse 1.3.0. You should instead set up a separate log configuration file.
+"""
+
class LoggingConfig(Config):
section = "logging"
def read_config(self, config, **kwargs):
+ if config.get("log_file"):
+ raise ConfigError(LOG_FILE_ERROR)
self.log_config = self.abspath(config.get("log_config"))
self.no_redirect_stdio = config.get("no_redirect_stdio", False)
@@ -106,6 +120,8 @@ class LoggingConfig(Config):
def read_arguments(self, args):
if args.no_redirect_stdio is not None:
self.no_redirect_stdio = args.no_redirect_stdio
+ if args.log_file is not None:
+ raise ConfigError(LOG_FILE_ERROR)
@staticmethod
def add_arguments(parser):
@@ -118,6 +134,10 @@ class LoggingConfig(Config):
help="Do not redirect stdout/stderr to the log",
)
+ logging_group.add_argument(
+ "-f", "--log-file", dest="log_file", help=argparse.SUPPRESS,
+ )
+
def generate_files(self, config, config_dir_path):
log_config = config.get("log_config")
if log_config and not os.path.exists(log_config):
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index d0205e14b9..7d2dd27fd0 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -156,7 +156,6 @@ class ContentRepositoryConfig(Config):
(provider_class, parsed_config, wrapper_config)
)
- self.uploads_path = self.ensure_directory(config.get("uploads_path", "uploads"))
self.dynamic_thumbnails = config.get("dynamic_thumbnails", False)
self.thumbnail_requirements = parse_thumbnail_requirements(
config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES)
@@ -231,10 +230,6 @@ class ContentRepositoryConfig(Config):
# config:
# directory: /mnt/some/other/directory
- # Directory where in-progress uploads are stored.
- #
- uploads_path: "%(uploads_path)s"
-
# The largest allowed upload size in bytes
#
#max_upload_size: 10M
diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index ccaa8a9920..e65bd61d97 100644
--- a/synapse/crypto/event_signing.py
+++ b/synapse/crypto/event_signing.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import collections.abc
import hashlib
import logging
@@ -40,8 +40,11 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")
- if not isinstance(hashes, dict):
- raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)
+ # nb it might be a frozendict or a dict
+ if not isinstance(hashes, collections.abc.Mapping):
+ raise SynapseError(
+ 400, "Malformed 'hashes': %s" % (type(hashes),), Codes.UNAUTHORIZED
+ )
if name not in hashes:
raise SynapseError(
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 7cfad192e8..6fe5a6a26a 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -511,17 +511,18 @@ class BaseV2KeyFetcher(object):
server_name = response_json["server_name"]
verified = False
for key_id in response_json["signatures"].get(server_name, {}):
- # each of the keys used for the signature must be present in the response
- # json.
key = verify_keys.get(key_id)
if not key:
- raise KeyLookupError(
- "Key response is signed by key id %s:%s but that key is not "
- "present in the response" % (server_name, key_id)
- )
+ # the key may not be present in verify_keys if:
+ # * we got the key from the notary server, and:
+ # * the key belongs to the notary server, and:
+ # * the notary server is using a different key to sign notary
+ # responses.
+ continue
verify_signed_json(response_json, server_name, key.verify_key)
verified = True
+ break
if not verified:
raise KeyLookupError(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4ad752205f..8ea3aca2f4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -48,7 +48,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
-from synapse.types import RoomAlias, UserID, create_requester
+from synapse.types import Collection, RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.metrics import measure_func
@@ -422,7 +422,7 @@ class EventCreationHandler(object):
event_dict,
token_id=None,
txn_id=None,
- prev_events_and_hashes=None,
+ prev_event_ids: Optional[Collection[str]] = None,
require_consent=True,
):
"""
@@ -439,10 +439,9 @@ class EventCreationHandler(object):
token_id (str)
txn_id (str)
- prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+ prev_event_ids:
the forward extremities to use as the prev_events for the
- new event. For each event, a tuple of (event_id, hashes, depth)
- where *hashes* is a map from algorithm to hash.
+ new event.
If None, they will be requested from the database.
@@ -498,9 +497,7 @@ class EventCreationHandler(object):
builder.internal_metadata.txn_id = txn_id
event, context = yield self.create_new_client_event(
- builder=builder,
- requester=requester,
- prev_events_and_hashes=prev_events_and_hashes,
+ builder=builder, requester=requester, prev_event_ids=prev_event_ids,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -714,7 +711,7 @@ class EventCreationHandler(object):
@measure_func("create_new_client_event")
@defer.inlineCallbacks
def create_new_client_event(
- self, builder, requester=None, prev_events_and_hashes=None
+ self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None
):
"""Create a new event for a local client
@@ -723,10 +720,9 @@ class EventCreationHandler(object):
requester (synapse.types.Requester|None):
- prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+ prev_event_ids:
the forward extremities to use as the prev_events for the
- new event. For each event, a tuple of (event_id, hashes, depth)
- where *hashes* is a map from algorithm to hash.
+ new event.
If None, they will be requested from the database.
@@ -734,22 +730,15 @@ class EventCreationHandler(object):
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
"""
- if prev_events_and_hashes is not None:
- assert len(prev_events_and_hashes) <= 10, (
+ if prev_event_ids is not None:
+ assert len(prev_event_ids) <= 10, (
"Attempting to create an event with %i prev_events"
- % (len(prev_events_and_hashes),)
+ % (len(prev_event_ids),)
)
else:
- prev_events_and_hashes = yield self.store.get_prev_events_for_room(
- builder.room_id
- )
-
- prev_events = [
- (event_id, prev_hashes)
- for event_id, prev_hashes, _ in prev_events_and_hashes
- ]
+ prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id)
- event = yield builder.build(prev_event_ids=[p for p, _ in prev_events])
+ event = yield builder.build(prev_event_ids=prev_event_ids)
context = yield self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
@@ -1042,9 +1031,7 @@ class EventCreationHandler(object):
# For each room we need to find a joined member we can use to send
# the dummy event with.
- prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
-
- latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
+ latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
members = yield self.state.get_current_users_in_room(
room_id, latest_event_ids=latest_event_ids
@@ -1063,7 +1050,7 @@ class EventCreationHandler(object):
"room_id": room_id,
"sender": user_id,
},
- prev_events_and_hashes=prev_events_and_hashes,
+ prev_event_ids=latest_event_ids,
)
event.internal_metadata.proactively_send = False
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 240c4add12..202aa9294f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -95,12 +95,7 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
- def __init__(self, hs):
- """
-
- Args:
- hs (synapse.server.HomeServer):
- """
+ def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
@@ -230,7 +225,7 @@ class PresenceHandler(object):
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
- if not self.store.database.is_running():
+ if not self.store.db.is_running():
return
logger.info(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 89c9118b26..9cab2adbfb 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -16,6 +16,7 @@
# limitations under the License.
"""Contains functions for performing events on rooms."""
+
import itertools
import logging
import math
@@ -271,7 +272,7 @@ class RoomCreationHandler(BaseHandler):
except AuthError as e:
logger.warning("Unable to update PLs in old room: %s", e)
- logger.info("Setting correct PLs in new room")
+ logger.info("Setting correct PLs in new room to %s", old_room_pl_state.content)
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
@@ -365,13 +366,18 @@ class RoomCreationHandler(BaseHandler):
needed_power_level = max(state_default, ban, max(event_power_levels.values()))
# Raise the requester's power level in the new room if necessary
- current_power_level = power_levels["users"][requester.user.to_string()]
+ current_power_level = power_levels["users"][user_id]
if current_power_level < needed_power_level:
- # Assign this power level to the requester
- power_levels["users"][requester.user.to_string()] = needed_power_level
+ # make sure we copy the event content rather than overwriting it.
+ # note that if frozen_dicts are enabled, `power_levels` will be a frozen
+ # dict so we can't just copy.deepcopy it.
- # Set the power levels to the modified state
- initial_state[(EventTypes.PowerLevels, "")] = power_levels
+ new_power_levels = {k: v for k, v in power_levels.items() if k != "users"}
+ new_power_levels["users"] = {
+ k: v for k, v in power_levels.get("users", {}).items() if k != user_id
+ }
+ new_power_levels["users"][user_id] = needed_power_level
+ initial_state[(EventTypes.PowerLevels, "")] = new_power_levels
yield self._send_events_for_new_room(
requester,
@@ -733,7 +739,7 @@ class RoomCreationHandler(BaseHandler):
initial_state,
creation_content,
room_alias=None,
- power_level_content_override=None,
+ power_level_content_override=None, # Doesn't apply when initial state has power level state event content
creator_join_profile=None,
):
def create(etype, content, **kwargs):
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 44c5e3239c..03bb52ccfb 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.types import RoomID, UserID
+from synapse.types import Collection, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
target,
room_id,
membership,
- prev_events_and_hashes,
+ prev_event_ids: Collection[str],
txn_id=None,
ratelimit=True,
content=None,
@@ -177,7 +177,7 @@ class RoomMemberHandler(object):
},
token_id=requester.access_token_id,
txn_id=txn_id,
- prev_events_and_hashes=prev_events_and_hashes,
+ prev_event_ids=prev_event_ids,
require_consent=require_consent,
)
@@ -370,8 +370,7 @@ class RoomMemberHandler(object):
if block_invite:
raise SynapseError(403, "Invites have been disabled on this server")
- prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
- latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
+ latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids
@@ -485,7 +484,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
- prev_events_and_hashes=prev_events_and_hashes,
+ prev_event_ids=latest_event_ids,
content=content,
require_consent=require_consent,
)
@@ -507,6 +506,8 @@ class RoomMemberHandler(object):
Returns:
Deferred
"""
+ logger.info("Transferring room state from %s to %s", old_room_id, room_id)
+
# Find all local users that were in the old room and copy over each user's state
users = yield self.store.get_users_in_room(old_room_id)
yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
diff --git a/synapse/http/site.py b/synapse/http/site.py
index ff8184a3d0..9f2d035fa0 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -47,9 +47,9 @@ class SynapseRequest(Request):
logcontext(LoggingContext) : the log context for this request
"""
- def __init__(self, site, channel, *args, **kw):
+ def __init__(self, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw)
- self.site = site
+ self.site = channel.site
self._channel = channel # this is used by the tests
self.authenticated_entity = None
self.start_time = 0
@@ -331,18 +331,6 @@ class XForwardedForRequest(SynapseRequest):
)
-class SynapseRequestFactory(object):
- def __init__(self, site, x_forwarded_for):
- self.site = site
- self.x_forwarded_for = x_forwarded_for
-
- def __call__(self, *args, **kwargs):
- if self.x_forwarded_for:
- return XForwardedForRequest(self.site, *args, **kwargs)
- else:
- return SynapseRequest(self.site, *args, **kwargs)
-
-
class SynapseSite(Site):
"""
Subclass of a twisted http Site that does access logging with python's
@@ -364,7 +352,7 @@ class SynapseSite(Site):
self.site_tag = site_tag
proxied = config.get("x_forwarded", False)
- self.requestFactory = SynapseRequestFactory(self, proxied)
+ self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
self.access_logger = logging.getLogger(logger_name)
self.server_version_string = server_version_string.encode("ascii")
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index fead78388c..bbcb84646c 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -46,7 +46,8 @@ class ReplicationClientFactory(ReconnectingClientFactory):
is required.
"""
- maxDelay = 30 # Try at least once every N seconds
+ initialDelay = 0.1
+ maxDelay = 1 # Try at least once every N seconds
def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index e7fc3f0431..bf5e0eb844 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -15,6 +15,7 @@
import logging
from canonicaljson import encode_canonical_json, json
+from signedjson.key import encode_verify_key_base64
from signedjson.sign import sign_json
from twisted.internet import defer
@@ -216,15 +217,28 @@ class RemoteKey(DirectServeResource):
if cache_misses and query_remote_on_cache_miss:
yield self.fetcher.get_keys(cache_misses)
yield self.query_keys(request, query, query_remote_on_cache_miss=False)
- else:
- signed_keys = []
- for key_json in json_results:
- key_json = json.loads(key_json)
+ return
+
+ signed_keys = []
+ for key_json in json_results:
+ key_json = json.loads(key_json)
+
+ # backwards-compatibility hack for #6596: if the requested key belongs
+ # to us, make sure that all of the signing keys appear in the
+ # "verify_keys" section.
+ if key_json["server_name"] == self.config.server_name:
+ verify_keys = key_json["verify_keys"]
for signing_key in self.config.key_server_signing_keys:
- key_json = sign_json(key_json, self.config.server_name, signing_key)
+ key_id = "%s:%s" % (signing_key.alg, signing_key.version)
+ verify_keys[key_id] = {
+ "key": encode_verify_key_base64(signing_key.verify_key)
+ }
+
+ for signing_key in self.config.key_server_signing_keys:
+ key_json = sign_json(key_json, self.config.server_name, signing_key)
- signed_keys.append(key_json)
+ signed_keys.append(key_json)
- results = {"server_keys": signed_keys}
+ results = {"server_keys": signed_keys}
- respond_with_json_bytes(request, 200, encode_canonical_json(results))
+ respond_with_json_bytes(request, 200, encode_canonical_json(results))
diff --git a/synapse/rest/media/v0/__init__.py b/synapse/rest/media/v0/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
--- a/synapse/rest/media/v0/__init__.py
+++ /dev/null
diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py
deleted file mode 100644
index 86884c0ef4..0000000000
--- a/synapse/rest/media/v0/content_repository.py
+++ /dev/null
@@ -1,103 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import base64
-import logging
-import os
-import re
-
-from canonicaljson import json
-
-from twisted.protocols.basic import FileSender
-from twisted.web import resource, server
-
-from synapse.api.errors import Codes, cs_error
-from synapse.http.server import finish_request, respond_with_json_bytes
-
-logger = logging.getLogger(__name__)
-
-
-class ContentRepoResource(resource.Resource):
- """Provides file uploading and downloading.
-
- Uploads are POSTed to wherever this Resource is linked to. This resource
- returns a "content token" which can be used to GET this content again. The
- token is typically a path, but it may not be. Tokens can expire, be
- one-time uses, etc.
-
- In this case, the token is a path to the file and contains 3 interesting
- sections:
- - User ID base64d (for namespacing content to each user)
- - random 24 char string
- - Content type base64d (so we can return it when clients GET it)
-
- """
-
- isLeaf = True
-
- def __init__(self, hs, directory):
- resource.Resource.__init__(self)
- self.hs = hs
- self.directory = directory
-
- def render_GET(self, request):
- # no auth here on purpose, to allow anyone to view, even across home
- # servers.
-
- # TODO: A little crude here, we could do this better.
- filename = request.path.decode("ascii").split("/")[-1]
- # be paranoid
- filename = re.sub("[^0-9A-z.-_]", "", filename)
-
- file_path = self.directory + "/" + filename
-
- logger.debug("Searching for %s", file_path)
-
- if os.path.isfile(file_path):
- # filename has the content type
- base64_contentype = filename.split(".")[1]
- content_type = base64.urlsafe_b64decode(base64_contentype)
- logger.info("Sending file %s", file_path)
- f = open(file_path, "rb")
- request.setHeader("Content-Type", content_type)
-
- # cache for at least a day.
- # XXX: we might want to turn this off for data we don't want to
- # recommend caching as it's sensitive or private - or at least
- # select private. don't bother setting Expires as all our matrix
- # clients are smart enough to be happy with Cache-Control (right?)
- request.setHeader(b"Cache-Control", b"public,max-age=86400,s-maxage=86400")
-
- d = FileSender().beginFileTransfer(f, request)
-
- # after the file has been sent, clean up and finish the request
- def cbFinished(ignored):
- f.close()
- finish_request(request)
-
- d.addCallback(cbFinished)
- else:
- respond_with_json_bytes(
- request,
- 404,
- json.dumps(cs_error("Not found", code=Codes.NOT_FOUND)),
- send_cors=True,
- )
-
- return server.NOT_DONE_YET
-
- def render_OPTIONS(self, request):
- respond_with_json_bytes(request, 200, {}, send_cors=True)
- return server.NOT_DONE_YET
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 88546ad614..3bb9381663 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,6 +16,7 @@
# limitations under the License.
import logging
import random
+from abc import ABCMeta
from six import PY2
from six.moves import builtins
@@ -30,7 +31,8 @@ from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
-class SQLBaseStore(object):
+# some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
+class SQLBaseStore(metaclass=ABCMeta):
"""Base class for data stores that holds helper functions.
Note that multiple instances of this class will exist as there will be one
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 4f97fd5ab6..bd547f35cf 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+from typing import Optional
from canonicaljson import json
@@ -97,15 +98,14 @@ class BackgroundUpdater(object):
def start_doing_background_updates(self):
run_as_background_process("background_updates", self.run_background_updates)
- @defer.inlineCallbacks
- def run_background_updates(self, sleep=True):
+ async def run_background_updates(self, sleep=True):
logger.info("Starting background schema updates")
while True:
if sleep:
- yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+ await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try:
- result = yield self.do_next_background_update(
+ result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except Exception:
@@ -170,20 +170,21 @@ class BackgroundUpdater(object):
return not update_exists
- @defer.inlineCallbacks
- def do_next_background_update(self, desired_duration_ms):
+ async def do_next_background_update(
+ self, desired_duration_ms: float
+ ) -> Optional[int]:
"""Does some amount of work on the next queued background update
+ Returns once some amount of work is done.
+
Args:
desired_duration_ms(float): How long we want to spend
updating.
Returns:
- A deferred that completes once some amount of work is done.
- The deferred will have a value of None if there is currently
- no more work to do.
+ None if there is no more work to do, otherwise an int
"""
if not self._background_update_queue:
- updates = yield self.db.simple_select_list(
+ updates = await self.db.simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name", "depends_on"),
@@ -201,11 +202,12 @@ class BackgroundUpdater(object):
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
- res = yield self._do_background_update(update_name, desired_duration_ms)
+ res = await self._do_background_update(update_name, desired_duration_ms)
return res
- @defer.inlineCallbacks
- def _do_background_update(self, update_name, desired_duration_ms):
+ async def _do_background_update(
+ self, update_name: str, desired_duration_ms: float
+ ) -> int:
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
@@ -225,7 +227,7 @@ class BackgroundUpdater(object):
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
- progress_json = yield self.db.simple_select_one_onecol(
+ progress_json = await self.db.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="progress_json",
@@ -234,7 +236,7 @@ class BackgroundUpdater(object):
progress = json.loads(progress_json)
time_start = self._clock.time_msec()
- items_updated = yield update_handler(progress, batch_size)
+ items_updated = await update_handler(progress, batch_size)
time_stop = self._clock.time_msec()
duration_ms = time_stop - time_start
@@ -263,7 +265,9 @@ class BackgroundUpdater(object):
* A dict of the current progress
* An integer count of the number of items to update in this batch.
- The handler should return a deferred integer count of items updated.
+ The handler should return a deferred or coroutine which returns an integer count
+ of items updated.
+
The handler is responsible for updating the progress of the update.
Args:
@@ -432,6 +436,21 @@ class BackgroundUpdater(object):
"background_updates", keyvalues={"update_name": update_name}
)
+ def _background_update_progress(self, update_name: str, progress: dict):
+ """Update the progress of a background update
+
+ Args:
+ update_name: The name of the background update task
+ progress: The progress of the update.
+ """
+
+ return self.db.runInteraction(
+ "background_update_progress",
+ self._background_update_progress_txn,
+ update_name,
+ progress,
+ )
+
def _background_update_progress_txn(self, txn, update_name, progress):
"""Update the progress of a background update
diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py
index d20df5f076..092e803799 100644
--- a/synapse/storage/data_stores/__init__.py
+++ b/synapse/storage/data_stores/__init__.py
@@ -37,6 +37,8 @@ class DataStores(object):
# store.
self.databases = []
+ self.main = None
+ self.state = None
for database_config in hs.config.database.databases:
db_name = database_config.name
@@ -54,10 +56,22 @@ class DataStores(object):
if "main" in database_config.data_stores:
logger.info("Starting 'main' data store")
+
+ # Sanity check we don't try and configure the main store on
+ # multiple databases.
+ if self.main:
+ raise Exception("'main' data store already configured")
+
self.main = main_store_class(database, db_conn, hs)
if "state" in database_config.data_stores:
logger.info("Starting 'state' data store")
+
+ # Sanity check we don't try and configure the state store on
+ # multiple databases.
+ if self.state:
+ raise Exception("'state' data store already configured")
+
self.state = StateGroupDataStore(database, db_conn, hs)
db_conn.commit()
@@ -65,3 +79,10 @@ class DataStores(object):
self.databases.append(database)
logger.info("Database %r prepared", db_name)
+
+ # Sanity check that we have actually configured all the required stores.
+ if not self.main:
+ raise Exception("No 'main' data store configured")
+
+ if not self.state:
+ raise Exception("No 'main' data store configured")
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 1d18f13801..60c67457b4 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,13 +14,10 @@
# limitations under the License.
import itertools
import logging
-import random
from six.moves import range
from six.moves.queue import Empty, PriorityQueue
-from unpaddedbase64 import encode_base64
-
from twisted.internet import defer
from synapse.api.errors import StoreError
@@ -148,8 +145,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
retcol="event_id",
)
- @defer.inlineCallbacks
- def get_prev_events_for_room(self, room_id):
+ def get_prev_events_for_room(self, room_id: str):
"""
Gets a subset of the current forward extremities in the given room.
@@ -160,40 +156,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id (str): room_id
Returns:
- Deferred[list[(str, dict[str, str], int)]]
- for each event, a tuple of (event_id, hashes, depth)
- where *hashes* is a map from algorithm to hash.
+ Deferred[List[str]]: the event ids of the forward extremites
+
"""
- res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
- if len(res) > 10:
- # Sort by reverse depth, so we point to the most recent.
- res.sort(key=lambda a: -a[2])
- # we use half of the limit for the actual most recent events, and
- # the other half to randomly point to some of the older events, to
- # make sure that we don't completely ignore the older events.
- res = res[0:5] + random.sample(res[5:], 5)
+ return self.db.runInteraction(
+ "get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id
+ )
- return res
+ def _get_prev_events_for_room_txn(self, txn, room_id: str):
+ # we just use the 10 newest events. Older events will become
+ # prev_events of future events.
- def get_latest_event_ids_and_hashes_in_room(self, room_id):
+ sql = """
+ SELECT e.event_id FROM event_forward_extremities AS f
+ INNER JOIN events AS e USING (event_id)
+ WHERE f.room_id = ?
+ ORDER BY e.depth DESC
+ LIMIT 10
"""
- Gets the current forward extremities in the given room
- Args:
- room_id (str): room_id
+ txn.execute(sql, (room_id,))
- Returns:
- Deferred[list[(str, dict[str, str], int)]]
- for each event, a tuple of (event_id, hashes, depth)
- where *hashes* is a map from algorithm to hash.
- """
-
- return self.db.runInteraction(
- "get_latest_event_ids_and_hashes_in_room",
- self._get_latest_event_ids_and_hashes_in_room,
- room_id,
- )
+ return [row[0] for row in txn]
def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
"""Get the top rooms with at least N extremities.
@@ -243,27 +228,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
desc="get_latest_event_ids_in_room",
)
- def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
- sql = (
- "SELECT e.event_id, e.depth FROM events as e "
- "INNER JOIN event_forward_extremities as f "
- "ON e.event_id = f.event_id "
- "AND e.room_id = f.room_id "
- "WHERE f.room_id = ?"
- )
-
- txn.execute(sql, (room_id,))
-
- results = []
- for event_id, depth in txn.fetchall():
- hashes = self._get_event_reference_hashes_txn(txn, event_id)
- prev_hashes = {
- k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
- }
- results.append((event_id, prev_hashes, depth))
-
- return results
-
def get_min_depth(self, room_id):
""" For hte given room, get the minimum depth we have seen for it.
"""
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 2c9142814c..0cce5232f5 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def get_event(
self,
- event_id: List[str],
+ event_id: str,
redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT,
get_prev_content: bool = False,
allow_rejected: bool = False,
@@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_id: The event_id of the event to fetch
+
redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
- * DISALLOW - Do not return redacted events
+ * DISALLOW - Do not return redacted events (behave as per allow_none
+ if the event is redacted)
+
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
- allow_rejected: If True return rejected events.
+
+ allow_rejected: If True, return rejected events. Otherwise,
+ behave as per allow_none.
+
allow_none: If True, return None if no event found, if
False throw a NotFoundError
+
check_room_id: if not None, check the room of the found event.
If there is a mismatch, behave as per allow_none.
@@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_ids: The event_ids of the events to fetch
+
redact_behaviour: Determine what to do with a redacted event. Possible
values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
- * DISALLOW - Do not return redacted events
+ * DISALLOW - Do not return redacted events (omit them from the response)
+
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
- allow_rejected: If True return rejected events.
+
+ allow_rejected: If True, return rejected events. Otherwise,
+ omits rejeted events from the response.
Returns:
Deferred : Dict from event_id to event.
@@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore):
"""Get events from the database and return in a list in the same order
as given by `event_ids` arg.
+ Unknown events will be omitted from the response.
+
Args:
event_ids: The event_ids of the events to fetch
+
redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
- * DISALLOW - Do not return redacted events
+ * DISALLOW - Do not return redacted events (omit them from the response)
+
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
- allow_rejected: If True, return rejected events.
+
+ allow_rejected: If True, return rejected events. Otherwise,
+ omits rejected events from the response.
Returns:
Deferred[list[EventBase]]: List of events fetched from the database. The
@@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore):
If events are pulled from the database, they will be cached for future lookups.
+ Unknown events are omitted from the response.
+
Args:
+
event_ids (Iterable[str]): The event_ids of the events to fetch
- allow_rejected (bool): Whether to include rejected events
+
+ allow_rejected (bool): Whether to include rejected events. If False,
+ rejected events are omitted from the response.
Returns:
Deferred[Dict[str, _EventCacheEntry]]:
@@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore):
Returned events will be added to the cache for future lookups.
+ Unknown events are omitted from the response.
+
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
- allow_rejected (bool): Whether to include rejected events
+
+ allow_rejected (bool): Whether to include rejected events. If False,
+ rejected events are omitted from the response.
Returns:
Deferred[Dict[str, _EventCacheEntry]]:
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index aa476d0fbf..79cfd39194 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -17,6 +17,7 @@
import collections
import logging
import re
+from abc import abstractmethod
from typing import Optional, Tuple
from six import integer_types
@@ -367,6 +368,8 @@ class RoomWorkerStore(SQLBaseStore):
class RoomBackgroundUpdateStore(SQLBaseStore):
+ REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
+
def __init__(self, database: Database, db_conn, hs):
super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs)
@@ -376,6 +379,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
"insert_room_retention", self._background_insert_retention,
)
+ self.db.updates.register_background_update_handler(
+ self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
+ self._remove_tombstoned_rooms_from_directory,
+ )
+
@defer.inlineCallbacks
def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of
@@ -444,6 +452,62 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
defer.returnValue(batch_size)
+ async def _remove_tombstoned_rooms_from_directory(
+ self, progress, batch_size
+ ) -> int:
+ """Removes any rooms with tombstone events from the room directory
+
+ Nowadays this is handled by the room upgrade handler, but we may have some
+ that got left behind
+ """
+
+ last_room = progress.get("room_id", "")
+
+ def _get_rooms(txn):
+ txn.execute(
+ """
+ SELECT room_id
+ FROM rooms r
+ INNER JOIN current_state_events cse USING (room_id)
+ WHERE room_id > ? AND r.is_public
+ AND cse.type = '%s' AND cse.state_key = ''
+ ORDER BY room_id ASC
+ LIMIT ?;
+ """
+ % EventTypes.Tombstone,
+ (last_room, batch_size),
+ )
+
+ return [row[0] for row in txn]
+
+ rooms = await self.db.runInteraction(
+ "get_tombstoned_directory_rooms", _get_rooms
+ )
+
+ if not rooms:
+ await self.db.updates._end_background_update(
+ self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
+ )
+ return 0
+
+ for room_id in rooms:
+ logger.info("Removing tombstoned room %s from the directory", room_id)
+ await self.set_room_is_public(room_id, False)
+
+ await self.db.updates._background_update_progress(
+ self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
+ )
+
+ return len(rooms)
+
+ @abstractmethod
+ def set_room_is_public(self, room_id, is_public):
+ # this will need to be implemented if a background update is performed with
+ # existing (tombstoned, public) rooms in the database.
+ #
+ # It's overridden by RoomStore for the synapse master.
+ raise NotImplementedError()
+
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def __init__(self, database: Database, db_conn, hs):
diff --git a/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql
new file mode 100644
index 0000000000..aeb17813d3
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Now that #6232 is a thing, we can remove old rooms from the directory.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('remove_tombstoned_rooms_from_directory', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 0dc39f139c..d07440e3ed 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import collections.abc
import logging
from collections import namedtuple
from typing import Iterable, Tuple
@@ -107,7 +107,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
predecessor = create_event.content.get("predecessor", None)
# Ensure the key is a dictionary
- if not isinstance(predecessor, dict):
+ if not isinstance(predecessor, collections.abc.Mapping):
return None
return predecessor
diff --git a/synapse/types.py b/synapse/types.py
index aafc3ffe74..cd996c0b5a 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -15,6 +15,7 @@
# limitations under the License.
import re
import string
+import sys
from collections import namedtuple
import attr
@@ -23,6 +24,17 @@ from unpaddedbase64 import decode_base64
from synapse.api.errors import SynapseError
+# define a version of typing.Collection that works on python 3.5
+if sys.version_info[:3] >= (3, 6, 0):
+ from typing import Collection
+else:
+ from typing import Sized, Iterable, Container, TypeVar
+
+ T_co = TypeVar("T_co", covariant=True)
+
+ class Collection(Iterable[T_co], Container[T_co], Sized):
+ __slots__ = ()
+
class Requester(
namedtuple(
|