stock/populate/stock.py

612 lines
29 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import math
from datetime import datetime, timedelta
from itertools import product as cartesian_product
from collections import defaultdict
from odoo import models, api
from odoo.tools import populate, groupby
_logger = logging.getLogger(__name__)
# Only the first COMPANY_NB_WITH_STOCK companies receive stock data (to concentrate the data on these companies)
COMPANY_NB_WITH_STOCK = 3 # Need to be smaller than 5 (_populate_sizes['small'] of company)
class Warehouse(models.Model):
    """Populate extension of ``stock.warehouse``."""
    _inherit = 'stock.warehouse'

    _populate_sizes = {'small': 6, 'medium': 12, 'large': 24}
    _populate_dependencies = ['res.company']

    def _populate(self, size):
        """Switch on the stock settings used by the populate, then populate.

        :param size: populate size key, forwarded unchanged to ``super()``
        :return: whatever the parent ``_populate`` returns
        """
        _logger.info("Activate settings for stock populate")
        # Options needed by the stock populate scripts to get a ready database
        stock_settings = {
            'group_stock_production_lot': True,  # lots
            'group_stock_tracking_lot': True,  # packages
            'group_stock_multi_locations': True,  # multi-locations
            'group_stock_tracking_owner': True,  # owner_id
        }
        self.env['res.config.settings'].create(stock_settings).execute()
        return super()._populate(size)

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build warehouse values."""
        populated_companies = self.env.registry.populated_models['res.company']
        stock_company_ids = populated_companies[:COMPANY_NB_WITH_STOCK]

        def get_name(values, counter, random):
            # The name embeds the company id and the record counter
            company_id = values['company_id']
            return "WH-%d-%d" % (company_id, counter)

        reception_choices = ['one_step', 'two_steps', 'three_steps']
        delivery_choices = ['ship_only', 'pick_ship', 'pick_pack_ship']
        return [
            ('company_id', populate.iterate(stock_company_ids)),
            ('name', populate.compute(get_name)),
            ('code', populate.constant("W{counter}")),
            ('reception_steps', populate.iterate(reception_choices, [0.6, 0.2, 0.2])),
            ('delivery_steps', populate.iterate(delivery_choices, [0.6, 0.2, 0.2])),
        ]
class StorageCategory(models.Model):
    """Populate extension of ``stock.storage.category``."""
    _inherit = 'stock.storage.category'

    _populate_sizes = {'small': 10, 'medium': 20, 'large': 50}

    def _populate(self, size):
        """Enable the storage categories setting, then run the parent populate.

        :param size: populate size key, forwarded unchanged to ``super()``
        """
        # Option needed by the stock populate scripts to get a ready database
        settings = self.env['res.config.settings'].create({
            'group_stock_storage_categories': True,  # storage categories
        })
        settings.execute()
        return super()._populate(size)

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build category values."""
        max_weights = [10, 100, 500, 1000]
        new_product_policies = ['empty', 'same', 'mixed']
        policy_weights = [0.1, 0.1, 0.8]
        return [
            ('name', populate.constant("SC-{counter}")),
            ('max_weight', populate.iterate(max_weights)),
            ('allow_new_product', populate.randomize(new_product_policies, policy_weights)),
        ]
class Location(models.Model):
    """Populate extension of ``stock.location``."""
    _inherit = 'stock.location'

    _populate_sizes = {'small': 50, 'medium': 2_000, 'large': 50_000}
    _populate_dependencies = ['stock.warehouse', 'stock.storage.category']

    def _populate(self, size):
        """Create the locations, then arrange them as trees under the warehouses.

        The generated locations of each company are split between that
        company's populated warehouses and linked below each warehouse's
        ``lot_stock_id``. Three tree shapes alternate from one warehouse to the
        next (depth 3, depth 1, depth 10). Finally a sample of the locations
        that have children is switched to the 'view' usage.

        :param size: populate size key, forwarded to ``super()``
        :return: the recordset returned by the parent ``_populate``
        """
        locations = super()._populate(size)

        random = populate.Random('stock_location_sample')
        locations_sample = self.browse(random.sample(locations.ids, len(locations.ids)))

        company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
        warehouses = self.env['stock.warehouse'].browse(self.env.registry.populated_models['stock.warehouse'])
        warehouse_by_company = dict(groupby(warehouses, lambda ware: ware.company_id.id))
        loc_ids_by_company = dict(groupby(locations_sample, lambda loc: loc.company_id.id))

        scenario_index = 0
        for company_id in company_ids:
            company_warehouses = warehouse_by_company.get(company_id)
            # Reversed so that pop() hands the locations out in sample order
            remaining_locations = loc_ids_by_company.get(company_id, [])[::-1]
            if not company_warehouses or not remaining_locations:
                # Nothing to arrange for this company (also avoids a division by zero below)
                continue

            nb_loc_by_warehouse = math.ceil(len(remaining_locations) / len(company_warehouses))
            for warehouse in company_warehouses:
                scenario = scenario_index % 3
                scenario_index += 1

                # Because of the ceil, the last warehouse(s) can get fewer locations than the others
                nb_loc_to_take = min(nb_loc_by_warehouse, len(remaining_locations))
                if not nb_loc_to_take:
                    # No location left for this warehouse; math.log(0, depth) would raise ValueError
                    continue

                if scenario == 0:
                    # Scenario 1: "normal" depth, 4 levels max
                    depth = 3  # Force the number of levels to 3 (root doesn't count)
                elif scenario == 1:
                    # Scenario 2: very shallow location tree (all children of root)
                    depth = 1
                else:
                    # Scenario 3: deep location tree
                    depth = 10

                # Number of locations to put under each parent
                nb_by_level = int(math.log(nb_loc_to_take, depth)) + 1 if depth > 1 else nb_loc_to_take

                _logger.info(
                    "Create locations (%d) tree for a warehouse (%s) - depth : %d, width : %d",
                    nb_loc_to_take, warehouse.code, depth, nb_by_level,
                )

                # Root is the lot_stock_id of the warehouse
                root = warehouse.lot_stock_id

                def link_next_locations(parent, level):
                    """Attach up to ``nb_by_level`` locations under ``parent``, then recurse."""
                    nonlocal nb_loc_to_take
                    if level >= depth:
                        return
                    nb_loc = min(nb_by_level, nb_loc_to_take)
                    nb_loc_to_take -= nb_loc
                    children = [remaining_locations.pop() for _ in range(nb_loc)]
                    child_locations = self.env['stock.location'].concat(*children)
                    child_locations.location_id = parent  # Quite slow, because the ORM flushes each time
                    for child in child_locations:
                        link_next_locations(child, level + 1)

                link_next_locations(root, 0)

        # Change the usage of 10 % of the non-leaf locations into 'view' (instead of 'internal')
        to_views = locations_sample.filtered_domain([('child_ids', '!=', [])]).ids
        random = populate.Random('stock_location_views')
        view_locations = self.browse(random.sample(to_views, int(len(to_views) * 0.1)))
        view_locations.write({
            'usage': 'view',
            'storage_category_id': False,
        })

        return locations

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build location values."""
        company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
        removal_strategies = self.env['product.removal'].search([])
        storage_category_ids = self.env.registry.populated_models['stock.storage.category']

        def get_storage_category_id(values, counter, random):
            # Roughly one location out of two gets a storage category
            if random.random() > 0.5:
                return random.choice(storage_category_ids)
            return False

        return [
            ('name', populate.constant("Loc-{counter}")),
            ('usage', populate.constant('internal')),
            ('removal_strategy_id', populate.randomize(removal_strategies.ids + [False])),
            ('company_id', populate.iterate(company_ids)),
            ('storage_category_id', populate.compute(get_storage_category_id)),
        ]
class StockPutawayRule(models.Model):
    """Populate extension of ``stock.putaway.rule``."""
    _inherit = 'stock.putaway.rule'

    _populate_sizes = {'small': 10, 'medium': 20, 'large': 50}
    _populate_dependencies = ['stock.location', 'product.product']

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build putaway rule values."""
        populated = self.env.registry.populated_models
        company_ids = populated['res.company'][:COMPANY_NB_WITH_STOCK]
        populated_products = self.env['product.product'].browse(populated['product.product'])
        product_ids = populated_products.filtered(lambda p: p.type == 'product').ids
        product_categ_ids = populated['product.category']
        storage_categ_ids = populated['stock.storage.category']
        populated_locations = self.env['stock.location'].browse(populated['stock.location'])
        internal_locations = populated_locations.filtered(lambda loc: loc.usage == 'internal')

        def get_product_id(values, counter, random):
            # About half of the rules target a product, the others a category
            return random.choice(product_ids) if random.random() > 0.5 else False

        def get_category_id(values, counter, random):
            # A category is only picked when no product was chosen
            if values['product_id']:
                return False
            return random.choice(product_categ_ids)

        def get_location_in_id(values, counter, random):
            # Pick among the internal locations of the rule's company
            rule_company_id = values['company_id']
            company_locations = internal_locations.filtered(
                lambda loc: loc.company_id.id == rule_company_id
            )
            return random.choice(company_locations.ids)

        def get_location_out_id(values, counter, random):
            # Candidates: internal locations found below location_in_id, plus location_in_id itself
            Location = self.env['stock.location']
            location_in_id = values['location_in_id']
            internal_children = Location.search([
                ('id', 'child_of', location_in_id),
                ('usage', '=', 'internal'),
            ])
            candidates = internal_children + Location.browse(location_in_id)
            return random.choice(candidates.ids)

        return [
            ('company_id', populate.randomize(company_ids)),
            ('product_id', populate.compute(get_product_id)),
            ('category_id', populate.compute(get_category_id)),
            ('location_in_id', populate.compute(get_location_in_id)),
            ('location_out_id', populate.compute(get_location_out_id)),
            ('sequence', populate.randint(1, 1000)),
            ('storage_category_id', populate.randomize(storage_categ_ids)),
        ]
class StockWarehouseOrderpoint(models.Model):
    """Populate extension of ``stock.warehouse.orderpoint``."""
    _inherit = 'stock.warehouse.orderpoint'

    _populate_sizes = {'small': 150, 'medium': 5_000, 'large': 60_000}
    _populate_dependencies = ['product.product', 'product.supplierinfo', 'stock.location']

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build orderpoint values.

        Products are split per company into "valid" ones (a supplier info
        exists for the product and that company, or without company) and
        "invalid" ones (all other populated products). About 95 % of the
        orderpoints use a valid product.
        """
        warehouse_ids = self.env.registry.populated_models['stock.warehouse']
        warehouses = self.env['stock.warehouse'].browse(warehouse_ids)

        location_by_warehouse = {
            warehouse.id: self.env['stock.location'].search([('id', 'child_of', warehouse.lot_stock_id.id)]).ids
            for warehouse in warehouses
        }

        all_product_ids = set(self.env.registry.populated_models['product.product'])

        supplierinfos = self.env['product.supplierinfo'].browse(self.env.registry.populated_models['product.supplierinfo'])

        # Valid products by company (a supplier info exists for this product + company_id)
        products_by_company = defaultdict(set)
        for supplierinfo in supplierinfos:
            products = supplierinfo.product_id or supplierinfo.product_tmpl_id.product_variant_ids
            # Reordering rules are only for storable products
            if products and products[0].type == 'product':
                products_by_company[supplierinfo.company_id.id] |= set(products.ids)

        # Read the company-less products with .get(): indexing the defaultdict with a
        # missing key inside the comprehension below would insert that key while the
        # dict is being iterated and raise a RuntimeError.
        shared_product_ids = products_by_company.get(False, set())
        valid_product_sets = {
            company_id: product_ids | shared_product_ids
            for company_id, product_ids in products_by_company.items() if company_id
        }
        invalid_product = {
            company_id: list(all_product_ids - product_ids)
            for company_id, product_ids in valid_product_sets.items()
        }
        valid_product = {
            company_id: list(product_ids)
            for company_id, product_ids in valid_product_sets.items()
        }

        def get_company_id(values, counter, random):
            warehouse = self.env['stock.warehouse'].browse(values['warehouse_id'])
            return warehouse.company_id.id

        def get_location_product(iterator, field_name, model_name):
            """Fill ``location_id`` and ``product_id`` with distinct pairs per warehouse."""
            random = populate.Random('get_location_product')

            # To avoid raising product_location_check: product/location/company
            # (company is ensured because warehouses don't share locations for now)
            # Use generators to avoid building the cartesian product in memory
            generator_valid_product_loc_dict = {}
            generator_invalid_product_loc_dict = {}
            for warehouse in warehouses:
                # TODO: randomize cartesian product
                # NOTE(review): if the `child_of` search above also returns lot_stock_id
                # itself, that location appears twice in the lists below - confirm.
                generator_valid_product_loc_dict[warehouse.id] = cartesian_product(
                    # Force to begin with the main location of the warehouse
                    [warehouse.lot_stock_id.id] + random.sample(location_by_warehouse[warehouse.id], len(location_by_warehouse[warehouse.id])),
                    random.sample(valid_product[warehouse.company_id.id], len(valid_product[warehouse.company_id.id]))
                )
                generator_invalid_product_loc_dict[warehouse.id] = cartesian_product(
                    [warehouse.lot_stock_id.id] + random.sample(location_by_warehouse[warehouse.id], len(location_by_warehouse[warehouse.id])),
                    random.sample(invalid_product[warehouse.company_id.id], len(invalid_product[warehouse.company_id.id]))
                )

            for values in iterator:
                # 95 % of the orderpoints will be valid (a supplier info exists for this product + company_id)
                if random.random() < 0.95:
                    loc_id, product_id = next(generator_valid_product_loc_dict[values['warehouse_id']])
                else:
                    loc_id, product_id = next(generator_invalid_product_loc_dict[values['warehouse_id']])

                values['product_id'] = product_id
                values['location_id'] = loc_id
                yield values

        return [
            ('active', populate.iterate([True, False], [0.95, 0.05])),
            ('warehouse_id', populate.iterate(warehouse_ids)),
            ('company_id', populate.compute(get_company_id)),
            ('_get_location_product', get_location_product),
            ('product_min_qty', populate.iterate([0.0, 2.0, 10.0], [0.6, 0.2, 0.2])),
            ('product_max_qty', populate.iterate([10.0, 20.0, 100.0], [0.6, 0.2, 0.2])),
            ('qty_multiple', populate.iterate([0.0, 1.0, 2.0, 10.0], [0.4, 0.2, 0.2, 0.2])),
        ]
class StockQuant(models.Model):
    """Populate extension of ``stock.quant``."""
    _inherit = 'stock.quant'

    _populate_sizes = {'small': 100, 'medium': 5000, 'large': 20000}
    _populate_dependencies = ['stock.location', 'product.product']

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build quant values.

        Only populated products of type 'product' without tracking and
        populated locations with the 'internal' usage are candidates.
        """
        populated = self.env.registry.populated_models
        product_domain = [
            ('id', 'in', populated['product.product']),
            ('type', '=', 'product'),
            ('tracking', '=', 'none')
        ]
        location_domain = [
            ('id', 'in', populated['stock.location']),
            ('usage', '=', 'internal'),
        ]
        product_ids = self.env['product.product'].search(product_domain).ids
        locations = self.env['stock.location'].search(location_domain)

        return [
            ('location_id', populate.randomize(locations.ids)),
            ('product_id', populate.randomize(product_ids)),
            ('inventory_quantity', populate.randint(0, 100)),
        ]

    def _populate(self, size):
        """Create the quants, then apply their inventory quantities.

        :param size: populate size key, forwarded to the parent ``_populate``
        :return: the recordset returned by the parent ``_populate``
        """
        # NOTE(review): the context key is spelled `inventory_move`; confirm this is the intended key
        quant_model = self.with_context(inventory_move=True)
        quants = super(StockQuant, quant_model)._populate(size)

        _logger.info("Apply %d inventories line", len(quants))
        quants.action_apply_inventory()

        return quants
class PickingType(models.Model):
    """Populate extension of ``stock.picking.type``."""
    _inherit = 'stock.picking.type'

    _populate_sizes = {'small': 9, 'medium': 30, 'large': 200}
    _populate_dependencies = ['stock.location']

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build picking type values."""
        Location = self.env['stock.location']
        populated = self.env.registry.populated_models
        company_ids = populated['res.company'][:COMPANY_NB_WITH_STOCK]
        warehouses = self.env['stock.warehouse'].browse(populated['stock.warehouse'])

        # Internal locations of the stock companies that sit below a populated warehouse stock location
        company_internal_locations = Location.search([('company_id', 'in', company_ids), ('usage', '=', 'internal')])
        in_warehouse_locations = Location.search([('id', 'child_of', warehouses.lot_stock_id.ids)])
        internal_locations = company_internal_locations & in_warehouse_locations

        def get_name(values, counter, random):
            name_parts = (values['company_id'], values['code'], counter)
            return "%d-%s-%d" % name_parts

        def _compute_default_locations(iterator, field_name, model_name):
            """Set the warehouse and the default source/destination locations."""
            random = populate.Random('_compute_default_locations')
            locations_by_company = {
                company_id: Location.concat(*company_locations)
                for company_id, company_locations in dict(
                    groupby(internal_locations, key=lambda loc: loc.company_id.id)
                ).items()
            }

            for values in iterator:
                locations_company = locations_by_company[values['company_id']]
                inter_location = random.choice(locations_company)
                values['warehouse_id'] = inter_location.warehouse_id.id
                code = values['code']
                if code == 'internal':
                    # Internal transfers go between two distinct locations of the company
                    values['default_location_src_id'] = inter_location.id
                    other_locations = locations_company - inter_location
                    values['default_location_dest_id'] = random.choice(other_locations).id
                elif code == 'incoming':
                    values['default_location_dest_id'] = inter_location.id
                elif code == 'outgoing':
                    values['default_location_src_id'] = inter_location.id

                yield values

        def get_show_reserved(values, counter, random):
            # Simulate onchange of form
            is_incoming = values['code'] == 'incoming'
            return not is_incoming

        return [
            ('company_id', populate.iterate(company_ids)),
            ('code', populate.iterate(['incoming', 'outgoing', 'internal'], [0.3, 0.3, 0.4])),
            ('name', populate.compute(get_name)),
            ('sequence_code', populate.constant("PT{counter}")),
            ('_compute_default_locations', _compute_default_locations),
            ('show_reserved', populate.compute(get_show_reserved)),
        ]
class Picking(models.Model):
    """Populate extension of ``stock.picking``."""
    _inherit = 'stock.picking'

    _populate_sizes = {'small': 100, 'medium': 2_000, 'large': 50_000}
    _populate_dependencies = ['stock.location', 'stock.picking.type', 'res.partner']

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build picking values."""
        company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
        picking_types_ids = self.env['stock.picking.type'].browse(self.env.registry.populated_models['stock.picking.type']).ids

        now = datetime.now()

        cross_company_locations = self.env['stock.location'].search([('company_id', '=', False)])
        locations_companies = self.env['stock.location'].search([('company_id', 'in', company_ids)])

        all_partners = self.env['res.partner'].browse(self.env.registry.populated_models['res.partner'])
        partners_by_company = dict(groupby(all_partners, key=lambda par: par.company_id.id))
        partners_inter_company = self.env['res.partner'].concat(*partners_by_company.get(False, []))
        # Partners usable per company: the company's own partners plus the company-less ones
        partners_by_company = {
            com: self.env['res.partner'].concat(*partners) | partners_inter_company
            for com, partners in partners_by_company.items() if com
        }

        def get_until_date(values, counter, random):
            # 95.45 % of pickings are scheduled between (-10, 30) days and follow a gauss distribution (only +-15% of pickings are late)
            delta = random.gauss(10, 10)
            return now + timedelta(days=delta)

        def get_partner_id(values, counter, random):
            picking_type = self.env['stock.picking.type'].browse(values['picking_type_id'])
            company_partners = partners_by_company.get(picking_type.company_id.id)
            if not company_partners:
                return False
            return random.choice(company_partners).id

        def get_owner_id(values, counter, random):
            picking_type = self.env['stock.picking.type'].browse(values['picking_type_id'])
            company = picking_type.company_id
            if company.id not in partners_by_company:
                return False
            if random.random() < 0.10:  # For 10 % of pickings, force owner_id
                # The chosen owner must be returned, otherwise owner_id is never set
                return random.choice(partners_by_company[company.id]).id
            return False

        def _compute_locations(iterator, field_name, model_name):
            """Set ``location_id`` / ``location_dest_id`` according to the picking type."""
            locations_out = cross_company_locations.filtered_domain([('usage', '=', 'customer')])
            locations_in = cross_company_locations.filtered_domain([('usage', '=', 'supplier')])
            locations_internal = locations_companies.filtered_domain([('usage', '=', 'internal')])
            locations_by_company = dict(groupby(locations_companies, key=lambda loc: loc.company_id.id))
            locations_by_company = {com: self.env['stock.location'].concat(*locs) for com, locs in locations_by_company.items()}
            # Internal locations per company, computed once instead of on every picking
            internal_by_company = {com: locations_internal & locs for com, locs in locations_by_company.items()}

            random = populate.Random('_compute_locations')
            for values in iterator:
                picking_type = self.env['stock.picking.type'].browse(values['picking_type_id'])

                source_loc = picking_type.default_location_src_id
                dest_loc = picking_type.default_location_dest_id

                company_internal_locations = internal_by_company[picking_type.company_id.id]
                # Keep the picking type defaults most of the time, otherwise pick a random location
                if not source_loc or random.random() > 0.8:
                    if picking_type.code == 'incoming':
                        source_loc = random.choice(locations_in)
                    elif picking_type.code == 'outgoing':
                        source_loc = random.choice(company_internal_locations)
                    elif picking_type.code == 'internal':
                        source_loc = random.choice(company_internal_locations)

                if not dest_loc or random.random() > 0.8:
                    if picking_type.code == 'incoming':
                        dest_loc = random.choice(company_internal_locations)
                    elif picking_type.code == 'outgoing':
                        dest_loc = random.choice(locations_out)
                    elif picking_type.code == 'internal':
                        # Needs at least 2 internal locations
                        dest_loc = random.choice(company_internal_locations - source_loc)

                values['location_id'] = source_loc.id
                values['location_dest_id'] = dest_loc.id
                yield values

        return [
            ('priority', populate.randomize(['1', '0'], [0.05, 0.95])),
            ('scheduled_date', populate.compute(get_until_date)),
            ('picking_type_id', populate.iterate(picking_types_ids)),
            ('partner_id', populate.compute(get_partner_id)),
            ('owner_id', populate.compute(get_owner_id)),
            ('_compute_locations', _compute_locations),
        ]
class StockMove(models.Model):
    """Populate extension of ``stock.move``."""
    _inherit = 'stock.move'

    _populate_sizes = {'small': 1_000, 'medium': 20_000, 'large': 1_000_000}
    _populate_dependencies = ['stock.picking', 'product.product']

    def _populate(self, size):
        """Create the moves; optional helpers can confirm/assign/validate pickings.

        The helper calls at the end are commented out on purpose and can be
        enabled to generate confirmed or finished moves.

        :param size: populate size key, forwarded to ``super()``
        :return: the created moves that still exist
        """
        moves = super()._populate(size)

        def confirm_pickings(sample_ratio):
            # Confirm sample_ratio * 100 % of the pickings
            random = populate.Random('confirm_pickings')
            picking_ids = moves.picking_id.ids
            picking_to_confirm = self.env['stock.picking'].browse(random.sample(picking_ids, int(len(picking_ids) * sample_ratio)))
            _logger.info("Confirm %d pickings", len(picking_to_confirm))
            picking_to_confirm.action_confirm()
            return picking_to_confirm

        def assign_picking(pickings):
            _logger.info("Assign %d pickings", len(pickings))
            pickings.action_assign()

        def validate_pickings(pickings, sample_ratio):
            # Fill the pickings and validate them
            random = populate.Random('validate_pickings')
            picking_ids = pickings.ids
            picking_to_validate = self.env['stock.picking'].browse(random.sample(picking_ids, int(len(picking_ids) * sample_ratio)))

            _logger.info("Fill %d pickings with sml", len(picking_to_validate))
            sml_values = []
            lot_values = []
            package_values = []
            for picking in picking_to_validate:
                # Index of this picking's package in package_values, None when no package is used.
                # The package is registered before the moves are processed so that the
                # index stored on the move lines really designates this picking's package.
                package_index = None
                if random.random() < 0.20:  # 20 % of chance to use a package
                    package_values.append({'name': picking.name})
                    package_index = len(package_values) - 1

                for move in picking.move_ids:
                    # For assigned moves
                    for move_line in move._get_move_lines():
                        move_line.quantity = move_line.reserved_uom_qty
                    # Create move lines for the remaining qty
                    missing_to_do = move.product_qty - move.quantity
                    missing_to_do = move.product_uom._compute_quantity(missing_to_do, move.product_uom, rounding_method='HALF-UP')
                    if move.product_id.tracking == 'serial':
                        # One lot and one move line per unit
                        for i in range(int(missing_to_do)):
                            lot_values.append({
                                'name': "ValPick-%d-%d--%d" % (move.id, move.product_id.id, i),
                                'product_id': move.product_id.id,
                                'company_id': move.company_id.id
                            })
                            sml_values.append(dict(
                                **move._prepare_move_line_vals(),
                                quantity=1,
                                lot_id=len(lot_values) - 1,  # index in lot_values, resolved below
                                package_id=package_index
                            ))
                    elif move.product_id.tracking == 'lot':
                        lot_values.append({
                            'name': "ValPick-%d-%d" % (move.id, move.product_id.id),
                            'product_id': move.product_id.id,
                            'company_id': move.company_id.id
                        })
                        sml_values.append(dict(
                            **move._prepare_move_line_vals(),
                            quantity=missing_to_do,
                            lot_id=len(lot_values) - 1,  # index in lot_values, resolved below
                            package_id=package_index
                        ))
                    else:
                        sml_values.append(dict(
                            **move._prepare_move_line_vals(),
                            quantity=missing_to_do,
                            package_id=package_index
                        ))

            _logger.info("Create lots (%d) for pickings to validate", len(lot_values))
            lots = self.env["stock.lot"].create(lot_values)
            _logger.info("Create packages (%d) for pickings to validate", len(package_values))
            packages = self.env["stock.quant.package"].create(package_values)

            _logger.info("Create sml (%d) for pickings to validate", len(sml_values))
            # Replace the list indexes stored above by the ids of the created records
            for vals in sml_values:
                package_index = vals['package_id']
                vals['package_id'] = packages[package_index].id if package_index is not None else False
                if 'lot_id' in vals:
                    vals['lot_id'] = lots[vals['lot_id']].id
            self.env['stock.move.line'].create(sml_values)

            _logger.info("Validate %d of pickings", len(picking_to_validate))
            picking_to_validate.with_context(skip_backorder=True, skip_sms=True).button_validate()

        # (Un)comment to test a DB with a lot of outgoing/incoming/internal confirmed moves, e.g. for testing of forecasted report
        # pickings = confirm_pickings(0.8)

        # (Un)comment to test a DB with a lot of outgoing/incoming/internal finished moves
        # assign_picking(pickings)
        # validate_pickings(pickings, 1)

        return moves.exists()  # Confirming pickings can unlink some moves

    @api.model
    def _populate_attach_record_weight(self):
        """Return the fields a move can be attached to and their weights."""
        return ['picking_id'], [1]

    @api.model
    def _populate_attach_record_generator(self):
        """Return, per attachable field, a generator cycling over the candidate ids."""
        picking_ids = self.env['stock.picking'].browse(self.env.registry.populated_models['stock.picking'])

        def next_picking_generator():
            # Loops over the populated pickings again and again
            while picking_ids:
                yield from picking_ids.ids

        return {'picking_id': next_picking_generator()}

    def _populate_factories(self):
        """Return the ``(field, factory)`` pairs used to build move values."""
        product_ids = self.env['product.product'].browse(self.env.registry.populated_models['product.product']).filtered(lambda p: p.type in ('product', 'consu')).ids
        # Only 80 % of the candidate products are used in moves
        random_products = populate.Random("move_product_sample")
        product_ids = random_products.sample(product_ids, int(len(product_ids) * 0.8))

        def get_product_uom(values, counter, random):
            return self.env['product.product'].browse(values['product_id']).uom_id.id

        def _attach_to_record(iterator, field_name, model_name):
            """Attach each move to a record taken from the attach generators."""
            random = populate.Random('_attach_to_record')
            fields, weights = self._populate_attach_record_weight()
            fields_generator = self._populate_attach_record_generator()

            for values in iterator:
                field = random.choices(fields, weights)[0]
                values[field] = next(fields_generator[field])
                yield values

        def _compute_picking_values(iterator, field_name, model_name):
            """Copy locations, name, date and company from the picking onto the move."""
            random = populate.Random('_compute_picking_values')
            for values in iterator:
                if values.get('picking_id'):
                    picking = self.env['stock.picking'].browse(values['picking_id'])
                    values['picking_id'] = picking.id
                    values['location_id'] = picking.location_id.id
                    values['location_dest_id'] = picking.location_dest_id.id
                    values['name'] = picking.name
                    values['date'] = picking.scheduled_date
                    values['company_id'] = picking.company_id.id
                    if picking.picking_type_id.code == 'incoming':
                        values['price_unit'] = random.randint(1, 100)
                yield values

        return [
            ('product_id', populate.randomize(product_ids)),
            ('product_uom', populate.compute(get_product_uom)),
            ('product_uom_qty', populate.randint(1, 10)),
            ('sequence', populate.randint(1, 1000)),
            ('_attach_to_record', _attach_to_record),
            ('_compute_picking_values', _compute_picking_values),
        ]