Modified code to self-host the bot mentioned in my profile with koboldcpp:
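This assumes koboldcpp is already running locally and exposing its OpenAI-compatible API on the default port 5001. The api_key is just a placeholder; as far as I know koboldcpp doesn't check it, but the OpenAI client requires one to be set.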
import asyncio
from simpx import BotProfile, SimpleXBot
from simpx.download import SimpleXDaemon
from simpx.extension import ChatWrapper
from datetime import datetime, timedelta, timezone
import traceback
import time
import logging
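
# Per-chat conversation state, keyed by contact/group id:
# {"messages": [OpenAI-style role/content dicts], "last_seen": datetime of last message}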
chat_histories: dict[str, dict] = {}
try:
    from openai import OpenAI

    aiclient = OpenAI(
        base_url="http://localhost:5001/v1/",
        api_key="KOBOLDCPP",
    )
except ImportError:
    aiclient = None
    print("ai client import error")
CHAT_MODEL = "koboldcpp/mlabonne_Qwen3-0.6B-abliterated-Q3_K_L"
# Example usage
if __name__ == "__main__":
    # Start the local SimpleX daemon the bot connects to
    daemon = SimpleXDaemon()
    daemon.run(port_number=5225)
    # Create a bot profile
    profile = BotProfile(
        display_name="SimpleX Chat AI bot",
        full_name="",
        description="",
        welcome_message="""This is a test chatbot connected to [FluxAI](https://ai.runonflux.com/). You can use it for technical and other questions.
Use /reset command to start a new session. Session will automatically reset after 5 minutes without messages.
Do NOT share private or security-sensitive information!""",
        auto_accept_message="",
        command_prefix="/",
    )
    # Create the bot with the profile
    bot = SimpleXBot(profile)

    @bot.command(name="info", help="Shows bot information")
    async def info_command(chat_info, profile):
        """Command that shows information about the bot's profile."""
        await bot.send_message(
            chat_info,
            f"*Bot Information*\n"
            f"Name: {profile.display_name}\n"
            f"Description: {profile.description}\n"
            f"Address: {profile.address}",
        )
@bot.command(name="reset", help="Reset the conversation history for this chat")
async def reset_command(chat_info):
chat_type = chat_info.get("type")
if chat_type == "direct":
chat_id = chat_info["contact"]["contactId"]
elif chat_type == "group":
chat_id = chat_info["groupInfo"]["groupId"]
else:
return
chat_histories[chat_id]["messages"] = [] # wipe the list
await bot.send_message(chat_info, "Conversation history has been reset.")
    if aiclient is not None:

        @bot.on_message()
        async def handle_plain_message(text: str, chat_info: dict, chat_item: dict):
            """
            Demonstrate live messaging with a streaming AI response from the
            local koboldcpp server.

            Args:
                text: The user's message text (used as the prompt).
                chat_info: Chat information dictionary.
                chat_item: The incoming chat item.
            """
            # Create a ChatWrapper instance using the provided chat_info and the bot's client.
            chat = ChatWrapper(chat_info, bot.client)
            chat_type = chat_info.get("type")
            if chat_type == "direct":
                chat_id = chat_info["contact"]["contactId"]
            elif chat_type == "group":
                chat_id = chat_info["groupInfo"]["groupId"]
            else:
                return
            now = datetime.now(timezone.utc)
            # Initialize history entry if it doesn't exist
            if chat_id not in chat_histories:
                chat_histories[chat_id] = {"messages": [], "last_seen": now}
            else:
                # Check if 5 minutes have passed since the last message
                last_seen = chat_histories[chat_id]["last_seen"]
                if now - last_seen >= timedelta(minutes=5):
                    # Clear the conversation history
                    chat_histories[chat_id]["messages"] = []
            # Update last_seen timestamp
            chat_histories[chat_id]["last_seen"] = now
            # Add the new user message to the history
            chat_histories[chat_id]["messages"].append(
                {"role": "user", "content": text}
            )
            live_msg = None  # so the error handler below can tell whether it was created
            try:
                # Start a live message session with an initial text.
                initial_text = "Thinking..."
                live_msg = await bot.send_message(chat, initial_text, live=True, ttl=60)
                current_response = ""
                # Create the chat completion request with streaming enabled.
                response = aiclient.chat.completions.create(
                    model=CHAT_MODEL,
                    messages=chat_histories[chat_id]["messages"],
                    stream=True,  # Enable streaming of the response.
                )
                # Buffer to accumulate tokens until a sentence is complete
                buffer = ""
                last_update = time.time()
                UPDATE_INTERVAL = 2  # seconds

                async def flush_buffer(force: bool = False):
                    """Push complete sentences to the live message at UPDATE_INTERVAL-second cadence."""
                    nonlocal buffer, current_response, last_update
                    now = time.time()
                    if force or (
                        buffer.rstrip().endswith((".", "?", "!"))
                        and now - last_update >= UPDATE_INTERVAL
                    ):
                        current_response += buffer
                        await live_msg.update_live(current_response.strip())
                        buffer = ""
                        last_update = now
                # Process each streaming chunk.
                for chunk in response:
                    # Extract text content from the current chunk.
                    # Assumes chunk structure similar to OpenAI's streaming response.
                    chunk_text = chunk.choices[0].delta.content or ""
                    if "<|eot_id|>" in chunk_text:
                        chunk_text = chunk_text.replace("<|eot_id|>", "")
                        if chunk_text:  # keep any leftover text
                            buffer += chunk_text
                        break  # no more data expected
                    buffer += chunk_text
                    await flush_buffer()
                # Final flush for any remaining text
                await flush_buffer(force=True)
                chat_histories[chat_id]["messages"].append(
                    {"role": "assistant", "content": current_response}
                )
                # Finalize the live message.
                await live_msg.finish_live()
            except Exception:
                # In case of error, log the traceback and finalize the live
                # message if it was started.
                traceback.print_exc()
                if live_msg is not None:
                    await live_msg.finish_live()
    # Start the bot
    try:
        asyncio.run(bot.start())
    except KeyboardInterrupt:
        logging.info("Exiting")