Skip to content
Snippets Groups Projects
Commit 0d726be0 authored by Anton Sarukhanov's avatar Anton Sarukhanov
Browse files

Refactored to be more object-oriented/scalable.

parent 2049c48d
No related branches found
No related tags found
No related merge requests found
Pipeline #34 failed with stage
in 1 minute and 21 seconds
*.py[cod]
/venv
.cache
*.egg
*.egg-info
# Coverage
/htmlcov
.coverage
......@@ -2,7 +2,8 @@ before_script:
- apt-get update -qq && apt-get install -y -qq python-virtualenv python3 python3-dev libxslt1-dev libxml2-dev
- virtualenv venv -p$(which python3)
- . venv/bin/activate
- pip install -r requirements.txt
- pip install -e .
- pip install -r test-requirements.txt
pep8:
script:
......
......@@ -6,8 +6,17 @@ WordPress plugin.
## Supported Sources
* [Ecwid](https://support.ecwid.com/hc/en-us/articles/207099979-Import-Export)
- [Ecwid](https://support.ecwid.com/hc/en-us/articles/207099979-Import-Export)
## Usage
`wooify.py adapter input_file`
* `./wooify source_filename [destination_filename]`
- `adapter`: an import adapter
- `input_file`: the source file you want to convert
Available import adapters: `ecwid`
Output will be written to:
- `woocommerce_products.csv`
- `woocommerce_categories.csv`
awesome-slugify==1.6.5
pep8==1.7.0
lxml==3.7.0
setup.py 0 → 100644
import os
from setuptools import setup
def read(fname):
    """Return contents of named file as a string.

    ``fname`` is resolved relative to the directory containing this file.
    """
    path = os.path.join(os.path.dirname(__file__), fname)
    # Context manager so the handle is closed deterministically instead of
    # being left for the garbage collector.
    with open(path) as handle:
        return handle.read()
# Package metadata for the ``wooify`` command-line tool.
setup(
    name='Wooify',
    license='MIT',
    author='Anton Sarukhanov',
    author_email='code@ant.sr',
    # Adjacent literals are concatenated verbatim, so the first one needs a
    # trailing space (otherwise the text reads "...product datafor import").
    description='A script to prepare eCommerce product data '
                'for import into the "Woo Import Export" WordPress plugin.',
    long_description=read('README.md'),
    # NOTE(review): wooify.__main__ imports 'wooify.adapters.<name>' at
    # runtime, but 'wooify.adapters' is not listed here - confirm whether it
    # must be packaged for non-editable installs.
    packages=['wooify', 'tests'],
    keywords='woocommerce wordpress products ecommerce',
    install_requires=[
        'awesome-slugify==1.6.5',
        'lxml==3.7.0',
    ],
    # Installs a ``wooify`` executable that calls wooify.__main__.main().
    entry_points='''
        [console_scripts]
        wooify=wooify.__main__:main
    ''',
)
pep8==1.7.0
#!/usr/bin/env python3
"""
Prepare eCommerce product data for import into the
"Woo Import Export" WordPress plugin.
Supported sources:
- Ecwid
"""
import sys
import csv
import re
import json
import datetime
import lxml.html.clean as lxml_clean
from lxml.etree import XMLSyntaxError
from slugify import UniqueSlugify
# Separator between the levels of a nested Ecwid category path
# (e.g. "Parent / Child").
ECWID_CATEGORY_DELIMITER = ' / '
# Field delimiter passed to csv.DictReader when reading the Ecwid export.
ECWID_CSV_DELIMITER = ';'
# Raw string: "\W" inside a plain literal is an invalid escape sequence and
# triggers a DeprecationWarning/SyntaxWarning on modern Python.
WP_SLUG_REGEX = re.compile(r'[\W-]+')
# Tags allowed to survive HTML cleaning of product descriptions.
HTML_ALLOWED_TAGS = ['a', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'ul', 'ol',
                     'li', 'b', 'i', 'strong', 'em', 'u', 'table', 'tr', 'td',
                     'th', 'thead', 'tbody']
# Ordered (column name, default value) pairs for the products CSV.
# The list order is the column order of the output file (it is turned into
# the DictWriter fieldnames), and dict(WOO_IMP_EXP_PRODUCT_FIELDS) is the
# starting point for every product row.
# Note: the "Created Date" default is computed once, when this module is
# loaded, so every product written in one run shares the same timestamp.
WOO_IMP_EXP_PRODUCT_FIELDS = [
# ("FieldName", "Default Value"),
("Id", ""),
("Product Name", ""),
("Created Date", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
("Description", ""),
("Product Type", "simple"),
("Categories", json.dumps([])),
("Price", 0),
("Short Description", ""),
("Product Status", "publish"),
("Permalink", ""),
("Tags", ""),
("SKU", ""),
("Sale Price", ""),
("Visibility", "visible"),
("On Sale", "no"),
("Stock Status", "instock"),
("Regular Price", 0),
("Total Sales", 0),
("Downloadable", "no"),
("Virtual", "no"),
("Purchase Note", ""),
("Weight", ""),
("Length", ""),
("Width", ""),
("Height", ""),
("Unit", "in"),
("Sold Individually", "no"),
("Manage Stock", "no"),
("Stock", ""),
("Backorders Allowed", "false"),
("Backorders", "no"),
("Purchaseable", "yes"),
("Featured", "no"),
("Is Taxable", "yes"),
("Tax Status", "taxable"),
("Tax Class", ""),
("Product Images", "false"),
("Product Image Set", "no"),
("Download Limit", ""),
("Download Expiry", ""),
("Downloadable Files", ""),
("Download Type", ""),
("Product URL", ""),
("Button Text", ""),
("Shipping Required", "yes"),
("Shipping Taxable", "yes"),
("Shipping Class", ""),
("Shipping Class Id", ""),
("Average Rating", 0),
("Rating Count", 0),
("Related Ids", ""),
("Upsell Ids", ""),
("Cross Sell Ids", ""),
("Attributes", ""),
("Custom Fields", ""),
("Product Parent id", 0),
("Variation Description", ""),
("Menu Order", 0),
("Comment Status", "closed"),
("Ping Status", "open")
]
# Ordered (column name, default value) pairs for the categories CSV.
# The list order is the column order of the categories output file.
WOO_IMP_EXP_CATEGORY_FIELDS = [
# ("FieldName", "Default Value"),
("Id", ''),
("Name", ''),
("Slug", ''),
("Term Taxonomy Id", ''),
("Taxonomy", 'product_cat'),
("Parent Id", ''),
("Parent Slug", ''),
("Description", ''),
("Term Group", 0),
("Count", ''),
("Category Image", ''),
("Woocommerce Term Meta", ''),
]
def clean_html(html_string):
    """Sanitize an HTML fragment, stripping every attribute.

    The fragment is wrapped in a ``<div>`` before being handed to an lxml
    ``Cleaner`` configured with an empty attribute whitelist and
    ``HTML_ALLOWED_TAGS`` as the tag whitelist.
    """
    sanitizer = lxml_clean.Cleaner(
        allow_tags=HTML_ALLOWED_TAGS,
        remove_unknown_tags=False,
        page_structure=False,
        safe_attrs_only=True,
        safe_attrs=frozenset(),  # empty whitelist: no attribute survives
    )
    wrapped = "<div>{0}</div>".format(html_string)
    return sanitizer.clean_html(wrapped)
def ecwid_parser(r, r_id=None):
    """Convert one row of an Ecwid CSV export into a WooCommerce product.

    Args:
        r: Mapping for a single CSV row. The keys read are ``product_id``,
            ``name``, ``sku``, ``weight``, ``image``, ``seo_description``,
            ``description``, ``price``, ``recommended_price``, ``enabled``,
            ``category1`` and ``category2``. Its ``description`` entry is
            replaced in place by the cleaned HTML.
        r_id: Unused; kept so existing callers keep working.

    Returns:
        A dict seeded with the ``WOO_IMP_EXP_PRODUCT_FIELDS`` defaults and
        overlaid with the values taken from the row. ``Categories`` holds
        the list of raw (non-empty) Ecwid category names.
    """
    try:
        r["description"] = clean_html(r["description"])
    except XMLSyntaxError:
        # Best effort: report the product and keep its raw description.
        print("Error parsing HTML for product: "
              "{0} ({1})".format(r["name"], (r["sku"])))

    # Only the first two category columns are read, as before.
    raw_category_names = [r[key] for key in ("category1", "category2")
                          if r[key]]

    # A blank, missing or non-numeric weight used to crash float(); treat it
    # like a zero weight and leave the Weight column empty.
    try:
        has_weight = float(r["weight"]) > 0
    except (TypeError, ValueError):
        has_weight = False

    product = dict(WOO_IMP_EXP_PRODUCT_FIELDS)
    product.update({
        "Id": r["product_id"],
        "Product Name": r["name"],
        "Categories": raw_category_names,
        "SKU": r["sku"],
        "Weight": r["weight"] if has_weight else '',
        "Product Images": r["image"],
        "Product Image Set": "yes" if r["image"] else "no",
        "Short Description": r["seo_description"],
        "Description": r["description"],
        "Price": r["price"],
        # Fall back to the selling price when no recommended price is given.
        "Regular Price": r["recommended_price"] or r["price"],
        "Visibility": "visible" if r["enabled"] == "yes" else "invisible",
        "Purchaseable": r["enabled"] or "no",
    })
    return product
def expand_category(name, cat_id, slugify=None, delimiter=None, products=None):
    """Turn a raw category path into a category record.

    Args:
        name: Full category path, e.g. ``"Parent / Child"``.
        cat_id: Value stored as both ``Id`` and ``Term Taxonomy Id``.
        slugify: Callable mapping a name to a slug; a new ``UniqueSlugify``
            is created when omitted.
        delimiter: Separator between path levels; when falsy or absent from
            ``name`` the whole name is treated as a top-level category.
        products: Optional iterable of product dicts, used to count how many
            list ``name`` among their ``Categories``.

    Returns:
        Dict with ``Id``, ``Term Taxonomy Id``, ``Name``, ``Slug``,
        ``Parent Name`` and ``Count``.
    """
    slugify = slugify or UniqueSlugify()

    if delimiter and delimiter in name:
        # Split on the LAST delimiter: everything before is the parent path.
        parent_name, _, leaf_name = name.rpartition(delimiter)
    else:
        parent_name, leaf_name = '', name

    record = {
        'Id': cat_id,
        'Term Taxonomy Id': cat_id,
        'Name': leaf_name,
        'Slug': slugify(leaf_name),
        'Parent Name': parent_name,
    }
    if products:
        record['Count'] = sum(
            1 for item in products if name in item['Categories'])
    else:
        record['Count'] = 0
    return record
def add_missing_parents(categories, products, delimiter, slugify=None):
    """Return a copy of ``categories`` that also contains every ancestor.

    Args:
        categories: Dict mapping a full category path to its record (as
            built by ``expand_category``); it is not modified.
        products: Product dicts, forwarded to ``expand_category`` for the
            per-category product count.
        delimiter: Separator between the levels of a category path.
        slugify: Slug callable shared with the caller so slugs stay unique;
            a new ``UniqueSlugify`` is created when omitted.

    Returns:
        A new dict with the original entries plus one entry for each
        ancestor path that was missing.
    """
    def ancestor_paths(name):
        """Return each ancestor path of ``name`` and ``name`` itself,
        outermost first ("A / B / C" -> "A", "A / B", "A / B / C")."""
        if not delimiter:
            return [name]
        parts = name.split(delimiter)
        return [delimiter.join(parts[:depth])
                for depth in range(1, len(parts) + 1)]

    slugify = slugify or UniqueSlugify()
    categories_with_parents = categories.copy()
    for c in categories.values():
        # Records without a parent (missing key or empty string) are
        # top-level; previously an empty parent produced a bogus category
        # whose name was ''.
        parent_name = c.get('Parent Name')
        if not parent_name:
            continue
        for ancestor in ancestor_paths(parent_name):
            if ancestor not in categories_with_parents:
                cat_id = len(categories_with_parents) + 1
                categories_with_parents[ancestor] = expand_category(
                    ancestor, cat_id, slugify, delimiter, products)
    return categories_with_parents
def make_woocommerce_csv(input_file, products_file, categories_file=None):
    """Convert input_file, a CSV of eCommerce products, into
    a format that can be imported by WooCommerce.

    Args:
        input_file: Path of the Ecwid CSV export to read.
        products_file: Path the products CSV is written to.
        categories_file: Optional path for the categories CSV; when omitted
            no categories file is written.
    """
    # newline='' is what the csv module expects for both reading and
    # writing; it keeps newlines embedded in quoted fields intact.
    with open(input_file, newline='') as csv_in:
        ecwid_reader = csv.DictReader(csv_in, delimiter=ECWID_CSV_DELIMITER)
        products = [ecwid_parser(row) for row in ecwid_reader]

    raw_category_names = set()
    for product in products:
        raw_category_names.update(product['Categories'])

    categories = {}
    slugify = UniqueSlugify(to_lower=True)
    # Sorted so that ids and slug suffixes are the same on every run.
    for idx, name in enumerate(sorted(raw_category_names)):
        categories[name] = expand_category(name, idx, slugify,
                                           ECWID_CATEGORY_DELIMITER,
                                           products)
    categories = add_missing_parents(categories, products,
                                     ECWID_CATEGORY_DELIMITER, slugify)

    # Replace the temporary 'Parent Name' link by the parent's id and slug.
    for c in categories.values():
        if c['Parent Name']:
            parent = categories[c['Parent Name']]
            c['Parent Id'] = parent['Id']
            c['Parent Slug'] = parent['Slug']
        else:
            c['Parent Id'] = ''
            c['Parent Slug'] = ''
        del c['Parent Name']

    with open(products_file, 'w', newline='') as csv_products:
        products_writer = csv.DictWriter(
            csv_products,
            fieldnames=[f for f, d in WOO_IMP_EXP_PRODUCT_FIELDS])
        products_writer.writeheader()
        for product in products:
            product['Categories'] = json.dumps([
                {'name': categories[c]['Name'],
                 'slug': categories[c]['Slug']}
                for c in product['Categories']])
            products_writer.writerow(product)

    # The categories file is optional; the previous single ``with``
    # statement failed on ``with None`` when it was omitted.
    if categories_file:
        with open(categories_file, 'w', newline='') as csv_categories:
            categories_writer = csv.DictWriter(
                csv_categories,
                fieldnames=[f for f, d in WOO_IMP_EXP_CATEGORY_FIELDS])
            categories_writer.writeheader()
            for category in categories.values():
                # Start from the declared defaults (e.g. Taxonomy) so that
                # columns the record does not set are not written empty.
                row = dict(WOO_IMP_EXP_CATEGORY_FIELDS)
                row.update(category)
                categories_writer.writerow(row)
if __name__ == '__main__':
    # Script entry point: expects the input CSV path as the only argument.
    if len(sys.argv) < 2:
        print("\n Usage: {0} input_file\n".format(sys.argv[0]))
        # Non-zero status so shells and CI can detect the usage error.
        sys.exit(2)
    input_file = sys.argv[1]
    make_woocommerce_csv(input_file,
                         'woocommerce_products.csv',
                         'woocommerce_categories.csv')
from .wooifier import Wooifier
#!/usr/bin/env python
"""
Prepare eCommerce product data for import into the "Woo Import Export" \
WordPress plugin.
Usage: {0} adapter input_file
adapter: an import adapter
input_file: the source file you want to convert
Available import adapters: ecwid
Output will be written to:
woocommerce_products.csv
woocommerce_categories.csv
"""
import sys
from .wooifier import Wooifier
def main():
    """Command-line entry point.

    Reads the adapter name and the input file from ``sys.argv``, loads
    ``wooify.adapters.<adapter>`` and its ``<Adapter>Adapter`` class, and
    writes ``woocommerce_products.csv`` and ``woocommerce_categories.csv``
    to the current directory. Prints the usage text and exits with status 2
    when an argument is missing.
    """
    # Local import keeps this function self-contained.
    import importlib

    try:
        adapter_name = sys.argv[1]
        input_file = sys.argv[2]
    except IndexError:
        # __doc__ is None when docstrings are stripped (python -OO).
        usage = __doc__ or "Usage: {0} adapter input_file"
        print(usage.format(sys.argv[0]))
        # Non-zero status so callers can detect the usage error.
        sys.exit(2)

    class_name = '{}Adapter'.format(adapter_name.title())
    # importlib.import_module returns the leaf module directly, which is
    # what the previous __import__(..., fromlist=...) call achieved.
    adapter_mod = importlib.import_module(
        'wooify.adapters.{}'.format(adapter_name))
    adapter_cls = getattr(adapter_mod, class_name)

    wooifier = Wooifier(adapter_cls)
    wooifier.wooify(input_file,
                    'woocommerce_products.csv',
                    'woocommerce_categories.csv')
import lxml.html.clean
from ..constants import ALLOWED_HTML_TAGS, HTML_FIELDS
class Adapter(object):
    """The base class for adapters. An adapter is a handler for
    some format wooify should support as input.

    Subclasses implement ``parse`` and fill ``self.products`` (a list of
    product dicts) and ``self.categories``.
    """

    # Class-level placeholders only; __init__ gives every instance its own
    # containers so parsed data is never shared between adapters.
    products = []
    categories = []

    def __init__(self, filename):
        """Create per-instance containers, then parse ``filename``."""
        self.products = []
        # Same container type as the (sub)class default, but a fresh object.
        self.categories = type(self.categories)()
        self.parse(filename)

    def parse(self, filename):
        """Parse an input file."""
        raise NotImplementedError

    @property
    def clean_products(self):
        """Return copies of the products with their HTML fields sanitized.

        The stored products are left untouched, so reading this property
        repeatedly does not wrap the same text in more and more ``<div>``
        elements.
        """
        cleaned = []
        for product in self.products:
            product = dict(product)
            for field in HTML_FIELDS:
                product[field] = self._clean_html(product[field])
            cleaned.append(product)
        return cleaned

    def _clean_html(self, html_string):
        """Remove all attributes from HTML string."""
        cleaner = lxml.html.clean.Cleaner(
            allow_tags=ALLOWED_HTML_TAGS, remove_unknown_tags=False,
            page_structure=False, safe_attrs_only=True,
            safe_attrs=frozenset())  # empty whitelist: drop every attribute
        return cleaner.clean_html("<div>{0}</div>".format(html_string))
import csv
import json
from slugify import UniqueSlugify
from .adapter import Adapter as BaseAdapter
from ..constants import (ALLOWED_HTML_TAGS, HTML_FIELDS,
WOO_IMP_EXP_PRODUCT_FIELDS)
# Separator between the levels of a nested category path
# (e.g. "Parent / Child").
CATEGORY_DELIMITER = ' / '
# Field delimiter passed to csv.DictReader when reading the input file.
CSV_DELIMITER = ';'
class EcwidAdapter(BaseAdapter):
    """Input handler for product data exported from Ecwid.

    See https://www.ecwid.com/
    """

    categories = {}

    def parse(self, filename):
        """Parse an ecwid CSV file.

        Fills ``self.products`` with WooCommerce-shaped product dicts
        (``Categories`` JSON-encoded) and ``self.categories`` with the list
        of category records derived from them.
        """
        def parse_row(r):
            """Parse a row of the Ecwid CSV file and rearrange the fields for
            WooCommerce."""
            # Only the first two category columns are read, as before.
            raw_category_names = [r[key] for key in ("category1", "category2")
                                  if r[key]]
            # A blank, missing or non-numeric weight used to crash float();
            # treat it like a zero weight and leave the column empty.
            try:
                has_weight = float(r["weight"]) > 0
            except (TypeError, ValueError):
                has_weight = False
            product = dict(WOO_IMP_EXP_PRODUCT_FIELDS)
            product.update({
                "Id": r["product_id"],
                "Product Name": r["name"],
                "Categories": raw_category_names,
                "SKU": r["sku"],
                "Weight": r["weight"] if has_weight else '',
                "Product Images": r["image"],
                "Product Image Set": "yes" if r["image"] else "no",
                "Short Description": r["seo_description"],
                "Description": r["description"],
                "Price": r["price"],
                "Regular Price": r["recommended_price"] or r["price"],
                "Visibility": ("visible" if r["enabled"] == "yes"
                               else "invisible"),
                "Purchaseable": r["enabled"] or "no",
            })
            return product

        # Fresh list per parse: appending to the inherited class-level list
        # would leak products between adapter instances.
        self.products = []
        # newline='' is what the csv module expects; it keeps newlines
        # embedded in quoted fields intact.
        with open(filename, newline='') as csv_in:
            csv_reader = csv.DictReader(csv_in, delimiter=CSV_DELIMITER)
            for row in csv_reader:
                self.products.append(parse_row(row))

        # Unique names, sorted so ids and slugs are the same on every run.
        raw_category_names = sorted({
            name for product in self.products
            for name in product['Categories']})
        categories = self._expand_categories(raw_category_names)

        for product in self.products:
            # JSON-encode to match the field's default (json.dumps([]));
            # a bare list would be written to the CSV as a Python repr.
            product['Categories'] = json.dumps([
                {'name': categories[c]['Name'],
                 'slug': categories[c]['Slug']}
                for c in product['Categories']])
        self.categories = list(categories.values())

    def _expand_categories(self, raw_category_names):
        """Convert a list of category name strings into WooCommerce-ready
        category dicts. Adds parent categories where missing.

        Returns a dict mapping each full category path to its record.
        """
        def expand_category(name, cat_id, slugify, products):
            """Build a rich category entry from minimal input."""
            if CATEGORY_DELIMITER and CATEGORY_DELIMITER in name:
                # Split on the LAST delimiter: the head is the parent path.
                parent_name, _, clean_name = name.rpartition(
                    CATEGORY_DELIMITER)
            else:
                parent_name, clean_name = '', name
            return {
                'Id': cat_id,
                'Term Taxonomy Id': cat_id,
                'Name': clean_name,
                'Slug': slugify(clean_name),
                'Parent Name': parent_name,
                'Count': sum(1 for p in products if name in p['Categories']),
            }

        def ancestor_paths(name):
            """Return each ancestor path of ``name`` and ``name`` itself,
            outermost first ("A / B / C" -> "A", "A / B", "A / B / C")."""
            parts = name.split(CATEGORY_DELIMITER)
            return [CATEGORY_DELIMITER.join(parts[:depth])
                    for depth in range(1, len(parts) + 1)]

        def add_missing_parents(categories, products, slugify):
            """Find categories whose parents do not exist, and add every
            missing ancestor to a copy of the mapping."""
            cats_with_parents = categories.copy()
            for c in categories.values():
                # Top-level categories have an empty parent; previously
                # that produced a bogus category whose name was ''.
                parent_name = c.get('Parent Name')
                if not parent_name:
                    continue
                for ancestor in ancestor_paths(parent_name):
                    if ancestor not in cats_with_parents:
                        cat_id = len(cats_with_parents) + 1
                        cats_with_parents[ancestor] = expand_category(
                            ancestor, cat_id, slugify, products)
            return cats_with_parents

        categories = {}
        slugify = UniqueSlugify(to_lower=True)
        for idx, name in enumerate(raw_category_names):
            categories[name] = expand_category(name, idx, slugify,
                                               self.products)
        categories = add_missing_parents(categories, self.products, slugify)

        # Replace the temporary 'Parent Name' link by the parent's id/slug.
        for c in categories.values():
            if c['Parent Name']:
                parent = categories[c['Parent Name']]
                c['Parent Id'] = parent['Id']
                c['Parent Slug'] = parent['Slug']
            else:
                c['Parent Id'] = ''
                c['Parent Slug'] = ''
            del c['Parent Name']
        return categories
# Module-level alias for this module's adapter class.
# NOTE(review): main() looks the class up by its "<Name>Adapter" name, so
# nothing visible reads this alias - confirm whether it is still needed.
adapter_cls = EcwidAdapter
import datetime
import json
# Tags allowed to survive HTML cleaning, grouped by purpose.
ALLOWED_HTML_TAGS = [
    'a',
    'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
    'p', 'ul', 'ol', 'li',
    'b', 'i', 'strong', 'em', 'u',
    'table', 'tr', 'td', 'th', 'thead', 'tbody',
]
# Product fields whose values are treated as HTML and cleaned.
HTML_FIELDS = ['Description']
# Ordered (column name, default value) pairs for the products CSV.
# The list order is the column order of the output file (it is turned into
# the DictWriter fieldnames), and dict(WOO_IMP_EXP_PRODUCT_FIELDS) is the
# starting point for every product row.
# Note: the "Created Date" default is computed once, when this module is
# imported, so every product written in one run shares the same timestamp.
WOO_IMP_EXP_PRODUCT_FIELDS = [
# ("FieldName", "Default Value"),
("Id", ""),
("Product Name", ""),
("Created Date", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
("Description", ""),
("Product Type", "simple"),
("Categories", json.dumps([])),
("Price", 0),
("Short Description", ""),
("Product Status", "publish"),
("Permalink", ""),
("Tags", ""),
("SKU", ""),
("Sale Price", ""),
("Visibility", "visible"),
("On Sale", "no"),
("Stock Status", "instock"),
("Regular Price", 0),
("Total Sales", 0),
("Downloadable", "no"),
("Virtual", "no"),
("Purchase Note", ""),
("Weight", ""),
("Length", ""),
("Width", ""),
("Height", ""),
("Unit", "in"),
("Sold Individually", "no"),
("Manage Stock", "no"),
("Stock", ""),
("Backorders Allowed", "false"),
("Backorders", "no"),
("Purchaseable", "yes"),
("Featured", "no"),
("Is Taxable", "yes"),
("Tax Status", "taxable"),
("Tax Class", ""),
("Product Images", "false"),
("Product Image Set", "no"),
("Download Limit", ""),
("Download Expiry", ""),
("Downloadable Files", ""),
("Download Type", ""),
("Product URL", ""),
("Button Text", ""),
("Shipping Required", "yes"),
("Shipping Taxable", "yes"),
("Shipping Class", ""),
("Shipping Class Id", ""),
("Average Rating", 0),
("Rating Count", 0),
("Related Ids", ""),
("Upsell Ids", ""),
("Cross Sell Ids", ""),
("Attributes", ""),
("Custom Fields", ""),
("Product Parent id", 0),
("Variation Description", ""),
("Menu Order", 0),
("Comment Status", "closed"),
("Ping Status", "open")
]
# Ordered (column name, default value) pairs for the categories CSV.
# The list order is the column order of the categories output file.
WOO_IMP_EXP_CATEGORY_FIELDS = [
# ("FieldName", "Default Value"),
("Id", ''),
("Name", ''),
("Slug", ''),
("Term Taxonomy Id", ''),
("Taxonomy", 'product_cat'),
("Parent Id", ''),
("Parent Slug", ''),
("Description", ''),
("Term Group", 0),
("Count", ''),
("Category Image", ''),
("Woocommerce Term Meta", ''),
]
import csv
from .constants import WOO_IMP_EXP_PRODUCT_FIELDS, WOO_IMP_EXP_CATEGORY_FIELDS
class Wooifier(object):
    """Write the products and categories parsed by an adapter as CSV files
    laid out according to ``WOO_IMP_EXP_PRODUCT_FIELDS`` and
    ``WOO_IMP_EXP_CATEGORY_FIELDS``."""

    # Adapter class used to parse input files; set per instance in __init__.
    adapter_cls = None

    def __init__(self, adapter_cls):
        """Remember the adapter class to instantiate for each input file."""
        self.adapter_cls = adapter_cls

    def wooify(self, input_file, products_file, categories_file=None):
        """Convert ``input_file`` into WooCommerce import CSV files.

        Args:
            input_file: Path handed to the adapter class for parsing.
            products_file: Path the products CSV is written to.
            categories_file: Optional path for the categories CSV; it is
                written only when given and the adapter found categories.
        """
        adapter = self.adapter_cls(input_file)

        # newline='' is what the csv module expects for files it writes;
        # without it rows can gain an extra "\r" on some platforms.
        with open(products_file, 'w', newline='') as csv_products:
            products_writer = csv.DictWriter(
                csv_products,
                fieldnames=[f for f, d in WOO_IMP_EXP_PRODUCT_FIELDS])
            products_writer.writeheader()
            for product in adapter.clean_products:
                # Declared defaults fill any column the adapter left out.
                row = dict(WOO_IMP_EXP_PRODUCT_FIELDS)
                row.update(product)
                products_writer.writerow(row)

        if categories_file and adapter.categories:
            with open(categories_file, 'w', newline='') as csv_categories:
                categories_writer = csv.DictWriter(
                    csv_categories,
                    fieldnames=[f for f, d in WOO_IMP_EXP_CATEGORY_FIELDS])
                categories_writer.writeheader()
                for category in adapter.categories:
                    # Without the defaults, columns such as Taxonomy would
                    # be written empty instead of their declared value.
                    row = dict(WOO_IMP_EXP_CATEGORY_FIELDS)
                    row.update(category)
                    categories_writer.writerow(row)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment