16 from typing
import AsyncIterable
27 ADHOC_METHOD =
'/test/AdHoc'
31 return not bool(
set(tuple(expected)) -
set(tuple(actual)))
35 actual: Metadata) -> bool:
36 obtained = actual[expected_key]
37 return obtained == expected_value
42 state = channel.get_state()
43 while state != expected_state:
44 await channel.wait_for_state_change(state)
45 state = channel.get_state()
49 first_callback_ran = asyncio.Event()
51 def first_callback(call):
55 first_callback_ran.set()
57 second_callback_ran = asyncio.Event()
59 def second_callback(call):
63 second_callback_ran.set()
65 call.add_done_callback(first_callback)
66 call.add_done_callback(second_callback)
68 async
def validation():
69 await asyncio.wait_for(
70 asyncio.gather(first_callback_ran.wait(),
71 second_callback_ran.wait()),
72 test_constants.SHORT_TIMEOUT)
108 """A generic handler to plugin testing server methods on the fly."""
118 if handler_call_details.method == ADHOC_METHOD: