summary refs log tree commit diff
path: root/synapse/appservice/__init__.py
blob: b5e7ac16bacd09d13aa98504794a01a83725f23f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import EventTypes

import logging
import re

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class ApplicationService(object):
    """Defines an application service. This definition is mostly what is
    provided to the /register AS API.

    Provides methods to check if this service is "interested" in events.
    """
    NS_USERS = "users"
    NS_ALIASES = "aliases"
    NS_ROOMS = "rooms"
    # The ordering here is important as it is used to map database values (which
    # are stored as ints representing the position in this list) to namespace
    # values.
    NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]

    # ``basestring`` only exists on Python 2. Fall back to ``str`` so that the
    # string checks below keep working when run under Python 3.
    try:
        _STRING_TYPES = (basestring,)  # noqa: F821
    except NameError:
        _STRING_TYPES = (str,)

    def __init__(self, token, url=None, namespaces=None, hs_token=None,
                 sender=None, txn_id=None):
        """Store the service definition, validating the namespaces.

        Args:
            token: Stored unchanged as ``self.token``.
            url: Stored unchanged as ``self.url``.
            namespaces(dict): Namespace definition; validated (and completed)
                by ``_check_namespaces`` before being stored.
            hs_token: Stored unchanged as ``self.hs_token``.
            sender: Stored unchanged as ``self.sender``.
            txn_id: Stored unchanged as ``self.txn_id``.
        Raises:
            ValueError: If ``namespaces`` is malformed.
        """
        self.token = token
        self.url = url
        self.hs_token = hs_token
        self.sender = sender
        self.namespaces = self._check_namespaces(namespaces)
        self.txn_id = txn_id

    def _check_namespaces(self, namespaces):
        """Sanity check a namespace definition.

        The expected form is::

            {
              users: [ {regex: "@prefix_.*", exclusive: true}, ...],
              aliases: [ {regex: "#prefix_.*", exclusive: true}, ...],
              rooms: [ {regex: "!prefix_.*", exclusive: true}, ...],
            }

        Any namespace from ``NS_LIST`` that is missing is added to the passed
        dict (in place) with an empty list.

        Args:
            namespaces(dict): The definition to check.
        Returns:
            dict: The same ``namespaces`` object, or None if it was empty or
            not given.
        Raises:
            ValueError: If a namespace is not a list, an entry is not a dict,
            'exclusive' is not a bool, or 'regex' is not a string.
        """
        if not namespaces:
            return None

        for ns in ApplicationService.NS_LIST:
            if ns not in namespaces:
                namespaces[ns] = []
                continue

            if not isinstance(namespaces[ns], list):
                raise ValueError("Bad namespace value for '%s'" % ns)
            for regex_obj in namespaces[ns]:
                if not isinstance(regex_obj, dict):
                    raise ValueError("Expected dict regex for ns '%s'" % ns)
                if not isinstance(regex_obj.get("exclusive"), bool):
                    raise ValueError(
                        "Expected bool for 'exclusive' in ns '%s'" % ns
                    )
                if not isinstance(regex_obj.get("regex"),
                                  ApplicationService._STRING_TYPES):
                    raise ValueError(
                        "Expected string for 'regex' in ns '%s'" % ns
                    )
        return namespaces

    def _matches_regex(self, test_string, namespace_key):
        """Check a string against every regex in one namespace.

        Args:
            test_string(str): The value to test (user ID, alias or room ID).
            namespace_key(str): One of the values in ``NS_LIST``.
        Returns:
            bool: True if any regex of the namespace matches at the start of
            ``test_string``. False if nothing matches, if ``test_string`` is
            not a string, or if this service has no namespaces.
        """
        if not isinstance(test_string, ApplicationService._STRING_TYPES):
            logger.error(
                "Expected a string to test regex against, but got %s",
                test_string
            )
            return False

        # _check_namespaces() stores None when no namespaces were supplied;
        # such a service cannot be interested in anything.
        if not self.namespaces:
            return False

        # Each entry is a dict of the form {"regex": ..., "exclusive": ...},
        # so the pattern has to be pulled out of it before matching.
        return any(
            re.match(regex_obj["regex"], test_string)
            for regex_obj in self.namespaces[namespace_key]
        )

    def _matches_user(self, event, member_list):
        """Check whether the event involves a user in the users namespace.

        The sender, the state key of m.room.member events and the state keys
        of ``member_list`` are tested, in that order.

        Args:
            event: The event to check.
            member_list(list): Member events; each must have a ``state_key``.
        Returns:
            bool: True if any of the tested user IDs matches.
        """
        if (hasattr(event, "sender") and
                self.is_interested_in_user(event.sender)):
            return True
        # also check m.room.member state key
        if (hasattr(event, "type") and event.type == EventTypes.Member
                and hasattr(event, "state_key")
                and self.is_interested_in_user(event.state_key)):
            return True
        # check joined member events
        return any(
            self.is_interested_in_user(member.state_key)
            for member in member_list
        )

    def _matches_room_id(self, event):
        """Return True if the event has a room_id in the rooms namespace."""
        if hasattr(event, "room_id"):
            return self.is_interested_in_room(event.room_id)
        return False

    def _matches_aliases(self, event, alias_list):
        """Return True if any alias in alias_list is in the aliases namespace.

        ``event`` is accepted for symmetry with the other matchers but is not
        inspected.
        """
        return any(self.is_interested_in_alias(alias) for alias in alias_list)

    def is_interested(self, event, restrict_to=None, aliases_for_event=None,
                      member_list=None):
        """Check if this service is interested in this event.

        Args:
            event(Event): The event to check.
            restrict_to(str): The namespace to restrict regex tests to.
            aliases_for_event(list): A list of all the known room aliases for
            this event.
            member_list(list): A list of all joined room members in this room.
        Returns:
            bool: True if this service would like to know about this event.
        Raises:
            Exception: If restrict_to is set but is not one of NS_LIST.
        """
        if aliases_for_event is None:
            aliases_for_event = []
        if member_list is None:
            member_list = []

        if restrict_to and restrict_to not in ApplicationService.NS_LIST:
            # this is a programming error, so fail early and raise a general
            # exception
            raise Exception("Unexpected restrict_to value: %s" % restrict_to)

        if not restrict_to:
            return (self._matches_user(event, member_list)
                    or self._matches_aliases(event, aliases_for_event)
                    or self._matches_room_id(event))
        elif restrict_to == ApplicationService.NS_ALIASES:
            return self._matches_aliases(event, aliases_for_event)
        elif restrict_to == ApplicationService.NS_ROOMS:
            return self._matches_room_id(event)
        elif restrict_to == ApplicationService.NS_USERS:
            return self._matches_user(event, member_list)

    def is_interested_in_user(self, user_id):
        """Return True if user_id matches a regex in the users namespace."""
        return self._matches_regex(user_id, ApplicationService.NS_USERS)

    def is_interested_in_alias(self, alias):
        """Return True if alias matches a regex in the aliases namespace."""
        return self._matches_regex(alias, ApplicationService.NS_ALIASES)

    def is_interested_in_room(self, room_id):
        """Return True if room_id matches a regex in the rooms namespace."""
        return self._matches_regex(room_id, ApplicationService.NS_ROOMS)

    def __str__(self):
        # NOTE(review): this dumps every attribute, including token and
        # hs_token - confirm that is acceptable wherever this gets logged.
        return "ApplicationService: %s" % (self.__dict__,)