Distributed computing with remote threads in Python: rthread.py

Author: Antti Kervinen, ask@cs.tut.fi

Introduction
============

The Python standard library contains the thread module, which provides
primitives for writing multi-threaded programs. There are primitives
for locks (allocating, acquiring and releasing them), for getting the
identifier of the current thread, for raising an exception in the main
thread and, most importantly, the start_new_thread function for
starting a new thread.
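For comparison, the sketch below exercises those primitives. It is written for Python 3, where the module is named _thread (the figures in this document use Python 2, where it is called thread); the worker function and the polling loop are illustrative only.

```python
import _thread
import time

lock = _thread.allocate_lock()   # allocate a lock
results = []                     # shared list, protected by the lock

def worker(n):
    # Record the argument, its square and the identifier of this thread.
    with lock:                   # acquire on entry, release on exit
        results.append((n, n * n, _thread.get_ident()))

for n in range(3):
    # start_new_thread(function, args) returns immediately.
    _thread.start_new_thread(worker, (n,))

# _thread has no join(), so the main thread polls until all workers are done.
while True:
    with lock:
        if len(results) == 3:
            break
    time.sleep(0.01)

print(sorted((n, sq) for n, sq, _ in results))
```

Because the low-level module offers no way to wait for a thread, completion has to be detected by the program itself, much as Figure 2 polls r.status.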

The rthread library introduced here mimics the interface of the thread
module to some extent. The objective is to make distributed computing
look and feel similar to multi-threaded programming.

Of course, a number of issues arise from the fact that remote threads
actually run on remote computers, in contexts completely independent
of the context of the main thread. First, the program code for the
thread, as well as its arguments, has to be present in the remote peer
before it can be executed. Second, there has to be a mechanism for
accessing local resources, such as other functions and global
variables, from the remote peer. Related to that is the problem of
synchronising data between the local and remote environments. The
following sections explain how the rthread library handles these
issues.

At the time of writing, the rthread library is at a very early stage
of development. It can already be used, as the proof-of-concept
distributed Mandelbrot viewer shows, but the set of supported language
features is still limited.

How it works
============

--- Figure 1: Hello from remote thread ---------------------------------
# example_hello.py

from rthread import start_remote_thread, status_finished
import time

def print_hello():
    import commands
    print "Hello from %s" % commands.getoutput("uname -o")
    return 10

r=start_remote_thread(print_hello,(),
                      remotehost="pikkukorppi.cs.tut.fi")

time.sleep(2)
print "Finished: %s" % (r.status==status_finished)
print "Returned: %s" % r.retval
------------------------------------------------------------------------

Figure 1 contains a simple example that runs a hello world thread
remotely. The thread prints a greeting that names the operating system
on which it is executed.

When start_remote_thread is called, the following actions take
place.

1. A new local thread is created.

2. A RemoteRunner object is returned to the main thread. The object
   contains status information on the remote thread and holds the
   return value of the remotely executed function.

3. A connection to an rthread server is established. The new thread
   executes a shell command that opens an SSH connection to the remote
   host and runs rthread_server there. The command is currently

   ssh -C -c blowfish  "rthread_server"

   The standard input and output pipes of the shell process are used
   as the communication channel. This is currently the default way of
   creating a connection. Other communication channels (direct TCP/IP,
   for example) could easily be implemented later on.

4. The function and its arguments are sent to the remote host. The
   function is sent to the server as a dictionary that contains the
   bytecode of the function and its closest context: the global
   variables that the function can access and call information on the
   other functions that the function can call. The call information
   specifies which functions can be sent to the remote host if needed
   (RETRIEVABLE_FUNCTION in the rthread code) and which must be
   executed locally (FORWARD_CALL).

5. The function is executed in the remote peer. If a retrievable
   function, that is, a function that can be copied to the remote peer
   but has not been copied yet, is called during the execution, the
   server requests its code from the local thread. The received code
   is stored in the global namespace of the remote peer, so only the
   first call of a retrievable function causes a delay. If a called
   function is forced to local execution, the server sends the name of
   the function and its arguments to the local thread, which executes
   the function with those arguments and sends the return value back
   to the remote peer.

6. The return value of the remotely run function is sent to the local
   thread. The local thread stores the return value in the
   RemoteRunner object and terminates.
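The lifecycle in steps 1, 2 and 6 can be illustrated without any network. In this minimal sketch, LocalRunner, start_local_thread, status_running and status_failed are hypothetical names, not the rthread API, and the function simply runs in a local threading.Thread where rthread would talk to a remote server.

```python
import threading
import time

# Hypothetical status values; rthread's own constants may differ.
status_running = "running"
status_finished = "finished"
status_failed = "failed"

class LocalRunner:
    """Stand-in for RemoteRunner: holds the status and the return value."""
    def __init__(self):
        self.status = status_running
        self.retval = None

def start_local_thread(func, args):
    runner = LocalRunner()

    def body():
        # rthread would connect to the server here (step 3), ship the
        # code (step 4) and serve the server's requests (step 5).
        try:
            runner.retval = func(*args)
            runner.status = status_finished   # step 6: store result, then exit
        except Exception:
            runner.status = status_failed

    threading.Thread(target=body, daemon=True).start()   # step 1
    return runner                                        # step 2

r = start_local_thread(lambda a, b: a + b, (3, 4))
while r.status == status_running:
    time.sleep(0.01)
print(r.status, r.retval)
```

The return value is stored before the status changes, so a caller that sees the finished status can safely read retval.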
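Step 4 can be approximated with the standard marshal module, which serialises code objects. The dictionary layout (keys "name", "code", "globals" and "calls") and the string values of the two tags are assumptions modelled on the description above, not rthread's actual wire format. Note that marshal output is specific to the Python version, so both peers must run the same interpreter version.

```python
import marshal
import types

RETRIEVABLE_FUNCTION = "retrievable"   # may be copied to the remote peer
FORWARD_CALL = "forward"               # must be executed in the local peer

SCALE = 3

def helper(x):
    return x + 1

def task(x):
    return helper(x) * SCALE

def pack_function(func, keep_local=()):
    """Describe func as a plain dictionary (hypothetical layout)."""
    gvars, calls = {}, {}
    for name in func.__code__.co_names:       # global names used by the code
        value = func.__globals__.get(name)
        if isinstance(value, types.FunctionType):
            calls[name] = FORWARD_CALL if value in keep_local else RETRIEVABLE_FUNCTION
        elif isinstance(value, (int, float, str)):
            gvars[name] = value
    return {"name": func.__name__,
            "code": marshal.dumps(func.__code__),
            "globals": gvars,
            "calls": calls}

def unpack_function(packed, namespace):
    """Rebuild the function on the receiving side inside namespace."""
    namespace.update(packed["globals"])
    code = marshal.loads(packed["code"])
    return types.FunctionType(code, namespace, packed["name"])

packed = pack_function(task)
print(packed["globals"], packed["calls"])

# Simulated remote side: helper has already been retrieved once.
remote_ns = {}
remote_ns["helper"] = unpack_function(pack_function(helper), remote_ns)
remote_task = unpack_function(packed, remote_ns)
print(remote_task(4))
```

Passing a keep_local mapping like the one in Figure 2, for example pack_function(task, {helper: 0}), marks helper as a forwarded call instead.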

Note that remote peers are expected to have only a standard Python
installation with the rthread library and the rthread_server program.
In particular, resources related to the software that uses rthread do
not need to be present in the remote peers beforehand.

Automatic synchronisation of global variables between peers is not
supported. Therefore, remote threads should neither directly change
the values of global variables nor directly read the values of
variables that may be changed by other threads. In other words, the
global variables accessed by remote threads should effectively be
constants. Shared memory should be accessed by calling functions that
have been forced local. These functions are executed in the local
threads created at step 1 above. Figure 2 shows an example, where the
keep_local argument forces append_fibnum to be executed locally.

--- Figure 2: Accessing a global variable from a remote thread ---------
# example_sharedmem.py

from rthread import start_remote_thread, status_finished
import time

fibnums=[1,1]

def append_fibnum(x):
    # a global variable is written, so this function
    # should always be executed locally
    global fibnums
    fibnums.append(x)

def calc_fibnums(count):
    localfibnums=[1,1]
    for i in range(count):
        newfibnum=localfibnums[-1]+localfibnums[-2]
        localfibnums.append(newfibnum)
        append_fibnum(newfibnum)
    return localfibnums

r=start_remote_thread(calc_fibnums,(10,),
                      remotehost="pikkukorppi.cs.tut.fi",
                      keep_local={append_fibnum:0})

while r.status!=status_finished: time.sleep(0.1)

print "Finished: %s" % (r.status==status_finished)
print "Returned: %s" % r.retval
print "Local list: %s" % fibnums
------------------------------------------------------------------------
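The forwarded call behind keep_local can be pictured as a request/reply exchange. In this minimal sketch, two queue.Queue objects stand in for the SSH pipes and a second thread plays the remote peer; the message tuples ("call", name, args), ("return", value) and ("done", result) and the helper names are assumptions for illustration, not rthread's protocol.

```python
import queue
import threading

fibnums = [1, 1]

def append_fibnum(x):
    fibnums.append(x)            # mutates state that lives in the local peer

to_local = queue.Queue()         # remote -> local messages
to_remote = queue.Queue()        # local -> remote messages

def forwarded(name):
    """Return a stub that ships the call to the local peer and waits."""
    def stub(*args):
        to_local.put(("call", name, args))
        _, value = to_remote.get()       # block until the reply arrives
        return value
    return stub

def remote_peer(count):
    # The remote copy of calc_fibnums sees a stub instead of append_fibnum.
    append = forwarded("append_fibnum")
    local = [1, 1]
    for _ in range(count):
        local.append(local[-1] + local[-2])
        append(local[-1])
    to_local.put(("done", local))

def serve_local(local_functions):
    # The local thread executes forwarded calls until the result arrives.
    while True:
        msg = to_local.get()
        if msg[0] == "call":
            _, name, args = msg
            to_remote.put(("return", local_functions[name](*args)))
        else:
            return msg[1]

threading.Thread(target=remote_peer, args=(5,), daemon=True).start()
result = serve_local({"append_fibnum": append_fibnum})
print(result)
print(fibnums)
```

Each forwarded call costs a full round trip, which is why only functions that touch shared state should be kept local.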


How it is implemented
=====================

The communication between the local and the remote peers has two
layers. A Comms object implements a simple point-to-point
communication protocol over any channel that has receive and send
functions and, optionally, a function for flushing the send buffer.
The protocol offers send and receive methods that can handle
arbitrarily long messages.

If the protocol receives data that is not part of any message sent
with the protocol, it prints the data to the standard output. This
makes it possible for remote threads to print text directly with the
print statement, and uncaught exceptions in remote threads appear in
the output of the local peer. However, data that remote threads print
to standard error is currently also printed to the standard output by
the local thread. A protocol packet consists of a two-byte header
(0x16, 0x00), followed by the length of the data (a number in ASCII)
and the data itself. Therefore, packets can be distinguished from
plain text in the same channel.
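A minimal sketch of such a protocol layer follows. It assumes that the ASCII length is terminated by a newline (the text above does not say how the length field ends) and that the channel functions exchange bytes; the class, its parameters and the echo callback are illustrative, not rthread's Comms API. Plain text found outside packets is handed to echo, where rthread would print it.

```python
HEADER = b"\x16\x00"

def frame(data):
    """Encode one packet: header, ASCII length, assumed newline, payload."""
    return HEADER + str(len(data)).encode("ascii") + b"\n" + data

class Comms:
    """Message protocol over any channel offering receive/send (+ flush)."""

    def __init__(self, receive, send, flush=None):
        self._receive = receive    # receive() -> bytes; b"" means closed
        self._send = send          # send(bytes)
        self._flush = flush        # optional flush()
        self._buf = b""

    def send(self, data):
        self._send(frame(data))
        if self._flush is not None:
            self._flush()

    def receive(self, echo):
        """Return the next message; hand any plain text to echo(bytes)."""
        while True:
            i = self._buf.find(HEADER)
            if i < 0:
                # No header yet: echo everything except a trailing 0x16,
                # which may be the first half of a header.
                keep = 1 if self._buf.endswith(HEADER[:1]) else 0
                cut = len(self._buf) - keep
            else:
                cut = i
            if cut > 0:
                echo(self._buf[:cut])
                self._buf = self._buf[cut:]
                continue
            if i == 0:
                nl = self._buf.find(b"\n", len(HEADER))
                if nl >= 0:
                    end = nl + 1 + int(self._buf[len(HEADER):nl])
                    if len(self._buf) >= end:   # whole payload has arrived
                        message = self._buf[nl + 1:end]
                        self._buf = self._buf[end:]
                        return message
            chunk = self._receive()             # need more bytes
            if not chunk:
                raise EOFError("channel closed")
            self._buf += chunk

# Text and packets interleaved, delivered in tiny 4-byte reads.
stream = (b"Hello from remote\n" + frame(b"retval=42")
          + b"more text\n" + frame(b"x" * 1000))
chunks = [stream[k:k + 4] for k in range(0, len(stream), 4)]
printed = []
comms = Comms(receive=lambda: chunks.pop(0) if chunks else b"",
              send=lambda data: None)
first = comms.receive(printed.append)
second = comms.receive(printed.append)
print(first, len(second), b"".join(printed))
```

Because plain text is recognised only by the absence of the header, a remote thread that prints the bytes 0x16 0x00 would confuse the receiver; this is the binary-data limitation listed under Future work.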

Some optimisations have been implemented for faster restarting of
remote threads. Remote servers keep listening to their input channels
even after the execution of a remotely run function is over, and
received functions and variables stay in their context. Connections
to the remote servers are pooled on the local machine, and the
functions sent to each remote peer are remembered. This makes it
possible for a local thread to choose a free remote server and, if the
function to be run is the one most recently executed on that server,
send only the arguments. Only one message is then sent to the remote
peer, and nothing but the arguments needs to be serialised.
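The pooling decision can be sketched as plain bookkeeping. ServerPool, plan_call and the message kinds "code", "select" and "args" are hypothetical; in particular, the "select" case (a function the server already holds but did not run last) is an assumption, since the text above only describes the most-recent-function shortcut.

```python
class PooledServer:
    """Bookkeeping for one pooled connection (hypothetical)."""
    def __init__(self, name):
        self.name = name
        self.busy = False
        self.last_function = None   # function executed most recently here
        self.known = set()          # functions already sent to this peer

class ServerPool:
    def __init__(self, names):
        self.servers = [PooledServer(n) for n in names]

    def plan_call(self, func_name, args):
        """Pick a free server and list the messages that must be sent."""
        free = [s for s in self.servers if not s.busy]
        if not free:
            raise RuntimeError("no free server")
        # Prefer a free server whose most recent function is this one.
        server = next((s for s in free if s.last_function == func_name), free[0])
        if server.last_function == func_name:
            messages = [("args", args)]                  # a single message
        elif func_name in server.known:
            messages = [("select", func_name), ("args", args)]
        else:
            messages = [("code", func_name), ("args", args)]
        server.known.add(func_name)
        server.last_function = func_name
        server.busy = True
        return server, messages

    def release(self, server):
        server.busy = False

pool = ServerPool(["hostA", "hostB"])
s1, m1 = pool.plan_call("task", (0,))
pool.release(s1)
s2, m2 = pool.plan_call("task", (1,))   # same server, arguments only
s3, m3 = pool.plan_call("task", (2,))   # hostA is busy, so code goes to hostB
print(s1.name, m1)
print(s2.name, m2)
print(s3.name, m3)
```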

There is a tradeoff in the communication between peers before a
remote thread is launched. Sending everything the remote thread might
use may mean sending large amounts of data that turn out to be
unnecessary in the end. On the other hand, sending only the
information needed for launching the thread may cause the remote peer
to request more information later on. The current rthread takes the
latter approach: only the program code required for starting the
execution is sent at first, and the rest is sent on demand.
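The on-demand strategy amounts to a cache in the remote peer. In this minimal sketch, LazyNamespace is a hypothetical class and fetch stands for the round trip in which the remote peer asks the local peer for a function's code; a function that is never called is never transferred.

```python
class LazyNamespace:
    """Remote-side cache that requests missing functions on first use."""

    def __init__(self, fetch):
        self._fetch = fetch          # fetch(name) -> function (a round trip)
        self._functions = {}
        self.requests = 0            # number of round trips made so far

    def call(self, name, *args):
        if name not in self._functions:       # only the first call pays
            self.requests += 1
            self._functions[name] = self._fetch(name)
        return self._functions[name](*args)

local_functions = {"square": lambda x: x * x, "neg": lambda x: -x}
ns = LazyNamespace(lambda name: local_functions[name])
values = [ns.call("square", i) for i in range(5)]
print(values, ns.requests)
```

Here "neg" is never requested, which is exactly the data an eager strategy would have sent in vain.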


Future work
===========

o More data types should be supported. Currently, only a limited
  number of built-in types (int, float, str, tuple, list, dictionary
  and function) can be sent and received between the local and remote
  peers. Support for objects in general would be an important
  extension.

o The communication protocol should be cleaner, so that remote
  threads could also print binary data safely. Standard output and
  standard error should also be distinguished.

o Further optimisation. The remote peer does not always need to wait
  for the return value of a forwarded function call. An asynchronous
  call would allow the remote peer to continue execution immediately.

o Policies should be decided for file I/O, for example. Should a
  remote thread access local or remote files by default?

o Automatic distribution of data structures would be an interesting
  research topic.
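The type restriction in the first item can be made concrete with a pre-flight check. The list of types comes from the text above; check_sendable and its error message are hypothetical and not part of rthread.

```python
import types

# Scalar types listed as transferable; containers are checked recursively.
SENDABLE_SCALARS = (int, float, str, types.FunctionType)

def check_sendable(value, path="value"):
    """Raise TypeError if value contains a type outside the supported set."""
    if isinstance(value, (tuple, list)):
        for i, item in enumerate(value):
            check_sendable(item, "%s[%d]" % (path, i))
    elif isinstance(value, dict):
        for key, item in value.items():
            check_sendable(key, path + " key")
            check_sendable(item, "%s[%r]" % (path, key))
    elif not isinstance(value, SENDABLE_SCALARS):
        raise TypeError("%s: %s cannot be sent" % (path, type(value).__name__))

check_sendable({"rows": [(0, 1.5), (1, 2.5)], "name": "mandel"})   # accepted

class Point:
    pass

try:
    check_sendable([1, Point()])
except TypeError as e:
    error = str(e)
    print(error)
```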





Created: Tue Dec 12 12:12:12 EEST 2006
Last modified: Mon Nov 10 00:00:53 EET 2008