[lldb] [test] Split TestGdbRemoteFork in two

Split the test that's gotten very long in two, in hope that it will
resolve the arm/aarch64 buildbot failures.  Even if it does not, it
could help pinpointing where the problem lies.

Sponsored by: The FreeBSD Foundation
This commit is contained in:
Michał Górny 2022-06-29 06:56:32 +02:00
parent 9c04851cf5
commit 7fc12da898
3 changed files with 322 additions and 313 deletions

View File

@ -0,0 +1,204 @@
import gdbremote_testcase
class GdbRemoteForkTestBase(gdbremote_testcase.GdbRemoteTestCaseBase):
fork_regex = ("[$]T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
"{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
fork_regex_nonstop = ("%Stop:T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
"{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
fork_capture = {1: "parent_pid", 2: "parent_tid",
3: "child_pid", 4: "child_tid"}
def start_fork_test(self, args, variant="fork", nonstop=False):
self.build()
self.prep_debug_monitor_and_inferior(inferior_args=args)
self.add_qSupported_packets(["multiprocess+",
"{}-events+".format(variant)])
ret = self.expect_gdbremote_sequence()
self.assertIn("{}-events+".format(variant), ret["qSupported_response"])
self.reset_test_sequence()
# continue and expect fork
if nonstop:
self.test_sequence.add_log_lines([
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#00",
"send packet: $OK#00",
{"direction": "send",
"regex": self.fork_regex_nonstop.format(variant),
"capture": self.fork_capture},
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
else:
self.test_sequence.add_log_lines([
"read packet: $c#00",
{"direction": "send", "regex": self.fork_regex.format(variant),
"capture": self.fork_capture},
], True)
ret = self.expect_gdbremote_sequence()
self.reset_test_sequence()
return tuple(ret[x] for x in ("parent_pid", "parent_tid",
"child_pid", "child_tid"))
def fork_and_detach_test(self, variant, nonstop=False):
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test([variant], variant, nonstop=nonstop))
# detach the forked child
self.test_sequence.add_log_lines([
"read packet: $D;{}#00".format(child_pid),
"send packet: $OK#00",
# verify that the current process is correct
"read packet: $qC#00",
"send packet: $QCp{}.{}#00".format(parent_pid, parent_tid),
# verify that the correct processes are detached/available
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $Eff#00",
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: $OK#00",
], True)
self.expect_gdbremote_sequence()
self.reset_test_sequence()
return parent_pid, parent_tid
def fork_and_follow_test(self, variant, nonstop=False):
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test([variant], variant, nonstop=nonstop))
# switch to the forked child
self.test_sequence.add_log_lines([
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $OK#00",
"read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
"send packet: $OK#00",
# detach the parent
"read packet: $D;{}#00".format(parent_pid),
"send packet: $OK#00",
# verify that the correct processes are detached/available
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: $Eff#00",
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $OK#00",
# then resume the child
"read packet: $c#00",
], True)
if nonstop:
self.test_sequence.add_log_lines([
"send packet: $OK#00",
"send packet: %Stop:W00;process:{}#00".format(child_pid),
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
else:
self.test_sequence.add_log_lines([
"send packet: $W00;process:{}#00".format(child_pid),
], True)
self.expect_gdbremote_sequence()
def detach_all_test(self, nonstop=False):
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test(["fork"], nonstop=nonstop))
self.test_sequence.add_log_lines([
# double-check our PIDs
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: $OK#00",
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $OK#00",
# detach all processes
"read packet: $D#00",
"send packet: $OK#00",
# verify that both PIDs are invalid now
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: $Eff#00",
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $Eff#00",
], True)
self.expect_gdbremote_sequence()
def vkill_test(self, kill_parent=False, kill_child=False, nonstop=False):
assert kill_parent or kill_child
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test(["fork"], nonstop=nonstop))
if kill_parent:
self.test_sequence.add_log_lines([
# kill the process
"read packet: $vKill;{}#00".format(parent_pid),
"send packet: $OK#00",
], True)
if kill_child:
self.test_sequence.add_log_lines([
# kill the process
"read packet: $vKill;{}#00".format(child_pid),
"send packet: $OK#00",
], True)
self.test_sequence.add_log_lines([
# check child PID/TID
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: ${}#00".format("Eff" if kill_child else "OK"),
# check parent PID/TID
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: ${}#00".format("Eff" if kill_parent else "OK"),
], True)
self.expect_gdbremote_sequence()
def resume_one_test(self, run_order, use_vCont=False, nonstop=False):
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test(["fork", "trap"], nonstop=nonstop))
parent_expect = [
"T05thread:p{}.{};.*".format(parent_pid, parent_tid),
"W00;process:{}#.*".format(parent_pid),
]
child_expect = [
"T05thread:p{}.{};.*".format(child_pid, child_tid),
"W00;process:{}#.*".format(child_pid),
]
for x in run_order:
if x == "parent":
pidtid = (parent_pid, parent_tid)
expect = parent_expect.pop(0)
elif x == "child":
pidtid = (child_pid, child_tid)
expect = child_expect.pop(0)
else:
assert False, "unexpected x={}".format(x)
if use_vCont:
self.test_sequence.add_log_lines([
# continue the selected process
"read packet: $vCont;c:p{}.{}#00".format(*pidtid),
], True)
else:
self.test_sequence.add_log_lines([
# continue the selected process
"read packet: $Hcp{}.{}#00".format(*pidtid),
"send packet: $OK#00",
"read packet: $c#00",
], True)
if nonstop:
self.test_sequence.add_log_lines([
"send packet: $OK#00",
{"direction": "send", "regex": "%Stop:" + expect},
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
else:
self.test_sequence.add_log_lines([
{"direction": "send", "regex": "[$]" + expect},
], True)
# if at least one process remained, check both PIDs
if parent_expect or child_expect:
self.test_sequence.add_log_lines([
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: ${}#00".format("OK" if parent_expect else "Eff"),
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: ${}#00".format("OK" if child_expect else "Eff"),
], True)
self.expect_gdbremote_sequence()

View File

@ -1,53 +1,12 @@
import random import random
import gdbremote_testcase
from lldbsuite.test.decorators import * from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import * from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase): from fork_testbase import GdbRemoteForkTestBase
fork_regex = ("[$]T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
"{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
fork_regex_nonstop = ("%Stop:T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
"{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
fork_capture = {1: "parent_pid", 2: "parent_tid",
3: "child_pid", 4: "child_tid"}
def start_fork_test(self, args, variant="fork", nonstop=False):
self.build()
self.prep_debug_monitor_and_inferior(inferior_args=args)
self.add_qSupported_packets(["multiprocess+",
"{}-events+".format(variant)])
ret = self.expect_gdbremote_sequence()
self.assertIn("{}-events+".format(variant), ret["qSupported_response"])
self.reset_test_sequence()
# continue and expect fork
if nonstop:
self.test_sequence.add_log_lines([
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#00",
"send packet: $OK#00",
{"direction": "send",
"regex": self.fork_regex_nonstop.format(variant),
"capture": self.fork_capture},
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
else:
self.test_sequence.add_log_lines([
"read packet: $c#00",
{"direction": "send", "regex": self.fork_regex.format(variant),
"capture": self.fork_capture},
], True)
ret = self.expect_gdbremote_sequence()
self.reset_test_sequence()
return tuple(ret[x] for x in ("parent_pid", "parent_tid",
"child_pid", "child_tid"))
class TestGdbRemoteFork(GdbRemoteForkTestBase):
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_fork_multithreaded(self): def test_fork_multithreaded(self):
_, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"]) _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"])
@ -60,27 +19,6 @@ class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
], True) ], True)
self.expect_gdbremote_sequence() self.expect_gdbremote_sequence()
def fork_and_detach_test(self, variant, nonstop=False):
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test([variant], variant, nonstop=nonstop))
# detach the forked child
self.test_sequence.add_log_lines([
"read packet: $D;{}#00".format(child_pid),
"send packet: $OK#00",
# verify that the current process is correct
"read packet: $qC#00",
"send packet: $QCp{}.{}#00".format(parent_pid, parent_tid),
# verify that the correct processes are detached/available
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $Eff#00",
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: $OK#00",
], True)
self.expect_gdbremote_sequence()
self.reset_test_sequence()
return parent_pid, parent_tid
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_fork(self): def test_fork(self):
parent_pid, _ = self.fork_and_detach_test("fork") parent_pid, _ = self.fork_and_detach_test("fork")
@ -92,20 +30,6 @@ class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
], True) ], True)
self.expect_gdbremote_sequence() self.expect_gdbremote_sequence()
@add_test_categories(["fork"])
def test_fork_nonstop(self):
parent_pid, _ = self.fork_and_detach_test("fork", nonstop=True)
# resume the parent
self.test_sequence.add_log_lines([
"read packet: $c#00",
"send packet: $OK#00",
"send packet: %Stop:W00;process:{}#00".format(parent_pid),
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
self.expect_gdbremote_sequence()
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_vfork(self): def test_vfork(self):
parent_pid, parent_tid = self.fork_and_detach_test("vfork") parent_pid, parent_tid = self.fork_and_detach_test("vfork")
@ -122,80 +46,14 @@ class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
], True) ], True)
self.expect_gdbremote_sequence() self.expect_gdbremote_sequence()
@add_test_categories(["fork"])
def test_vfork_nonstop(self):
parent_pid, parent_tid = self.fork_and_detach_test("vfork",
nonstop=True)
# resume the parent
self.test_sequence.add_log_lines([
"read packet: $c#00",
"send packet: $OK#00",
{"direction": "send",
"regex": r"%Stop:T05thread:p{}[.]{}.*vforkdone.*".format(
parent_pid, parent_tid),
},
"read packet: $vStopped#00",
"send packet: $OK#00",
"read packet: $c#00",
"send packet: $OK#00",
"send packet: %Stop:W00;process:{}#00".format(parent_pid),
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
self.expect_gdbremote_sequence()
def fork_and_follow_test(self, variant, nonstop=False):
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test([variant], variant, nonstop=nonstop))
# switch to the forked child
self.test_sequence.add_log_lines([
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $OK#00",
"read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
"send packet: $OK#00",
# detach the parent
"read packet: $D;{}#00".format(parent_pid),
"send packet: $OK#00",
# verify that the correct processes are detached/available
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: $Eff#00",
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $OK#00",
# then resume the child
"read packet: $c#00",
], True)
if nonstop:
self.test_sequence.add_log_lines([
"send packet: $OK#00",
"send packet: %Stop:W00;process:{}#00".format(child_pid),
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
else:
self.test_sequence.add_log_lines([
"send packet: $W00;process:{}#00".format(child_pid),
], True)
self.expect_gdbremote_sequence()
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_fork_follow(self): def test_fork_follow(self):
self.fork_and_follow_test("fork") self.fork_and_follow_test("fork")
@add_test_categories(["fork"])
def test_fork_follow_nonstop(self):
self.fork_and_follow_test("fork", nonstop=True)
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_vfork_follow(self): def test_vfork_follow(self):
self.fork_and_follow_test("vfork") self.fork_and_follow_test("vfork")
@add_test_categories(["fork"])
def test_vfork_follow_nonstop(self):
self.fork_and_follow_test("vfork", nonstop=True)
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_select_wrong_pid(self): def test_select_wrong_pid(self):
self.build() self.build()
@ -262,35 +120,10 @@ class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
], True) ], True)
self.expect_gdbremote_sequence() self.expect_gdbremote_sequence()
def detach_all_test(self, nonstop=False):
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test(["fork"], nonstop=nonstop))
self.test_sequence.add_log_lines([
# double-check our PIDs
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: $OK#00",
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $OK#00",
# detach all processes
"read packet: $D#00",
"send packet: $OK#00",
# verify that both PIDs are invalid now
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: $Eff#00",
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: $Eff#00",
], True)
self.expect_gdbremote_sequence()
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_detach_all(self): def test_detach_all(self):
self.detach_all_test() self.detach_all_test()
@add_test_categories(["fork"])
def test_detach_all_nonstop(self):
self.detach_all_test(nonstop=True)
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_kill_all(self): def test_kill_all(self):
parent_pid, _, child_pid, _ = self.start_fork_test(["fork"]) parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])
@ -308,74 +141,6 @@ class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
self.assertEqual(set([ret["pid1"], ret["pid2"]]), self.assertEqual(set([ret["pid1"], ret["pid2"]]),
set([parent_pid, child_pid])) set([parent_pid, child_pid]))
@add_test_categories(["fork"])
def test_kill_all_nonstop(self):
parent_pid, _, child_pid, _ = self.start_fork_test(["fork"],
nonstop=True)
exit_regex = "X09;process:([0-9a-f]+)"
# Depending on a potential race, the second kill may make it into
# the async queue before we issue vStopped or after. In the former
# case, we should expect the exit status in reply to vStopped.
# In the latter, we should expect an OK response (queue empty),
# followed by another async notification.
vstop_regex = "[$](OK|{})#.*".format(exit_regex)
self.test_sequence.add_log_lines([
# kill all processes
"read packet: $k#00",
"send packet: $OK#00",
{"direction": "send", "regex": "%Stop:{}#.*".format(exit_regex),
"capture": {1: "pid1"}},
"read packet: $vStopped#00",
{"direction": "send", "regex": vstop_regex,
"capture": {1: "vstop_reply", 2: "pid2"}},
], True)
ret = self.expect_gdbremote_sequence()
pid1 = ret["pid1"]
if ret["vstop_reply"] == "OK":
self.reset_test_sequence()
self.test_sequence.add_log_lines([
{"direction": "send", "regex": "%Stop:{}#.*".format(exit_regex),
"capture": {1: "pid2"}},
], True)
ret = self.expect_gdbremote_sequence()
pid2 = ret["pid2"]
self.reset_test_sequence()
self.test_sequence.add_log_lines([
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
self.expect_gdbremote_sequence()
self.assertEqual(set([ret["pid1"], ret["pid2"]]),
set([parent_pid, child_pid]))
def vkill_test(self, kill_parent=False, kill_child=False, nonstop=False):
assert kill_parent or kill_child
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test(["fork"], nonstop=nonstop))
if kill_parent:
self.test_sequence.add_log_lines([
# kill the process
"read packet: $vKill;{}#00".format(parent_pid),
"send packet: $OK#00",
], True)
if kill_child:
self.test_sequence.add_log_lines([
# kill the process
"read packet: $vKill;{}#00".format(child_pid),
"send packet: $OK#00",
], True)
self.test_sequence.add_log_lines([
# check child PID/TID
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: ${}#00".format("Eff" if kill_child else "OK"),
# check parent PID/TID
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: ${}#00".format("Eff" if kill_parent else "OK"),
], True)
self.expect_gdbremote_sequence()
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_vkill_child(self): def test_vkill_child(self):
self.vkill_test(kill_child=True) self.vkill_test(kill_child=True)
@ -388,66 +153,6 @@ class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
def test_vkill_both(self): def test_vkill_both(self):
self.vkill_test(kill_parent=True, kill_child=True) self.vkill_test(kill_parent=True, kill_child=True)
@add_test_categories(["fork"])
def test_vkill_both_nonstop(self):
self.vkill_test(kill_parent=True, kill_child=True, nonstop=True)
def resume_one_test(self, run_order, use_vCont=False, nonstop=False):
parent_pid, parent_tid, child_pid, child_tid = (
self.start_fork_test(["fork", "trap"], nonstop=nonstop))
parent_expect = [
"T05thread:p{}.{};.*".format(parent_pid, parent_tid),
"W00;process:{}#.*".format(parent_pid),
]
child_expect = [
"T05thread:p{}.{};.*".format(child_pid, child_tid),
"W00;process:{}#.*".format(child_pid),
]
for x in run_order:
if x == "parent":
pidtid = (parent_pid, parent_tid)
expect = parent_expect.pop(0)
elif x == "child":
pidtid = (child_pid, child_tid)
expect = child_expect.pop(0)
else:
assert False, "unexpected x={}".format(x)
if use_vCont:
self.test_sequence.add_log_lines([
# continue the selected process
"read packet: $vCont;c:p{}.{}#00".format(*pidtid),
], True)
else:
self.test_sequence.add_log_lines([
# continue the selected process
"read packet: $Hcp{}.{}#00".format(*pidtid),
"send packet: $OK#00",
"read packet: $c#00",
], True)
if nonstop:
self.test_sequence.add_log_lines([
"send packet: $OK#00",
{"direction": "send", "regex": "%Stop:" + expect},
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
else:
self.test_sequence.add_log_lines([
{"direction": "send", "regex": "[$]" + expect},
], True)
# if at least one process remained, check both PIDs
if parent_expect or child_expect:
self.test_sequence.add_log_lines([
"read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
"send packet: ${}#00".format("OK" if parent_expect else "Eff"),
"read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
"send packet: ${}#00".format("OK" if child_expect else "Eff"),
], True)
self.expect_gdbremote_sequence()
@expectedFailureAll(archs=["arm"]) # TODO @expectedFailureAll(archs=["arm"]) # TODO
@expectedFailureAll(archs=["aarch64"], @expectedFailureAll(archs=["aarch64"],
bugnumber="https://github.com/llvm/llvm-project/issues/56268") bugnumber="https://github.com/llvm/llvm-project/issues/56268")
@ -483,14 +188,6 @@ class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
def test_c_interspersed(self): def test_c_interspersed(self):
self.resume_one_test(run_order=["parent", "child", "parent", "child"]) self.resume_one_test(run_order=["parent", "child", "parent", "child"])
@expectedFailureAll(archs=["arm"]) # TODO
@expectedFailureAll(archs=["aarch64"],
bugnumber="https://github.com/llvm/llvm-project/issues/56268")
@add_test_categories(["fork"])
def test_c_interspersed_nonstop(self):
self.resume_one_test(run_order=["parent", "child", "parent", "child"],
nonstop=True)
@expectedFailureAll(archs=["arm"]) # TODO @expectedFailureAll(archs=["arm"]) # TODO
@expectedFailureAll(archs=["aarch64"], @expectedFailureAll(archs=["aarch64"],
bugnumber="https://github.com/llvm/llvm-project/issues/56268") bugnumber="https://github.com/llvm/llvm-project/issues/56268")
@ -529,14 +226,6 @@ class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
self.resume_one_test(run_order=["parent", "child", "parent", "child"], self.resume_one_test(run_order=["parent", "child", "parent", "child"],
use_vCont=True) use_vCont=True)
@expectedFailureAll(archs=["arm"]) # TODO
@expectedFailureAll(archs=["aarch64"],
bugnumber="https://github.com/llvm/llvm-project/issues/56268")
@add_test_categories(["fork"])
def test_vCont_interspersed_nonstop(self):
self.resume_one_test(run_order=["parent", "child", "parent", "child"],
use_vCont=True, nonstop=True)
@add_test_categories(["fork"]) @add_test_categories(["fork"])
def test_vCont_two_processes(self): def test_vCont_two_processes(self):
parent_pid, parent_tid, child_pid, child_tid = ( parent_pid, parent_tid, child_pid, child_tid = (

View File

@ -0,0 +1,116 @@
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from fork_testbase import GdbRemoteForkTestBase
class TestGdbRemoteForkNonStop(GdbRemoteForkTestBase):
@add_test_categories(["fork"])
def test_vfork_nonstop(self):
parent_pid, parent_tid = self.fork_and_detach_test("vfork",
nonstop=True)
# resume the parent
self.test_sequence.add_log_lines([
"read packet: $c#00",
"send packet: $OK#00",
{"direction": "send",
"regex": r"%Stop:T05thread:p{}[.]{}.*vforkdone.*".format(
parent_pid, parent_tid),
},
"read packet: $vStopped#00",
"send packet: $OK#00",
"read packet: $c#00",
"send packet: $OK#00",
"send packet: %Stop:W00;process:{}#00".format(parent_pid),
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
self.expect_gdbremote_sequence()
@add_test_categories(["fork"])
def test_fork_nonstop(self):
parent_pid, _ = self.fork_and_detach_test("fork", nonstop=True)
# resume the parent
self.test_sequence.add_log_lines([
"read packet: $c#00",
"send packet: $OK#00",
"send packet: %Stop:W00;process:{}#00".format(parent_pid),
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
self.expect_gdbremote_sequence()
@add_test_categories(["fork"])
def test_fork_follow_nonstop(self):
self.fork_and_follow_test("fork", nonstop=True)
@add_test_categories(["fork"])
def test_vfork_follow_nonstop(self):
self.fork_and_follow_test("vfork", nonstop=True)
@add_test_categories(["fork"])
def test_detach_all_nonstop(self):
self.detach_all_test(nonstop=True)
@add_test_categories(["fork"])
def test_kill_all_nonstop(self):
parent_pid, _, child_pid, _ = self.start_fork_test(["fork"],
nonstop=True)
exit_regex = "X09;process:([0-9a-f]+)"
# Depending on a potential race, the second kill may make it into
# the async queue before we issue vStopped or after. In the former
# case, we should expect the exit status in reply to vStopped.
# In the latter, we should expect an OK response (queue empty),
# followed by another async notification.
vstop_regex = "[$](OK|{})#.*".format(exit_regex)
self.test_sequence.add_log_lines([
# kill all processes
"read packet: $k#00",
"send packet: $OK#00",
{"direction": "send", "regex": "%Stop:{}#.*".format(exit_regex),
"capture": {1: "pid1"}},
"read packet: $vStopped#00",
{"direction": "send", "regex": vstop_regex,
"capture": {1: "vstop_reply", 2: "pid2"}},
], True)
ret = self.expect_gdbremote_sequence()
pid1 = ret["pid1"]
if ret["vstop_reply"] == "OK":
self.reset_test_sequence()
self.test_sequence.add_log_lines([
{"direction": "send", "regex": "%Stop:{}#.*".format(exit_regex),
"capture": {1: "pid2"}},
], True)
ret = self.expect_gdbremote_sequence()
pid2 = ret["pid2"]
self.reset_test_sequence()
self.test_sequence.add_log_lines([
"read packet: $vStopped#00",
"send packet: $OK#00",
], True)
self.expect_gdbremote_sequence()
self.assertEqual(set([ret["pid1"], ret["pid2"]]),
set([parent_pid, child_pid]))
@add_test_categories(["fork"])
def test_vkill_both_nonstop(self):
self.vkill_test(kill_parent=True, kill_child=True, nonstop=True)
@expectedFailureAll(archs=["arm"]) # TODO
@expectedFailureAll(archs=["aarch64"],
bugnumber="https://github.com/llvm/llvm-project/issues/56268")
@add_test_categories(["fork"])
def test_c_interspersed_nonstop(self):
self.resume_one_test(run_order=["parent", "child", "parent", "child"],
nonstop=True)
@expectedFailureAll(archs=["arm"]) # TODO
@expectedFailureAll(archs=["aarch64"],
bugnumber="https://github.com/llvm/llvm-project/issues/56268")
@add_test_categories(["fork"])
def test_vCont_interspersed_nonstop(self):
self.resume_one_test(run_order=["parent", "child", "parent", "child"],
use_vCont=True, nonstop=True)