A Twisted protocol handles data in an asynchronous manner. The protocol responds to events as they arrive from the network and the events arrive as calls to methods on the protocol.
from twisted.internet.protocol import Protocol
class Echo(Protocol):
def dataReceived(self, data):
self.transport.write(data)
This is one of the simplest protocols. It simply writes back whatever is written to it, and does not respond to all events. Here is an example of a Protocol responding to another event:
from twisted.internet.protocol import Protocol
class QOTD(Protocol):
def connectionMade(self):
self.transport.write("An apple a day keeps the doctor away\r\n")
self.transport.loseConnection()
This protocol responds to the initial connection with a well known quote, and then terminates the connection. The connectionMade event is usually where setup of the connection object happens, as well as any initial greetings (as in the QOTD protocol above, which is actually based on RFC 865). The connectionLost event is where tearing down of any connection-specific objects is done. Here is an example: Here connectionMade and connectionLost cooperate to keep a count of the active protocols in a shared object, the factory. The factory must be passed to Echo.init when creating a new instance. The factory is used to share state that exists beyond the lifetime of any given connection. You will see why this object is called a “factory” in the next section.
from twisted.internet.protocol import Protocol
class Echo(Protocol):
def __init__(self, factory):
self.factory = factory
def connectionMade(self):
self.factory.numProtocols = self.factory.numProtocols + 1
self.transport.write(
"Welcome! There are currently %d open connections.\n" %
(self.factory.numProtocols,))
def connectionLost(self, reason):
self.factory.numProtocols = self.factory.numProtocols - 1
def dataReceived(self, data):
self.transport.write(data)
Using the protocol: Here is code that will run the QOTD server discussed earlier: In this example, I create a protocol Factory. I want to tell this factory that its job is to build QOTD protocol instances, so I set its buildProtocol method to return instances of the QOTD class. Then, I want to listen on a TCP port, so I make a TCP4ServerEndpoint to identify the port that I want to bind to, and then pass the factory I just created to its listen method. endpoint.listen() tells the reactor to handle connections to the endpoint’s address using a particular protocol, but the reactor needs to be running in order for it to do anything. reactor.run() starts the reactor and then waits forever for connections to arrive on the port you’ve specified. You can stop the reactor by hitting Control-C in a terminal or calling reactor.stop().
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor
class QOTDFactory(Factory):
def buildProtocol(self, addr):
return QOTD()
# 8007 is the port you want to run under. Choose something >1024
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory())
reactor.run()
Simpler: factory creation
For a factory which simply instantiates instances of a specific protocol class, there is a simpler way to implement the factory.
The default implementation of the buildProtocol method calls the protocol attribute of the factory to create a Protocol instance,
and then sets an attribute on it called factory which points to the factory itself. This lets every Protocol access, and possibly modify, the persistent configuration. Here is an example that uses these features instead of overriding buildProtocol:
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor
class QOTD(Protocol):
def connectionMade(self):
# self.factory was set by the factory's default buildProtocol:
self.transport.write(self.factory.quote + '\r\n')
self.transport.loseConnection()
class QOTDFactory(Factory):
# This will be used by the default buildProtocol to create new protocols:
protocol = QOTD
def __init__(self, quote=None):
self.quote = quote or 'An apple a day keeps the doctor away'
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory("configurable quote"))
A Factory has two methods to perform application-specific building up and tearing down (since a Factory is frequently persisted, it is often not appropriate to do them in init or del, and would frequently be too early or too late). Here is an example of a factory which allows its Protocols to write to a special log-file:
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
class LoggingProtocol(LineReceiver):
def lineReceived(self, line):
self.factory.fp.write(line + '\n')
class LogfileFactory(Factory):
protocol = LoggingProtocol
def __init__(self, fileName):
self.file = fileName
def startFactory(self):
self.fp = open(self.file, 'a')
def stopFactory(self):
self.fp.close()
Chat server using thing slearned above:
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
from clint.textui import colored
import sys
import datetime
port = 8000
chatLog = []
class ChatProtocol(LineReceiver):
def __init__(self, factory):
self.factory = factory
self.name = None
self.state = "REGISTER"
def getTime(self):
return '({:%Y/%m/%d %H:%M:%S})'.format(datetime.datetime.now())
def connectionMade(self):
banner = ("""
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#### Successfully Connected to the Chat Server ####
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
""")
self.sendLine(banner)
self.sendLine(self.getTime())
self.sendLine("Choose a username:")
def connectionLost(self, reason):
leftMsg = colored.red('%s has left the channel.' % (self.name,))
if self.name in self.factory.users:
del self.factory.users[self.name]
self.broadcastMessage(leftMsg)
chatLog.append(self.name + " exits.")
self.updateSessionInfo()
def lineReceived(self, line):
if self.state == "REGISTER":
self.handle_REGISTER(line)
else:
self.handle_CHAT(line)
def handle_REGISTER(self, name):
if name in self.factory.users:
self.sendLine("Sorry, %r is taken. Try something else." % name)
return
welcomeMsg = ('Welcome to the chat, %s.' % (name,))
joinedMsg = colored.green('%s has joined the chanel.' % (name,))
self.sendLine(welcomeMsg)
self.broadcastMessage(joinedMsg)
self.name = name
self.factory.users[name] = self
self.state = "CHAT"
self.updateSessionInfo()
if len(self.factory.users) > 1:
self.sendLine('Participants in chat: %s ' % (", ".join(self.factory.users)))
else:
self.sendLine("You're the only one here, %r" % name)
def handle_CHAT(self, message):
message = self.getTime() + "<%s> %s" % (self.name, message)
self.broadcastMessage(colored.magenta(message))
chatLog.append(message)
self.updateSessionInfo()
def broadcastMessage(self, message):
for name, protocol in self.factory.users.iteritems():
if protocol != self:
protocol.sendLine(colored.white(message))
self.updateSessionInfo()
def updateSessionInfo(self):
print(chr(27) + "[2J")
molding = "============================================"
print(molding)
print('Users in chat: %s ' % (", ".join(self.factory.users)))
print(molding)
global chatLog
chatLog = chatLog[-20:]
print("\n".join(chatLog))
class ChatFactory(Factory):
def __init__(self):
self.users = {}
def buildProtocol(self, addr):
return ChatProtocol(self)
reactor.listenTCP(port, ChatFactory())
print("Chat Server started on port %s" % (port,))
reactor.run()
UDP Networking: Unlike TCP, UDP has no notion of connections. A UDP socket can receive datagrams from any server on the network and send datagrams to any host on the network. In addition, datagrams may arrive in any order, never arrive at all, or be duplicated in transit. Since there are no connections, we only use a single object, a protocol, for each UDP socket. We then use the reactor to connect this protocol to a UDP transport, using the twisted.internet.interfaces.IReactorUDP reactor API.
The class where you actually implement the protocol parsing and handling will usually be descended from twisted.internet.protocol.DatagramProtocol or from one of its convenience children. The DatagramProtocol class receives datagrams and can send them out over the network. Received datagrams include the address they were sent from. When sending datagrams the destination address must be specified.
from __future__ import print_function
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
class Echo(DatagramProtocol):
def datagramReceived(self, data, addr):
print("received %r from %s" % (data, addr))
self.transport.write(data, addr)
reactor.listenUDP(9999, Echo())
reactor.run()
Twisted in built client :Agent
A simple GET request
from __future__ import print_function
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
agent = Agent(reactor)
d = agent.request(
'GET',
'http://example.com/',
Headers({'User-Agent': ['Twisted Web Client Example']}),
None)
def cbResponse(ignored):
print('Response received')
d.addCallback(cbResponse)
def cbShutdown(ignored):
reactor.stop()
d.addBoth(cbShutdown)
reactor.run()
Simple POST Request using Producers:
from zope.interface import implementer
from twisted.internet.defer import succeed
from twisted.web.iweb import IBodyProducer
from __future__ import print_function
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from stringprod import StringProducer
@implementer(IBodyProducer)
class StringProducer(object):
def __init__(self, body):
self.body = body
self.length = len(body)
def startProducing(self, consumer):
consumer.write(self.body)
return succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass
agent = Agent(reactor)
body = StringProducer("hello, world")
d = agent.request(
'GET',
'http://example.com/',
Headers({'User-Agent': ['Twisted Web Client Example'],
'Content-Type': ['text/x-greeting']}),
body)
def cbResponse(ignored):
print('Response received')
d.addCallback(cbResponse)
def cbShutdown(ignored):
reactor.stop()
d.addBoth(cbShutdown)
reactor.run()
Handling Response: ;)
from __future__ import print_function
from pprint import pformat
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
class BeginningPrinter(Protocol):
def __init__(self, finished):
self.finished = finished
self.remaining = 1024 * 10
def dataReceived(self, bytes):
if self.remaining:
display = bytes[:self.remaining]
print('Some data received:')
print(display)
self.remaining -= len(display)
def connectionLost(self, reason):
print('Finished receiving body:', reason.getErrorMessage())
self.finished.callback(None)
agent = Agent(reactor)
d = agent.request(
'GET',
'http://jsonplaceholder.typicode.com/posts',
Headers({'User-Agent': ['Twisted Web Client Example']}),
None)
def cbRequest(response):
print('Response version:', response.version)
print('Response code:', response.code)
print('Response phrase:', response.phrase)
print('Response headers:')
print(pformat(list(response.headers.getAllRawHeaders())))
finished = Deferred()
response.deliverBody(BeginningPrinter(finished))
return finished
d.addCallback(cbRequest)
def cbShutdown(ignored):
reactor.stop()
d.addBoth(cbShutdown)
reactor.run()
from twisted.web.server import Site
from twisted.web.static import File
from twisted.internet import reactor
resource = File('/tmp')
factory = Site(resource)
reactor.listenTCP(8888, factory)
reactor.run()
Generating page dynaically
from twisted.internet import reactor
from twisted.web.server import Site
from twisted.web.resource import Resource
import time
class ClockPage(Resource):
isLeaf = True
def render_GET(self, request):
return "%s" % (time.ctime(),)
resource = ClockPage()
factory = Site(resource)
reactor.listenTCP(8880, factory)
reactor.run()
Static url dispatch
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor
from twisted.web.static import File
root = Resource()
root.putChild("foo", File("/tmp"))
root.putChild("bar", File("/lost+found"))
root.putChild("baz", File("/opt"))
factory = Site(root)
reactor.listenTCP(8880, factory)
reactor.run()
Dynamic url dispatch
<pre>
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor
from calendar import calendar
class YearPage(Resource): def init(self, year): Resource.init(self) self.year = year
def render_GET(self, request):
return "<html><body><pre>%s</pre></body></html>" % (calendar(self.year),)
class Calendar(Resource): def getChild(self, name, request): return YearPage(int(name))
root = Calendar() factory = Site(root) reactor.listenTCP(8880, factory) reactor.run() </code></pre>
Handling errors
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor
from twisted.web.resource import NoResource
from calendar import calendar
class YearPage(Resource):
def __init__(self, year):
Resource.__init__(self)
self.year = year
def render_GET(self, request):
return "%s
" % (calendar(self.year),)
class Calendar(Resource):
def getChild(self, name, request):
try:
year = int(name)
except ValueError:
return NoResource()
else:
return YearPage(year)
root = Calendar()
factory = Site(root)
reactor.listenTCP(8880, factory)
reactor.run()
Aynchronous response: Next we need to define the render method. Here’s where things change a bit. Instead of using callLater , We’re going to use deferLater this time. deferLater accepts a reactor, delay (in seconds, as with callLater ), and a function to call after the delay to produce that elusive object discussed in the description of Deferred s. We’re also going to use _delayedRender as the callback to add to the Deferred returned by deferLater . Since it expects the request object as an argument, we’re going to set up the deferLater call to return a Deferred which has the request object as its result.
from twisted.internet.task import deferLater
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from twisted.internet import reactor
class DelayedResource(Resource):
def _delayedRender(self, request):
request.write("Sorry to keep you waiting.")
request.finish()
def render_GET(self, request):
d = deferLater(reactor, 5, lambda: request)
d.addCallback(self._delayedRender)
return NOT_DONE_YET
resource = DelayedResource()
Interrupting Connection: Notice that since _responseFailed needs a reference to the delayed call object in order to cancel it, we passed that object to addErrback . Any additional arguments passed to addErrback (or addCallback ) will be passed along to the errback after the Failure instance which is always passed as the first argument. Passing call here means it will be passed to _responseFailed , where it is expected and required.
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from twisted.internet import reactor
class DelayedResource(Resource):
def _delayedRender(self, request):
request.write("Sorry to keep you waiting.")
request.finish()
def _responseFailed(self, err, call):
call.cancel()
def render_GET(self, request):
call = reactor.callLater(5, self._delayedRender, request)
request.notifyFinish().addErrback(self._responseFailed, call)
return NOT_DONE_YET
encoding response : Using compression on SSL served resources where the user can influence the content can lead to information leak, so be careful which resources use request encoders.
Note that only encoder can be used per request: the first encoder factory returning an object will be used, so the order in which they are specified matters.
from twisted.web.server import Site, GzipEncoderFactory
from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.internet import reactor, endpoints
class Simple(Resource):
isLeaf = True
def render_GET(self, request):
return "Hello, world!"
resource = Simple()
wrapped = EncodingResourceWrapper(resource, [GzipEncoderFactory()])
site = Site(wrapped)
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
endpoint.listen(site)
reactor.run()
</html>