NNExperiments/src/nn/network.nim

267 lines
11 KiB
Nim

# Copyright 2022 Mattia Giambirtone & All Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import util/matrix
import std/strformat
import std/random
import std/math
import std/sequtils
# Seed the default random number generator of std/random at module load.
# newNeuralNetwork draws its initial weights and biases through rand().
randomize()
type
  Loss* = ref object
    ## Bundles a loss (cost) function together
    ## with its derivative
    function: proc (a, b: Matrix[float]): float
    derivative: proc (x, y: Matrix[float]): Matrix[float] {.noSideEffect.}

  Activation* = ref object
    ## Bundles an activation function together
    ## with its derivative
    function: proc (input: Matrix[float]): Matrix[float] {.noSideEffect.}
    derivative: proc (x: Matrix[float]): Matrix[float] {.noSideEffect.}

  Layer* = ref object
    ## One layer of a neural network
    # How many values the layer takes in
    inputSize*: int
    # How many values the layer puts out
    outputSize*: int
    # Connection weights; newNeuralNetwork builds them
    # with shape (outputSize, inputSize)
    weights*: Matrix[float]
    # One bias per output; newNeuralNetwork builds them
    # from a flat sequence of outputSize values
    biases*: Matrix[float]
    # Gradient buffers for the weights and biases
    gradients: tuple[weights, biases: Matrix[float]]
    # Activation function applied to the layer's output
    activation: Activation

  NeuralNetwork* = ref object
    ## A generic feed-forward neural network
    # The layers, in the order data flows through them
    layers*: seq[Layer]
    # Cost function and its derivative
    loss: Loss
    # Scales the size of each gradient descent step
    learnRate*: float
    # Momentum hyperparameter, intended to speed up
    # convergence when performing SGD
    momentum*: float
proc `$`*(self: Layer): string =
  ## Formats the layer as text, showing
  ## its input and output sizes
  &"Layer(inputs={self.inputSize}, outputs={self.outputSize})"
proc `$`*(self: NeuralNetwork): string =
  ## Formats the network as text, showing
  ## its learn rate and its layers
  &"NeuralNetwork(learnRate={self.learnRate}, layers={self.layers})"
proc newLoss*(function: proc (a, b: Matrix[float]): float,
              derivative: proc (x, y: Matrix[float]): Matrix[float] {.noSideEffect.}): Loss =
  ## Builds a Loss object out of a cost
  ## function and its derivative
  Loss(function: function, derivative: derivative)
proc newActivation*(function: proc (input: Matrix[float]): Matrix[float] {.noSideEffect.},
                    derivative: proc (x: Matrix[float]): Matrix[float] {.noSideEffect.}): Activation =
  ## Builds an Activation object out of an
  ## activation function and its derivative
  Activation(function: function, derivative: derivative)
proc newDenseLayer*(inputSize: int, outputSize: int, activationFunc: Activation): Layer =
  ## Builds a dense layer that takes inputSize values,
  ## produces outputSize values and uses the given
  ## activation function. Weights, biases and gradients
  ## are left unset here: newNeuralNetwork fills them in.
  Layer(inputSize: inputSize,
        outputSize: outputSize,
        activation: activationFunc)
proc newNeuralNetwork*(topology: seq[Layer], lossFunc: Loss, learnRate: float, momentum: float,
                       weightRange, biasRange: tuple[start, stop: float]): NeuralNetwork =
  ## Initializes a new neural network with the given
  ## topology and hyperparameters.
  ##
  ## Every layer in the topology gets its biases drawn at
  ## random from biasRange and its weights drawn at random
  ## from weightRange (both ranges are inclusive). The layer's
  ## gradient buffers are zero-filled and have the same shapes
  ## as the weights and biases they belong to.
  ##
  ## Note that the Layer objects in topology are references,
  ## so the layers passed in are the ones that get initialized
  new(result)
  result.layers = topology
  result.loss = lossFunc
  result.learnRate = learnRate
  result.momentum = momentum
  for layer in result.layers:
    let weightCount = layer.inputSize * layer.outputSize
    # Biases are drawn before weights so that the order of
    # the calls to rand() matches the layer-by-layer order
    # used previously
    let biases = newSeqWith(layer.outputSize, rand(biasRange.start..biasRange.stop))
    let weights = newSeqWith(weightCount, rand(weightRange.start..weightRange.stop))
    # newSeq zero-initializes its elements
    let biasGradients = newSeq[float](layer.outputSize)
    let weightGradients = newSeq[float](weightCount)
    layer.biases = newMatrix[float](biases)
    # Why (outputSize, inputSize) rather than the other way
    # around? Storing the weights this way spares us from
    # transposing them when we compute the dot product with
    # the layer's input
    layer.weights = newMatrixFromSeq[float](weights, (layer.outputSize, layer.inputSize))
    # The gradient buffers mirror the shapes of what they track:
    # 2D for the weights, flat for the biases
    layer.gradients = (weights: newMatrixFromSeq[float](weightGradients, (layer.outputSize, layer.inputSize)),
                       biases: newMatrix[float](biasGradients))
proc backprop(self: NeuralNetwork, x, y: Matrix[float]): tuple[weights, biases: seq[Matrix[float]]] =
  ## Performs a single backpropagation step for the training
  ## sample (x, y), where x is the network's input and y is
  ## the expected output.
  ##
  ## Returns the gradient of the cost function with respect to
  ## the weights and the biases of the network, with one matrix
  ## per layer in the same order as self.layers. If the network
  ## has no layers, both sequences are empty
  var
    # The deltas for the weights and biases of
    # each layer in the network
    deltaW: seq[Matrix[float]] = @[]
    deltaB: seq[Matrix[float]] = @[]
    # Output of the most recently evaluated layer
    # (the network's input before the first one)
    activation = x
    # Activations of each layer; index 0 holds the input,
    # so this is always one longer than self.layers
    activations: seq[Matrix[float]] = @[x]
    # Unactivated outputs of each layer
    unactivated: seq[Matrix[float]] = @[]
  if self.layers.len == 0:
    # Nothing to differentiate
    return (deltaW, deltaB)
  # Forward pass through the network
  for layer in self.layers:
    deltaW.add(zeros[float](layer.weights.shape))
    deltaB.add(zeros[float](layer.biases.shape))
    unactivated.add(layer.weights.dot(activation) + layer.biases)
    # Each layer must be fed the output of the previous one
    activation = layer.activation.function(unactivated[^1])
    activations.add(activation)
  # Backwards pass
  # The error of the output layer for this sample: this is a
  # partial derivative, so the multiplication here is just an
  # application of the chain rule!
  var diff: Matrix[float] = self.loss.derivative(activations[^1], y) * self.layers[^1].activation.derivative(unactivated[^1])
  deltaB[^1].replace(diff)
  # The gradient for a layer's weights is its error times the
  # (transposed) input that was fed to that layer
  # NOTE(review): assumes dot() with a transposed vector yields
  # a matrix shaped like the layer's weights - confirm in util/matrix
  deltaW[^1].replace(diff.dot(activations[^2].transpose()))
  for l in 2..self.layers.len:
    # The ^ makes our indices start from the back instead of
    # from the front, so we're really iterating over our layers
    # backwards! The error is pulled back through the weights of
    # the layer that comes *after* layer ^l, which is ^(l - 1)
    diff = self.layers[^(l - 1)].weights.transpose().dot(diff) * self.layers[^l].activation.derivative(unactivated[^l])
    deltaB[^l].replace(diff)
    # Since activations has one extra entry at the front, the
    # input that was fed to layer ^l lives at ^(l + 1)
    deltaW[^l].replace(diff.dot(activations[^(l + 1)].transpose()))
  return (deltaW, deltaB)
proc miniBatch(self: NeuralNetwork, data: seq[tuple[x, y: Matrix[float]]]) =
  ## Performs a single mini-batch step of stochastic gradient
  ## descent: the gradients of all samples in data are summed,
  ## averaged and scaled by the learn rate, then subtracted from
  ## the weights and biases of each layer.
  ##
  ## An empty batch is a no-op
  if data.len == 0:
    # Averaging over zero samples would divide by zero and
    # write NaNs into every weight and bias
    return
  # Running sums of the per-sample gradients, one per layer
  var
    weights: seq[Matrix[float]] = @[]
    biases: seq[Matrix[float]] = @[]
  for layer in self.layers:
    weights.add(zeros[float](layer.weights.shape))
    biases.add(zeros[float](layer.biases.shape))
  for dataPoint in data:
    # backprop returns one gradient matrix per layer
    let gradient = self.backprop(dataPoint.x, dataPoint.y)
    for i in 0..<biases.len:
      biases[i] = biases[i] + gradient.biases[i]
    for i in 0..<weights.len:
      weights[i] = weights[i] + gradient.weights[i]
  # The backpropagation algorithm lets us find the direction of
  # steepest ascent of our cost function, so we subtract it from
  # the current weights and biases to descend instead. This only
  # approximates true gradient descent, which would use all of
  # the training samples at once rather than a small batch
  let stepSize = self.learnRate / data.len.float
  for i, layer in self.layers:
    layer.biases = layer.biases - stepSize * biases[i]
    layer.weights = layer.weights - stepSize * weights[i]
proc train*(self: NeuralNetwork, epochs: int, batchSize: int, data: var seq[tuple[x, y: Matrix[float]]]) =
  ## Trains the network on the given data for the specified
  ## number of epochs by applying stochastic gradient descent.
  ##
  ## At every epoch the data is shuffled in place (which is why
  ## it is taken as var) and split into consecutive batches of
  ## batchSize samples. The last batch is shorter when the number
  ## of samples is not a multiple of batchSize.
  ##
  ## Raises ValueError if batchSize is not positive
  if batchSize <= 0:
    # A non-positive batch size would never advance through
    # the data and the loop below would spin forever
    raise newException(ValueError, "batchSize must be positive, got " & $batchSize)
  for _ in 0..<epochs:
    # We shuffle the data so that different epochs work
    # on different batches. This will hopefully help the
    # network generalize its training onto unseen data
    shuffle(data)
    var start = 0
    while start < data.len:
      # Clamp the end of the batch so we never read past the
      # data (written this way to avoid overflowing on huge
      # batch sizes)
      let stop = start + min(batchSize, data.len - start)
      self.miniBatch(data[start..<stop])
      start = stop
## Utility functions
# Mean squared error
proc mse(a, b: Matrix[float]): float =
  ## Squares every element of b - a, sums the
  ## results and divides the total by len(a)
  let squaredErrors = (b - a).apply(proc (x: float): float = pow(x, 2), axis = -1)
  squaredErrors.sum() / len(a).float
# Derivative of MSE
func dxMSE(x, y: Matrix[float]): Matrix[float] =
  ## Returns twice the difference x - y
  let error = x - y
  result = 2.0 * error
# A bunch of vectorized activation functions
func sigmoid(input: Matrix[float]): Matrix[float] =
  ## Applies the logistic function 1 / (1 + e^-x)
  ## to the input through apply()
  let logistic = proc (x: float): float = 1 / (1 + exp(-x))
  input.apply(logistic, axis = -1)
func sigmoidDerivative(input: Matrix[float]): Matrix[float] =
  ## Derivative of the sigmoid evaluated at input,
  ## computed as s * (1 - s) where s = sigmoid(input)
  let activated = sigmoid(input)
  result = activated * (1.0 - activated)
func softmax(input: Matrix[float]): Matrix[float] =
  ## Softmax of the input: the exponentials of the
  ## input divided by their sum. The input's maximum
  ## is subtracted before exponentiating
  let shifted = input - input.max()
  let exponentials = shifted.apply(math.exp, axis = -1)
  result = exponentials / exponentials.sum()
func softmaxDerivative(input: Matrix[float]): Matrix[float] =
  ## Jacobian of softmax evaluated at input, computed as
  ## diag(s) - s * s^T where s = softmax(input).
  ##
  ## Like sigmoidDerivative, this takes the *unactivated*
  ## values (which is what backprop passes in), so softmax
  ## has to be applied to them first
  # NOTE(review): the result is a square matrix rather than
  # a vector shaped like input - confirm callers expect that
  let s = softmax(input).reshape(input.shape.cols, 1)
  result = s.diagflat() - s.dot(s.transpose())
# TODO: Add derivatives for the activation functions below
func step(input: Matrix[float]): Matrix[float] {.used.} =
  ## Step function: maps negative elements to 0.0
  ## and every other element to 1.0
  # Returning x itself for non-negative inputs would
  # make this identical to relu
  input.apply(proc (x: float): float = (if x < 0.0: 0.0 else: 1.0), axis = -1)
func silu(input: Matrix[float]): Matrix[float] {.used.} =
  ## Sigmoid Linear Unit: x * sigmoid(x) for each element
  # x * (1 / (1 + e^-x)) written as a single division
  input.apply(proc (x: float): float = x / (1 + exp(-x)), axis = -1)
func relu(input: Matrix[float]): Matrix[float] {.used.} =
  ## Rectified Linear Unit: max(0.0, x) for each element
  let rectify = proc (x: float): float = max(0.0, x)
  input.apply(rectify, axis = -1)
func htan(input: Matrix[float]): Matrix[float] {.used.} =
  ## Hyperbolic tangent of each element
  # tanh from std/math is used instead of the textbook formula
  # (e^2x - 1) / (e^2x + 1), because e^2x overflows to infinity
  # for large x and inf / inf is NaN rather than 1.0
  let f = proc (x: float): float = tanh(x)
  input.apply(f, axis = -1)
{.push.}
{.hints: off.} # Silences hints for the capitalized variable names below
# Ready-made activation and loss objects exported by this module
var
  Sigmoid* = newActivation(sigmoid, sigmoidDerivative)
  Softmax* = newActivation(softmax, softmaxDerivative)
  MSE* = newLoss(mse, dxMSE)
{.pop.}