Never been to DZone Snippets before?

Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world.

About this user

Magnus Persson http://www.derelik.net

« Newer Snippets
Older Snippets »
Showing 1-3 of 3 total  RSS 

Python: NMEA GPGGA parser

Not tested extensively; use with caution! I used an LD-1W and it seems to work all right with this parser.

class GPGGAParser(object):
	"""Parser for a single NMEA ``$GPGGA`` (GPS fix data) sentence.

		The raw comma-separated fields are stored as string attributes
		(format, utc, northsouth, eastwest, quality, ...). In addition:

		- latitude / longitude are converted to signed decimal degrees
		  (negative for the southern / western hemisphere)
		- altitude is converted to float
		- timeOfFix holds the time of the fix formatted as "HH:MM:SS"
		- checksum holds the text after "*" in the last field, or ""
		  when the sentence carries no checksum"""

	# A GPGGA sentence has this many comma-separated fields, counting the
	# leading "$GPGGA" identifier.
	_FIELD_COUNT = 15

	def __init__(self, sentance):
		"""Parse sentance and populate the attributes.

			sentance -- one complete GPGGA line. Surrounding whitespace (such
			as the CRLF line terminator) and a trailing "*hh" checksum are
			tolerated; the checksum is stored but not verified.

			Raises ValueError if the number of fields is wrong, or if
			latitude, longitude, altitude or the time field cannot be
			converted (for example because they are empty)."""
		import logging
		import time

		logging.debug("GPPGAParser started")
		logging.debug("Trying to parse: "+sentance)

		fields = sentance.strip().split(",")
		if len(fields) != self._FIELD_COUNT:
			raise ValueError("GPGGA sentence must have %d fields, got %d"
				% (self._FIELD_COUNT, len(fields)))

		# The checksum is glued onto the last field ("0000*47"); split it off
		# so diff_ref_stationID holds only the station id.
		tail = fields[-1].split("*", 1)
		fields[-1] = tail[0]
		if len(tail) == 2:
			self.checksum = tail[1]
		else:
			self.checksum = ""

		(self.format,
			self.utc,
			self.latitude,
			self.northsouth,
			self.longitude,
			self.eastwest,
			self.quality,
			self.number_of_satellites_in_use,
			self.horizontal_dilution,
			self.altitude,
			self.above_sea_unit,
			self.geoidal_separation,
			self.geoidal_separation_unit,
			self.data_age,
			self.diff_ref_stationID) = fields

		self.latitude = self._decimalDegrees(self.latitude, self.northsouth == 'S')
		self.longitude = self._decimalDegrees(self.longitude, self.eastwest == 'W')

		# Fractional seconds ("123519.00") are dropped before reformatting.
		whole_seconds = self.utc.split(".")[0]
		self.timeOfFix = time.strftime("%H:%M:%S", time.strptime(whole_seconds, "%H%M%S"))
		self.altitude = float(self.altitude)
		logging.debug("GPPGAParser finished")

	def _decimalDegrees(self, raw, negative):
		"""Convert an NMEA "(d)ddmm.mmmm" string to decimal degrees.

			The digits left of the last two integer digits are whole degrees,
			the rest are minutes. The result is negated when negative is true."""
		value = float(raw)
		degrees = int(value / 100)
		minutes = value - degrees * 100
		result = degrees + (minutes / 60)
		if negative:
			result = -result
		return result

PyS60: Bluetooth GPS polling class

Threaded approach for reading NMEA data from a bluetooth GPS.

import thread, socket

class BTGPSPoller(object):
	"""Background reader that collects $GPGGA sentences from a Bluetooth GPS.

		connect() starts a worker thread that reads lines from the device and
		keeps the most recent sentences that carry a latitude. Fetch a copy
		of them with getSentances() and stop the worker with disconnect()."""

	# Upper bound on buffered sentences; the oldest one is dropped first.
	_MAX_SENTANCES = 10

	def __init__(self, address):
		"""Look up the device and remember where to connect.

			address -- handed unchanged to socket.bt_discover()."""
		address, services = socket.bt_discover(address)
		# Use the first service reported by the discovery call.
		self.target = (address, list(services.values())[0])
		self.active = True
		self.connected = False
		self.lock = thread.allocate_lock()
		self.sentances = list()

	def connect(self):
		"""Start the polling thread unless a connection is already up."""
		if not self.connected:
			# Re-arm the loop flag so connecting again after disconnect()
			# (or after a read error) actually polls.
			self.active = True
			thread.start_new_thread(self.run, ())

	def run(self):
		"""Thread body: connect, poll until stopped, then close everything."""
		print("BTGPSPoller thread activated")
		conn = None
		try:
			conn = socket.socket(socket.AF_BT, socket.SOCK_STREAM)
			conn.connect(self.target)
			self.connected = True
		except Exception:
			print("Unable to connect")

		if not self.connected:
			# Do not leak a socket that was created but never connected.
			if conn is not None:
				try:
					conn.close()
				except Exception:
					pass
			return

		to_gps = None
		try:
			try:
				to_gps = conn.makefile("r", 0)
			except Exception:
				print("Failure calling conn.makefile()")
				self.active = False
			if to_gps is not None:
				self._poll(to_gps)
		finally:
			self._shutdown(conn, to_gps)

	def _poll(self, stream):
		"""Read lines from stream until stopped, keeping usable GPGGA ones."""
		while self.active:
			try:
				msg = stream.readline()
			except Exception:
				self.active = False
				break
			if not msg:
				# Empty read means end of stream; looping on would spin forever.
				self.active = False
				break
			if not msg.startswith("$GPGGA"):
				continue
			gps_data = msg.split(",")
			# Field 2 is the latitude; skip sentences that do not carry one.
			if len(gps_data) > 2 and gps_data[2] != "":
				self._store(msg)

	def _store(self, msg):
		"""Append msg to the buffer under the lock, dropping the oldest entry."""
		self.lock.acquire()
		try:
			self.sentances.append(msg)
			if len(self.sentances) > self._MAX_SENTANCES:
				self.sentances.pop(0)
		finally:
			self.lock.release()

	def _shutdown(self, conn, stream):
		"""Close the stream and the socket, attempting both even if one fails."""
		failed = False
		for resource in (stream, conn):
			if resource is None:
				continue
			try:
				resource.close()
			except Exception:
				failed = True
		self.connected = False
		if failed:
			print("Unable to close")
		else:
			print("Closed and disconnected")

	def disconnect(self):
		"""Ask the polling thread to stop; it exits after its current read."""
		self.active = False
		print("Disconnecting from GPS")

	def getSentances(self):
		"""Return a copy of the buffered sentences, oldest first."""
		self.lock.acquire()
		try:
			return self.sentances[:]
		finally:
			self.lock.release()

Python: Transmitter. A convenience class to simplify HTTP POST handling.

I make no claims for quality of code :)

import httplib
# The logging module is optional: LOGGINGENABLED records whether the import
# succeeded so log() can fall back to print when it did not.
try:
	import logging
	LOGGINGENABLED = True
except ImportError:
	LOGGINGENABLED = False

# Master switch checked by log(); when False nothing is emitted at all.
DOLOGGING = True

def log(s):
	"""Emit s as an info-level log record, or print it when logging is unavailable.

		Does nothing while DOLOGGING is false."""
	if not DOLOGGING:
		return
	if LOGGINGENABLED:
		logging.info(s)
		return
	print(unicode(s))

class BaseTransmitter(object):
	"""Base class for all transmitters.

		By itself, it does nothing. Subclass this and use inherited addItem to inject parameters.
		Call transmit() when done.

		See classes BasicParam and BinaryParam, which are used with addItem"""

	def __init__(self):
		super(BaseTransmitter, self).__init__()
		self.items = []
		self.boundary = '------------ThIs_Is_tHe_bouNdaRY_$' # Kudos!

	def addItem(self, item):
		"""Adds a parameter to be sent in the request.

			The added item _must_ define a property 'part' holding the encoded
			fragment, either as one string or as a list of strings, ending with
			a CRLF (\\r\\n). Do not include the boundary marker (handled in
			BaseTransmitter.transmit())

			See BasicParam and FileParam for supported use."""

		self.items.append(item)

	def _buildBody(self):
		"""Return the multipart/form-data body for all added items."""
		CRLF = "\r\n"
		chunks = []
		for item in self.items:
			chunks.append("--" + self.boundary + CRLF)
			# join() accepts a single string as well as a list of strings.
			chunks.append(''.join(item.part))
		# The last delimiter carries a trailing "--" to mark the end of the
		# multipart body (RFC 2046, section 5.1.1).
		chunks.append("--" + self.boundary + "--" + CRLF)
		return ''.join(chunks)

	def transmit(self, host, selector, port=80):
		"""Sends request to specified host/selector on default port 80.

			Checks that there are items to send, otherwise raises a ValueError exception.

			Returns a tuple consisting of:
			- Returned document
			- server response code (an integer, e.g. 200 if all goes well)
			- server response string corresponding to response code
			- any RFC822 headers in the response from the server"""

		if len(self.items) < 1:
			raise ValueError("Transmitter contains no items, will not send empty request.")

		body = self._buildBody()
		h = httplib.HTTP(host, port)
		try:
			h.putrequest('POST', selector)
			h.putheader('content-type', 'multipart/form-data; boundary=%s' % self.boundary)
			h.putheader('content-length', str(len(body)))
			h.putheader('host', host)
			h.endheaders()

			log("Transmitting to %s:%s %s" % (host, port, selector))
			h.send(body)
			errcode, errmsg, headers = h.getreply()
			document = h.file.read()
		finally:
			# Release the connection even when the request fails part-way.
			h.close()

		return document, errcode, errmsg, headers

class BasicParam(object):
	"""A plain form field: one key with one text value."""

	def __init__(self, key, value):
		"""Build the multipart fragment for key/value and store it in self.part.

			Both key and value are encoded as iso-8859-1. Any false value
			(None, empty string, ...) is sent as an empty string."""
		super(BasicParam, self).__init__()

		value = value or ""

		disposition = 'Content-Disposition: form-data; name="%s"' % key.encode("iso-8859-1")
		payload = value.encode("iso-8859-1")
		# Header, blank separator line, payload, and a final CRLF.
		self.part = '\r\n'.join((disposition, '', payload, ''))
		log("BasicParam %s added" % key)

class FileParam(object):
	"""A file parameter

		Specify the key used in the request, the absolute path to the file and optional
		content type for the fragment.

		Checks that the file exists and is actually a file, or raises an IOError."""

	def __init__(self, key, absolutefile, contentType="application/octet-stream"):
		"""Read the file and store its multipart fragment in self.part.

			key -- form field name, encoded as iso-8859-1
			absolutefile -- path of the file to send; its base name is used as
			the filename in the request
			contentType -- value of the fragment's Content-Type header

			Raises IOError if the path does not exist or is not a regular file."""
		import os.path
		super(FileParam, self).__init__()
		if not os.path.exists(absolutefile):
			raise IOError('Referenced file "%s" is invalid/nonexistent' % absolutefile)
		if not os.path.isfile(absolutefile):
			raise IOError('Path does not point to a file (directory?)')

		# Read everything up front and always close the handle, even if the
		# read fails.
		f = open(absolutefile,'rb')
		try:
			content = f.read()
		finally:
			f.close()

		L=[]
		L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key.encode("iso-8859-1"), os.path.basename(absolutefile).encode("iso-8859-1")))
		L.append('Content-Type: ' + contentType)
		L.append('')
		L.append(content)
		L.append('')
		self.part = '\r\n'.join(L)
		log("FileParam %s added" % key)

class Base64Param(object):
	"""A form field whose value is sent base64-encoded. It is NOT sent as a 'file'."""

	def __init__(self, key, value):
		"""Base64-encode value and store the multipart fragment in self.part.

			key is encoded as iso-8859-1 and used as the field name."""
		import base64

		super(Base64Param, self).__init__()
		disposition = 'Content-Disposition: form-data; name="%s"' % key.encode("iso-8859-1")
		encoded = base64.encodestring(value)
		# Header, blank separator line, encoded payload, and a final CRLF.
		self.part = '\r\n'.join([disposition, '', encoded, ''])
		log("Base64Param %s added" % key)

class BinaryParam(object):
	"""A binary form field sent like a file upload; the filename is a YYYYMMDD_HHMMSS timestamp."""

	def __init__(self, key, value, contentType="application/octet-stream"):
		"""Wrap value in a multipart fragment and store it in self.part.

			key -- form field name, encoded as iso-8859-1
			value -- the raw payload, inserted unchanged
			contentType -- value of the fragment's Content-Type header"""
		import time

		super(BinaryParam, self).__init__()
		field_name = key.encode("iso-8859-1")
		stamp = time.strftime("%Y%m%d_%H%M%S")
		headers = [
			'Content-Disposition: form-data; name="%s"; filename="%s"' % (field_name, stamp),
			'Content-Type: ' + contentType,
		]
		# Headers, blank separator line, payload, and a final CRLF.
		self.part = '\r\n'.join(headers + ['', value, ''])
		log("BinaryParam %s added" % key)


Simple usage:

#!/usr/bin/python
from Transmitter import *

class LogTransmitter(BaseTransmitter):
	"""Example transmitter that posts one key/value pair to "/endpoint/"."""

	def __init__(self, host, paramName, paramValue, port=80):
		"""Remember the destination and queue the single parameter to send."""
		super(LogTransmitter, self).__init__()
		self.port = port
		self.selector = "/endpoint/"
		self.host = host

		param = BasicParam(paramName, paramValue)
		self.addItem(param)

	def transmit(self):
		"""Send the queued parameter to the stored host, selector and port.

			Returns whatever BaseTransmitter.transmit() returns."""
		parent = super(LogTransmitter, self)
		return parent.transmit(self.host, self.selector, self.port)

# Post myParam=myValue to domain.net on the default port and show the raw
# result tuple returned by transmit().
t = LogTransmitter("domain.net", "myParam", "myValue")
print(repr(t.transmit()))
« Newer Snippets
Older Snippets »
Showing 1-3 of 3 total  RSS