input: data + tff conversion function from CustomClientData
output: TFF model for predicting customer paths
description:
Simulating federated learning on predicting customer paths.
# uncomment this cell to get the newest version of tff
# tensorflow_federated_nightly also bring in tf_nightly, which
# can cause a duplicate tensorboard install, leading to errors.
#!pip uninstall --yes tensorboard tb-nightly
#!pip install --quiet --upgrade tensorflow-federated-nightly
#!pip install --quiet --upgrade nest-asyncio
#!pip install --quiet --upgrade tb-nightly # or tensorboard, but not both
import collections
import matplotlib.pyplot as plt
import numpy as np
import nest_asyncio
nest_asyncio.apply()
from pathlib import Path
from pyarrow import feather
import pandas as pd
import tensorflow as tf
import tensorflow_federated as tff
from ml_federated_customer_journey.customclientdata import (
create_tff_client_data_from_df,
)
# Alias the TFF helper that partitions a ClientData into train/test client sets.
train_test_client_split = tff.simulation.datasets.ClientData.train_test_client_split
You can also view the results using tensorboard:
%load_ext tensorboard
# Smoke-test that the TFF runtime works in this environment.
tff.federated_computation(lambda: "Hello, World!")()
# --- experiment parameters ---
seed = 0  # global RNG seed for reproducibility (numpy + tensorflow)
data_filepath_parts = ("data", "preprocessed_data", "data.f") # for pathlib
test_split = 0.2  # fraction of clients held out for evaluation
NUM_EPOCHS = 20  # epochs for the centralized baseline fit
BATCH_SIZE = 32  # clients sampled per federated round / keras batch size
SHUFFLE_BUFFER = 100
FEDERATED_UPDATES = 50  # number of federated averaging rounds
Make immediate derivations from the parameters:
np.random.seed(seed)
tf.random.set_seed(seed)
# Preprocessed dataset is read from a feather file under the working directory.
data_filepath = Path.cwd() / Path(*data_filepath_parts)
df = feather.read_feather(data_filepath)
df.head()
How many data points
df.shape
How many features
# Feature count taken from the first row's feature vector
# (assumes column `x` holds fixed-length arrays — TODO confirm).
NUM_FEATURES = df.x[0].shape[0]
NUM_FEATURES
# Number of distinct labels in the target column `y`.
NUNIQUE_LABELS = df.y.nunique()
Convert into tff ClientData (training + testing datasets):
client_data = create_tff_client_data_from_df(df, sample_size=1)
# Hold out `test_split` of the clients (not of the rows) for evaluation.
train_data, test_data = train_test_client_split(
    client_data, int(df.client_id.nunique() * test_split)
)
Test and train dataset size (number of clients in each set)
len(train_data.client_ids)
len(test_data.client_ids)
# Element spec (x/y structure) used by model_fn to declare the input format.
ELEMENT_SPEC = train_data.element_type_structure
ELEMENT_SPEC
def create_keras_model():
    """
    Return a new, uncompiled Keras model for customer-path classification.

    Architecture: Input(NUM_FEATURES,) -> Dense(48, relu) -> Dense softmax
    over NUNIQUE_LABELS + 1 classes. Callers compile it themselves (either
    via tff.learning.from_keras_model or model.compile).
    """
    visible = tf.keras.layers.Input(shape=(NUM_FEATURES,))
    hidden1 = tf.keras.layers.Dense(
        48,
        # BUG FIX: this layer is named "l1relu" but had activation=None,
        # which left the hidden layer purely linear; apply ReLU as the
        # layer name implies.
        activation="relu",
        name="l1relu",
    )(visible)
    output = tf.keras.layers.Dense(
        # +1 presumably reserves an extra class slot (e.g. padding/unknown
        # label) — TODO confirm against the label encoding.
        NUNIQUE_LABELS + 1,
        activation="softmax",
        name="l3softmax",
    )(hidden1)
    model = tf.keras.Model(inputs=visible, outputs=output)
    return model
def model_fn():
    """
    Build a fresh TFF model: Keras network + input spec + loss and metrics.
    """
    # We _must_ create a new model here, and _not_ capture it from an external
    # scope. TFF will call this within different graph contexts.
    input_spec = collections.OrderedDict(
        x=ELEMENT_SPEC["x"],
        y=ELEMENT_SPEC["y"],
    )
    return tff.learning.from_keras_model(
        create_keras_model(),
        input_spec=input_spec,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
Create federated averaging process:
# FedAvg: clients train locally with RMSprop(0.02); the server applies the
# averaged client delta with RMSprop(1.0).
iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.RMSprop(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.RMSprop(learning_rate=1.0),
)
Initialize federated averaging process:
# `state` carries the global model weights between rounds.
state = iterative_process.initialize()
Create federated evaluation process (validation):
evaluation = tff.learning.build_federated_evaluation(model_fn)
Function for loading client data in batches:
def batch_client_data(client_data, batch_size=BATCH_SIZE):
    """
    Sample a random batch of per-client datasets from a TFF ClientData.

    Parameters
    ----------
    client_data : object exposing `.client_ids` and
        `.create_tf_dataset_for_client(client_id)` (e.g. a tff ClientData).
    batch_size : int, number of clients to sample. Sampling is with
        replacement, so a client may appear more than once in a batch.

    Returns
    -------
    list of the sampled clients' tf.data.Datasets.
    """
    # BUG FIX: the `batch_size` parameter was accepted but ignored — the
    # original body hard-coded the module-level BATCH_SIZE constant.
    chosen = np.random.choice(
        np.arange(len(client_data.client_ids)), size=batch_size
    )
    return [
        client_data.create_tf_dataset_for_client(client_data.client_ids[idx])
        for idx in chosen
    ]
%%time
# first iteration: one federated round, so the metrics frame has an initial row
state, train_metrics = iterative_process.next(state, batch_client_data(train_data))
# evaluate the updated global model on a random batch of test clients
test_metrics = evaluation(state.model, batch_client_data(test_data))
# see progress
print("federated_update {}, loss={:.2f}, accuracy={:.2f}".format(0, train_metrics['train']['loss'], train_metrics['train']['sparse_categorical_accuracy']))
# save results — one row per federated round, indexed later by federated_update
metrics_df = pd.DataFrame(
    {
        "federated_update": [0],
        "train_loss": [train_metrics["train"]["loss"]],
        "train_accuracy": [train_metrics["train"]["sparse_categorical_accuracy"]],
        "train_size": [train_metrics["stat"]["num_examples"]],
        "test_loss": [test_metrics["loss"]],
        "test_accuracy": [test_metrics["sparse_categorical_accuracy"]],
    }
)
# run federated update cycles
# PERF FIX: the original called pd.concat inside the loop, copying the whole
# metrics frame every round (quadratic). Collect plain row dicts and concat
# once after the loop — the resulting frame is identical.
rows = []
for i in range(FEDERATED_UPDATES):
    # update, get train metrics
    state, train_metrics = iterative_process.next(state, batch_client_data(train_data))
    # evaluate
    test_metrics = evaluation(state.model, batch_client_data(test_data))
    # save results
    rows.append(
        {
            "federated_update": i + 1,
            "train_loss": train_metrics["train"]["loss"],
            "train_accuracy": train_metrics["train"]["sparse_categorical_accuracy"],
            "train_size": train_metrics["stat"]["num_examples"],
            "test_loss": test_metrics["loss"],
            "test_accuracy": test_metrics["sparse_categorical_accuracy"],
        }
    )
    # see progress
    print("federated_update {}, loss={:.2f}, accuracy={:.2f}".format(i + 1, train_metrics['train']['loss'], train_metrics['train']['sparse_categorical_accuracy']))
# append all rounds to the row-0 frame created above, then index by round
metrics_df = pd.concat((metrics_df, pd.DataFrame(rows)), axis=0)
metrics_df.set_index("federated_update", drop=True, inplace=True)
%%time
# create train and test data for centralized setup
# (use the same train and test data, although it is passed to the models in different ways)
# Flatten all clients into one dataset; the map unwraps each sample's
# x/y from its leading axis (presumably sample_size=1 above — TODO confirm).
train_data_centralized = (
    train_data.create_tf_dataset_from_all_clients()
    .map(lambda x: (x["x"][0], x["y"][0]))
    .shuffle(SHUFFLE_BUFFER)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)
test_data_centralized = (
    test_data.create_tf_dataset_from_all_clients()
    .map(lambda x: (x["x"][0], x["y"][0]))
    .shuffle(SHUFFLE_BUFFER)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)
# create centralized model — same architecture, same loss/metrics as the
# federated setup, so the baseline is directly comparable
centralized_model = create_keras_model()
centralized_model.compile(
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
    optimizer="RMSprop",
)
# fit and evaluate
history = centralized_model.fit(
    train_data_centralized,
    validation_data=test_data_centralized,
    epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
)
# view results
pd.DataFrame(history.history).plot()
Plot results:
# Loss curves: federated train/test loss per round vs centralized baseline.
fig, ax = plt.subplots(1, constrained_layout=True)
# federated results
metrics_df.plot(ax=ax, y=["train_loss", "test_loss"], color=["royalblue", "red"])
# centralized baseline: final validation loss of the centralized fit
ax.axhline(
    history.history["val_loss"][-1],
    linestyle="--",
    color="black",
    label="baseline (centralized computation)",
)
ax.set_ylabel("loss (sparse_categorical_crossentropy)")
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.legend()
# FIX: set_ylim(ymin=...) was deprecated in Matplotlib 3.0 and removed in
# 3.5; `bottom=` is the supported keyword.
ax.set_ylim(bottom=0)
# Accuracy curves: federated train/test accuracy per round vs baselines.
fig, ax = plt.subplots(1, constrained_layout=True)
# plot federated learning results
metrics_df.plot(
    ax=ax, y=["train_accuracy", "test_accuracy"], color=["royalblue", "red"]
)
# centralized baseline: final validation accuracy of the centralized fit
ax.axhline(
    history.history["val_sparse_categorical_accuracy"][-1],
    linestyle="--",
    color="black",
    label="baseline (centralized computation)",
)
# random guessing baseline
ax.axhline(
    # FIX: corrected legend-label typo "quessing" -> "guessing"
    1 / NUNIQUE_LABELS, linestyle="-.", color="grey", label="baseline (random guessing)"
)
ax.legend()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.set_ylim(0, 1)
# save to results/ (the directory is assumed to exist — TODO confirm)
plt.savefig(Path.cwd() / "results" / "accuracy.png")
With the current setup, we can reach a test accuracy of 50% and slightly above (depending on the run). However, at times the model falls back to the level of random guessing.
- We can do federated learning of customer paths even with quite little data
- It's doing significantly better than random guessing
- However, the centralized model still outperforms the federated computation. More work is required for optimization.