Never been to DZone Snippets before?

Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world.

« Newer Snippets
Older Snippets »
Showing 1-10 of 57 total  RSS 

Python: NMEA GPGGA parser

Not tested extensively, use with caution! I used an LD-1W and it seems to work all right with this parser.

class GPGGAParser(object):
    """Parse a single NMEA ``$GPGGA`` (GPS fix data) sentence.

    The fifteen comma-separated fields are stored as string attributes
    (``format``, ``utc``, ``northsouth``, ``eastwest``, ``quality``, ...).
    After construction:

    - ``latitude`` / ``longitude`` are floats in signed decimal degrees
      (south and west are negative),
    - ``altitude`` is a float,
    - ``timeOfFix`` is the UTC time formatted as ``HH:MM:SS``,
    - ``checksum`` is the text after ``*`` or None when the sentence has none.

    Raises ValueError when the sentence does not have exactly fifteen fields
    or when latitude, longitude, altitude or time cannot be converted (for
    example a sentence without a position fix).
    """

    def __init__(self, sentance):
        import time, logging

        logging.debug("GPPGAParser started")
        logging.debug("Trying to parse: "+sentance)

        # Remove the line terminator and split off the optional "*hh" checksum
        # so that neither ends up glued to the last data field.
        payload = sentance.strip()
        self.checksum = None
        if "*" in payload:
            payload, self.checksum = payload.split("*", 1)

        (self.format,
         self.utc,
         self.latitude,
         self.northsouth,
         self.longitude,
         self.eastwest,
         self.quality,
         self.number_of_satellites_in_use,
         self.horizontal_dilution,
         self.altitude,
         self.above_sea_unit,
         self.geoidal_separation,
         self.geoidal_separation_unit,
         self.data_age,
         self.diff_ref_stationID) = payload.split(",")

        self.latitude = self._decimal_degrees(self.latitude, self.northsouth == 'S')
        self.longitude = self._decimal_degrees(self.longitude, self.eastwest == 'W')

        # The UTC field may carry fractional seconds ("hhmmss.ss"); only the
        # whole seconds are kept.
        self.timeOfFix = time.strftime("%H:%M:%S", time.strptime(self.utc.split(".")[0],"%H%M%S"))
        self.altitude = float(self.altitude)
        logging.debug("GPPGAParser finished")

    def _decimal_degrees(self, raw, negative):
        """Convert an NMEA ``(d)ddmm.mmmm`` string to decimal degrees.

        *negative* selects the southern/western hemisphere sign.
        """
        value = float(raw)
        degrees = int(value / 100)
        minutes = value - degrees * 100
        result = degrees + (minutes / 60)
        if negative:
            result = -result
        return result

PyS60: Bluetooth GPS polling class

Threaded approach for reading NMEA data from a bluetooth GPS.

import thread, socket

class BTGPSPoller(object):
    """Read ``$GPGGA`` sentences from a Bluetooth GPS on a background thread.

    Only sentences whose latitude field is non-empty are kept, and at most the
    ten most recent ones are retained. Use connect() to start polling,
    disconnect() to stop it and getSentances() to fetch a snapshot.
    """

    def __init__(self, address):
        """Discover the device at *address* and remember its first service."""
        address, services = socket.bt_discover(address)
        self.target = (address, services.values()[0])
        self.active = True
        self.connected = False
        self.lock = thread.allocate_lock()
        self.sentances = list()

    def connect(self):
        """Start the polling thread unless a connection is already up."""
        if not self.connected:
            thread.start_new_thread(self.run, ())

    def run(self):
        """Thread body: connect, collect sentences until stopped, then close."""
        print("BTGPSPoller thread activated")
        conn = None
        try:
            conn = socket.socket(socket.AF_BT, socket.SOCK_STREAM)
            conn.connect(self.target)
            self.connected = True
        except Exception:
            print("Unable to connect")

        if not self.connected:
            # Do not leak a socket that was created but never connected.
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass
            return

        to_gps = None
        try:
            to_gps = conn.makefile("r", 0)
        except Exception:
            print("Failure calling conn.makefile()")
            self.active = False

        while self.active and to_gps is not None:
            try:
                msg = to_gps.readline()
            except Exception:
                self.active = False
                break
            if not msg:
                # An empty read means the peer closed the connection; without
                # this check the loop would spin forever on "".
                self.active = False
                break
            if msg.startswith("$GPGGA"):
                gps_data = msg.split(",")
                # Field 2 is the latitude; it is empty while there is no fix.
                # Truncated sentences are skipped instead of ending the poll.
                if len(gps_data) > 2 and gps_data[2] != "":
                    self.lock.acquire()
                    try:
                        self.sentances.append(msg)
                        if len(self.sentances) > 10:
                            self.sentances.pop(0)
                    finally:
                        self.lock.release()

        # Close the file wrapper and the socket independently so that a
        # failure on one does not leave the other open.
        closed_ok = True
        for resource in (to_gps, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                closed_ok = False
        self.connected = False
        if closed_ok:
            print("Closed and disconnected")
        else:
            print("Unable to close")

    def disconnect(self):
        """Ask the polling thread to stop after its current read."""
        self.active = False
        print("Disconnecting from GPS")

    def getSentances(self):
        """Return a copy of the buffered sentences, oldest first."""
        self.lock.acquire()
        try:
            return self.sentances[:]
        finally:
            self.lock.release()

Python: Transmitter. A convenience class to simplify HTTP POST handling.

I make no claims for quality of code :)

import httplib
# The logging module may be missing on the target runtime; remember whether
# the import succeeded so log() can fall back to print.
try:
	import logging
	LOGGINGENABLED = True
except ImportError:
	LOGGINGENABLED = False

# Master switch checked first by log(); set to False to silence all output.
DOLOGGING = True

def log(s):
	if DOLOGGING:
		if LOGGINGENABLED:
			logging.info(s)
		else:
			print unicode(s)

class BaseTransmitter(object):
    """Base class for all transmitters.

    By itself, it does nothing. Subclass this and use the inherited addItem()
    to inject parameters, then call transmit() when done.

    See the classes BasicParam, FileParam, Base64Param and BinaryParam, which
    are used with addItem().
    """

    def __init__(self):
        super(BaseTransmitter, self).__init__()
        self.items = []
        self.boundary = '------------ThIs_Is_tHe_bouNdaRY_$' # Kudos!

    def addItem(self, item):
        """Add a parameter to be sent in the request.

        The added item _must_ define an attribute 'part' holding the encoded
        body part, either as one string or as a list of strings, ending with
        a CRLF (\\r\\n). Do not include the boundary marker; it is added when
        the request body is assembled.
        """
        self.items.append(item)

    def _build_body(self):
        """Return the multipart/form-data body for all added items."""
        CRLF = "\r\n"
        delimiter = "--" + self.boundary
        chunks = []
        for item in self.items:
            chunks.append(delimiter + CRLF)
            part = item.part
            # A part may be a ready-made string or a list of string pieces.
            if isinstance(part, (list, tuple)):
                chunks.extend(part)
            else:
                chunks.append(part)
        # The last delimiter carries a trailing "--" to mark the end of the
        # multipart body.
        chunks.append(delimiter + "--" + CRLF)
        return ''.join(chunks)

    def transmit(self, host, selector, port=80):
        """Send the request to the specified host/selector (default port 80).

        Raises ValueError when no items have been added.

        Returns a tuple consisting of:
        - returned document
        - server response code (e.g. 200 if all goes well)
        - server response string corresponding to the response code
        - any RFC822 headers in the response from the server
        """
        if not self.items:
            raise ValueError("Transmitter contains no items, will not send empty request.")

        body = self._build_body()
        h = httplib.HTTP(host, port)
        h.putrequest('POST', selector)
        h.putheader('content-type', 'multipart/form-data; boundary=%s' % self.boundary)
        h.putheader('content-length', str(len(body)))
        h.putheader('host', host)
        h.endheaders()

        log("Transmitting to %s:%s %s" % (host, port, selector))
        h.send(body)
        errcode, errmsg, headers = h.getreply()

        return h.file.read(), errcode, errmsg, headers

class BasicParam(object):
    """A plain key/value form field.

    A false *value* (None, empty string, ...) is sent as an empty string.
    Key and value are encoded as ISO-8859-1.
    """
    def __init__(self, key, value):
        super(BasicParam, self).__init__()

        payload = value or ""
        header = 'Content-Disposition: form-data; name="%s"' % key.encode("iso-8859-1")
        # header, blank separator line, value, trailing CRLF
        self.part = '\r\n'.join((header, '', payload.encode("iso-8859-1"), ''))
        log("BasicParam %s added" % key)

class FileParam(object):
    """A file parameter.

    Specify the key used in the request, the absolute path to the file and an
    optional content type for the fragment. The file is read completely
    while the parameter is constructed.

    Raises IOError when the path does not exist or is not a regular file.
    """

    def __init__(self, key, absolutefile, contentType="application/octet-stream"):
        import os.path
        super(FileParam, self).__init__()
        if not os.path.exists(absolutefile):
            raise IOError('Referenced file "%s" is invalid/nonexistent' % absolutefile)
        if not os.path.isfile(absolutefile):
            raise IOError('Path does not point to a file (directory?)')

        # Read the content up front and always close the handle, even when
        # reading fails.
        f = open(absolutefile,'rb')
        try:
            content = f.read()
        finally:
            f.close()

        L=[]
        L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key.encode("iso-8859-1"), os.path.basename(absolutefile).encode("iso-8859-1")))
        L.append('Content-Type: ' + contentType)
        L.append('')
        L.append(content)
        L.append('')
        self.part = '\r\n'.join(L)
        log("FileParam %s added" % key)

class Base64Param(object):
    """A form field whose value is sent base64-encoded (NOT as a 'file')."""
    def __init__(self, key, value):
        import base64

        super(Base64Param, self).__init__()
        header = 'Content-Disposition: form-data; name="%s"' % key.encode("iso-8859-1")
        encoded = base64.encodestring(value)
        # header, blank separator line, encoded value, trailing CRLF
        self.part = '\r\n'.join((header, '', encoded, ''))
        log("Base64Param %s added" % key)

class BinaryParam(object):
    """A binary field taken from *value*; the filename is the current time as YYYYMMDD_HHMMSS."""
    def __init__(self, key, value, contentType="application/octet-stream"):
        import time

        super(BinaryParam, self).__init__()
        disposition = 'Content-Disposition: form-data; name="%s"; filename="%s"' % (key.encode("iso-8859-1"),time.strftime("%Y%m%d_%H%M%S"))
        content_type = 'Content-Type: ' + contentType
        # headers, blank separator line, raw value, trailing CRLF
        self.part = '\r\n'.join((disposition, content_type, '', value, ''))
        log("BinaryParam %s added" % key)


Simple usage:

#!/usr/bin/python
from Transmitter import *

class LogTransmitter(BaseTransmitter):
    """Transmitter that posts one key/value parameter to "/endpoint/" on a host."""

    def __init__(self, host, paramName, paramValue, port=80):
        super(LogTransmitter, self).__init__()
        self.host, self.selector, self.port = host, "/endpoint/", port

        self.addItem(BasicParam(paramName, paramValue))

    def transmit(self):
        """Send the request to the host, selector and port stored at construction."""
        parent = super(LogTransmitter, self)
        return parent.transmit(self.host, self.selector, self.port)

# Send a single parameter and print the tuple returned by transmit().
t = LogTransmitter("domain.net", "myParam", "myValue")
print repr(t.transmit())

dbms database class

This is my first attempt at creating something for PyS60, and I needed a simple SQL wrapper class...
I'm an experienced programmer, but I have never programmed in Python for S60 and it's been a while since I've used Python... So I hope this will help somebody, not annoy ;-)

# Filename: db.py save in your Python dir
import e32db

class db:
    def __init__(self, dbpath):
	self.db = e32db.Dbms()
	self.dbv = e32db.Db_view()
	self.reset_counters()
	try:
	    self.db.open(unicode(dbpath))
	except:
	    self.db.create(unicode(dbpath))
	    self.db.open(unicode(dbpath))

    def reset_counters(self):
	self.affected_rows = 0
	self.num_rows = 0
	self.__internal_counter = 0

    def query(self, sql):
	self.reset_counters()
	if sql.lower().startswith('select'):
	    self.dbv.prepare(self.db, unicode(sql))
	    self.dbv.first_line()
	    self.num_rows = self.dbv.count_line()
	else:
	    self.affected_rows = self.db.execute(unicode(sql))

    def next(self):
	row = {'id': 0}
	if self.num_rows < 1:
	    self.reset_counters()
	    raise StopIteration
	elif self.__internal_counter < self.num_rows:
	    self.dbv.get_line()
	    for i in range(self.dbv.col_count()):
		row[i] = self.dbv.col(i+1)
	    self.dbv.next_line()
	    self.__internal_counter += 1
	    return row
	else:
	    self.reset_counters()
	    raise StopIteration

    def __iter__(self):
	return self


Now to create and fill db, create a file in the same dir as the db.py with this content:
# Change __exec_path to the path of your python script dir
__exec_path = "E:\\Python\\"
dbname = "test"

import sys
sys.path.append(__exec_path)
from db import db

# Opens E:\\Python\test.db (created first if it does not exist), then creates
# the table and inserts two sample rows.
mydb = db(__exec_path + dbname + '.db')
for statement in (
    "create table testing (id counter, name varchar)",
    "insert into testing (name) values ('test 1')",
    "insert into testing (name) values ('test 2')",
):
    mydb.query(statement)



Now go ahead and have fun:
# Again change __exec_path to the path of your python script dir
__exec_path = "E:\\Python\\"
dbname = "test"

import sys
sys.path.append(__exec_path)
from db import db

# Opens E:\\Python\test.db
mydb = db(__exec_path+dbname+'.db')
mydb.query("select * from testing")
for row in mydb:
    print "-> ",row[0]," ",row[1]," <-"


Have fun!
Dan

PyS60 - DaemonS60

// description of your code here

import appuifw
import e32
import thread

# Active-object lock: the script waits on it until the exit key signals it.
lock = e32.Ao_lock()
# Thread lock held by funzDaemon around each iteration of its loop.
lockTHR = thread.allocate_lock()

###### Replace this function with whatever you want the daemon to run
i = 0
def funzDaemon():
    """Append an increasing counter to E:\\Others\\tmp.txt once per second, forever.

    Each iteration runs while holding lockTHR.
    """
    global i
    while(1):
        lockTHR.acquire()
        try:
            e32.ao_sleep(1)
            # Close the file explicitly instead of relying on garbage
            # collection to flush and release it.
            out = open('E:\\Others\\tmp.txt', 'a')
            try:
                out.write(str(i)+"\n")
            finally:
                out.close()
            i+=1
        finally:
            # Never leave the lock held if an iteration fails.
            lockTHR.release()
####################################################################

appuifw.app.title = u'DaemonS60'
# Pressing the exit key signals the lock that lock.wait() below blocks on.
appuifw.app.exit_key_handler = lambda:lock.signal()

print 'DaemonS60 - Start'

# Run the daemon function on its own thread.
thread.start_new_thread(funzDaemon, ())

# Block here until the exit key handler signals the lock.
lock.wait()

print 'DaemonS60 - Stop'

Like a Touch screen in the air... Like a mouse with a pencil.

I created a motion detector that finds a dark pixel inside the square and re-centers the square on it.
It's very simple but very fun!

References
http://www.danilocesar.com/blog/2006/12/03/smartphones-aonde-podemos-parar

If you want to see this working:
http://www.youtube.com/watch?v=sjm_g9MSYPc



#################################################################
# Developed by Danilo Cesar [http://www.danilocesar.com]
# Inspired on:  http://www.bigbold.com/snippets/posts/show/636
#################################################################

from appuifw import *
from graphics import Image
import camera, e32

# The canvas is both the application body and the target the main loop blits to.
app.body = c = Canvas()

# Loop flag for the main loop; cleared by quit().
running = 1
def quit():
    """Stop the main loop by clearing the module-level running flag."""
    global running
    running = 0

app.exit_key_handler=quit
app.title = u"O controle"
app.screen = 'full'   # or 'normal', 'large'

def getdata(im, bpp=24):
    """Return the decompressed pixel stream of image *im*.

    The image is saved as a PNG to D:\\pixels.png with the given *bpp*, the
    IDAT chunks are read back and their concatenation is inflated. In the
    result every scanline is prefixed with one filter byte ('\\x00').
    """
    import struct, zlib
    im.save('D:\\pixels.png', bpp=bpp, compression='no')
    f = open('D:\\pixels.png', 'rb')
    try:
        # Skip the 8-byte PNG signature and the IHDR chunk
        # (8 bytes length + type, 13 bytes data, 4 bytes CRC).
        f.seek(8 +8+13+4)
        chunk = []
        while 1:
            header = f.read(8)
            if len(header) < 8:
                break   # truncated file: stop instead of failing in unpack
            n, kind = struct.unpack('>L4s', header)
            if kind == 'IEND':
                break
            if kind == 'IDAT':
                chunk.append(f.read(n))
                f.read(4)   # CRC
            else:
                # Ancillary chunk: skip its data and CRC rather than feeding
                # it to zlib as if it were image data.
                f.seek(n + 4, 1)
    finally:
        f.close()
    return zlib.decompress(''.join(chunk))  # '\x00' prefix each line


X = 80
Y = 60
while running:
    # Keep the 30x30 hot spot inside the 160x120 camera frame.
    X = max(0, min(X, 160 - 30))
    Y = max(0, min(Y, 120 - 30))
    im = camera.take_photo('RGB', (160,120))
    im.rectangle([(X,Y),(X+30,Y+30)], 0xff0000)   # red outline
    # Copy the hot spot into a small gray-scale image and fetch its bytes.
    box = Image.new((30,30), 'L')
    box.blit(im, (X,Y,X+30,Y+30))
    data = getdata(box, 8)

    # Move the hot spot towards the first dark (but non-zero) byte; each
    # scanline is 31 bytes long: one prefix byte plus 30 pixels.
    for i in range(len(data)):
        if 0 < ord(data[i]) < 30:
            X += i%31 - 15
            Y += i//31 - 15
            break

    c.blit(im, (0,0), (8,12))   # show camera
 


PyS60 - BabelFish

// Translate from language A to language B
// code not complete but it works

import urllib

####################################################################################### <BabelFish>
class BabelFish(object):
    """Client for the AltaVista BabelFish translation page."""

    # Markup that immediately precedes the translated text in the response.
    _MARKER = '<div style=padding:10px;>'

    def _extractTranslation(self, page):
        """Return the translated text found in *page*.

        When the result markup is missing, a message starting with '-' is
        returned, matching the error convention of translate().
        """
        start = page.find(self._MARKER)
        if start < 0:
            return '-translation not found in server response'
        start += len(self._MARKER)
        stop = page.find('</div>', start)
        if stop < 0:
            return '-translation not found in server response'
        return page[start:stop]

    def translate(self, lang, message):
        """Translate *message* using the language pair *lang* (e.g. 'it_en').

        Returns the translated text. On any failure the returned string
        starts with '-' followed by the error message.
        """
        import sys

        try:
            url = urllib.URLopener()

            query = urllib.urlencode({
                                      'doit':'done',
                                      'intl':'1',
                                      'lp':lang,
                                      'tt':'urltext',
                                      'urltext':message
                                      })

            responde = url.open('http://babelfish.altavista.com/tr', query).read()

            return self._extractTranslation(responde)

        except Exception:
            # sys.exc_info() avoids version-specific "except ..., name" syntax.
            return '-' + str(sys.exc_info()[1])
####################################################################################### </BabelFish>

####################################################################################### <BabelFishUI>
from graphics import *

import appuifw
import e32

class BabelFishUI(object):
    """Application UI for BabelFish.

    Constructing an instance sets up the title, canvas and menu and then
    blocks until the user exits.
    """

    # Language-pair codes, in the same order as the entries shown by
    # __setLanguage().
    __LANGUAGES = ('it_en', 'en_it', 'en_fr', 'fr_en',
                   'en_de', 'de_en', 'fr_it', 'it_fr')

    def __init__(self):

        self.__lock = e32.Ao_lock()
        self.__img = Image.new((176, 144))
        self.__language = 'it_en'
        self.__textUI = None

        appuifw.app.exit_key_handler = lambda:self.__lock.signal()

        appuifw.app.title = u'BabelFish v1.0'
        appuifw.app.body = self.__canvas = appuifw.Canvas(redraw_callback=self.updateScreen)

        appuifw.app.menu = [
            (u'Translate', lambda:self.__translateUI()),
            (u'About', lambda:appuifw.note(u'BabelFish: v1.0\nCreated by\nWhite Tiger\n<Z-TEAM@Libero.it>', 'info')),
            # The handler must call signal(); returning the bound method does
            # nothing and left the Exit entry without effect.
            (u'Exit', lambda:self.__lock.signal()),
        ]

        self.updateScreen(None)

        # Remember the main view so __back() can restore it.
        self.__menuMain = appuifw.app.menu
        self.__bgMain = appuifw.app.body

        self.__lock.wait()

    def updateScreen(self, rect):
        """Redraw callback: blit the background image onto the canvas."""
        self.__canvas.blit(self.__img)

    def __back(self):
        """Restore the main menu and canvas."""
        appuifw.app.menu = self.__menuMain
        appuifw.app.body = self.__bgMain

        appuifw.app.set_tabs([u'Back'], lambda x:None)

    def __translateUI(self):
        """Switch to the text editor view with its own menu."""
        self.__textUI = appuifw.Text()

        appuifw.app.menu = [
            (u'Translate', lambda:self.__translate()),
            (u'Language', lambda:self.__setLanguage()),
            (u'Clear', lambda:self.__textUI.clear()),
            (u'Back', lambda:self.__back()),
        ]

        appuifw.app.body = self.__textUI

    def __setLanguage(self):
        """Let the user pick a language pair; an out-of-range result keeps the current one."""
        resp = appuifw.selection_list([u'italiano-inglese', u'inglese-italiano', u'inglese-francese', u'francese-inglese', u'inglese-tedesco', u'tedesco-inglese',
                                       u'francese-italiano', u'italiano-francese'], 1)

        if resp is not None and 0 <= resp < len(self.__LANGUAGES):
            self.__language = self.__LANGUAGES[resp]

    def __translate(self):
        """Translate the editor text and show the result (or the error)."""
        babel = BabelFish()

        resp = babel.translate(self.__language, self.__textUI.get())

        # A leading '-' marks an error message; startswith() also copes with
        # an empty result, where resp[0] would raise IndexError.
        if resp.startswith('-'):
            self.__textUI.set(unicode(resp[1:]))
        else:
            self.__textUI.set(unicode(': ' +self.__textUI.get() + '\n: ' + resp))

        appuifw.note(u'Translate', 'conf')

# Start the UI only when run as a script; the constructor blocks until exit.
if __name__ == '__main__':
    
    BabelFishUI()

PyS60 - listSMS to File.txt

// Save the list of SMS messages to a file

from time import ctime

import codecs
import inbox

box = inbox.Inbox()
msg = box.sms_messages()

# Write sender, timestamp and content of each message, one per line.
f = codecs.open('E:/Others/listSMS.txt', 'w', 'utf8') # open the file with UTF-8 encoding
try:
    for i in msg:
        f.write(box.address(i))
        f.write('\n')
        f.write(ctime(box.time(i))) # convert the seconds into a readable time string
        f.write('\n')
        f.write(box.content(i))
        f.write('\n')
finally:
    # Close the file even if reading a message or writing fails.
    f.close()

print('Fine')

# Read the file back and show it.
f = codecs.open('E:/Others/listSMS.txt', 'r', 'utf8')
try:
    print(f.read())
finally:
    f.close()

PyS60 - SendFile

// Send File over Bluetooth

from appuifw import *
from e32socket import *

# Pick a Bluetooth OBEX device, ask for a file path and send that file.
try:
    phone = bt_obex_discover()
    file = query(u'File Selection', 'text')
    if file is None:
        # The query dialog was cancelled: there is nothing to send.
        note(u'No file selected', 'info')
    else:
        # phone is (address, services); the first listed service is used.
        bt_obex_send_file(phone[0], phone[1].values()[0], file)
        note(u'File Sent')
except Exception, error:
    note(unicode(error), 'error')

Python - Change user-agent

// Change the user-agent with urllib

import urllib

class AppURLopener(urllib.FancyURLopener):
	"""FancyURLopener subclass whose version string is a Nokia 6630 user-agent."""

	version = 'Nokia6630/1.0 (2.3.129) SymbianOS/8.0 Series60/2.6 Profile/MIDP-2.0 Configuration/CLDC-1.1'

# Install an instance as urllib's module-level opener.
urllib._urlopener = AppURLopener()


// Change the user-agent with urllib2

import urllib2

# Request headers carrying a Nokia 6630 user-agent string.
# NOTE(review): presumably meant for urllib2.Request(url, data, headers); the
# snippet ends before the dict is used.
headers = { 'user-agent':'Nokia6630/1.0 (2.3.129) SymbianOS/8.0 Series60/2.6 Profile/MIDP-2.0 Configuration/CLDC-1.1',
		    'keep-alive':'300',
		    'content-type':'application/x-www-form-urlencoded'
		  }
« Newer Snippets
Older Snippets »
Showing 1-10 of 57 total  RSS