-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.py
97 lines (74 loc) · 2.82 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# -*- coding: utf-8 -*-
from twisted.internet.protocol import ServerFactory
from twisted.internet import reactor, threads
from twisted.protocols import basic, policies
from twisted.internet.defer import Deferred
from twisted.internet.error import ConnectionLost
import time
import signal
import twisted.python.log
observer = twisted.python.log.PythonLoggingObserver('ufc')
observer.start()
import logging
log = logging.getLogger('ufc')
class UFCProtocol(basic.LineReceiver, policies.TimeoutMixin):
"""
This is an implementation of Postfix policy protocol
"""
timeout = 600
delimiter = '\n'
def __init__(self):
self._request = []
def connectionMade(self):
self.setTimeout(self.timeout)
log.debug('Connect from %s' % self.transport.getPeer())
def connectionLost(self, reason):
if reason.type != ConnectionLost:
log.debug('Disconnect from %s' % self.transport.getPeer())
else:
log.error('Disconnect from %s: %s' % (self.transport.getPeer(), reason))
def lineReceived(self, line):
self.resetTimeout()
if line:
self._request.append(line)
else:
self._process_request()
self._request = []
def _process_request(self):
log.debug('Processing request: %s' % self._request)
# Run check in a thread so database access don't block our twisted reactor
d = threads.deferToThread(self.factory.check, self._request)
d.addCallback(self._callback)
d.addErrback(self._errback)
def _callback(self, action):
self._send_action(action)
def _errback(self, reason):
log.error("Processing request error: %s" % reason)
self._send_action('DUNNO')
def _send_action(self, action):
answer = "action=%s" % action
log.debug("send_action: %s" % answer)
self.sendLine(answer)
self.sendLine('')
class UFCFactory(ServerFactory):
# Crea instancias de UFCProtocol por cada conexión que se cree
protocol = UFCProtocol
def __init__(self, ufc):
self.ufc = ufc
signal.signal(signal.SIGHUP, self._sighup_handler)
signal.signal(signal.SIGUSR1, self._sigusr1_handler)
def _sighup_handler(self, signum, frame):
self.ufc.configure()
# Si hemos cambiado la configuración de base de datos debemos abrir
# de nuevo todas las conexiones.
log.info("Restarting threadpool...")
reactor.getThreadPool().stop()
reactor.getThreadPool().start()
def _sigusr1_handler(self, signum, frame):
reactor.getThreadPool().dumpStats()
def check(self, lines):
return self.ufc.check(lines)
def start(ufc):
port = reactor.listenTCP(9000, UFCFactory(ufc), interface='127.0.0.1')
log.info('Listening on %s' % port.getHost())
reactor.run()