1. Sanity check MD-CLI show command#
Activity name | Sanity check MD-CLI show command |
Activity ID | 22 |
Short Description | Create a single show command for operators to quickly see personalized health data for a node |
Difficulty | Intermediate |
Tools used | SR OS Model-Driven CLI (MD-CLI) pySROS Python programming language |
Topology Nodes | PE1 PE2 PE3 PE4 |
References | MD-CLI user guide SR OS System management guide pySROS user documentation pySROS GitHub MD-CLI and YANG path finder |
Engineers that work on network issues often need a quick way of viewing the health of a node. An engineer will have to remember and run multiple show commands to look at different health metrics and the company will be reliant on each engineer to know what to look for, how to look for it and how to interpret it.
1.1 Objective#
Create a single show command that runs a sanity check so that engineers can use a single command to view all the health metrics that are important to this node. If any of the checks fail provide an output to show what fails. Be as creative as you like here and select outputs that you think are important. As a minimum have your show command check the status of:
- The Card status
- The IS-IS adjacencies
- The BGP peerings
- The CPU utilization
- The Memory utilization
Example output with all tests passing
(gl)[/]
A:admin@g15-pe1# show self_test
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
ISIS STATUS PASS
BGP STATUS PASS
BUSIEST CPU CORE OVER 5MIN 4%
MEMORY UTILIZATION 53%
===============================================================================
Example output with a failed test
(gl)[/]
A:admin@g15-pe1# show self_test
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
ISIS STATUS PASS
BGP STATUS FAIL
BUSIEST CPU CORE OVER 5MIN 5%
MEMORY UTILIZATION 53%
===============================================================================
===============================================================================
Details Of Failed Tests
===============================================================================
Failures
-------------------------------------------------------------------------------
The BGP peer 10.64.51.2 is not Established
===============================================================================
Optionally also include the following:
- The MDA status
- The MPLS tunnel status
- The Service status
- Basic ping tests
- CF3 utilization
- Fan speed
Example output with all tests passing
(gl)[/]
A:admin@g15-pe1# show self_test
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
MDA STATUS PASS
ISIS STATUS PASS
MPLS STATUS PASS
BGP STATUS PASS
SERVICE STATUS PASS
PING TESTS PASS
BUSIEST CPU CORE OVER 5MIN 5%
MEMORY UTILIZATION 53%
CF3 UTILIZATION 39%
FAN TRAY 1 SPEED 0%
FAN TRAY 2 SPEED 0%
FAN TRAY 3 SPEED 0%
FAN TRAY 4 SPEED 0%
===============================================================================
Example with a failed test
(gl)[/]
A:admin@g15-pe1# show self_test
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
MDA STATUS PASS
ISIS STATUS PASS
MPLS STATUS PASS
BGP STATUS FAIL
SERVICE STATUS FAIL
PING TESTS PASS
BUSIEST CPU CORE OVER 5MIN 5%
MEMORY UTILIZATION 53%
CF3 UTILIZATION 39%
FAN TRAY 1 SPEED 0%
FAN TRAY 2 SPEED 0%
FAN TRAY 3 SPEED 0%
FAN TRAY 4 SPEED 0%
===============================================================================
===============================================================================
Details Of Failed Tests
===============================================================================
Failures
-------------------------------------------------------------------------------
The BGP peer 10.64.54.0 is not Established
The IES service client01 is not up
===============================================================================
As a stretch goal to take it to the next level consider:
- Having an output that shows non-zero network QoS drops
- Having an output that shows non-zero port errors
- Either (or both) of the above with an optional time interval that collects two iterations of data so you can provide a delta between them
Example output without setting an iteration interval
(gl)[/]
A:admin@g15-pe1# show self_test
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
MDA STATUS PASS
ISIS STATUS PASS
BGP STATUS PASS
MPLS STATUS PASS
SERVICE STATUS PASS
PING TESTS FAIL
BUSIEST CPU CORE OVER 5MIN 6%
MEMORY UTILIZATION 53%
CF3 UTILIZATION 39%
FAN TRAY 1 SPEED 0%
FAN TRAY 2 SPEED 0%
FAN TRAY 3 SPEED 0%
FAN TRAY 4 SPEED 0%
===============================================================================
===============================================================================
Details Of Failed Tests
===============================================================================
Failures
-------------------------------------------------------------------------------
Ping to 10.46.15.22 saw packet loss
Ping to 10.46.15.23 saw packet loss
Ping to 10.46.15.24 saw packet loss
===============================================================================
===============================================================================
Any non-zero network QoS drops
===============================================================================
Port Q I/E Profile Drops
-------------------------------------------------------------------------------
1/1/c1/1 1 E in 37
===============================================================================
===============================================================================
Any non-zero port errors
===============================================================================
Port I/E Drops
-------------------------------------------------------------------------------
===============================================================================
Example output with a 30s iteration interval
(gl)[/]
A:admin@g15-pe1# show self_test 30
Collecting second iteration in 30 seconds. Please wait
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
MDA STATUS PASS
ISIS STATUS FAIL
BGP STATUS PASS
MPLS STATUS FAIL
SERVICE STATUS PASS
PING TESTS PASS
BUSIEST CPU CORE OVER 5MIN 6%
MEMORY UTILIZATION 53%
CF3 UTILIZATION 39%
FAN TRAY 1 SPEED 0%
FAN TRAY 2 SPEED 0%
FAN TRAY 3 SPEED 0%
FAN TRAY 4 SPEED 0%
===============================================================================
===============================================================================
Details Of Failed Tests
===============================================================================
Failures
-------------------------------------------------------------------------------
The ISIS adjacency on interface p1 is not up
The ISIS SR-LFA coverage is not 100%
===============================================================================
===============================================================================
Any incrementing network QoS drops over 30 seconds
===============================================================================
Port Q I/E Profile Drops
-------------------------------------------------------------------------------
1/1/c1/1 1 E in 4
===============================================================================
===============================================================================
Any incrementing port errors over 30 seconds
===============================================================================
Port I/E Drops
-------------------------------------------------------------------------------
===============================================================================
1.2 Technology explanation#
This activity primarily involves Python 3.4 coding because this is the version that MicroPython is largely based on. This, in turn, is the version of Python that is available within model-driven SR OS devices. Within this environment, several libraries have been made available to operators as we will see in the next sections.
1.2.1 Python3#
The Python version running on the nodes is 3.4 whereas your own machine may be using a different version. Consider this when writing your code that you specifically want to run on SR OS. For example, if you are used to using f-strings in your work you will find that they work on your machine but when you transfer your script to the node they will fail.
Documentation for the Python 3.4 language can be found here.
This example structures the Python code in a modular manner. Snippets of data are collected and processed within their own functions. Those functions are all then called in the main() function and the tables are printed after the session has been disconnected.
Interested to learn more about Python code structure?
# This is an example code layout
# Import the required modules
from pysros.management import connect
from pysros.pprint import Table
# Create functions to collect the data you are interest in
def data1(c)
# Collect and process your first data
def data2(c)
# Collect and process your second data
# ...etc
def main():
# Start the session
c = connect()
# Call all your functions here and collate them into data for the tables
data1(c)
data2(c)
# Disconnect the session
c.disconnect()
# Print the tables here with the data you have collected
# Call the main function
if __name__ == '__main__':
main()
1.2.2 Establishing a pySROS connection#
When developing a Python script using pySROS, a connection must always be established to the SR OS device. This is done using the connect
function, which can be used both locally, when executing the script on SR OS, or from a remote environment.
When connecting remotely, the pySROS connection is established through NETCONF, using SSH as transport. You have to provide, at least, the hostname and username to establish the connection. In such situations the SSH key located at ~/.ssh/id_rsa
would be used to log in and the default NETCONF port 830 would be used.
Host key verification
When running remotely and because we use the SSH protocol for transport, host key verification is a topic to keep track of. Host key verification is a security feature in SSH that ensures the server you are connecting to is the same server as the one you connected to previously. When connecting to an SSH server for the first time a client has the opportunity to save that server's public key. This public key can be verified upon later connection attempts to make sure the server still presents the same public key. This feature prevents man-in-the-middle attacks.
This functionality should be enabled in production environments. For ephemeral lab networks like the one we are using today it is less crucial, and requires some additional attention to set up properly as we would need to get and store each node's public key individually. This is why you will see that the hostkey_verify
parameter to the remote connect
calls has been set to False
.
When running locally on model-driven SR OS, all parameters passed to connect
are ignored, and the connection is established to the local node.
Example Python code to establish a remote pySROS connection
Example Python code to establish a local only pySROS connection
1.2.3 Finding state info with pySROS#
To find the data you are looking for, login to your node and enter the state tree by typing /state
at the CLI. Use the tree
command or question mark to navigate your way around. Type info
to see the data in that path. Once you have found the final path of data you are interested in issue the command pwc json-instance-path
whilst in that context to get a path string you can use in your Python code.
You can also use the MD-CLI and YANG path finder to find the information you are interested in.
Example MD-CLI output
Tip
When a path contains double quotes like this one, if you want to use it in your python code you will need to use single quotes to declare it as a variable
1.2.4 pySROS get_list_keys#
The get_list_keys method added in pySROS version 22 will be used in these examples to get the lists of objects like interfaces, BGP peers and ports. This then allows you to collect a specific piece of information from each of these entities. Without using this method the alternative would be to collect all data from all the entities. As an example if you did this on ports, each port has thousands of data points associated with it, if your device has lots of ports you may find that your python script fails as it runs out of memory.
Example get_list_keys
script
1.2.5 pySROS table output#
pySROS contains a module that can help you print tables that are in the style of a standard SR OS show command. Please familiarize yourself with this module here pysros.pprint.Table documentation.
Tip
Make all your column widths add up to 79
Example pprint.Table
script
from pysros.management import connect
from pysros.pprint import Table
def print_table(rows, cols, title):
"""
This function uses the Table class from the pysros.pprint module to print a table
in the style of the Nokia SROS CLI.
It takes a list of lists as the rows.
It takes a list of tuples which are the column names and their width.
It takes a title string which is printed out at the top of the table.
"""
# Initialize the Table object with the heading and columns.
table = Table(title, cols)
table.print(rows) # Print the table
c = connect()
# Just an example of how to collate the data, not real data.
list_of_lists = [
["port1", "UP"],
["port2", "UP"],
["port3", "UP"],
["port4", "DOWN"],
["port5", "UP"]
]
c.disconnect()
print_table(list_of_lists, [(50, "Ports"), (29, "Status")], "Port Status")
[/]
A:admin@g15-pe1# pyexec table_test.py
===============================================================================
Port Status
===============================================================================
Ports Status
-------------------------------------------------------------------------------
port1 UP
port2 UP
port3 UP
port4 DOWN
port5 UP
===============================================================================
1.2.6 MD-CLI alias command#
After you have created your python script you can execute it directly with the pyexec
command. That is fine for you as you are the one that has created the script and knows the name of it and where you stored it!
For the benefit of operators it would make sense to wrap your python file into a MD-CLI alias so that operators can use a simple command to execute it without having to know what the file is called and where it is.
Tip
When you first create your MD-CLI alias you must logout and log back in before you can view it working
Tip
If you have created your alias, but then you change the contents of your python file afterwards you must use the command perform python python-script reload script "self_test"
to get the python script to read the new python file. You can use the command show python python-script self_test source-in-use
to see the contents of the python file the python script has currently loaded.
Example before and after outputs
(gl)[/]
A:admin@g15-pe1# pyexec "cf3:\nos-sros-activity-22.py"
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
ISIS STATUS PASS
BGP STATUS PASS
BUSIEST CPU CORE OVER 5MIN 4%
MEMORY UTILIZATION 53%
===============================================================================
(gl)[/]
A:admin@g15-pe1# show self_test
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
ISIS STATUS PASS
BGP STATUS PASS
BUSIEST CPU CORE OVER 5MIN 4%
MEMORY UTILIZATION 53%
===============================================================================
1.3 Tasks#
You should read these tasks from top-to-bottom before beginning the activity.
It is tempting to skip ahead but tasks may require you to have completed previous tasks before tackling them.
1.3.1 Start your script file#
The first step in this activity is setting up the environment to develop your script. This can either be done with a remote pySROS connection, through the interactive Python interpreter, or by editing the script and executing it locally in SROS using pyexec
.
Refer to the Establishing a PySROS connection section for more help
Script location
You can develop the script under your VM Instance folder ~/clab-srexperts/pe1/tftpboot/
. This folder contents are accessible from PE1 node via tftp://172.31.255.29/
.
For instance, if you create a script in ~/clab-srexperts/pe1/tftpboot/hc.py
, you can run it on PE1 using pyexec tftp://172.31.255.29/hc.py
.
In a file editor of your choice create a python script with the code layout discussed in the technology section with the print_table function.
Start code (read only if you get stuck)
#!/usr/bin/env python3
# Import the required modules
from pysros.management import connect
from pysros.pprint import Table
# Our function from earlier that helps us print pretty show commands
def print_table(rows, cols, title):
"""
This function uses the Table class from the pysros.pprint module to print a table
in the style of the Nokia SROS CLI.
It takes a list of lists as the rows.
It takes a list of tuples which are the column names and their width.
It takes a title string which is printed out at the top of the table.
"""
# Initialize the Table object with the heading and columns.
table = Table(title, cols)
table.print(rows) # Print the table
# This is where we will place the functions we create to collect and process each set of data
def main():
# Start the session
c = connect()
# Call all your functions here and collate them into data for the tables
# Disconnect the session
c.disconnect()
# Print the tables here with the data you have collected
# Call the main function
if __name__ == '__main__':
main()
1.3.2 Get card status#
Stop and take time to think here
This first function you create will be similar to many of the others so it makes sense to take your time and think here how would you implement this function?
- You need a function that gathers the state of the card. Can you login to MD-CLI and go into the state tree and find a good state to gather information on?
- How will you implement a function that uses get_list_keys so we don't gather information we don't need?
- You want a function that will return a value that either passes all the tests or provides details of any failed tests. How will you do that?
Create a function that will check the operational state of all the provisioned cards in the chassis. Return a result that can signify whether all the cards are correctly operational. If a card isn't operational return a result that provides detail on what is wrong so it can be used in another table that highlights any failed tests.
Card status (read only if you get stuck)
def card(c):
"""
This function first uses the get_list_keys method to get a list of all the cards in the system.
It then iterates through the list of cards and checks the operational state of each card.
If the operational state is not 'in-service', it adds a failure string message within a list
to the result list. (list of lists). The function returns the result list, which will be empty
if all provisioned cards are operationally in-service.
"""
top_path = "/nokia-state:state/card"
cards = c.running.get_list_keys(top_path)
specific_path = '/nokia-state:state/card[slot-number="{}"]/hardware-data'
result = []
for card in cards:
# Check if the card is not operationally in-service
if c.running.get(specific_path.format(card))['oper-state'].data not in ["in-service", "empty"]:
# If it isn't then we add to the result list to signify a failure
result.append(["Card {} is not operationally in-service".format(card)])
return result
def main():
# Start the session
c = connect()
# Create two lists to hold the results table data and any failure table data
result_table_data = []
failure_table_data = []
# Card data
card_result = card(c)
if card_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["CARD STATUS", "FAIL"])
failure_table_data.extend(card_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["CARD STATUS", "PASS"])
# Disconnect the session
c.disconnect()
# Print the results table
print_table(result_table_data, [(60, "Test"), (19, "Result")], "Self Test Results")
# Print the failure table only if there are any failures
if failure_table_data:
print("\n")
print_table(failure_table_data, [(79, "Failures")], "Details Of Failed Tests")
You should be at the stage now where you have a working script (albeit with just one test at the moment). Put your script on CF3 of the node you are working on and execute your script with pyexec. If successful you should see a CLI output showing that the card test passes.
Full script at this point (read only if you get stuck)
#!/usr/bin/env python3
# Import the required modules
from pysros.management import connect
from pysros.pprint import Table
# Our function from earlier that helps us print pretty show commands
def print_table(rows, cols, title):
"""
This function uses the Table class from the pysros.pprint module to print a table
in the style of the Nokia SROS CLI.
It takes a list of lists as the rows.
It takes a list of tuples which are the column names and their width.
It takes a title string which is printed out at the top of the table.
"""
# Initialize the Table object with the heading and columns.
table = Table(title, cols)
table.print(rows) # Print the table
def card(c):
"""
This function first uses the get_list_keys method to get a list of all the cards in the system.
It then iterates through the list of cards and checks the operational state of each card.
If the operational state is not 'in-service', it adds a failure string message within a list
to the result list. (list of lists). The function returns the result list, which will be empty
if all provisioned cards are operationally in-service.
"""
top_path = "/nokia-state:state/card"
cards = c.running.get_list_keys(top_path)
specific_path = '/nokia-state:state/card[slot-number="{}"]/hardware-data'
result = []
for card in cards:
# Check if the card is not operationally in-service
if c.running.get(specific_path.format(card))['oper-state'].data not in ["in-service", "empty"]:
# If it isn't then we add to the result list to signify a failure
result.append(["Card {} is not operationally in-service".format(card)])
return result
def main():
# Start the session
c = connect()
# Create two lists to hold the results table data and any failure table data
result_table_data = []
failure_table_data = []
# Card data
card_result = card(c)
if card_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["CARD STATUS", "FAIL"])
failure_table_data.extend(card_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["CARD STATUS", "PASS"])
# Disconnect the session
c.disconnect()
# Print the results table
print_table(result_table_data, [(60, "Test"), (19, "Result")], "Self Test Results")
# Print the failure table only if there are any failures
if failure_table_data:
print("\n")
print_table(failure_table_data, [(79, "Failures")], "Details Of Failed Tests")
# Call the main function
if __name__ == '__main__':
main()
[/]
A:admin@g15-pe1# pyexec test1.py
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
===============================================================================
You could even shutdown card 1 and check that the test fails and shows some failure detail.
Shutdown card 1
Run your script and check the card is down
[/]
A:admin@g15-pe1# pyexec test1.py
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS FAIL
===============================================================================
===============================================================================
Details Of Failed Tests
===============================================================================
Failures
-------------------------------------------------------------------------------
Card 1 is not operationally in-service
===============================================================================
Warning
Don't forget to re-enable the card you shutdown.
1.3.3 Get IS-IS status#
Now go through the same process with the other data. Devise and create a function to gather IS-IS adjacency data
IS-IS status (read only if you are stuck)
def isis(c):
"""
This function first uses the get_list_keys method to get a list of all the ISIS interfaces.
It then iterates through the list of interfaces and checks the operational state of each
adjacency. If the operational state is not 'up', it adds a message to the result list.
The function returns the result list, which will be empty if all adjacencies are
operationally up.
"""
top_path = '/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]/interface'
interfaces = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]'
'/interface[interface-name="{}"]/adjacency[adjacency-index="1"]'
)
result = []
for interface in interfaces:
if interface != "system": # Skip the system interface
# There could be a LookupError if there is no adjacency index at all
try:
# Check if the adjacency is not operationally up
if c.running.get(specific_path.format(interface))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The ISIS adjacency on interface {} is not up".format(interface)])
except LookupError:
result.append(["The ISIS adjacency on interface {} is not up".format(interface)])
return result
Note
As we have already created the lists to collect the data for the tables in main()
and we have already called the print_table functions as well: That code is not repeated below. This is the only new code that needs to be added to main()
# ISIS data
isis_result = isis(c)
if isis_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["ISIS STATUS", "FAIL"])
failure_table_data.extend(isis_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["ISIS STATUS", "PASS"])
1.3.4 Get BGP status#
Now go through the same process for the BGP test. Devise and create a function to gather BGP peering data.
BGP status (read only if you are stuck)
def bgp(c):
"""
This function first uses the get_list_keys method to get a list of all the BGP peers.
It then iterates through the list of peers and checks the operational state of each peering.
If the operational state is not 'Established', it adds a message to the result list.
The function returns the result list, which will be empty if all BGP peerings are Established.
"""
top_path = '/nokia-state:state/router[router-name="Base"]/bgp/neighbor'
peerings = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/router[router-name="Base"]/bgp/neighbor'
'[ip-address="{}"]/statistics'
)
result = []
for peer in peerings:
# Check if the peering is not Established
if c.running.get(specific_path.format(peer))['session-state'].data != "Established":
# If it isn't then we add to the result list to signify a failure
result.append(["The BGP peer {} is not Established".format(peer)])
return result
Note
As we have already created the lists to collect the data for the tables in main()
and we have already called the print_table functions as well: That code is not repeated below. This is the only new code that needs to be added to main()
# BGP data
bgp_result = bgp(c)
if bgp_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["BGP STATUS", "FAIL"])
failure_table_data.extend(bgp_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["BGP STATUS", "PASS"])
1.3.5 Get CPU utilization#
Stop and take time to think here
This function is a little different to the others because instead of displaying a PASS/FAIL to the user it displays an absolute value. You could - if you wish - decide you want to stay with the PASS/FAIL theme and come up with some thresholds for these values that would signify a PASS/FAIL in your view.
- How would you implement this function?
- Would you want to know the CPU sample-period over 1, 60 or 300 seconds?
- Is it right to focus on the
cpu-usage
state orcapacity-usage
state? Why?
Now go through the same process for the CPU data. Devise and create a function to gather CPU stats.
CPU status (read only if you are stuck)
def cpu(c):
"""
This function collects the cpu-usage of the businest CPU core over a 5 minute period.
The collected data is a string, so it is converted to a float and then to an int to show a whole number
"""
path = '/nokia-state:state/system/cpu[sample-period="300"]/summary/busiest-core-utilization'
result = c.running.get(path)['cpu-usage'].data
return int(float(result))
Note
As we have already created the lists to collect the data for the tables in main()
and we have already called the print_table functions as well: That code is not repeated below. This is the only new code that needs to be added to main()
1.3.6 Get memory utilization#
Stop and take time to think here
If you look at the state tree in MD-CLI you can see we don't get a utilization figure for memory. We just get 'current-total-size', 'total-in-use' and 'available-memory'. How would you calculate a utilization percentage from this?
Now go through the same process for the Memory data. Devise and create a function to gather memory stats and calculate the utilization as a percentage. You might need to use some maths in Python to come up with a percentage value!
Function code (read only if you are stuck)
def memory(c):
"""
This function collects the memory in use and available from the system
From this it calculates the memory utilization and formats it the same way as the CPU utilization
"""
path = "/nokia-state:state/system/memory-pools/summary"
result = c.running.get(path)
total_size = result['current-total-size'].data
available = result['available-memory'].data
total_mem = total_size + available
utilization = (total_size / total_mem) * 100
pretty_utilization = "{}%".format(int(utilization))
return pretty_utilization
Note
As we have already created the lists to collect the data for the tables in main()
and we have already called the print_table functions as well: That code is not repeated below. This is the only new code that needs to be added to main()
1.3.7 Create a MD-CLI alias#
Refer to section MD-CLI alias command as a reference
MD-CLI alias configuration (read only if you are stuck)
Note
Alter the url of the script below to reflect the location and name of the script you have made
/configure python python-script "self_test" admin-state enable
/configure python python-script "self_test" urls ["cf3:\nos-sros-activity-22.py"]
/configure python python-script "self_test" version python3
/configure system management-interface cli md-cli environment command-alias alias "self_test" admin-state enable
/configure system management-interface cli md-cli environment command-alias alias "self_test" description "show self_test"
/configure system management-interface cli md-cli environment command-alias alias "self_test" python-script "self_test"
/configure system { management-interface cli md-cli environment command-alias alias "self_test" mount-point "/show" }
(gl)[/]
A:admin@g15-pe1# show self_test
===============================================================================
Self Test Results
===============================================================================
Test Result
-------------------------------------------------------------------------------
CARD STATUS PASS
ISIS STATUS PASS
BGP STATUS PASS
BUSIEST CPU CORE OVER 5MIN 4%
MEMORY UTILIZATION 53%
===============================================================================
1.3.8 Initial set of tasks complete#
Consider testing your script at this stage if you haven't been doing it as you go along. It should be working but may just show all pass results. You could shutdown a BGP session or an IS-IS session and check your show command now recognizes the failure. Check the failure table appears to show what failed. If you shutdown card 1 you should now see lots of failures and multiple rows appear in the failure table.
Full script at this stage
#!/usr/bin/env python3
from pysros.management import connect
from pysros.pprint import Table
def print_table(rows, cols, title):
"""
This function uses the Table class from the pysros.pprint module to print a table
in the style of the Nokia SROS CLI.
It takes a list of lists as the rows.
It takes a list of tuples which are the column names and their width.
It takes a title string which is printed out at the top of the table.
"""
# Initialize the Table object with the heading and columns.
table = Table(title, cols)
table.print(rows) # Print the table
def card(c):
"""
This function first uses the get_list_keys method to get a list of all the cards in the system.
It then iterates through the list of cards and checks the operational state of each card.
If the operational state is not 'in-service', it adds a failure string message within a list
to the result list. (list of lists). The function returns the result list, which will be empty
if all provisioned cards are operationally in-service.
"""
top_path = "/nokia-state:state/card"
cards = c.running.get_list_keys(top_path)
specific_path = '/nokia-state:state/card[slot-number="{}"]/hardware-data'
result = []
for card in cards:
# Check if the card is not operationally in-service
if c.running.get(specific_path.format(card))['oper-state'].data not in ["in-service", "empty"]:
# If it isn't then we add to the result list to signify a failure
result.append(["Card {} is not operationally in-service".format(card)])
return result
def isis(c):
"""
This function first uses the get_list_keys method to get a list of all the ISIS interfaces.
It then iterates through the list of interfaces and checks the operational state of each
adjacency. If the operational state is not 'up', it adds a message to the result list.
The function returns the result list, which will be empty if all adjacencies are
operationally up.
"""
top_path = '/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]/interface'
interfaces = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]'
'/interface[interface-name="{}"]/adjacency[adjacency-index="1"]'
)
result = []
for interface in interfaces:
if interface != "system": # Skip the system interface
# There could be a LookupError if there is no adjacency index at all
try:
# Check if the adjacency is not operationally up
if c.running.get(specific_path.format(interface))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The ISIS adjacency on interface {} is not up".format(interface)])
except LookupError:
result.append(["The ISIS adjacency on interface {} is not up".format(interface)])
return result
def bgp(c):
"""
This function first uses the get_list_keys method to get a list of all the BGP peers.
It then iterates through the list of peers and checks the operational state of each peering.
If the operational state is not 'Established', it adds a message to the result list.
The function returns the result list, which will be empty if all BGP peerings are Established.
"""
top_path = '/nokia-state:state/router[router-name="Base"]/bgp/neighbor'
peerings = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/router[router-name="Base"]/bgp/neighbor'
'[ip-address="{}"]/statistics'
)
result = []
for peer in peerings:
# Check if the peering is not Established
if c.running.get(specific_path.format(peer))['session-state'].data != "Established":
# If it isn't then we add to the result list to signify a failure
result.append(["The BGP peer {} is not Established".format(peer)])
return result
def cpu(c):
"""
This function collects the cpu-usage of the businest CPU core over a 5 minute period.
The collected data is a string, so it is converted to a float and then to an int to show a whole number
"""
path = '/nokia-state:state/system/cpu[sample-period="300"]/summary/busiest-core-utilization'
result = c.running.get(path)['cpu-usage'].data
return int(float(result))
def memory(c):
"""
This function collects the memory in use and available from the system
From this it calculates the memory utilization and formats it the same way as the CPU utilization
"""
path = "/nokia-state:state/system/memory-pools/summary"
result = c.running.get(path)
total_size = result['current-total-size'].data
available = result['available-memory'].data
total_mem = total_size + available
utilization = (total_size / total_mem) * 100
pretty_utilization = "{}%".format(int(utilization))
return pretty_utilization
def main():
# Start the session
c = connect()
# Create two lists to hold the results table data and any failure table data
result_table_data = []
failure_table_data = []
# Card data
card_result = card(c)
if card_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["CARD STATUS", "FAIL"])
failure_table_data.extend(card_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["CARD STATUS", "PASS"])
# ISIS data
isis_result = isis(c)
if isis_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["ISIS STATUS", "FAIL"])
failure_table_data.extend(isis_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["ISIS STATUS", "PASS"])
# BGP data
bgp_result = bgp(c)
if bgp_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["BGP STATUS", "FAIL"])
failure_table_data.extend(bgp_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["BGP STATUS", "PASS"])
# CPU data
result_table_data.append(["BUSIEST CPU CORE OVER 5MIN", "{}%".format(cpu(c))])
# Memory data
result_table_data.append(["MEMORY UTILIZATION", memory(c)])
# Disconnect the session
c.disconnect()
# Print the results table
print_table(result_table_data, [(60, "Test"), (19, "Result")], "Self Test Results")
# Print the failure table only if there are any failures
if failure_table_data:
print("\n")
print_table(failure_table_data, [(79, "Failures")], "Details Of Failed Tests")
# Call the main function
if __name__ == '__main__':
main()
1.4 Extension tasks#
1.4.1 Get MDA status#
Create a function that reads the status of the MDAs and add it to your show command
Testing MDAs (read only if you are stuck)
def mda(c):
"""
This function needs to be a bit different as we have to run get_list_keys twice, once
to get a list of cards, and again against each card to get a list of MDAs in that card.
MDAs are also different in that this state data also shows for MDAs that are not
provisioned so we also filter off anything that has an oper-state of 'empty'
"""
top_path = "/nokia-state:state/card"
cards = c.running.get_list_keys(top_path)
mda_path = '/nokia-state:state/card[slot-number="{}"]/mda'
specific_path = '/nokia-state:state/card[slot-number="{}"]/mda[mda-slot="{}"]/hardware-data'
result = []
for card in cards:
mdas = c.running.get_list_keys(mda_path.format(card))
# Check if the card is not operationally in-service
for mda in mdas:
if c.running.get(
specific_path.format(card, mda)
)['oper-state'].data not in ["in-service", "empty"]:
# If it isn't then we add to the result list to signify a failure
result.append(["MDA {}/{} is not operationally in-service".format(card, mda)])
return result
# MDA data
mda_result = mda(c)
if mda_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["MDA STATUS", "FAIL"])
failure_table_data.extend(mda_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["MDA STATUS", "PASS"])
1.4.2 Get MPLS status#
Create a function that checks the health of MPLS tunnels and add it to your show command.
Stop and take time to think here
If our nodes were using LDP or RSVP-TE then this would be an easy task similar to the previous functions as the MPLS would be stateful. But our lab is using SR-ISIS. What would you pick up on to signify that SR-ISIS tunnels are healthy?
Testing MPLS (read only if you are stuck)
def mpls(c):
"""
This function checks that the SR-LFA coverage is 100% for the node-sid topology.
"""
path = (
'/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]'
'/sr-lfa-coverage[level-number="2"][multi-topology-id="ipv4"]'
'[sid-type="node-sid"][protocol-version="ipv4"]'
)
data = c.running.get(path)['lfa-covered-percent'].data
result = []
if data != 100:
# If it isn't then we add to the result list to signify a failure
result.append(["The ISIS SR-LFA coverage is not 100%"])
return result
# MPLS data
mpls_result = mpls(c)
if mpls_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["MPLS STATUS", "FAIL"])
failure_table_data.extend(mpls_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["MPLS STATUS", "PASS"])
1.4.3 Get service status#
Create a function that checks that services are up. Different service types are kept in different paths. How would you write your function to handle this?
Testing services (read only if you are stuck)
def svc(c):
"""
Different service types are in different paths, so for this function we need to check both
separately and then combine the results into one list. For this example we are just checking
VPLS and IES services. We also need to check that the IES service is not the internal one
which is used for the system
"""
top_path = "/nokia-state:state/service/{}"
vpls_services = c.running.get_list_keys(top_path.format("vpls"))
ies_services = c.running.get_list_keys(top_path.format("ies"))
specific_path = '/nokia-state:state/service/{}[service-name="{}"]'
result = []
for vpls in vpls_services:
# Check if the service is not up
if c.running.get(specific_path.format('vpls', vpls))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The VPLS service {} is not up".format(vpls)])
for ies in ies_services:
if ies != "_tmnx_InternalIesService": # Skip the internal IES service
# Check if the service is not up
if c.running.get(specific_path.format('ies', ies))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The IES service {} is not up".format(ies)])
return result
# svc data
svc_result = svc(c)
if svc_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["SERVICE STATUS", "FAIL"])
failure_table_data.extend(svc_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["SERVICE STATUS", "PASS"])
1.4.4 Create a ping test#
Create a function that runs some ping tests to different destinations in the network that checks for any packet loss. What destinations would you choose? Ping isn't listed in the state
tree so instead we need to use the connection.action method.
Basic ping tests (read only if you are stuck)
def ping(c):
"""
This function takes a connection object and pings destinations of your choice.
It uses the c.action method to ping and we extract just the packet loss from the result.
If any packet loss is seen then we flag a failure by populating the result list and return.
"""
ping_path = '/nokia-oper-global:global-operations/ping'
ping_data = {'destination': '{}',
'output-format': 'summary',
'interval': "0.01",
'timeout': 1}
ping_dest = [
"10.46.15.22", # PE2
"10.46.15.23", # PE3
"10.46.15.24" # PE4
]
result = []
for dest in ping_dest:
ping_data['destination'] = dest
the_ping = c.action(ping_path, ping_data)
pktloss = the_ping['results']['summary']['statistics']['packets']['loss'].data
if pktloss != '0.0':
# If packet loss isn't 0.0 then we add to the result list to signify a failure
result.append(["Ping to {} saw packet loss".format(dest)])
return result
# ping tests
ping_result = ping(c)
if ping_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["PING TESTS", "FAIL"])
failure_table_data.extend(ping_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["PING TESTS", "PASS"])
1.4.5 Check CF3 utilization#
Create a function that checks the utilization of the CF3 card. What do we need to think about here?
Check CF3 utilization (read only if you are stuck)
def cf3(c):
"""
This function collects the utilization of the CF3 flash device on the active CPM.
You need to find out first which CPM is active in order to do this.
"""
active_cpm = c.running.get("/nokia-state:state/system/active-cpm-slot").data
flash_path = '/nokia-state:state/cpm[cpm-slot="{}"]/flash[flash-id="3"]'.format(active_cpm)
result = c.running.get(flash_path)['percent-used'].data
return "{}%".format(result)
1.4.6 Get the fan tray speeds#
Create a function that checks the fan speed of the fan trays in the system. How will you make your function find the number fan trays in this particular system and adjust the result output accordingly?
Note
Virtual machines may show a fan speed of 0% as there aren't any real fan trays
Check fan speed (read only if you are stuck)
def fan(c):
"""
This function first uses the get_list_keys method to get a list of all the fan trays
It then iterates through the list of trays and gets their speed
For each tray, the speed is added to the result list
"""
top_path = '/nokia-state:state/chassis[chassis-class="router"][chassis-number="1"]/fan'
fan_trays = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/chassis[chassis-class="router"][chassis-number="1"]/fan[fan-slot="{}"]'
)
result = []
for tray in fan_trays:
speed = c.running.get(specific_path.format(tray))['speed'].data
result.append(["FAN TRAY {} SPEED".format(tray), "{}%".format(speed)])
return result
1.4.7 Extension tasks complete#
Now you have completed all the optional tasks. Get your script and alias updated and test it.
Full script at this stage
#!/usr/bin/env python3
# Import the required modules
from pysros.management import connect
from pysros.pprint import Table
# Our function from earlier that helps us print pretty show commands
def print_table(rows, cols, title):
"""
This function uses the Table class from the pysros.pprint module to print a table
in the style of the Nokia SROS CLI.
It takes a list of lists as the rows.
It takes a list of tuples which are the column names and their width.
It takes a title string which is printed out at the top of the table.
"""
# Initialize the Table object with the heading and columns.
table = Table(title, cols)
table.print(rows) # Print the table
def card(c):
"""
This function first uses the get_list_keys method to get a list of all the cards in the system.
It then iterates through the list of cards and checks the operational state of each card.
If the operational state is not 'in-service', it adds a failure string message within a list
to the result list. (list of lists). The function returns the result list, which will be empty
if all provisioned cards are operationally in-service.
"""
top_path = "/nokia-state:state/card"
cards = c.running.get_list_keys(top_path)
specific_path = '/nokia-state:state/card[slot-number="{}"]/hardware-data'
result = []
for card in cards:
# Check if the card is not operationally in-service
if c.running.get(specific_path.format(card))['oper-state'].data not in ["in-service", "empty"]:
# If it isn't then we add to the result list to signify a failure
result.append(["Card {} is not operationally in-service".format(card)])
return result
def mda(c):
"""
This function needs to be a bit different as we have to run get_list_keys twice, once
to get a list of cards, and again against each card to get a list of MDAs in that card.
MDAs are also different in that this state data also shows for MDAs that are not
provisioned so we also filter off anything that has an oper-state of 'empty'
"""
top_path = "/nokia-state:state/card"
cards = c.running.get_list_keys(top_path)
mda_path = '/nokia-state:state/card[slot-number="{}"]/mda'
specific_path = '/nokia-state:state/card[slot-number="{}"]/mda[mda-slot="{}"]/hardware-data'
result = []
for card in cards:
mdas = c.running.get_list_keys(mda_path.format(card))
# Check if the card is not operationally in-service
for mda in mdas:
if c.running.get(
specific_path.format(card, mda)
)['oper-state'].data not in ["in-service", "empty"]:
# If it isn't then we add to the result list to signify a failure
result.append(["MDA {}/{} is not operationally in-service".format(card, mda)])
return result
def isis(c):
"""
This function first uses the get_list_keys method to get a list of all the ISIS interfaces.
It then iterates through the list of interfaces and checks the operational state of each
adjacency. If the operational state is not 'up', it adds a message to the result list.
The function returns the result list, which will be empty if all adjacencies are
operationally up.
"""
top_path = '/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]/interface'
interfaces = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]'
'/interface[interface-name="{}"]/adjacency[adjacency-index="1"]'
)
result = []
for interface in interfaces:
if interface != "system": # Skip the system interface
# There could be a LookupError if there is no adjacency index at all
try:
# Check if the adjacency is not operationally up
if c.running.get(specific_path.format(interface))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The ISIS adjacency on interface {} is not up".format(interface)])
except LookupError:
result.append(["The ISIS adjacency on interface {} is not up".format(interface)])
return result
def mpls(c):
"""
This function checks that the SR-LFA coverage is 100% for the node-sid topology.
"""
path = (
'/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]'
'/sr-lfa-coverage[level-number="2"][multi-topology-id="ipv4"]'
'[sid-type="node-sid"][protocol-version="ipv4"]'
)
data = c.running.get(path)['lfa-covered-percent'].data
result = []
if data != 100:
# If it isn't then we add to the result list to signify a failure
result.append(["The ISIS SR-LFA coverage is not 100%"])
return result
def bgp(c):
"""
This function first uses the get_list_keys method to get a list of all the BGP peers.
It then iterates through the list of peers and checks the operational state of each peering.
If the operational state is not 'Established', it adds a message to the result list.
The function returns the result list, which will be empty if all BGP peerings are Established.
"""
top_path = '/nokia-state:state/router[router-name="Base"]/bgp/neighbor'
peerings = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/router[router-name="Base"]/bgp/neighbor'
'[ip-address="{}"]/statistics'
)
result = []
for peer in peerings:
# Check if the peering is not Established
if c.running.get(specific_path.format(peer))['session-state'].data != "Established":
# If it isn't then we add to the result list to signify a failure
result.append(["The BGP peer {} is not Established".format(peer)])
return result
def svc(c):
"""
Different service types are in different paths, so for this function we need to check both
separately and then combine the results into one list. For this example we are just checking
VPLS and IES services. We also need to check that the IES service is not the internal one
which is used for the system
"""
top_path = "/nokia-state:state/service/{}"
vpls_services = c.running.get_list_keys(top_path.format("vpls"))
ies_services = c.running.get_list_keys(top_path.format("ies"))
specific_path = '/nokia-state:state/service/{}[service-name="{}"]'
result = []
for vpls in vpls_services:
# Check if the service is not up
if c.running.get(specific_path.format('vpls', vpls))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The VPLS service {} is not up".format(vpls)])
for ies in ies_services:
if ies != "_tmnx_InternalIesService": # Skip the internal IES service
# Check if the service is not up
if c.running.get(specific_path.format('ies', ies))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The IES service {} is not up".format(ies)])
return result
def ping(c):
"""
This function takes a connection object and pings destinations of your choice.
It uses the c.action method to ping and we extract just the packet loss from the result.
If any packet loss is seen then we flag a failure by populating the result list and return.
"""
ping_path = '/nokia-oper-global:global-operations/ping'
ping_data = {'destination': '{}',
'output-format': 'summary',
'interval': "0.01",
'timeout': 1}
ping_dest = [
"10.46.15.22", # PE2
"10.46.15.23", # PE3
"10.46.15.24" # PE4
]
result = []
for dest in ping_dest:
ping_data['destination'] = dest
the_ping = c.action(ping_path, ping_data)
pktloss = the_ping['results']['summary']['statistics']['packets']['loss'].data
if pktloss != '0.0':
# If packet loss isn't 0.0 then we add to the result list to signify a failure
result.append(["Ping to {} saw packet loss".format(dest)])
return result
def cpu(c):
"""
This function collects the cpu-usage of the businest CPU core over a 5 minute period.
The collected data is a string, so it is converted to a float and then to an int to show a whole number
"""
path = '/nokia-state:state/system/cpu[sample-period="300"]/summary/busiest-core-utilization'
result = c.running.get(path)['cpu-usage'].data
return int(float(result))
def memory(c):
"""
This function collects the memory in use and available from the system
From this it calculates the memory utilization and formats it the same way as the CPU utilization
"""
path = "/nokia-state:state/system/memory-pools/summary"
result = c.running.get(path)
total_size = result['current-total-size'].data
available = result['available-memory'].data
total_mem = total_size + available
utilization = (total_size / total_mem) * 100
pretty_utilization = "{}%".format(int(utilization))
return pretty_utilization
def cf3(c):
"""
This function collects the utilization of the CF3 flash device on the active CPM.
You need to find out first which CPM is active in order to do this.
"""
active_cpm = c.running.get("/nokia-state:state/system/active-cpm-slot").data
flash_path = '/nokia-state:state/cpm[cpm-slot="{}"]/flash[flash-id="3"]'.format(active_cpm)
result = c.running.get(flash_path)['percent-used'].data
return "{}%".format(result)
def fan(c):
"""
This function first uses the get_list_keys method to get a list of all the fan trays
It then iterates through the list of trays and gets their speed
For each tray, the speed is added to the result list
"""
top_path = '/nokia-state:state/chassis[chassis-class="router"][chassis-number="1"]/fan'
fan_trays = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/chassis[chassis-class="router"][chassis-number="1"]/fan[fan-slot="{}"]'
)
result = []
for tray in fan_trays:
speed = c.running.get(specific_path.format(tray))['speed'].data
result.append(["FAN TRAY {} SPEED".format(tray), "{}%".format(speed)])
return result
def main():
# Start the session
c = connect()
# Create two lists to hold the results table data and any failure table data
result_table_data = []
failure_table_data = []
# Card data
card_result = card(c)
if card_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["CARD STATUS", "FAIL"])
failure_table_data.extend(card_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["CARD STATUS", "PASS"])
# MDA data
mda_result = mda(c)
if mda_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["MDA STATUS", "FAIL"])
failure_table_data.extend(mda_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["MDA STATUS", "PASS"])
# ISIS data
isis_result = isis(c)
if isis_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["ISIS STATUS", "FAIL"])
failure_table_data.extend(isis_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["ISIS STATUS", "PASS"])
# BGP data
bgp_result = bgp(c)
if bgp_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["BGP STATUS", "FAIL"])
failure_table_data.extend(bgp_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["BGP STATUS", "PASS"])
# MPLS data
mpls_result = mpls(c)
if mpls_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["MPLS STATUS", "FAIL"])
failure_table_data.extend(mpls_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["MPLS STATUS", "PASS"])
# svc data
svc_result = svc(c)
if svc_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["SERVICE STATUS", "FAIL"])
failure_table_data.extend(svc_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["SERVICE STATUS", "PASS"])
# ping tests
ping_result = ping(c)
if ping_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["PING TESTS", "FAIL"])
failure_table_data.extend(ping_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["PING TESTS", "PASS"])
# CPU data
result_table_data.append(["BUSIEST CPU CORE OVER 5MIN", "{}%".format(cpu(c))])
# Memory data
result_table_data.append(["MEMORY UTILIZATION", memory(c)])
# CF3 flash data
result_table_data.append(["CF3 UTILIZATION", cf3(c)])
# Fan data
# We 'extend' the result table list this time because of multiple fan trays
result_table_data.extend(fan(c))
# Disconnect the session
c.disconnect()
# Print the results table
print_table(result_table_data, [(60, "Test"), (19, "Result")], "Self Test Results")
# Print the failure table only if there are any failures
if failure_table_data:
print("\n")
print_table(failure_table_data, [(79, "Failures")], "Details Of Failed Tests")
# Call the main function
if __name__ == '__main__':
main()
1.5 Legendary stretch goal tasks#
1.5.1 Get non-zero network Q drops#
Create some code that creates a new table with any non-zero network QoS drops.
Stop and take time to think here
- Can you find some state information in MD-CLI that shows network queue drops?
- How will you differentiate between network ports and access/connector ports?
- How will you structure your new tables?
Network QoS drops (check only if you are stuck)
def collect_qos_drops(c):
"""
This function takes a connection object then gets a list of all ports.
It looks for ports that have network queues and finds any that are non-zero.
If any are non-zero then it gathers the info in lists of lists for the table function.
"""
# Get a list of the ports
port_list = c.running.get_list_keys("/nokia-state:state/port")
# A list to store the data in
data = []
for port in port_list:
# Grab all the info on that port
port_info = c.running.get('/nokia-state:state/port[port-id="{}"]'.format(port))
if port_info['port-class'].data != 'connector':
# If the port doesn't have network queues we ignore it
try:
port_info['network']['egress']['queue']
except:
pass
else:
# Collect the numbers of egress and ingress queues
egress_queues = sorted(port_info['network']['egress']['queue'].keys())
ingress_queues = sorted(port_info['network']['ingress']['queue'].keys())
# Go through each egress queue looking for non-zero drops
for q_e in egress_queues:
in_drops = (
port_info['network']['egress']['queue'][q_e]
['statistics']['in-profile-dropped-packets'].data
)
out_drops = (
port_info['network']['egress']['queue'][q_e]
['statistics']['out-profile-dropped-packets'].data
)
if in_drops != 0:
data.append([port, q_e, 'E', 'in', in_drops])
if out_drops != 0:
data.append([port, q_e, 'E', 'out', out_drops])
# Go through each ingress queue looking for non-zero drops
for q_i in ingress_queues:
in_drops = (
port_info['network']['ingress']['queue'][q_i]
['statistics']['in-profile-dropped-packets'].data
)
out_drops = (
port_info['network']['ingress']['queue'][q_i]
['statistics']['out-profile-dropped-packets'].data
)
if in_drops != 0:
data.append([port, q_i, 'I', 'in', in_drops])
if out_drops != 0:
data.append([port, q_i, 'I', 'out', out_drops])
return data
# Collect data on QoS drops
qos_table_title = "Any non-zero network QoS drops"
qos_table = collect_qos_drops(c)
c.disconnect() # Already in main()
print("\n") # Add an extra space in between tables
print_table(
qos_table,
[(15, "Port"), (10, "Q"), (10, "I/E"), (10, "Profile"), (34, "Drops")],
qos_table_title
)
Stop and take time to think here
- How can you test this?
- How would you create some network qos drops on queues without pushing a load of traffic through?
Answer
Setting the MBS to 0 on a queue makes a queue drop all traffic and will increment your QoS drop counter.
/configure port 1/1/c1/1 ethernet network egress queue-policy "test"
/configure qos network-queue "test" queue 1 mbs 0.0
Don't forget to remove the queue-policy from the port when finished so your network continues to work!
1.5.2 Get non-zero port errors#
Create some code that creates a new table with any non-zero port errors.
Note
It isn't an easy task to test this in a lab scenario. You could if you wish change to packet stats instead of errors just to see it all working.
Port errors (check only if you are stuck)
def collect_port_errors(c):
"""
This function takes a connection object then gets a list of all ports.
It looks for ports that have any non-zero Ethernet errors.
If any are non-zero then it gathers the info in lists of lists for the table function.
"""
# Get a list of the ports
port_list = c.running.get_list_keys("/nokia-state:state/port")
# A list to store the data in
data = []
for port in port_list:
# Grab all the info on that port
port_info = c.running.get('/nokia-state:state/port[port-id="{}"]'.format(port))
if port_info['port-class'].data != 'connector':
# If the port doesn't have Ethernet statistics we ignore it
try:
port_info['ethernet']['statistics']
except:
pass
else:
# Grab the in and out error data
in_errors = port_info['ethernet']['statistics']['in-errors'].data
out_errors = port_info['ethernet']['statistics']['out-errors'].data
# If they are not zero we add the data to the list to be printed
if in_errors != 0:
data.append([port, 'I', in_errors])
if out_errors != 0:
data.append([port, 'E', out_errors])
return data
1.5.3 Create logic to gain delta stats#
Create some new code that collects the non-zero QoS drops and port errors over an interval and only print those tables if the QoS drops or port errors are incrementing over that period.
Stop and take time to think here
- What do we need to add to allow a user to provide an argument to our script?
- What logic can we use to allow the script to be used with and without arguments?
- How can we ensure the user provides a valid argument?
- What logic will you use in a new function to compare the two iterations of data?
- How can we handle the situation where QoS drops or port errors start incrementing between iterations
Two iterations of data for delta stats (check only if you are stuck)
At the very top of your code you will want to add in the time and sys libraries so that you can pause the script for the specified period and get an argument from the user
def compare_lists(list1, list2, list_item_to_compare):
"""
This function takes two lists of lists and compares a certain index in them.
It matches items based on their unique identifiers (the rest of the content in the list).
If items are in list2 but not in list1, then they must have started incrementing
during the collection interval so we add them straight to the new list.
"""
new_list = []
# Create dictionaries to map unique identifiers to list items
def create_dict(lst):
return {
tuple(
item[:list_item_to_compare] + item[list_item_to_compare + 1:]
): item
for item in lst
}
dict1 = create_dict(list1)
dict2 = create_dict(list2)
# Find new items in list2 that are not in list1
for key in set(dict2.keys()) - set(dict1.keys()):
new_list.append(dict2[key])
# Compare items that exist in both lists
for key in set(dict1.keys()) & set(dict2.keys()):
diff = dict2[key][list_item_to_compare] - dict1[key][list_item_to_compare]
if diff != 0:
incrementing_list = dict1[key][:]
incrementing_list[list_item_to_compare] = diff
new_list.append(incrementing_list)
return new_list
Add this code to main()
before the c.disconnect()
line so the script can be used with and without an argument.
# Only run this if a user has supplied an argument with the script
if len(sys.argv) != 1:
# Grab the user supplied argument
try:
interval = int(sys.argv[1])
# If the user hasn't provided an integer let's complain
except:
print("Enter an integer argument for the number of seconds you want to collect for")
sys.exit()
else:
print("Collecting second iteration in {} seconds. Please wait".format(interval))
time.sleep(interval)
# Collect another set of data after the interval has elapsed
qos_table2 = collect_qos_drops(c)
error_table2 = collect_port_errors(c)
# Run the previous and new data against the compare_lists function
new_qos_table = compare_lists(qos_table, qos_table2, 4)
new_error_table = compare_lists(error_table, error_table2, 2)
# Update the data we show in the table so delta values are now seen.
qos_table = new_qos_table
error_table = new_error_table
# Update the title of the tables to show the delta values
qos_table_title = "Any incrementing network QoS drops over {} seconds".format(interval)
error_table_title = "Any incrementing port errors over {} seconds".format(interval)
# Disconnect the session
c.disconnect() # Already configured
1.6 Summary and review#
Congratulations for getting through all the tasks, extension tasks and legendary stretch goal tasks! Get your script and alias updated and test it. If you have got to this stage you have achieved the following:
- You have established a connection with pySROS either remotely or locally on a MD-CLI device.
- You have learnt how to navigate and find state data in MD-CLI.
- You have used the
get_list_keys
method to find out what entities a node has been configured with. - You have learnt how to output a table that looks like a SR OS show command using the
pprint.Table
method. - You have created an alias in model-driven SR OS that exposes your code as if it were a native command.
- You have written some complicated code that gets multiple iterations of data over a user defined period and calculates the difference.
This is a pretty extensive list of achievements! Well done!
If you're hungry for more then have a go at another activity, or try to expand upon this one if you have some more ideas. Why not try and add packets per second to the final delta stats?
Full legendary script (check only if you are stuck)
#!/usr/bin/env python3
# Import the required modules
from pysros.management import connect
from pysros.pprint import Table
import time
import sys
# Our function from earlier that helps us print pretty show commands
def print_table(rows, cols, title):
"""
This function uses the Table class from the pysros.pprint module to print a table
in the style of the Nokia SROS CLI.
It takes a list of lists as the rows.
It takes a list of tuples which are the column names and their width.
It takes a title string which is printed out at the top of the table.
"""
# Initialize the Table object with the heading and columns.
table = Table(title, cols)
table.print(rows) # Print the table
def card(c):
"""
This function first uses the get_list_keys method to get a list of all the cards in the system.
It then iterates through the list of cards and checks the operational state of each card.
If the operational state is not 'in-service', it adds a failure string message within a list
to the result list. (list of lists). The function returns the result list, which will be empty
if all provisioned cards are operationally in-service.
"""
top_path = "/nokia-state:state/card"
cards = c.running.get_list_keys(top_path)
specific_path = '/nokia-state:state/card[slot-number="{}"]/hardware-data'
result = []
for card in cards:
# Check if the card is not operationally in-service
if c.running.get(specific_path.format(card))['oper-state'].data not in ["in-service", "empty"]:
# If it isn't then we add to the result list to signify a failure
result.append(["Card {} is not operationally in-service".format(card)])
return result
def mda(c):
"""
This function needs to be a bit different as we have to run get_list_keys twice, once
to get a list of cards, and again against each card to get a list of MDAs in that card.
MDAs are also different in that this state data also shows for MDAs that are not
provisioned so we also filter off anything that has an oper-state of 'empty'
"""
top_path = "/nokia-state:state/card"
cards = c.running.get_list_keys(top_path)
mda_path = '/nokia-state:state/card[slot-number="{}"]/mda'
specific_path = '/nokia-state:state/card[slot-number="{}"]/mda[mda-slot="{}"]/hardware-data'
result = []
for card in cards:
mdas = c.running.get_list_keys(mda_path.format(card))
# Check if the card is not operationally in-service
for mda in mdas:
if c.running.get(
specific_path.format(card, mda)
)['oper-state'].data not in ["in-service", "empty"]:
# If it isn't then we add to the result list to signify a failure
result.append(["MDA {}/{} is not operationally in-service".format(card, mda)])
return result
def isis(c):
"""
This function first uses the get_list_keys method to get a list of all the ISIS interfaces.
It then iterates through the list of interfaces and checks the operational state of each
adjacency. If the operational state is not 'up', it adds a message to the result list.
The function returns the result list, which will be empty if all adjacencies are
operationally up.
"""
top_path = '/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]/interface'
interfaces = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]'
'/interface[interface-name="{}"]/adjacency[adjacency-index="1"]'
)
result = []
for interface in interfaces:
if interface != "system": # Skip the system interface
# There could be a LookupError if there is no adjacency index at all
try:
# Check if the adjacency is not operationally up
if c.running.get(specific_path.format(interface))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The ISIS adjacency on interface {} is not up".format(interface)])
except LookupError:
result.append(["The ISIS adjacency on interface {} is not up".format(interface)])
return result
def mpls(c):
"""
This function checks that the SR-LFA coverage is 100% for the node-sid topology.
"""
path = (
'/nokia-state:state/router[router-name="Base"]/isis[isis-instance="0"]'
'/sr-lfa-coverage[level-number="2"][multi-topology-id="ipv4"]'
'[sid-type="node-sid"][protocol-version="ipv4"]'
)
data = c.running.get(path)['lfa-covered-percent'].data
result = []
if data != 100:
# If it isn't then we add to the result list to signify a failure
result.append(["The ISIS SR-LFA coverage is not 100%"])
return result
def bgp(c):
"""
This function first uses the get_list_keys method to get a list of all the BGP peers.
It then iterates through the list of peers and checks the operational state of each peering.
If the operational state is not 'Established', it adds a message to the result list.
The function returns the result list, which will be empty if all BGP peerings are Established.
"""
top_path = '/nokia-state:state/router[router-name="Base"]/bgp/neighbor'
peerings = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/router[router-name="Base"]/bgp/neighbor'
'[ip-address="{}"]/statistics'
)
result = []
for peer in peerings:
# Check if the peering is not Established
if c.running.get(specific_path.format(peer))['session-state'].data != "Established":
# If it isn't then we add to the result list to signify a failure
result.append(["The BGP peer {} is not Established".format(peer)])
return result
def svc(c):
"""
Different service types are in different paths, so for this function we need to check both
separately and then combine the results into one list. For this example we are just checking
VPLS and IES services. We also need to check that the IES service is not the internal one
which is used for the system
"""
top_path = "/nokia-state:state/service/{}"
vpls_services = c.running.get_list_keys(top_path.format("vpls"))
ies_services = c.running.get_list_keys(top_path.format("ies"))
specific_path = '/nokia-state:state/service/{}[service-name="{}"]'
result = []
for vpls in vpls_services:
# Check if the service is not up
if c.running.get(specific_path.format('vpls', vpls))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The VPLS service {} is not up".format(vpls)])
for ies in ies_services:
if ies != "_tmnx_InternalIesService": # Skip the internal IES service
# Check if the service is not up
if c.running.get(specific_path.format('ies', ies))['oper-state'].data != "up":
# If it isn't then we add to the result list to signify a failure
result.append(["The IES service {} is not up".format(ies)])
return result
def ping(c):
"""
This function takes a connection object and pings destinations of your choice.
It uses the c.action method to ping and we extract just the packet loss from the result.
If any packet loss is seen then we flag a failure by populating the result list and return.
"""
ping_path = '/nokia-oper-global:global-operations/ping'
ping_data = {'destination': '{}',
'output-format': 'summary',
'interval': "0.01",
'timeout': 1}
ping_dest = [
"10.46.15.22", # PE2
"10.46.15.23", # PE3
"10.46.15.24" # PE4
]
result = []
for dest in ping_dest:
ping_data['destination'] = dest
the_ping = c.action(ping_path, ping_data)
pktloss = the_ping['results']['summary']['statistics']['packets']['loss'].data
if pktloss != '0.0':
# If packet loss isn't 0.0 then we add to the result list to signify a failure
result.append(["Ping to {} saw packet loss".format(dest)])
return result
def cpu(c):
"""
This function collects the cpu-usage of the businest CPU core over a 5 minute period.
The collected data is a string, so it is converted to a float and then to an int to show a whole number
"""
path = '/nokia-state:state/system/cpu[sample-period="300"]/summary/busiest-core-utilization'
result = c.running.get(path)['cpu-usage'].data
return int(float(result))
def memory(c):
"""
This function collects the memory in use and available from the system
From this it calculates the memory utilization and formats it the same way as the CPU utilization
"""
path = "/nokia-state:state/system/memory-pools/summary"
result = c.running.get(path)
total_size = result['current-total-size'].data
available = result['available-memory'].data
total_mem = total_size + available
utilization = (total_size / total_mem) * 100
pretty_utilization = "{}%".format(int(utilization))
return pretty_utilization
def cf3(c):
"""
This function collects the utilization of the CF3 flash device on the active CPM.
You need to find out first which CPM is active in order to do this.
"""
active_cpm = c.running.get("/nokia-state:state/system/active-cpm-slot").data
flash_path = '/nokia-state:state/cpm[cpm-slot="{}"]/flash[flash-id="3"]'.format(active_cpm)
result = c.running.get(flash_path)['percent-used'].data
return "{}%".format(result)
def fan(c):
"""
This function first uses the get_list_keys method to get a list of all the fan trays
It then iterates through the list of trays and gets their speed
For each tray, the speed is added to the result list
"""
top_path = '/nokia-state:state/chassis[chassis-class="router"][chassis-number="1"]/fan'
fan_trays = c.running.get_list_keys(top_path)
specific_path = (
'/nokia-state:state/chassis[chassis-class="router"][chassis-number="1"]/fan[fan-slot="{}"]'
)
result = []
for tray in fan_trays:
speed = c.running.get(specific_path.format(tray))['speed'].data
result.append(["FAN TRAY {} SPEED".format(tray), "{}%".format(speed)])
return result
def collect_qos_drops(c):
"""
This function takes a connection object then gets a list of all ports.
It looks for ports that have network queues and finds any that are non-zero.
If any are non-zero then it gathers the info in lists of lists for the table function.
"""
# Get a list of the ports
port_list = c.running.get_list_keys("/nokia-state:state/port")
# A list to store the data in
data = []
for port in port_list:
# Grab all the info on that port
port_info = c.running.get('/nokia-state:state/port[port-id="{}"]'.format(port))
if port_info['port-class'].data != 'connector':
# If the port doesn't have network queues we ignore it
try:
port_info['network']['egress']['queue']
except:
pass
else:
# Collect the numbers of egress and ingress queues
egress_queues = sorted(port_info['network']['egress']['queue'].keys())
ingress_queues = sorted(port_info['network']['ingress']['queue'].keys())
# Go through each egress queue looking for non-zero drops
for q_e in egress_queues:
in_drops = (
port_info['network']['egress']['queue'][q_e]
['statistics']['in-profile-dropped-packets'].data
)
out_drops = (
port_info['network']['egress']['queue'][q_e]
['statistics']['out-profile-dropped-packets'].data
)
if in_drops != 0:
data.append([port, q_e, 'E', 'in', in_drops])
if out_drops != 0:
data.append([port, q_e, 'E', 'out', out_drops])
# Go through each ingress queue looking for non-zero drops
for q_i in ingress_queues:
in_drops = (
port_info['network']['ingress']['queue'][q_i]
['statistics']['in-profile-dropped-packets'].data
)
out_drops = (
port_info['network']['ingress']['queue'][q_i]
['statistics']['out-profile-dropped-packets'].data
)
if in_drops != 0:
data.append([port, q_i, 'I', 'in', in_drops])
if out_drops != 0:
data.append([port, q_i, 'I', 'out', out_drops])
return data
def collect_port_errors(c):
"""
This function takes a connection object then gets a list of all ports.
It looks for ports that have any non-zero Ethernet errors.
If any are non-zero then it gathers the info in lists of lists for the table function.
"""
# Get a list of the ports
port_list = c.running.get_list_keys("/nokia-state:state/port")
# A list to store the data in
data = []
for port in port_list:
# Grab all the info on that port
port_info = c.running.get('/nokia-state:state/port[port-id="{}"]'.format(port))
if port_info['port-class'].data != 'connector':
# If the port doesn't have Ethernet statistics we ignore it
try:
port_info['ethernet']['statistics']
except:
pass
else:
# Grab the in and out error data
in_errors = port_info['ethernet']['statistics']['in-errors'].data
out_errors = port_info['ethernet']['statistics']['out-errors'].data
# If they are not zero we add the data to the list to be printed
if in_errors != 0:
data.append([port, 'I', in_errors])
if out_errors != 0:
data.append([port, 'E', out_errors])
return data
def compare_lists(list1, list2, list_item_to_compare):
"""
This function takes two lists of lists and compares a certain index in them.
It matches items based on their unique identifiers (the rest of the content in the list).
If items are in list2 but not in list1, then they must have started incrementing
during the collection interval so we add them straight to the new list.
"""
new_list = []
# Create dictionaries to map unique identifiers to list items
def create_dict(lst):
return {
tuple(
item[:list_item_to_compare] + item[list_item_to_compare + 1:]
): item
for item in lst
}
dict1 = create_dict(list1)
dict2 = create_dict(list2)
# Find new items in list2 that are not in list1
for key in set(dict2.keys()) - set(dict1.keys()):
new_list.append(dict2[key])
# Compare items that exist in both lists
for key in set(dict1.keys()) & set(dict2.keys()):
diff = dict2[key][list_item_to_compare] - dict1[key][list_item_to_compare]
if diff != 0:
incrementing_list = dict1[key][:]
incrementing_list[list_item_to_compare] = diff
new_list.append(incrementing_list)
return new_list
def main():
# Start the session
c = connect()
# Create two lists to hold the results table data and any failure table data
result_table_data = []
failure_table_data = []
# Card data
card_result = card(c)
if card_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["CARD STATUS", "FAIL"])
failure_table_data.extend(card_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["CARD STATUS", "PASS"])
# MDA data
mda_result = mda(c)
if mda_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["MDA STATUS", "FAIL"])
failure_table_data.extend(mda_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["MDA STATUS", "PASS"])
# ISIS data
isis_result = isis(c)
if isis_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["ISIS STATUS", "FAIL"])
failure_table_data.extend(isis_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["ISIS STATUS", "PASS"])
# BGP data
bgp_result = bgp(c)
if bgp_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["BGP STATUS", "FAIL"])
failure_table_data.extend(bgp_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["BGP STATUS", "PASS"])
# MPLS data
mpls_result = mpls(c)
if mpls_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["MPLS STATUS", "FAIL"])
failure_table_data.extend(mpls_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["MPLS STATUS", "PASS"])
# svc data
svc_result = svc(c)
if svc_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["SERVICE STATUS", "FAIL"])
failure_table_data.extend(svc_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["SERVICE STATUS", "PASS"])
# ping tests
ping_result = ping(c)
if ping_result: # If the returned list is not empty, it means there was a failure
result_table_data.append(["PING TESTS", "FAIL"])
failure_table_data.extend(ping_result)
else: # If the returned list is empty, it means there was no failure
result_table_data.append(["PING TESTS", "PASS"])
# CPU data
result_table_data.append(["BUSIEST CPU CORE OVER 5MIN", "{}%".format(cpu(c))])
# Memory data
result_table_data.append(["MEMORY UTILIZATION", memory(c)])
# CF3 flash data
result_table_data.append(["CF3 UTILIZATION", cf3(c)])
# Fan data
# We 'extend' the result table list this time because of multiple fan trays
result_table_data.extend(fan(c))
# Collect data on QoS drops
qos_table_title = "Any non-zero network QoS drops"
qos_table = collect_qos_drops(c)
# Collect data on port errors
error_table_title = "Any non-zero port errors"
error_table = collect_port_errors(c)
# Only run this if a user has supplied an argument with the script
if len(sys.argv) != 1:
# Grab the user supplied argument
try:
interval = int(sys.argv[1])
# If the user hasn't provided an integer let's complain
except:
print("Enter an integer argument for the number of seconds you want to collect for")
sys.exit()
else:
print("Collecting second iteration in {} seconds. Please wait".format(interval))
time.sleep(interval)
# Collect another set of data after the interval has elapsed
qos_table2 = collect_qos_drops(c)
error_table2 = collect_port_errors(c)
# Run the previous and new data against the compare_lists function
new_qos_table = compare_lists(qos_table, qos_table2, 4)
new_error_table = compare_lists(error_table, error_table2, 2)
# Update the data we show in the table so delta values are now seen.
qos_table = new_qos_table
error_table = new_error_table
# Update the title of the tables to show the delta values
qos_table_title = "Any incrementing network QoS drops over {} seconds".format(interval)
error_table_title = "Any incrementing port errors over {} seconds".format(interval)
# Disconnect the session
c.disconnect()
# Print the results table
print_table(result_table_data, [(60, "Test"), (19, "Result")], "Self Test Results")
# Print the failure table only if there are any failures
if failure_table_data:
print("\n") # Add an extra space in between tables
print_table(failure_table_data, [(79, "Failures")], "Details Of Failed Tests")
print("\n") # Add an extra space in between tables
print_table(
qos_table,
[(15, "Port"), (10, "Q"), (10, "I/E"), (10, "Profile"), (34, "Drops")],
qos_table_title
)
print("\n") # Add an extra space in between tables
print_table(
error_table,
[(15, "Port"), (10, "I/E"), (54, "Drops")],
error_table_title
)
# Call the main function
if __name__ == '__main__':
main()