First workink prototype.

parents
# use glob syntax
syntax: glob
*.elc
*.pyc
*~
*.db
# Emacs
eproject.cfg
# Temporary files (vim backups)
*.swp
# Log files:
*.log
# Virtualenv
venv
build
conf/conf.cfg
.coverage
Cédric Bonhomme <cedric@cedricbonhomme.org>
EV3WebController project news
0.1 (2014-12-15)
* First workink prototype.
EV3WebController
================
# Presentation
This application based on the [Flask](http://flask.pocoo.org/)
microframework provides an interface in order to control the EV3 robot.
Tested with Python 3.4 and Python 2.7.
# Contact
Cédric BOnhomme.
#! /usr/bin/env python
#-*- coding: utf-8 -*-
# ***** BEGIN LICENSE BLOCK *****
# This file is part of EV3WebController.
# Copyright (c) 2014 Cédric Bonhomme.
# All rights reserved.
#
#
#
# ***** END LICENSE BLOCK *****
"""Bootstrap
Required imports and code execution for basic functionning.
"""
import sys
if 'threading' in sys.modules:
raise Exception('threading module loaded before patching!')
import conf
import logging
#! /usr/bin/env python
#-*- coding: utf-8 -*-
# ***** BEGIN LICENSE BLOCK *****
# This file is part of EV3WebController.
# Copyright (c) 2014 Cédric Bonhomme.
# All rights reserved.
#
#
#
# ***** END LICENSE BLOCK *****
""" Program variables.
This file contain the variables used by the application.
"""
import os, sys
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
PATH = os.path.abspath(".")
ON_HEROKU = int(os.environ.get('HEROKU', 0)) == 1
DEFAULTS = {"platform_url": "http://0.0.0.0:5000",
"host": "0.0.0.0",
"port": "5000",
"https": "true",
"debug": "true"
}
if not ON_HEROKU:
try:
import configparser as confparser
except:
import ConfigParser as confparser
# load the configuration
config = confparser.SafeConfigParser(defaults=DEFAULTS)
config.read(os.path.join(BASE_DIR, "conf/conf.cfg"))
else:
class Config(object):
def get(self, _, name):
return os.environ.get(name.upper(), DEFAULTS.get(name))
def getint(self, _, name):
return int(self.get(_, name))
def getboolean(self, _, name):
value = self.get(_, name)
if value == 'true':
return True
elif value == 'false':
return False
return None
config = Config()
PATH = os.path.abspath(".")
PLATFORM_URL = config.get('misc', 'platform_url')
WEBSERVER_DEBUG = config.getboolean('webserver', 'debug')
WEBSERVER_HOST = config.get('webserver', 'host')
WEBSERVER_PORT = config.getint('webserver', 'port')
WEBSERVER_HTTPS = config.getboolean('webserver', 'https')
[misc]
platform_url = http://0.0.0.0:5000
[webserver]
debug = 1
host = 0.0.0.0
port = 5000
https = false
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# ***** BEGIN LICENSE BLOCK *****
# This file is part of EV3WebController.
# Copyright (c) 2014 Cédric Bonhomme.
# All rights reserved.
#
#
#
# ***** END LICENSE BLOCK *****
from bootstrap import conf
from web import app as application
if __name__ == "__main__":
application.run(host=conf.WEBSERVER_HOST,
port=conf.WEBSERVER_PORT,
debug=conf.WEBSERVER_DEBUG)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# ***** BEGIN LICENSE BLOCK *****
# This file is part of EV3WebController.
# Copyright (c) 2014 Cédric Bonhomme.
# All rights reserved.
#
#
#
# ***** END LICENSE BLOCK *****
import os
from flask import Flask
#from ev3.ev3dev import Ev3Dev
from ev3.ev3dev import Key, Motor
from ev3.lego import LargeMotor
from ev3.lego import TouchSensor
from ev3.lego import InfraredSensor
import conf
# Create Flask application
app = Flask(__name__)
app.debug = True
# Create a random secrey key so we can use sessions
app.config['SECRET_KEY'] = os.urandom(12)
#Ev3Dev.__init__()
#head = None#Motor(port=Motor.PORT.A)
right_wheel = None#Motor(port=Motor.PORT.B)
left_wheel = None#Motor(port=Motor.PORT.C)
button = None#TouchSensor()
ir_sensor = None#InfraredSensor()
# Views
#from flask.ext.restful import Api
#api = Api(app, prefix='/api/v1.0')
from web import views
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# ***** BEGIN LICENSE BLOCK *****
# This file is part of EV3WebController.
# Copyright (c) 2014 Cédric Bonhomme.
# All rights reserved.
#
#
#
# ***** END LICENSE BLOCK *****
import json
from functools import wraps
from threading import Thread
from flask import Response, request, session, jsonify, current_app
from web.lib.utils import default_handler
def async(f):
"""
This decorator enables to send email in a new thread.
This prevent the server to freeze.
"""
def wrapper(*args, **kwargs):
thr = Thread(target = f, args = args, kwargs = kwargs)
thr.start()
return wrapper
def to_response(func):
"""Will cast results of func as a result, and try to extract
a status_code for the Response object"""
def wrapper(*args, **kwargs):
status_code = 200
result = func(*args, **kwargs)
if isinstance(result, Response):
return result
if isinstance(result, list) and len(result) == 1:
result = result[0]
elif isinstance(result, tuple):
result, status_code = result
return Response(json.dumps(result, default=default_handler),
status=status_code)
return wrapper
import types
import logging
# ***** BEGIN LICENSE BLOCK *****
# This file is part of EV3WebController.
# Copyright (c) 2014 Cédric Bonhomme.
# All rights reserved.
#
#
#
# ***** END LICENSE BLOCK *****
logger = logging.getLogger(__name__)
def default_handler(obj):
"""JSON handler for default query formatting"""
if hasattr(obj, 'isoformat'):
return obj.isoformat()
if hasattr(obj, 'dump'):
return obj.dump()
if isinstance(obj, (set, frozenset, types.GeneratorType)):
return list(obj)
if isinstance(obj, BaseException):
return str(obj)
raise TypeError("Object of type %s with value of %r "
"is not JSON serializable" % (type(obj), obj))
from .views import *
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# ***** BEGIN LICENSE BLOCK *****
# This file is part of EV3WebController.
# Copyright (c) 2014 Cédric Bonhomme.
# All rights reserved.
#
#
#
# ***** END LICENSE BLOCK *****
__author__ = "Cedric Bonhomme"
__version__ = "$Revision: 0.1 $"
__date__ = "$Date: 2014/12/15$"
__revision__ = "$Date: 2014/12/15 $"
__copyright__ = "Copyright (c) 2013 Cédric BOnhomme"
__license__ = ""
from flask import render_template, current_app, request, flash, session, \
url_for, redirect, g, send_from_directory, make_response, abort, Markup
from flask.ext.login import LoginManager, login_user, logout_user, \
login_required, current_user, AnonymousUserMixin
from flask.ext.principal import Principal, Identity, AnonymousIdentity, \
identity_changed, identity_loaded, Permission, RoleNeed, UserNeed
import conf
from web.decorators import to_response
from web import app
from web import right_wheel, left_wheel, button, ir_sensor
#
# Default errors
#
@app.errorhandler(404)
def page_not_found(e):
return render_template('errors/404.html'), 404
@app.errorhandler(405)
def method_not_allowed(e):
return render_template('errors/405.html'), 405
@app.errorhandler(500)
def page_not_found(e):
return render_template('errors/500.html'), 500
def redirect_url(default='profile'):
return request.args.get('next') or \
request.referrer or \
url_for(default)
#
# Management of the permissions
#
principals = Principal(app)
# Create a permission with an admin role's need
admin_permission = Permission(RoleNeed('admin'))
login_manager = LoginManager(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'
login_manager.login_message_category = 'danger'
@identity_loaded.connect_via(app)
def on_identity_loaded(sender, identity):
# Set the identity user object
g.user = current_user
identity.user = current_user
# Add the UserNeed to the identity
if hasattr(current_user, 'id'):
identity.provides.add(UserNeed(current_user.id))
# Assuming the User model has a list of roles, update the
# identity with the roles that the user provides
if hasattr(current_user, 'roles'):
for role in current_user.roles:
identity.provides.add(RoleNeed(role.name))
@app.errorhandler(403)
def authentication_failed(e):
flash('You do not have enough rights.', 'danger')
return redirect(url_for('login'))
@app.errorhandler(401)
def authentication_required(e):
flash('Authenticated required.', 'info')
return redirect(url_for('login'))
@login_manager.user_loader
def load_user(id):
# Return an instance of the User model
return models.User.objects(id=id).first()
@app.before_request
def before_request():
g.user = current_user
def log_user(user):
"""
Effectively log the user and update the identity with Flask-Principal.
"""
login_user(user)
g.user = user
session['id'] = str(user.id)
# Tell Flask-Principal the identity changed
identity_changed.send(current_app._get_current_object(),
identity=Identity(str(user.id)))
@app.route('/move/<direction>', methods=['GET'])
@app.route('/move/<direction>/<speed>', methods=['GET'])
@to_response
def move(direction="forward", speed=60):
"""
"""
result = {
"action": "move",
"direction": direction,
"message": "OK"
}
return_code = 200
if direction == 'forward':
pass#left_wheel.run_forever(speed * -1, regulation_mode=False)
#right_wheel.run_forever(speed * -1, regulation_mode=False)
elif direction == 'backward':
try:
left_wheel.run_forever(speed, regulation_mode=False)
right_wheel.run_forever(speed, regulation_mode=False)
except Exception as e:
result["message"], return_code = "error", 400
elif direction == 'left':
left_wheel.run_forever(speed, regulation_mode=False)
right_wheel.run_forever(speed * -1, regulation_mode=False)
elif direction == 'right':
left_wheel.run_forever(speed * -1, regulation_mode=False)
right_wheel.run_forever(speed, regulation_mode=False)
elif direction == 'stop':
left_wheel.stop()
right_wheel.stop()
else:
result["message"], return_code = "Unknown direction", 400
return result, return_code
@app.route('/sensor/<sensor_name>', methods=['GET'])
def sensor(sensor_name=""):
"""
"""
if sensor_name == "ir_sensor":
return {"distance": ir_sensor.prox}
elif sensor_name == "button":
pass
else:
return {"message": "Unknown sensor"}, 400
@app.route('/about', methods=['GET'])
def about():
return render_template('about.html')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment