864 lines
38 KiB
Python
864 lines
38 KiB
Python
"""
|
|
A simple business simulation game with a focus on market dynamics, crafting and trading.
|
|
|
|
Note: Requires 'rich' and 'json' libraries for functionality.
|
|
"""
|
|
|
|
import json
|
|
import random
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich.prompt import Prompt
|
|
from rich import box
|
|
from rich.panel import Panel
|
|
from rich.text import Text
|
|
|
|
crafting_recipes = {
|
|
'Industrial Synthesis': {'input': {'Coal': 4, 'Copper': 4}, 'output': {'Conductor': 10}, 'turns': 3},
|
|
'Manual Synthesis': {'input': {'Coal': 1, 'Copper': 1}, 'output': {'Conductor': 2}, 'turns': 2},
|
|
'Microchip Fabrication': {'input': {'Silicon': 2, 'Conductor': 1}, 'output': {'Microchips': 5}, 'turns': 4},
|
|
'Battery Production': {'input': {'Lithium': 3, 'Copper': 2}, 'output': {'Batteries': 4}, 'turns': 3}
|
|
}
|
|
|
|
|
|
def main():
|
|
"""Initializes the game environment, runs the game loop."""
|
|
console = Console()
|
|
competitors_ids = ["Player", "RationalAI", "RiskTakingAI"]
|
|
market = Market()
|
|
|
|
# Initialise the player
|
|
player = Company("Player", [cid for cid in competitors_ids if cid != "Player"], market)
|
|
|
|
# Initialise AI competitors
|
|
rational_ai = AICompany("RationalAI", competitors_ids, market, risk_tolerance=0.3)
|
|
risk_taking_ai = AICompany("RiskTakingAI", competitors_ids, market, risk_tolerance=0.7)
|
|
ai_competitors = [rational_ai, risk_taking_ai]
|
|
market.companies = {company.player_id: company for company in ai_competitors + [player]}
|
|
|
|
# Game loop
|
|
for turn in range(1, 27):
|
|
console.print(f"\n--- Turn {turn} ---\n", style="grey50")
|
|
market.inflation_rate += market.inflation_change if market.inflation_growing else -market.inflation_change
|
|
market.inflation_rate = max(0.0001, min(market.inflation_rate, 0.05))
|
|
market.gdp *= (1 + (market.inflation_rate / 20))
|
|
market.update_market()
|
|
market.print_market_events()
|
|
|
|
for company in [player] + ai_competitors:
|
|
company.update_crafting()
|
|
company.make_decision(market, ai_competitors)
|
|
|
|
print_ai_actions(ai_competitors)
|
|
|
|
final_scores = market.calculate_final_scores()
|
|
display_final_scores(console, final_scores, market)
|
|
|
|
|
|
def load_json(filename):
|
|
"""Loads and returns data from a JSON file specified by the filename."""
|
|
try:
|
|
with open(filename) as file:
|
|
return json.load(file)
|
|
except FileNotFoundError:
|
|
print(f"Error: The file {filename} was not found.")
|
|
exit(1)
|
|
|
|
|
|
class Market:
|
|
"""
|
|
Represents the game's market, managing product prices, stock ledger, and economic indicators.
|
|
|
|
Attributes:
|
|
current_turn (int): Counter for the game turn.
|
|
companies (dict): Stores company objects with their IDs.
|
|
products (dict): Current prices of products.
|
|
starting_prices (dict): Initial prices of products for comparison.
|
|
events (list): Potential market events affecting prices.
|
|
adjust_prices (function): Adjusts product prices based on trends.
|
|
event_effects (dict): Effects of market events on prices.
|
|
stock_ledger (dict): Tracks stock ownership.
|
|
inflation_rate (float): Inflation rate.
|
|
unemployment_rate (float): Unemployment rate.
|
|
gdp (float): Gross Domestic Product value.
|
|
|
|
The market is updated every turn.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.current_turn = 0
|
|
self.companies = {}
|
|
self.products = {
|
|
'Coal': 10.0, 'Copper': 15.0, 'Conductor': 20.0,
|
|
'Silicon': 12.0, 'Lithium': 18.0, 'Microchips': 30.0, 'Batteries': 25.0
|
|
}
|
|
self.starting_prices = self.products.copy()
|
|
self.events = load_json('market_events.json')["events"]
|
|
self.stock_ledger = {}
|
|
self.inflation_rate = 0.05
|
|
self.inflation_growing = True
|
|
self.inflation_change = random.uniform(0.0001, 0.001)
|
|
self.gdp = 500000
|
|
self.unemployment_rate = 0.04
|
|
self.trade_volume = 0
|
|
self.total_bought = {product: 0 for product in self.products}
|
|
self.total_sold = {product: 0 for product in self.products}
|
|
self.previous_prices = self.products.copy()
|
|
self.last_event_name = None
|
|
self.trend_factor = 1.01
|
|
|
|
def update_market(self):
|
|
self.current_turn += 1
|
|
self.previous_prices = self.products.copy()
|
|
|
|
# Update prices based on inflation and random fluctuations
|
|
for product in self.products:
|
|
inflation_adjustment = self.inflation_rate
|
|
random_fluctuation = random.uniform(-0.01, 0.01) # ±1% fluctuation
|
|
price_change_factor = 1 + inflation_adjustment + random_fluctuation
|
|
self.products[product] *= price_change_factor
|
|
|
|
self.reset_trade_volumes()
|
|
|
|
# Procedural generation of market trends
|
|
self.trend_factor = self.generate_market_trend()
|
|
|
|
# Update economic indicators
|
|
self.update_economic_indicators()
|
|
|
|
# Handle market events
|
|
self.handle_market_events()
|
|
|
|
def update_prices(self):
|
|
min_prices = {
|
|
'Coal': 0.01, 'Copper': 0.20, 'Conductor': 1.50,
|
|
'Silicon': 0.50, 'Lithium': 0.50, 'Microchips': 2.00, 'Batteries': 5.00
|
|
}
|
|
max_change = 0.1
|
|
|
|
for product in self.products:
|
|
inflation_adjustment = self.inflation_rate
|
|
random_fluctuation = random.uniform(-max_change, max_change)
|
|
price_change_factor = 1 + inflation_adjustment + random_fluctuation
|
|
|
|
new_price = self.products[product] * price_change_factor
|
|
self.products[product] = max(min_prices[product], new_price)
|
|
|
|
self.reset_trade_volumes()
|
|
self.trend_factor = self.generate_market_trend()
|
|
self.update_economic_indicators()
|
|
self.handle_market_events()
|
|
def calculate_price_change(self, product):
|
|
demand_factor = self.total_bought[product] - self.total_sold[product]
|
|
# Here you can define how strongly demand affects the price, e.g., 0.05 as in your original code
|
|
return demand_factor * 0.05
|
|
|
|
def reset_trade_volumes(self):
|
|
self.total_bought = dict.fromkeys(self.total_bought, 0)
|
|
self.total_sold = dict.fromkeys(self.total_sold, 0)
|
|
|
|
def generate_market_trend(self):
|
|
trend_change = random.uniform(-0.02, 0.02)
|
|
return max(0.5, min(self.trend_factor + trend_change, 1.5))
|
|
|
|
def update_economic_indicators(self):
|
|
"""Updates key economic indicators like inflation and unemployment rates."""
|
|
|
|
# Update inflation_rate
|
|
self.inflation_rate += self.inflation_change if self.inflation_growing else -self.inflation_change
|
|
self.inflation_rate = max(0.0001, min(self.inflation_rate, 0.05)) # Clamp the inflation_rate
|
|
|
|
# Update GDP
|
|
self.gdp *= (1 + (self.inflation_rate / 2))
|
|
|
|
# Update unemployment_rate
|
|
self.unemployment_rate = min(max(self.unemployment_rate + random.uniform(-0.005, 0.005), 0), 1)
|
|
|
|
# Optional: Print statements for debugging
|
|
print(f"Inflation Rate: {self.inflation_rate}")
|
|
print(f"GDP: {self.gdp}")
|
|
print(f"Unemployment Rate: {self.unemployment_rate}")
|
|
|
|
def update_inflation_rate(self):
|
|
lower_bound = 0.0001
|
|
upper_bound = 0.049
|
|
if self.inflation_rate <= lower_bound:
|
|
self.inflation_growing = True
|
|
elif self.inflation_rate >= upper_bound:
|
|
self.inflation_growing = False
|
|
self.inflation_rate += self.inflation_change if self.inflation_growing else -self.inflation_change
|
|
|
|
def update_unemployment_rate(self):
|
|
fluctuation = random.uniform(-0.005, 0.005)
|
|
self.unemployment_rate = max(0, min(self.unemployment_rate + fluctuation, 1))
|
|
|
|
def handle_market_events(self):
|
|
event = random.choices(self.events, weights=[e["probability"] for e in self.events])[0]
|
|
self.last_event_name = event["name"]
|
|
getattr(self, event["effect"])() if event["effect"] in dir(self) else None
|
|
|
|
def tech_boost_effect(self):
|
|
self.products['Conductor'] *= 1.2
|
|
|
|
def resource_scarcity_effect(self):
|
|
random_resource = random.choice(['Coal', 'Copper'])
|
|
self.products[random_resource] *= 1.5
|
|
|
|
def trade_agreement_effect(self, price):
|
|
"""Handles the effect of a new trade agreement event."""
|
|
return price * 0.9
|
|
|
|
def innovation_breakthrough_effect(self, price):
|
|
"""Handles the effect of an innovation breakthrough event."""
|
|
return price * 0.8
|
|
|
|
def recession_effect(self, price):
|
|
"""Handles the effect of a recession event on product prices."""
|
|
return price * 0.7
|
|
|
|
def print_market_events(self):
|
|
"""
|
|
Prints the latest market events and key economic indicators in a formatted panel.
|
|
"""
|
|
# Prepare the console and panel text for printing
|
|
console = Console()
|
|
event_info = f"Last Market Event: {self.last_event_name}\n"
|
|
economic_indicators = (
|
|
f"Inflation Rate: {self.inflation_rate * 100:.2f}%\n"
|
|
f"Unemployment Rate: {self.unemployment_rate * 100:.2f}%\n"
|
|
f"GDP: {self.gdp:.2f} €"
|
|
)
|
|
|
|
# Create a formatted panel text
|
|
panel_text = Text(event_info + economic_indicators)
|
|
|
|
# Print the panel with styling
|
|
console.print(Panel(panel_text, title="[bold]Market Events and Indicators", border_style="grey50"))
|
|
|
|
def update_stock_ledger(self, company_id, owner_id, amount):
|
|
"""Updates the stock ledger for a given company and owner based on the transaction amount."""
|
|
self.stock_ledger[company_id, owner_id] = self.stock_ledger.get((company_id, owner_id), 0) + amount
|
|
|
|
def get_stock_ownership(self, company_id, owner_id):
|
|
"""Returns the number of stocks owned by a given owner for a specified company."""
|
|
return self.stock_ledger.get((company_id, owner_id), 0)
|
|
|
|
def get_stock_price(self, company_id):
|
|
"""Calculates and returns the current stock price for a specified company."""
|
|
return round(self.companies[company_id].value / 100.0, 2)
|
|
|
|
def update_market(self):
|
|
self.current_turn += 1
|
|
self.previous_prices = self.products.copy()
|
|
|
|
# Update prices based on supply and demand
|
|
for product in self.products:
|
|
supply_chain_impact = self.simulate_supply_chain_impact(product)
|
|
demand_factor = self.total_bought[product] - self.total_sold[product]
|
|
self.products[product] *= (1 + demand_factor * 0.05) * supply_chain_impact
|
|
|
|
self.reset_trade_volumes()
|
|
|
|
# Procedural generation of market trends
|
|
self.trend_factor = self.generate_market_trend()
|
|
|
|
# Update economic indicators
|
|
self.update_economic_indicators()
|
|
|
|
# Handle market events
|
|
self.handle_market_events()
|
|
|
|
def simulate_supply_chain_impact(self, product):
|
|
return random.uniform(0.9, 1.1)
|
|
|
|
def generate_market_trend(self):
|
|
trend_change = random.uniform(-0.02, 0.02)
|
|
new_trend = self.trend_factor + trend_change
|
|
return max(0.5, min(new_trend, 1.5))
|
|
|
|
def reset_trade_volumes(self):
|
|
for product in self.total_bought.keys():
|
|
self.total_bought[product] = 0
|
|
self.total_sold[product] = 0
|
|
|
|
def record_trade(self, value):
|
|
self.trade_volume += value
|
|
|
|
def adjust_prices(self):
|
|
"""Adjusts product prices in the market based on inflation and random fluctuations."""
|
|
for product, price in self.products.items():
|
|
inflation_adjustment = price * self.inflation_rate
|
|
fluctuation = random.uniform(-0.03, 0.03)
|
|
self.products[product] = round(max(1.5, price + inflation_adjustment + fluctuation), 2)
|
|
return self.products
|
|
|
|
def handle_competitor_event(self, effect):
|
|
"""Handles market events related to competitors, adjusting product prices accordingly."""
|
|
adjustment = float(random.randint(1, 3))
|
|
self.products = {k: max(1.0, v - adjustment) if effect == "new_competitor" else v + adjustment for k, v in
|
|
self.products.items()}
|
|
|
|
def update_economic_indicators(self):
|
|
"""Updates key economic indicators like inflation and unemployment rates."""
|
|
|
|
# Check bounds and change state if necessary
|
|
if self.inflation_rate <= 0.0001: # Lower bound
|
|
self.inflation_growing = True
|
|
self.inflation_change = random.uniform(0.0001, 0.001) # Reset change rate
|
|
elif self.inflation_rate >= 0.049: # Upper bound
|
|
self.inflation_growing = False
|
|
self.inflation_change = random.uniform(0.0001, 0.001) # Reset change rate
|
|
|
|
# Adjust inflation rate based on the current state
|
|
if self.inflation_growing:
|
|
self.inflation_rate += self.inflation_change
|
|
else:
|
|
self.inflation_rate -= self.inflation_change
|
|
|
|
# Unemployment rate adjustment with random fluctuation
|
|
self.unemployment_rate = min(max(self.unemployment_rate + random.uniform(-0.005, 0.005), 0), 1)
|
|
|
|
def calculate_final_scores(self):
|
|
"""Calculates and returns final scores for each company based on value and stock ownership."""
|
|
final_scores = {}
|
|
for company_id, company in self.companies.items():
|
|
final_score = company.value
|
|
majority_owner = max(company.own_stock_ownership,
|
|
key=lambda owner: (company.own_stock_ownership[owner], owner))
|
|
majority_percentage = company.own_stock_ownership[majority_owner]
|
|
is_major_owner = majority_percentage >= 51
|
|
if company_id not in final_scores:
|
|
final_scores[company_id] = {'score': 0, 'note': '', 'majority_owner': majority_owner}
|
|
for owner_id, percentage in company.own_stock_ownership.items():
|
|
if percentage > 20:
|
|
final_scores[owner_id] = final_scores.get(owner_id, {'score': 0, 'note': '', 'majority_owner': ''})
|
|
final_scores[owner_id]['score'] += final_score * (percentage / 100)
|
|
if not is_major_owner:
|
|
remaining_score = final_score - sum(
|
|
final_scores.get(owner, {'score': 0})['score'] for owner in company.own_stock_ownership)
|
|
final_scores[company_id]['score'] += remaining_score
|
|
return final_scores
|
|
|
|
|
|
class Company:
|
|
"""
|
|
Base class for a company in the game, handling inventory, stock, and financial transactions.
|
|
|
|
Attributes:
|
|
player_id (str): Unique identifier for the company.
|
|
cash (float): Available cash for transactions.
|
|
inventory (dict): Current inventory of products.
|
|
crafting_queue (list): Queue of products being crafted.
|
|
own_stock_ownership (dict): Ownership percentage of own stocks.
|
|
stock_holdings (dict): Holdings of other companies' stocks.
|
|
total_shares (int): Total shares available in the company.
|
|
_market (Market): Reference to the game's market.
|
|
_debug (bool): Flag for enabling debug mode.
|
|
"""
|
|
|
|
def __init__(self, player_id, competitors_ids, market=None, debug=False):
|
|
self.player_id = player_id
|
|
self.cash = 500.0
|
|
self.inventory = {'Coal': 0, 'Copper': 0, 'Conductor': 0,
|
|
'Silicon': 0, 'Lithium': 0, 'Microchips': 0, 'Batteries': 0}
|
|
self.crafting_queue = []
|
|
self.own_stock_ownership = {cid: 51 if cid == player_id else 0 for cid in [player_id] + competitors_ids}
|
|
self.stock_holdings = {cid: 0 for cid in set([player_id] + competitors_ids + ["RationalAI", "RiskTakingAI"])}
|
|
self.total_shares = 100
|
|
self._market = market
|
|
self._debug = debug
|
|
|
|
@property
|
|
def value(self):
|
|
"""Calculates the total value of the company, combining cash and the market value of its inventory."""
|
|
return self.cash + sum(self.inventory[product] * price for product, price in self._market.products.items())
|
|
|
|
def craft_product(self, recipe_key):
|
|
"""Processes crafting of a product based on the chosen recipe."""
|
|
if self._debug:
|
|
print(f"Inventory before crafting: {self.inventory}")
|
|
recipe = crafting_recipes[recipe_key]
|
|
if all(self.inventory[product] >= quantity for product, quantity in recipe['input'].items()):
|
|
self.crafting_queue.append({'recipe': recipe, 'turns_remaining': recipe['turns']})
|
|
self._update_inventory(recipe['input'], decrease=True)
|
|
print("Crafting order placed.")
|
|
else:
|
|
print("Not enough resources to craft.")
|
|
if self._debug:
|
|
print(f"Inventory after crafting: {self.inventory}")
|
|
|
|
def _update_inventory(self, items, decrease=False):
|
|
"""Updates the inventory based on the given items."""
|
|
if self._debug:
|
|
print(f"Inventory before update: {self.inventory}")
|
|
for product, quantity in items.items():
|
|
if decrease:
|
|
self.inventory[product] -= quantity
|
|
else:
|
|
self.inventory[product] += quantity
|
|
if self._debug:
|
|
print(f"Inventory after update: {self.inventory}")
|
|
|
|
def update_crafting(self):
|
|
"""Updates the crafting queue, completing orders as their turns conclude."""
|
|
if self._debug:
|
|
print(f"Crafting queue before update: {self.crafting_queue}")
|
|
completed_orders = []
|
|
for order in self.crafting_queue:
|
|
if self._debug:
|
|
print(f"Processing order: {order}")
|
|
order['turns_remaining'] -= 1
|
|
if order['turns_remaining'] == 0:
|
|
if self._debug:
|
|
print(f"Completing order: {order}")
|
|
self._update_inventory(order['recipe']['output'])
|
|
if self._debug:
|
|
print(f"Inventory after completing order: {self.inventory}")
|
|
completed_orders.append(order)
|
|
|
|
for order in completed_orders:
|
|
self.crafting_queue.remove(order)
|
|
if self._debug:
|
|
print(f"Crafting queue after update: {self.crafting_queue}")
|
|
|
|
def trade_product(self, market, product, quantity, buying=True):
|
|
total_cost = market.products[product] * quantity
|
|
|
|
if buying:
|
|
if self.cash >= total_cost:
|
|
self.cash -= total_cost
|
|
self.inventory[product] += quantity
|
|
market.total_bought[product] += quantity # Record the purchase in the market
|
|
market.record_trade(total_cost) # Update the market's trade volume
|
|
else:
|
|
print("Insufficient funds to complete purchase.")
|
|
else:
|
|
if self.inventory[product] >= quantity:
|
|
self.cash += total_cost
|
|
self.inventory[product] -= quantity
|
|
market.total_sold[product] += quantity # Record the sale in the market
|
|
market.record_trade(total_cost) # Update the market's trade volume
|
|
else:
|
|
print("Insufficient inventory to complete sale.")
|
|
|
|
# Update market prices after trading
|
|
market.update_prices() # This call ensures prices are updated with constraints
|
|
|
|
def crafting_decision(self):
|
|
"""Displays crafting options and handles the user's crafting choice."""
|
|
print("\nCrafting Decision")
|
|
recipe_keys = list(crafting_recipes.keys())
|
|
print("\nAvailable Recipes:")
|
|
for idx, recipe in enumerate(recipe_keys, 1):
|
|
print(f" {idx}: {recipe}")
|
|
recipe_choice = self.get_user_choice(len(recipe_keys), "Choose a recipe to craft: ")
|
|
self.craft_product(recipe_keys[recipe_choice - 1])
|
|
|
|
def trade_stock(self, action, market, company_id, amount, is_ai=False):
|
|
"""Executes a stock trade action, buying or selling as specified."""
|
|
if company_id not in market.companies and company_id != self.player_id:
|
|
return "Company not found in the market."
|
|
|
|
stock_price = market.get_stock_price(company_id)
|
|
total_value = stock_price * amount
|
|
|
|
if action == 'buy':
|
|
return self._buy_stock(company_id, amount, total_value, is_ai)
|
|
elif action == 'sell':
|
|
return self._sell_stock(company_id, amount, total_value, is_ai)
|
|
else:
|
|
return "Invalid stock action."
|
|
|
|
def _buy_stock(self, company_id, amount, total_value, is_ai):
|
|
# Calculate the total number of shares currently owned
|
|
total_shares_owned = sum(self._market.companies[company_id].own_stock_ownership.values())
|
|
available_shares = self.total_shares - total_shares_owned
|
|
|
|
# Determine the maximum shares that can be bought based on available cash and available shares
|
|
max_affordable_shares = int(self.cash / self._market.get_stock_price(company_id))
|
|
amount = min(amount, available_shares, max_affordable_shares)
|
|
|
|
if amount == 0:
|
|
return "Cannot buy any shares due to insufficient funds or no available shares."
|
|
|
|
total_value = self._market.get_stock_price(company_id) * amount
|
|
|
|
self.cash -= total_value
|
|
self._market.update_stock_ledger(company_id, self.player_id, amount)
|
|
|
|
if company_id != self.player_id:
|
|
# Update buyer's stock_holdings for investments in other companies
|
|
self.stock_holdings[company_id] += amount
|
|
else:
|
|
# Update own_stock_ownership when buying own shares
|
|
self.own_stock_ownership[self.player_id] += amount
|
|
|
|
return f"Bought {amount} stocks of {company_id}."
|
|
|
|
def _sell_stock(self, company_id, amount, total_value, is_ai):
|
|
if not is_ai:
|
|
if self.stock_holdings[company_id] < amount:
|
|
return "Not enough stocks to sell."
|
|
# Update buyer's stock_holdings for non-AI (like player)
|
|
self.stock_holdings[company_id] -= amount
|
|
else:
|
|
if self.own_stock_ownership[company_id] < amount:
|
|
return "Not enough stocks to sell."
|
|
# Update own_stock_ownership for AI
|
|
self.own_stock_ownership[company_id] -= amount
|
|
|
|
self.cash += total_value
|
|
self._market.update_stock_ledger(company_id, self.player_id, -amount)
|
|
|
|
# If an AI company is selling its own shares, update the ownership for all shareholders
|
|
if is_ai and company_id == self.player_id:
|
|
for shareholder in self._market.companies[company_id].own_stock_ownership.keys():
|
|
self._market.companies[company_id].own_stock_ownership[shareholder] -= amount * \
|
|
(self._market.companies[
|
|
company_id].own_stock_ownership[
|
|
shareholder] / self.total_shares)
|
|
|
|
return f"Sold {amount} stocks of {company_id}."
|
|
|
|
def _calculate_available_shares(self):
|
|
"""Calculates the number of available shares for a given company."""
|
|
total_owned = sum(self.stock_holdings.values()) + sum(self.own_stock_ownership.values())
|
|
return self.total_shares - total_owned
|
|
|
|
def _get_stock_ownership(self, company_id):
|
|
"""Retrieves the stock ownership amount for a given company."""
|
|
if company_id == self.player_id:
|
|
return self.own_stock_ownership[self.player_id]
|
|
return self.stock_holdings.get(company_id, 0)
|
|
|
|
def _update_stock_ownership(self, company_id, amount, total_value, buying):
|
|
"""Updates the stock ownership details after a buy or sell action."""
|
|
if company_id == self.player_id:
|
|
if buying:
|
|
self.own_stock_ownership[self.player_id] += amount
|
|
else:
|
|
self.own_stock_ownership[self.player_id] -= amount
|
|
else:
|
|
if buying:
|
|
self.stock_holdings[company_id] += amount
|
|
else:
|
|
self.stock_holdings[company_id] -= amount
|
|
self.cash += -total_value if buying else total_value
|
|
|
|
def make_decision(self, market, competitors):
|
|
console = Console()
|
|
status_table = Table(title=f"\n[bold cyan]{self.player_id}'s Turn - Turn {market.current_turn}",
|
|
box=box.ROUNDED)
|
|
|
|
# Cash Row
|
|
status_table.add_column("Category", style="bold cyan")
|
|
status_table.add_column("Details")
|
|
status_table.add_row("Cash", f"[bold blue]{self.cash:.2f} €")
|
|
|
|
# Inventory Row
|
|
inventory_display = ', '.join([f"[bold]{item}: {quantity}" for item, quantity in self.inventory.items()])
|
|
status_table.add_row("Inventory", inventory_display)
|
|
|
|
# Market Prices Row with Color Coding and Emojis
|
|
price_info = []
|
|
for product, price in market.products.items():
|
|
price_change = price - market.previous_prices.get(product, price)
|
|
if price_change > 0:
|
|
price_info.append(f"[green]{product}: {price:.2f} € :arrow_up_small:") # Green color and up emoji
|
|
elif price_change < 0:
|
|
price_info.append(f"[red]{product}: {price:.2f} € :arrow_down_small:") # Red color and down emoji
|
|
else:
|
|
price_info.append(f"{product}: {price:.2f} €") # Default color (no change)
|
|
status_table.add_row("Market Prices", ', '.join(price_info))
|
|
|
|
# Shareholders Row
|
|
shareholders = ', '.join(
|
|
[f"[bold]{company}[/]: {ownership} shares" for company, ownership in self.own_stock_ownership.items()])
|
|
status_table.add_row("Your Shareholders", shareholders)
|
|
|
|
# Investments Row
|
|
investments = ', '.join(
|
|
[f"[bold]{company}[/]: {holding} shares" for company, holding in self.stock_holdings.items() if
|
|
holding > 0])
|
|
status_table.add_row("Your Investments", investments)
|
|
|
|
console.print(status_table)
|
|
|
|
# Action Choices
|
|
actions = {
|
|
"1": "Trade Products",
|
|
"2": "Craft",
|
|
"3": "Trade Stocks",
|
|
"4": "Skip Turn"
|
|
}
|
|
choices_display = "\n".join([f"{key}: {value}" for key, value in actions.items()])
|
|
console.print(f"Available Actions:\n{choices_display}", style="bold")
|
|
|
|
# Action Selection
|
|
action_choice = Prompt.ask("Choose your action", default="4")
|
|
selected_action = actions.get(action_choice, None)
|
|
|
|
if selected_action == "Trade Products":
|
|
self.trade_products_decision(market)
|
|
elif selected_action == "Craft":
|
|
self.crafting_decision()
|
|
elif selected_action == "Trade Stocks":
|
|
stock_actions = ["Buy", "Sell"]
|
|
stock_action_choice = Prompt.ask("Choose stock action", choices=stock_actions, default=stock_actions[0])
|
|
self.trade_stocks_decision(stock_action_choice.lower(), market, competitors)
|
|
elif selected_action == "Skip Turn":
|
|
pass # Skip turn
|
|
else:
|
|
console.print("[bold red]Invalid choice. Please enter a valid option.")
|
|
|
|
def trade_products_decision(self, market):
|
|
"""Handles the decision-making process for trading products."""
|
|
print("\nProduct Trading Decision")
|
|
products = list(market.products.keys())
|
|
print("\nAvailable Products:")
|
|
for idx, product in enumerate(products, 1):
|
|
print(f" {idx}: {product} - Price: {market.products[product]:.2f} €")
|
|
product_choice = self.get_user_choice(len(products), "Choose a product to trade: ")
|
|
product = products[product_choice - 1]
|
|
|
|
quantity = self.get_valid_input("Enter the quantity to trade: ", int, "Quantity must be positive.",
|
|
lambda x: x > 0)
|
|
|
|
self.get_user_choice(2, "Choose trade type (1: Buy, 2: Sell): ")
|
|
self.trade_product(market, product, quantity)
|
|
|
|
def trade_stocks_decision(self, action, market, competitors):
|
|
"""Facilitates the decision-making process for stock trading actions."""
|
|
print("\nStock Trading Decision")
|
|
if action == 'buy':
|
|
print("Available companies to buy stocks from:")
|
|
elif action == 'sell':
|
|
print("Your stock holdings:")
|
|
|
|
companies = competitors + [self]
|
|
|
|
for idx, company in enumerate(companies, 1):
|
|
company_id = company.player_id
|
|
stock_info = f" {idx}: {company_id} - Current stock price: {market.get_stock_price(company_id)}"
|
|
if action == 'sell' and self.stock_holdings.get(company_id, 0) > 0:
|
|
stock_info += f", Owned: {self.stock_holdings[company_id]}"
|
|
print(stock_info)
|
|
|
|
company_choice = self.get_user_choice(len(companies), "Enter the company number to trade stocks: ")
|
|
company_id = companies[company_choice - 1].player_id
|
|
|
|
amount = self.get_valid_input("Enter the amount of stocks to trade: ", int, "Stock amount must be positive.",
|
|
lambda x: x > 0)
|
|
self.trade_stock(action, market, company_id, amount)
|
|
|
|
@staticmethod
|
|
def get_user_choice(num_options, prompt):
|
|
"""Prompts the user for a choice and validates the input."""
|
|
choice = 0
|
|
while choice < 1 or choice > num_options:
|
|
try:
|
|
choice = int(input(prompt))
|
|
if choice < 1 or choice > num_options:
|
|
raise ValueError
|
|
except ValueError:
|
|
print(f"Please enter a number between 1 and {num_options}.")
|
|
return choice
|
|
|
|
@staticmethod
|
|
def get_valid_input(prompt, input_type, error_message, validation_func=lambda x: True):
|
|
"""Requests and validates user input based on specified criteria."""
|
|
while True:
|
|
try:
|
|
value = input_type(input(prompt))
|
|
if not validation_func(value):
|
|
raise ValueError
|
|
return value
|
|
except ValueError:
|
|
print(error_message)
|
|
|
|
def is_market_boom(self):
|
|
"""Checks if the market is booming."""
|
|
return all(price > 20 for price in self._market.products.values())
|
|
|
|
|
|
class AICompany(Company):
|
|
"""
|
|
AI Company. Inherits from the Company class and adds AI-specific decision-making based on risk tolerance.
|
|
|
|
Attributes:
|
|
risk_tolerance (float): A value representing the AI's willingness to take risks, influencing its decisions.
|
|
actions_history (list): Records the history of actions taken by the AI.
|
|
average_prices (dict): Tracks the average market prices of products for strategic decision-making.
|
|
|
|
Methods provide the AI's logic for crafting, trading, stock transactions, and handling market events.
|
|
"""
|
|
|
|
def __init__(self, player_id, competitors_ids, market, risk_tolerance):
|
|
super().__init__(player_id, competitors_ids, market)
|
|
self.risk_tolerance = risk_tolerance
|
|
self.actions_history = []
|
|
self.average_prices = {product: market.products[product] for product in market.products}
|
|
|
|
def update_average_prices(self):
|
|
"""Updates the average prices of products in the market for AI decision-making purposes."""
|
|
self.average_prices = {product: (self.average_prices[product] * (
|
|
self._market.current_turn - 1) + price) / self._market.current_turn
|
|
for product, price in self._market.products.items()}
|
|
|
|
def make_decision(self, market, competitors):
|
|
if self.risk_tolerance > 0.5: # High-Risk AI
|
|
self.high_risk_decision(market)
|
|
else: # Low-Risk AI
|
|
self.low_risk_decision(market)
|
|
|
|
def low_risk_decision(self, market):
|
|
"""
|
|
Defines the decision-making process for a low-risk AI player.
|
|
"""
|
|
if market.current_turn == 1:
|
|
self.buy_product('Coal', self.cash / 3)
|
|
elif market.current_turn == 2:
|
|
self.buy_product('Copper', min(self.inventory['Coal'], self.cash / market.products['Copper']))
|
|
elif market.current_turn >= 3:
|
|
self.buy_and_craft()
|
|
|
|
def high_risk_decision(self, market):
|
|
"""
|
|
Defines the decision-making process for a high-risk AI player.
|
|
"""
|
|
if market.current_turn == 1:
|
|
self.sell_own_shares(market)
|
|
self.buy_product('Coal', self.cash / 2)
|
|
self.buy_product('Copper', self.cash)
|
|
else:
|
|
if self.should_craft():
|
|
self.buy_and_craft()
|
|
if self.should_sell_products(market):
|
|
self.sell_high_value_products(market)
|
|
if market.current_turn > 6:
|
|
self.buy_stocks_strategy()
|
|
|
|
def buy_product(self, product, budget):
|
|
"""
|
|
Buys a specific quantity of a product for the AI company.
|
|
"""
|
|
quantity = int(budget // self._market.products[product])
|
|
if quantity > 0:
|
|
self.trade_product(self._market, product, quantity)
|
|
action = f"Bought {quantity} of {product}"
|
|
self.actions_history.append(action)
|
|
|
|
def buy_stocks_strategy(self):
|
|
own_stock_left = 100 - self.own_stock_ownership[self.player_id]
|
|
if own_stock_left > 0:
|
|
stock_price = self._market.get_stock_price(self.player_id)
|
|
amount_to_buy = min(own_stock_left, int(self.cash / stock_price))
|
|
if amount_to_buy > 0:
|
|
action = f"{self.player_id} is buying its own stock. Amount to buy: {amount_to_buy}"
|
|
self.actions_history.append(action)
|
|
self.trade_stock('buy', self._market, self.player_id, amount_to_buy, is_ai=True)
|
|
return
|
|
|
|
for company_id in self.stock_holdings:
|
|
if company_id != self.player_id:
|
|
stock_price = self._market.get_stock_price(company_id)
|
|
if stock_price <= self.cash:
|
|
amount_to_buy = int(self.cash / stock_price)
|
|
print(f"{self.player_id} is buying {company_id}'s stock. Amount to buy: {amount_to_buy}")
|
|
self.trade_stock('buy', self._market, company_id, amount_to_buy, is_ai=True)
|
|
return
|
|
|
|
def sell_own_shares(self, market):
|
|
"""
|
|
Sells a portion of the AI company's own shares.
|
|
"""
|
|
amount_to_sell = int(self.own_stock_ownership[self.player_id] * 0.25) # Sell 25% of own shares
|
|
if amount_to_sell > 0:
|
|
self.trade_stock('sell', market, self.player_id, amount_to_sell, is_ai=True)
|
|
action = f"Sold {amount_to_sell} of own shares"
|
|
self.actions_history.append(action)
|
|
|
|
def should_craft(self):
|
|
"""
|
|
Determines if the AI should craft products based on inventory and market conditions.
|
|
"""
|
|
return all(
|
|
self.inventory[product] >= qty for product, qty in crafting_recipes['Manual Synthesis']['input'].items())
|
|
|
|
def should_sell_products(self, market):
|
|
"""
|
|
Decides if the AI should sell products based on market prices.
|
|
"""
|
|
return any(market.products[product] >= 2 * self.average_prices[product] for product in self.inventory if
|
|
self.inventory[product] > 0)
|
|
|
|
def sell_high_value_products(self, market):
|
|
"""
|
|
Decides if the AI should sell products based on market prices.
|
|
"""
|
|
for product, quantity in self.inventory.items():
|
|
if quantity > 0 and market.products[product] >= 2 * self.average_prices[product]:
|
|
self.trade_product(market, product, quantity, buying=False)
|
|
action = f"Sold high value products"
|
|
self.actions_history.append(action)
|
|
|
|
def buy_and_craft(self):
|
|
"""
|
|
Executes buying of resources and crafting of products for the AI.
|
|
"""
|
|
chosen_recipe = crafting_recipes['Manual Synthesis']
|
|
if all(self.inventory[product] >= qty for product, qty in chosen_recipe['input'].items()):
|
|
self.craft_product('Manual Synthesis')
|
|
print(f"Crafting using Manual Synthesis")
|
|
action = "Crafted products using Manual Synthesis"
|
|
self.actions_history.append(action)
|
|
else:
|
|
print("Not enough resources to craft using Manual Synthesis")
|
|
|
|
|
|
def print_ai_actions(ai_competitors):
|
|
"""Displays the actions history, current cash, inventory, and stock information of AI competitors."""
|
|
console = Console()
|
|
for ai in ai_competitors:
|
|
panel_text = Text()
|
|
inventory_text = ', '.join([f"{product}: {quantity}" for product, quantity in ai.inventory.items()])
|
|
|
|
# List all actions
|
|
for idx, action in enumerate(ai.actions_history, 1):
|
|
panel_text.append(f"{idx}. Action: {action}\n", style="grey50")
|
|
|
|
# Append current cash and inventory to the text
|
|
panel_text.append(f"\nCurrent Cash: {ai.cash:.2f} €\n", style="bold blue")
|
|
panel_text.append(f"Inventory: {inventory_text}\n", style="bold cyan")
|
|
|
|
# Display Shareholders and Investments
|
|
shareholders_text = ', '.join(
|
|
[f"{company}: {ownership} shares" for company, ownership in ai.own_stock_ownership.items()])
|
|
investments_text = ', '.join(
|
|
[f"{company}: {holding} shares" for company, holding in ai.stock_holdings.items() if holding > 0])
|
|
|
|
panel_text.append(f"Shareholders: {shareholders_text}\n", style="bold green")
|
|
panel_text.append(f"Investments: {investments_text}", style="bold magenta")
|
|
|
|
# Display in a panel
|
|
console.print(Panel(panel_text, title=f"[bold]{ai.player_id}'s Status", border_style="grey50"))
|
|
|
|
|
|
def display_final_scores(console, final_scores, market):
|
|
"""Displays the final scores in a styled table."""
|
|
score_table = Table(header_style="bold cyan", box=box.DOUBLE_EDGE)
|
|
score_table.add_column("Company", style="bold")
|
|
score_table.add_column("Final Score", justify="right")
|
|
score_table.add_column("Majority Owner", style="bold")
|
|
score_table.add_column("Ownership Percentage", justify="right", style="dim")
|
|
|
|
for company_id, data in sorted(final_scores.items(), key=lambda item: item[1]['score'], reverse=True):
|
|
ownership_percentage = f"{data['score'] / market.companies[company_id].value * 100:.2f}%"
|
|
score_table.add_row(company_id, f"{data['score']:.0f}", data['majority_owner'], ownership_percentage)
|
|
|
|
console.print(score_table)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|