telepathy-python ‘echo’ protocol – step 2

right then!  let’s get straight into it – we need to add the following to our ‘caveConnection’ class from step 1:

  1. a Connection.Interface.Requests interface (as required by 0.17.23 of the telepathy spec) for creating new communication channels

  2. a Connection.Interface.Contacts interface (as required by 0.17.23 of the telepathy spec) so we can say who we want to talk to .. even though we’re actually not talking to anyone yet
  3. a Channel Manager (not an interface) which can spawn Channel objects on request
  4. a Connect method that issues the required signals
  5. a Disconnect method which does the signals and cleanly shuts down the connection
  6. some extra magic in support of the above gleaned from the telepathy-python source and butterfly source.

the actual Contacts interface and Channel Manager are split off into separate files which we’ll come to in a minute, so here’s the resultant, updated Connection.py:


import dbus, weakref
from telepathy.server import Connection, ConnectionInterfaceRequests, Handle

from Constants import PROTOCOL, PROGRAM
from Contacts import caveContacts
from ChannelManager import caveChannelManager

# many connections per manager -> class for connections
# make a fancy new-type connection with a 'Requests' interface
class caveConnection(Connection, ConnectionInterfaceRequests, caveContacts):

    def __init__(self, manager, parameters):
        self._manager = weakref.proxy(manager)
        # create a new channel manager and tell it we're its connection
        self._channel_manager = caveChannelManager(self)

        # assume we have an 'account' name passed to us
        Connection.__init__(self, PROTOCOL, parameters['account'], PROGRAM)

        # assumption: our own handle is named after the account
        self._self_handle = Handle(
            self.get_handle_id(), HANDLE_TYPE_CONTACT, parameters['account'])
        self._handles[HANDLE_TYPE_CONTACT, self._self_handle.get_id()] =\
            self._self_handle

    # borrowed from butterfly, required by telepathy's channel init
    def handle(self, handle_type, handle_id):
        self.check_handle(handle_type, handle_id)
        return self._handles[handle_type, handle_id]

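    def Connect(self):
        # assumption: announcing 'connected' straight away is enough here
        from telepathy import (CONNECTION_STATUS_CONNECTED,
            CONNECTION_STATUS_REASON_REQUESTED)
        self.StatusChanged(CONNECTION_STATUS_CONNECTED,
            CONNECTION_STATUS_REASON_REQUESTED)

    def Disconnect(self):
        # assumption: same signal as Connect, with the 'disconnected' status
        from telepathy import (CONNECTION_STATUS_DISCONNECTED,
            CONNECTION_STATUS_REASON_REQUESTED)
        self.StatusChanged(CONNECTION_STATUS_DISCONNECTED,
            CONNECTION_STATUS_REASON_REQUESTED)
        # stop handling all channels
        self._channel_manager.close()
        # stop handling this connection
        self._manager.disconnected(self)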

so, getting somewhat more involved but still not too hairy!

the Contacts interface is handled by extending a telepathy-python ConnectionInterfaceContacts object, just as we have extended its Connection object above.  this time we need to add three things:

  1. implement the ContactAttributeInterfaces property
  2. make the GetContactAttributes return some minimal contact details
  3. a bit more magic copied from butterfly

and that looks like this (which i’ve saved as Contacts.py):


import dbus
from telepathy.server import ConnectionInterfaceContacts

# Contacts interface with our minimal requirements implemented
class caveContacts(ConnectionInterfaceContacts):
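    def __init__(self):
        ConnectionInterfaceContacts.__init__(self)
        # assumption: telepathy-python's property helper, as used in butterfly
        self._implement_property_get(CONNECTION_INTERFACE_CONTACTS,
            {'ContactAttributeInterfaces' :
            lambda:  dbus.Array([CONNECTION], signature='s')})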

    # Overwrite the dbus attribute to get the sender argument
    @dbus.service.method(CONNECTION_INTERFACE_CONTACTS, in_signature='auasb',
        out_signature='a{ua{sv}}', sender_keyword='sender')
    def GetContactAttributes(self, handles, interfaces, hold, sender):

        # this is required to allow the channel to close down correctly
        if hold: self.HoldHandles(HANDLE_TYPE_CONTACT, handles, sender)

        ret = dbus.Dictionary(signature='ua{sv}')
        for handle in handles:
            ret[handle] = dbus.Dictionary(signature='sv')
            ret[handle][CONNECTION + "/contact-id"] =\
                self.InspectHandles(HANDLE_TYPE_CONTACT, [handle])[0]
        return ret

next up is the Channel Manager.  here we extend the telepathy-python ChannelManager by specifying which channel types can be created (just the text type here) and by providing a callback to create a new channel of that type.

that looks like this (saved as ChannelManager.py):


import dbus
from telepathy.server import ChannelManager

from Channel import caveTextChannel

# a Channel Manager with our required channels built in
class caveChannelManager(ChannelManager):
    def __init__(self, conn):
        self.__text_channel_id = 0
        ChannelManager.__init__(self, conn)
        # ChannelManager magic for handling channels
            CHANNEL_TYPE_TEXT, self._get_text_channel, [
            # accepting text channels to/from a contact allows empathy to
            # offer 'new conversation'
            ({CHANNEL_INTERFACE + '.ChannelType': CHANNEL_TYPE_TEXT,
            CHANNEL_INTERFACE + '.TargetHandleType':
            [CHANNEL_INTERFACE + '.TargetHandle',
            CHANNEL_INTERFACE + '.TargetID'])

    def _get_text_channel(self, props):
        # make up a name for the channel
        path = "TextChannel%d" % self.__text_channel_id
        self.__text_channel_id += 1
        return caveTextChannel(self._conn, self, props,

you can see at the end there that the callback returns a caveTextChannel object.  can you guess what that is?!  yup, we’re extending telepathy-python’s ChannelTypeText class in order to provide a Channel.Type.Text interface for our Channel object.  here we add the following:

  1. override the Send method to issue the appropriate signals including the received ‘echo’ of the sent message
  2. override the Close method to shut things down nicely and issue signals

we really ought to use the newer SendMessage method but the old Send is simpler and, i think, sufficient for this demonstration.

so, that looks like this (saved as Channel.py):


from time import time
from telepathy.server import ChannelTypeText

class caveTextChannel(ChannelTypeText):
    def Send(self, message_type, text):
        # tell the chat client, "yeah, i sent that"
        self.Sent(int(time()), message_type, text)
        # now tell the chat client we got a reply 🙂
        self.Received(0, int(time()), self._handle.get_id(),\
            message_type, 0, text)

# make sure the channel shuts down properly
    def Close(self):

and we’re done!
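if you want to check the order of the signals without telepathy or dbus in the way, here’s a rough stand-in sketch in python 3.  FakeTextChannel is a made-up class, not part of telepathy-python: it records the Sent and Received signals in a list instead of emitting them, and only the Send logic mirrors caveTextChannel above.

```python
from time import time

class FakeTextChannel(object):
    """Made-up stand-in for ChannelTypeText: records signals in a list."""
    def __init__(self, handle_id):
        self._handle_id = handle_id
        self.signals = []

    # in telepathy these two are dbus signals; here they just take notes
    def Sent(self, timestamp, message_type, text):
        self.signals.append(("Sent", message_type, text))

    def Received(self, msg_id, timestamp, sender, message_type, flags, text):
        self.signals.append(("Received", sender, message_type, text))

    def Send(self, message_type, text):
        # tell the chat client, "yeah, i sent that"
        self.Sent(int(time()), message_type, text)
        # now pretend the contact said exactly the same thing back
        self.Received(0, int(time()), self._handle_id, message_type, 0, text)
```

calling `FakeTextChannel(7).Send(0, "hello")` leaves a Sent entry followed by a Received entry carrying the same text, with handle 7 as the sender.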

save that lot and fire up empathy (or equivalent) and add an ‘echo’ account.  since we haven’t implemented a contact list (as there aren’t actually any real contacts out there) you need to start conversations ‘blind’ – go to ‘New Conversation…’ in the ‘Chat’ menu, make up a contact name and click ‘Chat’.  a chat window should appear and any message you ‘send’ should get sent right back to you.

screenshot of 'echo' chat

the "five year old child" chat protocol

next up, extend the Connection and Channel to talk to a Real Thing – any Real Thing you like!  i’m using email.  🙂

do leave a comment if you use this to ‘chat’ with any other non-chat things.

telepathy-python ‘echo’ protocol – step 1

i thought i’d make a telepathy plugin so that i could use empathy to talk to some colleagues who don’t have access to an IM client and instead send me many short messages over email.

the telepathy dbus API is documented and there is documentation for telepathy-python but the examples are all on the client side.  for a server-side example the docs refer us to the ‘butterfly’ MSN connector (citation required) .. which is quite big.

a shaft of light in a sandy cave

"Beam Me Up" by Kevin Eddy on flickr

so, several days of experimentation, reading telepathy-python and butterfly source got me a working email connector and i thought it’d be useful to strip that down even further to an “echo” connector – which just repeats anything you say, like an annoying kid brother, as a reference and example.

what i’ve NOT created is a “reference implementation” – i don’t understand the dbus API docs well enough for that.  i think i’ve left out lots of required elements.  but it Works™ – at least empathy will let me create an account, start a new conversation and type stuff, which gets repeated.  i can disable and re-enable the account without breaking anything – which is nice.

so, today:

step 1 – advertising the service

you will need: dbus, a telepathy client (e.g. empathy), the telepathy python module (and whatever modules it depends on)

the spec says we should have a name for the messaging protocol we’re implementing and a different name for the program which implements it.  so i’m calling the protocol “echo” and the program “cave”.  the actual script will be called “telepathy-cave” in line with similar programs.

here’s the code for the main program:


# IMPORTANT! makes asynchronous dbus things work
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)

# get the mainloop before creating the ConnectionManager
# so that dbus (telepathy) can use it
from gobject import MainLoop
ml = MainLoop()

from cave import caveConnectionManager
caveConnectionManager()

# get telepathy to start listening for our dbus stuff via the mainloop
# and ... Go!
ml.run()

the most obvious thing is that it’s running a gobject MainLoop – you wouldn’t want it setting up the Connection Manager and then quitting!  the dbus python library uses the mainloop to do its event-driven stuff.

that DBusGMainLoop() had me foxed for days.  the error message you get if you miss it out:

RuntimeError: To make asynchronous calls, receive signals or export objects, D-Bus connections must be attached to a main loop by passing mainloop=… to the constructor or calling dbus.set_default_main_loop(…)

isn’t all that helpful – since we can’t fix it by “passing mainloop=” or by “calling dbus.set_default_main_loop” because it’s telepathy-python that’s doing that bit.  googling for telepathy gets a whole bunch of unhelpful E.S.P. articles!  and the butterfly code uses the old way of doing this .. which is Evil™! – it goes like this:

import dbus.glib

nope, i haven’t missed anything out there – just importing the module makes a critical something happen.  like i said, not good.  glad they’ve got rid of that – just need to update butterfly to use the new method!  once i’d figured this out i noticed that all the current examples in the documentation do this right – it’s even the very first piece of example code – unfortunately i’d given up looking there because it all seemed client-based.

anyhoo, the last thing that happens before we kick off the MainLoop is we import and create the caveConnectionManager.  that’s my telepathy Connection Manager which looks like this (you can save this as cave.py):


# create a telepathy ConnectionManager for 'cave'
import dbus
from telepathy.server import ConnectionManager

from Constants import PROTOCOL, PROGRAM
from Connection import caveConnection

class caveConnectionManager(ConnectionManager):
    def __init__(self):
        ConnectionManager.__init__(self, PROGRAM)
        # use telepathy magic to provide required methods
        self._protos[PROTOCOL] = caveConnection

    def GetParameters(self, proto):
        from telepathy import NotImplemented, CONN_MGR_PARAM_FLAG_REQUIRED
        if proto != PROTOCOL:
            raise NotImplemented('unknown protocol %s' % proto)
        return [('account', CONN_MGR_PARAM_FLAG_REQUIRED, 's', '')]

the connection manager sets itself up on the dbus session bus letting the world know that it can create telepathy connections to handle a certain set of protocols – in this case, only “echo”.  when a client requests (over dbus) an ‘echo’ connection the connection manager will create a new Connection instance to handle the request.

you can see that the telepathy module has done plenty of the work for us – there’s no code here for talking to dbus or handling client requests – we just need to tell it our name and what protocol we’re providing and what class does the actual work.  it’s even set up the GetParameters dbus method for us – we just override it to provide the correct response.
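to make the ‘telepathy magic’ a bit less magic, here’s a plain python 3 sketch of the pattern: a dictionary from protocol name to connection class, consulted whenever a connection is requested.  ToyManager, ToyConnection and request_connection are names made up for this illustration; telepathy-python’s real ConnectionManager does the equivalent over dbus and this is not a copy of its code.

```python
class ToyConnection(object):
    """Made-up stand-in for caveConnection."""
    def __init__(self, manager, parameters):
        self.manager = manager
        self.account = parameters['account']

class ToyManager(object):
    """Made-up stand-in for a connection manager."""
    def __init__(self):
        # protocol name -> class that does the actual work
        self._protos = {'echo': ToyConnection}

    def get_parameters(self, proto):
        if proto not in self._protos:
            raise NotImplementedError('unknown protocol %s' % proto)
        # (name, flags, dbus signature, default); 1 stands in for 'required'
        return [('account', 1, 's', '')]

    def request_connection(self, proto, parameters):
        if proto not in self._protos:
            raise NotImplementedError('unknown protocol %s' % proto)
        # hand the request to whichever class is registered for the protocol
        return self._protos[proto](self, parameters)
```

asking the toy manager for an ‘echo’ connection returns a ToyConnection; asking for any other protocol raises NotImplementedError, much like the GetParameters override above.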

to complete the picture we need a couple more files – Constants.py and Connection.py.  Constants is nice and easy:


PROTOCOL = 'echo'
PROGRAM = 'cave'

i just put them there so they didn’t get duplicated across files.  Connection.py will get a bit more involved later on but here’s what it looks like for now while it’s not doing anything at all:


from telepathy.server import Connection
from Constants import PROTOCOL, PROGRAM

class caveConnection(Connection):

    def __init__(self, manager, parameters):
        # assume we have an 'account' name passed to us
        Connection.__init__(self, PROTOCOL, parameters['account'], PROGRAM)

so, much like the Connection Manager, we let the telepathy module do the work, implementing the basis of the Connection interface API for us.

if you save that lot and run telepathy-cave then you should see “echo” in the list of protocols available in empathy’s “add new account” interface and you should be able to create a new account using it.

next up, creating a channel and sending messages on it!

what to backup

to celebrate the release of fedora 15 and in the spirit of the “release early” part of “release early, release often” i’ve decided to post my latest script.

this one’s a bit more involved than the last ones but the task is to answer this question:

what files do i need to back up from my fedora system?

in particular there are a few problems to solve:

  1. i don’t want to waste space backing up unmodified files from software packages.
  2. i want to see which parts of the file system are using up the most backup space so i can see if i can do something about that.
  3. i need a concise summary of what needs to be backed up so i can review it.  if an entire directory tree needs to be backed up then i only want to see that mentioned once, not a line of output for each file!

if it wasn’t for 2 and 3 i could just rpm -Va and then maybe something like find / -exec rpm -q | grep “no package” but that’s all rather fiddly and produces vast, unmanageable lists of files.  so i turn to python.  🙂

what i’ve ended up with is a tiny app, a file/package scanning class and a helper class for showing progress – checking every file on the system takes hours so you want to know how you’re getting on.

so, first up, the progress display class.  i’ve saved this as cmdmsg.py:

# coding=utf-8

# a utility function taken from stackoverflow
def getTerminalSize():
    """returns (lines:int, cols:int)"""
    import os, struct
    def ioctl_GWINSZ(fd):
        import fcntl, termios
        return struct.unpack("hh", fcntl.ioctl(fd, termios.TIOCGWINSZ, "1234"))
    # try stdin, stdout, stderr
    for fd in (0, 1, 2):
        try:
            return ioctl_GWINSZ(fd)
        except:
            pass
    # try os.ctermid()
    try:
        fd = os.open(os.ctermid(), os.O_RDONLY)
        try:
            return ioctl_GWINSZ(fd)
        finally:
            os.close(fd)
    except:
        pass
    # try `stty size`
    try:
        return tuple(int(x) for x in os.popen("stty size", "r").read().split())
    except:
        pass
    # try environment variables
    try:
        return tuple(int(os.getenv(var)) for var in ("LINES", "COLUMNS"))
    except:
        pass
    # i give up. return default.
    return (25, 80)

from datetime import datetime, timedelta
from os.path import commonprefix
from sys import stderr
class cmdmsg():
    def __init__(self, interval = timedelta(0, 1, 0)):
        self.msg = ""
        self.height, self.width = getTerminalSize()
        self.last = datetime.now()
        self.interval = interval

    def say(self, msg, interval = None):
        if interval == None: interval = self.interval
        # don't redraw more often than once per interval
        if datetime.now() - self.last < interval: return
        self.last = datetime.now()
        # multi-byte characters really futz with this stuff
        msg = msg.replace("\t", " ").decode(
            "utf8", 'replace').encode("ascii", 'replace')
        if len(msg) > (self.width - 1):
            ends = self.width / 2 - 2
            msg = msg[:ends] + "..." + msg[-ends:]
        offset = len(commonprefix([self.msg, msg]))
        # BS moves cursor but doesn't appear to remove content - so print spaces
        if len(self.msg) > len(msg):
            extra = len(self.msg) - len(msg)
            stderr.write("\b" * extra + " " * extra)
        stderr.write("\b" * len(self.msg[offset:]) + msg[offset:])
        self.msg = msg

    def saynow(self, msg):
        self.say(msg, timedelta(0, 0, 0))

    def end(self):
        # assumption: finish off the progress line so later output starts clean
        stderr.write("\n")

    def spit(self, msg):
        stderr.write("\r" + " " * len(self.msg) + "\r" + msg + "\n" + self.msg)

this allows the scanning module to write and overwrite progress messages to the terminal without lots of annoying scrolling (which takes a lot of CPU and means you lose key messages).
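to show the overwriting trick on its own, here’s a rough python 3 sketch.  overwrite_sequence and render are names i’ve made up for this illustration: the first builds the same backspace-and-rewrite string that say() sends to stderr, the second is a toy terminal line that only understands backspace and ordinary characters, so the result can be checked without a real terminal.

```python
from os.path import commonprefix

def overwrite_sequence(old, new):
    """Return the characters to write so a line showing `old` shows `new`."""
    out = ""
    if len(old) > len(new):
        extra = len(old) - len(new)
        # backspace only moves the cursor, so blank the leftover tail with spaces
        out += "\b" * extra + " " * extra
    keep = len(commonprefix([old, new]))
    # step back to the first column that differs, then write the rest
    out += "\b" * (len(old) - keep) + new[keep:]
    return out

def render(stream):
    """Toy terminal line: handles only backspace and ordinary characters."""
    cells, cursor = [], 0
    for ch in stream:
        if ch == "\b":
            cursor = max(0, cursor - 1)
        else:
            if cursor == len(cells):
                cells.append(ch)
            else:
                cells[cursor] = ch
            cursor += 1
    return "".join(cells).rstrip()
```

feeding render() the old message followed by the overwrite sequence should leave just the new message on the line, whichever of the two is longer.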

next, the scanning module.  i didn’t want to maintain a complete list of all files on the system in a big array and i wanted to do the summarising as i went along, so this has got some tricksy fiddling around with ‘references’ into a big dictionary hierarchy.

(each level of filesystem hierarchy uses two levels of hierarchy in the dictionary.  this is because the dictionary entry for a folder doesn’t have the sub-folders as keys, it contains a set of metadata keys and a ‘dirs’ key for the sub-folders.)

but basically it allows you to maintain a list of files and directories with the required information – should they be backed up or not.
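here’s a small python 3 sketch of that two-levels-per-folder layout, in case the description is hard to picture.  enter() is a made-up helper for this illustration, doing roughly what setRoot does in the module: it walks down from the top dictionary, creating a ‘dirs’ entry and a folder entry at each step, and returns the dictionary for the folder you asked for.

```python
def enter(results, root):
    """Return the dictionary for `root`, creating the levels on the way down."""
    node = results
    for folder in [part for part in root.split("/") if part]:
        # one level for the 'dirs' key, one level for the folder name itself
        node = node.setdefault("dirs", {}).setdefault(folder, {})
    return node

results = {}
yum_dir = enter(results, "/etc/yum")
# metadata keys sit alongside 'dirs' in each folder's dictionary
yum_dir["save"] = ["yum.conf"]
yum_dir["savesize"] = 813
```

after that, results holds `{'dirs': {'etc': {'dirs': {'yum': {'save': ['yum.conf'], 'savesize': 813}}}}}`, so each step down the filesystem costs two steps down the dictionary.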

# coding=utf-8

# a utility function
def fileSize(bytes):
    suffix = [' bytes', 'K', 'M', 'G', 'T', 'P', 'E']
    size = float(bytes)
    index = 0
    while size >= 1000 and index < len(suffix) - 1:
        index += 1
        size /= 1024
    return str(int(round(size))) + suffix[index]

from pwd import getpwuid
import yum
from datetime import datetime, timedelta
from os.path import commonprefix
from cmdmsg import cmdmsg
from os import path, stat

class pkgScanner():
    def __init__(self):
        self._lastroot = ""
        self._results = {}
        self._rootpath = []
        self._mounts = []
        self._root = ""
        self._rpmva = {}
        self._cm = cmdmsg(timedelta(0, 0, 25000))
        self._cd = self._results

    def __str__(self):
        return "scanned:\n" + self._pprec() + "\nnot scanned:\n" + "\n".join(

# see if sub folders can be 'collapsed' into their parent
    def _check(self, folder, thisroot = None):
        removes = []
        if 'dirs' in folder:
            for sub in folder['dirs']:
# copy sizes to parent so it has totals for the whole tree
                if 'savesize' in folder['dirs'][sub] and\
                    if 'savesize' not in folder: folder['savesize'] = 0
                    folder['savesize'] += folder['dirs'][sub]['savesize']
                if 'unmodifiedsize' in folder['dirs'][sub] and\
                    if 'unmodifiedsize' not in folder: folder['unmodifiedsize'] = 0
                    folder['unmodifiedsize'] += folder['dirs'][sub]['unmodifiedsize']
                if not len(folder['dirs'][sub]):
                    if thisroot: self._cm.spit(
                        "removing empty " + sub + " from " + thisroot)
                elif 'dirs' not in folder['dirs'][sub] and\
                    'unmodified' not in folder['dirs'][sub] and\
                    'save' in folder['dirs'][sub]:
                    if 'save' not in folder: folder['save'] = []
                    if thisroot: self._cm.spit(
                        "removing all new/modified " + sub + " from " + thisroot)
                elif 'dirs' not in folder['dirs'][sub] and\
                    'unmodified' in folder['dirs'][sub] and\
                    'save' not in folder['dirs'][sub]:
                    if 'unmodified' not in folder: folder['unmodified'] = []
                    if thisroot: self._cm.spit(
                        "removing all unmodified " + sub + " from " + thisroot)
            for sub in removes:
                del folder['dirs'][sub]
            if not len(folder['dirs']): del folder['dirs']

# figure out which folders should be checked now
    def _checkpath(self):
        self._rootpath = self._root.split("/")
        if self._rootpath[-1] == "": self._rootpath = self._rootpath[:-1]
        lastrootpath = self._lastroot.split("/")
        if lastrootpath[-1] == "": lastrootpath = lastrootpath[:-1]
        if len(self._rootpath) <= len(lastrootpath):
            # find common path of root and lastroot
            n = 1 # skip leading blank before "/"
            folder = self._results
            while n < len(self._rootpath) and self._rootpath[n] == lastrootpath[n]:
                folder = folder['dirs'][self._rootpath[n]]
                n += 1
            checkpath = "/".join(lastrootpath[:n])
            tocheck = []
            checkpaths = []
            while n < len(lastrootpath):
                folder = folder['dirs'][lastrootpath[n]]
                checkpath += "/" + lastrootpath[n]
                n += 1
            for index, folder in enumerate(tocheck):

# nicely formatted 'pretty print' of hierarchy
# should probably just make this two levels per recursion
# rather than checking if depth % 2
    def _pprec(self, p = None, depth=0):
        if not p:
            p = self._results
            output = "/"
        else: output = ""
        if type(p) is dict:
            if not depth % 2:
                if 'unmodifiedsize' not in p and 'savesize' not in p:
                    output += " (empty)"
                elif 'savesize' not in p or not p['savesize']: output += " (none)"
                elif 'unmodifiedsize' not in p or not p['unmodifiedsize']:
                    output += " (all)"
                else: output += " save " + fileSize(p['savesize']) + "/" +\
                    fileSize(p['savesize'] + p['unmodifiedsize']) + "=" +\
                    str((100 * p['savesize']) / (p['savesize'] +\
                    p['unmodifiedsize'])) + "%"
                if 'save' in p:
                    names = ", ".join(p['save'])
                    if len(names) > 30: names = names[:30] + "..."
                    output += " (save " + str(len(p['save'])) + " local: " + names + ")"
                if 'unmodified' in p:
                    names = ", ".join(p['unmodified'])
                    if len(names) > 30: names = names[:30] + "..."
                    output += " (unmodified " + str(len(p['unmodified'])) + " local: " +\
                        names + ")"
                output += "\n"
                if 'dirs' in p: output += self._pprec(p['dirs'], depth + 1)
                output += ''.join("  " * depth + str(x) + self._pprec(p[x],
                     depth + 1) for x in sorted(p))
        else: output += "  " * depth + str(p) + "\n"
        return output

    def dump(self, p = None, path = "/", depth=0):
        if not p:
            p = self._results
        output = ""
        if type(p) is dict:
            if not depth % 2:
                if 'save' in p:
                    output += "".join(path + x + "\n" for x in p['save'])
                if 'dirs' in p: output += self.dump(p['dirs'], path, depth + 1)
                if 'save' not in p and 'dirs' not in p: output += path + "/\n"
                output += ''.join(self.dump(p[x], path + x + "/",
                    depth + 1) for x in sorted(p))
        return output

    def setRoot(self, root):
        self._root = root
        self._lastroot = self._root

        self._cd = self._results
        for folder in self._rootpath[1:]: # skip blank before leading "/"
            if 'dirs' not in self._cd: self._cd['dirs'] = {}
            if folder not in self._cd['dirs']: self._cd['dirs'][folder] = {}
            self._cd = self._cd['dirs'][folder]

    def processFiles(self, files):
        if not files: return
        locallinks = []
        for f in files:
            thispath = path.join(self._root, f)
            if path.islink(thispath): locallinks.append(f)
        for link in locallinks: files.remove(link)
        linenum = 0
        morepackagesthanfiles = False
        packages = {}
        newfiles = {}
        for doc in files:
            thispath = path.join(self._root, doc)
            thisdoc = {'size': 0, 'owner': 0}
                thisstat = stat(thispath)
            except OSError: # assume permission denied
            thisdoc['size'] = thisstat.st_size
                thisdoc['owner'] = getpwuid(thisstat.st_uid).pw_name
            except KeyError:
                thisdoc['owner'] = thisstat.st_uid
            self._cm.saynow(thispath + " - providers")
# get yum to ask rpm if this file is from a package
            pckgs = self._yb.rpmdb.whatProvides(thispath, None, (None, None, None))
            if not len(pckgs): newfiles[doc] = thisdoc
                package = pckgs[0] # assume first match will do
                if package not in packages: packages[package] = {}
                packages[package][doc] = thisdoc

        modified = {}
        unmodified = {}
        pk = packages.keys()
        for p in pk:
            if p not in self._rpmva:
                self._cm.say(self._root + " - checking " + str(p))
                self._cm.saynow(self._root + " - " + str(p) + " - checking")
# get yum to ask rpm to verify this package
                self._rpmva[p] = dict((f, ", ".join(list(x.message for x in m)))
                    for f,m in self._yb.rpmdb.searchNevra(p[0], p[2], p[3], p[4],
                self._cm.saynow(self._root + " - " + str(p))
            for f in packages[p]:
                if path.join(self._root, f) in self._rpmva[p]:
                    modified[f] = packages[p][f]
                else: unmodified[f] = packages[p][f]
        #rpmva = {} # trash the cache - trade speed for memory
        if modified or newfiles:
            self._cd['save'] = modified.keys() + newfiles.keys()
            self._cd['savesize'] = sum(modified[x]['size'] for x in modified) +\
                sum(newfiles[x]['size'] for x in newfiles)
        if unmodified:
            self._cd['unmodified'] = unmodified.keys()
            self._cd['unmodifiedsize'] = sum(
                unmodified[x]['size'] for x in unmodified)

    def processFolders(self, dirs):
        localmounts = []
        locallinks = []
        for folder in dirs:
            thispath = path.join(self._root, folder)
            if path.islink(thispath): locallinks.append(folder)
            elif path.ismount(thispath): localmounts.append(folder)
        for link in locallinks: dirs.remove(link)
        for mount in localmounts:
            self._mounts.append(path.join(self._root, mount))

    def close(self):
        self._root = "/"

    def getRootPath(self):
        return self._rootpath

and so the actual app is nice and small.  it prints those progress messages and the final summary to stderr and the flat list of files and whole directories to stdout.  and it takes hours so i usually run it like this: time > backup-datetime.out; paplay --volume 30000 /usr/share/sounds/gnome/default/alerts/sonar.ogg

# coding=utf-8

from os import walk
from sys import stdout, stderr
from pkgscan import pkgScanner

ps = pkgScanner()
for root, dirs, files in walk("/"):


and that’s that.  oh, sometimes it can use up an awful lot of memory.  keep an eye on it.

what’s next?  i’d like to specify a set of starting points for the scan on the command line, maybe pass in an exclusions file.  also i want to check that i actually have permission to read those files i want to back up.

it’d be nice to be able to generate a backup list for my non-admin user, then pass that list in to the scanner when run as root to generate a short list of stuff that has to be backed up by root.

looking at the output generated so far i’ll need to start writing some (possibly plugin-based) rules to handle/exclude certain files – some config files should be diffed rather than just saved, some files should be backed up by their application’s own backup system (e.g. databases), some files should only be backed up when the user isn’t logged in, some only on shutdown/startup, some only in single user mode.

wnck FTW!

well, yesterday’s hack turned out to be pretty useless.  effectively it just converted hamster into zeitgeist journal (or gnome activity monitor)  – which is okay until you start using an app which isn’t a zeitgeist data provider or which doesn’t happen to emit a signal for the activity you’re performing.

so, i thought i’d have a go at converting hamster into creeper instead.  thanks to creeper for showing me how it’s done.  i’ve called this 😉


# a python version of creeper (a vala app) to monitor active windows
# actually turns hamster into creeper

# because the mainloop appears to catch exceptions
from traceback import print_exc
import hamster.client
class hamster_handler(hamster.client.Storage):
   def __init__(self): = None
   def add_fact(self, fact):
      # FIXME insert clever rules here
      fact = fact.replace(",", ";") # don't accidentally create descriptions
      fact = fact.replace("@", "(a)") # don't accidentally create categories
      hamster.client.Storage.add_fact(self, fact)

   def handler(self, scr, prev = None):
         if prev and != None: prev.disconnect(
         win = scr.get_active_window()
         if win:
   = win.connect("name_changed", self.name_handler)
      except KeyboardInterrupt: raise
   def name_handler(self, win):
      except KeyboardInterrupt: raise

hh = hamster_handler()
from gobject import MainLoop
ml = MainLoop()
from wnck import screen_get_default
sc = screen_get_default()
sc.connect("active_window_changed", hh.handler)
sc.connect("window_stacking_changed", hh.handler)

this is working pretty well for me so far .. on fedora 14.  just tried on 13 and the hamster python library is too old i think.  ah well – yet another reason to upgrade!

python, hamster and zeitgeist FTW!

update: note that this turned out not to be all that useful – tracking active windows into hamster turned out better.

in response to gnome bug 639018 and my general desire to track automatically what i’ve done, i’ve made a python script which connects to the zeitgeist activity monitor and copies its messages to the hamster time tracker.  it goes like this:


# monitor zeitgeist and do stuff
from zeitgeist.client import ZeitgeistClient
from zeitgeist.datamodel import TimeRange, Event
from gobject import MainLoop

import hamster.client
class hamster_handler(hamster.client.Storage):
   def handler(self, tr, ev):
      # because the mainloop appears to catch exceptions
      from traceback import print_exc
      from urlparse import urlparse
      try:
         # FIXME insert clever rules here
         app = urlparse(ev[0].actor).netloc
         desk = open("/usr/share/applications/" + app)
         comments = filter(lambda x: x.startswith("Comment[en_GB]="), desk)
         comment = comments[0].split("=")[1].strip()
         self.add_fact(comment + " - " + ev[0].subjects[0].text)
      except KeyboardInterrupt: raise
      except: print_exc()

hh = hamster_handler()
ml = MainLoop()
# assumption: watch every new event and ignore deletions
ZeitgeistClient().install_monitor(TimeRange.always(), [Event()],
   hh.handler, lambda tr, ids: None)
ml.run()

It never ends until it’s killed so you’ll probably want to run it in the background – i’ve added it to my session ‘startup applications’.  if it doesn’t appear to be working then run it from the command line instead – you should see some error messages if it’s failing to update hamster.

on my fedora 14 system i only get updates for local text files, images and videos opened in gedit, EoG and totem.  on ubuntu i imagine you’ll get a lot more updates.  OTOH, on ubuntu the script will probably need some tweaking for the hard-coded paths and locale.

under surveillance

i’ve decided not to link this to my hamster-to-empathy updater – i don’t really want to broadcast a stream of every little thing i do .. particularly if my IM accounts include twitter and facebook status feeds.  🙂

python + dbus FTW!!!

was just thinking it’d be nice to be able to push a button to copy my current activity from Hamster to my work XMPP status in Empathy.  couldn’t find any such options or plugins for either app.  but with dbus (and dfeet) and python, how hard can it be?  turns out, this hard:


# get status from hamster, write it to empathy

myjid = "" # my XMPP node and domain - any XMPP resource

from dbus import SessionBus

def busAsPath(bus):
  return "/" + bus.replace(".", "/")

def jidToDbus(jid):
  return jid.replace(".", "_2e").replace("@", "_40").replace("/", "_2f")

ogh = "org.gnome.Hamster"
mybus = "org.freedesktop.Telepathy.Connection.gabble.jabber." + jidToDbus(myjid)
sb = SessionBus()

fact = sb.get_object(ogh, busAsPath(ogh)).GetCurrentFact()['name']
for bus in sb.list_names():
  if bus.startswith(mybus):
    sb.get_object(bus, busAsPath(bus)).SetPresence("available", fact,
    dbus_interface = "org.freedesktop.Telepathy.Connection.Interface.SimplePresence")

stuck that in a file, made a custom launcher for it on the gnome panel and put that next to hamster.  lovely.

this only sets the status on that one account. if you’ve got multiple accounts which can have their presence set then empathy won’t change what it displays in its status selector until they all match. easily done by removing the account-specific code and applying the SetPresence to every bus which startswith(“org.freedesktop.Telepathy.Connection.”) – then you need a try/pass to ignore any exceptions generated by attempting to SetPresence on accounts which don’t have one. 🙂
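a rough python 3 sketch of that every-account variant, kept free of dbus so it can be run anywhere.  set_presence_everywhere and its set_presence argument are made up for this illustration: in the real script set_presence would wrap the sb.get_object(bus, path).SetPresence(...) call, and names would come from sb.list_names().

```python
PREFIX = "org.freedesktop.Telepathy.Connection."

def bus_as_path(bus):
    # same conversion as busAsPath in the script above
    return "/" + bus.replace(".", "/")

def set_presence_everywhere(names, set_presence, status, message):
    """Try every telepathy connection; return the ones that accepted."""
    updated = []
    for bus in names:
        if not bus.startswith(PREFIX):
            continue
        try:
            set_presence(bus, bus_as_path(bus), status, message)
        except Exception:
            # this account can't have its presence set, so skip it
            continue
        updated.append(bus)
    return updated
```

the returned list is only there so you can see which connections took the new status; the original one-account script has no equivalent.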

how CJK is your page?

i made a python app that scans the content of a web page to see how Japanese it is, based on the characters used.  it only looks at the characters, so a high score could mean Japanese, Chinese etc.  here it is:

# -*- coding: utf-8 -*-

from sys import argv
from urllib2 import build_opener
from HTMLParser import HTMLParser

class jaHTMLParser(HTMLParser):

    ja = nonja = 0
    encoding = "utf-8"

    def handle_starttag(self, tag, attrs):
        # pick up the page's charset from its meta tag, if it has one
        for attr in attrs:
            if tag == 'meta' and attr[0] == "content" and attr[1].find("charset=") != -1:
                self.encoding = attr[1].split("charset=")[1]

    def handle_data(self, data):
        if data in ("/*", "*/") or data.isspace(): return
        uni = data.decode(self.encoding)
        for c in uni:
            u8 = c.encode("utf-8")
            if u8 >= '⺀' and u8 <= '𯨟': self.ja += 1 # very approximate
            else: self.nonja += 1

    def unknown_decl(self, data): pass # CDATA is not an error!

opener = build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')] # avoid 403 forbidden

reader = jaHTMLParser()
reader.CDATA_CONTENT_ELEMENTS = [] # don't treat any CDATA as textual content
# assumption: the page to scan is given as the first command line argument
reader.feed(opener.open(argv[1]).read())
print "%d%%" % (float(100 * reader.ja) / float(reader.nonja + reader.ja))
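for comparison, here’s a rough python 3 sketch of just the counting step, working on text you already have rather than fetching a page.  cjk_percent is a made-up name, and the range U+2E80 to U+2FA1F is my reading of the two characters compared against in the script, so it’s just as approximate.  unlike the script, this sketch skips whitespace altogether.

```python
def cjk_percent(text):
    """Percentage of non-whitespace characters inside the rough CJK range."""
    chars = [c for c in text if not c.isspace()]
    if not chars:
        return 0.0
    # very approximate: one big range, which also covers some non-CJK blocks
    cjk = sum(1 for c in chars if 0x2E80 <= ord(c) <= 0x2FA1F)
    return 100.0 * cjk / len(chars)
```

a string with three kanji and three latin letters comes out at 50.0, and an empty or all-whitespace string gives 0.0 instead of dividing by zero.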