In this tutorial we solve the optimal control problem
$$\min J(y, u) = \frac{1}{2} \int_{\Omega} |v - v_d|^2 dx + \frac{\alpha}{2} \int_{\Omega} |u|^2 dx$$ s.t. $$\begin{cases} - \nu \Delta v + v \cdot \nabla v + \nabla p = f + u & \text{in } \Omega\\ \text{div} v = 0 & \text{in } \Omega\\ v = 0 & \text{on } \partial\Omega \end{cases}$$
where $$\begin{align*} & \Omega & \text{unit square}\\ & u \in [L^2(\Omega)]^2 & \text{control variable}\\ & v \in [H^1_0(\Omega)]^2 & \text{state velocity variable}\\ & p \in L^2(\Omega) & \text{state pressure variable}\\ & \alpha > 0 & \text{penalization parameter}\\ & v_d & \text{desired state}\\ & \nu & \text{kinematic viscosity}\\ & f & \text{forcing term} \end{align*}$$ using an adjoint formulation solved by a one shot approach
import dolfinx.fem
import dolfinx.fem.petsc
import dolfinx.io
import dolfinx.mesh
import mpi4py.MPI
import numpy as np
import numpy.typing
import petsc4py.PETSc
import sympy
import ufl
import viskex
import multiphenicsx.fem
import multiphenicsx.fem.petsc
mesh = dolfinx.mesh.create_unit_square(mpi4py.MPI.COMM_WORLD, 32, 32)
# Create connectivities required by the rest of the code
mesh.topology.create_connectivity(mesh.topology.dim - 1, mesh.topology.dim)
def bottom(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the bottom boundary."""
return abs(x[1] - 0.) < np.finfo(float).eps # type: ignore[no-any-return]
def left(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the left boundary."""
return abs(x[0] - 0.) < np.finfo(float).eps # type: ignore[no-any-return]
def top(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the top boundary."""
return abs(x[1] - 1.) < np.finfo(float).eps # type: ignore[no-any-return]
def right(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the right boundary."""
return abs(x[0] - 1.) < np.finfo(float).eps # type: ignore[no-any-return]
boundaries_entities = dict()
boundaries_values = dict()
for (boundary, boundary_id) in zip((bottom, left, top, right), (1, 2, 3, 4)):
boundaries_entities[boundary_id] = dolfinx.mesh.locate_entities_boundary(
mesh, mesh.topology.dim - 1, boundary)
boundaries_values[boundary_id] = np.full(
boundaries_entities[boundary_id].shape, boundary_id, dtype=np.int32)
boundaries_entities_unsorted = np.hstack(list(boundaries_entities.values()))
boundaries_values_unsorted = np.hstack(list(boundaries_values.values()))
boundaries_entities_argsort = np.argsort(boundaries_entities_unsorted)
boundaries_entities_sorted = boundaries_entities_unsorted[boundaries_entities_argsort]
boundaries_values_sorted = boundaries_values_unsorted[boundaries_entities_argsort]
boundaries = dolfinx.mesh.meshtags(
mesh, mesh.topology.dim - 1,
boundaries_entities_sorted, boundaries_values_sorted)
boundaries.name = "boundaries"
boundaries_1234 = boundaries.indices[np.isin(boundaries.values, (1, 2, 3, 4))]
viskex.dolfinx.plot_mesh(mesh)
error: XDG_RUNTIME_DIR is invalid or not set in the environment. MESA: error: ZINK: failed to choose pdev glx: failed to create drisw screen
viskex.dolfinx.plot_mesh_tags(mesh, boundaries, "boundaries")
error: XDG_RUNTIME_DIR is invalid or not set in the environment. MESA: error: ZINK: failed to choose pdev glx: failed to create drisw screen
Y_velocity = dolfinx.fem.functionspace(mesh, ("Lagrange", 2, (mesh.geometry.dim, )))
Y_pressure = dolfinx.fem.functionspace(mesh, ("Lagrange", 1))
U = dolfinx.fem.functionspace(mesh, ("Lagrange", 2, (mesh.geometry.dim, )))
Q_velocity = Y_velocity.clone()
Q_pressure = Y_pressure.clone()
Create element Create element Create element
(dv, dp) = (ufl.TrialFunction(Y_velocity), ufl.TrialFunction(Y_pressure))
(w, q) = (ufl.TestFunction(Y_velocity), ufl.TestFunction(Y_pressure))
du = ufl.TrialFunction(U)
r = ufl.TestFunction(U)
(dz, db) = (ufl.TrialFunction(Q_velocity), ufl.TrialFunction(Q_pressure))
(s, d) = (ufl.TestFunction(Q_velocity), ufl.TestFunction(Q_pressure))
(v, p) = (dolfinx.fem.Function(Y_velocity), dolfinx.fem.Function(Y_pressure))
u = dolfinx.fem.Function(U)
(z, b) = (dolfinx.fem.Function(Q_velocity), dolfinx.fem.Function(Q_pressure))
alpha = 1.e-5
epsilon = 1.e-5
x, y = sympy.symbols("x[0], x[1]")
psi_d = 10 * (1 - sympy.cos(0.8 * np.pi * x)) * (1 - sympy.cos(0.8 * np.pi * y)) * (1 - x)**2 * (1 - y)**2
v_d_x = sympy.lambdify([x, y], psi_d.diff(y, 1))
v_d_y = sympy.lambdify([x, y], - psi_d.diff(x, 1))
v_d = dolfinx.fem.Function(Y_velocity)
v_d.interpolate(lambda x: np.stack((v_d_x(x[0], x[1]), v_d_y(x[0], x[1])), axis=0))
nu = 0.01
ff = dolfinx.fem.Constant(mesh, tuple(petsc4py.PETSc.ScalarType(0) for _ in range(2)))
bc0 = np.zeros((2, ), dtype=petsc4py.PETSc.ScalarType)
F = [nu * ufl.inner(ufl.grad(z), ufl.grad(w)) * ufl.dx
+ ufl.inner(z, ufl.grad(w) * v) * ufl.dx + ufl.inner(z, ufl.grad(v) * w) * ufl.dx
- ufl.inner(b, ufl.div(w)) * ufl.dx + ufl.inner(v - v_d, w) * ufl.dx,
- ufl.inner(ufl.div(z), q) * ufl.dx + epsilon * ufl.inner(b, q) * ufl.dx,
alpha * ufl.inner(u, r) * ufl.dx - ufl.inner(z, r) * ufl.dx,
nu * ufl.inner(ufl.grad(v), ufl.grad(s)) * ufl.dx + ufl.inner(ufl.grad(v) * v, s) * ufl.dx
- ufl.inner(p, ufl.div(s)) * ufl.dx - ufl.inner(u + ff, s) * ufl.dx,
- ufl.inner(ufl.div(v), d) * ufl.dx + epsilon * ufl.inner(p, d) * ufl.dx]
dF = [[ufl.derivative(F_i, u_j, du_j) for (u_j, du_j) in zip((v, p, u, z, b), (dv, dp, du, dz, db))] for F_i in F]
dF[3][3] = dolfinx.fem.Constant(mesh, petsc4py.PETSc.ScalarType(0)) * ufl.inner(dz, s) * ufl.dx
bdofs_Y_velocity_1234 = dolfinx.fem.locate_dofs_topological(
Y_velocity, mesh.topology.dim - 1, boundaries_1234)
bdofs_Q_velocity_1234 = dolfinx.fem.locate_dofs_topological(
Q_velocity, mesh.topology.dim - 1, boundaries_1234)
bc = [dolfinx.fem.dirichletbc(bc0, bdofs_Y_velocity_1234, Y_velocity),
dolfinx.fem.dirichletbc(bc0, bdofs_Q_velocity_1234, Q_velocity)]
J = 0.5 * ufl.inner(v - v_d, v - v_d) * ufl.dx + 0.5 * alpha * ufl.inner(u, u) * ufl.dx
J_cpp = dolfinx.fem.form(J)
class NonlinearBlockProblem:
"""Define a nonlinear problem, interfacing with SNES."""
def __init__( # type: ignore[no-any-unimported]
self, F: list[ufl.Form], dF: list[list[ufl.Form]],
solutions: tuple[dolfinx.fem.Function, ...], bcs: list[dolfinx.fem.DirichletBC]
) -> None:
self._F = dolfinx.fem.form(F)
self._dF = dolfinx.fem.form(dF)
self._obj_vec = dolfinx.fem.petsc.create_vector_block(self._F)
self._solutions = solutions
self._bcs = bcs
def create_snes_solution(self) -> petsc4py.PETSc.Vec: # type: ignore[no-any-unimported]
"""
Create a petsc4py.PETSc.Vec to be passed to petsc4py.PETSc.SNES.solve.
The returned vector will be initialized with the initial guesses provided in `self._solutions`,
properly stacked together in a single block vector.
"""
x = dolfinx.fem.petsc.create_vector_block(self._F)
with multiphenicsx.fem.petsc.BlockVecSubVectorWrapper(
x, [c.function_space.dofmap for c in self._solutions]) as x_wrapper:
for x_wrapper_local, component in zip(x_wrapper, self._solutions):
with component.x.petsc_vec.localForm() as component_local:
x_wrapper_local[:] = component_local
return x
def update_solutions(self, x: petsc4py.PETSc.Vec) -> None: # type: ignore[no-any-unimported]
"""Update `self._solutions` with data in `x`."""
x.ghostUpdate(addv=petsc4py.PETSc.InsertMode.INSERT, mode=petsc4py.PETSc.ScatterMode.FORWARD)
with multiphenicsx.fem.petsc.BlockVecSubVectorWrapper(
x, [c.function_space.dofmap for c in self._solutions]) as x_wrapper:
for x_wrapper_local, component in zip(x_wrapper, self._solutions):
with component.x.petsc_vec.localForm() as component_local:
component_local[:] = x_wrapper_local
def obj( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec
) -> np.float64:
"""Compute the norm of the residual."""
self.F(snes, x, self._obj_vec)
return self._obj_vec.norm() # type: ignore[no-any-return]
def F( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, F_vec: petsc4py.PETSc.Vec
) -> None:
"""Assemble the residual."""
self.update_solutions(x)
with F_vec.localForm() as F_vec_local:
F_vec_local.set(0.0)
dolfinx.fem.petsc.assemble_vector_block( # type: ignore[misc]
F_vec, self._F, self._dF, self._bcs, x0=x, alpha=-1.0)
def dF( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, dF_mat: petsc4py.PETSc.Mat,
_: petsc4py.PETSc.Mat
) -> None:
"""Assemble the jacobian."""
dF_mat.zeroEntries()
dolfinx.fem.petsc.assemble_matrix_block( # type: ignore[misc]
dF_mat, self._dF, self._bcs, diagonal=1.0) # type: ignore[arg-type]
dF_mat.assemble()
# Create problem by extracting state forms from the optimality conditions
F_state = [ufl.replace(F[i], {s: w, d: q}) for i in (3, 4)]
dF_state = [[ufl.replace(dF[i][j], {s: w, d: q}) for j in (0, 1)] for i in (3, 4)]
bc_state = [bc[0]]
problem_state = NonlinearBlockProblem(F_state, dF_state, (v, p), bc_state)
F_vec_state = dolfinx.fem.petsc.create_vector_block(problem_state._F)
dF_mat_state = dolfinx.fem.petsc.create_matrix_block(problem_state._dF)
# Solve
snes = petsc4py.PETSc.SNES().create(mesh.comm)
snes.setTolerances(max_it=20)
snes.getKSP().setType("preonly")
snes.getKSP().getPC().setType("lu")
snes.getKSP().getPC().setFactorSolverType("mumps")
snes.setObjective(problem_state.obj)
snes.setFunction(problem_state.F, F_vec_state)
snes.setJacobian(problem_state.dF, J=dF_mat_state, P=None)
snes.setMonitor(lambda _, it, residual: print(it, residual))
vp = problem_state.create_snes_solution()
snes.solve(None, vp)
problem_state.update_solutions(vp) # TODO can this be safely removed?
vp.destroy()
snes.destroy()
0 0.0
<petsc4py.PETSc.SNES at 0x7f9bc698aca0>
J_uncontrolled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)
print("Uncontrolled J =", J_uncontrolled)
assert np.isclose(J_uncontrolled, 0.1784536)
Uncontrolled J = 0.17845361493420545
viskex.dolfinx.plot_vector_field(v, "uncontrolled state velocity", glyph_factor=1e-1)
error: XDG_RUNTIME_DIR is invalid or not set in the environment. MESA: error: ZINK: failed to choose pdev glx: failed to create drisw screen
viskex.dolfinx.plot_scalar_field(p, "uncontrolled state pressure")
error: XDG_RUNTIME_DIR is invalid or not set in the environment. MESA: error: ZINK: failed to choose pdev glx: failed to create drisw screen
# PYTEST_XFAIL_AND_SKIP_NEXT: ufl.derivative(adjoint of the trilinear term) introduces spurious conj(trial)
"""Expect this cell to fail.
assert not np.issubdtype(petsc4py.PETSc.ScalarType, np.complexfloating)
"""
'Expect this cell to fail.\n\nassert not np.issubdtype(petsc4py.PETSc.ScalarType, np.complexfloating)\n'
"""Skip cell due to a previously xfailed cell.
# Create problem associated to the optimality conditions
problem = NonlinearBlockProblem(F, dF, (v, p, u, z, b), bc)
F_vec = dolfinx.fem.petsc.create_vector_block(problem._F)
dF_mat = dolfinx.fem.petsc.create_matrix_block(problem._dF)
"""
'Skip cell due to a previously xfailed cell.\n\n# Create problem associated to the optimality conditions\nproblem = NonlinearBlockProblem(F, dF, (v, p, u, z, b), bc)\nF_vec = dolfinx.fem.petsc.create_vector_block(problem._F)\ndF_mat = dolfinx.fem.petsc.create_matrix_block(problem._dF)\n'
"""Skip cell due to a previously xfailed cell.
# Solve
snes = petsc4py.PETSc.SNES().create(mesh.comm)
snes.setTolerances(max_it=20)
snes.getKSP().setType("preonly")
snes.getKSP().getPC().setType("lu")
snes.getKSP().getPC().setFactorSolverType("mumps")
snes.setObjective(problem.obj)
snes.setFunction(problem.F, F_vec)
snes.setJacobian(problem.dF, J=dF_mat, P=None)
snes.setMonitor(lambda _, it, residual: print(it, residual))
vpuzb = problem.create_snes_solution()
snes.solve(None, vpuzb)
problem.update_solutions(vpuzb) # TODO can this be safely removed?
vpuzb.destroy()
snes.destroy()
"""
'Skip cell due to a previously xfailed cell.\n\n# Solve\nsnes = petsc4py.PETSc.SNES().create(mesh.comm)\nsnes.setTolerances(max_it=20)\nsnes.getKSP().setType("preonly")\nsnes.getKSP().getPC().setType("lu")\nsnes.getKSP().getPC().setFactorSolverType("mumps")\nsnes.setObjective(problem.obj)\nsnes.setFunction(problem.F, F_vec)\nsnes.setJacobian(problem.dF, J=dF_mat, P=None)\nsnes.setMonitor(lambda _, it, residual: print(it, residual))\nvpuzb = problem.create_snes_solution()\nsnes.solve(None, vpuzb)\nproblem.update_solutions(vpuzb) # TODO can this be safely removed?\nvpuzb.destroy()\nsnes.destroy()\n'
"""Skip cell due to a previously xfailed cell.
J_controlled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)
print("Optimal J =", J_controlled)
assert np.isclose(J_controlled, 9.9127635e-7)
"""
'Skip cell due to a previously xfailed cell.\n\nJ_controlled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)\nprint("Optimal J =", J_controlled)\nassert np.isclose(J_controlled, 9.9127635e-7)\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_vector_field(v, "state velocity", glyph_factor=1e-1)
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_vector_field(v, "state velocity", glyph_factor=1e-1)\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_scalar_field(p, "state pressure")
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_scalar_field(p, "state pressure")\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_vector_field(u, "control", glyph_factor=1e-1)
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_vector_field(u, "control", glyph_factor=1e-1)\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_vector_field(z, "adjoint velocity", glyph_factor=1e4)
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_vector_field(z, "adjoint velocity", glyph_factor=1e4)\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_scalar_field(b, "adjoint pressure")
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_scalar_field(b, "adjoint pressure")\n'