Jupyter Notebooks Samples

Using Jupyter Notebooks

You can call MindSphere APIs such as Data Exchange, Model Management, and IoT Time Series from a Jupyter Notebook, the model development workspace in Predictive Learning (PrL). You need valid access to each MindSphere API you want to use from PrL.

Jupyter Notebooks are launched from the Manage Environments page, and the information provided here will aid you in working with them.

Note

  • PrL environments do not retain the Jupyter Notebooks you create, so please save them to your local machine before you stop the environment. Clusters, on the other hand, do save new work and modifications applied to a Notebook.
  • The dataset feature will be deprecated on all platforms. It is currently not available in the Azure Private Cloud environment. For more information, refer to datasets

Checking your Configuration

When you begin working with your scripts, your environment will require a certain set of libraries.

Some libraries required to run the minimal services within the cluster come preinstalled. We have included links to the libraries in the Open Source Software topic in this Help.

Run this command from Jupyter to examine the installed packages:

%pip freeze 
We recommend that you install the required packages at the beginning of the notebook and execute those cells each time the cluster is started, because custom installations are not preserved between stops and starts.
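
If you only need to confirm that specific packages are present, a quick check such as the following can be used (the package names here are just examples):

# check whether specific packages are available and print their versions
from importlib.metadata import version, PackageNotFoundError

for pkg in ['requests', 'pandas', 'pyarrow']:
    try:
        print(pkg, version(pkg))
    except PackageNotFoundError:
        print(pkg, 'is not installed')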

Installing Your Own Python Libraries

Please use the following whenever you need to install your libraries:

#upgrade pip and install required libraries
%pip install --upgrade pip 
%pip install requests 
%pip install pandas 
%pip install pyarrow 
MindSphere API call from Jupyter:

import os
import requests
import json

# Integrated Data Lake endpoint that issues temporary storage credentials
dlpath = 'datalake/v3/generateAccessToken'
# the PrL gateway handles authentication for API calls made from the notebook
gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
headers = {
    'Content-Type': 'application/json'
}
payload = '{ "subtenantId": "" }'
dl_url = gw + dlpath
response = requests.post(dl_url, data=payload, headers=headers)
dl = json.loads(response.text)
# expose the temporary credentials so that awscli/boto3 can pick them up
os.environ["AWS_ACCESS_KEY_ID"] = dl['credentials']['accessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = dl['credentials']['secretAccessKey']
os.environ["AWS_SESSION_TOKEN"] = dl['credentials']['sessionToken']

Not all external library repositories are allowed. If you require additional external sources for your project, please contact your organization's Predictive Learning administrator.
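
If an additional package index has been approved for your tenant, you can point pip at it explicitly. The index URL below is only a placeholder:

# install from an approved package index (replace the URL with the one approved for your organization)
%pip install --index-url https://pypi.example.org/simple <package-name>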

When an environment stops, all installed libraries and other modifications are lost, so you need to run the installation cells each time the environment starts. Running the installation cells at the beginning of every session ensures that your environment is up to date.

Uploading data in Integrated Data Lake:

# upgrade pip and install required libraries
%pip install --upgrade pip 
%pip install requests --force-reinstall --upgrade 
%pip install pandas --force-reinstall --upgrade 
%pip install pyarrow --force-reinstall --upgrade 
import hashlib
import hmac
import json
import os
import random
import re
import string
import requests
import jwt  # PyJWT, used to inspect token expiry
from datetime import datetime
from urllib.parse import urlparse
HEADERS = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Content-Type': 'application/json'
}
GATEWAY = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
OUTPUT_FOLDER = 'OUTPUT_FOLDER'
# Get a signed URL for download/upload of data. The function retries
# up to 5 times to obtain the URL and then raises an exception.
def getSignedURL(fileName, folder, attempt=0, upload=True):
    if upload:
        IDLpath = 'datalake/v3/generateUploadObjectUrls'
    else:
        IDLpath = 'datalake/v3/generateDownloadObjectUrls'
    IDLFilePath = '/%s/%s' % (folder, fileName)
    url = GATEWAY + IDLpath
    body = '{"paths": [{"path": "%s"}]}' % IDLFilePath
    response = requests.post(url, headers=HEADERS, data=body)
    try:
        return json.loads(response.text)['objectUrls'][0]['signedUrl']
    except KeyError:
        if attempt < 5:
            return getSignedURL(fileName, folder, attempt + 1, upload)
        else:
            raise Exception('Failed to get a signed URL')
!echo "This is a test!" >> test.txt
fileName = 'test.txt'
signedURL = getSignedURL(fileName, OUTPUT_FOLDER)
# upload the file contents (not the file name) to the signed URL
with open(fileName, 'rb') as f:
    requests.put(signedURL, data=f)
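
The same helper can retrieve a file from the Integrated Data Lake; a minimal sketch using the download variant of the signed URL for the file uploaded above:

# request a signed download URL for the file and fetch its contents
downloadURL = getSignedURL(fileName, OUTPUT_FOLDER, upload=False)
downloaded = requests.get(downloadURL)
print(downloaded.status_code)
print(downloaded.text)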
# credentials and endpoints read from environment variables
key_id = os.environ["CLIENT_ID"]
key_secret = os.environ["CLIENT_SECRET"]
tenant = os.environ["tenant"]
subTenant = os.environ.get("subtenant")
authServer = os.environ["TOKEN_ENDPOINT"]
coreGateway = os.environ["CORE_GATEWAY"]

class JsonWebToken():
    # Handles getting JWT from environment controller using HMAC.
    # Refreshes token whenever it gets expired.

    def __init__(self):
        self.token = self.__refreshToken()

    def __refreshToken(self):
        #Gets new JWT token using HMAC
        sign, timestamp = self.__getHmacSignature()
        headers = {
            'Authorization': 'HMAC-SHA256 Credential=' + key_id + '&SignedHeaders=host;x-msg-timestamp&Signature=' + sign,
            'x-msg-timestamp': timestamp
        }
        response = requests.get(authServer, headers=headers, verify=False)
        print(response.status_code)
        return json.loads(response.text)["access_token"]

    def __getHmacSignature(self):
        # Creates HMAC Signature for getting JWT
        payload, timestamp = self.__getPayload()
        sign = hmac.new(key_secret.encode(), payload.encode(), hashlib.sha256).hexdigest()
        return sign, timestamp

    def __isExpired(self):
        # Checks whether the existing JWT expires within the next 10 seconds.
        decoded = jwt.decode(self.token, options={'verify_signature': False})
        current = int(datetime.now().timestamp()) + 10
        return decoded['exp'] <= current

    def getToken(self):
        # Returns a valid token, refreshing it first if it has expired.
        # This is the method users of this class should call.
        if self.__isExpired():
            print("token is expired. getting new one...")
            self.token = self.__refreshToken()
        return self.token

    def __getPayload(self):
        timestamp = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
        payload = "GET\n" + urlparse(authServer).netloc + "\n" + timestamp + "\n" + tenant
        print(payload)
        if subTenant != None:
            payload = payload + "\n" + subTenant

        return payload, timestamp

jwtobj = JsonWebToken()

# Generate Upload URL

# create a random folder name
random_string = ''.join(random.choices(string.ascii_lowercase, k=10))
print(random_string)
filename = 'readme.json'

path = random_string + "/" + filename
body = json.dumps({
    "paths": [
        {
            "path": path
        }
    ],
    "subtenantId": "",
    "isMicrosoftRoutingEnabled": False
})

# write some random content into the local file that will be uploaded
with open(filename, 'w') as f:
    random_content = "{ 'random_string': '" + ''.join(random.choices(string.ascii_lowercase, k=50)) + "' }"
    f.write(random_content)

upload_url = coreGateway + "/api/datalake/v3/generateUploadObjectUrls"

headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json',
    'Authorization': 'Bearer ' + jwtobj.getToken()
}

response = requests.post(upload_url, data=body, headers=headers)

print("response code: " + str(response.status_code))
print(response.content)

signedUploadUrl = json.loads(response.content)['objectUrls'][0]['signedUrl']
print("Signed Upload URL: " + signedUploadUrl)

Reading data from IoT

Run the following commands to read data from IoT sources:

%pip install --upgrade pip 
%pip install requests --force-reinstall --upgrade 
%pip install awscli --force-reinstall --upgrade 
%pip install pandas scikit-learn seaborn matplotlib joblib 
import json
import io
import os
import datetime
import time
from dateutil import parser
import random
from threading import Thread
import requests
import pandas as pd
import tempfile
def read_iot(entity_id="<<iot_entity_id_GUID>>",
             aspect_name="<<aspect_name>>",
             tenant="tenantname",
             max_results=2000,  # max is 2000
             from_dt="2020-06-01T13:09:37.029Z",
             to_dt="2020-07-01T08:02:27.962Z",
             variable="pressure",
             sort="asc"):
    if variable is not None:
        url = "?from=" + from_dt + "&to=" + to_dt + "&sort=" + sort + "&limit=" + str(max_results) + "&select=" + variable
    else:
        url = "?from=" + from_dt + "&to=" + to_dt + "&sort=" + sort + "&limit=" + str(max_results)
    # this is the IoT Time Series API base path
    TSpath = 'iottimeseries/v3/timeseries'
    # this is the Predictive Learning gateway URL that handles authentication for your API calls
    gw = os.environ['GATEWAY_ENDPOINT'] + '/gateway/'
    headers = {
        'Content-Type': 'application/json'
    }
    iot_url = gw + TSpath + "/" + entity_id + "/" + aspect_name + url
    response = requests.get(iot_url, headers=headers)
    return response

import pandas as pd
import tempfile
start = datetime.datetime.utcnow() - datetime.timedelta(days=70)
end = start + datetime.timedelta(days=30)
response = read_iot(entity_id = "<<iot_entity_id_GUID>>",
           aspect_name = "<<aspect_name>>",
           tenant = "tenantname",
           max_results = 2000, #max is 2000
           from_dt = start.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
           to_dt = end.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
           sort = "asc",
           variable = None)
if response.status_code == 200:
    f = tempfile.TemporaryFile()
    f.write(response.content)
    f.seek(0)
    # read the IoT data into a pandas DataFrame
    data = pd.read_json(f)
    print(data.shape)
else:
    print(response.status_code)
    print(response.content)
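
IoT Time Series records usually carry their timestamp in a _time field; if that holds for your aspect, a short follow-up like the sketch below (adjust the column name to your payload) turns it into a proper time index:

# convert the timestamp column to datetime and use it as the index (assumes a '_time' column)
if '_time' in data.columns:
    data['_time'] = pd.to_datetime(data['_time'])
    data = data.set_index('_time').sort_index()
    print(data.head())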

Using Inputs from Job Execution

All job executions require input parameters, and you can use any of the following input types:

  • IoT (Internet of Things)
  • IDL (Integrated Data Lake)

Using these input types ensures that Job Manager copies the input to a temporary location available to your code. In Jupyter notebooks, the following variables are available:

inputFolder
outputFolder
datasetName
realInputFolder
You can employ the Jupyter magic command %store, as follows:
%store -r inputFolder  # -r specifies a read
%store -r outputFolder
%store -r datasetName
The datasetName variable only contains a value when the IoT input type is used. The inputFolder variable is prefilled by the job execution engine with a value pointing to the temporary location that holds the input files or data; that is an S3 path on AWS or a blob storage path on Azure, and it does not contain the associated prefix such as s3://. You can then use the outputFolder variable in a Jupyter notebook as in:

!aws s3 cp ./mylocalfile.txt s3://$outputFolder/myfile.txt
!pip install azure_cli
%store -r inputFolder
%store -r outputFolder
%store -r realInputFolder
%store -r realOutputFolder

input = inputFolder + '/' + realInputFolder
output = outputFolder + '/' + realOutputFolder
!az storage blob download --account-name prlstorageanls -c jobmanager -f data.csv -n $input'/data.csv'
!az storage blob upload --account-name prlstorageanls -c jobmanager -f data.csv  -n $output'/output.csv' 
inputPath='s3://' + os.environ.get('inputFolder') + '/'
outputPath='s3://' + os.environ.get('outputFolder') + '/'   
endpoint=os.environ.get('AWS_ENDPOINT_URL') + '/'

!aws s3 cp $inputPath . --recursive --endpoint-url $endpoint

!aws s3 cp ./mdp-logs.csv $outputPath --endpoint-url $endpoint

For Jupyter notebooks we do not provide a built-in library for loading the dataset; however, there are various ways to achieve this using Python. If you encounter any issues loading your dataset, feel free to contact us for guidance.


Note

The inputFolder and outputFolder variables are remote storage paths, not local folders; therefore most commonly used file functions do not work on them. CLI and shell commands will work with them as long as they use the correct prefix. For Python or Scala libraries that can work with remote storage services, check the documentation of the respective library; for example, the pandas Python library can read and save files on AWS S3 storage.
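
As an illustration of the pandas case, a minimal sketch that reads and writes CSV files directly against the job folders on AWS (requires s3fs; the file names are placeholders):

import pandas as pd

# read an input CSV straight from the job's input location (file name is a placeholder)
df = pd.read_csv('s3://' + inputFolder + '/data.csv')

# write the (possibly transformed) data back to the job's output location
df.to_csv('s3://' + outputFolder + '/result.csv', index=False)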


Using exported IoT Datasets

The environments you start come prepared with our prlutils library, which handles reading Parquet dataset files into a pandas DataFrame. Use the snippet below to list the available datasets and then load a single dataset.

from prlutils import datasetutils
import boto3
import os
import json
import s3fs

du = datasetutils.DatasetUtils()
datasetnames = du.get_dataset_names()
print('Dataset names: ' + str(datasetnames))
#ds.shape
You will get a list of datasets such as:

['test_asset_2',
 'Last30DaysAsset2Filtered',
 'Last30DaysAsset2']

You can load the dataset you want with:

ds = du.load_dataset_by_name(datasetnames[0])
ds

and inspect its data immediately. Then, you can use your dataset just like any other pandas DataFrame:

filteredDataset = ds[ds['temp'] > 60]
print("Number of entries AFTER filtering: " + str(filteredDataset.shape))
try:
    path = "s3://" + outputFolder
    # pandas DataFrames are written with to_csv; s3fs resolves the s3:// path
    # (the file name here is only an example)
    filteredDataset.to_csv(path + '/filtered.csv')
    # an explicit data lake path also works, for example:
    # filteredDataset.to_csv('s3://prl-storage-216273414971/prlteam/data/filtered.csv')
except:
    print('Output folder is None.')
else:
    print('Filtered dataset written to outputFolder ' + outputFolder)

If you encounter issues using our library, make sure that the libraries used by our dataset utility do not conflict with your previously installed Python libraries. Our utility library uses the following:

%pip install pyarrow fastparquet fsspec s3fs boto3 awscli
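
To spot version conflicts between these packages and ones you have installed yourself, you can run pip's dependency checker directly from the notebook:

# report any installed packages whose declared dependencies are not satisfied
%pip check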

More About Jupyter Notebook

Jupyter is a powerful tool that supports many customizations and languages. These resources can help you explore further:

https://jupyter.org/documentation

https://jupyter-notebook.readthedocs.io/en/stable/

https://ipython.readthedocs.io/en/stable/interactive/magics.html


Last update: March 7, 2024
