[Twisted-Python] Learning about IPushProducer

Rutt, Benjamin Benjamin.Rutt at gs.com
Mon Mar 12 15:44:41 EDT 2007


Hi.
 
Anyone have any pointers as to how I can get some of my questions
answered below?  I had hoped to get some response.  Did I not use the
proper etiquitte?  Or there is some expert on the IPushProducer
mechanism or the author of page 
 
http://twistedmatrix.com/projects/core/documentation/howto/producers.htm
l
 
that I can be referred to that perhaps isn't reading this list?

Thanks,
Benjamin Rutt

________________________________

	From: twisted-python-bounces at twistedmatrix.com
[mailto:twisted-python-bounces at twistedmatrix.com] On Behalf Of Rutt,
Benjamin
	Sent: Tuesday, March 06, 2007 12:04 PM
	To: twisted-python at twistedmatrix.com
	Subject: [Twisted-Python] Learning about IPushProducer
	
	

	When running the following code (my 2nd twisted program!), it
works as I had hoped - it doesn't starve any clients that want to
receive data back, even with a simultaneously active really long
streaming server-to-client communication (i.e. one piggy client asking
for millions of bytes).  i.e. another client can get in and ask for just
a few bytes while a large payload is being delivered to a different
client.  Which is great!

	Here's a sample interaction from the client side: 

	$ telnet localhost 8007 
	Trying 127.0.0.1... 
	Connected to localhost. 
	Escape character is '^]'. 
	1 
	x 
	2 
	xx 
	3 
	xxx 
	10 
	xxxxxxxxxx 
	99999 
	
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

	[...lots of x's...] 
	xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 
	bye 
	Connection closed by foreign host. 
	$ 

	So I have 2 questions on my code: 

	1) am I doing anything wrong in setting up the plumbing? 
	2) does pauseProducing() get called by another thread whilst
resumeProducing() is running?  (I believe it must, otherwise my
resumeProducing() would only be entered once).  If so I should have an
appropriate mutex around the read/write of self.pause, no?

	Here is the code, and output from the server is at the end.
Thanks -- Benjamin 

	#!/usr/bin/env python 
	import os, os.path, sys, re, commands, pickle, tempfile, getopt,
datetime 
	import socket, string, random, time, traceback, shutil, popen2 

	from zope.interface import implements 
	from twisted.internet import protocol, defer, interfaces, error,
reactor 
	from twisted.internet.protocol import Protocol, Factory 
	from twisted.protocols.basic import LineReceiver 

	class NonStarvingXGiver: 
	    implements(interfaces.IPushProducer) 
	    def __init__(self, howmany, consumer): 
	        self.howmany = howmany 
	        self.sent_already = 0 
	        self.paused = False 
	        self.consumer = consumer 
	    def beginSendingXs(self): 
	        self.deferred = deferred = defer.Deferred() 
	        self.consumer.registerProducer(self, False) 
	        return deferred 
	    def pauseProducing(self): 
	        print 'pauseProducing: invoked' 
	        self.paused = True 
	    def resumeProducing(self): 
	        print 'resumeProducing: invoked' 
	        self.paused = False 
	        maxchunksz = 1024 
	        while not self.paused and self.howmany >
self.sent_already: 
	            chunksz = min(maxchunksz, self.howmany -
self.sent_already) 
	            self.consumer.write('x' * chunksz) 
	            self.sent_already += chunksz 
	        if self.howmany == self.sent_already: 
	            self.consumer.write('\n') 
	            self.consumer.unregisterProducer() 
	            print 'resumeProducing: exiting for the last time' 
	    def stopProducing(self): 
	        print 'stopProducing: invoked' 
	        self.consumer.unregisterProducer() 
	        
	class xgiver(LineReceiver): 
	    def lineReceived(self, howmany): 
	        print 'got line [%s] from client [%s]' % (howmany, 
	
self.transport.getPeer()) 
	        if howmany == 'bye': 
	            print 'goodbye to', self.transport.getPeer() 
	            self.transport.loseConnection() 
	            return 
	        try: 
	            howmany = int(howmany) 
	            s = NonStarvingXGiver(howmany, self.transport) 
	            s.beginSendingXs() 
	        except Exception, ex: 
	            self.transport.write("invalid input " + howmany +
"\n") 

	# Next lines are magic: 
	factory = Factory() 
	factory.protocol = xgiver 

	# 8007 is the port you want to run under. Choose something >1024

	reactor.listenTCP(8007, factory) 
	reactor.run() 


	
------------------------------------------------------------------- 
	Server output: 

	$ ./xgiver.py 
	got line [1] from client [IPv4Address(TCP, '127.0.0.1', 51007)] 
	resumeProducing: invoked 
	resumeProducing: exiting for the last time 
	got line [2] from client [IPv4Address(TCP, '127.0.0.1', 51007)] 
	resumeProducing: invoked 
	resumeProducing: exiting for the last time 
	got line [3] from client [IPv4Address(TCP, '127.0.0.1', 51007)] 
	resumeProducing: invoked 
	resumeProducing: exiting for the last time 
	got line [10] from client [IPv4Address(TCP, '127.0.0.1', 51007)]

	resumeProducing: invoked 
	resumeProducing: exiting for the last time 
	got line [99999] from client [IPv4Address(TCP, '127.0.0.1',
51007)] 
	resumeProducing: invoked 
	pauseProducing: invoked 
	resumeProducing: invoked 
	resumeProducing: exiting for the last time 
	got line [bye] from client [IPv4Address(TCP, '127.0.0.1',
51007)] 
	goodbye to IPv4Address(TCP, '127.0.0.1', 51007) 

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://twistedmatrix.com/pipermail/twisted-python/attachments/20070312/81569638/attachment.htm 


More information about the Twisted-Python mailing list