DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
send-restcmd.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2
3import argparse
4import requests
5import json
6import time
7import sys
8
# Command-line interface: where to POST, which file holds the command(s),
# and how the commands are paced or selected.
parser = argparse.ArgumentParser(
    description='POST command object from file to commanded endpoint.',
)

# Target endpoint.
parser.add_argument(
    '--host',
    type=str,
    default='localhost',
    help='target host/endpoint',
)
parser.add_argument(
    '-p', '--port',
    type=int,
    default=12345,
    help='target port',
)
parser.add_argument(
    '-r', '--route',
    type=str,
    default='command',
    help='target route on endpoint',
)

# Port advertised to the endpoint through the X-Answer-Port request header.
parser.add_argument(
    '-a', '--answer-port',
    type=int,
    default=12333,
    help='answer to service listening on this port',
)

# JSON file with the command payload; the only mandatory option.
parser.add_argument(
    '-f', '--file',
    type=str,
    required=True,
    help='file that contains command to be posted',
)

# Pause used when a list of commands is sent non-interactively.
parser.add_argument(
    '-w', '--wait',
    type=int,
    default=2,
    help='seconds to wait between sending commands',
)

# Paired switches writing to the same destination; non-interactive is the default.
parser.add_argument(
    '-i', '--interactive',
    dest='interactive',
    action='store_true',
    help='interactive mode',
)
parser.add_argument(
    '--non-interactive',
    dest='interactive',
    action='store_false',
)
parser.set_defaults(interactive=False)

args = parser.parse_args()
21
# Full address the command is POSTed to: http://<host>:<port>/<route>.
url = f'http://{args.host}:{args.port}/{args.route}'
print(f'Target url: {url}')

# Every request is sent as JSON and carries the answer port in a custom header.
headers = {
    'content-type': 'application/json',
    'X-Answer-Port': str(args.answer_port),
}
25
# Load the command payload from the JSON file given with -f/--file.
# An unreadable file and malformed content are reported separately, and the
# script exits with a non-zero status so that callers can detect the failure.
cmdstr = None
try:
    with open(args.file) as f:
        cmdstr = json.load(f)
except OSError as exc:
    # Missing file, permission problem, path is a directory, ...
    print(f"\nERROR: failed to open file '{str(args.file)}': {exc}")
    raise SystemExit(1)
except ValueError as exc:
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    print(f"\nERROR: file '{str(args.file)}' does not contain valid JSON: {exc}")
    raise SystemExit(1)
33
def _post_command(cmd):
    """POST one command object to ``url`` as JSON and print the outcome.

    Returns True when a response was received (whatever its status code) and
    False when the request itself failed. Only ``Exception`` is handled here,
    so Ctrl-C (KeyboardInterrupt) and SystemExit still propagate to the caller.
    """
    try:
        response = requests.post(url, data=json.dumps(cmd), headers=headers)
    except Exception as exc:
        # Best effort: report the failure and let the caller carry on.
        print('Failed to send due to: %s: %s' % (type(exc).__name__, exc))
        return False
    print('Response code: %s with content: %s' % (str(response), str(response.content)))
    return True


if isinstance(cmdstr, dict):
    # The file holds one JSON object: send it as-is.
    print('This is single command.')
    _post_command(cmdstr)
elif isinstance(cmdstr, list):
    print('This is a list of commands.')
    if not args.interactive:
        # Batch mode: send every entry in file order, pausing between commands.
        last_index = len(cmdstr) - 1
        for index, cmd in enumerate(cmdstr):
            sent = _post_command(cmd)
            # No pause after a failed send or after the final command.
            if sent and index < last_index:
                time.sleep(args.wait)
    else:
        # Only entries that are objects with a non-empty "id" can be selected;
        # anything else in the list is ignored instead of raising.
        avacmds = [cdict['id'] for cdict in cmdstr
                   if isinstance(cdict, dict) and cdict.get('id')]
        print('Interactive mode. Type the ID of the next command to send, or type \'end\' to finish.')
        while True:
            try:
                print('\nAvailable commands: %s' % avacmds)
                nextcmd = input('Press enter a command to send next: ')
                if nextcmd == "end":
                    break
                # The first entry whose id matches the typed text wins.
                cmdobj = [cdict for cdict in cmdstr
                          if isinstance(cdict, dict) and cdict.get('id') == nextcmd]
                if not cmdobj:
                    print('Unrecognized command %s. (Not present in the command list?)' % nextcmd)
                else:
                    print('Sending %s command.' % nextcmd)
                    _post_command(cmdobj[0])
            except (KeyboardInterrupt, EOFError):
                # Ctrl-C (at the prompt or during a send) or Ctrl-D ends the session.
                break
else:
    # Valid JSON, but neither an object nor an array: nothing can be sent.
    print('Unsupported content of type %s: expected a command object or a list of commands.'
          % type(cmdstr).__name__)

print('Exiting...')