add cli app and fix relationship typos

This commit is contained in:
Thomas Peetz
2025-01-13 22:54:25 +01:00
committed by Thomas Peetz
parent 89b7b87b8c
commit 54bc17ee7d
54 changed files with 1138 additions and 5 deletions
+105
View File
@@ -0,0 +1,105 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
coverage-report/
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
+5
View File
@@ -0,0 +1,5 @@
# Kontor Change History
## 0.0.1
Initial release.
+10
View File
@@ -0,0 +1,10 @@
FROM python:3.9-alpine
LABEL MAINTAINER="Thomas Peetz <thomas.peetz@thpeetz.de>"
ENV PS1="\[\e[0;33m\]|> kontor <| \[\e[1;35m\]\W\[\e[0m\] \[\e[0m\]# "
WORKDIR /src
COPY . /src
RUN pip install --no-cache-dir -r requirements.txt \
&& python setup.py install
WORKDIR /
ENTRYPOINT ["kontor"]
+1
View File
@@ -0,0 +1 @@
+5
View File
@@ -0,0 +1,5 @@
recursive-include *.py
include setup.cfg
include README.md CHANGELOG.md LICENSE.md
include *.txt
recursive-include kontor/templates *
+31
View File
@@ -0,0 +1,31 @@
.PHONY: clean virtualenv test docker dist dist-upload
clean:
find . -name '*.py[co]' -delete
virtualenv:
virtualenv --prompt '|> kontor <| ' env
env/bin/pip install -r requirements-dev.txt
env/bin/python setup.py develop
@echo
@echo "VirtualENV Setup Complete. Now run: source env/bin/activate"
@echo
test:
python -m pytest \
-v \
--cov=kontor \
--cov-report=term \
--cov-report=html:coverage-report \
tests/
docker: clean
docker build -t kontor:latest .
dist: clean
rm -rf dist/*
python setup.py sdist
python setup.py bdist_wheel
dist-upload:
twine upload dist/*
+69
View File
@@ -0,0 +1,69 @@
# Kontor CLI Tool
## Installation
```
$ pip install -r requirements.txt
$ python setup.py install
```
## Development
This project includes a number of helpers in the `Makefile` to streamline common development tasks.
### Environment Setup
The following demonstrates setting up and working with a development environment:
```
### create a virtualenv for development
$ make virtualenv
$ source env/bin/activate
### run kontor cli application
$ kontor --help
### run pytest / coverage
$ make test
```
### Releasing to PyPi
Before releasing to PyPi, you must configure your login credentials:
**~/.pypirc**:
```
[pypi]
username = YOUR_USERNAME
password = YOUR_PASSWORD
```
Then use the included helper function via the `Makefile`:
```
$ make dist
$ make dist-upload
```
## Deployments
### Docker
Included is a basic `Dockerfile` for building and distributing `Kontor`,
and can be built with the included `make` helper:
```
$ make docker
$ docker run -it kontor --help
```
+46
View File
@@ -0,0 +1,46 @@
### Kontor Configuration Settings
---
kontor:
### Toggle application level debug (does not toggle framework debugging)
# debug: false
### Where external (third-party) plugins are loaded from
# plugin_dir: /var/lib/kontor/plugins/
### Where all plugin configurations are loaded from
# plugin_config_dir: /etc/kontor/plugins.d/
### Where external templates are loaded from
# template_dir: /var/lib/kontor/templates/
### The log handler label
# log_handler: colorlog
### The output handler label
# output_handler: jinja2
### sample foo option
# foo: bar
log.colorlog:
### Where the log file lives (no log file by default)
# file: null
### The level for which to log. One of: info, warning, error, fatal, debug
# level: info
### Whether or not to log to console
# to_console: true
### Whether or not to rotate the log file when it reaches `max_bytes`
# rotate: false
### Max size in bytes that a log file can grow until it is rotated.
# max_bytes: 512000
### The maximum number of log files to maintain when rotating
# max_files: 4
View File
View File
+60
View File
@@ -0,0 +1,60 @@
from cement import Controller, ex
from cement.utils.version import get_version_banner
from ..core.version import get_version
VERSION_BANNER = """
Kontor CLI Tool %s
%s
""" % (get_version(), get_version_banner())
class CliBase(Controller):
class Meta:
label = 'base'
# text displayed at the top of --help output
description = 'Kontor CLI Tool'
# text displayed at the bottom of --help output
epilog = 'Usage: kontor command1 --foo bar'
# controller level arguments. ex: 'kontor --version'
arguments = [
### add a version banner
( [ '-v', '--version' ],
{ 'action' : 'version',
'version' : VERSION_BANNER } ),
]
def _default(self):
"""Default action if no sub-command is passed."""
self.app.args.print_help()
@ex(
help='example sub command1',
# sub-command level arguments. ex: 'kontor command1 --foo bar'
arguments=[
### add a sample foo option under subcommand namespace
( [ '-f', '--foo' ],
{ 'help' : 'notorious foo option',
'action' : 'store',
'dest' : 'foo' } ),
],
)
def command1(self):
"""Example sub-command."""
data = {
'foo' : 'bar',
}
### do something with arguments
if self.app.pargs.foo is not None:
data['foo'] = self.app.pargs.foo
self.app.render(data, 'command1.jinja2')
View File
+4
View File
@@ -0,0 +1,4 @@
class KontorError(Exception):
"""Generic errors."""
pass
+7
View File
@@ -0,0 +1,7 @@
from cement.utils.version import get_version as cement_get_version
VERSION = (0, 0, 1, 'alpha', 0)
def get_version(version=VERSION):
return cement_get_version(version)
+183
View File
@@ -0,0 +1,183 @@
import json
from datetime import datetime
from pathlib import Path
import mariadb
from sqlalchemy import create_engine, select, text, MetaData, join
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker
from .base import Base
from .comic import Comic
from .metadata import MetaDataTable, MetaDataColumn
class KontorDB:
def __init__(self, db_config):
self.db_conn = mariadb.connect(
host=db_config['mariadb']['host'],
port=db_config['mariadb']['port'],
user=db_config['mariadb']['user'],
password=db_config['mariadb']['password'],
database=db_config['mariadb']['database']
)
connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}'
.format(
db_config['mariadb']['user'],
db_config['mariadb']['password'],
db_config['mariadb']['host'],
db_config['mariadb']['port'],
db_config['mariadb']['database']))
# engine = create_engine(connect_string, echo=True)
engine = create_engine(connect_string)
Base.metadata.create_all(bind=engine)
__session__ = sessionmaker(bind=engine)
self.session = __session__()
def get_table_names(self) -> list:
tables = self.session.query(MetaDataTable).all()
result = [table.table_name for table in tables]
return result
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
if view_only:
for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.is_shown == 1).all()):
meta_data[order] = {'column': column.column_name, 'label': column.column_label,
'order': column.column_order, 'ref_column': column.ref_column}
order += 1
else:
for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).all()):
meta_data[order] = {
'column': column.column_name,
'order': column.column_order,
'ref_column': column.ref_column
}
order += 1
return meta_data
def get_filters(self, table_name):
_filter_map = {}
for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.show_filter == 1).all()):
_filter_map[column.column_name] = {'label': column.filter_label, 'widget': None}
print(f"retrieved {len(_filter_map)} filters: {_filter_map}")
return _filter_map
def data(self, table, columns: dict, filters) -> list:
data = []
entries = []
if len(filters) == 0:
entries = self.session.query(table).all()
else:
entries = self.session.query(table).filter_by(**filters)
for entry in entries:
row = []
for order in columns.keys():
column_name = columns[order]['column']
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
# print(f"{ref_table=}")
ref = getattr(entry, ref_table)
value = getattr(ref, "name")
# print(f"{value=}")
row.append(value)
else:
row.append(getattr(entry, column_name))
# print(repr(row))
data.append(row)
return data
def get_data(self, table_name: str, columns: dict, where_clause: str) -> list:
data = []
cursor = self.db_conn.cursor()
cursor.execute(self.get_statement(table_name, columns, where_clause))
rows = cursor.fetchall()
print(len(rows))
for row in rows:
# print(f"KontorDB.get_data: {row}")
data.append(list(row))
cursor.close()
# print(f"KontorDB.getData: return {len(data)}")
if table_name == 'comic' and len(where_clause) == 0:
data.clear()
comics = self.session.query(Comic).all()
for item in comics:
# print(item)
row = []
for order in columns.keys():
column_name = columns[order]['column']
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
# print(f"{ref_table=}")
ref = getattr(item, ref_table)
value = getattr(ref, "name")
# print(f"{value=}")
row.append(value)
else:
row.append(getattr(item, column_name))
# print(repr(row))
data.append(row)
return data
def get_statement(self, table: str, header: dict, where_clause):
columns = ""
for index, column in header.items():
if index > 0:
columns += ", "
columns += column['column']
if len(columns) == 0:
columns = "*"
statement = f"SELECT {columns} FROM {table} {where_clause}"
print(f"{statement=}")
return statement
def export_db(self, export_type: str, export_file_name: str, export_table_list: list):
print(f"export DB to {export_file_name} as {export_type}")
db = {}
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
model = Base.model_lookup_by_table_name(table)
rows = self.session.query(model).all()
entries = []
print(f"found {len(rows)} entries")
print(f"found {len(columns)} columns")
for row in rows:
print(row)
entry = {}
for order in columns:
print(columns[order])
column_name = columns[order]['column']
print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError as error:
print("could not get value")
entries.append(entry)
db[table] = entries
export_file = Path(export_file_name)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case "YAML":
export_file = Path(export_file_name)
case "SQLite":
export_file = Path(export_file_name)
case _:
print("unknown export type")
if export_file.exists():
print(f"{export_file} exists")
+20
View File
@@ -0,0 +1,20 @@
from sqlalchemy import Column, String, DateTime, Integer
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker, declarative_base
# class Base(DeclarativeBase):
# pass
class BaseModel:
@classmethod
def model_lookup_by_table_name(cls, table_name):
registry_instance = getattr(cls, "registry")
for mapper_ in registry_instance.mappers:
model = mapper_.class_
model_class_name = model.__tablename__
if model_class_name == table_name:
return model
Base = declarative_base(cls=BaseModel)
+130
View File
@@ -0,0 +1,130 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base
class Publisher(Base):
__tablename__ = "publisher"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(length=255), unique=True)
comics = relationship("Comic")
def __repr__(self):
return f'Publisher({self.id} {self.name})'
def __str__(self):
return self.__repr__()
class Comic(Base):
__tablename__ = 'comic'
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
title = Column(String(length=255), unique=True)
publisher_id = Column(String, ForeignKey('publisher.id'), nullable=False)
publisher = relationship("Publisher", back_populates="comics")
current_order = Column(BIT(1))
completed = Column(BIT(1))
issues = relationship("Issue")
story_arcs = relationship("StoryArc")
trade_paperbacks = relationship("TradePaperback")
volumes = relationship("Volume")
comic_works = relationship("ComicWork")
def __repr__(self):
return f'Comic({self.id} {self.version} {self.title} {self.publisher.name})'
def __str__(self):
return f'{self.title}({self.id})'
class Volume(Base):
__tablename__ = "volume"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(length=255), nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="volumes")
issues = relationship("Issue")
class TradePaperback(Base):
__tablename__ = "trade_paperback"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(length=255), nullable=False)
issue_start = Column(Integer)
issue_end = Column(Integer)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="trade_paperbacks")
class StoryArc(Base):
__tablename__ = "story_arc"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(length=255), nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="story_arcs")
class Issue(Base):
__tablename__ = "issue"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
issue_number = Column(String(255))
in_stock = Column(BIT(1))
is_read = Column(BIT(1))
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="issues")
volume_id = Column(String, ForeignKey("volume.id"), nullable=True)
volume = relationship("Volume", back_populates="issues")
class Artist(Base):
__tablename__ = "artist"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(length=255), nullable=False)
comic_works = relationship("ComicWork")
class WorkType(Base):
__tablename__ = "worktype"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(length=255), nullable=False, unique=True)
comic_works = relationship("ComicWork")
class ComicWork(Base):
__tablename__ = "comic_work"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="comic_works")
artist_id = Column(String, ForeignKey("artist.id"), nullable=False)
artist = relationship("Artist", back_populates="comic_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="comic_works")
+25
View File
@@ -0,0 +1,25 @@
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.mysql import BIT
from .base import Base
class MediaFile(Base):
__tablename__ = 'media_file'
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255))
should_download = Column(BIT(1))
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})'
def __str__(self):
return f'{self.title}({self.id})'
+49
View File
@@ -0,0 +1,49 @@
from sqlalchemy import Column, String, ForeignKey, DateTime, Integer, Boolean
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base
class MetaDataTable(Base):
__tablename__ = 'meta_data_table'
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
table_name = Column(String(255), unique=True)
table_columns = relationship("MetaDataColumn")
def __repr__(self):
return f'MetaDataTable({self.id} {self.table_name})'
def __str__(self):
return f'{self.table_name}({self.id})'
class MetaDataColumn(Base):
__tablename__ = 'meta_data_column'
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
column_modifier = Column(String(255), nullable=True)
column_name = Column(String(255))
column_order = Column(Integer)
column_sync_name = Column(String(255))
column_type = Column(String(255))
table_id = Column(String, ForeignKey('meta_data_table.id'))
table = relationship("MetaDataTable", back_populates="table_columns")
column_label = Column(String(255))
filter_label = Column(String(255))
is_shown = Column(BIT(1))
show_filter = Column(BIT(1))
ref_column = Column(String, nullable=True)
def __repr__(self):
if self.column_name is None:
return f'MetaDataColumn({self.id} {self.table.table_name}.__)'
else:
return f'MetaDataColumn({self.id} {self.table.table_name}.{self.column_name})'
def __str__(self):
return f'{self.column_name}({self.id})'
+131
View File
@@ -0,0 +1,131 @@
from sqlalchemy import Column, DateTime, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base
class Sport(Base):
__tablename__ = "sport"
__table_args__ = (
UniqueConstraint("name"),
)
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(255), nullable=False, index=True, unique=True)
teams = relationship("Team")
positions = relationship("FieldPosition")
class Team(Base):
__tablename__ = "team"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(255), nullable=False, index=True, unique=True)
short_name = Column(String(255), nullable=False, )
sport_id = Column(String, ForeignKey("sport.id"), nullable=False)
sport = relationship("Sport", back_populates="positions")
roosters = relationship("Rooster")
class FieldPosition(Base):
__tablename__ = "field_position"
__table_args__ = (
UniqueConstraint("name", "sport_id"),
UniqueConstraint("short_name", "sport_id"),
)
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(255), nullable=False, index=True)
short_name = Column(String(255), nullable=False)
sport_id = Column(String, ForeignKey("sport.id"), nullable=False, index=True)
sport = relationship("Sport", back_populates="positions")
roosters = relationship("Rooster")
class Player(Base):
__tablename__ = "player"
__table_args__ = (
UniqueConstraint("first_name", "last_name"),
)
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
first_name = Column(String(255), nullable=False, index=True)
last_name = Column(String(255), nullable=False, index=True)
roosters = relationship("Rooster")
def get_full_name(self) -> str:
return f"{self.last_name}, {self.first_name}"
class Rooster(Base):
__tablename__ = "rooster"
__table_args__ = (
UniqueConstraint("year", "team_id", "player_id", "position_id"),
)
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
year = Column(Integer)
team_id = Column(String, ForeignKey("team.id"), nullable=False, index=True)
team = relationship("Team", back_populates="roosters")
player_id = Column(String, ForeignKey("player.id"), nullable=False, index=True)
player = relationship("Player", back_populates="roosters")
position_id = Column(String, ForeignKey("field_position.id"), nullable=False, index=True)
position = relationship("roosters")
cards = relationship("Card")
class Vendor(Base):
__tablename__ = "vendor"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(255), nullable=False, unique=True, index=True)
card_sets = relationship("CardSet")
cards = relationship("Card")
class CardSet(Base):
__tablename__ = "card_set"
__table_args__ = (
UniqueConstraint("name", "vendor_id"),
)
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(255), index=True)
parallel_set = Column(BIT(1))
insert_set = Column(BIT(1))
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False, index=True)
vendor = relationship("Vendor", back_populates="card_sets")
cards = relationship("Card")
class Card(Base):
__tablename__ = "card"
__table_args__ = (
UniqueConstraint("card_number", "year", "vendor_id", "card_set_id"),
)
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
card_number = Column(Integer, index=True)
year = Column(Integer, index=True)
card_set_id = Column(String, ForeignKey("card_set.id"), nullable=False)
card_set = relationship("cards")
rooster_id = Column(String, ForeignKey("rooster.id"), nullable=False)
rooster = relationship("cards")
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False)
vendor = relationship("Vendor", back_populates="cards")
View File
+111
View File
@@ -0,0 +1,111 @@
from cement import App, TestApp, init_defaults
from cement.core.exc import CaughtSignal
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .core.exc import KontorError
from .database.base import Base
from .controllers.clibase import CliBase
# configuration defaults
CONFIG = init_defaults('kontor', 'mariadb')
CONFIG['kontor']['foo'] = 'bar'
CONFIG['mariadb']['user'] = 'kontor'
CONFIG['mariadb']['password'] = 'kontor'
CONFIG['mariadb']['host'] = '127.0.0.1'
CONFIG['mariadb']['port'] = '3306'
CONFIG['mariadb']['database'] = 'kontor'
def extend_sqlalchemy(app):
app.log.info('extending kontor application with sqlalchemy')
connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}'.format(
app.config.get('mariadb', 'user'),
app.config.get('mariadb', 'password'),
app.config.get('mariadb', 'host'),
app.config.get('mariadb', 'port'),
app.config.get('mariadb', 'database')
))
# engine = create_engine(connect_string, echo=True)
engine = create_engine(connect_string)
Base.metadata.create_all(bind=engine)
__session__ = sessionmaker(bind=engine)
app.extend('session', __session__())
class Kontor(App):
"""Kontor primary application."""
class Meta:
label = 'kontor'
# configuration defaults
config_defaults = CONFIG
# call sys.exit() on close
exit_on_close = True
# load additional framework extensions
extensions = [
'yaml',
'colorlog',
'jinja2',
]
# configuration handler
config_handler = 'yaml'
# configuration file suffix
config_file_suffix = '.yml'
# set the log handler
log_handler = 'colorlog'
# set the output handler
output_handler = 'jinja2'
hooks = [
('post_setup', extend_sqlalchemy),
]
# register handlers
handlers = [
CliBase
]
class KontorTest(TestApp,Kontor):
"""A sub-class of Kontor that is better suited for testing."""
class Meta:
label = 'kontor'
def main():
with Kontor() as app:
try:
app.run()
except AssertionError as e:
print('AssertionError > %s' % e.args[0])
app.exit_code = 1
if app.debug is True:
import traceback
traceback.print_exc()
except KontorError as e:
print('KontorError > %s' % e.args[0])
app.exit_code = 1
if app.debug is True:
import traceback
traceback.print_exc()
except CaughtSignal as e:
# Default Cement signals are SIGINT and SIGTERM, exit 0 (non-error)
print('\n%s' % e)
app.exit_code = 0
if __name__ == '__main__':
main()
@@ -0,0 +1,4 @@
Example Template (templates/command1.jinja2)
Foo => {{ foo }}
+8
View File
@@ -0,0 +1,8 @@
-r requirements.txt
pytest
pytest-cov
coverage
twine>=1.11.0
setuptools>=38.6.0
wheel>=0.31.0
+6
View File
@@ -0,0 +1,6 @@
cement==3.0.12
cement[jinja2]
cement[yaml]
cement[colorlog]
mariadb
sqlalchemy
View File
+28
View File
@@ -0,0 +1,28 @@
from setuptools import setup, find_packages
from kontor.core.version import get_version
VERSION = get_version()
f = open('README.md', 'r')
LONG_DESCRIPTION = f.read()
f.close()
setup(
name='kontor',
version=VERSION,
description='Kontor CLI Tool',
long_description=LONG_DESCRIPTION,
long_description_content_type='text/markdown',
author='Thomas Peetz',
author_email='thomas.peetz@thpeetz.de',
url='https://gitlab.com/tpeetz/kontor',
license='MIT',
packages=find_packages(exclude=['ez_setup', 'tests*']),
package_data={'kontor': ['templates/*']},
include_package_data=True,
entry_points="""
[console_scripts]
kontor = kontor.main:main
""",
)
+16
View File
@@ -0,0 +1,16 @@
"""
PyTest Fixtures.
"""
import pytest
from cement import fs
@pytest.fixture(scope="function")
def tmp(request):
"""
Create a `tmp` object that geneates a unique temporary directory, and file
for each test function that requires it.
"""
t = fs.Tmp()
yield t
t.remove()
+36
View File
@@ -0,0 +1,36 @@
from pytest import raises
from kontor.main import KontorTest
def test_kontor():
# test kontor without any subcommands or arguments
with KontorTest() as app:
app.run()
assert app.exit_code == 0
def test_kontor_debug():
# test that debug mode is functional
argv = ['--debug']
with KontorTest(argv=argv) as app:
app.run()
assert app.debug is True
def test_command1():
# test command1 without arguments
argv = ['command1']
with KontorTest(argv=argv) as app:
app.run()
data,output = app.last_rendered
assert data['foo'] == 'bar'
assert output.find('Foo => bar')
# test command1 with arguments
argv = ['command1', '--foo', 'not-bar']
with KontorTest(argv=argv) as app:
app.run()
data,output = app.last_rendered
assert data['foo'] == 'not-bar'
assert output.find('Foo => not-bar')