diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 0000000..e69de29 diff --git a/404.html b/404.html new file mode 100644 index 0000000..fb8b013 --- /dev/null +++ b/404.html @@ -0,0 +1,1001 @@ + + + +
+ + + + + + + + + + + + + + + + +Comprehensive documentation for the TeleAgent Agent Model system, which handles AI agent behaviors and interactions.
+The main class for managing group chat interactions and user behavior.
+def __init__(
+ self,
+ name: str,
+ system_message: str,
+ llm_config: dict = None,
+ memory_config: dict = None,
+ **kwargs
+):
+ """
+ Initialize UserGroupAgent.
+
+ Args:
+ name (str): Name of the agent
+ system_message (str): Base system message defining agent behavior
+ llm_config (dict, optional): LLM configuration containing:
+ - model (str): Model name (e.g., "gpt-4")
+ - temperature (float): Response randomness
+ - max_tokens (int): Maximum response length
+ memory_config (dict, optional): Memory configuration containing:
+ - max_history (int): Maximum conversation history
+ - persistence (bool): Whether to persist memory
+
+ Raises:
+ ValueError: If required parameters are invalid
+ ConfigurationError: If configuration is invalid
+ """
+
+async def a_update_inner_state(
+ self,
+ messages: List[dict],
+ sender: str,
+ **kwargs
+) -> Tuple[bool, Optional[str]]:
+ """
+ Update agent's inner state based on new messages.
+
+ Args:
+ messages: List of message dictionaries
+ sender: The agent or user who sent the messages
+ **kwargs: Additional keyword arguments
+
+ Returns:
+ Tuple containing:
+ - bool: Whether to generate a response
+ - Optional[str]: Generated response if any
+
+ Raises:
+ StateUpdateError: If state update fails
+ """
+
+async def update_system_message(self) -> None:
+ """
+ Update agent's system message based on current state.
+
+ This method should be called after significant state changes
+ to ensure the agent's behavior remains consistent with its
+ current state and context.
+ """
+
+async def process_message(
+ self,
+ message: dict,
+ context: dict = None
+) -> Optional[str]:
+ """
+ Process incoming message and generate response.
+
+ Args:
+ message: Message dictionary containing:
+ - text (str): Message text
+ - from_user (dict): Sender information
+ - chat (dict): Chat information
+ context: Additional context information
+
+ Returns:
+ Optional[str]: Generated response if any
+
+ Raises:
+ ProcessingError: If message processing fails
+ """
+
+async def analyze_message_context(
+ self,
+ message: dict
+) -> dict:
+ """
+ Analyze message context for better understanding.
+
+ Args:
+ message: Message to analyze
+
+ Returns:
+ dict: Context analysis containing:
+ - intent (str): Detected message intent
+ - sentiment (float): Message sentiment score
+ - entities (list): Detected entities
+ - topics (list): Detected topics
+ """
+
+class Memory:
+ def __init__(self, config: dict):
+ """
+ Initialize memory system.
+
+ Args:
+ config: Memory configuration containing:
+ - max_size (int): Maximum memory size
+ - ttl (int): Time-to-live for memories
+ - storage_path (str): Path for persistent storage
+ """
+
+ async def add_memory(
+ self,
+ key: str,
+ value: Any,
+ ttl: int = None
+ ) -> None:
+ """
+ Add item to memory.
+
+ Args:
+ key: Memory identifier
+ value: Data to store
+ ttl: Optional custom time-to-live
+
+ Raises:
+ MemoryError: If storage fails
+ """
+
+ async def get_memory(
+ self,
+ key: str
+ ) -> Optional[Any]:
+ """
+ Retrieve item from memory.
+
+ Args:
+ key: Memory identifier
+
+ Returns:
+ Optional[Any]: Stored value if found
+ """
+
+ async def cleanup(self) -> None:
+ """
+ Remove expired memories and optimize storage.
+ """
+
+class SocialRelations:
+ def __init__(self):
+ self.relations = {}
+
+ async def update_relation(
+ self,
+ user_id: str,
+ metrics: dict
+ ) -> None:
+ """
+ Update social relationship metrics.
+
+ Args:
+ user_id: User identifier
+ metrics: Relationship metrics containing:
+ - trust (float): Trust score
+ - familiarity (float): Familiarity level
+ - rapport (float): Rapport score
+ """
+
+ async def get_relation(
+ self,
+ user_id: str
+ ) -> dict:
+ """
+ Get relationship metrics for user.
+
+ Args:
+ user_id: User identifier
+
+ Returns:
+ dict: Current relationship metrics
+ """
+
+class EmotionModule:
+ def __init__(self):
+ self.current_emotion = "neutral"
+ self.emotion_history = []
+
+ async def process_emotion(
+ self,
+ message: dict
+ ) -> str:
+ """
+ Process message to determine emotional response.
+
+ Args:
+ message: Message to analyze
+
+ Returns:
+ str: Determined emotion
+ """
+
+ async def get_emotional_state(self) -> dict:
+ """
+ Get current emotional state.
+
+ Returns:
+ dict: Emotional state containing:
+ - current (str): Current emotion
+ - intensity (float): Emotion intensity
+ - history (list): Recent emotion history
+ """
+
+from teleAgent.models.agent_model.user_groupagent import UserGroupAgent
+
+# Initialize agent
+agent = UserGroupAgent(
+ name="GroupAssistant",
+ system_message="I am a helpful and friendly group chat assistant",
+ llm_config={
+ "model": "gpt-4",
+ "temperature": 0.7
+ },
+ memory_config={
+ "max_history": 1000,
+ "persistence": True
+ }
+)
+
+# Process message
+response = await agent.process_message({
+ "text": "Hello, how are you?",
+ "from_user": {"id": "123", "name": "User"},
+ "chat": {"id": "456", "type": "group"}
+})
+
+# Update agent state
+state_update = await agent.a_update_inner_state(
+ messages=[{
+ "text": "Let's discuss the project",
+ "from_user": {"id": "123", "name": "User"}
+ }],
+ sender="user"
+)
+
+if state_update[0]: # Should respond
+ response = state_update[1]
+ # Send response to chat
+
+# Store conversation context
+await agent.memory.add_memory(
+ key="conversation_123",
+ value={
+ "topic": "Project Discussion",
+ "participants": ["User1", "User2"],
+ "key_points": ["Deadline", "Budget"]
+ },
+ ttl=3600 # 1 hour
+)
+
+# Retrieve context later
+context = await agent.memory.get_memory("conversation_123")
+
+class AgentError(Exception):
+ """Base exception for agent-related errors"""
+ pass
+
+class StateError(AgentError):
+ """Exception for state management errors"""
+ pass
+
+class ProcessingError(AgentError):
+ """Exception for message processing errors"""
+ pass
+
+class MemoryError(AgentError):
+ """Exception for memory-related errors"""
+ pass
+
+try:
+ await agent.process_message(message)
+except StateError as e:
+ logger.error(f"State error: {e}")
+ # Implement state recovery
+except ProcessingError as e:
+ logger.error(f"Processing error: {e}")
+ # Fallback to safe response
+except AgentError as e:
+ logger.error(f"Agent error: {e}")
+ # Handle other errors
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Comprehensive documentation for the TeleAgent Artwork Creation system, which handles AI-powered artwork generation and management.
+The main class for generating and managing AI-powered artwork.
+def __init__(
+ self,
+ config: dict,
+ dalle_client: DalleClient = None,
+ storage_manager: StorageManager = None
+):
+ """
+ Initialize ArtworkGenerator.
+
+ Args:
+ config (dict): Configuration containing:
+ - model_version (str): DALL-E model version
+ - image_size (str): Default image size (e.g., "1024x1024")
+ - quality (str): Image quality setting
+ dalle_client (DalleClient, optional): Custom DALL-E client
+ storage_manager (StorageManager, optional): Custom storage manager
+
+ Raises:
+ ValueError: If required configuration is missing
+ InitializationError: If services initialization fails
+ """
+
+async def generate_artwork(
+ self,
+ prompt: str,
+ style_config: dict = None,
+ generation_options: dict = None
+) -> dict:
+ """
+ Generate artwork using AI.
+
+ Args:
+ prompt: Text description for the artwork
+ style_config: Optional style configuration:
+ - artistic_style (str): Desired art style
+ - color_scheme (str): Preferred colors
+ - composition (str): Layout preferences
+ generation_options: Optional generation parameters:
+ - num_variations (int): Number of variations
+ - image_size (str): Custom size for this generation
+ - quality (str): Quality setting override
+
+ Returns:
+ dict: Generated artwork information:
+ - image_url (str): URL of generated image
+ - prompt_id (str): Unique prompt identifier
+ - generation_params (dict): Parameters used
+ - variations (list): URLs of variations if requested
+
+ Raises:
+ GenerationError: If artwork generation fails
+ """
+
+async def enhance_prompt(
+ self,
+ base_prompt: str,
+ enhancement_type: str = "detailed"
+) -> str:
+ """
+ Enhance artwork prompt for better results.
+
+ Args:
+ base_prompt: Original user prompt
+ enhancement_type: Type of enhancement:
+ - "detailed": Add technical details
+ - "artistic": Add artistic elements
+ - "style": Add style-specific terms
+
+ Returns:
+ str: Enhanced prompt
+ """
+
+class StyleManager:
+ def __init__(self):
+ self.available_styles = {}
+
+ async def apply_style(
+ self,
+ prompt: str,
+ style_name: str
+ ) -> str:
+ """
+ Apply predefined style to prompt.
+
+ Args:
+ prompt: Original artwork prompt
+ style_name: Name of style to apply
+
+ Returns:
+ str: Modified prompt with style elements
+
+ Raises:
+ StyleError: If style is not found
+ """
+
+ async def register_style(
+ self,
+ style_name: str,
+ style_elements: dict
+ ) -> None:
+ """
+ Register new artwork style.
+
+ Args:
+ style_name: Name for the style
+ style_elements: Style configuration:
+ - keywords (list): Style-specific keywords
+ - modifiers (list): Prompt modifiers
+ - composition (dict): Composition rules
+ """
+
+class ArtworkStorage:
+ def __init__(self, storage_config: dict):
+ """
+ Initialize artwork storage.
+
+ Args:
+ storage_config: Storage configuration:
+ - provider (str): Storage provider
+ - bucket (str): Storage bucket name
+ - path_prefix (str): Storage path prefix
+ """
+
+ async def store_artwork(
+ self,
+ image_data: bytes,
+ metadata: dict
+ ) -> str:
+ """
+ Store generated artwork.
+
+ Args:
+ image_data: Raw image data
+ metadata: Image metadata:
+ - prompt (str): Generation prompt
+ - params (dict): Generation parameters
+ - timestamp (str): Creation time
+
+ Returns:
+ str: Storage URL for the artwork
+
+ Raises:
+ StorageError: If storage operation fails
+ """
+
+ async def retrieve_artwork(
+ self,
+ artwork_id: str
+ ) -> Tuple[bytes, dict]:
+ """
+ Retrieve stored artwork.
+
+ Args:
+ artwork_id: Unique artwork identifier
+
+ Returns:
+ Tuple containing:
+ - bytes: Image data
+ - dict: Artwork metadata
+
+ Raises:
+ NotFoundError: If artwork is not found
+ """
+
+class QualityChecker:
+ def __init__(self, criteria: dict = None):
+ self.criteria = criteria or DEFAULT_CRITERIA
+
+ async def check_quality(
+ self,
+ image_data: bytes,
+ prompt: str
+ ) -> dict:
+ """
+ Check artwork quality against criteria.
+
+ Args:
+ image_data: Generated image data
+ prompt: Original generation prompt
+
+ Returns:
+ dict: Quality check results:
+ - score (float): Overall quality score
+ - aspects (dict): Individual aspect scores
+ - recommendations (list): Improvement suggestions
+ """
+
+from teleAgent.artwork.generator import ArtworkGenerator
+
+# Initialize generator
+generator = ArtworkGenerator({
+ "model_version": "dall-e-3",
+ "image_size": "1024x1024",
+ "quality": "standard"
+})
+
+# Generate artwork
+artwork = await generator.generate_artwork(
+ prompt="A serene landscape with mountains at sunset",
+ style_config={
+ "artistic_style": "impressionist",
+ "color_scheme": "warm",
+ "composition": "rule_of_thirds"
+ }
+)
+
+# Register custom style
+await generator.style_manager.register_style(
+ style_name="cyberpunk",
+ style_elements={
+ "keywords": ["neon", "futuristic", "urban"],
+ "modifiers": ["high contrast", "vibrant colors"],
+ "composition": {
+ "lighting": "dramatic",
+ "perspective": "dynamic"
+ }
+ }
+)
+
+# Generate with custom style
+artwork = await generator.generate_artwork(
+ prompt="A city street at night",
+ style_config={"artistic_style": "cyberpunk"}
+)
+
+class ArtworkError(Exception):
+ """Base exception for artwork-related errors"""
+ pass
+
+class GenerationError(ArtworkError):
+ """Exception for generation failures"""
+ pass
+
+class StyleError(ArtworkError):
+ """Exception for style-related errors"""
+ pass
+
+class StorageError(ArtworkError):
+ """Exception for storage operations"""
+ pass
+
+try:
+ artwork = await generator.generate_artwork(prompt, style_config)
+except GenerationError as e:
+ logger.error(f"Generation failed: {e}")
+ # Handle generation failure
+except StyleError as e:
+ logger.error(f"Style error: {e}")
+ # Handle style-related failure
+except ArtworkError as e:
+ logger.error(f"Artwork operation failed: {e}")
+ # Handle other errors
+
+Prompt Engineering + ```python + def optimize_prompt(prompt: str) -> str: + """Optimize prompt for better results""" + # Add detail markers + prompt = f"Detailed view: {prompt}"
+# Add quality markers + prompt = f"{prompt}, high quality, professional"
+# Add composition guidance + prompt = f"{prompt}, well composed, balanced"
+return prompt + ```
+Resource Management + ```python + from contextlib import asynccontextmanager
+@asynccontextmanager + async def artwork_session(): + """Manage artwork generation resources""" + generator = ArtworkGenerator(config) + try: + yield generator + finally: + await generator.cleanup() + ```
+python
+ async def batch_generate(
+ prompts: List[str],
+ batch_size: int = 5
+ ) -> List[dict]:
+ """Process multiple artwork generations"""
+ results = []
+ for batch in chunks(prompts, batch_size):
+ batch_results = await asyncio.gather(
+ *(generator.generate_artwork(prompt) for prompt in batch)
+ )
+ results.extend(batch_results)
+ return results
Comprehensive documentation for the TeleAgent Bargaining System, which handles automated price negotiations and deal-making.
+The main class for managing price negotiations and deal-making processes.
+def __init__(
+ self,
+ config: dict,
+ price_oracle: PriceOracle = None,
+ deal_manager: DealManager = None
+):
+ """
+ Initialize BargainingAgent.
+
+ Args:
+ config (dict): Configuration containing:
+ - min_price (float): Minimum acceptable price
+ - max_price (float): Maximum acceptable price
+ - negotiation_steps (int): Maximum negotiation rounds
+ - strategy (str): Negotiation strategy type
+ price_oracle (PriceOracle, optional): Custom price oracle
+ deal_manager (DealManager, optional): Custom deal manager
+
+ Raises:
+ ValueError: If configuration is invalid
+ InitializationError: If services initialization fails
+ """
+
+async def start_negotiation(
+ self,
+ item_id: str,
+ initial_price: float,
+ counterparty_id: str,
+ context: dict = None
+) -> dict:
+ """
+ Start a new price negotiation.
+
+ Args:
+ item_id: Identifier of item being negotiated
+ initial_price: Starting price point
+ counterparty_id: Identifier of negotiating party
+ context: Optional negotiation context:
+ - urgency (str): Negotiation urgency level
+ - market_data (dict): Current market information
+ - preferences (dict): Negotiation preferences
+
+ Returns:
+ dict: Negotiation session information:
+ - session_id (str): Unique negotiation ID
+ - initial_offer (float): Starting offer
+ - status (str): Current negotiation status
+
+ Raises:
+ NegotiationError: If negotiation cannot be started
+ """
+
+async def make_offer(
+ self,
+ session_id: str,
+ offer_amount: float,
+ justification: str = None
+) -> dict:
+ """
+ Make a price offer in negotiation.
+
+ Args:
+ session_id: Active negotiation session ID
+ offer_amount: Proposed price amount
+ justification: Optional reasoning for offer
+
+ Returns:
+ dict: Offer result containing:
+ - offer_id (str): Unique offer identifier
+ - status (str): Offer status
+ - counter_offer (float): Optional counter-offer
+
+ Raises:
+ OfferError: If offer is invalid
+ """
+
+class PriceAnalyzer:
+ def __init__(self, market_data: dict = None):
+ """
+ Initialize price analyzer.
+
+ Args:
+ market_data: Optional market data:
+ - historical_prices (list): Price history
+ - market_trends (dict): Trend indicators
+ - volatility (float): Price volatility
+ """
+
+ async def analyze_offer(
+ self,
+ offer_amount: float,
+ context: dict
+ ) -> dict:
+ """
+ Analyze price offer fairness.
+
+ Args:
+ offer_amount: Proposed price
+ context: Market context
+
+ Returns:
+ dict: Analysis results:
+ - fairness_score (float): Offer fairness
+ - market_alignment (float): Market fit
+ - recommendations (list): Action suggestions
+ """
+
+ async def suggest_counter_offer(
+ self,
+ current_offer: float,
+ negotiation_history: List[dict]
+ ) -> float:
+ """
+ Suggest counter-offer amount.
+
+ Args:
+ current_offer: Current offer amount
+ negotiation_history: Previous offers
+
+ Returns:
+ float: Suggested counter-offer amount
+ """
+
+class DealManager:
+ def __init__(self, config: dict):
+ """
+ Initialize deal manager.
+
+ Args:
+ config: Deal management configuration:
+ - approval_threshold (float): Auto-approval limit
+ - confirmation_required (bool): Need confirmation
+ - timeout (int): Deal timeout seconds
+ """
+
+ async def create_deal(
+ self,
+ negotiation_id: str,
+ final_price: float,
+ terms: dict
+ ) -> dict:
+ """
+ Create a new deal from negotiation.
+
+ Args:
+ negotiation_id: Completed negotiation ID
+ final_price: Agreed price amount
+ terms: Deal terms and conditions
+
+ Returns:
+ dict: Created deal information:
+ - deal_id (str): Unique deal identifier
+ - status (str): Deal status
+ - execution_plan (dict): Deal execution steps
+
+ Raises:
+ DealCreationError: If deal creation fails
+ """
+
+ async def execute_deal(
+ self,
+ deal_id: str
+ ) -> dict:
+ """
+ Execute finalized deal.
+
+ Args:
+ deal_id: Deal to execute
+
+ Returns:
+ dict: Execution results:
+ - transaction_id (str): Transaction identifier
+ - status (str): Execution status
+ - completion_time (str): Completion timestamp
+
+ Raises:
+ DealExecutionError: If execution fails
+ """
+
+class NegotiationStrategy:
+ def __init__(self, strategy_type: str):
+ """
+ Initialize negotiation strategy.
+
+ Args:
+ strategy_type: Strategy type:
+ - "aggressive": Quick deal focus
+ - "balanced": Balanced approach
+ - "conservative": Risk-averse approach
+ """
+
+ async def evaluate_position(
+ self,
+ current_price: float,
+ negotiation_state: dict
+ ) -> dict:
+ """
+ Evaluate negotiation position.
+
+ Args:
+ current_price: Current offer price
+ negotiation_state: Current state
+
+ Returns:
+ dict: Position evaluation:
+ - strength (float): Position strength
+ - next_action (str): Suggested action
+ - price_range (tuple): Target range
+ """
+
+from teleAgent.bargaining.agent import BargainingAgent
+
+# Initialize agent
+agent = BargainingAgent({
+ "min_price": 100.0,
+ "max_price": 1000.0,
+ "negotiation_steps": 5,
+ "strategy": "balanced"
+})
+
+# Start negotiation
+negotiation = await agent.start_negotiation(
+ item_id="nft_123",
+ initial_price=500.0,
+ counterparty_id="user_456",
+ context={
+ "urgency": "medium",
+ "market_data": {"current_floor": 450.0}
+ }
+)
+
+# Make offer
+offer_result = await agent.make_offer(
+ session_id=negotiation["session_id"],
+ offer_amount=450.0,
+ justification="Market alignment adjustment"
+)
+
+# Create and execute deal
+deal = await agent.deal_manager.create_deal(
+ negotiation_id=negotiation["session_id"],
+ final_price=450.0,
+ terms={
+ "payment_method": "crypto",
+ "delivery_time": "immediate"
+ }
+)
+
+result = await agent.deal_manager.execute_deal(
+ deal_id=deal["deal_id"]
+)
+
+class BargainingError(Exception):
+ """Base exception for bargaining-related errors"""
+ pass
+
+class NegotiationError(BargainingError):
+ """Exception for negotiation failures"""
+ pass
+
+class OfferError(BargainingError):
+ """Exception for offer-related errors"""
+ pass
+
+class DealError(BargainingError):
+ """Exception for deal-related errors"""
+ pass
+
+try:
+ negotiation = await agent.start_negotiation(item_id, price, counterparty)
+except NegotiationError as e:
+ logger.error(f"Negotiation failed: {e}")
+ # Handle negotiation failure
+except OfferError as e:
+ logger.error(f"Offer error: {e}")
+ # Handle offer failure
+except BargainingError as e:
+ logger.error(f"Bargaining operation failed: {e}")
+ # Handle other errors
+
+Price Validation
+ python
+ def validate_price(price: float, context: dict) -> bool:
+ """Validate price against market conditions"""
+ if price < context["market_floor"]:
+ return False
+ if price > context["market_ceiling"]:
+ return False
+ return True
Negotiation Timeout
+ python
+ async def negotiate_with_timeout(
+ agent: BargainingAgent,
+ timeout: int = 300
+ ):
+ """Handle negotiation with timeout"""
+ async with timeout_scope(timeout):
+ try:
+ result = await agent.start_negotiation(...)
+ return result
+ except TimeoutError:
+ await agent.cancel_negotiation(...)
+ raise NegotiationTimeout("Negotiation timed out")
Deal Safety + ```python + async def safe_deal_execution( + deal_manager: DealManager, + deal_id: str + ) -> dict: + """Execute deal with safety checks""" + # Verify deal status + if not await deal_manager.verify_deal(deal_id): + raise DealError("Invalid deal")
+# Check counterparty + if not await deal_manager.verify_counterparty(deal_id): + raise DealError("Counterparty verification failed")
+# Execute with retry + return await retry_with_backoff( + deal_manager.execute_deal, + deal_id + ) + ```
+Comprehensive documentation for the TeleAgent NFT Tools system, which handles NFT creation, management, and transactions.
+The main class for NFT creation and minting operations.
+def __init__(
+ self,
+ config: dict,
+ artwork_dao: ArtworkDAO = None,
+ nft_dao: NFTDAO = None
+):
+ """
+ Initialize NFTCreator.
+
+ Args:
+ config (dict): Configuration containing:
+ - wallet_address (str): Creator's wallet address
+ - network (str): Blockchain network (e.g., "mainnet", "devnet")
+ - api_endpoint (str): RPC endpoint URL
+ artwork_dao (ArtworkDAO, optional): Data access object for artwork
+ nft_dao (NFTDAO, optional): Data access object for NFTs
+
+ Raises:
+ ValueError: If required configuration is missing
+ ConnectionError: If network connection fails
+ """
+
+async def create_nft(
+ self,
+ artwork: dict,
+ metadata: dict,
+ options: dict = None
+) -> dict:
+ """
+ Create a new NFT from artwork.
+
+ Args:
+ artwork: Artwork information containing:
+ - image_url (str): URL of the artwork
+ - title (str): Artwork title
+ - description (str): Artwork description
+ metadata: NFT metadata containing:
+ - attributes (list): NFT attributes
+ - properties (dict): Additional properties
+ options: Optional creation parameters:
+ - royalty_percentage (float): Creator royalty
+ - is_mutable (bool): Whether metadata can be updated
+
+ Returns:
+ dict: Created NFT information containing:
+ - mint_address (str): NFT mint address
+ - metadata_url (str): Metadata URL
+ - transaction_id (str): Creation transaction ID
+
+ Raises:
+ NFTCreationError: If NFT creation fails
+ """
+
+async def mint_nft(
+ self,
+ mint_address: str,
+ recipient_address: str = None
+) -> dict:
+ """
+ Mint an NFT to a recipient.
+
+ Args:
+ mint_address: NFT mint address
+ recipient_address: Optional recipient wallet address
+ (defaults to creator's address)
+
+ Returns:
+ dict: Minting result containing:
+ - transaction_id (str): Minting transaction ID
+ - recipient (str): Recipient wallet address
+
+ Raises:
+ MintingError: If minting operation fails
+ """
+
+class MetadataManager:
+ def __init__(self, storage_config: dict):
+ """
+ Initialize metadata manager.
+
+ Args:
+ storage_config: Storage configuration containing:
+ - provider (str): Storage provider (e.g., "arweave", "ipfs")
+ - endpoint (str): Storage endpoint URL
+ """
+
+ async def upload_metadata(
+ self,
+ metadata: dict,
+ is_mutable: bool = False
+ ) -> str:
+ """
+ Upload NFT metadata to storage.
+
+ Args:
+ metadata: NFT metadata
+ is_mutable: Whether metadata can be updated
+
+ Returns:
+ str: Metadata URL
+
+ Raises:
+ StorageError: If upload fails
+ """
+
+ async def update_metadata(
+ self,
+ metadata_url: str,
+ updates: dict
+ ) -> str:
+ """
+ Update existing NFT metadata.
+
+ Args:
+ metadata_url: Current metadata URL
+ updates: Metadata updates to apply
+
+ Returns:
+ str: New metadata URL
+
+ Raises:
+ StorageError: If update fails
+ ImmutableError: If metadata is immutable
+ """
+
+class TransactionManager:
+ def __init__(self, network_config: dict):
+ self.client = AsyncClient(network_config["endpoint"])
+
+ async def send_transaction(
+ self,
+ transaction: Transaction,
+ signers: List[Keypair]
+ ) -> str:
+ """
+ Send and confirm transaction.
+
+ Args:
+ transaction: Transaction to send
+ signers: Required transaction signers
+
+ Returns:
+ str: Transaction signature
+
+ Raises:
+ TransactionError: If transaction fails
+ """
+
+ async def verify_transaction(
+ self,
+ signature: str
+ ) -> dict:
+ """
+ Verify transaction status.
+
+ Args:
+ signature: Transaction signature
+
+ Returns:
+ dict: Transaction status and details
+ """
+
+async def transfer_nft(
+ self,
+ mint_address: str,
+ recipient_address: str,
+ options: dict = None
+) -> dict:
+ """
+ Transfer NFT to new owner.
+
+ Args:
+ mint_address: NFT mint address
+ recipient_address: Recipient wallet address
+ options: Optional transfer parameters:
+ - skip_preflight (bool): Skip preflight check
+ - max_retries (int): Maximum retry attempts
+
+ Returns:
+ dict: Transfer result containing:
+ - transaction_id (str): Transfer transaction ID
+ - old_owner (str): Previous owner address
+ - new_owner (str): New owner address
+
+ Raises:
+ TransferError: If transfer fails
+ """
+
+from teleAgent.nft.creator import NFTCreator
+
+# Initialize creator
+creator = NFTCreator({
+ "wallet_address": "YOUR_WALLET_ADDRESS",
+ "network": "devnet",
+ "api_endpoint": "https://api.devnet.solana.com"
+})
+
+# Create NFT
+nft = await creator.create_nft(
+ artwork={
+ "image_url": "https://example.com/art.png",
+ "title": "My First NFT",
+ "description": "A beautiful artwork"
+ },
+ metadata={
+ "attributes": [
+ {"trait_type": "Background", "value": "Blue"},
+ {"trait_type": "Style", "value": "Abstract"}
+ ],
+ "properties": {
+ "files": [{"uri": "https://example.com/art.png", "type": "image/png"}],
+ "category": "image"
+ }
+ }
+)
+
+# Transfer NFT
+transfer_result = await creator.transfer_nft(
+ mint_address="NFT_MINT_ADDRESS",
+ recipient_address="RECIPIENT_ADDRESS",
+ options={
+ "skip_preflight": False,
+ "max_retries": 3
+ }
+)
+
+# Verify transfer
+status = await creator.transaction_manager.verify_transaction(
+ transfer_result["transaction_id"]
+)
+
+class NFTError(Exception):
+ """Base exception for NFT-related errors"""
+ pass
+
+class NFTCreationError(NFTError):
+ """Exception for NFT creation failures"""
+ pass
+
+class MintingError(NFTError):
+ """Exception for minting failures"""
+ pass
+
+class TransferError(NFTError):
+ """Exception for transfer failures"""
+ pass
+
+try:
+ nft = await creator.create_nft(artwork, metadata)
+except NFTCreationError as e:
+ logger.error(f"Creation failed: {e}")
+ # Handle creation failure
+except MintingError as e:
+ logger.error(f"Minting failed: {e}")
+ # Handle minting failure
+except NFTError as e:
+ logger.error(f"NFT operation failed: {e}")
+ # Handle other errors
+
+Transaction Safety + ```python + async def safe_transfer(mint_address: str, recipient: str) -> dict: + """Implement safe transfer with verification""" + # Verify ownership + if not await verify_ownership(mint_address): + raise OwnershipError("Not the owner")
+# Check recipient account + if not await verify_recipient(recipient): + raise RecipientError("Invalid recipient")
+# Execute transfer + result = await transfer_nft(mint_address, recipient)
+# Verify transfer + status = await verify_transaction(result["transaction_id"]) + if not status["success"]: + raise TransferError("Transfer failed")
+return result + ```
+Metadata Validation
+ python
+ def validate_metadata(metadata: dict) -> bool:
+ """Validate NFT metadata structure"""
+ required_fields = ["name", "description", "image"]
+ return all(field in metadata for field in required_fields)
Rate Limiting + ```python + from teleAgent.utilities.rate_limiter import RateLimiter
+rate_limiter = RateLimiter( + max_requests=10, # Maximum requests per window + time_window=60 # Window size in seconds + )
+@rate_limiter.limit + async def create_nft(artwork: dict, metadata: dict): + # NFT creation logic + pass + ```
+ + + + + + + + + + + + + +Comprehensive documentation for the TeleAgent Telegram integration module.
+The main class for interacting with Telegram's Bot API.
+def __init__(self, config: dict):
+ """
+ Initialize TelegramClient.
+
+ Args:
+ config (dict): Configuration dictionary containing:
+ - bot_token (str): Telegram bot token
+ - api_id (str): Telegram API ID
+ - api_hash (str): Telegram API hash
+
+ Raises:
+ ValueError: If required config values are missing
+ TelegramError: If initialization fails
+ """
+
+async def on_message(self):
+ """
+ Decorator for handling new messages.
+
+ Example:
+ @client.on_message()
+ async def handle_message(message):
+ await message.reply("Received your message!")
+ """
+
+async def on_edited_message(self):
+ """
+ Decorator for handling edited messages.
+
+ Example:
+ @client.on_edited_message()
+ async def handle_edit(message):
+ await message.reply("Message was edited!")
+ """
+
+async def on_command(self, command: str):
+ """
+ Decorator for handling bot commands.
+
+ Args:
+ command (str): Command name without '/'
+
+ Example:
+ @client.on_command("start")
+ async def handle_start(message):
+ await message.reply("Bot started!")
+ """
+
+async def send_message(
+ self,
+ chat_id: Union[int, str],
+ text: str,
+ parse_mode: str = None,
+ reply_markup: dict = None
+) -> dict:
+ """
+ Send text message to a chat.
+
+ Args:
+ chat_id: Unique identifier for the target chat
+ text: Message text
+ parse_mode: Text parsing mode (HTML/Markdown)
+ reply_markup: Additional interface options
+
+ Returns:
+ dict: Sent message information
+
+ Raises:
+ TelegramError: If message sending fails
+ """
+
+async def send_photo(
+ self,
+ chat_id: Union[int, str],
+ photo: Union[str, bytes],
+ caption: str = None,
+ reply_markup: dict = None
+) -> dict:
+ """
+ Send photo to a chat.
+
+ Args:
+ chat_id: Unique identifier for the target chat
+ photo: Photo to send (file_id, URL, or bytes)
+ caption: Photo caption
+ reply_markup: Additional interface options
+
+ Returns:
+ dict: Sent message information
+
+ Raises:
+ TelegramError: If photo sending fails
+ """
+
+async def get_chat_member(
+ self,
+ chat_id: Union[int, str],
+ user_id: int
+) -> dict:
+ """
+ Get information about a chat member.
+
+ Args:
+ chat_id: Unique identifier for the target chat
+ user_id: Unique identifier of the target user
+
+ Returns:
+ dict: Chat member information
+
+ Raises:
+ TelegramError: If request fails
+ """
+
+async def ban_chat_member(
+ self,
+ chat_id: Union[int, str],
+ user_id: int,
+ until_date: int = None
+) -> bool:
+ """
+ Ban a user in a group.
+
+ Args:
+ chat_id: Unique identifier for the target chat
+ user_id: Unique identifier of the user to ban
+ until_date: Ban duration in Unix time
+
+ Returns:
+ bool: True on success
+
+ Raises:
+ TelegramError: If ban fails
+ """
+
+async def on_new_chat_members(self):
+ """
+ Decorator for handling new chat members.
+
+ Example:
+ @client.on_new_chat_members()
+ async def welcome(message):
+ for user in message.new_chat_members:
+ await message.reply(f"Welcome {user.first_name}!")
+ """
+
+async def on_left_chat_member(self):
+ """
+ Decorator for handling members leaving chat.
+
+ Example:
+ @client.on_left_chat_member()
+ async def goodbye(message):
+ user = message.left_chat_member
+ await message.reply(f"Goodbye {user.first_name}!")
+ """
+
+class TelegramError(Exception):
+ """Base exception for Telegram-related errors"""
+ pass
+
+class APIError(TelegramError):
+ """Exception for API request failures"""
+ pass
+
+class AuthError(TelegramError):
+ """Exception for authentication failures"""
+ pass
+
+class NetworkError(TelegramError):
+ """Exception for network-related failures"""
+ pass
+
+class Message:
+ """Represents a Telegram message"""
+
+ @property
+ def message_id(self) -> int:
+ """Unique message identifier"""
+
+ @property
+ def from_user(self) -> User:
+ """Sender of the message"""
+
+ @property
+ def chat(self) -> Chat:
+ """Chat the message belongs to"""
+
+ @property
+ def text(self) -> Optional[str]:
+ """Message text"""
+
+ @property
+ def photo(self) -> Optional[List[PhotoSize]]:
+ """Available photo sizes"""
+
+class User:
+ """Represents a Telegram user"""
+
+ @property
+ def id(self) -> int:
+ """Unique identifier for this user"""
+
+ @property
+ def first_name(self) -> str:
+ """User's first name"""
+
+ @property
+ def last_name(self) -> Optional[str]:
+ """User's last name"""
+
+ @property
+ def username(self) -> Optional[str]:
+ """User's username"""
+
+from teleAgent.integrations.telegram import TelegramClient
+
+async def main():
+ # Initialize client
+ client = TelegramClient({
+ "bot_token": "YOUR_BOT_TOKEN",
+ "api_id": "YOUR_API_ID",
+ "api_hash": "YOUR_API_HASH"
+ })
+
+ # Register message handler
+ @client.on_message()
+ async def echo(message):
+ await client.send_message(
+ chat_id=message.chat.id,
+ text=message.text
+ )
+
+ # Start the bot
+ await client.start()
+ await client.run_forever()
+
+# Register command handler
+@client.on_command("start")
+async def start_command(message):
+ await client.send_message(
+ chat_id=message.chat.id,
+ text="Welcome to the bot!",
+ reply_markup={
+ "inline_keyboard": [
+ [{
+ "text": "Help",
+ "callback_data": "help"
+ }]
+ ]
+ }
+ )
+
+try:
+ await client.send_message(chat_id, text)
+except APIError as e:
+ logger.error(f"API Error: {e}")
+ # Implement retry logic
+except NetworkError as e:
+ logger.error(f"Network Error: {e}")
+ # Wait and retry
+except TelegramError as e:
+ logger.error(f"Telegram Error: {e}")
+ # Handle other errors
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+