612 lines
29 KiB
Python
612 lines
29 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
import logging
|
|
import math
|
|
from datetime import datetime, timedelta
|
|
from itertools import product as cartesian_product
|
|
from collections import defaultdict
|
|
|
|
from odoo import models, api
|
|
from odoo.tools import populate, groupby
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
# Take X first company to put some stock on it data (it is to focus data on these companies)
|
|
COMPANY_NB_WITH_STOCK = 3 # Need to be smaller than 5 (_populate_sizes['small'] of company)
|
|
|
|
|
|
class Warehouse(models.Model):
|
|
_inherit = 'stock.warehouse'
|
|
|
|
_populate_sizes = {'small': 6, 'medium': 12, 'large': 24}
|
|
_populate_dependencies = ['res.company']
|
|
|
|
def _populate(self, size):
|
|
# Activate options used in the stock populate to have a ready Database
|
|
|
|
_logger.info("Activate settings for stock populate")
|
|
self.env['res.config.settings'].create({
|
|
'group_stock_production_lot': True, # Activate lot
|
|
'group_stock_tracking_lot': True, # Activate package
|
|
'group_stock_multi_locations': True, # Activate multi-locations
|
|
'group_stock_tracking_owner': True, # Activate owner_id
|
|
}).execute()
|
|
|
|
return super()._populate(size)
|
|
|
|
def _populate_factories(self):
|
|
company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
|
|
|
|
def get_name(values, counter, random):
|
|
return "WH-%d-%d" % (values['company_id'], counter)
|
|
|
|
return [
|
|
('company_id', populate.iterate(company_ids)),
|
|
('name', populate.compute(get_name)),
|
|
('code', populate.constant("W{counter}")),
|
|
('reception_steps', populate.iterate(['one_step', 'two_steps', 'three_steps'], [0.6, 0.2, 0.2])),
|
|
('delivery_steps', populate.iterate(['ship_only', 'pick_ship', 'pick_pack_ship'], [0.6, 0.2, 0.2])),
|
|
]
|
|
|
|
|
|
class StorageCategory(models.Model):
|
|
_inherit = 'stock.storage.category'
|
|
|
|
_populate_sizes = {'small': 10, 'medium': 20, 'large': 50}
|
|
|
|
def _populate(self, size):
|
|
# Activate options used in the stock populate to have a ready Database
|
|
|
|
self.env['res.config.settings'].create({
|
|
'group_stock_storage_categories': True, # Activate storage categories
|
|
}).execute()
|
|
|
|
return super()._populate(size)
|
|
|
|
def _populate_factories(self):
|
|
|
|
return [
|
|
('name', populate.constant("SC-{counter}")),
|
|
('max_weight', populate.iterate([10, 100, 500, 1000])),
|
|
('allow_new_product', populate.randomize(['empty', 'same', 'mixed'], [0.1, 0.1, 0.8])),
|
|
]
|
|
|
|
|
|
class Location(models.Model):
|
|
_inherit = 'stock.location'
|
|
|
|
_populate_sizes = {'small': 50, 'medium': 2_000, 'large': 50_000}
|
|
_populate_dependencies = ['stock.warehouse', 'stock.storage.category']
|
|
|
|
def _populate(self, size):
|
|
locations = super()._populate(size)
|
|
|
|
random = populate.Random('stock_location_sample')
|
|
locations_sample = self.browse(random.sample(locations.ids, len(locations.ids)))
|
|
|
|
company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
|
|
warehouses = self.env['stock.warehouse'].browse(self.env.registry.populated_models['stock.warehouse'])
|
|
|
|
warehouse_by_company = dict(groupby(warehouses, lambda ware: ware.company_id.id))
|
|
loc_ids_by_company = dict(groupby(locations_sample, lambda loc: loc.company_id.id))
|
|
|
|
scenario_index = 0
|
|
for company_id in company_ids:
|
|
loc_ids_by_company[company_id] = loc_ids_by_company[company_id][::-1] # Inverse the order to use pop()
|
|
warehouses = warehouse_by_company[company_id]
|
|
|
|
nb_loc_by_warehouse = math.ceil(len(loc_ids_by_company[company_id]) / len(warehouses))
|
|
|
|
for warehouse in warehouses:
|
|
# Manage the ceil, the last warehouse can have less locations than others.
|
|
nb_loc_to_take = min(nb_loc_by_warehouse, len(loc_ids_by_company[company_id]))
|
|
if scenario_index % 3 == 0:
|
|
# Scenario 1 : remain companies with "normal" level depth keep 4 levels max
|
|
depth = 3 # Force the number of level to 3 (root doesn't count)
|
|
elif scenario_index % 3 == 1:
|
|
# Scenario 2 : one company with very low level depth location tree (all child of root)
|
|
depth = 1
|
|
else:
|
|
# Scenario 3 : one company with high depth location tree
|
|
depth = 10
|
|
|
|
nb_by_level = int(math.log(nb_loc_to_take, depth)) + 1 if depth > 1 else nb_loc_to_take # number of loc to put by level
|
|
|
|
_logger.info("Create locations (%d) tree for a warehouse (%s) - depth : %d, width : %d" % (nb_loc_to_take, warehouse.code, depth, nb_by_level))
|
|
|
|
# Root is the lot_stock_id of warehouse
|
|
root = warehouse.lot_stock_id
|
|
|
|
def link_next_locations(parent, level):
|
|
if level < depth:
|
|
children = []
|
|
nonlocal nb_loc_to_take
|
|
nb_loc = min(nb_by_level, nb_loc_to_take)
|
|
nb_loc_to_take -= nb_loc
|
|
for i in range(nb_loc):
|
|
children.append(loc_ids_by_company[company_id].pop())
|
|
|
|
child_locations = self.env['stock.location'].concat(*children)
|
|
child_locations.location_id = parent # Quite slow, because the ORM flush each time
|
|
for child in child_locations:
|
|
link_next_locations(child, level + 1)
|
|
|
|
link_next_locations(root, 0)
|
|
scenario_index += 1
|
|
|
|
# Change 20 % the usage of some no-leaf location into 'view' (instead of 'internal')
|
|
to_views = locations_sample.filtered_domain([('child_ids', '!=', [])]).ids
|
|
random = populate.Random('stock_location_views')
|
|
view_locations = self.browse(random.sample(to_views, int(len(to_views) * 0.1)))
|
|
view_locations.write({
|
|
'usage': 'view',
|
|
'storage_category_id': False,
|
|
})
|
|
|
|
return locations
|
|
|
|
def _populate_factories(self):
|
|
company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
|
|
removal_strategies = self.env['product.removal'].search([])
|
|
storage_category_ids = self.env.registry.populated_models['stock.storage.category']
|
|
|
|
def get_storage_category_id(values, counter, random):
|
|
if random.random() > 0.5:
|
|
return random.choice(storage_category_ids)
|
|
return False
|
|
|
|
return [
|
|
('name', populate.constant("Loc-{counter}")),
|
|
('usage', populate.constant('internal')),
|
|
('removal_strategy_id', populate.randomize(removal_strategies.ids + [False])),
|
|
('company_id', populate.iterate(company_ids)),
|
|
('storage_category_id', populate.compute(get_storage_category_id)),
|
|
]
|
|
|
|
|
|
class StockPutawayRule(models.Model):
|
|
_inherit = 'stock.putaway.rule'
|
|
|
|
_populate_sizes = {'small': 10, 'medium': 20, 'large': 50}
|
|
_populate_dependencies = ['stock.location', 'product.product']
|
|
|
|
def _populate_factories(self):
|
|
company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
|
|
product_ids = self.env['product.product'].browse(self.env.registry.populated_models['product.product']).filtered(lambda p: p.type == 'product').ids
|
|
product_categ_ids = self.env.registry.populated_models['product.category']
|
|
storage_categ_ids = self.env.registry.populated_models['stock.storage.category']
|
|
location_ids = self.env['stock.location'].browse(self.env.registry.populated_models['stock.location']).filtered(lambda loc: loc.usage == 'internal')
|
|
|
|
def get_product_id(values, counter, random):
|
|
if random.random() > 0.5:
|
|
return random.choice(product_ids)
|
|
return False
|
|
|
|
def get_category_id(values, counter, random):
|
|
if not values['product_id']:
|
|
return random.choice(product_categ_ids)
|
|
return False
|
|
|
|
def get_location_in_id(values, counter, random):
|
|
locations = location_ids.filtered(lambda loc: loc.company_id.id == values['company_id'])
|
|
return random.choice(locations.ids)
|
|
|
|
def get_location_out_id(values, counter, random):
|
|
child_locs = self.env['stock.location'].search([
|
|
('id', 'child_of', values['location_in_id']),
|
|
('usage', '=', 'internal')
|
|
]) + self.env['stock.location'].browse(values['location_in_id'])
|
|
return random.choice(child_locs.ids)
|
|
|
|
return [
|
|
('company_id', populate.randomize(company_ids)),
|
|
('product_id', populate.compute(get_product_id)),
|
|
('category_id', populate.compute(get_category_id)),
|
|
('location_in_id', populate.compute(get_location_in_id)),
|
|
('location_out_id', populate.compute(get_location_out_id)),
|
|
('sequence', populate.randint(1, 1000)),
|
|
('storage_category_id', populate.randomize(storage_categ_ids)),
|
|
]
|
|
|
|
|
|
class StockWarehouseOrderpoint(models.Model):
|
|
_inherit = 'stock.warehouse.orderpoint'
|
|
|
|
_populate_sizes = {'small': 150, 'medium': 5_000, 'large': 60_000}
|
|
_populate_dependencies = ['product.product', 'product.supplierinfo', 'stock.location']
|
|
|
|
def _populate_factories(self):
|
|
|
|
warehouse_ids = self.env.registry.populated_models['stock.warehouse']
|
|
warehouses = self.env['stock.warehouse'].browse(warehouse_ids)
|
|
|
|
location_by_warehouse = {
|
|
warehouse.id: self.env['stock.location'].search([('id', 'child_of', warehouse.lot_stock_id.id)]).ids
|
|
for warehouse in warehouses
|
|
}
|
|
|
|
all_product_ids = set(self.env.registry.populated_models['product.product'])
|
|
|
|
supplierinfos = self.env['product.supplierinfo'].browse(self.env.registry.populated_models['product.supplierinfo'])
|
|
|
|
# Valid product by company (a supplier info exist for this product+company_id)
|
|
valid_product = defaultdict(set)
|
|
for suplierinfo in supplierinfos:
|
|
products = suplierinfo.product_id or suplierinfo.product_tmpl_id.product_variant_ids
|
|
# Reordering rule is only on the storable product
|
|
if products and products[0].type == 'product':
|
|
valid_product[suplierinfo.company_id.id] |= set(products.ids)
|
|
valid_product = {company_id: product_ids | valid_product[False] for company_id, product_ids in valid_product.items() if company_id}
|
|
invalid_product = {company_id: list(all_product_ids - product_ids) for company_id, product_ids in valid_product.items() if company_id}
|
|
valid_product = {company_id: list(product_ids) for company_id, product_ids in valid_product.items()}
|
|
|
|
def get_company_id(values, counter, random):
|
|
warehouse = self.env['stock.warehouse'].browse(values['warehouse_id'])
|
|
return warehouse.company_id.id
|
|
|
|
def get_location_product(iterator, field_name, model_name):
|
|
random = populate.Random('get_location_product')
|
|
|
|
# To avoid raise product_location_check : product/location/company (company is ensure because warehouse doesn't share location for now)
|
|
# Use generator to avoid cartisian product in memory
|
|
generator_valid_product_loc_dict = {}
|
|
generator_invalid_product_loc_dict = {}
|
|
for warehouse in warehouses:
|
|
# TODO: randomize cartesian product
|
|
generator_valid_product_loc_dict[warehouse.id] = cartesian_product(
|
|
# Force to begin by the main location of the warehouse
|
|
[warehouse.lot_stock_id.id] + random.sample(location_by_warehouse[warehouse.id], len(location_by_warehouse[warehouse.id])),
|
|
random.sample(valid_product[warehouse.company_id.id], len(valid_product[warehouse.company_id.id]))
|
|
)
|
|
generator_invalid_product_loc_dict[warehouse.id] = cartesian_product(
|
|
[warehouse.lot_stock_id.id] + random.sample(location_by_warehouse[warehouse.id], len(location_by_warehouse[warehouse.id])),
|
|
random.sample(invalid_product[warehouse.company_id.id], len(invalid_product[warehouse.company_id.id]))
|
|
)
|
|
|
|
for values in iterator:
|
|
# 95 % of the orderpoint will be valid (a supplier info exist for this product + company_id)
|
|
if random.random() < 0.95:
|
|
loc_id, product_id = next(generator_valid_product_loc_dict[values['warehouse_id']])
|
|
else:
|
|
loc_id, product_id = next(generator_invalid_product_loc_dict[values['warehouse_id']])
|
|
|
|
values['product_id'] = product_id
|
|
values['location_id'] = loc_id
|
|
yield values
|
|
|
|
return [
|
|
('active', populate.iterate([True, False], [0.95, 0.05])),
|
|
('warehouse_id', populate.iterate(warehouse_ids)),
|
|
('company_id', populate.compute(get_company_id)),
|
|
('_get_location_product', get_location_product),
|
|
('product_min_qty', populate.iterate([0.0, 2.0, 10.0], [0.6, 0.2, 0.2])),
|
|
('product_max_qty', populate.iterate([10.0, 20.0, 100.0], [0.6, 0.2, 0.2])),
|
|
('qty_multiple', populate.iterate([0.0, 1.0, 2.0, 10.0], [0.4, 0.2, 0.2, 0.2])),
|
|
]
|
|
|
|
|
|
class StockQuant(models.Model):
|
|
_inherit = 'stock.quant'
|
|
|
|
_populate_sizes = {'small': 100, 'medium': 5000, 'large': 20000}
|
|
_populate_dependencies = ['stock.location', 'product.product']
|
|
|
|
def _populate_factories(self):
|
|
|
|
product_ids = self.env['product.product'].search([
|
|
('id', 'in', self.env.registry.populated_models['product.product']),
|
|
('type', '=', 'product'),
|
|
('tracking', '=', 'none')
|
|
]).ids
|
|
locations = self.env['stock.location'].search([
|
|
('id', 'in', self.env.registry.populated_models['stock.location']),
|
|
('usage', '=', 'internal'),
|
|
])
|
|
|
|
return [
|
|
('location_id', populate.randomize(locations.ids)),
|
|
('product_id', populate.randomize(product_ids)),
|
|
('inventory_quantity', populate.randint(0, 100)),
|
|
]
|
|
|
|
def _populate(self, size):
|
|
res = super(StockQuant, self.with_context(inventory_move=True))._populate(size)
|
|
|
|
_logger.info("Apply %d inventories line", len(res))
|
|
res.action_apply_inventory()
|
|
|
|
return res
|
|
|
|
class PickingType(models.Model):
|
|
_inherit = 'stock.picking.type'
|
|
|
|
_populate_sizes = {'small': 9, 'medium': 30, 'large': 200}
|
|
_populate_dependencies = ['stock.location']
|
|
|
|
def _populate_factories(self):
|
|
company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
|
|
warehouses = self.env['stock.warehouse'].browse(self.env.registry.populated_models['stock.warehouse'])
|
|
internal_locations = self.env['stock.location'].search([('company_id', 'in', company_ids), ('usage', '=', 'internal')])
|
|
in_warehouse_locations = self.env['stock.location'].search([('id', 'child_of', warehouses.lot_stock_id.ids)])
|
|
internal_locations &= in_warehouse_locations
|
|
|
|
def get_name(values, counter, random):
|
|
return "%d-%s-%d" % (values['company_id'], values['code'], counter)
|
|
|
|
def _compute_default_locations(iterator, field_name, model_name):
|
|
random = populate.Random('_compute_default_locations')
|
|
locations_by_company = dict(groupby(internal_locations, key=lambda loc: loc.company_id.id))
|
|
locations_by_company = {company_id: self.env['stock.location'].concat(*locations) for company_id, locations in locations_by_company.items()}
|
|
|
|
for values in iterator:
|
|
|
|
locations_company = locations_by_company[values['company_id']]
|
|
inter_location = random.choice(locations_company)
|
|
values['warehouse_id'] = inter_location.warehouse_id.id
|
|
if values['code'] == 'internal':
|
|
values['default_location_src_id'] = inter_location.id
|
|
values['default_location_dest_id'] = random.choice(locations_company - inter_location).id
|
|
elif values['code'] == 'incoming':
|
|
values['default_location_dest_id'] = inter_location.id
|
|
elif values['code'] == 'outgoing':
|
|
values['default_location_src_id'] = inter_location.id
|
|
|
|
yield values
|
|
|
|
def get_show_reserved(values, counter, random):
|
|
return values['code'] != 'incoming' # Simulate onchange of form
|
|
|
|
return [
|
|
('company_id', populate.iterate(company_ids)),
|
|
('code', populate.iterate(['incoming', 'outgoing', 'internal'], [0.3, 0.3, 0.4])),
|
|
('name', populate.compute(get_name)),
|
|
('sequence_code', populate.constant("PT{counter}")),
|
|
('_compute_default_locations', _compute_default_locations),
|
|
('show_reserved', populate.compute(get_show_reserved)),
|
|
]
|
|
|
|
|
|
class Picking(models.Model):
|
|
_inherit = 'stock.picking'
|
|
|
|
_populate_sizes = {'small': 100, 'medium': 2_000, 'large': 50_000}
|
|
_populate_dependencies = ['stock.location', 'stock.picking.type', 'res.partner']
|
|
|
|
def _populate_factories(self):
|
|
company_ids = self.env.registry.populated_models['res.company'][:COMPANY_NB_WITH_STOCK]
|
|
|
|
picking_types_ids = self.env['stock.picking.type'].browse(self.env.registry.populated_models['stock.picking.type']).ids
|
|
|
|
now = datetime.now()
|
|
|
|
cross_company_locations = self.env['stock.location'].search([('company_id', '=', False)])
|
|
locations_companies = self.env['stock.location'].search([('company_id', 'in', company_ids)])
|
|
|
|
all_partners = self.env['res.partner'].browse(self.env.registry.populated_models['res.partner'])
|
|
partners_by_company = dict(groupby(all_partners, key=lambda par: par.company_id.id))
|
|
partners_inter_company = self.env['res.partner'].concat(*partners_by_company.get(False, []))
|
|
partners_by_company = {com: self.env['res.partner'].concat(*partners) | partners_inter_company for com, partners in partners_by_company.items() if com}
|
|
|
|
def get_until_date(values, counter, random):
|
|
# 95.45 % of picking scheduled between (-10, 30) days and follow a gauss distribution (only +-15% picking is late)
|
|
delta = random.gauss(10, 10)
|
|
return now + timedelta(days=delta)
|
|
|
|
def get_partner_id(values, counter, random):
|
|
picking_type = self.env['stock.picking.type'].browse(values['picking_type_id'])
|
|
company = picking_type.company_id
|
|
return partners_by_company.get(company.id) and random.choice(partners_by_company[company.id]).id or False
|
|
|
|
def get_owner_id(values, counter, random):
|
|
picking_type = self.env['stock.picking.type'].browse(values['picking_type_id'])
|
|
company = picking_type.company_id
|
|
if company.id not in partners_by_company:
|
|
return False
|
|
if random.random() < 0.10: # For 10 % of picking, force owner_id
|
|
random.choice(partners_by_company[company.id]).id
|
|
|
|
def _compute_locations(iterator, field_name, model_name):
|
|
locations_out = cross_company_locations.filtered_domain([('usage', '=', 'customer')])
|
|
locations_in = cross_company_locations.filtered_domain([('usage', '=', 'supplier')])
|
|
locations_internal = locations_companies.filtered_domain([('usage', '=', 'internal')])
|
|
locations_by_company = dict(groupby(locations_companies, key=lambda loc: loc.company_id.id))
|
|
locations_by_company = {com: self.env['stock.location'].concat(*locs) for com, locs in locations_by_company.items()}
|
|
|
|
random = populate.Random('_compute_locations')
|
|
for values in iterator:
|
|
picking_type = self.env['stock.picking.type'].browse(values['picking_type_id'])
|
|
|
|
source_loc = picking_type.default_location_src_id
|
|
dest_loc = picking_type.default_location_dest_id
|
|
|
|
locations_company = locations_by_company[picking_type.company_id.id]
|
|
if not source_loc or random.random() > 0.8:
|
|
if picking_type.code == 'incoming':
|
|
source_loc = random.choice(locations_in)
|
|
elif picking_type.code == 'outgoing':
|
|
source_loc = random.choice(locations_internal & locations_company)
|
|
elif picking_type.code == 'internal':
|
|
source_loc = random.choice(locations_internal & locations_company)
|
|
|
|
if not dest_loc or random.random() > 0.8:
|
|
if picking_type.code == 'incoming':
|
|
dest_loc = random.choice(locations_internal & locations_company)
|
|
elif picking_type.code == 'outgoing':
|
|
dest_loc = random.choice(locations_out)
|
|
elif picking_type.code == 'internal':
|
|
# Need at most 2 internal locations
|
|
dest_loc = random.choice((locations_internal & locations_company) - source_loc)
|
|
|
|
values['location_id'] = source_loc.id
|
|
values['location_dest_id'] = dest_loc.id
|
|
yield values
|
|
|
|
return [
|
|
('priority', populate.randomize(['1', '0'], [0.05, 0.95])),
|
|
('scheduled_date', populate.compute(get_until_date)),
|
|
('picking_type_id', populate.iterate(picking_types_ids)),
|
|
('partner_id', populate.compute(get_partner_id)),
|
|
('owner_id', populate.compute(get_owner_id)),
|
|
('_compute_locations', _compute_locations),
|
|
]
|
|
|
|
|
|
class StockMove(models.Model):
|
|
_inherit = 'stock.move'
|
|
|
|
_populate_sizes = {'small': 1_000, 'medium': 20_000, 'large': 1_000_000}
|
|
_populate_dependencies = ['stock.picking', 'product.product']
|
|
|
|
def _populate(self, size):
|
|
moves = super()._populate(size)
|
|
|
|
def confirm_pickings(sample_ratio):
|
|
# Confirm sample_ratio * 100 % of picking
|
|
random = populate.Random('confirm_pickings')
|
|
picking_ids = moves.picking_id.ids
|
|
picking_to_confirm = self.env['stock.picking'].browse(random.sample(picking_ids, int(len(picking_ids) * sample_ratio)))
|
|
_logger.info("Confirm %d pickings" % len(picking_to_confirm))
|
|
picking_to_confirm.action_confirm()
|
|
return picking_to_confirm
|
|
|
|
def assign_picking(pickings):
|
|
_logger.info("Assign %d pickings" % len(pickings))
|
|
pickings.action_assign()
|
|
|
|
def validate_pickings(pickings, sample_ratio):
|
|
# Fill picking and validate it
|
|
random = populate.Random('validate_pickings')
|
|
picking_ids = pickings.ids
|
|
picking_to_validate = self.env['stock.picking'].browse(random.sample(picking_ids, int(len(picking_ids) * sample_ratio)))
|
|
|
|
_logger.info("Fill %d pickings with sml" % len(picking_to_validate))
|
|
sml_values = []
|
|
lot_values = []
|
|
package_values = []
|
|
for picking in picking_to_validate:
|
|
package_for_picking = None
|
|
if random.random() < 0.20: # 20 % of chance to use package
|
|
package_for_picking = {'name': picking.name}
|
|
for move in picking.move_ids:
|
|
# For assigned moves
|
|
for move_line in move._get_move_lines():
|
|
move_line.quantity = move_line.reserved_uom_qty
|
|
# Create move line for remaining qty
|
|
missing_to_do = move.product_qty - move.quantity
|
|
missing_to_do = move.product_uom._compute_quantity(missing_to_do, move.product_uom, rounding_method='HALF-UP')
|
|
if move.product_id.tracking == 'serial':
|
|
for i in range(int(missing_to_do)):
|
|
lot_values.append({
|
|
'name': "ValPick-%d-%d--%d" % (move.id, move.product_id.id, i),
|
|
'product_id': move.product_id.id,
|
|
'company_id': move.company_id.id
|
|
})
|
|
sml_values.append(dict(
|
|
**move._prepare_move_line_vals(),
|
|
quantity=1,
|
|
lot_id=len(lot_values) - 1,
|
|
package_id=package_for_picking and len(package_values) - 1 or False
|
|
))
|
|
elif move.product_id.tracking == 'lot':
|
|
lot_values.append({
|
|
'name': "ValPick-%d-%d" % (move.id, move.product_id.id),
|
|
'product_id': move.product_id.id,
|
|
'company_id': move.company_id.id
|
|
})
|
|
sml_values.append(dict(
|
|
**move._prepare_move_line_vals(),
|
|
quantity=missing_to_do,
|
|
lot_id=len(lot_values) - 1,
|
|
package_id=package_for_picking and len(package_values) - 1 or False
|
|
))
|
|
else:
|
|
sml_values.append(dict(
|
|
**move._prepare_move_line_vals(),
|
|
quantity=missing_to_do,
|
|
package_id=package_for_picking and len(package_values) - 1 or False
|
|
))
|
|
if package_for_picking:
|
|
package_values.append(package_for_picking)
|
|
|
|
_logger.info("Create lots (%d) for pickings to validate" % len(lot_values))
|
|
lots = self.env["stock.lot"].create(lot_values)
|
|
_logger.info("Create packages (%d) for pickings to validate" % len(package_values))
|
|
packages = self.env["stock.quant.package"].create(package_values)
|
|
|
|
_logger.info("Create sml (%d) for pickings to validate" % len(sml_values))
|
|
for vals in sml_values:
|
|
if vals.get('package_id') is not None:
|
|
vals['package_id'] = packages[vals['package_id']].id
|
|
if 'lot_id' in vals:
|
|
vals['lot_id'] = lots[vals['lot_id']].id
|
|
self.env['stock.move.line'].create(sml_values)
|
|
|
|
_logger.info("Validate %d of pickings" % len(picking_to_validate))
|
|
picking_to_validate.with_context(skip_backorder=True, skip_sms=True).button_validate()
|
|
|
|
# (Un)comment to test a DB with a lot of outgoing/incoming/internal confirmed moves, e.g. for testing of forecasted report
|
|
# pickings = confirm_pickings(0.8)
|
|
|
|
# (Un)comment to test a DB with a lot of outgoing/incoming/internal finished moves
|
|
# assign_picking(pickings)
|
|
# validate_pickings(pickings, 1)
|
|
|
|
return moves.exists() # Confirm picking can unlink some moves
|
|
|
|
@api.model
|
|
def _populate_attach_record_weight(self):
|
|
return ['picking_id'], [1]
|
|
|
|
@api.model
|
|
def _populate_attach_record_generator(self):
|
|
picking_ids = self.env['stock.picking'].browse(self.env.registry.populated_models['stock.picking'])
|
|
|
|
def next_picking_generator():
|
|
while picking_ids:
|
|
yield from picking_ids.ids
|
|
|
|
return {'picking_id': next_picking_generator()}
|
|
|
|
def _populate_factories(self):
|
|
product_ids = self.env['product.product'].browse(self.env.registry.populated_models['product.product']).filtered(lambda p: p.type in ('product', 'consu')).ids
|
|
random_products = populate.Random("move_product_sample")
|
|
product_ids = random_products.sample(product_ids, int(len(product_ids) * 0.8))
|
|
|
|
def get_product_uom(values, counter, random):
|
|
return self.env['product.product'].browse(values['product_id']).uom_id.id
|
|
|
|
def _attach_to_record(iterator, field_name, model_name):
|
|
random = populate.Random('_attach_to_record')
|
|
fields, weights = self._populate_attach_record_weight()
|
|
fields_generator = self._populate_attach_record_generator()
|
|
|
|
for values in iterator:
|
|
field = random.choices(fields, weights)[0]
|
|
values[field] = next(fields_generator[field])
|
|
yield values
|
|
|
|
def _compute_picking_values(iterator, field_name, model_name):
|
|
random = populate.Random('_compute_picking_values')
|
|
for values in iterator:
|
|
if values.get('picking_id'):
|
|
picking = self.env['stock.picking'].browse(values['picking_id'])
|
|
values['picking_id'] = picking.id
|
|
values['location_id'] = picking.location_id.id
|
|
values['location_dest_id'] = picking.location_dest_id.id
|
|
values['name'] = picking.name
|
|
values['date'] = picking.scheduled_date
|
|
values['company_id'] = picking.company_id.id
|
|
if picking.picking_type_id.code == 'incoming':
|
|
values['price_unit'] = random.randint(1, 100)
|
|
yield values
|
|
|
|
return [
|
|
('product_id', populate.randomize(product_ids)),
|
|
('product_uom', populate.compute(get_product_uom)),
|
|
('product_uom_qty', populate.randint(1, 10)),
|
|
('sequence', populate.randint(1, 1000)),
|
|
('_attach_to_record', _attach_to_record),
|
|
('_compute_picking_values', _compute_picking_values),
|
|
]
|