Python from 0 to infinity

From environment setup to threading and CPU load distribution, by way of database management and package creation. A condensed set of useful snippets to offload your memory; come back and check them whenever needed.

Environment setup

Set up a Virtual Environment

Virtual environments let each project on a machine work with its own package versions, which also makes projects more portable.

Installation

Install the virtualenv package using pip. If you need to run pip behind a proxy, use:

pip --proxy http://user:pass@proxyAddress:proxyPort install ...

Windows needs some special instructions:

pip install virtualenv
pip install virtualenvwrapper-win

Create the virtualenv

In your work folder, run:

virtualenv VIRTUAL_ENV_NAME
path_to_work_dir/VIRTUAL_ENV_NAME/Scripts/activate

or, to create it directly in your_user/Envs

mkvirtualenv VIRTUAL_ENV_NAME

cd into your dev folder and run

setprojectdir .

When you are done, issue deactivate

Every time you want to work on this virtual env, just type workon VIRTUAL_ENV_NAME


A MATLAB-like IDE for Python: Spyder

You can have it running in a minute, absolutely free.

pip install spyder

and then, open it with

python -m spyder.app.start

 

 

Anaconda

If you use Anaconda, the commands for virtual envs are similar:

conda create -n myenv anaconda spyder

Then, anytime you want to work on it, type:

activate myenv

To launch a Matlab-like IDE for python, activate your virtual env and run spyder.

activate orb_conda
spyder

If you work behind a proxy, set the environment variables to:

http_proxy: http://username:password@corp.com:8080 

https_proxy: https://username:password@corp.com:8080

Inside a conda environment, you can use both conda install and pip install. If the conda command is not found, add its install location to your PATH.

Code cells in Spyder are great for development and testing; they are separated with #%%. You can run a cell with CTRL+ENTER, just like in MATLAB.
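For example, a script split into two cells might look like this (a minimal sketch, assuming numpy is installed):

#%% Load the data
import numpy as np
data = np.random.rand(100)

#%% Analyze it (re-run only this cell with CTRL+ENTER)
print(data.mean())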


Creating a package

__init__.py

If you plan to package and distribute your scripts, this file must sit in the root of your sources dir ( /my_proj/packagename/__init__.py ). It defines what becomes available to a script that imports the package.
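A minimal sketch (the module and function names below are made up for illustration):

# packagename/__init__.py
# Anything imported here becomes available as packagename.<name>
# to scripts that import the package.
from .fetcher import fetch_ohlcv
from .storage import save_to_db

__all__ = ['fetch_ohlcv', 'save_to_db']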

 

__main__.py

A package's entry-point code belongs in __main__.py: this file is executed when the package is run with python -m packagename. Guard the call as shown below so the main code only runs in that case, and not every time another module simply imports the package, which could lead to undesired behavior.

def main():
  pass

if __name__ == "__main__":
  main()

 

setup.py

This is the file used by the packaging tools. In it, you can set multiple attributes. One of the most important is install_requires: when your package is installed, all the packages listed there will be installed too.

from setuptools import setup, find_packages

def readme():
    with open('README.rst') as f:
        return f.read()

setup(name='orbbit',
      version='0.1',
      description='A language-independent API for cryptocurrency trading robots.',
      long_description=readme(),
      classifiers=[
          'Development Status :: 3 - Alpha',
          'Programming Language :: Python :: 3',
      ],
      keywords='cryptocurrency trading robots',
      url='https://github.com/user/repo',
      author='The best people.',
      author_email='name@email.com',
      license='Closed source',
      packages=find_packages(),
      include_package_data=True,
      install_requires=[
          'datetime',
      ],
      scripts=['bin/start_hi'],
      entry_points={
          'console_scripts': [
              'start_main = my_project.__main__:main'
          ]
      },
      zip_safe=False)

  • entry_points and scripts are very useful: they allow your application to be run directly from the command line like any other program.

 

MANIFEST.in

In order to include non-Python files in the package, such as docs, they must be added to the manifest.
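For example, a MANIFEST.in could look like this (the paths are just an illustration):

include README.rst
include LICENSE
recursive-include docs *.rst
recursive-include packagename/data *.json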

 

README.rst or .md

The first non-Python file to add is usually the README. You can write it in Markdown or reStructuredText; both are widely supported.

 

.gitignore

Usually you want to use Version Control for source files and exclude generated files, so a .gitignore or applicable file is needed.

Work with Version Control like a boss in 5min

# Files to exclude:
# Compiled python modules.
*.pyc

# Setuptools distribution folder.
/dist/

# Python egg metadata, regenerated from source files by setuptools.
/*.egg-info

# Python cache
*__pycache__*


Uploading to PyPI (enable anyone to “pip install yourpackage”)

Create .pypirc in your $HOME path:

[distutils]
index-servers =
 pypi
 pypitest

[pypi]
username=yourusername
password=yourpassword

Create setup.cfg in the same folder as setup.py:

[metadata]
description-file = ./README.md

Provide a download_url in setup.py (likely this will point to a GitHub tag .tar.gz).
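For example, assuming the repo lives on GitHub and you tagged release 0.1, the extra line in setup() could be:

download_url='https://github.com/user/repo/archive/0.1.tar.gz',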

Finally, to upload the package run:

python setup.py sdist
twine upload dist/*


Build a REST API

Flask provides a framework that you can start using in minutes, and there are extensions that add extra functionality. The basic steps are:

from flask import Flask, jsonify

app = Flask(__name__)

# Define routes, methods and callback functions
@app.route('/datamanager/<int:dataset_id>', methods=['GET'])
def get_data(dataset_id):
    return jsonify({'dataset_id': dataset_id})  # return the requested data as JSON

# Start the server
app.run(debug=True)

Use curl for debugging. (For Windows, download the Win32 – Generic zip.)

If you want to debug using Matlab as a client, it can be as simple as:

addpath ../../../../matlab-json
json.startup

api = 'http://localhost:5000/ticker';
S = webread(api)


Async (concurrent) code

Async is not parallel: all the code runs in a single thread. However, it is a handy trade-off to quickly deploy code with coroutines that take different times to complete. An async function, when called, returns a coroutine which can then be awaited.

To create and run a coroutine, we:

  1. Create a future (a task in this case).
  2. Schedule it to run with the event loop.
  3. When it reaches an await, it returns flow control to the event loop.

Useful functions (see the sketch below):

  • await asyncio.sleep: a non-blocking wait that allows a context switch.
  • To schedule a coroutine, we use create_task, then we group tasks that should run “at the same time” and wait for them.
  • To stop execution until the wait finishes, we call run_until_complete on our event-loop object.
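A minimal sketch putting these pieces together (the coroutine and its arguments are made up; it uses the classic get_event_loop / run_until_complete pattern mentioned above):

import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)   # non-blocking wait: control returns to the event loop
    print(name, 'finished after', delay, 's')

async def main():
    # group tasks that should run "at the same time" and wait for them
    await asyncio.gather(fetch('slow', 2), fetch('fast', 1))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())  # block until every grouped task finishes
loop.close()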


Threading

For concurrent execution on real OS threads, the threading module is used instead of async libraries (keep in mind the note on the GIL and CPU-bound work below).

To create a thread, we inherit from threading.Thread and redefine the run() method. To launch it, call thread.start()

import threading

class save_ohlcv(threading.Thread):
  def __init__(self, symbol, timeframe):
    threading.Thread.__init__(self)
    self.symbol = symbol
    self.timeframe = timeframe

  def run(self):
    print('Started fetcher for ' + self.symbol + ' ' + self.timeframe)
    return

The thread will run until run() returns. You will sometimes see the private method

thread._Thread__stop()

suggested to force a stop from outside, but it relies on CPython internals and is not reliable (the attribute changed in Python 3); prefer signaling the thread, for example with a flag or an Event, and letting run() return.

In a more compact way, you can run a function as a thread with:

thread = threading.Thread(target=function, args=(arg0,))

Synchronization:

  • To wait until a thread exits, use thread_to_wait_for.join([timeout]).
  • You can set thread.daemon = True so the thread does not keep the program alive: it runs in the background and is killed when the main program exits.

Resource sharing:

  • Mutex lock: This is an exception-safe way to use a mutex.
    lock.acquire()
    try:
        ... access shared resource
    finally:
        lock.release()

    Or in a more compact way:

    resource_lock = threading.Lock()
    with resource_lock:
        ... access shared resource

    Avoid locking a resource while already holding another lock; this can lead to deadlocks.

  • Use a reentrant lock, threading.RLock(), to acquire the same lock several times inside one thread. You must release it the same number of times.
  • Semaphores, threading.Semaphore(n), limit the number of threads accessing a resource and can also be used to signal other threads, for example to wake them up. Used like this, you can easily implement a producer-consumer.
  • Events can be waited on (event.wait()) until another thread calls event.set(); see the sketch below.
  • Condition variables let a thread holding the lock signal one or more threads waiting on it, using notify() and notify_all(), once the state they need is reached.
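As an illustration of the signaling primitives, here is a minimal Event sketch (the thread roles and timing are made up):

import threading
import time

data_ready = threading.Event()

def consumer():
    data_ready.wait()    # blocks until the producer calls set()
    print('woken up, processing data')

def producer():
    time.sleep(1)        # pretend to prepare the data
    data_ready.set()     # wakes up every thread waiting on the event

threading.Thread(target=consumer).start()
threading.Thread(target=producer).start()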


Queues

This is an excellent mechanism because it lets multiple producers and multiple consumers run in sync. Consumers wait for items in the FIFO with

item = q.get()

Whereas producers insert items to be processed with:

 q.put(item)

To wait for completion and kill consumer threads, you can use

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()
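Putting the pieces together, a complete worker sketch could look like this (names such as num_worker_threads are placeholders, not from the snippet above); note that each consumer must call q.task_done() so that q.join() can return:

import threading
import queue

q = queue.Queue()
num_worker_threads = 4

def worker():
    while True:
        item = q.get()
        if item is None:      # poison pill: stop this worker
            break
        print('processing', item)
        q.task_done()         # mark the item as done so q.join() can return

threads = [threading.Thread(target=worker) for _ in range(num_worker_threads)]
for t in threads:
    t.start()

for item in range(20):
    q.put(item)

q.join()                      # block until all tasks are done
for i in range(num_worker_threads):
    q.put(None)               # stop workers
for t in threads:
    t.join()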

 

 

A note on performance:

In Python, only one thread holds the Global Interpreter Lock at a time. That means all threads compete for it, and performance can be greatly reduced if the tasks are CPU bound (as opposed to IO bound).

  • Once a thread’s task becomes truly CPU-intensive, the best way to implement it is via C/C++ extensions.
  • You can run several instances of the Python interpreter and communicate between them via pipes, sockets or FIFOs, all of which are built in. Pickle is a great way to transfer anything between instances: variables, objects, even user-defined classes (see the sketch below).

Rule of thumb: The more IO, the more positive impact threading can have.
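A minimal sketch of the multi-interpreter approach using the built-in multiprocessing module (the function and numbers are made up): each Process is a separate interpreter with its own GIL, and objects travel through the Pipe pickled.

import multiprocessing as mp

def heavy_sum(n, conn):
    conn.send(sum(i * i for i in range(n)))   # the result is pickled and sent back
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=heavy_sum, args=(10000000, child_conn))
    p.start()
    print('result:', parent_conn.recv())      # unpickled on this side
    p.join()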


Sockets

The same universal procedures apply for sockets in Python. These are some Python 3-ready snippets to serve as a base for building functionality.

In order to allow multiple clients, an independent client-attention thread is created per connection to the server:

import socket
import sys

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
    s.bind((HOST, PORT))        # HOST and PORT come from your configuration
    print('Socket bind complete')
except socket.error as msg:
    print('Bind failed. Error: ' + str(msg))
    sys.exit()

s.listen(CLIENTS_WAITING_MAX)

client_threads = []
while True:
    conn, addr = s.accept()     # blocks until a new connection arrives
    print('Connected with ' + addr[0] + ':' + str(addr[1]))

    client_threads.append(client_attention_thread(conn))
    client_threads[-1].start()

Thread attention class:

import threading
import time

class client_attention_thread(threading.Thread):
    def __init__(self, conn):
        threading.Thread.__init__(self)
        self.conn = conn        # keep the connection for use in run()

    def run(self):
        i = 0
        while True:
            time.sleep(1)
            self.conn.sendall(str(i).encode('ascii'))
            i += 1
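To test the server, a minimal client sketch (it assumes the same HOST and PORT the server binds to):

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))          # same values the server binds to
while True:
    data = s.recv(1024)          # blocks until the server sends something
    if not data:                 # empty bytes means the server closed the connection
        break
    print('received:', data.decode('ascii'))
s.close()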

If you need to serve different services, you can create a series of server threads, each of which will create its corresponding client-attention threads:

port = SUBS_PORT_BASE

# find the first port not already used by an active subscription
available = 0
while available == 0:
    available = 1
    for used_type, used_tuple in active_subscriptions.items():
        if port == used_tuple[1]:
            available = 0
            port += 1
            break

active_subscriptions[stream_id] = (SUBS_HOST, port)

subscription_threads[stream_id] = subscription_thread(stream_id, stream_type, stream_params, SUBS_HOST, port)
subscription_threads[stream_id].start()


Databases

MongoDB

Super flexible and fast for prototyping: you do not need to pre-define schemas or tables. Just save JSONs!

import pymongo

connection = pymongo.MongoClient('ab123456.mlab.com', 123456)  # your host and port
db = connection[databasename]
db.authenticate(database_user, database_pass)

Remember to change the ‘_id’ key for each new entry:

for candle in ohlcvs:
    new_row = {}
    new_row['_id'] = candle[0]
    new_row['date8061'] = candle[0]
    new_row['open'] = candle[1]

    try:
        collection.insert_one(new_row)
    except pymongo.errors.DuplicateKeyError:
        print("ERR Duplicate Key")

Insert several docs at once

try:
    # with ordered=False, repeated _id(s) are skipped and unique ones are inserted
    insertion_result = collection.insert_many(new_documents, ordered=False)
    filled = len(insertion_result.inserted_ids)
except pymongo.errors.BulkWriteError as ex:
    filled = ex.details['nInserted']

Find number of matching documents in a collection

collection.find({"key": "value"}).count()

(In recent PyMongo versions, cursor count() is deprecated; use collection.count_documents({"key": "value"}) instead.)

 

 

ORM – Object Relational Mapping

To make it easier to work with database documents, ORMs create a structure based on object-oriented programming; using its properties and methods, you can abstract the database accesses away.

The easiest-to-use MongoDB ORM is dictiORM.

Using it you can forget about the database and use entries like simple Python variables.

Create a variable connected to your database like this:

user_info = dictiorm.Document(collection, unique_identifier)

And now you can just use it like a dictionary and everything will be updated and saved in the database:

user_info['gender']           # read a field that was stored in the database
user_info['age'] = 26         # change the value of an existing field or create a new one
user_info.pop('age', None)    # remove a field and get its value
list(user_info)               # convert into a list
user_info.keys()              # get the names of all the fields


Documenting your code

If your code is Pythonic, it should be readable; we all agree on that. Nevertheless, proper documentation is a huge help when others want to work with your Python modules. We can document the code by placing docstrings, and later use a documentation-generation tool to extract and format all this information into beautiful, navigable docs.

Google's style guide provides a great template showing how to apply docstrings to every type of construct.
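As an illustration, a Google-style docstring for a function could look like this (the function and its fields are made up):

def fetch_ohlcv(symbol, timeframe='1m'):
    """Fetch candlestick data for a symbol.

    Args:
        symbol (str): Market symbol, e.g. 'BTC/USDT'.
        timeframe (str): Candle duration. Defaults to '1m'.

    Returns:
        list: One [date8061, open, high, low, close, volume] list per candle.

    Raises:
        ValueError: If the timeframe is not supported.
    """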


 

Exceptions

Even programs with proper syntax and operation can crash due to unhandled exceptions. During debugging, this is a useful snippet to catch them:

import sys

try:
  insertion_result = collection.insert_many(new_documents)
except Exception as ex:
  print(sys.exc_info()[0])  # prints the exception class, so you know what to catch specifically

Leaving a bare except Exception in place is a very bad idea, since it will catch exceptions you did not intend to handle. Once you know the exception class reported by the snippet above, catch it specifically:

try:
  insertion_result = collection.insert_many(new_documents) 
except pymongo.errors.BulkWriteError as ex:
  ...take care of the exception...


Telegram Bots and Clients

It’s very easy to interface with Telegram, and you can build tons of functionality to get updates right in your pocket.

This bot example from python-telegram-bot is a good starting point for any function:

import logging
import json
from pkg_resources import resource_filename
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
 level=logging.INFO)

bot_token_route = resource_filename('orbbit', 'UserInterface/telegram_bot/bot_token.key')
with open(bot_token_route) as f:
 bot_token_key = json.load(f)

updater = Updater(token = bot_token_key['token'])
dispatcher = updater.dispatcher

def start(bot, update):
 bot.send_message(chat_id=update.message.chat_id, text="Dame amorsito.")

start_handler = CommandHandler('start', start)
dispatcher.add_handler(start_handler)


def echo(bot, update):
 bot.send_message(chat_id=update.message.chat_id, text=update.message.text)

echo_handler = MessageHandler(Filters.text, echo)
dispatcher.add_handler(echo_handler)


def caps(bot, update, args):
 text_caps = ' '.join(args).upper()
 bot.send_message(chat_id=update.message.chat_id, text=text_caps)

caps_handler = CommandHandler('caps', caps, pass_args=True)
dispatcher.add_handler(caps_handler)

updater.start_polling()

 

To create a client for a regular account, these are the steps:

  1. Login and enter API development tools
  2.  Fill the basic details
  3. Install Telethon
    pip install telethon
  4. Change this example, replacing the keys with the info generated for your app.
    Run it once: you will receive a code in the Telegram app. Enter it at the prompt.
    # Forward new messages from a given channel to another
    
    import time
    from telethon import TelegramClient
    from telethon.tl.functions.messages import ForwardMessagesRequest
    from telethon.tl.types import PeerUser, PeerChat, PeerChannel
    
    #-----------------------------------------------------------------------------
    # Fill this section before running:
    #-----------------------------------------------------------------------------
    #%% constants
    from_id = -1004444444444
    to_id = 'jolie'
    
    #%% api keys
    api_id = 12345
    api_hash = '0123456789abcdef0123456789abcdef'
    phone = '+34600000000'
    #-----------------------------------------------------------------------------
    
    
    # connection
    client = TelegramClient('session_name', api_id, api_hash)
    client.connect()
    
    if not client.is_user_authorized():
     client.sign_in(phone=phone)
     code = input("Code received in the app: ")
     me = client.sign_in(code=code) # Put whatever code you received here.
    
    
    #%% get entities
    origin = client.get_entity(from_id)
    destination = client.get_entity(to_id)
    
    #%% run
    client.send_message(destination, 'Hi, I will forward every new message to you.')
    
    
    previous_messages = client.get_message_history(from_id)
    
    
    while True:
        new_messages = []

        current_messages = client.get_message_history(from_id)
        for message in current_messages:
            # if not message in previous_messages:
            if not any([message.id == prev_msg.id for prev_msg in previous_messages]):
                new_messages.append(message)

        if new_messages != []:
            client(ForwardMessagesRequest(
                from_peer=origin,                       # who sent these messages?
                id=[msg.id for msg in new_messages],    # which are the messages?
                to_peer=destination                     # who are we forwarding them to?
            ))

        previous_messages = current_messages
        time.sleep(15)
  5. Now you can start moving messages referencing them by message.id


Task scheduling

To distribute the workload or process async requests for better web server scalability, you can use a scheduler and any number of workers to execute the tasks. This is how you use Celery, the most popular one, with RabbitMQ as the message broker for a Django project.

  1. Install RabbitMQ (instructions depend on your OS)
  2. Install Celery
    pip install celery
  3. Enable management
    rabbitmq-plugins enable rabbitmq_management

    If under Windows, commands should be run in C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin or the equivalent path.
    The service must be kept running.

  4. Check out the (neat) control panel at http://localhost:15672/
    Username: guest Password: guest
  5. Setup a real user
    $ sudo rabbitmqctl add_user myuser mypassword
    
    $ sudo rabbitmqctl add_vhost myvhost
    
    $ sudo rabbitmqctl set_user_tags myuser mytag
    
    $ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
  6. Your broker url (needed in Django config) will be
    'amqp://myuser:mypassword@localhost:5672/myvhost'
  7. Start
    $ sudo rabbitmq-server

    (or run it in the background)

    $ sudo rabbitmq-server -detached

    Stop

    $ sudo rabbitmqctl stop

    Never use kill to stop it!

  8. Django Integration

    settings.py    Add:

    CELERY_BROKER_URL = 'amqp://localhost'


    celery.py    
    Create this file along your settings file.

    import os
    from celery import Celery
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
    
    app = Celery('mysite')
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks()


    __init__.py    
    Add:

    from .celery import app as celery_app
    
    __all__ = ['celery_app']

    tasks.py    Create a new file or add these functions, with the appropriate decorators, to your existing files.

    from celery import shared_task

    @shared_task
    def test_shared_task(times):
        for i in range(times):
            i = i + 1
        return 'DONE ' + str(i)
  9. Now you can call the decorated function from anywhere, for example a view:
    import tasks
    ...
    tasks.test_shared_task.delay(10000)

    Here is where the magic happens! Calling the function through the .delay method makes it run in the task scheduler. It may be run in the same machine by another process, or even in a different remote machine!

  10. Start a worker that will process the queued tasks. Run this in the folder above yourproject folder:
    celery -A yourproject worker -l info

    If the worker fails to start (this can happen on some recent Windows versions), you may need to

    pip install eventlet

    And run it as:

    celery -A yourproject worker -l info -P eventlet

 

Enjoy! The worker’s log output will show the processed task.

 

 

Periodic task scheduling

Celery has another process called beat. With it, you can add periodic task execution. This is pretty awesome because:

  • Tasks will be sent to the queue for a worker to process them.
  • It’s super flexible in how you schedule execution (intervals, fixed hours, dawn, sunset…).

To enable them, just add the desired tasks and intervals using add_periodic_task()

celery.py

from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5.0, test.s('Every 5s.'), name='every 5 seconds')

    sender.add_periodic_task(10.0, test.s('Every 10s.'), expires=10)

    sender.add_periodic_task(
        crontab(hour=21, minute=20, day_of_week=4),
        test.s('Happy Thursday!'),
    )

@app.task
def test(arg):
    print(arg)

or adding:

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.test',
        'schedule': 30.0,
        'args': ['test conf'],
    },
}

 

Start the beat process, which will handle the periodic tasks. It must be run in a separate process, in parallel with a worker:

$ celery -A proj beat

 

The tasks will be automatically processed by the worker that you previously started!