Many cleanups in unicode encoding.

Originally intended to fix a bug in 'edit' when opening files with non-ascii
characters.

Now all data is assumed to be manipulated as unicode. Therefore all
values returned by functions from content are unicode. There are a few
exceptions that allow downloading non-unicode data without failing to
decode; these exceptions are marked with the 'byte_' prefix.
The io package is used instead of the builtin open for all file
transactions.
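A minimal sketch of the convention (read_file, ENCODING and io.open match the
diff below; byte_read_file is a hypothetical counterpart, shown only to
illustrate the 'byte_' prefix):

    import io

    ENCODING = 'utf8'

    def read_file(filepath):
        # default case: decode with the project-wide encoding, return unicode
        with io.open(filepath, 'r', encoding=ENCODING) as f:
            return f.read()

    def byte_read_file(filepath):
        # 'byte_' prefix: raw bytes, no decoding attempted (hypothetical helper)
        with io.open(filepath, 'rb') as f:
            return f.read()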

The fake_env test helper has to be modified (hacked, to be honest) since
fake_filesystem does not offer a mock of the io module.
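The idea behind the hack, sketched with a plain BytesIO standing in for the
fake file object (DecodingWrapper is illustrative only; the real helper is the
UnicodeStringIOWrapper added in the diff):

    import io

    ENCODING = 'utf8'

    class DecodingWrapper(object):
        # decode on read and encode on write, so callers keep seeing unicode
        # even though the wrapped fake file only handles bytes
        def __init__(self, stream):
            self._stream = stream

        def read(self, *args):
            return self._stream.read(*args).decode(ENCODING)

        def write(self, data):
            self._stream.write(data.encode(ENCODING))

    raw = io.BytesIO()
    wrapped = DecodingWrapper(raw)
    wrapped.write(u'caf\u00e9')
    raw.seek(0)
    assert wrapped.read() == u'caf\u00e9'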

This is still WIP. Two issues still have to be solved:
- a UnicodeWarning is raised by bibparser,
- config still uses the builtin open directly.
main
Olivier Mangin 11 years ago
parent 3a149f655f
commit 506bb24e50

@ -1,3 +1,5 @@
from __future__ import unicode_literals
import unicodedata
import re
@ -12,7 +14,7 @@ CITEKEY_EXCLUDE_RE = re.compile('[%s]'
% re.escape(CONTROL_CHARS + CITEKEY_FORBIDDEN_CHARS))
def str2citekey(s):
key = unicodedata.normalize('NFKD', ustr(s)).encode('ascii', 'ignore')
key = unicodedata.normalize('NFKD', ustr(s)).encode('ascii', 'ignore').decode()
key = CITEKEY_EXCLUDE_RE.sub('', key)
# Normalize chars and remove non-ascii
return key
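For illustration, the effect of the added .decode() (a standalone sketch, not
part of the diff): the NFKD/ascii round trip now yields unicode again instead
of a byte string.

    import unicodedata

    # decompose, drop the non-ascii combining marks, then decode back to unicode
    key = unicodedata.normalize('NFKD', u'L\u00f6we99').encode('ascii', 'ignore').decode()
    assert key == u'Lowe99'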

@ -1,4 +1,5 @@
import os
import io
import subprocess
import tempfile
import shutil
@ -6,6 +7,16 @@ import shutil
from .p3 import urlparse, HTTPConnection, urlopen
"""Conventions:
- all files are written using utf8 encoding by default,
- any function returning or variable containing byte data should
be prefixed by 'byte_'
"""
ENCODING = 'utf8'
# files i/o
def _check_system_path_exists(path, fail=True):
@ -42,14 +53,14 @@ def check_directory(path, fail=True):
def read_file(filepath):
check_file(filepath)
with open(system_path(filepath), 'r') as f:
s = f.read()
return s
with io.open(system_path(filepath), 'r', encoding=ENCODING) as f:
content = f.read()
return content
def write_file(filepath, data):
check_directory(os.path.dirname(filepath))
with open(system_path(filepath), 'w') as f:
with io.open(system_path(filepath), 'w', encoding=ENCODING) as f:
f.write(data)
@ -83,13 +94,25 @@ def check_content(path):
return check_file(path)
def _get_byte_url_content(path, ui=None):
if ui is not None:
ui.print_('downloading {}'.format(path))
response = urlopen(path)
return response.read()
def _dump_byte_url_content(source, target):
"""Caution: this method does not test for existing destination.
"""
byte_content = _get_byte_url_content(source)
with io.open(target, 'wb') as f:
f.write(byte_content)
def get_content(path, ui=None):
"""Will be useful when we need to get content from url"""
if content_type(path) == 'url':
if ui is not None:
ui.print_('dowloading {}'.format(path))
response = urlopen(path)
return response.read()
return _get_byte_url_content(path, ui=ui).decode(encoding=ENCODING)
else:
return read_file(path)
@ -102,24 +125,27 @@ def move_content(source, target, overwrite = False):
shutil.move(source, target)
def copy_content(source, target, overwrite = False):
def copy_content(source, target, overwrite=False):
if source == target:
return
if not overwrite and os.path.exists(target):
raise IOError('target file exists')
shutil.copy(source, target)
raise IOError('{} file exists.'.format(target))
if content_type(source) == 'url':
_dump_byte_url_content(source, target)
else:
shutil.copy(source, target)
def editor_input(editor, initial="", suffix='.tmp'):
def editor_input(editor, initial=u"", suffix='.tmp'):
"""Use an editor to get input"""
str_initial = initial.encode(ENCODING) # TODO: make it a configuration item
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
tfile_name = temp_file.name
temp_file.write(initial)
temp_file.write(str_initial)
cmd = editor.split() # this enables editor commands with options, e.g. gvim -f
cmd.append(tfile_name)
subprocess.call(cmd)
with open(tfile_name) as temp_file:
content = temp_file.read()
content = read_file(tfile_name)
os.remove(tfile_name)
return content
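A sketch of the round trip editor_input now performs, under the convention
above (unicode is encoded before it reaches the binary temp file, then decoded
again when read back):

    import io
    import os
    import tempfile

    ENCODING = 'utf8'

    with tempfile.NamedTemporaryFile(suffix='.tmp', delete=False) as temp_file:
        temp_file.write(u'caf\u00e9'.encode(ENCODING))  # bytes go into the temp file
        name = temp_file.name
    with io.open(name, 'r', encoding=ENCODING) as f:
        assert f.read() == u'caf\u00e9'                 # decoded back to unicode
    os.remove(name)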
@ -127,11 +153,9 @@ def editor_input(editor, initial="", suffix='.tmp'):
def edit_file(editor, path_to_file, temporary=True):
if temporary:
check_file(path_to_file, fail=True)
with open(path_to_file) as f:
content = f.read()
content = read_file(path_to_file)
content = editor_input(editor, content)
with open(path_to_file, 'w') as f:
f.write(content)
write_file(path_to_file, content)
else:
cmd = editor.split() # this enables editor commands with options, e.g. gvim -f
cmd.append(path_to_file)

@ -1,10 +1,7 @@
from __future__ import print_function, absolute_import, division, unicode_literals
import copy
try:
import cStringIO as StringIO
except ImportError:
import StringIO
import io
import copy
try:
import bibtexparser as bp
@ -16,6 +13,12 @@ except ImportError:
import yaml
"""Important notice:
All functions and methods in this file assume and produce unicode data.
"""
def sanitize_citekey(record):
record['id'] = record['id'].strip('\n')
return record
@ -56,7 +59,8 @@ class EnDecoder(object):
"""
def encode_metadata(self, metadata):
return yaml.safe_dump(metadata, allow_unicode=True, encoding='UTF-8', indent = 4)
return yaml.safe_dump(metadata, allow_unicode=True,
encoding=None, indent = 4)
def decode_metadata(self, metadata_raw):
return yaml.safe_load(metadata_raw)
@ -97,9 +101,7 @@ class EnDecoder(object):
def decode_bibdata(self, bibdata_raw):
""""""
bibdata_rawutf8 = bibdata_raw
#bibdata_rawutf8 = unicode(bibdata_raw, 'utf8') # FIXME this doesn't work
bibdata_stream = StringIO.StringIO(bibdata_rawutf8)
bibdata_stream = io.StringIO(bibdata_raw)
return self._decode_bibdata(bibdata_stream)
def _decode_bibdata(self, bibdata_stream):
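For reference, a minimal example of why io.StringIO replaces the
StringIO/cStringIO pair (a sketch, not part of the diff): it accepts unicode
text directly and exists under the same name on Python 2 and 3.

    import io

    stream = io.StringIO(u'@article{Key01, title = {Caf\u00e9}}')
    line = stream.readline()  # unicode in, unicode out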

@ -3,7 +3,8 @@ import re
from .p3 import urlparse
from .content import (check_file, check_directory, read_file, write_file,
system_path, check_content, content_type, get_content)
system_path, check_content, content_type, get_content,
copy_content)
def filter_filename(filename, ext):
@ -161,12 +162,7 @@ class DocBroker(object):
target_path = '{}://{}'.format(self.scheme, citekey + os.path.splitext(source_path)[-1])
full_target_path = self.real_docpath(target_path)
if not overwrite and check_file(full_target_path, fail=False):
raise IOError('{} file exists.'.format(full_target_path))
doc_content = get_content(full_source_path)
write_file(full_target_path, doc_content)
copy_content(full_source_path, full_target_path, overwrite=overwrite)
return target_path
def remove_doc(self, docpath, silent=True):

@ -2,7 +2,6 @@ import sys
if sys.version_info[0] == 2:
import ConfigParser as configparser
import StringIO as io
input = raw_input
ustr = unicode
uchr = unichr
@ -11,7 +10,6 @@ if sys.version_info[0] == 2:
from httplib import HTTPConnection
else:
import configparser
import io
ustr = str
uchr = chr
from urllib.parse import urlparse
@ -19,5 +17,11 @@ else:
from http.client import HTTPConnection
configparser = configparser
io = io
input = input
def isbasestr(obj):
try:
return isinstance(obj, basestring)
except NameError:
return isinstance(obj, str) or isinstance(obj, bytes)

@ -1,4 +1,5 @@
import sys
import io
import os
import shutil
import glob
@ -9,7 +10,7 @@ import fake_filesystem
import fake_filesystem_shutil
import fake_filesystem_glob
from pubs.p3 import io, input
from pubs.p3 import input
from pubs import content, filebroker
# code for fake fs
@ -19,6 +20,7 @@ real_open = open
real_file = file
real_shutil = shutil
real_glob = glob
real_io = io
@ -35,6 +37,57 @@ real_glob = glob
# return ml
ENCODING = 'utf8'
class UnicodeStringIOWrapper(object):
"""This is a hack because fake_filesystem does not provied mock of io.
"""
override = ['read', 'readline', 'readlines', 'write', 'writelines']
def __init__(self, strio):
self._strio = strio # The real StringIO
def __getattr__(self, name):
if name in UnicodeStringIOWrapper.override:
return object.__getattribute__(self, name)
else:
return self._strio.__getattribute__(name)
def read(self, *args):
return self._strio.read(*args).decode(ENCODING)
def readline(self, *args):
return self._strio.readline(*args).decode(ENCODING)
def readlines(self, *args):
return [l.decode(ENCODING) for l in self._strio.readlines(*args)]
def write(self, data):
self._strio.write(data.encode(ENCODING))
def writelines(self, data):
self._strio.writelines([l.encode(ENCODING) for l in data])
def __enter__(self):
self._strio.__enter__()
return self
def __exit__(self, *args):
return self._strio.__exit__(*args)
class FakeIO(object):
def __init__(self, fake_open):
self.fake_open = fake_open
def open(self, *args, **kwargs):
# Forces python3 mode for FakeFileOpen
fakefs_stringio = self.fake_open.Call(*args, **kwargs)
return UnicodeStringIOWrapper(fakefs_stringio)
def create_fake_fs(module_list):
@ -43,6 +96,7 @@ def create_fake_fs(module_list):
fake_open = fake_filesystem.FakeFileOpen(fake_fs)
fake_shutil = fake_filesystem_shutil.FakeShutilModule(fake_fs)
fake_glob = fake_filesystem_glob.FakeGlobModule(fake_fs)
fake_io = FakeIO(fake_open)
fake_fs.CreateDirectory(fake_os.path.expanduser('~'))
@ -51,19 +105,23 @@ def create_fake_fs(module_list):
sys.modules['os'] = fake_os
sys.modules['shutil'] = fake_shutil
sys.modules['glob'] = fake_glob
sys.modules['io'] = fake_io
for md in module_list:
md.os = fake_os
md.shutil = fake_shutil
md.open = fake_open
md.file = fake_open
md.io = fake_io
return {'fs': fake_fs,
'os': fake_os,
'open': fake_open,
'io': fake_io,
'shutil': fake_shutil,
'glob': fake_glob}
def unset_fake_fs(module_list):
try:
__builtins__.open = real_open
@ -75,12 +133,14 @@ def unset_fake_fs(module_list):
sys.modules['os'] = real_os
sys.modules['shutil'] = real_shutil
sys.modules['glob'] = real_glob
sys.modules['io'] = real_io
for md in module_list:
md.os = real_os
md.shutil = real_shutil
md.open = real_open
md.file = real_file
md.io = real_io
def copy_dir(fs, real_dir, fake_dir = None):
@ -91,7 +151,7 @@ def copy_dir(fs, real_dir, fake_dir = None):
real_path = os.path.abspath(real_os.path.join(real_dir, filename))
fake_path = fs['os'].path.join(fake_dir, filename)
if real_os.path.isfile(real_path):
with real_open(real_path, 'r') as f:
with real_open(real_path, 'rb') as f:
fs['fs'].CreateFile(fake_path, contents=f.read())
if real_os.path.isdir(real_path):
fs['fs'].CreateDirectory(fake_path)
@ -103,8 +163,8 @@ def copy_dir(fs, real_dir, fake_dir = None):
def redirect(f):
def newf(*args, **kwargs):
old_stderr, old_stdout = sys.stderr, sys.stdout
stdout = io.StringIO()
stderr = io.StringIO()
stdout = io.BytesIO()
stderr = io.BytesIO()
sys.stdout, sys.stderr = stdout, stderr
try:
return f(*args, **kwargs), stdout, stderr

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import dotdot
from pubs import endecoder
import str_fixtures
@ -19,6 +21,8 @@ doe_bib = """
year = "2013"}
"""
dummy_metadata = {'docfile': 'docsdir://hop.la', 'tags': set(['a', 'b'])}
franny_bibdata = coder.decode_bibdata(franny_bib)
franny_bibentry = franny_bibdata['Franny1961']

@ -1,3 +1,6 @@
from __future__ import unicode_literals
bibtex_external0 = """
@techreport{Page99,
number = {1999-66},

@ -6,7 +6,9 @@ import yaml
import dotdot
from pubs import endecoder
from pubs.p3 import ustr
from fixtures import dummy_metadata
from str_fixtures import bibtex_raw0, metadata_raw0, turing_bib
@ -21,8 +23,19 @@ def compare_yaml_str(s1, s2):
class TestEnDecode(unittest.TestCase):
def test_endecode_bibtex(self):
def test_encode_bibtex_is_unicode(self):
decoder = endecoder.EnDecoder()
entry = decoder.decode_bibdata(bibtex_raw0)
bibraw = decoder.encode_bibdata(entry)
self.assertIsInstance(bibraw, ustr)
def test_encode_metadata_is_unicode(self):
decoder = endecoder.EnDecoder()
data = decoder.encode_metadata(dummy_metadata)
self.assertIsInstance(data, ustr)
def test_endecode_bibtex(self):
decoder = endecoder.EnDecoder()
entry = decoder.decode_bibdata(bibtex_raw0)

@ -1,4 +1,5 @@
from __future__ import print_function
from __future__ import print_function, unicode_literals
import unittest
import re
import os
@ -85,7 +86,7 @@ class CommandTestCase(unittest.TestCase):
else:
if capture_output:
assert isinstance(cmd, str)
assert p3.isbasestr(cmd)
_, stdout, stderr = fake_env.redirect(pubs_cmd.execute)(cmd.split())
else:
pubs_cmd.execute(cmd.split())
@ -213,13 +214,13 @@ class TestList(DataCommandTestCase):
class TestUsecase(DataCommandTestCase):
def test_first(self):
correct = ['Initializing pubs in /paper_first\n',
'',
'[Page99] Page, Lawrence et al. "The PageRank Citation Ranking: Bringing Order to the Web." (1999) \n',
'',
'',
'search network\n',
'[Page99] Page, Lawrence et al. "The PageRank Citation Ranking: Bringing Order to the Web." (1999) search network\n'
correct = [b'Initializing pubs in /paper_first\n',
b'',
b'[Page99] Page, Lawrence et al. "The PageRank Citation Ranking: Bringing Order to the Web." (1999) \n',
b'',
b'',
b'search network\n',
b'[Page99] Page, Lawrence et al. "The PageRank Citation Ranking: Bringing Order to the Web." (1999) search network\n'
]
cmds = ['pubs init -p paper_first/',
@ -305,9 +306,9 @@ class TestUsecase(DataCommandTestCase):
('pubs add', [str_fixtures.bibtex_external0]),
'pubs export Page99',
]
outs = self.execute_cmds(cmds)
self.assertEqual(endecoder.EnDecoder().decode_bibdata(outs[2]), fixtures.page_bibdata)
out_raw = outs[2].decode()
self.assertEqual(endecoder.EnDecoder().decode_bibdata(out_raw), fixtures.page_bibdata)
def test_import(self):
cmds = ['pubs init',
