diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index f8266d1c81..e3da45b416 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -209,7 +209,7 @@ class Auth(object):
)[0]
if user and access_token and ip_addr:
self.store.insert_client_ip(
- user=user,
+ user_id=user.to_string(),
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
new file mode 100644
index 0000000000..c8fa7854ad
--- /dev/null
+++ b/synapse/app/frontend_proxy.py
@@ -0,0 +1,267 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.crypto import context_factory
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import (
+ RestServlet, parse_json_object_from_request,
+)
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
+
+from synapse import events
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+
+logger = logging.getLogger("synapse.app.frontend_proxy")
+
+
+class KeyUploadServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$",
+ releases=())
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(KeyUploadServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.http_client = hs.get_simple_http_client()
+ self.main_uri = hs.config.worker_main_http_uri
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, device_id):
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+ user_id = requester.user.to_string()
+ body = parse_json_object_from_request(request)
+
+ if device_id is not None:
+ # passing the device_id here is deprecated; however, we allow it
+ # for now for compatibility with older clients.
+ if (requester.device_id is not None and
+ device_id != requester.device_id):
+ logger.warning("Client uploading keys for a different device "
+ "(logged in as %s, uploading for %s)",
+ requester.device_id, device_id)
+ else:
+ device_id = requester.device_id
+
+ if device_id is None:
+ raise SynapseError(
+ 400,
+ "To upload keys, you must pass device_id when authenticating"
+ )
+
+ if body:
+ # They're actually trying to upload something, proxy to main synapse.
+ result = yield self.http_client.post_json_get_json(
+ self.main_uri + request.uri,
+ body,
+ )
+
+ defer.returnValue((200, result))
+ else:
+            # Empty body: the client just wants its one-time key counts.
+ result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
+ defer.returnValue((200, {"one_time_key_counts": result}))
+
+
+class FrontendProxySlavedStore(
+ SlavedDeviceStore,
+ SlavedClientIpStore,
+ BaseSlavedStore,
+):
+ pass
+
+
+class FrontendProxyServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_addresses = listener_config["bind_addresses"]
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+ elif name == "client":
+ resource = JsonResource(self, canonical_json=False)
+ KeyUploadServlet(self).register(resource)
+ resources.update({
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ "/_matrix/client/api/v1": resource,
+ })
+
+ root_resource = create_resource_tree(resources, Resource())
+
+ for address in bind_addresses:
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=address
+ )
+
+ logger.info("Synapse client reader now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ bind_addresses = listener["bind_addresses"]
+
+ for address in bind_addresses:
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=address
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ self.get_tcp_replication().start_replication(self)
+
+ def build_tcp_replication(self):
+ return ReplicationClientHandler(self.get_datastore())
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse frontend proxy", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.frontend_proxy"
+
+ assert config.worker_main_http_uri is not None
+
+ setup_logging(config, use_worker_options=True)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ss = FrontendProxyServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ss.setup()
+ ss.get_handlers()
+ ss.start_listening(config.worker_listeners)
+
+ def run():
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ss.get_state_handler().start_caching()
+ ss.get_datastore().start_profiling()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-frontend-proxy",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ea48d931a1..99d5d8aaeb 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -32,6 +32,8 @@ class WorkerConfig(Config):
self.worker_replication_port = config.get("worker_replication_port", None)
self.worker_name = config.get("worker_name", self.worker_app)
+ self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+
if self.worker_listeners:
for listener in self.worker_listeners:
bind_address = listener.pop("bind_address", None)
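Note on the new option: worker_main_http_uri tells the frontend proxy where the main synapse process is listening, so requests it cannot answer locally (key uploads with a body) can be forwarded there. Below is a minimal sketch of the keys this worker reads, written as a Python dict for brevity; the real worker config is a YAML file, and every value here is a placeholder rather than something taken from this change.

    # Sketch of a frontend proxy worker config, shown as a Python dict.
    # Key names come from this diff; all values are illustrative placeholders.
    frontend_proxy_config = {
        "worker_app": "synapse.app.frontend_proxy",
        # Where KeyUploadServlet proxies uploads (the main process client listener).
        "worker_main_http_uri": "http://127.0.0.1:8008",
        "worker_listeners": [
            {
                "type": "http",
                "port": 8009,
                "bind_addresses": ["127.0.0.1"],
                "resources": [{"names": ["client"]}],
            },
        ],
    }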
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 3d676e7d8b..a78f01e442 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -153,12 +153,10 @@ class Authenticator(object):
class BaseFederationServlet(object):
REQUIRE_AUTH = True
- def __init__(self, handler, authenticator, ratelimiter, server_name,
- room_list_handler):
+ def __init__(self, handler, authenticator, ratelimiter, server_name):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
- self.room_list_handler = room_list_handler
def _wrap(self, func):
authenticator = self.authenticator
@@ -590,7 +588,7 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
- data = yield self.room_list_handler.get_local_public_room_list(
+ data = yield self.handler.get_local_public_room_list(
limit, since_token,
network_tuple=network_tuple
)
@@ -611,7 +609,7 @@ class FederationVersionServlet(BaseFederationServlet):
}))
-SERVLET_CLASSES = (
+FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
FederationEventServlet,
@@ -634,17 +632,27 @@ SERVLET_CLASSES = (
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
OpenIdUserInfo,
- PublicRoomList,
FederationVersionServlet,
)
+ROOM_LIST_CLASSES = (
+ PublicRoomList,
+)
+
def register_servlets(hs, resource, authenticator, ratelimiter):
- for servletclass in SERVLET_CLASSES:
+ for servletclass in FEDERATION_SERVLET_CLASSES:
servletclass(
handler=hs.get_replication_layer(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
- room_list_handler=hs.get_room_list_handler(),
+ ).register(resource)
+
+ for servletclass in ROOM_LIST_CLASSES:
+ servletclass(
+ handler=hs.get_room_list_handler(),
+ authenticator=authenticator,
+ ratelimiter=ratelimiter,
+ server_name=hs.hostname,
).register(resource)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 9a96e6fe8f..803ac3e75b 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
from .push_rule_evaluator import PushRuleEvaluatorForEvent
-from synapse.visibility import filter_events_for_clients_context
from synapse.api.constants import EventTypes, Membership
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
@@ -92,15 +91,6 @@ class BulkPushRuleEvaluator(object):
rules_by_user = yield self._get_rules_for_event(event, context)
actions_by_user = {}
- # None of these users can be peeking since this list of users comes
- # from the set of users in the room, so we know for sure they're all
- # actually in the room.
- user_tuples = [(u, False) for u in rules_by_user]
-
- filtered_by_user = yield filter_events_for_clients_context(
- self.store, user_tuples, [event], {event.event_id: context}
- )
-
room_members = yield self.store.get_joined_users_from_context(
event, context
)
@@ -110,6 +100,14 @@ class BulkPushRuleEvaluator(object):
condition_cache = {}
for uid, rules in rules_by_user.iteritems():
+ if event.sender == uid:
+ continue
+
+ if not event.is_state():
+ is_ignored = yield self.store.is_ignored_by(event.sender, uid)
+ if is_ignored:
+ continue
+
display_name = None
profile_info = room_members.get(uid)
if profile_info:
@@ -121,13 +119,6 @@ class BulkPushRuleEvaluator(object):
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
- filtered = filtered_by_user[uid]
- if len(filtered) == 0:
- continue
-
- if filtered[0].sender == uid:
- continue
-
for rule in rules:
if 'enabled' in rule and not rule['enabled']:
continue
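The visibility filtering removed above is replaced by two cheaper per-user checks: the sender is never notified about their own event, and for non-state events any user who has the sender on their ignore list is skipped. A standalone sketch of that decision follows; is_ignored stands in for the new store.is_ignored_by call and this is not the actual evaluator code.

    def should_run_push_rules(uid, event, is_ignored):
        # Mirrors the two new early `continue` checks in the evaluator loop.
        if event.sender == uid:
            return False          # users are never notified about their own events
        if not event.is_state() and is_ignored(event.sender, uid):
            return False          # honour the recipient's ignore list for normal events
        return True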
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 4d88046579..172c27c137 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -200,7 +200,9 @@ def _glob_to_re(glob, word_boundary):
return re.compile(r, flags=re.IGNORECASE)
-def _flatten_dict(d, prefix=[], result={}):
+def _flatten_dict(d, prefix=[], result=None):
+ if result is None:
+ result = {}
for key, value in d.items():
if isinstance(value, basestring):
result[".".join(prefix + [key])] = value.lower()
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 65250285e8..352c9a2aa8 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -29,9 +29,8 @@ class SlavedClientIpStore(BaseSlavedStore):
max_entries=50000 * CACHE_SIZE_FACTOR,
)
- def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
+ def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
now = int(self._clock.time_msec())
- user_id = user.to_string()
key = (user_id, access_token, ip)
try:
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index a009214e43..171227cce2 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -323,14 +323,18 @@ class UserIpCommand(Command):
@classmethod
def from_line(cls, line):
- user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
+ user_id, jsn = line.split(" ", 1)
- return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
+ access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+
+ return cls(
+ user_id, access_token, ip, user_agent, device_id, last_seen
+ )
def to_line(self):
- return " ".join((
- self.user_id, self.access_token, self.ip, self.device_id,
- str(self.last_seen), self.user_agent,
+ return self.user_id + " " + json.dumps((
+ self.access_token, self.ip, self.user_agent, self.device_id,
+ self.last_seen,
))
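The old space-separated wire format could not round-trip fields that themselves contain spaces (user agents almost always do), so the command now keeps only the user ID as a plain prefix and JSON-encodes the rest. A minimal sketch of the new encoding, written independently of the Command classes, with the field order taken from to_line above and all values invented for illustration:

    import json

    def encode_user_ip(user_id, access_token, ip, user_agent, device_id, last_seen):
        # Only the first space matters when decoding, so spaces inside the
        # JSON payload (e.g. in the user agent) are harmless.
        return user_id + " " + json.dumps(
            (access_token, ip, user_agent, device_id, last_seen)
        )

    def decode_user_ip(line):
        user_id, jsn = line.split(" ", 1)
        access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
        return user_id, access_token, ip, user_agent, device_id, last_seen

    line = encode_user_ip(
        "@alice:example.org", "some_access_token", "10.0.0.1",
        "Mozilla/5.0 (X11; Linux x86_64)", "ADEVICEID", 1500000000000,
    )
    assert decode_user_ip(line)[3] == "Mozilla/5.0 (X11; Linux x86_64)"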
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index aa84ffc2b0..ff14e54c11 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore):
" WHERE stream_id < ?"
)
txn.execute(update_max_id_sql, (next_id, next_id))
+
+ @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
+ def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
+ ignored_account_data = yield self.get_global_account_data_by_type_for_user(
+ "m.ignored_user_list", ignorer_user_id,
+ on_invalidate=cache_context.invalidate,
+ )
+ if not ignored_account_data:
+ defer.returnValue(False)
+
+ defer.returnValue(
+ ignored_user_id in ignored_account_data.get("ignored_users", {})
+ )
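is_ignored_by fetches the would-be recipient's m.ignored_user_list global account data and checks whether the event sender appears under ignored_users. A hedged sketch of the account-data shape being consulted (structure per the Matrix client-server spec; the user IDs are placeholders):

    # Global account data for the recipient (illustrative values only).
    ignored_user_list = {
        "ignored_users": {
            "@spammer:example.org": {},   # keys are the ignored user IDs
        }
    }

    # The store method above reduces to this membership test.
    def is_ignored(sender_id, account_data):
        return sender_id in account_data.get("ignored_users", {})

    assert is_ignored("@spammer:example.org", ignored_user_list)
    assert not is_ignored("@friend:example.org", ignored_user_list)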
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index fc468ea185..3c95e90eca 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -56,9 +56,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
)
reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
- def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
- now = int(self._clock.time_msec())
- key = (user.to_string(), access_token, ip)
+ def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
+ now=None):
+ if not now:
+ now = int(self._clock.time_msec())
+ key = (user_id, access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
diff --git a/synapse/visibility.py b/synapse/visibility.py
index c4dd9ae2c7..5590b866ed 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -189,25 +189,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
@defer.inlineCallbacks
-def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context):
- user_ids = set(u[0] for u in user_tuples)
- event_id_to_state = {}
- for event_id, context in event_id_to_context.items():
- state = yield store.get_events([
- e_id
- for key, e_id in context.current_state_ids.iteritems()
- if key == (EventTypes.RoomHistoryVisibility, "")
- or (key[0] == EventTypes.Member and key[1] in user_ids)
- ])
- event_id_to_state[event_id] = state
-
- res = yield filter_events_for_clients(
- store, user_tuples, events, event_id_to_state
- )
- defer.returnValue(res)
-
-
-@defer.inlineCallbacks
def filter_events_for_client(store, user_id, events, is_peeking=False):
"""
Check which events a user is allowed to see
diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index 2eaaa8253c..778ff2f6e9 100644
--- a/tests/handlers/test_device.py
+++ b/tests/handlers/test_device.py
@@ -19,7 +19,6 @@ import synapse.api.errors
import synapse.handlers.device
import synapse.storage
-from synapse import types
from tests import unittest, utils
user1 = "@boris:aaa"
@@ -179,6 +178,6 @@ class DeviceTestCase(unittest.TestCase):
if ip is not None:
yield self.store.insert_client_ip(
- types.UserID.from_string(user_id),
+ user_id,
access_token, ip, "user_agent", device_id)
self.clock.advance_time(1000)
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index 03df697575..bd6fda6cb1 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -15,9 +15,6 @@
from twisted.internet import defer
-import synapse.server
-import synapse.storage
-import synapse.types
import tests.unittest
import tests.utils
@@ -39,7 +36,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
self.clock.now = 12345678
user_id = "@user:id"
yield self.store.insert_client_ip(
- synapse.types.UserID.from_string(user_id),
+ user_id,
"access_token", "ip", "user_agent", "device_id",
)
|