SageMaker Regression on Abalone Data
Estimation of the age of an abalone using readily available measurements
1. Introduction
What is Abalone? It is a large marine gastropod mollusk that lives in coastal saltwater and is a member of the Haliotidae family. Abalone is often found around the waters of South Africa, Australia, New Zealand, Japan, and the west coast of North America. The abalone shell is flat and spiral-shaped with several small holes around the edges. It has a single shell on top with a large foot to cling to rocks and lives on algae. Sizes range from 4 to 10 inches. The interior of the shell has an iridescent mother of pearl appearance (Figure 1).
As a highly prized culinary delicacy (Figure 2), it has a rich, flavorful taste that is sweet, buttery, and salty. Abalone is often sold live in the shell, but also frozen, or canned. It is among the world's most expensive seafood. For preparation it is often cut into thick steaks and pan-fried. It can also be eaten raw.
2. Data Understanding
There is more information on the Abalone Dataset available at UCI data repository.
The dataset has 9 attributes (8 input features plus the Rings target):
- Rings (number of)
- Sex (M, F, or Infant)
- Length (Longest shell measurement in mm)
- Diameter (in mm)
- Height (with meat in shell, in mm)
- Whole Weight (whole abalone, in grams)
- Shucked Weight (weight of meat, in grams)
- Viscera Weight (gut weight after bleeding, in grams)
- Shell Weight (after being dried, in grams)
The number of rings indicates the age of the abalone. The age of an abalone is determined by cutting the shell through the cone, staining it, and counting the number of rings through a microscope. Not only is this a boring and time-consuming task, but it is also relatively expensive in terms of waste. The remaining measurements, on the other hand, are readily obtainable with the correct tools, and with much less effort. The purpose of this model is to estimate the abalone age, specifically the number of rings, based on the other features.
import urllib.request
import pandas as pd
import seaborn as sns
import random
# from IPython.core.debugger import set_trace
import boto3
import sagemaker
from sagemaker.image_uris import retrieve
from time import gmtime, strftime
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
# import json
# from itertools import islice
# import math
# import struct
!pip install smdebug
from smdebug.trials import create_trial
import matplotlib.pyplot as plt
import re
2.1 Download
The Abalone data is available in the libsvm format. Next, we will download it.
%%time
# Load the dataset
# Download the raw abalone data (libsvm format) from the LIBSVM dataset
# mirror into a local file, then preview its first 10 lines.
SOURCE_DATA = "abalone_libsvm.txt"
urllib.request.urlretrieve(
    "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/abalone", SOURCE_DATA
)
!head -10 ./{SOURCE_DATA}
# Column names for the space-separated libsvm file: the target (age/rings)
# comes first, followed by the eight "index:value" feature fields.
column_names = [
    "age",
    "sex",
    "Length",
    "Diameter",
    "Height",
    "Whole.weight",
    "Shucked.weight",
    "Viscera.weight",
    "Shell.weight",
]
df = pd.read_csv(SOURCE_DATA, sep=" ", encoding="latin1", names=column_names)
df
features = [
    "sex",
    "Length",
    "Diameter",
    "Height",
    "Whole.weight",
    "Shucked.weight",
    "Viscera.weight",
    "Shell.weight",
]
# Each libsvm field is "<index>:<value>"; keep only the value after the
# first colon.  (The original code special-cased "sex" with an if/else whose
# two branches were identical, so the conditional was redundant and has been
# removed.)
for f in features:
    df[f] = df[f].str.split(":", n=1, expand=True)[1]
df
df.info()
To understand the data better, we need to convert all the string types to numeric types.
# Convert every column from string to a numeric dtype so the data can be
# summarized and plotted.  Targets are 32-bit ints, measurements 32-bit floats.
float_columns = [
    "Length",
    "Diameter",
    "Height",
    "Whole.weight",
    "Shucked.weight",
    "Viscera.weight",
    "Shell.weight",
]
dtype_map = {"age": "int32", "sex": "int32"}
dtype_map.update({col: "float32" for col in float_columns})
df = df.astype(dtype_map)
df.info()
# Check for missing values: any at all, then the total count.
df.isnull().values.any()
df.isnull().sum().sum()
# Configure seaborn, then draw a pairwise scatter-plot matrix with KDE
# curves on the diagonal to inspect feature distributions and correlations.
sns.set(style="ticks", color_codes=True)
# g = sns.pairplot(df)
g = sns.pairplot(df, diag_kind='kde')
The data is now clean with no missing values. We will write this clean data to a file:
# Persist the cleaned numeric data as comma-separated values, no header row.
CLEAN_DATA = "abalone_clean.txt"
# df = df.sample(n=10, random_state=1)  # (debug) optionally work on a tiny sample
df.to_csv(CLEAN_DATA, sep=',', header=None, index=False)
def split_data(
    FILE_TOTAL,
    FILE_TRAIN,
    FILE_VALID,
    FILE_TESTG,
    FRAC_TRAIN,
    FRAC_VALID,
    FRAC_TESTG,
):
    """Randomly split the rows of FILE_TOTAL into train/validation/test files.

    Parameters
    ----------
    FILE_TOTAL : str
        Path of the input file, one sample per line.
    FILE_TRAIN, FILE_VALID, FILE_TESTG : str
        Paths of the three output files (overwritten).
    FRAC_TRAIN, FRAC_VALID, FRAC_TESTG : float
        Decimal fractions of the rows to write to each output file; their
        sum must not exceed 1.0.

    Notes
    -----
    Uses the module-level ``random`` state, so seeding ``random`` makes the
    split reproducible.  The original implementation popped random indices
    one at a time (O(n^2)) and never closed its file handles; this version
    shuffles once and writes each slice inside a ``with`` block.
    """
    with open(FILE_TOTAL, "r") as f:
        total = f.readlines()
    num_total = len(total)
    sizes = [
        int(FRAC_TRAIN * num_total),
        int(FRAC_VALID * num_total),
        int(FRAC_TESTG * num_total),
    ]
    if sum(sizes) > num_total:
        # Same guard the original hit when it ran out of rows mid-split.
        print('ERROR. Make sure fractions are decimals.')
        return
    # A single shuffle followed by slicing replaces repeated random pops.
    random.shuffle(total)
    splits = []
    start = 0
    for size in sizes:
        splits.append(total[start:start + size])
        start += size
    with open(FILE_TRAIN, "w") as train_file:
        train_file.writelines(splits[0])
    print(f'Training data: {len(splits[0])} rows ({len(splits[0])/num_total})')
    with open(FILE_VALID, "w") as valid_file:
        valid_file.writelines(splits[1])
    print(f'Validation data: {len(splits[1])} rows ({len(splits[1])/num_total})')
    with open(FILE_TESTG, "w") as testg_file:
        testg_file.writelines(splits[2])
    print(f'Testing data: {len(splits[2])} rows ({len(splits[2])/num_total})')
# File names and fractions for the 70/15/15 train/validation/test split.
FILE_TOTAL = "abalone_clean.txt"
FILE_TRAIN = "abalone_train.csv"
FILE_VALID = "abalone_valid.csv"
FILE_TESTG = "abalone_testg.csv"
FRAC_TRAIN = 0.70
FRAC_VALID = 0.15
FRAC_TESTG = 0.15
split_data(
    FILE_TOTAL=FILE_TOTAL,
    FILE_TRAIN=FILE_TRAIN,
    FILE_VALID=FILE_VALID,
    FILE_TESTG=FILE_TESTG,
    FRAC_TRAIN=FRAC_TRAIN,
    FRAC_VALID=FRAC_VALID,
    FRAC_TESTG=FRAC_TESTG,
)
def write_to_s3(fobj, bucket, key):
    """Upload an open binary file object to s3://<bucket>/<key>.

    NOTE(review): relies on a module-level ``region`` variable that is
    assumed to be defined in an earlier (unshown) cell -- confirm before
    running.
    """
    s3 = boto3.Session(region_name=region).resource("s3")
    return s3.Bucket(bucket).Object(key).upload_fileobj(fobj)
def upload_to_s3(bucket, prefix, channel, filename):
    """Upload a local file to s3://<bucket>/<prefix>/<channel>/<filename>.

    Fixes from the original: the S3 key contained the literal text
    "(unknown)" instead of the uploaded file's name (apparently a mangled
    ``{filename}`` placeholder), and the file object was never closed.
    """
    key = f"{prefix}/{channel}/{filename}"
    url = f"s3://{bucket}/{key}"
    print(f"Writing to {url}")
    with open(filename, "rb") as fobj:
        write_to_s3(fobj, bucket, key)
# Upload each data split to its own S3 channel directory.
# NOTE(review): `bucket` and `prefix` are assumed to be defined in an
# earlier (unshown) cell -- confirm before running.
upload_to_s3(bucket, prefix, "train", FILE_TRAIN)
upload_to_s3(bucket, prefix, "valid", FILE_VALID)
upload_to_s3(bucket, prefix, "testg", FILE_TESTG)
# S3 URLs for the three input channels and the model-artifact output path.
s3_train_data = f"s3://{bucket}/{prefix}/train"
print(f"training files will be taken from: {s3_train_data}")
s3_valid_data = f"s3://{bucket}/{prefix}/valid"
print(f"validation files will be taken from: {s3_valid_data}")
s3_testg_data = f"s3://{bucket}/{prefix}/testg"
print(f"testing files will be taken from: {s3_testg_data}")
s3_output = f"s3://{bucket}/{prefix}/output"
print(f"training artifacts output location: {s3_output}")
# Build the TrainingInput channel objects that fit() accepts.  All three
# channels share the same CSV configuration, so a small helper avoids
# repeating it.
def _csv_channel(s3_url):
    """Return a fully-replicated, uncompressed CSV TrainingInput for s3_url."""
    return sagemaker.inputs.TrainingInput(
        s3_url,
        distribution="FullyReplicated",
        content_type="text/csv",
        s3_data_type="S3Prefix",
        record_wrapping=None,
        compression=None,
    )

train_data = _csv_channel(s3_train_data)
valid_data = _csv_channel(s3_valid_data)
testg_data = _csv_channel(s3_testg_data)
4.3 Training a Linear Learner model
First, we retrieve the image for the Linear Learner Algorithm according to the region.
Then we create an estimator from the SageMaker Python SDK using the Linear Learner container image and we setup the training parameters and hyperparameters configuration.
# get the linear learner image
# Look up the region-specific container image URI for the built-in
# Linear Learner algorithm (version 1).
image_uri = retrieve("linear-learner", boto3.Session().region_name, version="1")
%%time
from sagemaker.debugger import rule_configs, Rule, DebuggerHookConfig, CollectionConfig

# Debugger captures tensors every `save_interval` steps.
save_interval = 3
sess = sagemaker.Session()
# Timestamped name so repeated runs get distinct job names.
job_name = "abalone-regression-" + strftime("%H-%M-%S", gmtime())
print("Training job: ", job_name)
# Estimator for the built-in Linear Learner container.
# NOTE(review): `role` is assumed to be defined in an earlier (unshown)
# cell -- confirm before running.
linear = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    #instance_type="local",
    input_mode="File",
    output_path=s3_output,
    base_job_name="abalone-regression-sagemaker",
    sagemaker_session=sess,
    #hyperparameters=hyperparameters,
    #train_max_run=100
    # SageMaker Debugger: persist the "metrics" collection to s3_output.
    debugger_hook_config=DebuggerHookConfig(
        #s3_output_path="s3://learnableloopai-blog/abalone/output_debugger",
        s3_output_path=s3_output,
        collection_configs=[
            CollectionConfig(
                name="metrics",
                parameters={
                    "save_interval": str(save_interval)
                }
            ),
            # CollectionConfig(
            #     name="feature_importance",
            #     parameters={
            #         "save_interval": str(save_interval)
            #     }
            # ),
            # CollectionConfig(
            #     name="full_shap",
            #     parameters={
            #         "save_interval": str(save_interval)
            #     }
            # ),
            # CollectionConfig(
            #     name="average_shap",
            #     parameters={
            #         "save_interval": str(save_interval)
            #     }
            # ),
            # CollectionConfig(
            #     name="mini_batch_size",
            #     parameters={
            #         "save_interval": str(save_interval)
            #     }
            # )
        ]
    ),
    # Built-in Debugger rule: alert when the loss stops decreasing over
    # num_steps consecutive saved steps.
    rules=[
        Rule.sagemaker(
            rule_configs.loss_not_decreasing(),
            rule_parameters={
                "collection_names": "metrics",
                "num_steps": str(save_interval*2),
            },
        ),
        # Rule.sagemaker(
        #     rule_configs.overtraining(),
        #     rule_parameters={
        #         "collection_names": "metrics",
        #         "patience_validation": str(10),
        #     },
        # ),
        # Rule.sagemaker(
        #     rule_configs.overfit(),
        #     rule_parameters={
        #         "collection_names": "metrics",
        #         "patience": str(10),
        #     },
        # )
    ]
)
# Hyperparameters for the Linear Learner regressor.
linear.set_hyperparameters(
    feature_dim=8,                 # number of input features (all columns but age)
    epochs=16,
    wd=0.01,                       # weight decay (L2 regularization)
    loss="absolute_loss",          # L1 loss, robust to outliers
    predictor_type="regressor",
    normalize_data=True,
    optimizer="adam",
    mini_batch_size=100,
    lr_scheduler_step=100,         # decay the learning rate every 100 steps...
    lr_scheduler_factor=0.99,      # ...multiplying it by 0.99...
    lr_scheduler_minimum_lr=0.0001,  # ...but never below this floor
    learning_rate=0.1,
)
%%time
# Launch the training job with the train and validation channels.
linear.fit(inputs={
    "train": train_data,
    "validation": valid_data,
    #"test": testg_data
},
    wait=False) #cell won't block until done
%%time
# Deploy the trained model to a real-time inference endpoint.
linear_predictor = linear.deploy(initial_instance_count=1, instance_type="ml.c4.xlarge")
print(f"\nEndpoint: {linear_predictor.endpoint_name}")
6.1 Test Inference
Now that the trained model is deployed at an endpoint that is up-and-running, we can use this endpoint for inference. To do this, we are going to configure the predictor object to parse contents of type text/csv and deserialize the reply received from the endpoint to json format.
# Send requests to the endpoint as CSV and parse its JSON responses.
linear_predictor.serializer = CSVSerializer()
linear_predictor.deserializer = JSONDeserializer()
We use the test file containing the records of the data that we kept to test the model prediction. Run the following cell multiple times to perform inference:
%%time
# get a testing sample from the test file
test_data = [row for row in open(FILE_TESTG, "r")]
sample = random.choice(test_data).split(",")
actual_age = sample[0]
payload = sample[1:] # removing actual age from the sample
payload = ",".join(map(str, payload))
# invoke the predicor and analyise the result
result = linear_predictor.predict(payload)
# extract the prediction value
result = round(float(result["predictions"][0]["score"]), 2)
accuracy = str(round(100 - ((abs(float(result) - float(actual_age)) / float(actual_age)) * 100), 2))
print(f"Actual age: {actual_age}\nPrediction: {result}\nAccuracy: {accuracy}")
# Tear down the endpoint so it stops incurring charges.
sagemaker.Session().delete_endpoint(linear_predictor.endpoint_name)
print(f"Deleted {linear_predictor.endpoint_name} successfully!")