Web surfer test (#248)

* Add web surfer test

* delete temp files

* formatting

* fix send_message calls

* fix mypy errors

* fix CI checks

* CI code formatting

* Update hatch commands, add tests to CI

* add playwright to test env

* try fixing toml

* Update .github/workflows/checks.yml

Co-authored-by: Jack Gerrits <jackgerrits@users.noreply.github.com>

* try fixing toml

* try fixing toml

* Update python/pyproject.toml

Co-authored-by: Jack Gerrits <jackgerrits@users.noreply.github.com>

* try fixing toml

* try fixing toml

* modify correct pyproject.toml file

* add missing dependency

* Add browser_utils tests

* fix check errors

* run normalize code for test

* add missing dependency

---------

Co-authored-by: Jack Gerrits <jackgerrits@users.noreply.github.com>
Co-authored-by: afourney <adam.fourney@gmail.com>
Authored by peterychang on 2024-07-26 15:34:47 -04:00; committed via GitHub.
parent 7e75dc8df8
commit cf2bcd31ea
16 changed files with 3357 additions and 158 deletions

@@ -69,6 +69,20 @@ jobs:
- run: hatch run +python=${{ matrix.python-version }} test-matrix:pytest -n auto
working-directory: ./python
team-one-test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- name: Install Hatch
uses: pypa/hatch@install
- run: |
hatch run +python=${{ matrix.python-version }} teamone-test-matrix:playwright install
hatch run +python=${{ matrix.python-version }} teamone-test-matrix:pytest -n auto
working-directory: ./python/teams/team-one
docs:
runs-on: ubuntu-latest
steps:

@@ -42,11 +42,14 @@ dependencies = [
"mypy==1.10.0",
"ruff==0.4.8",
"pytest",
"pytest-asyncio",
"pytest-xdist",
"aiofiles",
"types-aiofiles",
"types-requests",
"types-pillow",
"azure-identity",
"openpyxl",
]
[tool.hatch.envs.default.extra-scripts]
@@ -55,15 +58,25 @@ pip = "{env:HATCH_UV} pip {args}"
[tool.hatch.envs.default.scripts]
fmt = "ruff format"
lint = "ruff check"
test = "pytest -n auto"
test = [
"playwright install",
"pytest -n auto",
]
check = [
"ruff format",
"ruff check --fix",
"pyright",
"mypy --non-interactive --install-types",
"playwright install",
"pytest",
]
[tool.hatch.envs.teamone-test-matrix]
template = "default"
[[tool.hatch.envs.teamone-test-matrix.matrix]]
python = ["3.10", "3.11", "3.12"]
[tool.hatch.metadata]
allow-direct-references = true

@@ -12,6 +12,7 @@ from urllib.parse import quote_plus # parse_qs, quote, unquote, urlparse, urlun
import aiofiles
from agnext.application.logging import EVENT_LOGGER_NAME
from agnext.components import FunctionCall
from agnext.components import Image as AGImage
from agnext.components.models import (
AssistantMessage,
@@ -155,6 +156,7 @@ class MultimodalWebSurfer(BaseAgent):
# Create the page
self._context.set_default_timeout(60000) # One minute
self._page = await self._context.new_page()
assert self._page is not None
# self._page.route(lambda x: True, self._route_handler)
self._page.on("download", self._download_handler)
await self._page.set_viewport_size({"width": VIEWPORT_WIDTH, "height": VIEWPORT_HEIGHT})
@@ -252,6 +254,162 @@ setInterval(function() {{
except Exception:
return False, f"Web surfing error:\n\n{traceback.format_exc()}"
async def _execute_tool(
self, message: List[FunctionCall], rects: Dict[str, InteractiveRegion], tool_names: str, use_ocr: bool = True
) -> Tuple[bool, UserContent]:
name = message[0].name
args = json.loads(message[0].arguments)
action_description = ""
assert self._page is not None
logger.info(
WebSurferEvent(
source=self.metadata["name"],
url=self._page.url,
action=name,
arguments=args,
message=f"{name}( {json.dumps(args)} )",
)
)
if name == "visit_url":
url = args.get("url")
action_description = f"I typed '{url}' into the browser address bar."
# Check if the argument starts with a known protocol
if url.startswith(("https://", "http://", "file://", "about:")):
await self._visit_page(url)
# If the argument contains a space, treat it as a search query
elif " " in url:
await self._visit_page(f"https://www.bing.com/search?q={quote_plus(url)}&FORM=QBLH")
# Otherwise, prefix with https://
else:
await self._visit_page("https://" + url)
elif name == "history_back":
action_description = "I clicked the browser back button."
await self._back()
elif name == "web_search":
query = args.get("query")
action_description = f"I typed '{query}' into the browser search bar."
await self._visit_page(f"https://www.bing.com/search?q={quote_plus(query)}&FORM=QBLH")
elif name == "page_up":
action_description = "I scrolled up one page in the browser."
await self._page_up()
elif name == "page_down":
action_description = "I scrolled down one page in the browser."
await self._page_down()
elif name == "click":
target_id = str(args.get("target_id"))
target_name = self._target_name(target_id, rects)
if target_name:
action_description = f"I clicked '{target_name}'."
else:
action_description = "I clicked the control."
await self._click_id(target_id)
elif name == "input_text":
input_field_id = str(args.get("input_field_id"))
text_value = str(args.get("text_value"))
input_field_name = self._target_name(input_field_id, rects)
if input_field_name:
action_description = f"I typed '{text_value}' into '{input_field_name}'."
else:
action_description = f"I input '{text_value}'."
await self._fill_id(input_field_id, text_value)
elif name == "scroll_element_up":
target_id = str(args.get("target_id"))
target_name = self._target_name(target_id, rects)
if target_name:
action_description = f"I scrolled '{target_name}' up."
else:
action_description = "I scrolled the control up."
await self._scroll_id(target_id, "up")
elif name == "scroll_element_down":
target_id = str(args.get("target_id"))
target_name = self._target_name(target_id, rects)
if target_name:
action_description = f"I scrolled '{target_name}' down."
else:
action_description = "I scrolled the control down."
await self._scroll_id(target_id, "down")
elif name == "answer_question":
question = str(args.get("question"))
# Do Q&A on the DOM. No need to take further action. Browser state does not change.
return False, await self._summarize_page(question=question)
elif name == "summarize_page":
# Summarize the DOM. No need to take further action. Browser state does not change.
return False, await self._summarize_page()
elif name == "sleep":
action_description = "I am waiting a short period of time before taking further action."
await self._sleep(3) # There's a 3s sleep below too
else:
raise ValueError(f"Unknown tool '{name}'. Please choose from:\n\n{tool_names}")
await self._page.wait_for_load_state()
await self._sleep(3)
# Handle downloads
if self._last_download is not None and self.downloads_folder is not None:
fname = os.path.join(self.downloads_folder, self._last_download.suggested_filename)
# TODO: Fix this type
await self._last_download.save_as(fname) # type: ignore
page_body = f"<html><head><title>Download Successful</title></head><body style=\"margin: 20px;\"><h1>Successfully downloaded '{self._last_download.suggested_filename}' to local path:<br><br>{fname}</h1></body></html>"
await self._page.goto(
"data:text/html;base64," + base64.b64encode(page_body.encode("utf-8")).decode("utf-8")
)
await self._page.wait_for_load_state()
# Handle metadata
page_metadata = json.dumps(await self._get_page_metadata(), indent=4)
metadata_hash = hashlib.md5(page_metadata.encode("utf-8")).hexdigest()
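# Only surface the metadata in the observation when it has changed since the previous action.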
if metadata_hash != self._prior_metadata_hash:
page_metadata = (
"\nThe following metadata was extracted from the webpage:\n\n" + page_metadata.strip() + "\n"
)
else:
page_metadata = ""
self._prior_metadata_hash = metadata_hash
# Describe the viewport of the new page in words
viewport = await self._get_visual_viewport()
percent_visible = int(viewport["height"] * 100 / viewport["scrollHeight"])
percent_scrolled = int(viewport["pageTop"] * 100 / viewport["scrollHeight"])
if percent_scrolled < 1: # Allow some rounding error
position_text = "at the top of the page"
elif percent_scrolled + percent_visible >= 99: # Allow some rounding error
position_text = "at the bottom of the page"
else:
position_text = str(percent_scrolled) + "% down from the top of the page"
new_screenshot = await self._page.screenshot()
if self.debug_dir:
async with aiofiles.open(os.path.join(self.debug_dir, "screenshot.png"), "wb") as file:
await file.write(new_screenshot)
ocr_text = await self._get_ocr_text(new_screenshot) if use_ocr is True else ""
# Return the complete observation
message_content = "" # message.content or ""
page_title = await self._page.title()
return False, [
f"{message_content}\n\n{action_description}\n\nHere is a screenshot of [{page_title}]({self._page.url}). The viewport shows {percent_visible}% of the webpage, and is positioned {position_text}.{page_metadata}\nAutomatic OCR of the page screenshot has detected the following text:\n\n{ocr_text}".strip(),
AGImage.from_pil(Image.open(io.BytesIO(new_screenshot))),
]
async def __generate_reply(self, cancellation_token: CancellationToken) -> Tuple[bool, UserContent]:
"""Generates the actual reply."""
assert self._page is not None
@@ -369,171 +527,18 @@ When deciding between tools, consider if the request can be best addressed by:
) # , "parallel_tool_calls": False})
message = response.content
action_description = ""
self._last_download = None
if isinstance(message, str):
# Answer directly
return False, message
elif isinstance(message, list):
# Take an action
name = message[0].name
args = json.loads(message[0].arguments)
logger.info(
WebSurferEvent(
source=self.metadata["name"],
url=self._page.url,
action=name,
arguments=args,
message=f"{name}( {json.dumps(args)} )",
)
)
if name == "visit_url":
url = args.get("url")
action_description = f"I typed '{url}' into the browser address bar."
# Check if the argument starts with a known protocol
if url.startswith(("https://", "http://", "file://", "about:")):
await self._visit_page(url)
# If the argument contains a space, treat it as a search query
elif " " in url:
await self._visit_page(f"https://www.bing.com/search?q={quote_plus(url)}&FORM=QBLH")
# Otherwise, prefix with https://
else:
await self._visit_page("https://" + url)
elif name == "history_back":
action_description = "I clicked the browser back button."
await self._back()
elif name == "web_search":
query = args.get("query")
action_description = f"I typed '{query}' into the browser search bar."
await self._visit_page(f"https://www.bing.com/search?q={quote_plus(query)}&FORM=QBLH")
elif name == "page_up":
action_description = "I scrolled up one page in the browser."
await self._page_up()
elif name == "page_down":
action_description = "I scrolled down one page in the browser."
await self._page_down()
elif name == "click":
target_id = str(args.get("target_id"))
target_name = self._target_name(target_id, rects)
if target_name:
action_description = f"I clicked '{target_name}'."
else:
action_description = "I clicked the control."
await self._click_id(target_id)
elif name == "input_text":
input_field_id = str(args.get("input_field_id"))
text_value = str(args.get("text_value"))
input_field_name = self._target_name(input_field_id, rects)
if input_field_name:
action_description = f"I typed '{text_value}' into '{input_field_name}'."
else:
action_description = f"I input '{text_value}'."
await self._fill_id(input_field_id, text_value)
elif name == "scroll_element_up":
target_id = str(args.get("target_id"))
target_name = self._target_name(target_id, rects)
if target_name:
action_description = f"I scrolled '{target_name}' up."
else:
action_description = "I scrolled the control up."
await self._scroll_id(target_id, "up")
elif name == "scroll_element_down":
target_id = str(args.get("target_id"))
target_name = self._target_name(target_id, rects)
if target_name:
action_description = f"I scrolled '{target_name}' down."
else:
action_description = "I scrolled the control down."
await self._scroll_id(target_id, "down")
elif name == "answer_question":
question = str(args.get("question"))
# Do Q&A on the DOM. No need to take further action. Browser state does not change.
return False, await self._summarize_page(question=question)
elif name == "summarize_page":
# Summarize the DOM. No need to take further action. Browser state does not change.
return False, await self._summarize_page()
elif name == "sleep":
action_description = "I am waiting a short period of time before taking further action."
await self._sleep(3) # There's a 3s sleep below too
else:
raise ValueError(f"Unknown tool '{name}'. Please choose from:\n\n{tool_names}")
return await self._execute_tool(message, rects, tool_names)
else:
# Not sure what happened here
raise AssertionError(f"Unknown response format '{message}'")
await self._page.wait_for_load_state()
await self._sleep(3)
# Handle downloads
if self._last_download is not None and self.downloads_folder is not None:
fname = os.path.join(self.downloads_folder, self._last_download.suggested_filename)
# TODO: Fix this type
await self._last_download.save_as(fname) # type: ignore
page_body = f"<html><head><title>Download Successful</title></head><body style=\"margin: 20px;\"><h1>Successfully downloaded '{self._last_download.suggested_filename}' to local path:<br><br>{fname}</h1></body></html>"
await self._page.goto(
"data:text/html;base64," + base64.b64encode(page_body.encode("utf-8")).decode("utf-8")
)
await self._page.wait_for_load_state()
# Handle metadata
page_metadata = json.dumps(await self._get_page_metadata(), indent=4)
metadata_hash = hashlib.md5(page_metadata.encode("utf-8")).hexdigest()
if metadata_hash != self._prior_metadata_hash:
page_metadata = (
"\nThe following metadata was extracted from the webpage:\n\n" + page_metadata.strip() + "\n"
)
else:
page_metadata = ""
self._prior_metadata_hash = metadata_hash
# Describe the viewport of the new page in words
viewport = await self._get_visual_viewport()
percent_visible = int(viewport["height"] * 100 / viewport["scrollHeight"])
percent_scrolled = int(viewport["pageTop"] * 100 / viewport["scrollHeight"])
if percent_scrolled < 1: # Allow some rounding error
position_text = "at the top of the page"
elif percent_scrolled + percent_visible >= 99: # Allow some rounding error
position_text = "at the bottom of the page"
else:
position_text = str(percent_scrolled) + "% down from the top of the page"
new_screenshot = await self._page.screenshot()
if self.debug_dir:
async with aiofiles.open(os.path.join(self.debug_dir, "screenshot.png"), "wb") as file:
await file.write(new_screenshot)
ocr_text = await self._get_ocr_text(new_screenshot)
# Return the complete observation
message_content = "" # message.content or ""
page_title = await self._page.title()
return False, [
f"{message_content}\n\n{action_description}\n\nHere is a screenshot of [{page_title}]({self._page.url}). The viewport shows {percent_visible}% of the webpage, and is positioned {position_text}.{page_metadata}\nAutomatic OCR of the page screenshot has detected the following text:\n\n{ocr_text}".strip(),
AGImage.from_pil(Image.open(io.BytesIO(new_screenshot))),
]
async def _get_interactive_rects(self) -> Dict[str, InteractiveRegion]:
assert self._page is not None
@@ -591,6 +596,7 @@ When deciding between tools, consider if the request can be best addressed by:
async def _on_new_page(self, page: Page) -> None:
self._page = page
assert self._page is not None
# self._page.route(lambda x: True, self._route_handler)
self._page.on("download", self._download_handler)
await self._page.set_viewport_size({"width": VIEWPORT_WIDTH, "height": VIEWPORT_HEIGHT})

@@ -162,7 +162,6 @@ class PlainTextConverter(DocumentConverter):
text_content = ""
with open(local_path, "rt") as fh:
text_content = fh.read()
return DocumentConverterResult(
title=None,
text_content=text_content,
@@ -893,7 +892,8 @@ class MarkdownConverter:
# Convert
return self._convert(path, extensions, **kwargs)
def convert_stream(self, stream, **kwargs: Any) -> DocumentConverterResult: # TODO: deal with kwargs
# TODO what should stream's type be?
def convert_stream(self, stream: Any, **kwargs: Any) -> DocumentConverterResult: # TODO: deal with kwargs
# Prepare a list of extensions to try (in order of priority)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
@@ -969,7 +969,6 @@ class MarkdownConverter:
# Convert
result = self._convert(temp_path, extensions, url=response.url)
# Clean up
finally:
try:

@@ -0,0 +1,45 @@
#!/usr/bin/env python3 -m pytest
import os
import pytest
from team_one.markdown_browser import BingMarkdownSearch
skip_all = False
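# Stash any Bing API key and drop it from the environment so the keyless search path is tested by default.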
bing_api_key = None
if "BING_API_KEY" in os.environ:
bing_api_key = os.environ["BING_API_KEY"]
del os.environ["BING_API_KEY"]
skip_api = bing_api_key is None
BING_QUERY = "Microsoft wikipedia"
BING_STRING = f"A Bing search for '{BING_QUERY}' found"
BING_EXPECTED_RESULT = "https://en.wikipedia.org/wiki/Microsoft"
@pytest.mark.skipif(
skip_all,
reason="do not run if dependency is not installed",
)
def test_bing_markdown_search() -> None:
search_engine = BingMarkdownSearch()
results = search_engine.search(BING_QUERY)
assert BING_STRING in results
assert BING_EXPECTED_RESULT in results
@pytest.mark.skipif(
skip_api,
reason="skipping tests that require a Bing API key",
)
def test_bing_markdown_search_api() -> None:
search_engine = BingMarkdownSearch(bing_api_key=bing_api_key)
results = search_engine.search(BING_QUERY)
assert BING_STRING in results
assert BING_EXPECTED_RESULT in results
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
test_bing_markdown_search()
test_bing_markdown_search_api()

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:9390b34525fd044df69265e022a06346abb6d203b14cbc9b2473c080c680e82e
size 474288

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@@ -0,0 +1,177 @@
#!/usr/bin/env python3 -m pytest
import io
import os
import shutil
import pytest
import requests
from team_one.markdown_browser import MarkdownConverter
skip_all = False
skip_exiftool = shutil.which("exiftool") is None
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "test_files")
JPG_TEST_EXIFTOOL = {
"Author": "AutoGen Authors",
"Title": "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
"Description": "AutoGen enables diverse LLM-based applications",
"ImageSize": "1615x1967",
"DateTimeOriginal": "2024:03:14 22:10:00",
}
PDF_TEST_URL = "https://arxiv.org/pdf/2308.08155v2.pdf"
PDF_TEST_STRINGS = ["While there is contemporaneous exploration of multi-agent approaches"]
YOUTUBE_TEST_URL = "https://www.youtube.com/watch?v=V2qZ_lgxTzg"
YOUTUBE_TEST_STRINGS = [
"## AutoGen FULL Tutorial with Python (Step-By-Step)",
"This is an intermediate tutorial for installing and using AutoGen locally",
"PT15M4S",
"the model we're going to be using today is GPT 3.5 turbo", # From the transcript
]
XLSX_TEST_STRINGS = [
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
]
DOCX_TEST_STRINGS = [
"314b0a30-5b04-470b-b9f7-eed2c2bec74a",
"49e168b7-d2ae-407f-a055-2167576f39a1",
"## d666f1f7-46cb-42bd-9a39-9a39cf2a509f",
"# Abstract",
"# Introduction",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
]
PPTX_TEST_STRINGS = [
"2cdda5c8-e50e-4db4-b5f0-9722a649f455",
"04191ea8-5c73-4215-a1d3-1cfb43aaaf12",
"44bf7d06-5e7a-4a40-a2e1-a2e42ef28c8a",
"1b92870d-e3b5-4e65-8153-919f4ff45592",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
]
BLOG_TEST_URL = "https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math"
BLOG_TEST_STRINGS = [
"Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?",
"an example where high cost can easily prevent a generic complex",
]
WIKIPEDIA_TEST_URL = "https://en.wikipedia.org/wiki/Microsoft"
WIKIPEDIA_TEST_STRINGS = [
"Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]",
'Microsoft was founded by [Bill Gates](/wiki/Bill_Gates "Bill Gates")',
]
WIKIPEDIA_TEST_EXCLUDES = [
"You are encouraged to create an account and log in",
"154 languages",
"move to sidebar",
]
SERP_TEST_URL = "https://www.bing.com/search?q=microsoft+wikipedia"
SERP_TEST_STRINGS = [
"](https://en.wikipedia.org/wiki/Microsoft",
"Microsoft Corporation is **an American multinational corporation and technology company headquartered** in Redmond",
"19952007: Foray into the Web, Windows 95, Windows XP, and Xbox",
]
SERP_TEST_EXCLUDES = [
"https://www.bing.com/ck/a?!&&p=",
"data:image/svg+xml,%3Csvg%20width%3D",
]
@pytest.mark.skipif(
skip_all,
reason="do not run if dependency is not installed",
)
def test_mdconvert_remote() -> None:
mdconvert = MarkdownConverter()
# By URL
result = mdconvert.convert(PDF_TEST_URL)
for test_string in PDF_TEST_STRINGS:
assert test_string in result.text_content
# By stream
response = requests.get(PDF_TEST_URL)
result = mdconvert.convert_stream(io.BytesIO(response.content), file_extension=".pdf", url=PDF_TEST_URL)
for test_string in PDF_TEST_STRINGS:
assert test_string in result.text_content
# Youtube
result = mdconvert.convert(YOUTUBE_TEST_URL)
for test_string in YOUTUBE_TEST_STRINGS:
assert test_string in result.text_content
@pytest.mark.skipif(
skip_all,
reason="do not run if dependency is not installed",
)
def test_mdconvert_local() -> None:
mdconvert = MarkdownConverter()
# Test XLSX processing
result = mdconvert.convert(os.path.join(TEST_FILES_DIR, "test.xlsx"))
for test_string in XLSX_TEST_STRINGS:
text_content = result.text_content.replace('\\','')
assert test_string in text_content
# Test DOCX processing
result = mdconvert.convert(os.path.join(TEST_FILES_DIR, "test.docx"))
for test_string in DOCX_TEST_STRINGS:
text_content = result.text_content.replace('\\','')
assert test_string in text_content
# Test PPTX processing
result = mdconvert.convert(os.path.join(TEST_FILES_DIR, "test.pptx"))
for test_string in PPTX_TEST_STRINGS:
text_content = result.text_content.replace('\\','')
assert test_string in text_content
# Test HTML processing
result = mdconvert.convert(os.path.join(TEST_FILES_DIR, "test_blog.html"), url=BLOG_TEST_URL)
for test_string in BLOG_TEST_STRINGS:
text_content = result.text_content.replace('\\','')
assert test_string in text_content
# Test Wikipedia processing
result = mdconvert.convert(os.path.join(TEST_FILES_DIR, "test_wikipedia.html"), url=WIKIPEDIA_TEST_URL)
text_content = result.text_content.replace('\\','')
for test_string in WIKIPEDIA_TEST_EXCLUDES:
assert test_string not in text_content
for test_string in WIKIPEDIA_TEST_STRINGS:
assert test_string in text_content
# Test Bing processing
result = mdconvert.convert(os.path.join(TEST_FILES_DIR, "test_serp.html"), url=SERP_TEST_URL)
text_content = result.text_content.replace('\\','')
for test_string in SERP_TEST_EXCLUDES:
assert test_string not in text_content
for test_string in SERP_TEST_STRINGS:
assert test_string in text_content
@pytest.mark.skipif(
skip_exiftool,
reason="do not run if exiftool is not installed",
)
def test_mdconvert_exiftool() -> None:
mdconvert = MarkdownConverter()
# Test JPG metadata processing
result = mdconvert.convert(os.path.join(TEST_FILES_DIR, "test.jpg"))
for key in JPG_TEST_EXIFTOOL:
target = f"{key}: {JPG_TEST_EXIFTOOL[key]}"
assert target in result.text_content
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
# test_mdconvert_remote()
test_mdconvert_local()
# test_mdconvert_exiftool()

@@ -0,0 +1,235 @@
#!/usr/bin/env python3 -m pytest
import hashlib
import math
import os
import pathlib
import re
import pytest
import requests
BLOG_POST_URL = "https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math"
BLOG_POST_TITLE = "Does Model and Inference Parameter Matter in LLM Applications? - A Case Study for MATH | AutoGen"
BLOG_POST_STRING = "Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?"
BLOG_POST_FIND_ON_PAGE_QUERY = "an example where high * complex"
BLOG_POST_FIND_ON_PAGE_MATCH = "an example where high cost can easily prevent a generic complex"
WIKIPEDIA_URL = "https://en.wikipedia.org/wiki/Microsoft"
WIKIPEDIA_TITLE = "Microsoft"
WIKIPEDIA_STRING = "Redmond"
PLAIN_TEXT_URL = "https://raw.githubusercontent.com/microsoft/autogen/main/README.md"
DOWNLOAD_URL = "https://arxiv.org/src/2308.08155"
PDF_URL = "https://arxiv.org/pdf/2308.08155.pdf"
PDF_STRING = "Figure 1: AutoGen enables diverse LLM-based applications using multi-agent conversations."
DIR_TEST_STRINGS = [
"# Index of ",
"[.. (parent directory)]",
"/python/teams/team-one/tests/browser_utils",
]
LOCAL_FILE_TEST_STRINGS = [
BLOG_POST_STRING,
BLOG_POST_FIND_ON_PAGE_MATCH,
]
from team_one.markdown_browser import BingMarkdownSearch, RequestsMarkdownBrowser
skip_all = False
def _rm_folder(path: str) -> None:
"""Remove all the regular files in a folder, then deletes the folder. Assumes a flat file structure, with no subdirectories."""
for fname in os.listdir(path):
fpath = os.path.join(path, fname)
if os.path.isfile(fpath):
os.unlink(fpath)
os.rmdir(path)
def normalize_text(text: str) -> str:
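# Trim trailing whitespace on each line and collapse runs of 3+ newlines, mirroring the markdown request module's normalization.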
text = "\n".join([line.rstrip() for line in re.split(r"\r?\n", text)])
return re.sub(r"\n{3,}", "\n\n", text)
@pytest.mark.skipif(
skip_all,
reason="do not run if dependency is not installed",
)
def test_requests_markdown_browser() -> None:
# Create a downloads folder (removing any leftover ones from prior tests)
downloads_folder = os.path.join(os.getcwd(), "downloads")
if os.path.isdir(downloads_folder):
_rm_folder(downloads_folder)
os.mkdir(downloads_folder)
# Instantiate the browser
viewport_size = 1024
browser = RequestsMarkdownBrowser(
viewport_size=viewport_size,
downloads_folder=downloads_folder,
search_engine=BingMarkdownSearch(),
)
# Test that we can visit a page and find what we expect there
top_viewport = browser.visit_page(BLOG_POST_URL)
assert browser.viewport == top_viewport
assert browser.page_title is not None
assert browser.page_title.strip() == BLOG_POST_TITLE.strip()
page_content = browser.page_content.replace('\\','')
assert BLOG_POST_STRING in page_content
# Check if page splitting works
approx_pages = math.ceil(len(browser.page_content) / viewport_size) # May be fewer, since it aligns to word breaks
assert len(browser.viewport_pages) <= approx_pages
assert abs(len(browser.viewport_pages) - approx_pages) <= 1 # allow only a small deviation
assert browser.viewport_pages[0][0] == 0
assert browser.viewport_pages[-1][1] == len(browser.page_content)
# Make sure we can reconstruct the full contents from the split pages
buffer = ""
for bounds in browser.viewport_pages:
buffer += browser.page_content[bounds[0] : bounds[1]]
assert buffer == browser.page_content
# Test scrolling (scroll all the way to the bottom)
for i in range(1, len(browser.viewport_pages)):
browser.page_down()
assert browser.viewport_current_page == i
# Test scrolling beyond the limits
for i in range(0, 5):
browser.page_down()
assert browser.viewport_current_page == len(browser.viewport_pages) - 1
# Test scrolling (scroll all the way back to the top)
for i in range(len(browser.viewport_pages) - 2, 0, -1):
browser.page_up()
assert browser.viewport_current_page == i
# Test scrolling beyond the limits
for i in range(0, 5):
browser.page_up()
assert browser.viewport_current_page == 0
# Test Wikipedia handling
assert WIKIPEDIA_STRING in browser.visit_page(WIKIPEDIA_URL)
assert WIKIPEDIA_TITLE.strip() == browser.page_title.strip()
# Visit a plain-text file
response = requests.get(PLAIN_TEXT_URL)
response.raise_for_status()
expected_results = re.sub(r"\s+", " ", response.text, flags=re.DOTALL).strip()
# Run the normalize code that the markdown request module uses
expected_results = normalize_text(expected_results)
browser.visit_page(PLAIN_TEXT_URL)
assert re.sub(r"\s+", " ", browser.page_content, flags=re.DOTALL).strip() == expected_results
# Directly download a ZIP file and compute its md5
response = requests.get(DOWNLOAD_URL, stream=True)
response.raise_for_status()
expected_md5 = hashlib.md5(response.raw.read()).hexdigest()
# Download it with the browser and check for a match
viewport = browser.visit_page(DOWNLOAD_URL)
m = re.search(r"Saved file to '(.*?)'", viewport)
assert m is not None
download_loc = m.group(1)
with open(download_loc, "rb") as fh:
downloaded_md5 = hashlib.md5(fh.read()).hexdigest()
# MD5 hashes should match
assert expected_md5 == downloaded_md5
# Fetch a PDF
viewport = browser.visit_page(PDF_URL)
assert PDF_STRING in viewport
# Test find in page
browser.visit_page(BLOG_POST_URL)
find_viewport = browser.find_on_page(BLOG_POST_FIND_ON_PAGE_QUERY)
assert find_viewport is not None
assert BLOG_POST_FIND_ON_PAGE_MATCH in find_viewport
assert find_viewport is not None
loc = browser.viewport_current_page
find_viewport = browser.find_on_page("LLM app*")
assert find_viewport is not None
# Find next using the same query
for i in range(0, 10):
find_viewport = browser.find_on_page("LLM app*")
assert find_viewport is not None
new_loc = browser.viewport_current_page
assert new_loc != loc
loc = new_loc
# Find next using find_next
for i in range(0, 10):
find_viewport = browser.find_next()
assert find_viewport is not None
new_loc = browser.viewport_current_page
assert new_loc != loc
loc = new_loc
# Bounce around
browser.viewport_current_page = 0
find_viewport = browser.find_on_page("For Further Reading")
assert find_viewport is not None
loc = browser.viewport_current_page
browser.page_up()
assert browser.viewport_current_page != loc
find_viewport = browser.find_on_page("For Further Reading")
assert find_viewport is not None
assert loc == browser.viewport_current_page
# Find something that doesn't exist
find_viewport = browser.find_on_page("7c748f9a-8dce-461f-a092-4e8d29913f2d")
assert find_viewport is None
assert loc == browser.viewport_current_page # We didn't move
# Clean up
_rm_folder(downloads_folder)
@pytest.mark.skipif(
skip_all,
reason="do not run if dependency is not installed",
)
def test_local_file_browsing() -> None:
directory = os.path.dirname(__file__)
test_file = os.path.join(directory, "test_files", "test_blog.html")
browser = RequestsMarkdownBrowser()
# Directory listing via open_local_file
viewport = browser.open_local_file(directory)
for target_string in DIR_TEST_STRINGS:
viewport = viewport.replace('\\','')
assert target_string in viewport
# Directory listing via file URI
viewport = browser.visit_page(pathlib.Path(os.path.abspath(directory)).as_uri())
for target_string in DIR_TEST_STRINGS:
viewport = viewport.replace('\\','')
assert target_string in viewport
# File access via file open_local_file
browser.open_local_file(test_file)
for target_string in LOCAL_FILE_TEST_STRINGS:
page_content = browser.page_content.replace('\\','')
assert target_string in page_content
# File access via file URI
browser.visit_page(pathlib.Path(os.path.abspath(test_file)).as_uri())
for target_string in LOCAL_FILE_TEST_STRINGS:
page_content = browser.page_content.replace('\\','')
assert target_string in page_content
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
test_requests_markdown_browser()
test_local_file_browsing()

@@ -0,0 +1,11 @@
skip_openai: bool = False
skip_redis: bool = False
skip_docker: bool = False
reason: str = "requested to skip"
MOCK_OPEN_AI_API_KEY: str = "sk-mockopenaiAPIkeyinexpectedformatfortestingonly"
MOCK_CHAT_COMPLETION_KWARGS: str = """
{
"api_key": "sk-mockopenaiAPIkeyinexpectedformatfortestingonly",
"model": "gpt-4o-2024-05-13"
}
"""

@@ -0,0 +1,248 @@
#!/usr/bin/env python3 -m pytest
import os
import re
import sys
from math import ceil
import asyncio
import pytest
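# Enable the pytest-asyncio plugin so the async tests below are collected and run.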
pytest_plugins = ('pytest_asyncio',)
from json import dumps
from team_one.utils import (
ENVIRON_KEY_CHAT_COMPLETION_PROVIDER,
ENVIRON_KEY_CHAT_COMPLETION_KWARGS_JSON,
create_completion_client_from_env
)
from team_one.agents.user_proxy import UserProxy
from team_one.agents.orchestrator import RoundRobinOrchestrator
from team_one.messages import BroadcastMessage
from agnext.application import SingleThreadedAgentRuntime
from agnext.components import FunctionCall
from agnext.components.models import (
UserMessage,
)
from agnext.components.tools._base import ToolSchema
from openai import AuthenticationError
sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))
from conftest import MOCK_CHAT_COMPLETION_KWARGS, reason
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
#from test_assistant_agent import KEY_LOC, OAI_CONFIG_LIST # noqa: E402
BLOG_POST_URL = "https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math"
BLOG_POST_TITLE = "Does Model and Inference Parameter Matter in LLM Applications? - A Case Study for MATH | AutoGen"
BING_QUERY = "Microsoft"
from team_one.agents.multimodal_web_surfer import MultimodalWebSurfer
from team_one.agents.multimodal_web_surfer.tool_definitions import (
TOOL_PAGE_DOWN,
TOOL_PAGE_UP,
TOOL_READ_PAGE_AND_ANSWER,
TOOL_SUMMARIZE_PAGE,
TOOL_VISIT_URL,
TOOL_WEB_SEARCH,
)
skip_all = False
#except ImportError:
# skip_all = True
#else:
# skip_all = False
#try:
# BING_API_KEY = os.environ["BING_API_KEY"]
#except KeyError:
# skip_bing = True
#else:
# skip_bing = False
# Search currently does not require an API key
skip_bing = False
if os.getenv(ENVIRON_KEY_CHAT_COMPLETION_KWARGS_JSON):
skip_openai = False
else:
skip_openai = True
def generate_tool_request(tool: ToolSchema, args: dict[str, str]) -> list[FunctionCall]:
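# Build a one-element FunctionCall list that mimics a tool call issued by the model client.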
ret = [FunctionCall(id='', arguments='', name=tool["name"])]
ret[0].arguments = dumps(args)
return ret
async def make_browser_request(browser: MultimodalWebSurfer, tool: ToolSchema, args: dict[str, str]={}) -> str:
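# Drive the surfer's private _execute_tool directly; OCR is disabled to keep the tests fast and deterministic.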
rects = await browser._get_interactive_rects() # type: ignore
req = generate_tool_request(tool, args)
return str((await browser._execute_tool(req, rects, "", use_ocr=False))[1][0]) # type: ignore
@pytest.mark.skipif(
skip_all,
reason="do not run if dependency is not installed",
)
@pytest.mark.asyncio
async def test_web_surfer() -> None:
env = {
ENVIRON_KEY_CHAT_COMPLETION_PROVIDER: "openai",
ENVIRON_KEY_CHAT_COMPLETION_KWARGS_JSON: MOCK_CHAT_COMPLETION_KWARGS
}
runtime = SingleThreadedAgentRuntime()
# Create an appropriate client
client = create_completion_client_from_env(env)
# Register agents.
web_surfer = await runtime.register_and_get_proxy(
"WebSurfer",
lambda: MultimodalWebSurfer(),
)
run_context = runtime.start()
actual_surfer = await runtime.try_get_underlying_agent_instance(web_surfer.id, MultimodalWebSurfer)
await actual_surfer.init(model_client=client, downloads_folder=os.getcwd(), browser_channel="chromium")
# Test some basic navigations
tool_resp = await make_browser_request(actual_surfer, TOOL_VISIT_URL, {"url": BLOG_POST_URL})
metadata = await actual_surfer._get_page_metadata() # type: ignore
assert f"{BLOG_POST_URL}".strip() in metadata["meta_tags"]["og:url"]
assert f"{BLOG_POST_TITLE}".strip() in metadata["meta_tags"]["og:title"]
# Get the % of the page the viewport shows so we can check it scrolled down properly
m = re.search(r"\bThe viewport shows (\d+)% of the webpage", tool_resp)
assert m is not None
viewport_percentage = int(m.group(1))
tool_resp = await make_browser_request(actual_surfer, TOOL_PAGE_DOWN)
assert (
f"The viewport shows {viewport_percentage}% of the webpage, and is positioned {viewport_percentage}% down from the top of the page." in tool_resp
) # Assumes the content is longer than one screen
tool_resp = await make_browser_request(actual_surfer, TOOL_PAGE_UP)
assert (
f"The viewport shows {viewport_percentage}% of the webpage, and is positioned at the top of the page" in tool_resp
) # Assumes the content is longer than one screen
# Try to scroll too far back up
tool_resp = await make_browser_request(actual_surfer, TOOL_PAGE_UP)
assert (
f"The viewport shows {viewport_percentage}% of the webpage, and is positioned at the top of the page" in tool_resp
)
# Try to scroll too far down
total_pages = ceil(100/viewport_percentage)
for _ in range(0, total_pages + 1):
tool_resp = await make_browser_request(actual_surfer, TOOL_PAGE_DOWN)
assert (
f"The viewport shows {viewport_percentage}% of the webpage, and is positioned at the bottom of the page" in tool_resp
)
# Test Q&A and summarization -- we don't have a key so we expect it to fail #(but it means the code path is correct)
with pytest.raises(AuthenticationError):
tool_resp = await make_browser_request(actual_surfer, TOOL_READ_PAGE_AND_ANSWER, {"question": "When was it founded?"})
with pytest.raises(AuthenticationError):
tool_resp = await make_browser_request(actual_surfer, TOOL_SUMMARIZE_PAGE)
await run_context.stop_when_idle()
@pytest.mark.skipif(
skip_all or skip_openai,
reason="dependency is not installed OR" + reason,
)
@pytest.mark.asyncio
async def test_web_surfer_oai() -> None:
runtime = SingleThreadedAgentRuntime()
# Create an appropriate client
client = create_completion_client_from_env()
# Register agents.
web_surfer = await runtime.register_and_get_proxy(
"WebSurfer",
lambda: MultimodalWebSurfer(),
)
user_proxy = await runtime.register_and_get_proxy(
"UserProxy",
lambda: UserProxy(),
)
await runtime.register("orchestrator", lambda: RoundRobinOrchestrator([web_surfer, user_proxy]))
run_context = runtime.start()
actual_surfer = await runtime.try_get_underlying_agent_instance(web_surfer.id, MultimodalWebSurfer)
await actual_surfer.init(model_client=client, downloads_folder=os.getcwd(), browser_channel="chromium")
await runtime.send_message(
BroadcastMessage(content=UserMessage(content="Please visit the page 'https://en.wikipedia.org/wiki/Microsoft'", source="user")),
recipient=web_surfer.id,
sender=user_proxy.id
)
await runtime.send_message(
BroadcastMessage(content=UserMessage(content="Please scroll down.", source="user")),
recipient=web_surfer.id,
sender=user_proxy.id
)
await runtime.send_message(
BroadcastMessage(content=UserMessage(content="Please scroll up.", source="user")),
recipient=web_surfer.id,
sender=user_proxy.id
)
await runtime.send_message(
BroadcastMessage(content=UserMessage(content="When was it founded?", source="user")),
recipient=web_surfer.id,
sender=user_proxy.id
)
await runtime.send_message(
BroadcastMessage(content=UserMessage(content="What's this page about?", source="user")),
recipient=web_surfer.id,
sender=user_proxy.id
)
await run_context.stop_when_idle()
@pytest.mark.skipif(
skip_bing,
reason="do not run if bing api key is not available",
)
@pytest.mark.asyncio
async def test_web_surfer_bing() -> None:
env = {
ENVIRON_KEY_CHAT_COMPLETION_PROVIDER: "openai",
ENVIRON_KEY_CHAT_COMPLETION_KWARGS_JSON: MOCK_CHAT_COMPLETION_KWARGS
}
runtime = SingleThreadedAgentRuntime()
# Create an appropriate client
client = create_completion_client_from_env(env)
# Register agents.
web_surfer = await runtime.register_and_get_proxy(
"WebSurfer",
lambda: MultimodalWebSurfer(),
)
run_context = runtime.start()
actual_surfer = await runtime.try_get_underlying_agent_instance(web_surfer.id, MultimodalWebSurfer)
await actual_surfer.init(model_client=client, downloads_folder=os.getcwd(), browser_channel="chromium")
# Test some basic navigations
tool_resp = await make_browser_request(actual_surfer, TOOL_WEB_SEARCH, {"query": BING_QUERY})
metadata = await actual_surfer._get_page_metadata() # type: ignore
assert f"{BING_QUERY}".strip() in metadata["meta_tags"]["og:url"]
assert f"{BING_QUERY}".strip() in metadata["meta_tags"]["og:title"]
assert f"I typed '{BING_QUERY}' into the browser search bar." in tool_resp.replace("\\","")
tool_resp = await make_browser_request(actual_surfer, TOOL_WEB_SEARCH, {"query": BING_QUERY + " Wikipedia"})
markdown = await actual_surfer._get_page_markdown() # type: ignore
assert "https://en.wikipedia.org/wiki/" in markdown
await run_context.stop_when_idle()
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
asyncio.run(test_web_surfer())
asyncio.run(test_web_surfer_oai())
asyncio.run(test_web_surfer_bing())