diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index bc90605324..6da6a1b62e 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -100,11 +100,6 @@ class ApplicationServiceApi(SimpleHttpClient):
logger.warning("push_bulk to %s threw exception %s", uri, ex)
defer.returnValue(False)
- @defer.inlineCallbacks
- def push(self, service, event, txn_id=None):
- response = yield self.push_bulk(service, [event], txn_id)
- defer.returnValue(response)
-
def _serialize(self, events):
time_now = self.clock.time_msec()
return [
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index acf74c8761..9a80ac39ec 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -30,13 +30,14 @@ from .saml2 import SAML2Config
from .cas import CasConfig
from .password import PasswordConfig
from .jwt import JWTConfig
+from .ldap import LDAPConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
- JWTConfig, PasswordConfig,):
+ JWTConfig, LDAPConfig, PasswordConfig,):
pass
diff --git a/synapse/config/ldap.py b/synapse/config/ldap.py
new file mode 100644
index 0000000000..9c14593a99
--- /dev/null
+++ b/synapse/config/ldap.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 Niklas Riekenbrauck
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class LDAPConfig(Config):
+ def read_config(self, config):
+ ldap_config = config.get("ldap_config", None)
+ if ldap_config:
+ self.ldap_enabled = ldap_config.get("enabled", False)
+ self.ldap_server = ldap_config["server"]
+ self.ldap_port = ldap_config["port"]
+ self.ldap_tls = ldap_config.get("tls", False)
+ self.ldap_search_base = ldap_config["search_base"]
+ self.ldap_search_property = ldap_config["search_property"]
+ self.ldap_email_property = ldap_config["email_property"]
+ self.ldap_full_name_property = ldap_config["full_name_property"]
+ else:
+ self.ldap_enabled = False
+ self.ldap_server = None
+ self.ldap_port = None
+ self.ldap_tls = False
+ self.ldap_search_base = None
+ self.ldap_search_property = None
+ self.ldap_email_property = None
+ self.ldap_full_name_property = None
+
+ def default_config(self, **kwargs):
+ return """\
+ # ldap_config:
+ # enabled: true
+ # server: "ldap://localhost"
+ # port: 389
+ # tls: false
+ # search_base: "ou=Users,dc=example,dc=com"
+ # search_property: "cn"
+ # email_property: "email"
+ # full_name_property: "givenName"
+ """
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d5d6faa85f..7a13a8b11c 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -49,6 +49,21 @@ class AuthHandler(BaseHandler):
self.sessions = {}
self.INVALID_TOKEN_HTTP_STATUS = 401
+ self.ldap_enabled = hs.config.ldap_enabled
+ self.ldap_server = hs.config.ldap_server
+ self.ldap_port = hs.config.ldap_port
+ self.ldap_tls = hs.config.ldap_tls
+ self.ldap_search_base = hs.config.ldap_search_base
+ self.ldap_search_property = hs.config.ldap_search_property
+ self.ldap_email_property = hs.config.ldap_email_property
+ self.ldap_full_name_property = hs.config.ldap_full_name_property
+
+ if self.ldap_enabled is True:
+ import ldap
+ logger.info("Import ldap version: %s", ldap.__version__)
+
+ self.hs = hs # FIXME better possibility to access registrationHandler later?
+
@defer.inlineCallbacks
def check_auth(self, flows, clientdict, clientip):
"""
@@ -215,8 +230,10 @@ class AuthHandler(BaseHandler):
if not user_id.startswith('@'):
user_id = UserID.create(user_id, self.hs.hostname).to_string()
- user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
- self._check_password(user_id, password, password_hash)
+ if not (yield self._check_password(user_id, password)):
+ logger.warn("Failed password login for user %s", user_id)
+ raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+
defer.returnValue(user_id)
@defer.inlineCallbacks
@@ -340,8 +357,10 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem storing the token.
LoginError if there was an authentication problem.
"""
- user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
- self._check_password(user_id, password, password_hash)
+
+ if not (yield self._check_password(user_id, password)):
+ logger.warn("Failed password login for user %s", user_id)
+ raise LoginError(403, "", errcode=Codes.FORBIDDEN)
logger.info("Logging in user %s", user_id)
access_token = yield self.issue_access_token(user_id)
@@ -407,11 +426,60 @@ class AuthHandler(BaseHandler):
else:
defer.returnValue(user_infos.popitem())
- def _check_password(self, user_id, password, stored_hash):
- """Checks that user_id has passed password, raises LoginError if not."""
- if not self.validate_hash(password, stored_hash):
- logger.warn("Failed password login for user %s", user_id)
- raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+ @defer.inlineCallbacks
+ def _check_password(self, user_id, password):
+ defer.returnValue(
+ not (
+ (yield self._check_ldap_password(user_id, password))
+ or
+ (yield self._check_local_password(user_id, password))
+ ))
+
+ @defer.inlineCallbacks
+ def _check_local_password(self, user_id, password):
+ try:
+ user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
+ defer.returnValue(not self.validate_hash(password, password_hash))
+ except LoginError:
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
+ def _check_ldap_password(self, user_id, password):
+ if self.ldap_enabled is not True:
+ logger.debug("LDAP not configured")
+ defer.returnValue(False)
+
+ import ldap
+
+ logger.info("Authenticating %s with LDAP" % user_id)
+ try:
+ ldap_url = "%s:%s" % (self.ldap_server, self.ldap_port)
+ logger.debug("Connecting LDAP server at %s" % ldap_url)
+ l = ldap.initialize(ldap_url)
+ if self.ldap_tls:
+ logger.debug("Initiating TLS")
+ self._connection.start_tls_s()
+
+ local_name = UserID.from_string(user_id).localpart
+
+ dn = "%s=%s, %s" % (
+ self.ldap_search_property,
+ local_name,
+ self.ldap_search_base)
+ logger.debug("DN for LDAP authentication: %s" % dn)
+
+ l.simple_bind_s(dn.encode('utf-8'), password.encode('utf-8'))
+
+ if not (yield self.does_user_exist(user_id)):
+ handler = self.hs.get_handlers().registration_handler
+ user_id, access_token = (
+ yield handler.register(localpart=local_name)
+ )
+
+ defer.returnValue(True)
+ except ldap.LDAPError, e:
+ logger.warn("LDAP error: %s", e)
+ defer.returnValue(False)
@defer.inlineCallbacks
def issue_access_token(self, user_id):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eb02f0e000..c28226f840 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -40,6 +40,7 @@ from synapse.events.utils import prune_event
from synapse.util.retryutils import NotRetryingDestination
from synapse.push.action_generator import ActionGenerator
+from synapse.util.distributor import user_joined_room
from twisted.internet import defer
@@ -49,10 +50,6 @@ import logging
logger = logging.getLogger(__name__)
-def user_joined_room(distributor, user, room_id):
- return distributor.fire("user_joined_room", user, room_id)
-
-
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 10608c0dd9..f51feda2f4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,10 +34,6 @@ import logging
logger = logging.getLogger(__name__)
-def collect_presencelike_data(distributor, user, content):
- return distributor.fire("collect_presencelike_data", user, content)
-
-
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -49,35 +45,6 @@ class MessageHandler(BaseHandler):
self.snapshot_cache = SnapshotCache()
@defer.inlineCallbacks
- def get_message(self, msg_id=None, room_id=None, sender_id=None,
- user_id=None):
- """ Retrieve a message.
-
- Args:
- msg_id (str): The message ID to obtain.
- room_id (str): The room where the message resides.
- sender_id (str): The user ID of the user who sent the message.
- user_id (str): The user ID of the user making this request.
- Returns:
- The message, or None if no message exists.
- Raises:
- SynapseError if something went wrong.
- """
- yield self.auth.check_joined_room(room_id, user_id)
-
- # Pull out the message from the db
-# msg = yield self.store.get_message(
-# room_id=room_id,
-# msg_id=msg_id,
-# user_id=sender_id
-# )
-
- # TODO (erikj): Once we work out the correct c-s api we need to think
- # on how to do this.
-
- defer.returnValue(None)
-
- @defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True):
"""Get messages in a room.
@@ -202,12 +169,8 @@ class MessageHandler(BaseHandler):
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
- if membership == Membership.JOIN:
+ if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
- yield collect_presencelike_data(
- self.distributor, target, builder.content
- )
- elif membership == Membership.INVITE:
profile = self.hs.get_handlers().profile_handler
content = builder.content
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index b45eafbb49..e37409170d 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.types import UserID, Requester
-from synapse.util import unwrapFirstError
from ._base import BaseHandler
@@ -27,14 +26,6 @@ import logging
logger = logging.getLogger(__name__)
-def changed_presencelike_data(distributor, user, state):
- return distributor.fire("changed_presencelike_data", user, state)
-
-
-def collect_presencelike_data(distributor, user, content):
- return distributor.fire("collect_presencelike_data", user, content)
-
-
class ProfileHandler(BaseHandler):
def __init__(self, hs):
@@ -46,17 +37,9 @@ class ProfileHandler(BaseHandler):
)
distributor = hs.get_distributor()
- self.distributor = distributor
-
- distributor.declare("collect_presencelike_data")
- distributor.declare("changed_presencelike_data")
distributor.observe("registered_user", self.registered_user)
- distributor.observe(
- "collect_presencelike_data", self.collect_presencelike_data
- )
-
def registered_user(self, user):
return self.store.create_profile(user.localpart)
@@ -105,10 +88,6 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_displayname
)
- yield changed_presencelike_data(self.distributor, target_user, {
- "displayname": new_displayname,
- })
-
yield self._update_join_states(requester)
@defer.inlineCallbacks
@@ -152,31 +131,9 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_avatar_url
)
- yield changed_presencelike_data(self.distributor, target_user, {
- "avatar_url": new_avatar_url,
- })
-
yield self._update_join_states(requester)
@defer.inlineCallbacks
- def collect_presencelike_data(self, user, state):
- if not self.hs.is_mine(user):
- defer.returnValue(None)
-
- (displayname, avatar_url) = yield defer.gatherResults(
- [
- self.store.get_profile_displayname(user.localpart),
- self.store.get_profile_avatar_url(user.localpart),
- ],
- consumeErrors=True
- ).addErrback(unwrapFirstError)
-
- state["displayname"] = displayname
- state["avatar_url"] = avatar_url
-
- defer.returnValue(None)
-
- @defer.inlineCallbacks
def on_profile_query(self, args):
user = UserID.from_string(args["user_id"])
if not self.hs.is_mine(user):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f287ee247b..b0862067e1 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,6 +23,7 @@ from synapse.api.errors import (
from ._base import BaseHandler
from synapse.util.async import run_on_reactor
from synapse.http.client import CaptchaServerHttpClient
+from synapse.util.distributor import registered_user
import logging
import urllib
@@ -30,10 +31,6 @@ import urllib
logger = logging.getLogger(__name__)
-def registered_user(distributor, user):
- return distributor.fire("registered_user", user)
-
-
class RegistrationHandler(BaseHandler):
def __init__(self, hs):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3e1d9282d7..ea306cd42a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,7 +25,6 @@ from synapse.api.constants import (
from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.util import stringutils
from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.caches.response_cache import ResponseCache
from collections import OrderedDict
@@ -39,20 +38,6 @@ logger = logging.getLogger(__name__)
id_server_scheme = "https://"
-def user_left_room(distributor, user, room_id):
- return preserve_context_over_fn(
- distributor.fire,
- "user_left_room", user=user, room_id=room_id
- )
-
-
-def user_joined_room(distributor, user, room_id):
- return preserve_context_over_fn(
- distributor.fire,
- "user_joined_room", user=user, room_id=room_id
- )
-
-
class RoomCreationHandler(BaseHandler):
PRESETS_DICT = {
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 8c41cb6f3c..b69f36aefe 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -23,7 +23,8 @@ from synapse.api.constants import (
EventTypes, Membership,
)
from synapse.api.errors import AuthError, SynapseError, Codes
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.async import Linearizer
+from synapse.util.distributor import user_left_room, user_joined_room
from signedjson.sign import verify_signed_json
from signedjson.key import decode_verify_key_bytes
@@ -37,20 +38,6 @@ logger = logging.getLogger(__name__)
id_server_scheme = "https://"
-def user_left_room(distributor, user, room_id):
- return preserve_context_over_fn(
- distributor.fire,
- "user_left_room", user=user, room_id=room_id
- )
-
-
-def user_joined_room(distributor, user, room_id):
- return preserve_context_over_fn(
- distributor.fire,
- "user_joined_room", user=user, room_id=room_id
- )
-
-
class RoomMemberHandler(BaseHandler):
# TODO(paul): This handler currently contains a messy conflation of
# low-level API that works on UserID objects and so on, and REST-level
@@ -60,6 +47,8 @@ class RoomMemberHandler(BaseHandler):
def __init__(self, hs):
super(RoomMemberHandler, self).__init__(hs)
+ self.member_linearizer = Linearizer()
+
self.clock = hs.get_clock()
self.distributor = hs.get_distributor()
@@ -183,6 +172,34 @@ class RoomMemberHandler(BaseHandler):
third_party_signed=None,
ratelimit=True,
):
+ key = (target, room_id,)
+
+ with (yield self.member_linearizer.queue(key)):
+ result = yield self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ )
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _update_membership(
+ self,
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=None,
+ remote_room_hosts=None,
+ third_party_signed=None,
+ ratelimit=True,
+ ):
effective_membership_state = action
if action in ["kick", "unban"]:
effective_membership_state = "leave"
@@ -375,19 +392,6 @@ class RoomMemberHandler(BaseHandler):
and guest_access.content["guest_access"] == "can_join"
)
- def _should_do_dance(self, current_state, inviter, room_hosts=None):
- # TODO: Shouldn't this be remote_room_host?
- room_hosts = room_hosts or []
-
- is_host_in_room = self.is_host_in_room(current_state)
- if is_host_in_room:
- return False, room_hosts
-
- if inviter and not self.hs.is_mine(inviter):
- room_hosts.append(inviter.domain)
-
- return True, room_hosts
-
@defer.inlineCallbacks
def lookup_room_alias(self, room_alias):
"""
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 680dc89536..cfc728a038 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -69,6 +69,7 @@ class SlavedEventStore(BaseSlavedStore):
"_get_current_state_for_key"
]
+ get_event = DataStore.get_event.__func__
get_current_state = DataStore.get_current_state.__func__
get_current_state_for_key = DataStore.get_current_state_for_key.__func__
get_rooms_for_user_where_membership_is = (
@@ -89,8 +90,11 @@ class SlavedEventStore(BaseSlavedStore):
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
_parse_events_txn = DataStore._parse_events_txn.__func__
_get_events_txn = DataStore._get_events_txn.__func__
+ _enqueue_events = DataStore._enqueue_events.__func__
+ _do_fetch = DataStore._do_fetch.__func__
_fetch_events_txn = DataStore._fetch_events_txn.__func__
_fetch_event_rows = DataStore._fetch_event_rows.__func__
+ _get_event_from_row = DataStore._get_event_from_row.__func__
_get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
@@ -100,7 +104,7 @@ class SlavedEventStore(BaseSlavedStore):
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()
- result["backfilled"] = self._backfill_id_gen.get_current_token()
+ result["backfill"] = self._backfill_id_gen.get_current_token()
return result
def process_replication(self, result):
@@ -142,7 +146,6 @@ class SlavedEventStore(BaseSlavedStore):
position = row[0]
internal = json.loads(row[1])
event_json = json.loads(row[2])
-
event = FrozenEvent(event_json, internal_metadata_dict=internal)
self._invalidate_caches_for_event(
event, backfilled, reset_state=position in state_resets
@@ -158,6 +161,8 @@ class SlavedEventStore(BaseSlavedStore):
self._invalidate_get_event_cache(event.event_id)
+ self.get_latest_event_ids_in_room.invalidate((event.room_id,))
+
if not backfilled:
self._events_stream_cache.entity_has_changed(
event.room_id, event.internal_metadata.stream_ordering
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 04d7fcf6d6..1e27c2c0ce 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -810,12 +810,6 @@ class SQLBaseStore(object):
return txn.execute(sql, keyvalues.values())
- def get_next_stream_id(self):
- with self._next_stream_id_lock:
- i = self._next_stream_id
- self._next_stream_id += 1
- return i
-
def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
max_value):
# Fetch a mapping of room_id -> max stream position for "recent" rooms.
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 00833422af..57f14fd12b 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -30,18 +30,6 @@ SCHEMA_VERSION = 31
dir_path = os.path.abspath(os.path.dirname(__file__))
-def read_schema(path):
- """ Read the named database schema.
-
- Args:
- path: Path of the database schema.
- Returns:
- A string containing the database schema.
- """
- with open(path) as schema_file:
- return schema_file.read()
-
-
class PrepareDatabaseException(Exception):
pass
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 59b4ef5ce6..07f5fae8dd 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -176,16 +176,6 @@ class PresenceStore(SQLBaseStore):
desc="disallow_presence_visible",
)
- def is_presence_visible(self, observed_localpart, observer_userid):
- return self._simple_select_one(
- table="presence_allow_inbound",
- keyvalues={"observed_user_id": observed_localpart,
- "observer_user_id": observer_userid},
- retcols=["observed_user_id"],
- allow_none=True,
- desc="is_presence_visible",
- )
-
def add_presence_list_pending(self, observer_localpart, observed_userid):
return self._simple_insert(
table="presence_list",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 66e7a40e3c..77518e893f 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -121,26 +121,6 @@ class RoomMemberStore(SQLBaseStore):
with self._stream_id_gen.get_next() as stream_ordering:
yield self.runInteraction("locally_reject_invite", f, stream_ordering)
- def get_room_member(self, user_id, room_id):
- """Retrieve the current state of a room member.
-
- Args:
- user_id (str): The member's user ID.
- room_id (str): The room the member is in.
- Returns:
- Deferred: Results in a MembershipEvent or None.
- """
- return self.runInteraction(
- "get_room_member",
- self._get_members_events_txn,
- room_id,
- user_id=user_id,
- ).addCallback(
- self._get_events
- ).addCallback(
- lambda events: events[0] if events else None
- )
-
@cached(max_entries=5000)
def get_users_in_room(self, room_id):
def f(txn):
@@ -203,19 +183,6 @@ class RoomMemberStore(SQLBaseStore):
defer.returnValue(invite)
defer.returnValue(None)
- def get_leave_and_ban_events_for_user(self, user_id):
- """ Get all the leave events for a user
- Args:
- user_id (str): The user ID.
- Returns:
- A deferred list of event objects.
- """
- return self.get_rooms_for_user_where_membership_is(
- user_id, (Membership.LEAVE, Membership.BAN)
- ).addCallback(lambda leaves: self._get_events([
- leave.event_id for leave in leaves
- ]))
-
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
matches one in the membership list.
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 76bcd9cd00..95b12559a6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -303,96 +303,6 @@ class StreamStore(SQLBaseStore):
defer.returnValue(ret)
- def get_room_events_stream(
- self,
- user_id,
- from_key,
- to_key,
- limit=0,
- is_guest=False,
- room_ids=None
- ):
- room_ids = room_ids or []
- room_ids = [r for r in room_ids]
- if is_guest:
- current_room_membership_sql = (
- "SELECT c.room_id FROM history_visibility AS h"
- " INNER JOIN current_state_events AS c"
- " ON h.event_id = c.event_id"
- " WHERE c.room_id IN (%s)"
- " AND h.history_visibility = 'world_readable'" % (
- ",".join(map(lambda _: "?", room_ids))
- )
- )
- current_room_membership_args = room_ids
- else:
- current_room_membership_sql = (
- "SELECT m.room_id FROM room_memberships as m "
- " INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id AND c.state_key = m.user_id"
- " WHERE m.user_id = ? AND m.membership = 'join'"
- )
- current_room_membership_args = [user_id]
-
- # We also want to get any membership events about that user, e.g.
- # invites or leave notifications.
- membership_sql = (
- "SELECT m.event_id FROM room_memberships as m "
- "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
- "WHERE m.user_id = ? "
- )
- membership_args = [user_id]
-
- if limit:
- limit = max(limit, MAX_STREAM_SIZE)
- else:
- limit = MAX_STREAM_SIZE
-
- # From and to keys should be integers from ordering.
- from_id = RoomStreamToken.parse_stream_token(from_key)
- to_id = RoomStreamToken.parse_stream_token(to_key)
-
- if from_key == to_key:
- return defer.succeed(([], to_key))
-
- sql = (
- "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE "
- "(e.outlier = ? AND (room_id IN (%(current)s)) OR "
- "(event_id IN (%(invites)s))) "
- "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
- "ORDER BY stream_ordering ASC LIMIT %(limit)d "
- ) % {
- "current": current_room_membership_sql,
- "invites": membership_sql,
- "limit": limit
- }
-
- def f(txn):
- args = ([False] + current_room_membership_args + membership_args +
- [from_id.stream, to_id.stream])
- txn.execute(sql, args)
-
- rows = self.cursor_to_dict(txn)
-
- ret = self._get_events_txn(
- txn,
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
-
- self._set_before_and_after(ret, rows)
-
- if rows:
- key = "s%d" % max(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = to_key
-
- return ret, key
-
- return self.runInteraction("get_room_events_stream", f)
-
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1):
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index f69f1cdad4..46cf93ff87 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -112,7 +112,7 @@ class StreamIdGenerator(object):
self._current + self._step * (n + 1),
self._step
)
- self._current += n
+ self._current += n * self._step
for next_id in next_ids:
self._unfinished_ids.append(next_id)
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 3b9da5b34a..b462495eb8 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -49,9 +49,6 @@ class Clock(object):
l.start(msec / 1000.0, now=False)
return l
- def stop_looping_call(self, loop):
- loop.stop()
-
def call_later(self, delay, callback, *args, **kwargs):
"""Call something later
diff --git a/synapse/util/async.py b/synapse/util/async.py
index cd4d90f3cf..0d6f48e2d8 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -16,9 +16,13 @@
from twisted.internet import defer, reactor
-from .logcontext import PreserveLoggingContext, preserve_fn
+from .logcontext import (
+ PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
from synapse.util import unwrapFirstError
+from contextlib import contextmanager
+
@defer.inlineCallbacks
def sleep(seconds):
@@ -137,3 +141,47 @@ def concurrently_execute(func, args, limit):
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True).addErrback(unwrapFirstError)
+
+
+class Linearizer(object):
+ """Linearizes access to resources based on a key. Useful to ensure only one
+ thing is happening at a time on a given resource.
+
+ Example:
+
+ with (yield linearizer.queue("test_key")):
+ # do some work.
+
+ """
+ def __init__(self):
+ self.key_to_defer = {}
+
+ @defer.inlineCallbacks
+ def queue(self, key):
+ # If there is already a deferred in the queue, we pull it out so that
+ # we can wait on it later.
+ # Then we replace it with a deferred that we resolve *after* the
+ # context manager has exited.
+ # We only return the context manager after the previous deferred has
+ # resolved.
+ # This all has the net effect of creating a chain of deferreds that
+ # wait for the previous deferred before starting their work.
+ current_defer = self.key_to_defer.get(key)
+
+ new_defer = defer.Deferred()
+ self.key_to_defer[key] = new_defer
+
+ if current_defer:
+ yield preserve_context_over_deferred(current_defer)
+
+ @contextmanager
+ def _ctx_manager():
+ try:
+ yield
+ finally:
+ new_defer.callback(None)
+ current_d = self.key_to_defer.get(key)
+ if current_d is new_defer:
+ self.key_to_defer.pop(key, None)
+
+ defer.returnValue(_ctx_manager())
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index be310ba320..36686b479e 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -35,7 +35,7 @@ class ResponseCache(object):
return None
def set(self, key, deferred):
- result = ObservableDeferred(deferred)
+ result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result
def remove(r):
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 8875813de4..d7cccc06b1 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -15,7 +15,9 @@
from twisted.internet import defer
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import (
+ PreserveLoggingContext, preserve_context_over_fn
+)
from synapse.util import unwrapFirstError
@@ -25,6 +27,24 @@ import logging
logger = logging.getLogger(__name__)
+def registered_user(distributor, user):
+ return distributor.fire("registered_user", user)
+
+
+def user_left_room(distributor, user, room_id):
+ return preserve_context_over_fn(
+ distributor.fire,
+ "user_left_room", user=user, room_id=room_id
+ )
+
+
+def user_joined_room(distributor, user, room_id):
+ return preserve_context_over_fn(
+ distributor.fire,
+ "user_joined_room", user=user, room_id=room_id
+ )
+
+
class Distributor(object):
"""A central dispatch point for loosely-connected pieces of code to
register, observe, and fire signals.
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 4076eed269..1101881a2d 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -100,20 +100,6 @@ class _PerHostRatelimiter(object):
self.current_processing = set()
self.request_times = []
- def is_empty(self):
- time_now = self.clock.time_msec()
- self.request_times[:] = [
- r for r in self.request_times
- if time_now - r < self.window_size
- ]
-
- return not (
- self.ready_request_queue
- or self.sleeping_requests
- or self.current_processing
- or self.request_times
- )
-
@contextlib.contextmanager
def ratelimit(self):
# `contextlib.contextmanager` takes a generator and turns it into a
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index b490bb8725..a100f151d4 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -21,10 +21,6 @@ _string_with_symbols = (
)
-def origin_from_ucid(ucid):
- return ucid.split("@", 1)[1]
-
-
def random_string(length):
return ''.join(random.choice(string.ascii_letters) for _ in xrange(length))
|