input: none / raw data
output: function for creating custom tff ClientData
description:
In this notebook we generate customer path data and transform it into the TensorFlow Federated (TFF) ClientData format. You can edit the code to load, clean, and transform your own client path data.
Uncomment the following cell to run with the newest version of tff
# can cause a duplicate tensorboard install, leading to errors.
#!pip uninstall --yes tensorboard tb-nightly
#!pip install --quiet --upgrade tensorflow-federated-nightly
#!pip install --quiet --upgrade nest-asyncio
#!pip install --quiet --upgrade tb-nightly # or tensorboard, but not both
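If you uncommented the cell above in a notebook runtime, TFF's tutorials also apply nest_asyncio so the federated runtime can share the notebook's event loop; in that case uncomment this as well (sketch, only needed after the nightly install):
# import nest_asyncio
# nest_asyncio.apply()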
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_federated as tff
from pyarrow import feather
from scipy.special import softmax
seed = 0
SHUFFLE_BUFFER = 100
NUM_EPOCHS = 1
BATCH_SIZE = 32
n_customers = 10000 # number of customers (paths, assuming only one path per customer)
max_path_length = 100 # limit simulation length
Define any operations that follow directly from the parameters:
np.random.seed(seed)
tf.random.set_seed(seed)
(or alternatively load your own data and convert it into a comparable format; a sketch follows the links below)
Try for example: https://cseweb.ucsd.edu/~jmcauley/datasets.html
or https://archive.ics.uci.edu/ml/datasets/Entree+Chicago+Recommendation+Data
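If you bring your own data, the only requirement is that it ends up in a DataFrame with the columns the cleaning steps below expect (trace_id, activities, owner, employed, age, income). A minimal sketch with made-up values; the category labels mirror the dataset downloaded below, everything else is illustrative and should be adapted to your data:
rng = np.random.default_rng(seed)
n_rows = 50  # synthetic illustration only
own_df = pd.DataFrame(
    {
        "trace_id": rng.integers(0, 10, n_rows),  # one id per customer path
        "activities": rng.choice([f"activity_{i}" for i in range(1, 7)], n_rows),
        "owner": rng.choice(["yes", "no"], n_rows),
        "employed": rng.choice(["yes", "no"], n_rows),
        "age": rng.choice(["0-19yo", "20-39yo", "40-59yo", "60-79yo", "80yo+"], n_rows),
        "income": rng.choice(["low", "middle", "high"], n_rows),
    }
)
own_df.head()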
from pathlib import Path
import requests, zipfile, io
p = Path().cwd() / "data" / "raw_data"
if not (p / "customer-journey-unil-ch-datasets").exists():  # check if already downloaded
r = requests.get(
"http://customer-journey.me/wp-content/uploads/2018/02/customer-journey-unil-ch-datasets.zip"
)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall(p)
filepaths = (
p
/ "customer-journey-unil-ch-datasets"
/ "csv"
/ "configuration6"
/ "excluding-solution"
).glob("*.csv")
def read_csv_list_to_df(filepaths):
    max_trace_id = 0
    df_list = []
    for f in filepaths:
        df = pd.read_csv(f)
        df.trace_id += max_trace_id  # user id count begins from 0 in each file
        max_trace_id = df.trace_id.max() + 1  # +1 so ids from the next file do not collide
        df_list.append(df)
    return pd.concat(df_list)
df = read_csv_list_to_df(filepaths)
df
df.nunique()
df.activities.unique()
df.age.unique()
df.income.unique()
df.trace_id = df.trace_id.astype("int")
df.activities = df.activities.astype("category")
df.owner = df.owner.map(lambda x: 1 if x == "yes" else 0).astype("uint8")
df.employed = df.employed.map(lambda x: 1 if x == "yes" else 0).astype("uint8")
df.age = df.age.map(
{"0-19yo": 0, "20-39yo": 1, "40-59yo": 2, "60-79yo": 3, "80yo+": 4}
).astype("uint8")
df.income = df.income.map({"low": 0, "middle": 1, "high": 2}).astype("uint8")
df.reset_index(inplace=True)
df.rename({"index": "action_index"}, axis=1, inplace=True)
df
# we need an extra category to denote that the client's activity has ended
activity_ended = f"activity_{df.activities.nunique()+1:d}"
df.activities = df.activities.cat.add_categories([activity_ended])
Order the events for each customer and pair every event with the one that follows it (previous activity → next activity).
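As a toy illustration (not part of the pipeline): a path of three activities becomes (previous, next) pairs, with the final row pointing to an end-of-journey marker.
toy_path = pd.Series(["activity_1", "activity_2", "activity_3"])
pd.DataFrame(
    {"prev_activity": toy_path, "next_activity": toy_path.shift(-1, fill_value="END")}
)
#   prev_activity next_activity
# 0    activity_1    activity_2
# 1    activity_2    activity_3
# 2    activity_3           END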
df.columns
# collect one row per (previous activity, next activity) pair; dtypes are set after filling
prev_next_df = pd.DataFrame(
    columns=[
        "client_id",
        "action_index",
        "prev_activity",
        "owner",
        "employed",
        "age",
        "income",
        "next_activity",
    ]
)
for client_id, client_data in df.groupby("trace_id"):
    client_data = client_data.copy()  # avoid SettingWithCopyWarning when editing the group
    client_data.action_index -= client_data.action_index.min()  # start each client's index at 0
    client_data.action_index = client_data.action_index.astype(int)
# rename columns
buf_df = client_data.rename(
{"trace_id": "client_id", "activities": "prev_activity"}, axis=1
)
# add new column for next activity
buf_df["next_activity"] = client_data.activities.shift(
periods=-1, fill_value=activity_ended
).astype("category")
# add buffer to prev_next_df
prev_next_df = pd.concat((prev_next_df, buf_df), axis=0, ignore_index=True)
prev_next_df
# clients whose max action_index exceeds 10 (i.e. paths longer than 11 actions) are treated as outliers
grouped_by_action_count = prev_next_df.groupby("client_id")[["action_index"]].max()
clients_to_be_dropped = (
    grouped_by_action_count[grouped_by_action_count.action_index > 10].dropna().index
)
# clients_to_be_dropped.to_numpy()
print(clients_to_be_dropped)
outliers = prev_next_df[prev_next_df.client_id.isin(clients_to_be_dropped)].index
print(outliers)
prev_next_df.drop(outliers, inplace=True)
prev_next_df.dropna(inplace=True)
prev_next_df.nunique()
Drop uninformative columns that only contain one value
prev_next_df.drop(["owner", "employed"], axis=1, inplace=True)
prev_next_df["action_count"] = prev_next_df.action_index + 1
fig, axs = plt.subplots(1, 3, figsize=(15, 4))
for ax, column in zip(axs, ["action_count", "age", "income"]):
    prev_next_df.groupby("client_id")[column].max().hist(ax=ax, density=True)
    ax.set_ylabel("density")  # density=True normalises the histogram, so this is not a percentage
    ax.set_xlabel(column)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.grid(False)
    [ax.axhline(tick, color="white", linewidth=2) for tick in ax.get_yticks()]
Path("results").mkdir(exist_ok=True)  # make sure the output folder exists
plt.savefig("results/client_histograms.png")
OK, so most clients have 3 to 5 actions (including the end-of-journey marker). Considering the background information, all groups are well represented.
# convert state feature into one hot
onehot = pd.get_dummies(prev_next_df.prev_activity, prefix="prev")
prev_next_df[onehot.columns] = onehot
prev_next_df.drop("prev_activity", axis=1, inplace=True)
# convert label categories into numerical format
# (this is because for the moment TFF does not support multi-output models)
# re-cast as category first, in case the concat above upcast the column to object
prev_next_df["next_activity"] = (
    prev_next_df["next_activity"].astype("category").cat.codes
)
prev_next_df = prev_next_df.astype(int)
prev_next_df.dtypes
prev_next_df
# We need to convert the data into untidy nested format for TFF
# so that x is a vector and y is a scalar
cxy_df = pd.DataFrame(columns=["client_id", "x", "y"])
cxy_df.client_id = prev_next_df.client_id
cxy_df.x = prev_next_df.drop(["client_id", "next_activity"], axis=1).apply(
    lambda row: row.to_numpy(), axis=1
)
cxy_df.y = prev_next_df.next_activity
cxy_df
Path("data/preprocessed_data").mkdir(parents=True, exist_ok=True)  # make sure the folder exists
feather.write_feather(cxy_df, "data/preprocessed_data/data.f")
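To double-check the saved file (and as the loading pattern for downstream notebooks), it can be read back with pyarrow:
cxy_df_reloaded = feather.read_feather("data/preprocessed_data/data.f")
cxy_df_reloaded.head()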
Create a function to convert the df into tff ClientData
Following this discussion: https://stackoverflow.com/questions/58965488/how-to-create-federated-dataset-from-a-csv-file
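The function itself is not reproduced in this section, so here is a minimal sketch of what create_tff_client_data_from_df could look like, following the linked discussion: build one tf.data.Dataset per client from the x/y columns and wrap the collection as ClientData. The constructor used below is an assumption that depends on the installed TFF release: tff.simulation.ClientData.from_clients_and_fn (which accepts an eager Python callable like this pandas lookup) comes from older releases, while newer releases move ClientData under tff.simulation.datasets and replace the constructor with from_clients_and_tf_fn, which requires a TF-serializable dataset function instead.
import collections


def create_tff_client_data_from_df(cxy_df):
    """Sketch: wrap a (client_id, x, y) frame as tff ClientData (see note above)."""
    # TFF generally expects string client ids
    client_ids = [str(i) for i in sorted(cxy_df.client_id.unique())]

    def create_tf_dataset_for_client(client_id):
        client_rows = cxy_df[cxy_df.client_id == int(client_id)]
        return (
            tf.data.Dataset.from_tensor_slices(
                collections.OrderedDict(
                    x=np.stack(client_rows.x.to_numpy()).astype(np.float32),
                    y=client_rows.y.to_numpy().astype(np.int64),
                )
            )
            .repeat(NUM_EPOCHS)
            .shuffle(SHUFFLE_BUFFER, seed=seed)
            .batch(BATCH_SIZE)
        )

    # constructor path/name is version dependent, see the note above
    return tff.simulation.ClientData.from_clients_and_fn(
        client_ids, create_tf_dataset_for_client
    )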
Test (well, at least it should not crash)
tff_data = create_tff_client_data_from_df(cxy_df)
tff_data.create_tf_dataset_for_client(tff_data.client_ids[0])
train_data, test_data = tff.simulation.datasets.ClientData.train_test_client_split(
tff_data, 500
)
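As a quick sanity check on the split (assuming the sketch above, where each client dataset yields batched dicts with keys x and y), inspect one client from the train set:
example_dataset = train_data.create_tf_dataset_for_client(train_data.client_ids[0])
print(example_dataset.element_spec)
for batch in example_dataset.take(1):
    print(batch["x"].shape, batch["y"].shape)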