Custom Runtime Logger <> FileLogger (#2596)

* added logger param for custom logger support

* added FileLogger

* bump: spell check

* bump: import error

* added more log functionalites

* bump: builtin logger for FileLogger

* type check and instance level logger

* tests added for the fileLogger

* formatting bump

* updated tests and removed time formatting

* separate module for the filelogger

* update file logger test

* added the FileLogger into the notebook

* bump json decode error

* updated requested changes

* Updated tests with AutoGen agents

* bump file

* bump: logger accessed before intializedsolved

* Updated notebook to guide with a filename

* added thread_id to the FileLogger

* bump type check in tests

* Updated thread_id for each log event

* Updated thread_id for each log event

* Updated with tempfile

* bump: str cleanup

* skipping-windows tests

---------

Co-authored-by: Chi Wang <wang.chi@microsoft.com>
This commit is contained in:
HRUSHIKESH DOKALA 2024-05-16 11:24:05 +05:30 committed by GitHub
parent cd44932347
commit d461100bfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 451 additions and 10 deletions

View File

@ -0,0 +1,211 @@
from __future__ import annotations
import json
import logging
import os
import threading
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion
from autogen.logger.base_logger import BaseLogger
from autogen.logger.logger_utils import get_current_ts, to_dict
from .base_logger import LLMConfig
if TYPE_CHECKING:
from autogen import Agent, ConversableAgent, OpenAIWrapper
logger = logging.getLogger(__name__)
class FileLogger(BaseLogger):
def __init__(self, config: Dict[str, Any]):
self.config = config
self.session_id = str(uuid.uuid4())
curr_dir = os.getcwd()
self.log_dir = os.path.join(curr_dir, "autogen_logs")
os.makedirs(self.log_dir, exist_ok=True)
self.log_file = os.path.join(self.log_dir, self.config.get("filename", "runtime.log"))
try:
with open(self.log_file, "a"):
pass
except Exception as e:
logger.error(f"[file_logger] Failed to create logging file: {e}")
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(self.log_file)
self.logger.addHandler(file_handler)
def start(self) -> str:
"""Start the logger and return the session_id."""
try:
self.logger.info(f"Started new session with Session ID: {self.session_id}")
except Exception as e:
logger.error(f"[file_logger] Failed to create logging file: {e}")
finally:
return self.session_id
def log_chat_completion(
self,
invocation_id: uuid.UUID,
client_id: int,
wrapper_id: int,
request: Dict[str, Union[float, str, List[Dict[str, str]]]],
response: Union[str, ChatCompletion],
is_cached: int,
cost: float,
start_time: str,
) -> None:
"""
Log a chat completion.
"""
thread_id = threading.get_ident()
try:
log_data = json.dumps(
{
"invocation_id": str(invocation_id),
"client_id": client_id,
"wrapper_id": wrapper_id,
"request": to_dict(request),
"response": str(response),
"is_cached": is_cached,
"cost": cost,
"start_time": start_time,
"end_time": get_current_ts(),
"thread_id": thread_id,
}
)
self.logger.info(log_data)
except Exception as e:
self.logger.error(f"[file_logger] Failed to log chat completion: {e}")
def log_new_agent(self, agent: ConversableAgent, init_args: Dict[str, Any] = {}) -> None:
"""
Log a new agent instance.
"""
thread_id = threading.get_ident()
try:
log_data = json.dumps(
{
"id": id(agent),
"agent_name": agent.name if hasattr(agent, "name") and agent.name is not None else "",
"wrapper_id": to_dict(
agent.client.wrapper_id if hasattr(agent, "client") and agent.client is not None else ""
),
"session_id": self.session_id,
"current_time": get_current_ts(),
"agent_type": type(agent).__name__,
"args": to_dict(init_args),
"thread_id": thread_id,
}
)
self.logger.info(log_data)
except Exception as e:
self.logger.error(f"[file_logger] Failed to log new agent: {e}")
def log_event(self, source: Union[str, Agent], name: str, **kwargs: Dict[str, Any]) -> None:
"""
Log an event from an agent or a string source.
"""
from autogen import Agent
# This takes an object o as input and returns a string. If the object o cannot be serialized, instead of raising an error,
# it returns a string indicating that the object is non-serializable, along with its type's qualified name obtained using __qualname__.
json_args = json.dumps(kwargs, default=lambda o: f"<<non-serializable: {type(o).__qualname__}>>")
thread_id = threading.get_ident()
if isinstance(source, Agent):
try:
log_data = json.dumps(
{
"source_id": id(source),
"source_name": str(source.name) if hasattr(source, "name") else source,
"event_name": name,
"agent_module": source.__module__,
"agent_class": source.__class__.__name__,
"json_state": json_args,
"timestamp": get_current_ts(),
"thread_id": thread_id,
}
)
self.logger.info(log_data)
except Exception as e:
self.logger.error(f"[file_logger] Failed to log event {e}")
else:
try:
log_data = json.dumps(
{
"source_id": id(source),
"source_name": str(source.name) if hasattr(source, "name") else source,
"event_name": name,
"json_state": json_args,
"timestamp": get_current_ts(),
"thread_id": thread_id,
}
)
self.logger.info(log_data)
except Exception as e:
self.logger.error(f"[file_logger] Failed to log event {e}")
def log_new_wrapper(
self, wrapper: OpenAIWrapper, init_args: Dict[str, Union[LLMConfig, List[LLMConfig]]] = {}
) -> None:
"""
Log a new wrapper instance.
"""
thread_id = threading.get_ident()
try:
log_data = json.dumps(
{
"wrapper_id": id(wrapper),
"session_id": self.session_id,
"json_state": json.dumps(init_args),
"timestamp": get_current_ts(),
"thread_id": thread_id,
}
)
self.logger.info(log_data)
except Exception as e:
self.logger.error(f"[file_logger] Failed to log event {e}")
def log_new_client(self, client: AzureOpenAI | OpenAI, wrapper: OpenAIWrapper, init_args: Dict[str, Any]) -> None:
"""
Log a new client instance.
"""
thread_id = threading.get_ident()
try:
log_data = json.dumps(
{
"client_id": id(client),
"wrapper_id": id(wrapper),
"session_id": self.session_id,
"class": type(client).__name__,
"json_state": json.dumps(init_args),
"timestamp": get_current_ts(),
"thread_id": thread_id,
}
)
self.logger.info(log_data)
except Exception as e:
self.logger.error(f"[file_logger] Failed to log event {e}")
def get_connection(self) -> None:
"""Method is intentionally left blank because there is no specific connection needed for the FileLogger."""
pass
def stop(self) -> None:
"""Close the file handler and remove it from the logger."""
for handler in self.logger.handlers:
if isinstance(handler, logging.FileHandler):
handler.close()
self.logger.removeHandler(handler)

View File

@ -1,6 +1,7 @@
from typing import Any, Dict, Optional from typing import Any, Dict, Literal, Optional
from autogen.logger.base_logger import BaseLogger from autogen.logger.base_logger import BaseLogger
from autogen.logger.file_logger import FileLogger
from autogen.logger.sqlite_logger import SqliteLogger from autogen.logger.sqlite_logger import SqliteLogger
__all__ = ("LoggerFactory",) __all__ = ("LoggerFactory",)
@ -8,11 +9,15 @@ __all__ = ("LoggerFactory",)
class LoggerFactory: class LoggerFactory:
@staticmethod @staticmethod
def get_logger(logger_type: str = "sqlite", config: Optional[Dict[str, Any]] = None) -> BaseLogger: def get_logger(
logger_type: Literal["sqlite", "file"] = "sqlite", config: Optional[Dict[str, Any]] = None
) -> BaseLogger:
if config is None: if config is None:
config = {} config = {}
if logger_type == "sqlite": if logger_type == "sqlite":
return SqliteLogger(config) return SqliteLogger(config)
elif logger_type == "file":
return FileLogger(config)
else: else:
raise ValueError(f"[logger_factory] Unknown logger type: {logger_type}") raise ValueError(f"[logger_factory] Unknown logger type: {logger_type}")

View File

@ -3,12 +3,12 @@ from __future__ import annotations
import logging import logging
import sqlite3 import sqlite3
import uuid import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
from openai import AzureOpenAI, OpenAI from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion from openai.types.chat import ChatCompletion
from autogen.logger.base_logger import LLMConfig from autogen.logger.base_logger import BaseLogger, LLMConfig
from autogen.logger.logger_factory import LoggerFactory from autogen.logger.logger_factory import LoggerFactory
if TYPE_CHECKING: if TYPE_CHECKING:
@ -20,10 +20,26 @@ autogen_logger = None
is_logging = False is_logging = False
def start(logger_type: str = "sqlite", config: Optional[Dict[str, Any]] = None) -> str: def start(
logger: Optional[BaseLogger] = None,
logger_type: Literal["sqlite", "file"] = "sqlite",
config: Optional[Dict[str, Any]] = None,
) -> str:
"""
Start logging for the runtime.
Args:
logger (BaseLogger): A logger instance
logger_type (str): The type of logger to use (default: sqlite)
config (dict): Configuration for the logger
Returns:
session_id (str(uuid.uuid4)): a unique id for the logging session
"""
global autogen_logger global autogen_logger
global is_logging global is_logging
if logger:
autogen_logger = logger
else:
autogen_logger = LoggerFactory.get_logger(logger_type=logger_type, config=config) autogen_logger = LoggerFactory.get_logger(logger_type=logger_type, config=config)
try: try:

View File

@ -8,6 +8,10 @@
"\n", "\n",
"AutoGen offers utilities to log data for debugging and performance analysis. This notebook demonstrates how to use them. \n", "AutoGen offers utilities to log data for debugging and performance analysis. This notebook demonstrates how to use them. \n",
"\n", "\n",
"we log data in different modes:\n",
"- SQlite Database\n",
"- File \n",
"\n",
"In general, users can initiate logging by calling `autogen.runtime_logging.start()` and stop logging by calling `autogen.runtime_logging.stop()`" "In general, users can initiate logging by calling `autogen.runtime_logging.start()` and stop logging by calling `autogen.runtime_logging.stop()`"
] ]
}, },
@ -287,6 +291,82 @@
" + str(round(session_cost, 4))\n", " + str(round(session_cost, 4))\n",
")" ")"
] ]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Log data in File mode\n",
"\n",
"By default, the log type is set to `sqlite` as shown above, but we introduced a new parameter for the `autogen.runtime_logging.start()`\n",
"\n",
"the `logger_type = \"file\"` will start to log data in the File mode."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Logging session ID: ed493ebf-d78e-49f0-b832-69557276d557\n",
"\u001b[33muser_proxy\u001b[0m (to assistant):\n",
"\n",
"What is the height of the Eiffel Tower? Only respond with the answer and terminate\n",
"\n",
"--------------------------------------------------------------------------------\n",
"\u001b[33massistant\u001b[0m (to user_proxy):\n",
"\n",
"The height of the Eiffel Tower is 330 meters.\n",
"TERMINATE\n",
"\n",
"--------------------------------------------------------------------------------\n"
]
}
],
"source": [
"\n",
"import pandas as pd\n",
"\n",
"import autogen\n",
"from autogen import AssistantAgent, UserProxyAgent\n",
"\n",
"# Setup API key. Add your own API key to config file or environment variable\n",
"llm_config = {\n",
" \"config_list\": autogen.config_list_from_json(\n",
" env_or_file=\"OAI_CONFIG_LIST\",\n",
" ),\n",
" \"temperature\": 0.9,\n",
"}\n",
"\n",
"# Start logging with logger_type and the filename to log to\n",
"logging_session_id = autogen.runtime_logging.start(logger_type=\"file\", config={\"filename\": \"runtime.log\"})\n",
"print(\"Logging session ID: \" + str(logging_session_id))\n",
"\n",
"# Create an agent workflow and run it\n",
"assistant = AssistantAgent(name=\"assistant\", llm_config=llm_config)\n",
"user_proxy = UserProxyAgent(\n",
" name=\"user_proxy\",\n",
" code_execution_config=False,\n",
" human_input_mode=\"NEVER\",\n",
" is_termination_msg=lambda msg: \"TERMINATE\" in msg[\"content\"],\n",
")\n",
"\n",
"user_proxy.initiate_chat(\n",
" assistant, message=\"What is the height of the Eiffel Tower? Only respond with the answer and terminate\"\n",
")\n",
"autogen.runtime_logging.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This should create a `runtime.log` file in your current directory. "
]
} }
], ],
"metadata": { "metadata": {
@ -312,7 +392,7 @@
"name": "python", "name": "python",
"nbconvert_exporter": "python", "nbconvert_exporter": "python",
"pygments_lexer": "ipython3", "pygments_lexer": "ipython3",
"version": "3.11.7" "version": "3.9.13"
} }
}, },
"nbformat": 4, "nbformat": 4,

View File

@ -0,0 +1,127 @@
import json
import os
import sys
import tempfile
import uuid
import pytest
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))
from conftest import skip_openai # noqa: E402
from test_assistant_agent import KEY_LOC, OAI_CONFIG_LIST # noqa: E402
import autogen
import autogen.runtime_logging
from autogen.logger.file_logger import FileLogger
is_windows = sys.platform.startswith("win")
@pytest.mark.skipif(is_windows, reason="Skipping file logging tests on Windows")
@pytest.fixture
def logger() -> FileLogger:
current_dir = os.path.dirname(os.path.abspath(__file__))
with tempfile.TemporaryDirectory(dir=current_dir) as temp_dir:
log_file = os.path.join(temp_dir, "test_log.log")
config = {"filename": log_file}
logger = FileLogger(config)
yield logger
logger.stop()
@pytest.mark.skipif(is_windows, reason="Skipping file logging tests on Windows")
def test_start(logger: FileLogger):
session_id = logger.start()
assert isinstance(session_id, str)
assert len(session_id) == 36
@pytest.mark.skipif(is_windows, reason="Skipping file logging tests on Windows")
def test_log_chat_completion(logger: FileLogger):
invocation_id = uuid.uuid4()
client_id = 123456789
wrapper_id = 987654321
request = {"messages": [{"content": "Test message", "role": "user"}]}
response = "Test response"
is_cached = 0
cost = 0.5
start_time = "2024-05-06 15:20:21.263231"
logger.log_chat_completion(invocation_id, client_id, wrapper_id, request, response, is_cached, cost, start_time)
with open(logger.log_file, "r") as f:
lines = f.readlines()
assert len(lines) == 1
log_data = json.loads(lines[0])
assert log_data["invocation_id"] == str(invocation_id)
assert log_data["client_id"] == client_id
assert log_data["wrapper_id"] == wrapper_id
assert log_data["response"] == response
assert log_data["is_cached"] == is_cached
assert log_data["cost"] == cost
assert log_data["start_time"] == start_time
assert isinstance(log_data["thread_id"], int)
class TestWrapper:
def __init__(self, init_args):
self.init_args = init_args
@pytest.mark.skipif(is_windows, reason="Skipping file logging tests on Windows")
def test_log_new_agent(logger: FileLogger):
agent = autogen.UserProxyAgent(name="user_proxy", code_execution_config=False)
logger.log_new_agent(agent)
with open(logger.log_file, "r") as f:
lines = f.readlines()
log_data = json.loads(lines[0]) # the first line is the session id
assert log_data["agent_name"] == "user_proxy"
@pytest.mark.skipif(is_windows, reason="Skipping file logging tests on Windows")
def test_log_event(logger: FileLogger):
source = autogen.AssistantAgent(name="TestAgent", code_execution_config=False)
name = "TestEvent"
kwargs = {"key": "value"}
logger.log_event(source, name, **kwargs)
with open(logger.log_file, "r") as f:
lines = f.readlines()
log_data = json.loads(lines[0])
assert log_data["source_name"] == "TestAgent"
assert log_data["event_name"] == name
assert log_data["json_state"] == json.dumps(kwargs)
assert isinstance(log_data["thread_id"], int)
@pytest.mark.skipif(is_windows, reason="Skipping file logging tests on Windows")
def test_log_new_wrapper(logger: FileLogger):
wrapper = TestWrapper(init_args={"foo": "bar"})
logger.log_new_wrapper(wrapper, wrapper.init_args)
with open(logger.log_file, "r") as f:
lines = f.readlines()
log_data = json.loads(lines[0])
assert log_data["wrapper_id"] == id(wrapper)
assert log_data["json_state"] == json.dumps(wrapper.init_args)
assert isinstance(log_data["thread_id"], int)
@pytest.mark.skipif(is_windows, reason="Skipping file logging tests on Windows")
def test_log_new_client(logger: FileLogger):
client = autogen.UserProxyAgent(name="user_proxy", code_execution_config=False)
wrapper = TestWrapper(init_args={"foo": "bar"})
init_args = {"foo": "bar"}
logger.log_new_client(client, wrapper, init_args)
with open(logger.log_file, "r") as f:
lines = f.readlines()
log_data = json.loads(lines[0])
assert log_data["client_id"] == id(client)
assert log_data["wrapper_id"] == id(wrapper)
assert log_data["json_state"] == json.dumps(init_args)
assert isinstance(log_data["thread_id"], int)

View File

@ -6,14 +6,16 @@ import uuid
import pytest import pytest
import autogen
import autogen.runtime_logging
sys.path.append(os.path.join(os.path.dirname(__file__), "..")) sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(os.path.join(os.path.dirname(__file__), "../..")) sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))
from conftest import skip_openai # noqa: E402 from conftest import skip_openai # noqa: E402
from test_assistant_agent import KEY_LOC, OAI_CONFIG_LIST # noqa: E402 from test_assistant_agent import KEY_LOC, OAI_CONFIG_LIST # noqa: E402
import autogen
import autogen.runtime_logging
TEACHER_MESSAGE = """ TEACHER_MESSAGE = """
You are roleplaying a math teacher, and your job is to help your students with linear algebra. You are roleplaying a math teacher, and your job is to help your students with linear algebra.
Keep your explanations short. Keep your explanations short.