new print ui, more robust colors

main
Fabien Benureau 9 years ago
parent d5a4fcf73c
commit cce9406670

@ -54,7 +54,6 @@ def generate_citekey(bibdata):
:raise ValueError: if no author nor editor is defined.
"""
citekey, entry = get_entry(bibdata)
author_key = 'author' if 'author' in entry else 'editor'
try:
first_author = entry[author_key][0]

@ -1,47 +1,90 @@
"""
Small code to handle colored text
"""
import sys
import re
bold = '\033[1m'
end = '\033[0m'
black = '\033[0;30m'
red = '\033[0;31m'
green = '\033[0;32m'
yellow = '\033[0;33m'
blue = '\033[0;34m'
purple = '\033[0;35m'
cyan = '\033[0;36m'
grey = '\033[0;37m'
ok = green
error = red
normal = grey
citekey = purple
filepath = bold
tag = cyan
def dye(s, color=end, bold=False):
assert color[0] == '\033'
if bold:
color = '\033[1' + color[3:]
return color + s + end
_dye = dye
def _color_supported(stream):
"""Returns True is the stream supports colors"""
if sys.platform == 'win32' and 'ANSICON' not in os.environ:
return False
if hasattr(stream, 'isatty') and stream.isatty(): # we have a tty
try:
import curses
curses.setupterm()
return curses.tigetnum('colors') >= 8
except Exception: # not picky.
return False
return False
COLOR_LIST = [u'black', u'red', u'green', u'yellow', u'blue', u'purple', u'cyan', u'grey']
def generate_colors(stream, color=True, bold=True, italic=True):
colors = {name: u'' for name in COLOR_LIST}
colors.update({u'b' +name: u'' for name in COLOR_LIST})
colors.update({u'i' +name: u'' for name in COLOR_LIST})
colors.update({u'bi'+name: u'' for name in COLOR_LIST})
colors[u'bold'] = u''
colors[u'italic'] = u''
colors[u'end'] = u''
if (color or bold or italic) and _color_supported(stream):
bold_flag, italic_flag = '', ''
if bold:
colors['bold'] = u'\x1b[1m'
bold_flag = '1;'
if italic:
colors['italic'] = u'\x1b[3m'
italic_flag = '3;'
for i, name in enumerate(COLOR_LIST):
if color:
color_flag = '3{}'.format(name)
colors[name] = u'\x1b[{}m'.format(color_flag)
colors.update({u'b'+name: u'\x1b[{}3{}m'.format(bold_flag, i) for i, name in enumerate(COLOR_LIST)})
colors.update({u'i'+name: u'\x1b[{}3{}m'.format(italic_flag, i) for i, name in enumerate(COLOR_LIST)})
colors.update({u'bi'+name: u'\x1b[{}3{}m'.format(bold_flag, italic_flag, i) for i, name in enumerate(COLOR_LIST)})
else:
if bold:
colors.update({u'b'+name: u'\x1b[{}m'.format(bold_flag, i) for i, name in enumerate(COLOR_LIST)})
if italic:
colors.update({u'i'+name: u'\x1b[{}m'.format(italic_flag, i) for i, name in enumerate(COLOR_LIST)})
if bold or italic:
colors.update({u'bi'+name: u'\x1b[{}m'.format(bold_flag, italic_flag, i) for i, name in enumerate(COLOR_LIST)})
if color or bold or italic:
colors[u'end'] = u'\x1b[0m'
return colors
COLORS_OUT = generate_colors(sys.stdout, color=True, bold=True, italic=True)
COLORS_ERR = generate_colors(sys.stderr, color=True, bold=True, italic=True)
def dye_out(s, color='end'):
return '{}{}{}'.format(COLORS_OUT[color], s, COLORS_OUT['end'])
def dye_err(s, color='end'):
return '{}{}{}'.format(COLORS_ERR[color], s, COLORS_OUT['end'])
def _nodye(s, *args, **kwargs):
return s
def setup(enable = True):
global dye
if enable:
dye = _dye
else:
dye = _nodye
def setup(color=True, bold=True, italic=True):
global COLORS_OUT, COLORS_ERR
COLORS_OUT = generate_colors(sys.stdout, color=color, bold=color, italic=color)
COLORS_ERR = generate_colors(sys.stderr, color=color, bold=color, italic=color)
# undye
undye_re = re.compile('\x1b\[[;\d]*[A-Za-z]')
def undye(s):
"""Purge string s of color"""
return undye_re.sub('', s)
# colors
ok = 'green'
error = 'red'
citekey = 'purple'
filepath = 'bold'
tag = 'cyan'

@ -128,13 +128,14 @@ def command(args):
try:
rp.push_paper(p)
if docfile is not None:
rp.push_doc(p.citekey, docfile, copy=args.copy)
rp.push_doc(p.citekey, docfile, copy=args.copy or args.move)
if args.copy:
if args.move:
content.remove_file(docfile)
elif ui.input_yn('{} has been copied into pubs; should the original be removed?'.format(color.dye(docfile, color.bold))):
content.remove_file(docfile)
ui.print_('added to pubs:\n{}'.format(pretty.paper_oneliner(p)))
# elif ui.input_yn('{} has been copied into pubs; should the original be removed?'.format(color.dye_out(docfile, 'bold'))):
# content.remove_file(docfile)
ui.print_out('added to pubs:\n{}'.format(pretty.paper_oneliner(p)))
except ValueError as v:
ui.error(v.message)
ui.exit(1)

@ -34,10 +34,14 @@ def command(args):
try:
document = args.document
rp.push_doc(paper.citekey, document, copy=args.copy)
if args.copy and args.move:
content.remove_file(document)
ui.print_('{} attached to {}'.format(color.dye(document, color.bold), color.dye(paper.citekey, color.citekey)))
if args.copy:
if args.move:
content.remove_file(document)
# else:
# if ui.input_yn('{} has been copied into pubs; should the original be removed?'.format(color.dye_out(document, 'bold'))):
# content.remove_file(document)
ui.print_out('{} attached to {}'.format(color.dye_out(document, 'bold'), color.dye_out(paper.citekey, color.citekey)))
except ValueError as v:
ui.error(v.message)

@ -36,4 +36,4 @@ def command(args):
bib[p.citekey] = p.bibdata
exporter = endecoder.EnDecoder()
bibdata_raw = exporter.encode_bibdata(bib)
ui.print_(bibdata_raw)
ui.print_out(bibdata_raw)

@ -80,7 +80,7 @@ def command(args):
ui.error('could not load entry for citekey {}.'.format(k))
else:
rp.push_paper(p)
ui.print_('{} imported'.format(color.dye(p.citekey, color.cyan)))
ui.print_out('{} imported'.format(color.dye_out(p.citekey, color.citekey)))
docfile = bibstruct.extract_docfile(p.bibdata)
if docfile is None:
ui.warning("no file for {}.".format(p.citekey))

@ -34,11 +34,10 @@ def command(args):
if check_directory(pubsdir, fail=False) and len(os.listdir(pubsdir)) > 0:
ui.error('directory {} is not empty.'.format(
color.dye(pubsdir, color.filepath)))
color.dye_err(pubsdir, color.filepath)))
ui.exit()
ui.print_('Initializing pubs in {}'.format(
color.dye(pubsdir, color.filepath)))
ui.print_out('Initializing pubs in {}'.format(color.dye_out(pubsdir, color.filepath)))
config().pubsdir = pubsdir
config().docsdir = docsdir

@ -49,7 +49,7 @@ def command(args):
else:
papers = sorted(papers, key=date_added)
if len(papers) > 0:
ui.print_('\n'.join(
ui.print_out('\n'.join(
pretty.paper_oneliner(p, citekey_only=args.citekeys)
for p in papers))

@ -31,7 +31,7 @@ def command(args):
if paper.docpath is None:
ui.error('No document associated with the entry {}.'.format(
color.dye(citekey, color.citekey)))
color.dye_err(citekey, color.citekey)))
ui.exit()
try:
@ -39,7 +39,7 @@ def command(args):
cmd = with_command.split()
cmd.append(docpath)
subprocess.Popen(cmd)
ui.print_('{} opened.'.format(color.dye(docpath, color.filepath)))
ui.print_out('{} opened.'.format(color.dye(docpath, color.filepath)))
except OSError:
ui.error("Command does not exist: %s." % with_command)
ui.exit(127)

@ -22,12 +22,12 @@ def command(args):
if force is None:
are_you_sure = (("Are you sure you want to delete paper(s) [{}]"
" (this will also delete associated documents)?")
.format(', '.join([color.dye(c, color.citekey) for c in args.citekeys])))
.format(', '.join([color.dye_out(c, color.citekey) for c in args.citekeys])))
sure = ui.input_yn(question=are_you_sure, default='n')
if force or sure:
for c in args.citekeys:
rp.remove_paper(c)
ui.print_('The paper(s) [{}] were removed'.format(', '.join([color.dye(c, color.citekey) for c in args.citekeys])))
ui.print_out('The paper(s) [{}] were removed'.format(', '.join([color.dye_out(c, color.citekey) for c in args.citekeys])))
# FIXME: print should check that removal proceeded well.
else:
ui.print_('The paper(s) [{}] were not removed'.format(', '.join([color.dye(c, color.citekey) for c in args.citekeys])))
ui.print_out('The paper(s) [{}] were *not* removed'.format(', '.join([color.dye_out(c, color.citekey) for c in args.citekeys])))

@ -83,13 +83,13 @@ def command(args):
rp = Repository(config())
if citekeyOrTag is None:
ui.print_(color.dye(' '.join(sorted(rp.get_tags())), color=color.blue))
ui.print_out(color.dye_out(' '.join(sorted(rp.get_tags())), color.tag))
else:
if rp.databroker.exists(citekeyOrTag):
p = rp.pull_paper(citekeyOrTag)
if tags == []:
ui.print_(color.dye(' '.join(sorted(p.tags)),
color=color.blue))
ui.print_out(color.dye_out(' '.join(sorted(p.tags)),
color.tag))
else:
add_tags, remove_tags = _tag_groups(_parse_tags(tags))
for tag in add_tags:
@ -108,5 +108,5 @@ def command(args):
len(p.tags.intersection(excluded)) == 0):
papers_list.append(p)
ui.print_('\n'.join(pretty.paper_oneliner(p)
for p in papers_list))
ui.print_out('\n'.join(pretty.paper_oneliner(p)
for p in papers_list))

@ -19,15 +19,15 @@ def command(args):
repo_version = int(config().version)
if repo_version == code_version:
ui.print_('Your pubs repository is up-to-date.')
ui.print_out('Your pubs repository is up-to-date.')
sys.exit(0)
elif repo_version > code_version:
ui.print_('Your repository was generated with an newer version of pubs.\n'
'You should not use pubs until you install the newest version.')
ui.print_out('Your repository was generated with an newer version of pubs.\n'
'You should not use pubs until you install the newest version.')
sys.exit(0)
else:
msg = ("You should backup the pubs directory {} before continuing."
"Continue ?").format(color.dye(config().papers_dir, color.filepath))
"Continue ?").format(color.dye_out(config().papers_dir, color.filepath))
sure = ui.input_yn(question=msg, default='n')
if not sure:
sys.exit(0)

@ -102,7 +102,7 @@ def check_content(path):
def _get_byte_url_content(path, ui=None):
if ui is not None:
ui.print_(u'dowloading {}'.format(path))
ui.print_out(u'dowloading {}'.format(path))
response = urlopen(path)
return response.read()

@ -13,6 +13,8 @@ if sys.version_info[0] == 2:
# for test_usecase.
def _get_raw_stdout():
return sys.stdout
def _get_raw_stderr():
return sys.stderr
ustr = unicode
uchr = unichr
@ -37,6 +39,13 @@ else:
from urllib.request import urlopen
from http.client import HTTPConnection
# The following has to be a function so that it can be mocked
# for test_usecase.
def _get_raw_stdout():
return sys.stdout.buffer
def _get_raw_stderr():
return sys.stderr.buffer
def _fake_stdio():
return io.TextIOWrapper(io.BytesIO()) # Only for tests to capture std{out,err}
@ -45,10 +54,6 @@ else:
stdio.seek(0)
return stdio.read()
# The following has to be a function so that it can be mocked
# for test_usecase.
def _get_raw_stdout():
return sys.stdout.buffer
configparser = configparser
input = input

@ -33,11 +33,11 @@ def bib_oneliner(bibdata):
journal = ' ' + bibdata.get('booktitle', '')
return u'{authors} \"{title}\"{journal}{year}'.format(
authors=color.dye(authors, color.grey, bold=True),
title=bibdata.get('title', ''),
journal=color.dye(journal, color.yellow),
year=' ({})'.format(bibdata['year']) if 'year' in bibdata else '',
)
authors=color.dye_out(authors, 'bold'),
title=bibdata.get('title', ''),
journal=color.dye_out(journal, 'italic'),
year=' ({})'.format(bibdata['year']) if 'year' in bibdata else '',
)
def bib_desc(bib_data):
@ -55,7 +55,7 @@ def paper_oneliner(p, citekey_only=False):
else:
bibdesc = bib_oneliner(p.bibdata)
tags = '' if len(p.tags) == 0 else '| {}'.format(
','.join(color.dye(t, color.tag) for t in sorted(p.tags)))
','.join(color.dye_out(t, color.tag) for t in sorted(p.tags)))
return u'[{citekey}] {descr} {tags}'.format(
citekey=color.dye(p.citekey, color.purple),
citekey=color.dye_out(p.citekey, 'purple'),
descr=bibdesc, tags=tags)

@ -46,9 +46,9 @@ def _update_check(config, ui):
'to bypass this error)')
sys.exit()
elif repo_version < code_version:
ui.print_(
ui.print_out(
'warning: your repository version (v{})'.format(repo_version)
+ 'must be updated to version {}.\n'.format(code_version)
+ 'must be updated to version {}.\n'.format(code_version)
+ "run 'pubs update'.")
sys.exit()

@ -6,7 +6,7 @@ import codecs
from .content import editor_input
from . import color
from .p3 import _get_raw_stdout, input
from .p3 import _get_raw_stdout, _get_raw_stderr, input
# package-shared ui that can be accessed using :
@ -23,7 +23,7 @@ def _get_encoding(config):
enc = locale.getdefaultlocale()[1]
except ValueError:
pass # Keep default
return config.get('terminal-encoding', enc or 'utf8')
return config.get('terminal-encoding', enc or 'utf-8')
def get_ui():
@ -34,34 +34,60 @@ def get_ui():
def init_ui(config):
global _ui
_ui = UI(config)
_ui = InputUI(config)
class UI:
"""UI class. Stores configuration parameters and system information.
"""
class PrintUI(object):
def __init__(self, config):
color.setup(config.color)
self.editor = config.edit_cmd
self.encoding = _get_encoding(config)
self._stdout = codecs.getwriter(self.encoding)(_get_raw_stdout(),
errors='replace')
self._stdout = codecs.getwriter(self.encoding)(_get_raw_stdout(),
errors='replace')
self._stderr = codecs.getwriter(self.encoding)(_get_raw_stderr(),
errors='replace')
def print_(self, *strings, **kwargs):
def print_out(self, *strings, **kwargs):
"""Like print, but rather than raising an error when a character
is not in the terminal's encoding's character set, just silently
replaces it.
"""
print(' '.join(strings), file=self._stdout, **kwargs)
def print_err(self, *strings, **kwargs):
"""Like print, but rather than raising an error when a character
is not in the terminal's encoding's character set, just silently
replaces it.
"""
print(' '.join(strings), file=self._stderr, **kwargs)
def error(self, message):
self.print_err('{}: {}'.format(color.dye_err('error', 'red'), message))
def warning(self, message):
self.print_err("%s: %s" % (color.dye_err('warning', 'yellow'), message))
class InputUI(PrintUI):
"""UI class. Stores configuration parameters and system information.
"""
def __init__(self, config):
super(InputUI, self).__init__(config)
self.editor = config.edit_cmd
def exit(self, error_code=1):
sys.exit(error_code)
def input(self):
try:
data = input()
except EOFError:
self.error('Standard input ended while waiting for answer.')
self.error(u'Standard input ended while waiting for answer.')
self.exit(1)
return data
return data.decode('utf-8')
def input_choice_ng(self, options, option_chars=None, default=None, question=''):
"""Ask the user to chose between a set of options. The iser is asked
@ -75,7 +101,7 @@ class UI:
:returns: int
the index of the chosen option
"""
char_color = color.bold
char_color = 'bold'
option_chars = [s[0] for s in options]
displayed_chars = [c.upper() if i == default else c
for i, c in enumerate(option_chars)]
@ -84,10 +110,10 @@ class UI:
option_chars = []
char_color = color.end
option_str = '/'.join(["{}{}".format(color.dye(c, color.bold), s[1:])
option_str = '/'.join(["{}{}".format(color.dye_out(c, 'bold'), s[1:])
for c, s in zip(displayed_chars, options)])
self.print_('{} {}: '.format(question, option_str), end='')
self.print_out('{} {}: '.format(question, option_str), end='')
while True:
answer = self.input()
if answer is None or answer == '':
@ -101,8 +127,7 @@ class UI:
return option_chars.index(answer.lower())
except ValueError:
pass
self.print_('Incorrect option.', option_str)
self.print_out('Incorrect option.', option_str)
def input_choice(self, options, option_chars, default=None, question=''):
@ -122,9 +147,9 @@ class UI:
"""
displayed_chars = [s.upper() if i == default else s
for i, s in enumerate(option_chars)]
option_str = ', '.join(["[%s]%s" % (color.dye(c, color.cyan), o)
option_str = ', '.join(["[%s]%s" % (color.dye_out(c, 'cyan'), o)
for c, o in zip(displayed_chars, options)])
self.print_(question, option_str)
self.print_out(question, option_str)
while True:
answer = self.input()
if answer is None or answer == '':
@ -135,21 +160,12 @@ class UI:
return option_chars.index(answer.lower())
except ValueError:
pass
self.print_('Incorrect option.', option_str)
self.print_out('Incorrect option.', option_str)
def input_yn(self, question='', default='y'):
d = 0 if default in (True, 'y', 'yes') else 1
answer = self.input_choice_ng(['yes', 'no'], default=d, question=question)
return [True, False][answer]
def exit(self, error_code=1):
sys.exit(error_code)
def error(self, message):
self.print_("%s: %s" % (color.dye('error', color.red), message))
def warning(self, message):
self.print_("%s: %s" % (color.dye('warning', color.yellow), message))
def editor_input(self, initial="", suffix='.tmp'):
return editor_input(self.editor, initial=initial, suffix=suffix)

@ -24,9 +24,7 @@ def resolve_citekey(repo, citekey, ui=None, exit_on_fail=True):
citekey))
for c in citekeys:
p = repo.pull_paper(c)
ui.print_(u' {}'.format(pretty.paper_oneliner(p)))
ui.print_out(u' {}'.format(pretty.paper_oneliner(p)))
if exit_on_fail:
ui.exit()
return citekey

@ -11,7 +11,8 @@ setup(
url = '',
description = 'command-line scientific bibliography manager',
packages = find_packages(), #['pubs', 'pubs.commands', 'pubs.templates', 'pubs.plugs'],
#packages = find_packages(), #['pubs', 'pubs.commands', 'pubs.templates', 'pubs.plugs'],
packages = ['pubs', 'pubs.commands', 'pubs.templates', 'pubs.plugs'],
scripts = ['pubs/pubs'],
install_requires = ['pyyaml', 'bibtexparser', 'python-dateutil', 'requests'],

@ -1,3 +1,6 @@
# Adjusting paths.
import os, sys
sys.path.insert(0, os.path.abspath(os.path.join(__file__, '../..')))
sys.path.insert(0, os.path.abspath(os.path.join(__file__, '../..')))
import logging
logging.disable(logging.CRITICAL)

@ -11,14 +11,16 @@ coder = endecoder.EnDecoder()
franny_bib = """@article{Franny1961,
author = "Salinger, J. D.",
title = "Franny and Zooey",
year = "1961"}
year = "1961",
}
"""
doe_bib = """
@article{Doe2013,
author = "Doe, John",
title = "Nice Title",
year = "2013"}
year = "2013",
}
"""
dummy_metadata = {'docfile': 'docsdir://hop.la', 'tags': set(['a', 'b'])}

@ -29,7 +29,7 @@ bibtex_raw0 = """@techreport{
note = "Previous number = SIDL-WP-1999-0120",
year = "1999",
type = "Technical Report",
institution = "Stanford InfoLab"
institution = "Stanford InfoLab",
}
"""

@ -13,7 +13,7 @@ from str_fixtures import bibtex_raw0
class TestPretty(unittest.TestCase):
def setUp(self):
color.setup(enable=False)
color.setup()
def test_oneliner(self):
decoder = endecoder.EnDecoder()

@ -53,7 +53,7 @@ class TestFakeInput(unittest.TestCase):
class CommandTestCase(unittest.TestCase):
"""Abstract TestCase intializing the fake filesystem."""
maxDiff = None
maxDiff = 1000000
def setUp(self):
self.fs = fake_env.create_fake_fs([content, filebroker, configs, init_cmd, import_cmd])
@ -62,42 +62,43 @@ class CommandTestCase(unittest.TestCase):
def execute_cmds(self, cmds, capture_output=CAPTURE_OUTPUT):
""" Execute a list of commands, and capture their output
A command can be a string, or a tuple of size 2 or 3.
A command can be a string, or a tuple of size 2, 3 or 4.
In the latter case, the command is :
1. a string reprensenting the command to execute
2. the user inputs to feed to the command during execution
3. the output expected, verified with assertEqual. Always captures
output in this case.
3. the expected output on stdout, verified with assertEqual.
4. the expected output on stderr, verified with assertEqual.
"""
outs = []
for cmd in cmds:
inputs = []
output = None
expected_out, expected_err = None, None
actual_cmd = cmd
current_capture_output = capture_output
if not isinstance(cmd, p3.ustr):
actual_cmd = cmd[0]
if len(cmd) == 2: # Inputs provided
inputs = cmd[1]
if len(cmd) == 3: # Expected output provided
current_capture_output = True
output = cmd[2]
capture_output = True
expected_out = color.undye(cmd[2])
if len(cmd) == 4: # Expected error output provided
expected_err = color.undye(cmd[3])
# Always set fake input: test should not ask unexpected user input
input = fake_env.FakeInput(inputs, [content, uis, p3])
input.as_global()
try:
if current_capture_output:
if capture_output:
_, stdout, stderr = fake_env.redirect(pubs_cmd.execute)(
actual_cmd.split())
self.assertEqual(stderr, '')
actual_out = color.undye(stdout)
if output is not None:
correct_out = color.undye(output)
self.assertEqual(actual_out, correct_out)
actual_err = color.undye(stderr)
if expected_out is not None:
self.assertEqual(actual_out, expected_out)
if expected_err is not None:
self.assertEqual(actual_err, expected_err)
outs.append(color.undye(actual_out))
else:
pubs_cmd.execute(cmd.split())
pubs_cmd.execute(actual_cmd.split())
except fake_env.FakeInput.UnexpectedInput:
self.fail('Unexpected input asked by command: {}.'.format(
actual_cmd))
@ -252,7 +253,7 @@ class TestUsecase(DataCommandTestCase):
def test_first(self):
correct = ['Initializing pubs in /paper_first\n',
'[Page99] Page, Lawrence et al. "The PageRank Citation Ranking: Bringing Order to the Web." (1999) \nwas added to pubs.\n',
'added to pubs:\n[Page99] Page, Lawrence et al. "The PageRank Citation Ranking: Bringing Order to the Web." (1999) \n',
'[Page99] Page, Lawrence et al. "The PageRank Citation Ranking: Bringing Order to the Web." (1999) \n',
'\n',
'',

Loading…
Cancel
Save