From workbench setup to threading and CPU load distribution, passing through database management and package creation: a summarized set of useful snippets to offload your memory and come back to when needed.
Environment setup
Set up a Virtual Environment
Virtual environments allow working with different versions of packages in different projects on one machine, making them more portable.
Installation
Add the virtualenv package using pip. If you need to use pip behind a proxy, use:
pip --proxy http://user:[email protected]:proxyPort install ...
Windows needs some special instructions:
pip install virtualenv
pip install virtualenvwrapper-win
Create the virtualenv
In your work folder, run:
virtualenv VIRTUAL_ENV_NAME
route_to_work_dir/VIRTUAL_ENV_NAME/Scripts/activate
or, to create it directly in your_user/Envs
mkvirtualenv VIRTUAL_ENV_NAME
cd into your dev folder and run
setprojectdir .
When you are done, issue deactivate
Every time you want to work on this virtual env, just type workon VIRTUAL_ENV_NAME
A MATLAB-like IDE for Python: Spyder
You can have it running in a minute, absolutely free.
pip install spyder
and then, open it with
python -m spyder.app.start
Anaconda
If you use Anaconda, the commands for virtual envs are similar:
conda create -n myenv anaconda spyder
Then, anytime you want to work on it, type:
activate myenv
To launch a Matlab-like IDE for python, activate your virtual env and run spyder.
activate orb_conda
spyder
If you work behind a proxy, set the environment variables to:
http_proxy: http://username:[email protected]:8080
https_proxy: https://username:[email protected]:8080
Inside a conda environment, you can use both conda install and pip install. In case the command conda does not work, you have to add it to your PATH.
Comment blocks (cells) in Spyder are great for development and testing; they are separated with #%%. You can run a single block with CTRL+ENTER, like in Matlab.
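A minimal sketch of what such a cell-based script might look like (the variable names and computations are just illustrative):

#%% Load data
import random
data = [random.random() for _ in range(100)]

#%% Analyze (run only this cell with CTRL+ENTER)
print(sum(data) / len(data))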
Creating a package
__init__.py
If you plan to package and distribute your scripts, this file should be in the root of your sources dir ( /my_proj/packagename/__init__.py ). It lists all the names that are made available to the scripts that import this package:
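A minimal sketch of what it might contain, assuming a hypothetical submodule fetcher that defines get_data:

# packagename/__init__.py
# Expose the public API of the package (the names below are hypothetical).
from packagename.fetcher import get_data

__all__ = ['get_data']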
__main__.py
It is very important for the package's entry code to live in __main__.py: this way the main code is executed only when the module is run with -m. Otherwise, it would run every time another module simply imports the package, which may lead to undesired behavior.
def main():
    pass

if __name__ == "__main__":
    main()
setup.py
This is the file used by the packaging tools. In it, you can add multiple attributes. One of the most important is install_requires: at the time of installing your package, all the packages listed there will be installed too.
from setuptools import setup, find_packages

def readme():
    with open('README.rst') as f:
        return f.read()

setup(name='orbbit',
      version='0.1',
      description='A language-independent API for cryptocurrency trading robots.',
      long_description=readme(),
      classifiers=[
          'Development Status :: 3 - Alpha',
          'Programming Language :: Python :: 3',
      ],
      keywords='cryptocurrency trading robots',
      url='https://github.com/user/repo',
      author='The best people.',
      author_email='[email protected]',
      license='Closed source',
      packages=find_packages(),
      include_package_data=True,
      install_requires=[
          'datetime',
      ],
      scripts=['bin/start_hi'],
      entry_points={
          'console_scripts': ['start_main = my_project.__main__:main']
      },
      zip_safe=False)
- entry_points and scripts are very useful: they allow your application to be run directly from the command line like any other program.
MANIFEST.in
In order to include non-Python files in the package, such as docs, these must be added to the manifest.
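A minimal MANIFEST.in sketch; the docs folder is just an assumption about the project layout:

include README.rst
recursive-include docs *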
README.rst or .md
The first non-.py file to add is usually the README. You can write it using md or rst; both are extensively supported.
.gitignore
Usually you want to use Version Control for source files and exclude generated files, so a .gitignore or applicable file is needed.
# Files to exclude:

# Compiled python modules.
*.pyc

# Setuptools distribution folder.
/dist/

# Python egg metadata, regenerated from source files by setuptools.
/*.egg-info

# Python cache
*__pycache__*
Uploading to PyPI (enable anyone to “pip install yourpackage”)
Create .pypirc in your $HOME path:
[distutils]
index-servers =
    pypi
    pypitest

[pypi]
username=yourusername
password=yourpassword
Create setup.cfg in the same folder as setup.py:
[metadata]
description-file = ./README.md
Provide a download_url in setup.py (likely this will point to a GitHub tag .tar.gz).
Finally, to upload the package run:
python setup.py sdist
twine upload dist/*
Build a REST API
Flask provides a framework that you can start using in minutes. There are extensions that add further functionality. The basic steps are:
from flask import Flask, jsonify

app = Flask(__name__)

# Define routes, methods and callback functions
@app.route('/datamanager/<int:dataset_id>', methods=['GET'])
def get_data(dataset_id):
    return jsonify({'dataset_id': dataset_id})  # illustrative response

# Start the server
app.run(debug=True)
Use curl for debugging. (For Windows, download the Win32 – Generic zip.)
If you want to debug using Matlab as a client, it can be as simple as:
addpath ../../../../matlab-json
json.startup
api = 'http://localhost:5000/ticker';
S = webread(api)
Async (concurrent) code
Async is not parallel: all the code runs in a single thread. However, it is a handy trade-off to quickly deploy code with coroutines which take different times to complete. An async method, when executed, returns a coroutine which can then be awaited.
To create a coroutine, we:
- Create a future (task in this case).
- Schedule it to run with the event loop.
- When it reaches await, it returns flow control to the event loop.
Useful functions:
- await asyncio.sleep: non-blocking wait that allows context switch.
- To schedule a coroutine, we use create_task, then we group the tasks that should run “at the same time” and wait for them.
- To stop execution until the wait finishes, we call run_until_complete on our event-loop object (see the sketch after this list).
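Putting those pieces together, a minimal sketch (the coroutine names and delays are just illustrative):

import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)  # non-blocking wait, lets other coroutines run
    print(name, 'done after', delay, 's')

loop = asyncio.get_event_loop()
# Schedule the coroutines as tasks so they run "at the same time".
tasks = [loop.create_task(fetch('a', 1)), loop.create_task(fetch('b', 2))]
# Block here until every task has finished.
loop.run_until_complete(asyncio.wait(tasks))
loop.close()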
Threading
For parallel execution among different CPU cores, real threads are used instead of async libraries (but see the note on the GIL and performance below).
To create a thread, we inherit from the Thread class and redefine the run() method. To launch it, call thread.start()
class save_ohlcv(threading.Thread):
    def __init__(self, symbol, timeframe):
        threading.Thread.__init__(self)
        self.symbol = symbol
        self.timeframe = timeframe

    def run(self):
        print('Started fetcher for ' + self.symbol + ' ' + self.timeframe)
        return
The thread will run until run() returns. Python offers no supported way to force a thread to stop from outside: private names such as thread._Thread__stop() are interpreter internals, only touch bookkeeping flags and are unreliable across versions, so the usual approach is to set a flag (for example a threading.Event) that run() checks periodically.
In a more compact way, you can run a function as a thread with:
thread = threading.Thread(target = function, args = (arg0,) )
Synchronization:
- To wait until a thread exits use thread_to_wait_for.join([timeout])
- You can set thread.daemon = True so the program does not wait for the thread to finish: daemon threads run in the background and are killed abruptly when the main program exits (see the sketch after this list).
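A small sketch of both options; the worker function is just a placeholder:

import threading
import time

def worker():
    time.sleep(2)
    print('worker finished')

t = threading.Thread(target=worker)
t.daemon = True  # killed abruptly if the main program exits first
t.start()
t.join(5)        # wait up to 5 seconds for it to finish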
Resource sharing:
- Mutex lock: This is an exception-safe way to use a mutex.
lock.acquire()
try:
    ... access shared resource
finally:
    lock.release()
Or in a more compact way:
resource_lock = threading.Lock()

with resource_lock:
    # do something with resource
Avoid locking a resource inside another; this can lead to deadlocks.
- Use a reentrant lock threading.RLock() to acquire several times inside one thread. You must release it the same number of times.
- Semaphores threading.Semaphore(n) limit the number of threads accessing a resource and can also be used to signal other threads, for example, to wake them up. Used like this, you can easily implement a producer-consumer.
- Events can be waited on until another thread performs event.set() (see the sketch after this list).
- Condition locks allow a signaling towards one or more threads that have already acquired the lock, using notify() and notify_all() for a particular state they need.
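As an illustration of Event-based signaling, a minimal sketch (the names are made up):

import threading
import time

data_ready = threading.Event()

def consumer():
    data_ready.wait()  # blocks until another thread calls set()
    print('data is ready, consuming it')

threading.Thread(target=consumer).start()
time.sleep(1)      # the "producer" does some work...
data_ready.set()   # ...and wakes the consumer up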
Queues
This is an excellent mechanism because it allows for multiple producers and multiple consumers to run synchronized. Consumers wait for items in the FIFO with
item = q.get()
Whereas producers insert items to be processed with:
q.put(item)
To wait for completion and kill consumer threads, you can use
# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()
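For q.join() to return, each consumer must call task_done() after processing an item. A minimal end-to-end sketch compatible with the shutdown code above; do_work is a placeholder:

import queue
import threading

q = queue.Queue()
num_worker_threads = 4

def do_work(item):
    print('processed', item)  # placeholder for real processing

def worker():
    while True:
        item = q.get()
        if item is None:      # sentinel: time to exit
            break
        do_work(item)
        q.task_done()         # lets q.join() know this item is finished

threads = [threading.Thread(target=worker) for _ in range(num_worker_threads)]
for t in threads:
    t.start()

for item in range(20):
    q.put(item)

q.join()                      # block until all tasks are done
for i in range(num_worker_threads):
    q.put(None)               # stop workers
for t in threads:
    t.join()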
A note on performance:
In Python, only one thread can hold the Global Interpreter Lock at a time. That means they will all compete for it, and performance can be greatly reduced if the tasks are CPU bound (as opposed to IO bound).
- Once a thread’s task becomes truly intensive, the best way to implement it is via C/C++ extensions.
- You can run several instances of the Python interpreter and communicate between them via pipes, sockets or FIFOs, all of which are built in. Pickle is a great way to transfer everything between instances: variables, objects, even user-defined classes.
Rule of thumb: The more IO, the more positive impact threading can have.
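One built-in way to run several interpreter instances is the multiprocessing module, whose Process and Pipe move Python objects between processes using pickle under the hood; a minimal sketch (the workload is illustrative):

from multiprocessing import Process, Pipe

def cpu_bound_worker(conn):
    # Runs in a separate interpreter process, so it has its own GIL.
    result = sum(i * i for i in range(10**6))
    conn.send(result)   # the object is pickled and sent through the pipe
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=cpu_bound_worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # unpickles the result from the worker
    p.join()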
Sockets
The same universal procedures apply for sockets in Python. These are some Python 3-ready snippets to serve as a base for building functionality.
In order to allow multiple clients, an independent client-attention thread is created per connection to the server:
import socket
import sys

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.bind((HOST, PORT))
    print('Socket bind complete')
except socket.error as msg:
    print('Bind failed. Error Code : ' + str(msg.errno) + ' Message ' + msg.strerror)
    sys.exit()

s.listen(CLIENTS_WAITING_MAX)

client_threads = []
while 1:
    conn, addr = s.accept()  # blocks until new connection
    print('Connected with ' + addr[0] + ':' + str(addr[1]))
    client_threads.append(client_attention_thread(conn))
    client_threads[-1].start()
Thread attention class:
class client_attention_thread(threading.Thread):
    def __init__(self, conn):
        threading.Thread.__init__(self)
        self.conn = conn

    def run(self):
        i = 0
        while True:
            time.sleep(1)
            self.conn.sendall(str(i).encode('ascii'))
If you need to serve different services, you can create a series of server threads which in turn will create their corresponding client-attention threads:
port = SUBS_PORT_BASE
available = 0
while available == 0:
    available = 1
    for used_type, used_tuple in active_subscriptions.items():
        if port == used_tuple[1]:
            available = 0
            port += 1
            break

active_subscriptions[stream_id] = (SUBS_HOST, port)

subscription_threads[stream_id] = subscription_thread(stream_id, stream_type, stream_params, SUBS_HOST, port)
subscription_threads[stream_id].start()
Databases
MongoDB
Super-flexible and fast to prototype with. You do not need to pre-define schemas or tables. Just save JSONs!
connection = pymongo.MongoClient('ab123456.mlab.com', 123456)
db = connection[databasename]
db.authenticate(database_user, database_pass)
Remember to change the ‘_id’ key for each new entry:
new_row = {}
for candle in ohlcvs:
    new_row['_id'] = candle[0]
    new_row['date8061'] = candle[0]
    new_row['open'] = candle[1]
    try:
        collection.insert_one(new_row)
    except pymongo.errors.DuplicateKeyError as e:
        print("ERR Duplicate Key")
Insert several docs at once
try:
    # this way, repeated _id(s) are omitted, unique ones are inserted
    insertion_result = collection.insert_many(new_documents, ordered=False)
    filled = len(insertion_result.inserted_ids)
except pymongo.errors.BulkWriteError as ex:
    filled = ex.details['nInserted']
Find the number of matching documents in a collection:
collection.find({"key":"value"}).count()
(cursor.count() is deprecated in recent PyMongo versions; use collection.count_documents({"key": "value"}) instead)
ORM – Object Relational Mapping
To make it easier to work with database documents, ORMs create a structure based on object-oriented programming; using its properties and methods, you can abstract away database accesses.
The easiest-to-use MongoDB ORM is dictiORM.
Using it you can forget about the database and use entries like simple Python variables.
Create a variable connected to your database like this:
user_info = dictiorm.Document(collection, unique_identifier)
And now you can just use it like a dictionary and everything will be updated and saved in the database:
user_info['gender']          # read a field that was stored in the database
user_info['age'] = 26        # change the value of an existing field or create a new one
user_info.pop('age', None)   # remove a field and get its value
list(user_info)              # convert into a list
user_info.keys()             # get the names of all the fields
Documenting your code
If your code is pythonic, it should be readable; we all agree. Nevertheless, proper documentation is a huge help when others want to work with your Python modules. We can document the code by placing docstrings and later use a documentation-generation tool to extract and format all this information into beautiful, navigable docs.
This is a great template, from Google, where you can see how to apply docstrings to every type of construct:
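A short sketch of a Google-style docstring; the function itself is just an illustration:

def fetch_ohlcv(symbol, timeframe='1m'):
    """Fetch OHLCV candles for a symbol.

    Args:
        symbol (str): Market symbol, e.g. 'BTC/USD'.
        timeframe (str): Candle size accepted by the exchange.

    Returns:
        list: One list per candle: [timestamp, open, high, low, close, volume].

    Raises:
        ValueError: If the timeframe is not supported.
    """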
Exceptions
Even programs with proper syntax and operation can crash due to unhandled exceptions. During debugging, this is a useful snippet to catch them:
try:
insertion_result = collection.insert_many(new_documents)
except Exception as ex:
print(sys.exc_info()[0])
Leaving a bare except Exception in place is a very bad idea, since it will catch undesired exceptions too. Once the snippet above has told you which exception is raised, handle it specifically:
try:
    insertion_result = collection.insert_many(new_documents)
except pymongo.errors.BulkWriteError as ex:
    ... take care of the exception ...
Telegram Bots and Clients
It’s very easy to interface with Telegram, and you can build tons of functionality to get updates right in your pocket.
This bot example from python-telegram-bot is a good starting point for any function:
import logging
import json
from pkg_resources import resource_filename
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    level=logging.INFO)

bot_token_route = resource_filename('orbbit', 'UserInterface/telegram_bot/bot_token.key')
with open(bot_token_route) as f:
    bot_token_key = json.load(f)

updater = Updater(token=bot_token_key['token'])
dispatcher = updater.dispatcher


def start(bot, update):
    bot.send_message(chat_id=update.message.chat_id, text="Dame amorsito.")

start_handler = CommandHandler('start', start)
dispatcher.add_handler(start_handler)


def echo(bot, update):
    bot.send_message(chat_id=update.message.chat_id, text=update.message.text)

echo_handler = MessageHandler(Filters.text, echo)
dispatcher.add_handler(echo_handler)


def caps(bot, update, args):
    text_caps = ' '.join(args).upper()
    bot.send_message(chat_id=update.message.chat_id, text=text_caps)

caps_handler = CommandHandler('caps', caps, pass_args=True)
dispatcher.add_handler(caps_handler)


updater.start_polling()
To create a client for a regular account, these are the steps:
- Login and enter API development tools
- Fill the basic details
- Install Telethon
pip install telethon
- Change this example replacing the keys with the info generated for your app.
Run it once. You will receive the code in the Telegram app. Note it down and put it in the code.

# Forward new messages from a given channel to another
import time
from telethon import TelegramClient
from telethon.tl.functions.messages import ForwardMessagesRequest
from telethon.tl.types import PeerUser, PeerChat, PeerChannel

#------------------------------------------------------------------------------
# Fill this section before running:
#------------------------------------------------------------------------------
#%% constants
from_id = -1004444444444
to_id = 'jolie'

#%% api keys
api_id = 12345
api_hash = '0123456789abcdef0123456789abcdef'
phone = '+34600000000'

#------------------------------------------------------------------------------
# connection
client = TelegramClient('session_name', api_id, api_hash)
client.connect()

if not client.is_user_authorized():
    client.sign_in(phone=phone)
    code = input("Code received in the app: ")
    me = client.sign_in(code=code)  # Put whatever code you received here.

#%% get entities
origin = client.get_entity(from_id)
destination = client.get_entity(to_id)

#%% run
client.send_message(destination, 'Hi, I will forward every new message to you.')

previous_messages = client.get_message_history(from_id)

while True:
    new_messages = []
    current_messages = client.get_message_history(from_id)
    for message in current_messages:
        if not any([message.id == prev_msg.id for prev_msg in previous_messages]):  # if not message in previous_messages:
            new_messages.append(message)

    if new_messages != []:
        client(ForwardMessagesRequest(
            from_peer=origin,                     # who sent these messages?
            id=[msg.id for msg in new_messages],  # which are the messages?
            to_peer=destination                   # who are we forwarding them to?
        ))

    previous_messages = current_messages
    time.sleep(15)
- Now you can start moving messages referencing them by message.id
Task scheduling
To distribute workload or process async requests for better web server scalability, you can use a scheduler and any number of workers to execute the tasks. This is how you use Celery, the most popular one, with RabbitMQ as the message broker for a Django project.
- Install RabbitMQ (instructions depend on your OS)
- Install Celery
pip install celery
- Enable management
rabbitmq-plugins enable rabbitmq_management
If under Windows, commands should be run in C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin or the equivalent path.
The service must be kept running.
- Check out the (neat) control panel at http://localhost:15672/
Username: guest
Password: guest
- Set up a real user
$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_user_tags myuser mytag
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
- Your broker url (needed in Django config) will be
'amqp://myuser:[email protected]:5672/myvhost'
- Start
$ sudo rabbitmq-server
(or run it in the background)
$ sudo rabbitmq-server -detached
- Stop
$ sudo rabbitmqctl stop
Never use kill to stop it!
Django Integration
settings.py Add:
CELERY_BROKER_URL = 'amqp://localhost'
celery.py Create this file alongside your settings file:

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
__init__.py Add:

from .celery import app as celery_app

__all__ = ['celery_app']
tasks.py Create a new file, or add these functions with the appropriate decorators to your existing files.
from celery import shared_task

@shared_task
def test_shared_task(times):
    for i in range(times):
        i = i + 1
    return 'DONE ' + str(i)
- Now you can call the decorated function from anywhere, for example a view:
import tasks
...
tasks.test_shared_task.delay(10000)
Here is where the magic happens! Calling the function through the .delay method makes it run through the task scheduler. It may be run on the same machine by another process, or even on a different, remote machine!
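If a result backend is configured (CELERY_RESULT_BACKEND in settings, which this guide does not cover), the AsyncResult returned by .delay() can be inspected; a hedged sketch:

result = tasks.test_shared_task.delay(10000)

# These calls assume a result backend is configured; without one they cannot
# fetch the task state or return value.
print(result.id)               # the task id assigned by Celery
print(result.ready())          # True once a worker has finished the task
print(result.get(timeout=30))  # blocks until the result is available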
- Start a worker that will process the queued tasks. Run this in the folder above yourproject folder:
celery -A yourproject worker -l info
If you have problems starting the worker on some recent Windows versions, you may need to
pip install eventlet
And run it as:
celery -A yourproject worker -l info -P eventlet
Enjoy! Check the worker's log output to see how it has processed the task.
Periodic task scheduling
Celery has another process called beat. With it, you can add periodic task execution. This is pretty awesome because:
- Tasks will be sent to the queue for a worker to process them.
- It’s super flexible in terms of how to program execution ( intervals, defined hours, dawn, sunset… ).
To enable them, just add the desired tasks and intervals using add_periodic_task()
celery.py
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5.0, test.s('Every 5s.'), name='add every 10')
    sender.add_periodic_task(10.0, test.s('Every 10s.'), expires=10)

    sender.add_periodic_task(
        crontab(hour=21, minute=20, day_of_week=4),
        test.s('Happy Thursday!'),
    )

@app.task
def test(arg):
    print(arg)
or adding:
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.test',
        'schedule': 30.0,
        'args': ['test conf'],
    },
}
Start the beat process, which will handle the periodic tasks. It must be run in a separate process, in parallel with a worker:
$ celery -A proj beat
The tasks will be automatically processed by the worker that you previously started!