adding tests to git plugin

- git plugin now performs `git init`
- git call are checked
- only one git commit per pubs command
- SandboxedCommandTestCase for real hd usecase tests.
- `git --help` now documents `--config` and `--force-colors`
main
Fabien C. Y. Benureau 6 years ago
parent 3ee2c1eaec
commit 0df52efcd3
No known key found for this signature in database
GPG Key ID: C3FB5E831A249A9A

@ -7,8 +7,11 @@
### Implemented enhancements
- Add `citekey` filter to `query` ([#193](https://github.com/pubs/pubs/pull/193) by [Shane Stone](https://github.com/shanewstone))
- New git plugin to commit changes to the repository ([#193](https://github.com/pubs/pubs/pull/193) by [Amlesh Sivanantham](http://github.com/zamlz))
- Add `citekey` filter to `query` ([#191(https://github.com/pubs/pubs/pull/191) by [Shane Stone](https://github.com/shanewstone))
- The `--config` and `--force-colors` command line options now appear when invoking `pubs --help`
### Fixed bugs

@ -95,6 +95,14 @@ active = force_list(default=list('alias'))
# command = !pubs list -k | wc -l
# description = lists number of pubs in repo
[[git]]
# the plugin allows to use `pubs git` and commit changes automatically
# if False, will display git output when invoked
quiet = boolean(default=True)
# if True, git will not automatically commit changes
manual = boolean(default=False)
[internal]
# The version of this configuration file. Do not edit.
version = string(min=5, default='{}')

@ -25,8 +25,19 @@ class Event(object):
return wrap
class PaperEvent(Event):
_format = "Unknown modification of paper {citekey}."
# Command events
class PreCommandEvent(Event):
description = "Triggered before the command is executed"
class PostCommandEvent(Event):
description = "Triggered after the command is executed"
# Paper changes
class PaperChangeEvent(Event):
_format = "Unspecified modification of paper {citekey}."
def __init__(self, citekey):
self.citekey = citekey
@ -36,27 +47,27 @@ class PaperEvent(Event):
return self._format.format(citekey=self.citekey)
# Used by repo.push_paper()
class AddEvent(PaperEvent):
class AddEvent(PaperChangeEvent):
_format = "Adds paper {citekey}."
# Used by repo.push_doc()
class DocAddEvent(PaperEvent):
class DocAddEvent(PaperChangeEvent):
_format = "Adds document for {citekey}."
# Used by repo.remove_paper()
class RemoveEvent(PaperEvent):
class RemoveEvent(PaperChangeEvent):
_format = "Removes paper for {citekey}."
# Used by repo.remove_doc()
class DocRemoveEvent(PaperEvent):
class DocRemoveEvent(PaperChangeEvent):
_format = "Removes document for {citekey}."
# Used by commands.tag_cmd.command()
class TagEvent(PaperEvent):
class TagEvent(PaperChangeEvent):
_format = "Updates tags for {citekey}."
# Used by commands.edit_cmd.command()
class ModifyEvent(PaperEvent):
class ModifyEvent(PaperChangeEvent):
_format = "Modifies {file_type} file of {citekey}."
def __init__(self, citekey, file_type):
@ -68,7 +79,7 @@ class ModifyEvent(PaperEvent):
return self._format.format(citekey=self.citekey, file_type=self.file_type)
# Used by repo.rename_paper()
class RenameEvent(PaperEvent):
class RenameEvent(PaperChangeEvent):
_format = "Renames paper {old_citekey} to {citekey}."
def __init__(self, paper, old_citekey):
@ -81,5 +92,5 @@ class RenameEvent(PaperEvent):
return self._format.format(citekey=self.citekey, old_citekey=self.old_citekey)
# Used by commands.note_cmd.command()
class NoteEvent(PaperEvent):
class NoteEvent(PaperChangeEvent):
_format = "Modifies note {citekey}."

@ -34,6 +34,8 @@ def load_plugins(conf, ui):
package in sys.path; the module indicated should contain the
PapersPlugin subclasses desired.
"""
global _classes, _instances
_classes, _instances = [], {}
for name in conf['plugins']['active']:
if len(name) > 0:
modname = '{}.{}.{}.{}'.format('pubs', PLUGIN_NAMESPACE, name, name)
@ -50,7 +52,7 @@ def load_plugins(conf, ui):
if isinstance(obj, type) and issubclass(obj, PapersPlugin) \
and obj != PapersPlugin:
_classes.append(obj)
_instances[obj] = obj(conf)
_instances[obj] = obj(conf, ui)
def get_plugins():

@ -65,7 +65,7 @@ class AliasPlugin(PapersPlugin):
name = 'alias'
def __init__(self, conf):
def __init__(self, conf, ui):
self.aliases = []
if 'alias' in conf['plugins']:
for name, entry in conf['plugins']['alias'].items():

@ -1,39 +1,106 @@
import subprocess
import os
import sys
import argparse
from subprocess import Popen, PIPE
from pipes import quote as shell_quote
from ...plugins import PapersPlugin
from ...events import PaperEvent, RenameEvent
from ...events import PaperChangeEvent, PostCommandEvent
GITIGNORE = """# files or directories for the git plugin to ignore
.cache/
"""
class GitPlugin(PapersPlugin):
"""The git plugin creates a git repository in the pubs directory and commit the changes
to the pubs repository everytime a paper is modified.
It also add the `pubs git` subcommand, so git commands can be executed in the git repository
from the command line.
"""
name = 'git'
description = "Run git commands in the pubs directory"
def __init__(self, conf):
def __init__(self, conf, ui):
self.ui = ui
self.pubsdir = conf['main']['pubsdir']
self.manual = conf['plugins'].get('git', {}).get('manual', False)
self.quiet = conf['plugins'].get('git', {}).get('quiet', True)
self.list_of_changes = []
self._gitinit()
def _gitinit(self):
"""Initialize the git repository if necessary."""
# check that a `.git` directory is present in the pubs dir
git_path = os.path.join(self.pubsdir, '.git')
if not os.path.isdir(git_path):
self.shell('init')
# check that a `.gitignore` file is present
gitignore_path = os.path.expanduser(os.path.join(self.pubsdir, '.gitignore'))
if not os.path.isfile(gitignore_path):
print('bla')
with open(gitignore_path, 'w') as fd:
fd.write(GITIGNORE)
def update_parser(self, subparsers, conf):
"""Allow the usage of the pubs git command"""
git_parser = subparsers.add_parser(self.name, help=self.description)
git_parser.add_argument('arguments', nargs='*', help="look at man git")
# FIXME: there may be some problems here with the -c argument being ambiguous between
# pubs and git.
git_parser.add_argument('arguments', nargs=argparse.REMAINDER, help="look at man git")
git_parser.set_defaults(func=self.command)
def command(self, conf, args):
"""Runs the git program in a shell"""
"""Execute a git command in the pubs directory"""
self.shell(' '.join([shell_quote(a) for a in args.arguments]))
def shell(self, cmd):
subprocess.call('git -C {} {}'.format(self.pubsdir, cmd), shell=True)
def shell(self, cmd, input_stdin=None):
"""Runs the git program in a shell
:param cmd: the git command, and all arguments, as a single string (e.g. 'add .')
:param input_stdin: if Python 3, must be bytes (i.e., from str, s.encode('utf-8'))
"""
git_cmd = 'git -C {} {}'.format(self.pubsdir, cmd)
p = Popen(git_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
output, err = p.communicate(input_stdin)
p.wait()
if p.returncode != 0:
msg = ('The git plugin encountered an error when running the git command:\n' +
'{}\n{}\n'.format(git_cmd, err) + 'You may fix the state of the git ' +
'repository manually.\nIf relevant, you may submit a bug report at ' +
'https://github.com/pubs/pubs/issues')
self.ui.warning(msg)
elif not self.quiet:
self.ui.info(output)
return output, err, p.returncode
@PaperEvent.listen()
def git_commit_event(PaperEventInstance):
# Stage the changes and commit
@PaperChangeEvent.listen()
def paper_change_event(event):
"""When a paper is changed, commit the changes to the directory."""
try:
git = GitPlugin.get_instance()
if not git.manual:
event_desc = event.description
for a, b in [('\\','\\\\'), ('"','\\"'), ('$','\\$'), ('`','\\`')]:
event_desc = event_desc.replace(a, b)
git.list_of_changes.append(event_desc)
except RuntimeError:
pass
@PostCommandEvent.listen()
def git_commit(event):
try:
git = GitPlugin.get_instance()
if isinstance(PaperEventInstance, RenameEvent):
git.shell("add \*/{}.\*".format(PaperEventInstance.old_citekey))
git.shell("add \*/{}.\*".format(PaperEventInstance.citekey))
cmesg = PaperEventInstance.description
cmesg = cmesg.replace('\\','\\\\').replace('"','\\"').replace('$','\\$').replace('`','\\`')
git.shell('commit -m "{}"'.format(cmesg))
if len(git.list_of_changes) > 0:
if not git.manual:
title = ' '.join(sys.argv) + '\n'
message = '\n'.join([title] + git.list_of_changes)
git.shell('add .')
git.shell('commit -F-', message.encode('utf-8'))
except RuntimeError:
pass

@ -1,10 +1,13 @@
# PYTHON_ARGCOMPLETE_OK
import sys
import argparse
import collections
from . import uis
from . import p3
from . import config
from . import events
from . import commands
from . import update
from . import plugins
@ -38,15 +41,16 @@ CORE_CMDS = collections.OrderedDict([
def execute(raw_args=sys.argv):
try:
conf_parser = p3.ArgumentParser(prog="pubs", add_help=False)
conf_parser.add_argument("-c", "--config", help="path to config file",
desc = 'Pubs: your bibliography on the command line.\nVisit https://github.com/pubs/pubs for more information.'
parser = p3.ArgumentParser(prog="pubs", add_help=False, description=desc)
parser.add_argument("-c", "--config", help="path to an alternate configuration file",
type=str, metavar="FILE")
conf_parser.add_argument('--force-colors', dest='force_colors',
parser.add_argument('--force-colors', dest='force_colors',
action='store_true', default=False,
help='color are not disabled when piping to a file or other commands')
#conf_parser.add_argument("-u", "--update", help="update config if needed",
help='colors are not disabled when piping to a file or other commands')
#parser.add_argument("-u", "--update", help="update config if needed",
# default=False, action='store_true')
top_args, remaining_args = conf_parser.parse_known_args(raw_args[1:])
top_args, remaining_args = parser.parse_known_args(raw_args[1:])
if top_args.config:
conf_path = top_args.config
@ -70,10 +74,9 @@ def execute(raw_args=sys.argv):
uis.init_ui(conf, force_colors=top_args.force_colors)
ui = uis.get_ui()
desc = 'Pubs: your bibliography on the command line.\nVisit https://github.com/pubs/pubs for more information.'
parser = p3.ArgumentParser(description=desc,
prog="pubs", add_help=True)
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument('-v', '--version', action='version', version=__version__)
parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS,
help='Show this help message and exit.')
subparsers = parser.add_subparsers(title="commands", dest="command")
# Populate the parser with core commands
@ -96,9 +99,12 @@ def execute(raw_args=sys.argv):
parser.print_help(file=sys.stderr)
sys.exit(2)
events.PreCommandEvent().send()
args.prog = "pubs" # FIXME?
args.func(conf, args)
except Exception as e:
if not uis.get_ui().handle_exception(e):
raise
finally:
events.PostCommandEvent().send()

@ -0,0 +1,254 @@
from __future__ import print_function, unicode_literals
import os
import sys
import shutil
import tempfile
import unittest
import six
from pubs import pubs_cmd, color, content, uis, p3
from pubs.config import conf
from pubs.p3 import _fake_stdio, _get_fake_stdio_ucontent
# makes the tests very noisy
PRINT_OUTPUT = True
CAPTURE_OUTPUT = True
class FakeSystemExit(Exception):
"""\
SystemExit exceptions are replaced by FakeSystemExit in the execute_cmds()
function, so they can be catched by ExpectedFailure tests in Python 2.x.
If a code is expected to raise SystemExit, catch FakeSystemExit instead.
Added explicit __init__ so SystemExit.code functionality could be emulated.
Taking form from https://stackoverflow.com/a/26938914/1634191
"""
def __init__(self, code=None, *args):
self.code = code
super(FakeSystemExit, self).__init__(
"Exited with code: {}.".format(self.code), *args)
# capture output
def capture(f, verbose=False):
"""Capture the stdout and stderr output.
Useful for comparing the output with the expected one during tests.
:param f: The function to capture output from.
:param verbose: If True, print call will still display their outputs.
If False, they will be silenced.
"""
def newf(*args, **kwargs):
old_stderr, old_stdout = sys.stderr, sys.stdout
sys.stdout = _fake_stdio(additional_out=old_stderr if verbose else None)
sys.stderr = _fake_stdio(additional_out=old_stderr if False else None)
try:
return f(*args, **kwargs), _get_fake_stdio_ucontent(sys.stdout), _get_fake_stdio_ucontent(sys.stderr)
finally:
sys.stderr, sys.stdout = old_stderr, old_stdout
return newf
# scriptable input
class FakeInput():
""" Replace the input() command, and mock user input during tests
Instanciate as :
input = FakeInput(['yes', 'no'])
then replace the input command in every module of the package :
input.as_global()
Then :
input() returns 'yes'
input() returns 'no'
input() raises IndexError
"""
class UnexpectedInput(Exception):
pass
def __init__(self, inputs, module_list=tuple()):
self.inputs = list(inputs) or []
self.module_list = module_list
self._cursor = 0
def as_global(self):
for md in self.module_list:
md.input = self
if md.__name__ == 'pubs.uis':
md.InputUI.editor_input = self
md.InputUI.edit_file = self.input_to_file
# Do not catch UnexpectedInput
original_handler = md.InputUI.handle_exception
def handler(ui, exc):
if isinstance(exc, self.UnexpectedInput):
raise
else:
original_handler(ui, exc)
md.InputUI.handle_exception = handler
def input_to_file(self, path_to_file, temporary=True):
content.write_file(path_to_file, self())
def add_input(self, inp):
self.inputs.append(inp)
def __call__(self, *args, **kwargs):
try:
inp = self.inputs[self._cursor]
self._cursor += 1
return inp
except IndexError:
raise self.UnexpectedInput('Unexpected user input in test.')
class SandboxedCommandTestCase(unittest.TestCase):
maxDiff = 1000000
def setUp(self):
super(SandboxedCommandTestCase, self).setUp()
self.temp_dir = tempfile.mkdtemp()
self.default_pubs_dir = os.path.join(self.temp_dir, 'pubs')
self.default_conf_path = os.path.join(self.temp_dir, 'pubsrc')
os.chdir(os.path.dirname(__file__))
@staticmethod
def _normalize(s):
"""Normalize a string for robust comparisons."""
s = color.undye(s)
try:
s = s.decode('utf-8')
except AttributeError:
pass
return s
def _compare_output(self, s1, s2):
if s1 is not None and s2 is not None:
return self.assertEqual(self._normalize(s1), self._normalize(s2))
def _preprocess_cmd(self, cmd):
"""Sandbox the pubs command into a temporary directory"""
cmd_chunks = cmd.split(' ')
assert cmd_chunks[0] == 'pubs'
prefix = ['pubs', '-c', self.default_conf_path]
if cmd_chunks[1] == 'init':
return ' '.join(prefix + ['init', '-p', self.default_pubs_dir] + cmd_chunks[2:])
else:
return ' '.join(prefix + cmd_chunks[1:])
def execute_cmds(self, cmds, capture_output=CAPTURE_OUTPUT):
""" Execute a list of commands, and capture their output
A command can be a string, or a tuple of size 2, 3 or 4.
In the latter case, the command is :
1. a string reprensenting the command to execute
2. the user inputs to feed to the command during execution
3. the expected output on stdout, verified with assertEqual.
4. the expected output on stderr, verified with assertEqual. (this does not work yet)
"""
try:
outs = []
for cmd in cmds:
inputs = []
expected_out, expected_err = None, None
actual_cmd = cmd
if not isinstance(cmd, p3.ustr):
actual_cmd = cmd[0]
if len(cmd) >= 2 and cmd[1] is not None: # Inputs provided
inputs = cmd[1]
if len(cmd) >= 3: # Expected output provided
capture_output = True
if cmd[2] is not None:
expected_out = color.undye(cmd[2])
if len(cmd) >= 4 and cmd[3] is not None: # Expected error output provided
expected_err = color.undye(cmd[3])
actual_cmd = self._preprocess_cmd(actual_cmd)
# Always set fake input: test should not ask unexpected user input
input = FakeInput(inputs, [content, uis, p3])
input.as_global()
try:
if capture_output:
execute_captured = capture(pubs_cmd.execute, verbose=PRINT_OUTPUT)
_, stdout, stderr = execute_captured(actual_cmd.split())
self._compare_output(stdout, expected_out)
self._compare_output(stderr, expected_err)
outs.append(self._normalize(stdout))
else:
pubs_cmd.execute(actual_cmd.split())
except FakeInput.UnexpectedInput:
self.fail('Unexpected input asked by command: {}.'.format(actual_cmd))
return outs
except SystemExit as exc:
exc_class, exc, tb = sys.exc_info()
if sys.version_info.major == 2:
# using six to avoid a SyntaxError in Python 3.x
six.reraise(FakeSystemExit, FakeSystemExit(*exc.args), tb)
else:
raise FakeSystemExit(*exc.args).with_traceback(tb)
def tearDown(self):
shutil.rmtree(self.temp_dir, ignore_errors=True)
## Testing the test environments
class TestInput(unittest.TestCase):
"""Test that the fake input mechanisms work correctly in the tests"""
def test_input(self):
input = FakeInput(['yes', 'no'])
self.assertEqual(input(), 'yes')
self.assertEqual(input(), 'no')
with self.assertRaises(FakeInput.UnexpectedInput):
input()
def test_input2(self):
other_input = FakeInput(['yes', 'no'], module_list=[color])
other_input.as_global()
self.assertEqual(color.input(), 'yes')
self.assertEqual(color.input(), 'no')
with self.assertRaises(FakeInput.UnexpectedInput):
color.input()
def test_editor_input(self):
sample_conf = conf.load_default_conf()
ui = uis.InputUI(sample_conf)
other_input = FakeInput(['yes', 'no'], module_list=[uis])
other_input.as_global()
self.assertEqual(ui.editor_input('fake_editor'), 'yes')
self.assertEqual(ui.editor_input('fake_editor'), 'no')
with self.assertRaises(FakeInput.UnexpectedInput):
ui.editor_input()
class TestSandboxedCommandTestCase(SandboxedCommandTestCase):
def test_init_add(self):
"""Simple init and add example"""
correct = ("added to pubs:\n"
"[Page99] Page, Lawrence et al. \"The PageRank Citation Ranking: Bringing Order to the Web.\" (1999) \n")
cmds = ['pubs init',
('pubs add data/pagerank.bib', [], correct),
('pubs add abc', [], '', 'error: File does not exist: /Users/self/Volumes/ResearchSync/projects/pubs/abc\n')
]
self.execute_cmds(cmds)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,94 @@
import unittest
import subprocess
import sand_env
from pubs import config
def git_hash(pubsdir):
"""Return the git revision"""
hash_cmd = ('git', '-C', pubsdir, 'rev-parse', 'HEAD')
return subprocess.check_output(hash_cmd)
class TestGitPlugin(sand_env.SandboxedCommandTestCase):
def setUp(self, nsec_stat=True):
super(TestGitPlugin, self).setUp()
self.execute_cmds(['pubs init'])
conf = config.load_conf(path=self.default_conf_path)
conf['plugins']['active'] = ['git']
config.save_conf(conf, path=self.default_conf_path)
def test_git(self):
self.execute_cmds(['pubs add data/pagerank.bib'])
hash_a = git_hash(self.default_pubs_dir)
self.execute_cmds(['pubs add data/pagerank.bib'])
hash_b = git_hash(self.default_pubs_dir)
self.execute_cmds(['pubs rename Page99a ABC'])
hash_c = git_hash(self.default_pubs_dir)
self.execute_cmds([('pubs remove ABC', ['y']),])
hash_d = git_hash(self.default_pubs_dir)
self.execute_cmds(['pubs doc add testrepo/doc/Page99.pdf Page99'])
hash_e = git_hash(self.default_pubs_dir)
self.execute_cmds([('pubs doc remove Page99', ['y'])])
hash_f = git_hash(self.default_pubs_dir)
self.execute_cmds(['pubs tag Page99 bla+bli'])
hash_g = git_hash(self.default_pubs_dir)
self.execute_cmds(['pubs list'])
hash_h = git_hash(self.default_pubs_dir)
self.execute_cmds([('pubs edit Page99', ['@misc{Page99, title="TTT" author="X. YY"}', 'y',
'@misc{Page99, title="TTT", author="X. YY"}', ''])])
hash_i = git_hash(self.default_pubs_dir)
self.assertNotEqual(hash_a, hash_b)
self.assertNotEqual(hash_b, hash_c)
self.assertNotEqual(hash_c, hash_d)
self.assertNotEqual(hash_d, hash_e)
self.assertNotEqual(hash_e, hash_f)
self.assertNotEqual(hash_f, hash_g)
self.assertEqual(hash_g, hash_h)
self.assertNotEqual(hash_h, hash_i)
conf = config.load_conf(path=self.default_conf_path)
conf['plugins']['active'] = []
config.save_conf(conf, path=self.default_conf_path)
self.execute_cmds(['pubs add data/pagerank.bib'])
hash_j = git_hash(self.default_pubs_dir)
self.assertEqual(hash_i, hash_j)
conf = config.load_conf(path=self.default_conf_path)
conf['plugins']['active'] = ['git']
conf['plugins']['git']['manual'] = True
config.save_conf(conf, path=self.default_conf_path)
self.execute_cmds(['pubs add data/pagerank.bib'])
hash_k = git_hash(self.default_pubs_dir)
self.assertEqual(hash_j, hash_k)
self.execute_cmds(['pubs git add .'])
hash_l = git_hash(self.default_pubs_dir)
self.assertEqual(hash_k, hash_l)
self.execute_cmds(['pubs git commit -m "abc"'])
hash_m = git_hash(self.default_pubs_dir)
self.assertNotEqual(hash_l, hash_m)
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save