Commit db150527 authored by agb80's avatar agb80

style(black): aplica black a todo el repositorio

parent cdec1f22
Pipeline #5477 failed with stage
in 2 minutes and 37 seconds
# -*- coding: utf-8 -*-
{
'name': 'Add category to taxes',
'version': '2.0.0',
'depends': [
'account',
"name": "Add category to taxes",
"version": "2.0.0",
"depends": ["account"],
"author": "OpenPyme",
"license": "AGPL-3",
"website": "http://openpyme.mx",
"category": "Generic Modules",
"data": [
"security/ir.model.access.csv",
"views/account_tax_category.xml",
"data/account_tax_category_data.xml",
],
'author': 'OpenPyme',
'license': 'AGPL-3',
'website': 'http://openpyme.mx',
'category': 'Generic Modules',
'data': [
'security/ir.model.access.csv',
'views/account_tax_category.xml',
'data/account_tax_category_data.xml',
],
'demo': [
],
'installable': True,
"demo": [],
"installable": True,
}
......@@ -8,15 +8,12 @@ from openerp import SUPERUSER_ID
@tools.migrate(use_env=True, uid=SUPERUSER_ID)
def migrate(env, installed_version):
"""Upgrade name for tables and models from account.tax.group"""
if tools.table_exists(env.cr, 'account_tax_category'):
tools.rename_tables(
env.cr, [('account_tax_category', 'account_tax_group')],
)
tools.rename_models(
env.cr, [('account.tax.category', 'account.tax.group')],
)
if tools.table_exists(env.cr, "account_tax_category"):
tools.rename_tables(env.cr, [("account_tax_category", "account_tax_group")])
tools.rename_models(env.cr, [("account.tax.category", "account.tax.group")])
tools.logged_query(
env.cr, """
env.cr,
"""
DELETE from ir_ui_view v
USING ir_model_data d
WHERE v.id=d.res_id AND d.model='ir.ui.view' AND
......
......@@ -4,27 +4,22 @@ from openerp import fields, models
class AccountTaxGroup(models.Model):
_name = 'account.tax.group'
_name = "account.tax.group"
name = fields.Char(
size=64, required=True, help='Name for this group',
)
code = fields.Char(
size=32, required=True, help='Code for this group',
)
active = fields.Boolean(
help='Indicate if this category is active', default=True,
)
name = fields.Char(size=64, required=True, help="Name for this group")
code = fields.Char(size=32, required=True, help="Code for this group")
active = fields.Boolean(help="Indicate if this category is active", default=True)
category_ids = fields.One2many(
'account.tax', 'tax_category_id',
'Group', help='Tax that belong of this category',
"account.tax",
"tax_category_id",
"Group",
help="Tax that belong of this category",
)
class AccountTax(models.Model):
_inherit = 'account.tax'
_inherit = "account.tax"
tax_category_id = fields.Many2one(
'account.tax.group',
'Tax Group', required=False, help='Group of this tax',
"account.tax.group", "Tax Group", required=False, help="Group of this tax"
)
# -*- coding: utf-8 -*-
{
'name': 'l10n_mx_facturae_cer',
'version': '1.0.1',
'author': 'OpenPyme',
'category': 'Localization/Mexico',
'website': 'http://www.openpyme.mx',
'license': 'AGPL-3',
'depends': [
'account',
"name": "l10n_mx_facturae_cer",
"version": "1.0.1",
"author": "OpenPyme",
"category": "Localization/Mexico",
"website": "http://www.openpyme.mx",
"license": "AGPL-3",
"depends": ["account"],
"demo": ["demo/l10n_mx_facturae_cer_demo.xml"],
"data": [
"security/l10n_mx_facturae_cer_security.xml",
"security/ir.model.access.csv",
"views/res_company.xml",
"views/res_company_facturae_certificate.xml",
],
'demo': [
'demo/l10n_mx_facturae_cer_demo.xml',
],
'data': [
'security/l10n_mx_facturae_cer_security.xml',
'security/ir.model.access.csv',
'views/res_company.xml',
'views/res_company_facturae_certificate.xml',
],
'installable': True,
'external_dependencies': {
'python': [
'M2Crypto',
],
'bin': [
'openssl',
],
},
"installable": True,
"external_dependencies": {"python": ["M2Crypto"], "bin": ["openssl"]},
}
......@@ -6,37 +6,40 @@ from openerp import api, fields, models
class ResCompany(models.Model):
_inherit = 'res.company'
_inherit = "res.company"
certificate_ids = fields.One2many(
'res.company.facturae.certificate',
'company_id', 'Certificates',
help='Certificates configurated in this company',
"res.company.facturae.certificate",
"company_id",
"Certificates",
help="Certificates configurated in this company",
)
invoice_out_sequence_id = fields.Many2one(
'ir.sequence', 'Invoice Out Sequence',
help='The sequence used for invoice out numbers.',
"ir.sequence",
"Invoice Out Sequence",
help="The sequence used for invoice out numbers.",
)
invoice_out_refund_sequence_id = fields.Many2one(
'ir.sequence', 'Invoice Out Refund Sequence',
help='The sequence used for invoice out refund numbers.',
"ir.sequence",
"Invoice Out Refund Sequence",
help="The sequence used for invoice out refund numbers.",
)
certificate_id = fields.Many2one(
comodel_name='res.company.facturae.certificate',
string='Certificate Current',
compute='_compute_certificate_id',
help='Serial Number of the certificate active and inside of '
'dates in this company',
comodel_name="res.company.facturae.certificate",
string="Certificate Current",
compute="_compute_certificate_id",
help="Serial Number of the certificate active and inside of "
"dates in this company",
)
@api.one
def _compute_certificate_id(self):
cert_obj = self.env['res.company.facturae.certificate']
date = time.strftime('%Y-%m-%d')
cert_obj = self.env["res.company.facturae.certificate"]
date = time.strftime("%Y-%m-%d")
domain = [
('company_id', '=', self.id),
('date_start', '<=', date),
('date_end', '>=', date),
('active', '=', True),
("company_id", "=", self.id),
("date_start", "<=", date),
("date_end", ">=", date),
("active", "=", True),
]
self.certificate_id = cert_obj.search(domain, limit=1)
......@@ -16,56 +16,54 @@ _logger = logging.getLogger(__name__)
class ResCompanyFacturaeCertificate(models.Model):
_name = 'res.company.facturae.certificate'
_name = "res.company.facturae.certificate"
_rec_name = 'serial_number'
_rec_name = "serial_number"
def _default_company_id(self):
res_company_obj = self.env['res.company']
res_company_obj = self.env["res.company"]
company = res_company_obj._company_default_get(
'res.company.facturae.certificate')
"res.company.facturae.certificate"
)
return company
company_id = fields.Many2one(
'res.company', 'Company', required=True,
"res.company",
"Company",
required=True,
default=_default_company_id,
help='Company where you add this certificate',
help="Company where you add this certificate",
)
certificate_file = fields.Binary(
filters='*.cer,*.certificate,*.cert', required=True,
help='This file .cer is proportionate by the SAT',
filters="*.cer,*.certificate,*.cert",
required=True,
help="This file .cer is proportionate by the SAT",
)
certificate_key_file = fields.Binary(
filters='*.key', required=True,
help='This file .key is proportionate by the SAT',
filters="*.key",
required=True,
help="This file .key is proportionate by the SAT",
)
certificate_password = fields.Char(
size=64, invisible=False,
required=True, help='This password is proportionate by the SAT',
size=64,
invisible=False,
required=True,
help="This password is proportionate by the SAT",
)
certificate_file_pem = fields.Binary(
filters='*.pem,*.cer,*.certificate,*.cert',
help='This file is generated with the file.cer',
filters="*.pem,*.cer,*.certificate,*.cert",
help="This file is generated with the file.cer",
)
certificate_key_file_pem = fields.Binary(
filters='*.pem,*.key',
help='This file is generated with the file.key',
)
date_start = fields.Date(
help='Date start the certificate before the SAT',
)
date_end = fields.Date(
help='Date end of validity of the certificate',
)
serial_number = fields.Char(
help='Number of serie of the certificate',
filters="*.pem,*.key", help="This file is generated with the file.key"
)
date_start = fields.Date(help="Date start the certificate before the SAT")
date_end = fields.Date(help="Date end of validity of the certificate")
serial_number = fields.Char(help="Number of serie of the certificate")
fname_xslt = fields.Char(
'File XML Parser (.xslt)', help='Folder in server with XSLT file',
)
active = fields.Boolean(
default=True, help='Indicate if this certificate is active',
"File XML Parser (.xslt)", help="Folder in server with XSLT file"
)
active = fields.Boolean(default=True, help="Indicate if this certificate is active")
def _load_certificate_and_private_key(self):
"""Verify certificate and key files to be loaded are right
......@@ -82,9 +80,11 @@ class ResCompanyFacturaeCertificate(models.Model):
except m2.X509.X509Error:
_logger.warning(m2.X509.X509Error.message)
raise UserError(
_('You are trying to upload an known format file.'
' File must be encoded as DER. '
'For further information review log'),
_(
"You are trying to upload an known format file."
" File must be encoded as DER. "
"For further information review log"
)
)
# TODO: find a way to perform next task using M2Crypto
# Transform DER private key into PEM format
......@@ -92,38 +92,40 @@ class ResCompanyFacturaeCertificate(models.Model):
# Helper file to read module from private key
file_module = tempfile.NamedTemporaryFile()
with tempfile.NamedTemporaryFile() as private_key_file_der:
private_key_file_der.write(
base64.decodestring(self.certificate_key_file),
)
private_key_file_der.write(base64.decodestring(self.certificate_key_file))
private_key_file_der.flush()
subprocess.call(
'openssl pkcs8 -inform DER -outform PEM'
' -in "%s" -passin pass:%s -out %s' % (
private_key_file_der.name, self.certificate_password,
"openssl pkcs8 -inform DER -outform PEM"
' -in "%s" -passin pass:%s -out %s'
% (
private_key_file_der.name,
self.certificate_password,
private_key_file_pem.name,
),
shell=True,
)
key_pem = private_key_file_pem.read()
cmd = (
'openssl rsa -noout -modulus -in "%s" -out %s' % (
private_key_file_pem.name, file_module.name))
cmd = 'openssl rsa -noout -modulus -in "%s" -out %s' % (
private_key_file_pem.name,
file_module.name,
)
subprocess.call(cmd, shell=True)
key_module = file_module.read().replace('Modulus=', '')
key_module = file_module.read().replace("Modulus=", "")
# Close files and destroy them
private_key_file_pem.close()
file_module.close()
# Compare modules to know if certificate and key match
if cert_module.strip() != key_module.strip():
raise UserError(
_('The certificate and private key that you are loading '
'do not match'),
_(
"The certificate and private key that you are loading "
"do not match"
)
)
# Save certificate as PEM and KEY as PEM too
self.write({
'certificate_file_pem': cer_pem,
'certificate_key_file_pem': key_pem,
})
self.write(
{"certificate_file_pem": cer_pem, "certificate_key_file_pem": key_pem}
)
return cer_der
@api.one
......@@ -134,9 +136,9 @@ class ResCompanyFacturaeCertificate(models.Model):
"""Function uses to get such values as: end certificate date,
start certificate date and serial number """
if (
not self.certificate_file and not
self.certificate_key_file and not
self.certificate_password
not self.certificate_file
and not self.certificate_key_file
and not self.certificate_password
):
return
cer_der = self._load_certificate_and_private_key()
......@@ -144,15 +146,18 @@ class ResCompanyFacturaeCertificate(models.Model):
serial_number = cer_der.get_serial_number()
# but what really they are a long type
# But they are ASCII characters
serial_number = '{0:x}'.format(serial_number).decode('hex')
serial_number = "{0:x}".format(serial_number).decode("hex")
# Time in datetime object
date_start = cer_der.get_not_before().get_datetime()
date_end = cer_der.get_not_after().get_datetime()
# Transform into string
date_start = date_start.strftime('%Y-%m-%d')
date_end = date_end.strftime('%Y-%m-%d')
date_start = date_start.strftime("%Y-%m-%d")
date_end = date_end.strftime("%Y-%m-%d")
# Write values
self.write({
'date_start': date_start, 'date_end': date_end,
'serial_number': serial_number,
})
self.write(
{
"date_start": date_start,
"date_end": date_end,
"serial_number": serial_number,
}
)
# -*- coding: utf-8 -*-
{
'name': 'CFDI',
'version': '2.1.1',
'author': 'OpenPyme',
'category': 'Localization/Mexico',
'website': 'http://www.openpyme.mx/',
'license': 'AGPL-3',
'depends': [
'email_template',
'document',
'l10n_mx_facturae_cer',
'l10n_mx_params_pac',
'mail',
'report_aeroo',
"name": "CFDI",
"version": "2.1.1",
"author": "OpenPyme",
"category": "Localization/Mexico",
"website": "http://www.openpyme.mx/",
"license": "AGPL-3",
"depends": [
"email_template",
"document",
"l10n_mx_facturae_cer",
"l10n_mx_params_pac",
"mail",
"report_aeroo",
],
'data': [
'security/ir.model.access.csv',
'views/account_journal.xml',
'views/ir_attachment_facturae.xml',
'views/ir_attachment_facturae_config.xml',
'views/res_company.xml',
"data": [
"security/ir.model.access.csv",
"views/account_journal.xml",
"views/ir_attachment_facturae.xml",
"views/ir_attachment_facturae_config.xml",
"views/res_company.xml",
],
'installable': True,
'external_dependencies': {
'python': [
'M2Crypto',
'xmlsec',
],
},
"installable": True,
"external_dependencies": {"python": ["M2Crypto", "xmlsec"]},
}
# -*- coding: utf-8 -*-
# xmldsig
DS_NS = 'http://www.w3.org/2000/09/xmldsig#'
DS_NS = "http://www.w3.org/2000/09/xmldsig#"
# xmlenc
ENC_NS = 'http://www.w3.org/2001/04/xmlenc#'
ENC_NS = "http://www.w3.org/2001/04/xmlenc#"
......@@ -2,4 +2,4 @@
def ns(namespace, tagname):
return '{%s}%s' % (namespace, tagname)
return "{%s}%s" % (namespace, tagname)
......@@ -23,18 +23,20 @@ def migrate(cr, installed_version):
results = cr.dictfetchall()
# Write in account_journal table type_cdfi
for result in results:
if 'cfdi32' in result['type_cfdi']:
if "cfdi32" in result["type_cfdi"]:
cr.execute(
"""UPDATE account_journal SET sign_sat = True
WHERE id = {journal_id}
""".format(journal_id=result['id']),
""".format(
journal_id=result["id"]
)
)
if tools.column_exists(cr, 'account_journal', 'type_cfdi'):
if tools.column_exists(cr, "account_journal", "type_cfdi"):
cr.execute(
"""UPDATE account_journal SET type_cfdi = '{type_cfdi}'
WHERE id = {journal_id}""".format(
type_cfdi=result['type_cfdi'], journal_id=result['id'],
),
type_cfdi=result["type_cfdi"], journal_id=result["id"]
)
)
cr.execute(
......
......@@ -9,23 +9,19 @@ Create Date: 2016-05-05
"""
from openupgradelib import openupgrade as tools
revision = '1.3.0'
down_revision = '1.2.0'
revision = "1.3.0"
down_revision = "1.2.0"
branch_labels = None
depends_on = None
column_renames = {
'ir_attachment_facturae_mx': [
('invoice_id', 'res_id'),
],
}
column_renames = {"ir_attachment_facturae_mx": [("invoice_id", "res_id")]}
@tools.migrate()
def migrate(cr, installed_version):
# First update
# Rename column from invoice_id to res_id
if not tools.column_exists(cr, 'ir_attachment_facturae_mx', 'res_id'):
if not tools.column_exists(cr, "ir_attachment_facturae_mx", "res_id"):
# Remove constraints on invoice_id column
tools.lift_constraints(cr, 'ir_attachment_facturae_mx', 'invoice_id')
tools.lift_constraints(cr, "ir_attachment_facturae_mx", "invoice_id")
tools.rename_columns(cr, column_renames)
......@@ -11,7 +11,7 @@ def migrate(env, installed_version):
# table, if not exist means migration is from pre 1.3.0 version and need
# a special treatment during migration
type_attachment_exists = tools.column_exists(
env.cr, 'ir_attachment_facturae_mx', 'type_attachment',
env.cr, "ir_attachment_facturae_mx", "type_attachment"
)
# Some states where deprecated on ir.attachment.facturae.mx object
# search for registers still using deprecated states and move to the
......
......@@ -9,7 +9,5 @@ from openerp.tools.parse_version import parse_version
@tools.migrate(use_env=True, uid=SUPERUSER_ID)
def migrate(env, installed_version):
# Remove constraints from field res_id from table ir.attachment.facturae.mx
if parse_version(installed_version) >= parse_version('2.0.0'):
tools.lift_constraints(
env.cr, 'ir_attachment_facturae_mx', 'res_id',
)
if parse_version(installed_version) >= parse_version("2.0.0"):
tools.lift_constraints(env.cr, "ir_attachment_facturae_mx", "res_id")
......@@ -4,10 +4,11 @@ from openerp import fields, models
class AccountJournal(models.Model):
_inherit = 'account.journal'
_inherit = "account.journal"
sign_sat = fields.Boolean(
'Sign SAT', copy=False,
help='If this field is enabled, then sign through the'
' webservice of the Mexican SAT',
"Sign SAT",
copy=False,
help="If this field is enabled, then sign through the"
" webservice of the Mexican SAT",
)
......@@ -8,12 +8,12 @@ from openerp import api, fields, models
def _sanitize(text):
# Chars not allowed on CFDI
chars = '\\`*_{}[]()>#+-.!$/' # noqa
chars = "\\`*_{}[]()>#+-.!$/" # noqa
# Replace all characters based on best performance answer as found on:
# https://stackoverflow.com/a/27086669/2817675
for c in chars:
if c in text:
text = text.replace(c, ' ')
text = text.replace(c, " ")
return text
......@@ -23,17 +23,19 @@ class BaseCfdi(models.AbstractModel):
object.
"""
_name = 'base.cfdi'
_name = "base.cfdi"
_description = __doc__
@property
def xml_name(self):
fname = ''
if self.state != 'draft':
fname = '_'.join([
self.company_id.partner_id.vat_split,
self.number or self.internal_number,
])
fname = ""
if self.state != "draft":
fname = "_".join(
[
self.company_id.partner_id.vat_split,
self.number or self.internal_number,
]
)
return fname
@property
......@@ -42,12 +44,12 @@ class BaseCfdi(models.AbstractModel):
self.ensure_one()
# Get interpolated sequence
if self.journal_id.sequence_id.prefix:
d = self.env['ir.sequence']._interpolation_dict_context()
d = self.env["ir.sequence"]._interpolation_dict_context()
serie = self.journal_id.sequence_id._interpolate(
self.journal_id.sequence_id.prefix, d,
self.journal_id.sequence_id.prefix, d
)
else:
serie = ''
serie = ""
return _sanitize(serie).strip()
@property
......@@ -55,7 +57,7 @@ class BaseCfdi(models.AbstractModel):
"""Return folio for display on CFDI"""
self.ensure_one()
folio = _sanitize(self.number or self.internal_number)
return folio.replace(self.serie, '').strip()
return folio.replace(self.serie, "").strip()
@property
def cfdi_datetime(self):
......@@ -64,34 +66,42 @@ class BaseCfdi(models.AbstractModel):
return fields.Datetime.to_string(datetime.now(tz))
cfdi_id = fields.Many2one(
'ir.attachment.facturae.mx', 'CFDI', copy=False,
help='CFDI realed to the selected record.',
"ir.attachment.facturae.mx",
"CFDI",
copy=False,
help="CFDI realed to the selected record.",
)
cfdi_state = fields.Selection(related='cfdi_id.state')
cfdi_state = fields.Selection(related="cfdi_id.state")
cfdi_relation_type = fields.Many2one(
'cfdi.relation.type', 'CFDI Relation type', copy=False,
"cfdi.relation.type", "CFDI Relation type", copy=False
)
related_cfdi_ids = fields.Many2many(
'ir.attachment.facturae.mx', 'cfdis_related_rel',
'related_cfdi_id', 'original_cfdi_id', 'Refund invoices',
readonly=True, copy=False,
help='Original CFDI to which this CFDI is referred to',
"ir.attachment.facturae.mx",
"cfdis_related_rel",
"related_cfdi_id",
"original_cfdi_id",
"Refund invoices",
readonly=True,
copy=False,
help="Original CFDI to which this CFDI is referred to",
)
@api.one
def create_cfdi(self):
"""Creates a new cfdi object related with current record"""
self.cfdi_id = self.env['ir.attachment.facturae.mx'].create({
'name': self.xml_name,
'res_id': self.id,
'type_attachment': self._name,
'company_id': self.company_id.id,
})
self.cfdi_id = self.env["ir.attachment.facturae.mx"].create(
{
"name": self.xml_name,
"res_id": self.id,
"type_attachment": self._name,
"company_id": self.company_id.id,
}
)
self.cfdi_id.action_validate()
@api.one
def cancel_cfdi(self):
"""Cancels the cfdi related with current record and delete relation"""
if self.cfdi_id and self.cfdi_id.state in ['signed', 'done']:
if self.cfdi_id and self.cfdi_id.state in ["signed", "done"]:
self.cfdi_id.action_cancel()
self.cfdi_id = False
......@@ -6,19 +6,14 @@ from openerp.tools.translate import _
class IrAttachment(models.Model):
_inherit = 'ir.attachment'
_inherit = "ir.attachment"
@api.one
def unlink(self):
ir_attachment_obj = self.env['ir.attachment.facturae.mx']
ir_attachment_obj = self.env["ir.attachment.facturae.mx"]
attachments = ir_attachment_obj.sudo().search(
[
'|', ('file_xml_sign', 'in', self._ids),
('file_pdf', 'in', self._ids),
],
["|", ("file_xml_sign", "in", self._ids), ("file_pdf", "in", self._ids)]
)
if attachments:
raise UserError(
_('You can not remove an attachment of an invoice'),
)
raise UserError(_("You can not remove an attachment of an invoice"))
return super(IrAttachment, self).unlink()
......@@ -7,39 +7,41 @@ class IrAttachmentFacturaeMx(models.Model):
"""Model used to hold configuration for different type of CFDI documents
that can be generated on the system.
"""
_name = 'ir.attachment.facturae.mx.config'
_name = "ir.attachment.facturae.mx.config"
_description = __doc__
_sql_constraints = [
(
'unique_model', 'unique (model, active, version)',
'It is no possible to define more than one active template for a '
'single model and version!',
),
"unique_model",
"unique (model, active, version)",
"It is no possible to define more than one active template for a "
"single model and version!",
)
]
model = fields.Char(
required=True, readonly=True, help='This is the object model',
)
model = fields.Char(required=True, readonly=True, help="This is the object model")
active = fields.Boolean(default=True)
version = fields.Selection(
[('3.2', '3.2'), ('3.3', '3.3')], required=True,
help='Specification Version used to create this CFDI',
[("3.2", "3.2"), ("3.3", "3.3")],
required=True,
help="Specification Version used to create this CFDI",
)
template_xml_sign = fields.Char(
'XML report', required=True, help='Name of XML template when signing',
"XML report", required=True, help="Name of XML template when signing"
)
template_pdf_sign = fields.Char(
'PDF report', required=True, help='Name of PDF template when signing',
"PDF report", required=True, help="Name of PDF template when signing"
)
email_template_id = fields.Many2one(
'email.template',
help='Name of email template to use when sending email to partner',
"email.template",
help="Name of email template to use when sending email to partner",
)
template_xml_cancel = fields.Char(
'Cancelation XML report', required=False,
help='Name of XML template when canceling',
"Cancelation XML report",
required=False,
help="Name of XML template when canceling",
)
template_pdf_cancel = fields.Char(
required=False, help='Name of PDF template when canceling',
required=False, help="Name of PDF template when canceling"
)
......@@ -4,14 +4,14 @@ from openerp import fields, models
class ResCompany(models.Model):
_inherit = 'res.company'
_inherit = "res.company"
pac_ids = fields.Many2many('params.pac', string='Pacs')
pac_ids = fields.Many2many("params.pac", string="Pacs")
class AccountConfigSettings(models.TransientModel):
_inherit = 'account.config.settings'
_inherit = "account.config.settings"
pac_ids = fields.Many2many(
'params.pac', string='Pacs', related='company_id.pac_ids',
"params.pac", string="Pacs", related="company_id.pac_ids"
)
......@@ -55,57 +55,52 @@ class CFDI(TransactionCase):
def setUp(self):
super(CFDI, self).setUp()
self.cfdi = self.registry('ir.attachment.facturae.mx')
self.cfdi = self.registry("ir.attachment.facturae.mx")
def test_xml2cad_orig(self):
"""Test original chain creation using 3.2v"""
# Get paths to necessary test
path_xml = pkg_resources.resource_filename(
'openerp.addons.l10n_mx_ir_attachment_facturae',
'tests/unit/cfdi_32_before_sign.xml',
"openerp.addons.l10n_mx_ir_attachment_facturae",
"tests/unit/cfdi_32_before_sign.xml",
)
path_xlst = pkg_resources.resource_filename(
'openerp.addons.l10n_mx_ir_attachment_facturae',
'cfd/cadenaoriginal_3_2.xslt',
"openerp.addons.l10n_mx_ir_attachment_facturae",
"cfd/cadenaoriginal_3_2.xslt",
)
with open(path_xml, 'rb') as file:
with open(path_xml, "rb") as file:
xml = file.read()
original_chain = self.cfdi._xml2cad_orig(xml, path_xlst)
self.assertEqual(
original_chain, original_chain_to_test.replace('\n', '').strip(),
'Original chain creation went wrong',
original_chain,
original_chain_to_test.replace("\n", "").strip(),
"Original chain creation went wrong",
)
def test_get_certificate_str(self):
"""Test getting certificate string"""
# Get paths to necessary test
certifcate_pem_path = pkg_resources.resource_filename(
'openerp.addons.l10n_mx_ir_attachment_facturae',
'tests/unit/certifcate.pem',
"openerp.addons.l10n_mx_ir_attachment_facturae", "tests/unit/certifcate.pem"
)
with open(certifcate_pem_path, 'rb') as file:
with open(certifcate_pem_path, "rb") as file:
certifcate_pem = file.read()
certificate_string = self.cfdi._get_certificate_str(certifcate_pem)
self.assertEqual(
certificate_string,
certificate_string_to_test.replace('\n', '').strip(),
'Certificate string went wrong',
certificate_string_to_test.replace("\n", "").strip(),
"Certificate string went wrong",
)
def test_get_sello_sha1(self):
"""Test getting sello using sha1"""
# Get paths to necessary test
key_pem_path = pkg_resources.resource_filename(
'openerp.addons.l10n_mx_ir_attachment_facturae',
'tests/unit/key.pem',
"openerp.addons.l10n_mx_ir_attachment_facturae", "tests/unit/key.pem"
)
with open(key_pem_path, 'rb') as file:
with open(key_pem_path, "rb") as file:
key_pem = file.read()
sello = self.cfdi._get_sello(
key_pem, original_chain_to_test, 'sha1',
)
sello = self.cfdi._get_sello(key_pem, original_chain_to_test, "sha1")
self.assertEqual(
sello,
sello_to_test.replace('\n', '').strip(),
'Sello went wrong',
sello, sello_to_test.replace("\n", "").strip(), "Sello went wrong"
)
......@@ -56,57 +56,52 @@ class CFDI(TransactionCase):
def setUp(self):
super(CFDI, self).setUp()
self.cfdi = self.registry('ir.attachment.facturae.mx')
self.cfdi = self.registry("ir.attachment.facturae.mx")
def test_xml2cad_orig(self):
"""Test original chain creation using 3.2v"""
# Get paths to necessary test
path_xml = pkg_resources.resource_filename(
'openerp.addons.l10n_mx_ir_attachment_facturae',
'tests/unit/cfdi_33_before_sign.xml',
"openerp.addons.l10n_mx_ir_attachment_facturae",
"tests/unit/cfdi_33_before_sign.xml",
)
path_xlst = pkg_resources.resource_filename(
'openerp.addons.l10n_mx_ir_attachment_facturae',
'cfd/cadenaoriginal_3_3.xslt',
"openerp.addons.l10n_mx_ir_attachment_facturae",
"cfd/cadenaoriginal_3_3.xslt",
)
with open(path_xml, 'rb') as file:
with open(path_xml, "rb") as file:
xml = file.read()
original_chain = self.cfdi._xml2cad_orig(xml, path_xlst)
self.assertEqual(
original_chain, original_chain_to_test.replace('\n', '').strip(),
'Original chain creation went wrong',
original_chain,
original_chain_to_test.replace("\n", "").strip(),
"Original chain creation went wrong",
)
def test_get_certificate_str(self):
"""Test getting certificate string"""
# Get paths to necessary test
certifcate_pem_path = pkg_resources.resource_filename(
'openerp.addons.l10n_mx_ir_attachment_facturae',
'tests/unit/certifcate.pem',
"openerp.addons.l10n_mx_ir_attachment_facturae", "tests/unit/certifcate.pem"
)
with open(certifcate_pem_path, 'rb') as file:
with open(certifcate_pem_path, "rb") as file:
certifcate_pem = file.read()
certificate_string = self.cfdi._get_certificate_str(certifcate_pem)
self.assertEqual(
certificate_string,
certificate_string_to_test.replace('\n', '').strip(),
'Certificate string went wrong',
certificate_string_to_test.replace("\n", "").strip(),
"Certificate string went wrong",
)
def test_get_sello_sha256(self):
"""Test getting sello using sha1"""
# Get paths to necessary test
key_pem_path = pkg_resources.resource_filename(
'openerp.addons.l10n_mx_ir_attachment_facturae',
'tests/unit/key.pem',
"openerp.addons.l10n_mx_ir_attachment_facturae", "tests/unit/key.pem"
)
with open(key_pem_path, 'rb') as file:
with open(key_pem_path, "rb") as file:
key_pem = file.read()
sello = self.cfdi._get_sello(
key_pem, original_chain_to_test, 'sha256',
)
sello = self.cfdi._get_sello(key_pem, original_chain_to_test, "sha256")
self.assertEqual(
sello,
sello_to_test.replace('\n', '').strip(),
'Sello went wrong',
sello, sello_to_test.replace("\n", "").strip(), "Sello went wrong"
)
# -*- coding: utf-8 -*-
{
'name': 'PAC Params for CFDI',
'version': '1.0.0',
'author': 'OpenPyme',
'category': 'Localization/Mexico',
'website': 'http://www.openpyme.mx',
'license': 'AGPL-3',
'depends': [
'base',
'l10n_mx',
"name": "PAC Params for CFDI",
"version": "1.0.0",
"author": "OpenPyme",
"category": "Localization/Mexico",
"website": "http://www.openpyme.mx",
"license": "AGPL-3",
"depends": ["base", "l10n_mx"],
"demo": ["demo/params_pac.xml"],
"data": [
"security/ir.model.access.csv",
"security/params_pac_security.xml",
"views/params_pac_view.xml",
],
'demo': [
'demo/params_pac.xml',
],
'data': [
'security/ir.model.access.csv',
'security/params_pac_security.xml',
'views/params_pac_view.xml',
],
'installable': True,
'external_dependencies': {
'python': [
'SOAPpy',
'suds',
'requests',
'xmltodict',
],
},
"installable": True,
"external_dependencies": {"python": ["SOAPpy", "suds", "requests", "xmltodict"]},
}
......@@ -2,19 +2,43 @@
# List use to know if error must be handled with ValidationError exception
validation_error_list = [
'301', '302', '303', '304', '305', '306',
'308', '401', '402', '403', '611', '612',
"301",
"302",
"303",
"304",
"305",
"306",
"308",
"401",
"402",
"403",
"611",
"612",
]
# List use to know if error must be handled with CancelError exception
cancel_error_list = [
'611', '621', '1701', '1702', '1703', '1704', '1710',
'1711', '1712', '1713', '1803', '203', '204', '205', '402',
"611",
"621",
"1701",
"1702",
"1703",
"1704",
"1710",
"1711",
"1712",
"1713",
"1803",
"203",
"204",
"205",
"402",
]
class PacError(Exception):
"""General class for errors define in this module"""
pass
......@@ -27,18 +51,18 @@ class PrimarySectorError(PacError):
312 - Error al consultar al SAT
"""
def __init__(self, code, message=''):
def __init__(self, code, message=""):
self.code = code
self.message = message
super(PrimarySectorError, self).__init__()
def __repr__(self):
if self.code == '311':
message = 'No autorizado'
elif self.code == '312':
message = 'Error al consultar al SAT'
message = 'Error code: {code} \n {message}'.format(
code=self.code, message=message,
if self.code == "311":
message = "No autorizado"
elif self.code == "312":
message = "Error al consultar al SAT"
message = "Error code: {code} \n {message}".format(
code=self.code, message=message
)
return message
......@@ -71,8 +95,8 @@ class ValidationError(PrimarySectorError):
"""
def __repr__(self):
message = 'Error code: {code} \n {message}'.format(
code=self.code, message=self.message.encode('UTF-8'),
message = "Error code: {code} \n {message}".format(
code=self.code, message=self.message.encode("UTF-8")
)
return message
......@@ -104,4 +128,5 @@ class CancelError(ValidationError):
205 - El UUID no existe o no ha sido procesado por el SAT.
402 - El Contribuyente no se encuentra el la LCO o la validez de obligaciones se reporta como negativa.
"""
pass
......@@ -13,8 +13,8 @@ logger = logging.getLogger(__name__)
class ParamsPac(models.Model):
_name = 'params.pac'
_order = 'sequence ASC'
_name = "params.pac"
_order = "sequence ASC"
def get_driver_fc_sign(self):
"""function to inherit from custom PAC module"""
......@@ -30,52 +30,51 @@ class ParamsPac(models.Model):
types = []
return types
name = fields.Char(
size=128, required=True, help='Name for this PAC',
)
user = fields.Char(
size=128, help='Name user for login to PAC',
)
password = fields.Char(
size=128, help='Password user for login to PAC',
)
name = fields.Char(size=128, required=True, help="Name for this PAC")
user = fields.Char(size=128, help="Name user for login to PAC")
password = fields.Char(size=128, help="Password user for login to PAC")
pac_type = fields.Selection(
_get_pac_type,
'PAC type', type='char', size=64, required=True,
_get_pac_type, "PAC type", type="char", size=64, required=True
)
company_id = fields.Many2one(
'res.company', 'Company', required=True,
"res.company",
"Company",
required=True,
default=lambda self: self.env.user.company_id,
help='Company where this PAC will be used',
)
active = fields.Boolean(
default=1, help='Indicate if this PAC is active',
help="Company where this PAC will be used",
)
active = fields.Boolean(default=1, help="Indicate if this PAC is active")
sequence = fields.Integer(
default=10,
help='If have more than one PAC is active, the one with lower number'
'in sequence will be used first.',
help="If have more than one PAC is active, the one with lower number"
"in sequence will be used first.",
)
certificate_link = fields.Char(
size=256,
help='PAC public certificate used to validate authenticity for a given'
'XML file.',
help="PAC public certificate used to validate authenticity for a given"
"XML file.",
)
url_webservice_sign = fields.Char(
'URL WebService', size=256, required=True,
help='URL of WebService used for send to sign the XML to PAC',
"URL WebService",
size=256,
required=True,
help="URL of WebService used for send to sign the XML to PAC",
)
namespace_sign = fields.Char(
'NameSpace', size=256,
help='NameSpace of XML of the page of WebService of the PAC',
"NameSpace",
size=256,
help="NameSpace of XML of the page of WebService of the PAC",
)
url_webservice_cancel = fields.Char(
'URL WebService', size=256, required=True,
help='URL of WebService used for send to sign the XML to PAC',
"URL WebService",
size=256,
required=True,
help="URL of WebService used for send to sign the XML to PAC",
)
namespace_cancel = fields.Char(
'NameSpace', size=256,
help='NameSpace of XML of the page of WebService of the PAC',
"NameSpace",
size=256,
help="NameSpace of XML of the page of WebService of the PAC",
)
@api.multi
......@@ -99,7 +98,7 @@ class ParamsPac(models.Model):
# the priority selected by the user
# If all PACs would fail this variable will recollect
# all error in order to display them at the end
message_error = ''
message_error = ""
for param_pac in self:
# Get all possible drivers to perform action
function = getattr(param_pac, action)()
......@@ -108,24 +107,15 @@ class ParamsPac(models.Model):
# This kind of exception means that
# nevertheless you try with all PACs in the world
# CFDI would be failed
except (
exceptions.ValidationError, exceptions.PrimarySectorError
) as error:
raise UserError(
_(repr(error)),
)
except (exceptions.ValidationError, exceptions.PrimarySectorError) as error:
raise UserError(_(repr(error)))
# TODO: We need to catch errors like timeout and similar
# because the idea is when a PAC fails try next
# pylint: disable=broad-except
except Exception as error:
# If something went wrong try with next PAC
message_error = '\n'.join(
[message_error, param_pac.name, repr(error)],
)
logger.info(
param_pac.name.encode('UTF-8'),
repr(error).encode('UTF-8'),
)
message_error = "\n".join([message_error, param_pac.name, repr(error)])
logger.info(param_pac.name.encode("UTF-8"), repr(error).encode("UTF-8"))
continue
# api.one decorator always returns a list
# and some get_driver_fc_sign functions are decorated with api.one
......@@ -138,9 +128,7 @@ class ParamsPac(models.Model):
except UnboundLocalError:
# If result value does not exist
# it is because all PACs failed
raise UserError(
_(message_error),
)
raise UserError(_(message_error))
@api.multi
def sign_cfdi(self, data):
......@@ -155,11 +143,13 @@ class ParamsPac(models.Model):
"""
if not len(self):
raise ValidationError(
_('No PAC have being selected.\nGo to Configuration '
'>> Accounting >> Electronic invoicing (MX) and select '
'a PAC.'),
_(
"No PAC have being selected.\nGo to Configuration "
">> Accounting >> Electronic invoicing (MX) and select "
"a PAC."
)
)
result = self.fire_connection(data, 'get_driver_fc_sign')
result = self.fire_connection(data, "get_driver_fc_sign")
return result
@api.multi
......@@ -173,5 +163,5 @@ class ParamsPac(models.Model):
@param data (string): data to sign in base64 encoding
@return: Dictionary
"""
result = self.fire_connection(uuid, 'get_driver_fc_cancel')
result = self.fire_connection(uuid, "get_driver_fc_cancel")
return result
......@@ -25,9 +25,7 @@ _logger = logging.getLogger(__name__)
TIMEOUT = 60
_ACTIONS = {
'timbrar': 'timbrar',
}
_ACTIONS = {"timbrar": "timbrar"}
_SOAPENV = """<?xml version="1.0" encoding="UTF-8"?>
<soapenv:Envelope
......@@ -46,20 +44,21 @@ _SOAPENV = """<?xml version="1.0" encoding="UTF-8"?>
class ParamsPac(models.Model):
_inherit = 'params.pac'
_inherit = "params.pac"
@api.model
def _get_pac_type(self):
types = super(ParamsPac, self)._get_pac_type()
types.extend([
('pac_facturalo', _('PAC Expide tu factura')),
])
types.extend([("pac_facturalo", _("PAC Expide tu factura"))])
return types
pac_type = fields.Selection(
_get_pac_type,
'Process to perform', type='char', size=64, required=True,
help='Type of process to configure in this pac',
"Process to perform",
type="char",
size=64,
required=True,
help="Type of process to configure in this pac",
)
def get_driver_fc_sign(self):
......@@ -69,7 +68,7 @@ class ParamsPac(models.Model):
factura_mx_type__fc = super(ParamsPac, self).get_driver_fc_sign()
if factura_mx_type__fc is None:
factura_mx_type__fc = {}
factura_mx_type__fc.update({'pac_facturalo': self.sign_file})
factura_mx_type__fc.update({"pac_facturalo": self.sign_file})
return factura_mx_type__fc
def get_driver_fc_cancel(self):
......@@ -79,7 +78,7 @@ class ParamsPac(models.Model):
factura_mx_type__fc = super(ParamsPac, self).get_driver_fc_cancel()
if factura_mx_type__fc is None:
factura_mx_type__fc = {}
factura_mx_type__fc.update({'pac_facturalo': self.cancel_file})
factura_mx_type__fc.update({"pac_facturalo": self.cancel_file})
return factura_mx_type__fc
@api.one
......@@ -88,23 +87,17 @@ class ParamsPac(models.Model):
@params
fdata : File.xml base64 codificated
"""
method = 'timbrar'
msg = ''
if self.user == 'pruebas':
msg += _('WARNING, SIGNED IN TEST!!!!\n\n')
method = "timbrar"
msg = ""
if self.user == "pruebas":
msg += _("WARNING, SIGNED IN TEST!!!!\n\n")
data = _SOAPENV % (
self.namespace_sign, self.user, self.password, fdata)
data = _SOAPENV % (self.namespace_sign, self.user, self.password, fdata)
_logger.debug(data)
headers = {
'SOAPAction': '%s' % (_ACTIONS[method]),
'Content-Type': 'text/xml',
}
headers = {"SOAPAction": "%s" % (_ACTIONS[method]), "Content-Type": "text/xml"}
s = Session()
req = Request(
'POST', self.url_webservice_sign, data=data, headers=headers,
)
req = Request("POST", self.url_webservice_sign, data=data, headers=headers)
prepped = req.prepare()
try:
# Timbrar factura
......@@ -116,49 +109,45 @@ class ParamsPac(models.Model):
raise
# Procesar los resultados obtenidos del PAC
try:
resultado = res['soapenv:Envelope']['soapenv:Body']['ser:timbrarResponse']['return'] # noqa
resultado = res["soapenv:Envelope"]["soapenv:Body"]["ser:timbrarResponse"][
"return"
] # noqa
except KeyError:
resultado = res['soap:Envelope']['soap:Body']['ns2:timbrarResponse']['return'] # noqa
resultado = res["soap:Envelope"]["soap:Body"]["ns2:timbrarResponse"][
"return"
] # noqa
except Exception:
raise UserError(
_('Unknow answer.\n%s') % resultado,
)
raise UserError(_("Unknow answer.\n%s") % resultado)
mensaje = resultado['mensaje']
valid_codes = ['200', '504']
if resultado['codigo'] in valid_codes:
mensaje = resultado["mensaje"]
valid_codes = ["200", "504"]
if resultado["codigo"] in valid_codes:
# Proceso satisfactorio
timbre = resultado['timbre']
uuid = resultado.get('uuid', '')
timbre = resultado["timbre"]
uuid = resultado.get("uuid", "")
if not uuid:
regex = '<cfdi:Complemento>(.*?)</cfdi:Complemento>'
regex = "<cfdi:Complemento>(.*?)</cfdi:Complemento>"
# Obtiene el folio de la factura
try:
timbref = re.search(regex, resultado['timbre']).group(1)
timbref = re.search(regex, resultado["timbre"]).group(1)
uuid = re.search('UUID="(.+?)" ', timbref).group(1)
except: # noqa
uuid = ''
uuid = ""
# Get back XML information to file before store it
timbre = '<?xml version="1.0" encoding="UTF-8"?>\n' + timbre
# Codifica el XML para almacenarlo
f = base64.encodestring(timbre.encode('UTF-8') or '')
msg += string.join([mensaje, '. Folio Fiscal ', uuid, '.'])
return {
'file': f,
'msg': msg,
'cfdi_xml': timbre,
}
elif resultado['codigo'] in pac_exceptions.validation_error_list:
f = base64.encodestring(timbre.encode("UTF-8") or "")
msg += string.join([mensaje, ". Folio Fiscal ", uuid, "."])
return {"file": f, "msg": msg, "cfdi_xml": timbre}
elif resultado["codigo"] in pac_exceptions.validation_error_list:
# El CFDI no ha sido timbrado
raise pac_exceptions.ValidationError(
code=resultado['codigo'], message=mensaje,
code=resultado["codigo"], message=mensaje
)
else:
raise Exception(
' '.join(['Code', resultado['codigo'], mensaje]),
)
raise Exception(" ".join(["Code", resultado["codigo"], mensaje]))
@api.model
def cancel_file(self, uuid):
......@@ -167,27 +156,25 @@ class ParamsPac(models.Model):
@params
id:s Id from invoice to validate
"""
msg = ''
msg = ""
xml_data = self.cancel_xml(uuid)
# Send file to PAC for cancelation
if self.user == 'pruebas':
msg += _('WARNING, CANCEL IN TEST!!!!\n\n')
if self.user == "pruebas":
msg += _("WARNING, CANCEL IN TEST!!!!\n\n")
try:
from SOAPpy import Types
wsdl = WSDL.SOAPProxy(
self.url_webservice_cancel, self.namespace_cancel,
)
wsdl = WSDL.SOAPProxy(self.url_webservice_cancel, self.namespace_cancel)
# Webservice configuration
wsdl.soapproxy.config.dumpSOAPOut = 0
wsdl.soapproxy.config.dumpSOAPIn = 0
wsdl.soapproxy.config.debug = 0
wsdl.config.preventescape = 1
wsdl.soapproxy.config.dict_encoding = 'UTF-8'
wsdl.soapproxy.config.dict_encoding = "UTF-8"
# SUBIR factura al PAC
resultado = wsdl.cancelar(
......@@ -196,30 +183,25 @@ class ParamsPac(models.Model):
xmlBytes=Types.base64BinaryType(xml_data),
)
except Exception, e:
except Exception as e:
# Error al establecer comunicación con el PAC
raise UserError(
_('PAC communication failed. \n%s') % str(e),
)
raise UserError(_("PAC communication failed. \n%s") % str(e))
# Procesar los resultados obtenidos del PAC
if resultado['codEstatus'] in pac_exceptions.cancel_error_list:
if resultado["codEstatus"] in pac_exceptions.cancel_error_list:
raise pac_exceptions.CancelError(
code=resultado['codEstatus'], message=resultado['codMensaje'],
code=resultado["codEstatus"], message=resultado["codMensaje"]
)
elif resultado['codEstatus'] not in '200':
elif resultado["codEstatus"] not in "200":
raise Exception(
' '.join(
['Code', resultado['codEstatus'], resultado['codMensaje']],
),
" ".join(["Code", resultado["codEstatus"], resultado["codMensaje"]])
)
# If code result is 200 then all was okay
else:
return True
def binary2file(
self, cr, uid, ids, binary_data=False, file_prefix='',
file_suffix='',
self, cr, uid, ids, binary_data=False, file_prefix="", file_suffix=""
):
"""
@param binary_data : Field binary with the information of certificate
......@@ -230,8 +212,9 @@ class ParamsPac(models.Model):
this function
"""
import os
(fileno, fname) = tempfile.mkstemp(file_suffix, file_prefix)
f = open(fname, 'wb')
f = open(fname, "wb")
f.write(binary_data)
f.close()
os.close(fileno)
......@@ -251,39 +234,38 @@ class ParamsPac(models.Model):
cert = self.company_id.certificate_id
# Get certificate files for sign the xml
fname_cer_pem = fname_key_pem = False
name = 'openerp_' + (cert.serial_number or '') + '__certificate__'
name = "openerp_" + (cert.serial_number or "") + "__certificate__"
if cert:
try:
fname_cer_pem = self.binary2file(
cert.certificate_file_pem, name, '.cer.pem',
cert.certificate_file_pem, name, ".cer.pem"
)
except: # noqa
raise UserError(
_('Not captured a CERTIFICATE file in the company!'),
)
raise UserError(_("Not captured a CERTIFICATE file in the company!"))
try:
fname_key_pem = self.binary2file(
cert.certificate_key_file_pem, name, '.key.pem',
cert.certificate_key_file_pem, name, ".key.pem"
)
except: # noqa
raise UserError(
_('Not captured a KEY file in format PEM, in the '
'company!'),
_("Not captured a KEY file in format PEM, in the " "company!")
)
else:
raise UserError(
_('There is no VALID certificate for company: {company}!\n'
'It is possible that the CERTIFICATE is already expired.\n'
'Go to Configuration >> Companies >> Companies to verify '
'it.').format(company=self.company_id.name),
_(
"There is no VALID certificate for company: {company}!\n"
"It is possible that the CERTIFICATE is already expired.\n"
"Go to Configuration >> Companies >> Companies to verify "
"it."
).format(company=self.company_id.name)
)
signed_xml = pyxmldsig.sign_file(
template_file=new_file.name,
cert_file=fname_cer_pem.encode('ascii', 'ignore'),
key_file=fname_key_pem.encode('ascii', 'ignore'),
password=cert.certificate_password.encode('ascii', 'ignore'),
cert_file=fname_cer_pem.encode("ascii", "ignore"),
key_file=fname_key_pem.encode("ascii", "ignore"),
password=cert.certificate_password.encode("ascii", "ignore"),
)
_logger.debug('Signed file %s', signed_xml)
_logger.debug("Signed file %s", signed_xml)
return signed_xml # data_dict
@api.model
......@@ -298,55 +280,46 @@ class ParamsPac(models.Model):
# get localized dates
date = datetime.now(utc).astimezone(tz)
xmlns = 'http://cancelacfd.sat.gob.mx'
xsd = 'http://www.w3.org/2001/XMLSchema'
xsi = 'http://www.w3.org/2001/XMLSchema-instance'
signed_xmlns = 'http://www.w3.org/2000/09/xmldsig#'
xmlns = "http://cancelacfd.sat.gob.mx"
xsd = "http://www.w3.org/2001/XMLSchema"
xsi = "http://www.w3.org/2001/XMLSchema-instance"
signed_xmlns = "http://www.w3.org/2000/09/xmldsig#"
nsmap = {None: xmlns, 'xsd': xsd, 'xsi': xsi}
nsmap = {None: xmlns, "xsd": xsd, "xsi": xsi}
signed_nsmap = {None: signed_xmlns, 'xsd': xsd, 'xsi': xsi}
signed_nsmap = {None: signed_xmlns, "xsd": xsd, "xsi": xsi}
# section: Cancelation
# create XML
cancelacion = etree.Element('Cancelacion', nsmap=nsmap)
cancelacion.set('Fecha', date.strftime('%Y-%m-%dT%H:%M:%S'))
cancelacion.set(
'RfcEmisor', self.company_id.partner_id.vat_split,
)
cancelacion = etree.Element("Cancelacion", nsmap=nsmap)
cancelacion.set("Fecha", date.strftime("%Y-%m-%dT%H:%M:%S"))
cancelacion.set("RfcEmisor", self.company_id.partner_id.vat_split)
folios = etree.Element('Folios')
uuid = etree.Element('UUID')
folios = etree.Element("Folios")
uuid = etree.Element("UUID")
uuid.text = uuid_to_cancel
folios.append(uuid)
cancelacion.append(folios)
signature = etree.Element('Signature', nsmap={None: signed_xmlns})
signedinfo = etree.Element('SignedInfo', nsmap=signed_nsmap)
signature = etree.Element("Signature", nsmap={None: signed_xmlns})
signedinfo = etree.Element("SignedInfo", nsmap=signed_nsmap)
canonicalizacion = etree.Element('CanonicalizationMethod')
canonicalizacion = etree.Element("CanonicalizationMethod")
canonicalizacion.set(
'Algorithm',
'http://www.w3.org/TR/2001/REC-xml-c14n-20010315',
)
signaturemethod = etree.Element('SignatureMethod')
signaturemethod.set(
'Algorithm', 'http://www.w3.org/2000/09/xmldsig#rsa-sha1',
"Algorithm", "http://www.w3.org/TR/2001/REC-xml-c14n-20010315"
)
reference = etree.Element('Reference')
reference.set('URI', '')
transforms = etree.Element('Transforms')
transform = etree.Element('Transform')
signaturemethod = etree.Element("SignatureMethod")
signaturemethod.set("Algorithm", "http://www.w3.org/2000/09/xmldsig#rsa-sha1")
reference = etree.Element("Reference")
reference.set("URI", "")
transforms = etree.Element("Transforms")
transform = etree.Element("Transform")
transform.set(
'Algorithm',
'http://www.w3.org/2000/09/xmldsig#enveloped-signature',
)
digestme = etree.Element('DigestMethod')
digestme.set(
'Algorithm',
'http://www.w3.org/2000/09/xmldsig#sha1',
"Algorithm", "http://www.w3.org/2000/09/xmldsig#enveloped-signature"
)
digestva = etree.Element('DigestValue')
digestva.text = 'TEMPLATE'
digestme = etree.Element("DigestMethod")
digestme.set("Algorithm", "http://www.w3.org/2000/09/xmldsig#sha1")
digestva = etree.Element("DigestValue")
digestva.text = "TEMPLATE"
signedinfo.append(canonicalizacion)
signedinfo.append(signaturemethod)
......@@ -358,19 +331,19 @@ class ParamsPac(models.Model):
reference.append(digestva)
signature.append(signedinfo)
signaturevalue = etree.Element('SignatureValue')
signaturevalue.text = 'TEMPLATE'
signaturevalue = etree.Element("SignatureValue")
signaturevalue.text = "TEMPLATE"
signature.append(signaturevalue)
keyinfo = etree.Element('KeyInfo')
keyinfo = etree.Element("KeyInfo")
# Issuername & Serialnumber not added because library
# auto fill the fields on X509IssuerSerial
# see www.aleksey.com/pipermail/xmlsec/2007/008125.html
x509 = etree.Element('X509Data')
xiserial = etree.Element('X509IssuerSerial')
xiserial.text = ''
xcertificate = etree.Element('X509Certificate')
xcertificate.text = ''
x509 = etree.Element("X509Data")
xiserial = etree.Element("X509IssuerSerial")
xiserial.text = ""
xcertificate = etree.Element("X509Certificate")
xcertificate.text = ""
x509.append(xiserial)
x509.append(xcertificate)
......@@ -379,5 +352,5 @@ class ParamsPac(models.Model):
cancelacion.append(signature)
# pretty string
_logger.debug('XML file %s', etree.tostring(cancelacion))
_logger.debug("XML file %s", etree.tostring(cancelacion))
return etree.tostring(cancelacion)
# -*- coding: utf-8 -*-
import logging
from suds.client import Client
from suds.client import (
WebFault,
BuildError,
Client,
MethodNotFound,
PortNotFound,
ServiceNotFound,
TypeNotFound,
BuildError,
SoapHeadersNotPermitted,
) # noqa
TypeNotFound,
WebFault,
)
from openerp import api, fields, models
from openerp.exceptions import Warning as UserError
......@@ -63,7 +63,7 @@ class ParamsPac(models.Model):
factura_mx_type__fc = super(ParamsPac, self).get_driver_fc_sign()
if factura_mx_type__fc is None:
factura_mx_type__fc = {}
factura_mx_type__fc.update({'pac_finkok': self.sign_cfdi_finkok})
factura_mx_type__fc.update({"pac_finkok": self.sign_cfdi_finkok})
return factura_mx_type__fc
def get_driver_fc_cancel(self):
......@@ -73,7 +73,7 @@ class ParamsPac(models.Model):
factura_mx_type__fc = super(ParamsPac, self).get_driver_fc_cancel()
if factura_mx_type__fc is None:
factura_mx_type__fc = {}
factura_mx_type__fc.update({'pac_finkok': self.cancel_cfdi_finkok})
factura_mx_type__fc.update({"pac_finkok": self.cancel_cfdi_finkok})
return factura_mx_type__fc
def sign_cfdi_finkok(self, fdata):
......@@ -182,4 +182,3 @@ class ParamsPac(models.Model):
raise pac_exceptions.CancelError(
result_status, _cancel_errors.get(result_status, "")
)
......@@ -4,10 +4,12 @@
import base64
import logging
# Librerías
import os
import string
#caracteres de escape
# caracteres de escape
from xml.sax.saxutils import escape
from suds.client import Client
......@@ -20,22 +22,22 @@ from openerp.tools.translate import _
logger = logging.getLogger(__name__)
class params_pac(osv.Model):
_inherit = 'params.pac'
_inherit = "params.pac"
@api.model
def _get_pac_type(self):
types = super(params_pac, self)._get_pac_type()
types.extend([
('cfdi32_4g', _('CFDI 3.2 4G Factor')),
])
types.extend([("cfdi32_4g", _("CFDI 3.2 4G Factor"))])
return types
pac_type = fields.Selection(
_get_pac_type,
'Process to perform', type='char', size=64, required=True,
help='Type of process to configure in this pac'
"Process to perform",
type="char",
size=64,
required=True,
help="Type of process to configure in this pac",
)
def get_driver_fc_sign(self):
......@@ -45,7 +47,7 @@ class params_pac(osv.Model):
factura_mx_type__fc = super(params_pac, self).get_driver_fc_sign()
if factura_mx_type__fc == None:
factura_mx_type__fc = {}
factura_mx_type__fc.update({'cfdi32_4g': self._upload_ws_file_4g})
factura_mx_type__fc.update({"cfdi32_4g": self._upload_ws_file_4g})
return factura_mx_type__fc
def get_driver_fc_cancel(self):
......@@ -55,17 +57,21 @@ class params_pac(osv.Model):
factura_mx_type__fc = super(params_pac, self).get_driver_fc_cancel()
if factura_mx_type__fc == None:
factura_mx_type__fc = {}
factura_mx_type__fc.update({'cfdi32_4g': self._cancel_ws_file_4g})
factura_mx_type__fc.update({"cfdi32_4g": self._cancel_ws_file_4g})
return factura_mx_type__fc
def _get_ws(self, wsparam):
WS = ''
WS = ""
# Search char "," for addons_path, now is multi-path
all_paths = tools.config["addons_path"].split(",")
for my_path in all_paths:
if os.path.isdir(os.path.join(my_path, 'l10n_mx_facturae_pac_4g', 'ws')):
if os.path.isdir(os.path.join(my_path, "l10n_mx_facturae_pac_4g", "ws")):
# If dir is in path, save it on real_path
WS = my_path and os.path.join(my_path, 'l10n_mx_facturae_pac_4g', 'ws', wsparam) or ''
WS = (
my_path
and os.path.join(my_path, "l10n_mx_facturae_pac_4g", "ws", wsparam)
or ""
)
WS = "file:///" + WS
return WS
......@@ -77,46 +83,50 @@ class params_pac(osv.Model):
"""
if context is None:
context = {}
pac_params_obj = self.pool.get('params.pac')
pac_params_obj = self.pool.get("params.pac")
for attachment in self.browse(cr, uid, ids, context=context):
invoice = attachment.invoice_id
f = False
timbre = False
msg = ''
msg = ""
pac_ids = pac_params_obj.search(
cr, uid,
[('method_type', '=', 'pac_4g_firmar'),
('company_id', '=', invoice.company_emitter_id.id),
('active', '=', True)
],
limit=1, context=context
cr,
uid,
[
("method_type", "=", "pac_4g_firmar"),
("company_id", "=", invoice.company_emitter_id.id),
("active", "=", True),
],
limit=1,
context=context,
)
if not pac_ids:
raise osv.except_osv(
_('Warning'),
_('Not found information from web services of PAC'
'Verify that the configuration of PAC is correct')
_("Warning"),
_(
"Not found information from web services of PAC"
"Verify that the configuration of PAC is correct"
),
)
# Get pac params for use on this invoice
pac = pac_params_obj.browse(cr, uid, pac_ids, context)[0]
if pac.user == 'VE223.test_ws':
msg += _('WARNING, SIGNED IN TEST 4G FACTOR!!!!\n\n')
if pac.user == "VE223.test_ws":
msg += _("WARNING, SIGNED IN TEST 4G FACTOR!!!!\n\n")
try:
WSDL = self._get_ws(pac.url_webservice)
client = Client(WSDL)
# Crea sesión
try:
session = client.service.openSession(pac.user, pac.password)
except Exception, e:
except Exception as e:
# Error al establecer comunicación con el PAC
logger.debug(str(e))
raise osv.except_osv(
_('Error'),
_(u"PAC communication failed (openSession). \n%s") %
str(e)
_("Error"),
_(u"PAC communication failed (openSession). \n%s") % str(e),
)
# Obtén token de sesión
if session.ok:
......@@ -126,38 +136,44 @@ class params_pac(osv.Model):
cfd.token = session.token
# Asigna XML
fdatadecode = base64.b64decode(fdata)
fdatadecode = fdatadecode.decode('UTF-8')
fdatadecode = fdatadecode.decode("UTF-8")
fdatadecode = escape(fdatadecode)
fdatadecode = fdatadecode.replace("'",'&#39;')
fdatadecode = fdatadecode.replace("'", "&#39;")
cfd.xml = fdatadecode
# Timbra CFD
cfdi = None
try:
cfdi = client.service.createCFDI(cfd)
except Exception, e:
raise osv.except_osv(_('Error'), _(u"Create CFDI communication failed. \n%s") % str(e))
except Exception as e:
raise osv.except_osv(
_("Error"),
_(u"Create CFDI communication failed. \n%s") % str(e),
)
# Verifica timbrado
if cfdi.ok:
# Codifica el XML para almacenarlo
f = base64.encodestring(cfdi.xml.encode('UTF-8') or '')
f = base64.encodestring(cfdi.xml.encode("UTF-8") or "")
timbre = cfdi.xml
msg += string.join(
[_('successful process'),
". Folio Fiscal ", cfdi.uuid, "."]
[_("successful process"), ". Folio Fiscal ", cfdi.uuid, "."]
)
# Cierra token
client.service.closeSession(cfd.token)
else:
# Error al generar CFDI
raise osv.except_osv(_('Error'), _(u"could not stamp CFDI, code: %d") % cfdi.errorCode)
raise osv.except_osv(
_("Error"),
_(u"could not stamp CFDI, code: %d") % cfdi.errorCode,
)
else:
raise osv.except_osv(_('Error'),
raise osv.except_osv(
_("Error"),
_("Error, could not create a session, code: %d")
% str(session.errorCode)
% str(session.errorCode),
)
except Exception, err:
except Exception as err:
raise err
return {'file': f, 'msg': msg, 'cfdi_xml': timbre}
return {"file": f, "msg": msg, "cfdi_xml": timbre}
def _cancel_ws_file_4g(self, cr, uid, ids, context=None):
"""
......@@ -166,35 +182,42 @@ class params_pac(osv.Model):
id:s Id from invoice to validate
"""
import time
if context is None:
context = {}
pac_params_obj = self.pool.get('params.pac')
pac_params_obj = self.pool.get("params.pac")
for attachment in self.browse(cr, uid, ids, context=context):
invoice = attachment.invoice_id
msg = ''
cod_status = ''
msg = ""
cod_status = ""
# Get data from PAC configuration
pac_ids = pac_params_obj.search(
cr, uid,
[('method_type', '=', 'pac_4g_cancelar'),
('company_id', '=', invoice.company_emitter_id.id),
('active', '=', True)
], limit=1, context=context
cr,
uid,
[
("method_type", "=", "pac_4g_cancelar"),
("company_id", "=", invoice.company_emitter_id.id),
("active", "=", True),
],
limit=1,
context=context,
)
if not pac_ids:
raise osv.except_osv(
_('Warning'),
_('Not found information from web services of PAC, '
'verify that the configuration of PAC is correct')
_("Warning"),
_(
"Not found information from web services of PAC, "
"verify that the configuration of PAC is correct"
),
)
pac = pac_params_obj.browse(cr, uid, pac_ids, context)[0]
# Send file to PAC for cancelation
if pac.user == 'VE223.test_ws':
msg += _(u'WARNING, CANCEL IN TEST 4G FACTOR!!!!\n\n')
if pac.user == "VE223.test_ws":
msg += _(u"WARNING, CANCEL IN TEST 4G FACTOR!!!!\n\n")
try:
WSDL = self._get_ws(pac.url_webservice)
......@@ -212,46 +235,54 @@ class params_pac(osv.Model):
# Cancela Factura
response = client.service.cancelCFDIRequest(request)
# Obtiene el codigo que esta en uuid separado por :
cod_status = response.uuid.split(':')[1]
cod_status = response.uuid.split(":")[1]
if response.ok:
msg += _("Please wait 5 minutes before check the invoice in the SAT.")
msg += _(
"Please wait 5 minutes before check the invoice in the SAT."
)
# Invoice canceled, save the cancelation date on database
self.pool.get('account.invoice').write(
cr, uid, [invoice.id],
{'cfdi_fecha_cancelacion': time.strftime('%Y-%m-%d %H:%M:%S')},
context=None
self.pool.get("account.invoice").write(
cr,
uid,
[invoice.id],
{
"cfdi_fecha_cancelacion": time.strftime(
"%Y-%m-%d %H:%M:%S"
)
},
context=None,
)
# Cierra sesión
client.service.closeSession(token)
else:
errors = {
'200': _('UUID en proceso de cancelación'),
'202': _('UUID previamente cancelado'),
'203': _('UUID a cancelar no pertenece a contribuyente'),
'205': _('UUID a cancelar desconocido'),
'206': _('UUID no solicitado para cancelación'),
'207': _('UUID en fecha inválida para cancelación (debe ser cancelado al menos 48 horas depués de su emisión)'),
'208': _('UUID inválido')
"200": _("UUID en proceso de cancelación"),
"202": _("UUID previamente cancelado"),
"203": _("UUID a cancelar no pertenece a contribuyente"),
"205": _("UUID a cancelar desconocido"),
"206": _("UUID no solicitado para cancelación"),
"207": _(
"UUID en fecha inválida para cancelación (debe ser cancelado al menos 48 horas depués de su emisión)"
),
"208": _("UUID inválido"),
}
# Cierra sesión
client.service.closeSession(token)
# error
raise osv.except_osv(
_('Error'),
_('Stamped Code: %s. \n Stamped Message: %s.')
% (cod_status, errors[cod_status])
_("Error"),
_("Stamped Code: %s. \n Stamped Message: %s.")
% (cod_status, errors[cod_status]),
)
else:
raise osv.except_osv(
_("Error, could not create a session, code: %d")
% str(session.errorCode)
)
except Exception, e:
except Exception as e:
# Error al establecer comunicación con el PAC
logger.debug(str(e))
raise osv.except_osv(
_('Error'),
_(u"PAC communication failed. \n%s") %
str(e)
_("Error"), _(u"PAC communication failed. \n%s") % str(e)
)
return {'message': msg, 'status': cod_status}
return {"message": msg, "status": cod_status}
......@@ -18,34 +18,35 @@ logger = logging.getLogger(__name__)
class ParamsPac(models.Model):
_inherit = 'params.pac'
_inherit = "params.pac"
@api.model
def _get_pac_type(self):
types = super(ParamsPac, self)._get_pac_type()
types.extend([
('pac_sf', _('PAC SF')),
])
types.extend([("pac_sf", _("PAC SF"))])
return types
pac_type = fields.Selection(
_get_pac_type,
'Process to perform', type='char', size=64, required=True,
help='Type of process to configure in this pac',
"Process to perform",
type="char",
size=64,
required=True,
help="Type of process to configure in this pac",
)
def get_driver_fc_sign(self):
factura_mx_type__fc = super(ParamsPac, self).get_driver_fc_sign()
if factura_mx_type__fc is None:
factura_mx_type__fc = {}
factura_mx_type__fc.update({'pac_sf': self._upload_ws_file})
factura_mx_type__fc.update({"pac_sf": self._upload_ws_file})
return factura_mx_type__fc
def get_driver_fc_cancel(self):
factura_mx_type__fc = super(ParamsPac, self).get_driver_fc_cancel()
if factura_mx_type__fc is None:
factura_mx_type__fc = {}
factura_mx_type__fc.update({'pac_sf': self.sf_cancel})
factura_mx_type__fc.update({"pac_sf": self.sf_cancel})
return factura_mx_type__fc
def sf_cancel(self, uuid):
......@@ -60,37 +61,39 @@ class ParamsPac(models.Model):
# Get certificate and key from certificate config in
# company
cer_csd = base64.encodestring(
self.company_id.certificate_id.certificate_file_pem,
self.company_id.certificate_id.certificate_file_pem
)
key_csd = base64.encodestring(
self.company_id.certificate_id.certificate_key_file_pem,
self.company_id.certificate_id.certificate_key_file_pem
)
contrasena_csd = self.company_id.certificate_id.certificate_password
# It does not make sense to send an email
# when the CFDI is cancelled since we have an ERP
# and could review the cancelled CFDI directly in the ERP
email_company = ''
email_company = ""
params = [
user, password, uuid, vat_emmiter,
email_company, cer_csd, key_csd, contrasena_csd,
user,
password,
uuid,
vat_emmiter,
email_company,
cer_csd,
key_csd,
contrasena_csd,
]
wsdl_client.soapproxy.config.dumpSOAPOut = 0
wsdl_client.soapproxy.config.dumpSOAPIn = 0
wsdl_client.soapproxy.config.debug = 0
wsdl_client.soapproxy.config.dict_encoding = 'UTF-8'
wsdl_client.soapproxy.config.dict_encoding = "UTF-8"
result = wsdl_client.cancelarAsincrono(*params)
status_cancel = result['status'] or ''
mensaje_cancel = result['mensaje'] or ''
status_cancel = result["status"] or ""
mensaje_cancel = result["mensaje"] or ""
# Something went wrong and raise exception
if status_cancel in pac_exceptions.cancel_error_list:
raise pac_exceptions.CancelError(
code=status_cancel, message=mensaje_cancel,
)
raise pac_exceptions.CancelError(code=status_cancel, message=mensaje_cancel)
# Cancel process was unsuccessful
elif status_cancel not in '200':
raise Exception(
' '.join(['Code', status_cancel, mensaje_cancel]),
)
elif status_cancel not in "200":
raise Exception(" ".join(["Code", status_cancel, mensaje_cancel]))
# If code result is 200 then all was okay
else:
return True
......@@ -99,25 +102,23 @@ class ParamsPac(models.Model):
"""
@params fdata : File.xml codification in base64
"""
msg = ''
msg = ""
cfdi_xml = False
comprobante = 'cfdi:Comprobante'
comprobante = "cfdi:Comprobante"
cfd_data = base64.decodestring(fdata)
xml_res_str = xml.dom.minidom.parseString(cfd_data)
xml_res_addenda = self.add_addenta_xml(
xml_res_str, comprobante,
)
xml_res_str_addenda = xml_res_addenda.toxml('UTF-8')
xml_res_str_addenda = xml_res_str_addenda.replace(codecs.BOM_UTF8, '')
xml_res_addenda = self.add_addenta_xml(xml_res_str, comprobante)
xml_res_str_addenda = xml_res_addenda.toxml("UTF-8")
xml_res_str_addenda = xml_res_str_addenda.replace(codecs.BOM_UTF8, "")
# Get values use to connect with SAT
user = self.user
password = self.password
wsdl_url = self.url_webservice_sign
namespace = self.namespace_sign
if 'testing' in wsdl_url:
logger.info('WARNING, SIGNED IN TEST!!!!')
msg += _('WARNING, SIGNED IN TEST!!!!\n\n')
if "testing" in wsdl_url:
logger.info("WARNING, SIGNED IN TEST!!!!")
msg += _("WARNING, SIGNED IN TEST!!!!\n\n")
try:
wsdl_client = WSDL.SOAPProxy(wsdl_url, namespace)
except: # noqa
......@@ -129,67 +130,73 @@ class ParamsPac(models.Model):
wsdl_client.soapproxy.config.dumpSOAPOut = 0
wsdl_client.soapproxy.config.dumpSOAPIn = 0
wsdl_client.soapproxy.config.debug = 0
wsdl_client.soapproxy.config.dict_encoding = 'UTF-8'
wsdl_client.soapproxy.config.dict_encoding = "UTF-8"
resultado = wsdl_client.timbrar(*params)
mensaje = _(tools.ustr(resultado['mensaje']))
resultados_mensaje = resultado['resultados'] and \
resultado['resultados']['mensaje'] or ''
folio_fiscal = resultado['resultados'] and \
resultado['resultados']['uuid'] or ''
codigo_timbrado = resultado['status'] or ''
codigo_validacion = resultado['resultados'] and \
resultado['resultados']['status'] or ''
if codigo_timbrado in ['311', '312']:
raise pac_exceptions.PrimarySectorError(
code=resultado['codigo'],
mensaje = _(tools.ustr(resultado["mensaje"]))
resultados_mensaje = (
resultado["resultados"] and resultado["resultados"]["mensaje"] or ""
)
folio_fiscal = resultado["resultados"] and resultado["resultados"]["uuid"] or ""
codigo_timbrado = resultado["status"] or ""
codigo_validacion = (
resultado["resultados"] and resultado["resultados"]["status"] or ""
)
if codigo_timbrado in ["311", "312"]:
raise pac_exceptions.PrimarySectorError(code=resultado["codigo"])
elif (
codigo_timbrado == "200"
and codigo_validacion == "200"
or codigo_validacion == "307"
): # noqa
fecha_timbrado = resultado["resultados"]["fechaTimbrado"] or False
fecha_timbrado = (
fecha_timbrado
and time.strftime(
"%Y-%m-%d %H:%M:%S",
time.strptime(fecha_timbrado[:19], "%Y-%m-%dT%H:%M:%S"),
)
or False
)
elif codigo_timbrado == '200' and codigo_validacion == '200' or codigo_validacion == '307': # noqa
fecha_timbrado = resultado[
'resultados']['fechaTimbrado'] or False
fecha_timbrado = fecha_timbrado and time.strftime(
'%Y-%m-%d %H:%M:%S', time.strptime(
fecha_timbrado[:19], '%Y-%m-%dT%H:%M:%S',
),
) or False
cfdi_data = {
'cfdi_xml': base64.decodestring(
resultado['resultados']['cfdiTimbrado'] or '',
), # este se necesita en uno que no es base64
"cfdi_xml": base64.decodestring(
resultado["resultados"]["cfdiTimbrado"] or ""
) # este se necesita en uno que no es base64
}
msg += mensaje + '.' + resultados_mensaje + \
' Folio Fiscal: ' + folio_fiscal + '.'
msg += (
mensaje
+ "."
+ resultados_mensaje
+ " Folio Fiscal: "
+ folio_fiscal
+ "."
)
msg += _(
'Make Sure to the file really has generated correctly to '
'the SAT\nhttps://www.consulta.sat.gob.mx/sicofi_web/'
'moduloECFD_plus/ValidadorCFDI/Validador%20cfdi.html',
"Make Sure to the file really has generated correctly to "
"the SAT\nhttps://www.consulta.sat.gob.mx/sicofi_web/"
"moduloECFD_plus/ValidadorCFDI/Validador%20cfdi.html"
)
if cfdi_data.get('cfdi_xml', False):
url_pac = '</"{comprobante}"><!--Para validar el XML CFDI puede descargar el certificado del PAC desde la siguiente liga: https://solucionfactible.com/cfdi/00001000000102699425.zip-->'.format(comprobante=comprobante) # noqa
cfdi_data['cfdi_xml'] = cfdi_data['cfdi_xml'].replace(
'</"%s">' % (comprobante), url_pac,
if cfdi_data.get("cfdi_xml", False):
url_pac = '</"{comprobante}"><!--Para validar el XML CFDI puede descargar el certificado del PAC desde la siguiente liga: https://solucionfactible.com/cfdi/00001000000102699425.zip-->'.format(
comprobante=comprobante
) # noqa
cfdi_data["cfdi_xml"] = cfdi_data["cfdi_xml"].replace(
'</"%s">' % (comprobante), url_pac
)
f = base64.encodestring(cfdi_data['cfdi_xml'] or '')
cfdi_xml = cfdi_data.pop('cfdi_xml')
f = base64.encodestring(cfdi_data["cfdi_xml"] or "")
cfdi_xml = cfdi_data.pop("cfdi_xml")
if cfdi_xml:
cfdi_data['cfdi_xml'] = cfdi_xml
cfdi_data["cfdi_xml"] = cfdi_xml
else:
msg += _('Can\'t extract the file XML of PAC')
return {
'file': f,
'msg': msg,
'cfdi_xml': cfdi_xml.decode('utf-8'),
}
msg += _("Can't extract the file XML of PAC")
return {"file": f, "msg": msg, "cfdi_xml": cfdi_xml.decode("utf-8")}
elif codigo_timbrado in pac_exceptions.validation_error_list:
# El CFDI no ha sido timbrado
raise pac_exceptions.ValidationError(
code=codigo_timbrado,
message=' '.join([mensaje, resultados_mensaje]),
code=codigo_timbrado, message=" ".join([mensaje, resultados_mensaje])
)
else:
raise Exception(
' '.join(
['Code', codigo_timbrado, mensaje, resultados_mensaje],
),
" ".join(["Code", codigo_timbrado, mensaje, resultados_mensaje])
)
@staticmethod
......@@ -198,63 +205,68 @@ class ParamsPac(models.Model):
@params xml_res_str: File XML
@params comprobante: Name to the Node that contain the XML
"""
def add_node(node_name, attrs, parent_node, minidom_xml_obj, attrs_types, order=False): # noqa
def add_node(
node_name, attrs, parent_node, minidom_xml_obj, attrs_types, order=False
): # noqa
if not order:
order = attrs
new_node = minidom_xml_obj.createElement(node_name)
for key in order:
if attrs_types[key] == 'attribute':
if attrs_types[key] == "attribute":
new_node.setAttribute(key, attrs[key])
elif attrs_types[key] == 'textNode':
elif attrs_types[key] == "textNode":
key_node = minidom_xml_obj.createElement(key)
text_node = minidom_xml_obj.createTextNode(attrs[key])
key_node.appendChild(text_node)
new_node.appendChild(key_node)
elif attrs_types[key] == 'att_text':
new_node.setAttribute('name', key)
elif attrs_types[key] == "att_text":
new_node.setAttribute("name", key)
text_node = minidom_xml_obj.createTextNode(attrs[key])
new_node.appendChild(text_node)
parent_node.appendChild(new_node)
return new_node
if xml_res_str:
node_addenda = xml_res_str.getElementsByTagName('cfdi:Addenda')
node_addenda = xml_res_str.getElementsByTagName("cfdi:Addenda")
if len(node_addenda) == 0:
node_comprobante = xml_res_str.getElementsByTagName(
comprobante)[0]
node_comprobante = xml_res_str.getElementsByTagName(comprobante)[0]
node_addenda = add_node(
'cfdi:Addenda', {}, node_comprobante,
xml_res_str, attrs_types={},
"cfdi:Addenda", {}, node_comprobante, xml_res_str, attrs_types={}
)
node_partner_attrs = {
'xmlns:sf': 'http://timbrado.solucionfactible.com/partners', # noqa
'xsi:schemaLocation': '"http://timbrado.solucionfactible.com/partners https://solucionfactible.com/timbrado/partners/partners.xsd', # noqa
'id': '150731',
"xmlns:sf": "http://timbrado.solucionfactible.com/partners", # noqa
"xsi:schemaLocation": '"http://timbrado.solucionfactible.com/partners https://solucionfactible.com/timbrado/partners/partners.xsd', # noqa
"id": "150731",
}
node_partner_attrs_types = {
'xmlns:sf': 'attribute',
'xsi:schemaLocation': 'attribute',
'id': 'attribute',
"xmlns:sf": "attribute",
"xsi:schemaLocation": "attribute",
"id": "attribute",
}
add_node(
'sf:Partner', node_partner_attrs,
node_addenda, xml_res_str,
"sf:Partner",
node_partner_attrs,
node_addenda,
xml_res_str,
attrs_types=node_partner_attrs_types,
)
else:
node_partner_attrs = {
'xmlns:sf': 'http://timbrado.solucionfactible.com/partners', # noqa
'xsi:schemaLocation': 'http://timbrado.solucionfactible.com/partners https://solucionfactible.com/timbrado/partners/partners.xsd', # noqa
'id': '150731',
"xmlns:sf": "http://timbrado.solucionfactible.com/partners", # noqa
"xsi:schemaLocation": "http://timbrado.solucionfactible.com/partners https://solucionfactible.com/timbrado/partners/partners.xsd", # noqa
"id": "150731",
}
node_partner_attrs_types = {
'xmlns:sf': 'attribute',
'xsi:schemaLocation': 'attribute',
'id': 'attribute',
"xmlns:sf": "attribute",
"xsi:schemaLocation": "attribute",
"id": "attribute",
}
add_node(
'sf:Partner', node_partner_attrs,
node_addenda, xml_res_str,
"sf:Partner",
node_partner_attrs,
node_addenda,
xml_res_str,
attrs_types=node_partner_attrs_types,
)
return xml_res_str
# -*- coding: utf-8 -*-
{
'name': 'Clabe Interbancaria',
'version': '1.1.0',
'author': 'OpenPyme',
'category': 'Localization/Mexico',
'website': 'http://openpyme.mx',
'license': 'AGPL-3',
'depends': [
'l10n_mx',
],
'data': [
'views/res_partner_bank.xml',
],
'installable': True,
"name": "Clabe Interbancaria",
"version": "1.1.0",
"author": "OpenPyme",
"category": "Localization/Mexico",
"website": "http://openpyme.mx",
"license": "AGPL-3",
"depends": ["l10n_mx"],
"data": ["views/res_partner_bank.xml"],
"installable": True,
}
......@@ -8,34 +8,34 @@ from openerp.tools.translate import _
class ResPartnerBank(models.Model):
_inherit = 'res.partner.bank'
_inherit = "res.partner.bank"
clabe = fields.Char('Clabe Interbancaria', size=64)
clabe = fields.Char("Clabe Interbancaria", size=64)
last_acc_number = fields.Char(
compute='_compute_last_acc_number',
string='Ultimos 4 digitos',
compute="_compute_last_acc_number",
string="Ultimos 4 digitos",
size=4,
store=True,
)
currency2_id = fields.Many2one('res.currency', 'Currency')
reference = fields.Char(size=64, help='Reference used in this bank')
currency2_id = fields.Many2one("res.currency", "Currency")
reference = fields.Char(size=64, help="Reference used in this bank")
@api.multi
@api.depends('acc_number')
@api.depends("acc_number")
def _compute_last_acc_number(self):
""" Get last 4 digits from account number"""
for res_bank in self.filtered(lambda r: r.acc_number):
res_bank.last_acc_number = res_bank.acc_number[-4:]
@api.constrains('clabe')
@api.constrains("clabe")
def _check_clabe(self):
"""Ensure field Clabe is valid"""
regex = re.compile(r'[0-9]{18}')
regex = re.compile(r"[0-9]{18}")
for res_bank in self.filtered(lambda r: r.clabe):
if not regex.match(res_bank.clabe):
raise ValidationError(_('Invalid Clabe'))
raise ValidationError(_("Invalid Clabe"))
if res_bank.acc_number and res_bank.acc_number not in res_bank.clabe:
raise ValidationError(_('Clabe does not correspond to given account'))
raise ValidationError(_("Clabe does not correspond to given account"))
@api.multi
def is_valid_for_payment_form(self, regex=False):
......
......@@ -9,33 +9,30 @@ class ResPartnerBank(TransactionCase):
def setUp(self):
super(ResPartnerBank, self).setUp()
self.res_partner_bank_model = self.env['res.partner.bank']
self.res_partner_bank_model = self.env["res.partner.bank"]
def test_wrong_clabe(self):
"""Raise exception when clabe is incorrect"""
examples = [
('00000000000', '0000000000000000'), # Invalid clabe
('00000001000', '0000000000000000'), # Clabe not match account
("00000000000", "0000000000000000"), # Invalid clabe
("00000001000", "0000000000000000"), # Clabe not match account
]
for account, clabe in examples:
with self.assertRaises(except_orm):
self.env['res.partner.bank'].create({
'state': 'bank',
'acc_number': account,
'clabe': clabe,
})
self.env["res.partner.bank"].create(
{"state": "bank", "acc_number": account, "clabe": clabe}
)
def test_validation_regex(self):
"""Validate several accounts against regex"""
examples = [
('000000000000000000', '[0-9]{11}|[0-9]{18}', True),
('00000000000', '[0-9]{11}|[0-9]{18}', True),
('000000000', '[0-9]{11}|[0-9]{18}', False),
("000000000000000000", "[0-9]{11}|[0-9]{18}", True),
("00000000000", "[0-9]{11}|[0-9]{18}", True),
("000000000", "[0-9]{11}|[0-9]{18}", False),
]
for account, regex, result in examples:
new_account = self.res_partner_bank_model.create({
'state': 'bank',
'acc_number': account,
})
new_account = self.res_partner_bank_model.create(
{"state": "bank", "acc_number": account}
)
res = new_account.is_valid_for_payment_form(regex)
self.assertEqual(res, result, 'Account not pass validation')
self.assertEqual(res, result, "Account not pass validation")
......@@ -6,7 +6,7 @@ import os
from setuptools import find_packages, setup
MANIFEST_NAMES = ('__openerp__.py', '__manifest__.py', '__terp__.py')
MANIFEST_NAMES = ("__openerp__.py", "__manifest__.py", "__terp__.py")
class NoManifestFound(Exception):
......@@ -27,39 +27,36 @@ def parse_manifest(s):
def read_manifest(addon_dir):
manifest_path = get_manifest_path(addon_dir)
if not manifest_path:
raise NoManifestFound('No manifest found in %s' % addon_dir)
raise NoManifestFound("No manifest found in %s" % addon_dir)
return parse_manifest(open(manifest_path).read())
addon_dir = 'l10n_mx_ir_attachment_facturae'
addon_dir = "l10n_mx_ir_attachment_facturae"
manifest = read_manifest(addon_dir)
long_description = (
open(os.path.join(addon_dir, 'README.rst')).read() +
'\n' +
'Contributors\n'
'============\n' +
'\n' +
open(os.path.join(addon_dir, 'CONTRIBUTORS.rst')).read() +
'\n' +
open(os.path.join(addon_dir, 'CHANGES.md')).read() +
'\n')
open(os.path.join(addon_dir, "README.rst")).read() + "\n" + "Contributors\n"
"============\n"
+ "\n"
+ open(os.path.join(addon_dir, "CONTRIBUTORS.rst")).read()
+ "\n"
+ open(os.path.join(addon_dir, "CHANGES.md")).read()
+ "\n"
)
setup(
name=addon_dir,
version=manifest.get('version'),
version=manifest.get("version"),
long_description=long_description,
# Get more from http://pypi.python.org/pypi?%3Aaction=list_classifiers
classifiers=[
'Programming Language :: Python',
'Operating System :: OS Independent',
'Programming Language :: Python :: 2.7',
"Programming Language :: Python",
"Operating System :: OS Independent",
"Programming Language :: Python :: 2.7",
],
keywords='Python PyERP',
license=manifest.get('license', 'GPL-3'),
keywords="Python PyERP",
license=manifest.get("license", "GPL-3"),
packages=find_packages(),
include_package_data=True,
zip_safe=False,
install_requires=[
'setuptools',
],
install_requires=["setuptools"],
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment